介绍
在复杂系统的架构中,会有负责处理消息队列的微服务,如:服务A负责产生消息给消息队列,而服务B则负责消费消息队列中的任务。
开发中消息队列通常有如下使用场景:
- 任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高应用程序的响应时间 - 应用程序解耦合
MQ 相当于一个中介,生产方通过 MQ 与消费方交互,将应用程序进行解耦
其工作原理如下图:
说明:
- Producer:消息生产者,将生产的消息发送到 MQ
- Broker:消息队列服务进程,包括 Exchange 和 Queue
- Exchange:消息队列交换机,将消息按一定的规则转发到某个队列,对消息进行过滤
- Queue:消息队列,存储消息的队列,消息到达队列将消息转发给指定的消费者
- Consumer: 消息消费者,接收 MQ 转发过来的消息
Exchange
有 4 种不同的交换机:
- 直连交换机 Direct Exchange
- 扇形交换机 Fanout Exchange
- 主题交换机 Topic Exchange
- 首部交换机 Headers Exchange
扇形交换机
扇形交换机是最基础的交换机类型,把能接收到的消息全部发给绑定在自己身上的队列,即广播消息,因为不需要思考,所以扇形交换机处理消息的速度也是最快的。
直连交换机
直连交换机是一种带路由功能的交换机,队列和交换机绑定时,会需要一个 routing_key ,当消息被发送时,需要指定一个 bing_key ,当消息被送达交换机的时候,就会被送到指定的队列。
主题交换机
直连交换机的 routing_key 方案很简单,如果我们希望一条消息发送给多个队列,就需要绑定多个 routing_key ,假设每个交换机上都要绑定多个 routing_key ,这样管理起来会很繁琐,所以 RabbitMQ 提供了主题交换机。
即发送到主题交换机上的消息需要携带指定规则的 routing_key ,主题交换机会根据这个规则将消息发送到对应的一个或多个队列。
交换机和队列的 routing_key 需要采用 .#.…… 的格式,每个部分用 . 隔开
- ‘*’表示一个单词
- ‘#’表示任意数量(零个或多个)单词
首部交换机
首部交换机是通过 Headers 信息来交换的。有点像 HTTP 的 Headers。首部交换机应用相对少,这里就不过多解释了。
下载和启动
RabbitMQ 下载镜像和 MySQL 类似,也是去 【 Docker 】里下载
下载
1 | docker pull rabbitmq:3.9.18-management-alpine |
这里下载,我们需要下载有 UI 页面的,即版本号带 ‘management’ 的,因为 RabbitMQ 没有客户端。
启动
1 | docker run -d --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -v /Users/gladysdrea/Applications/Docker/RabbitMQ/data:/var/lib/rabbitmq/ -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.9.18-management-alpine |
说明:
- 这里有 2个 -p ,一个是服务端的端口(5672),一个是客户端页面的访问端口(15672)。
- 2个 -e ,也是同理
启动之后,我们可以打开客户端访问地址( http://localhost:15672/#/ ),登录(密码就是启动命令里设置的,这里我们是 admin ),就会看到如下页面
在 MidWay 中使用 RabbitMQ
生产者
首先安装需要的依赖
1 | npm i amqplib amqp-connection-manager --save |
接下来创建连接,在 service 目录下,新建一个 rabbitmq.ts 的文件
1 | // src/service/rabbitmq.ts |
我们创建了一个用来封装消息通信的 service ,是全局唯一的 Singleton 单例。因为增加了 @Autoload() 装饰器,所以可以自执行初始化。
这样基础的调用服务就抽象好了,我们只需要在使用的地方,调用 sendToQueue 方法就可以了。如下:
我们在用户注册的时候,发送一个消息队列
1 | // src/service/user.service.ts |
我们可以打开上边说的客户端页面,找到 Queue ,看是否有刚刚发送的队列。
消费者
消费者需要的依赖如下:
1 | npm i @midwayjs/rabbitmq@3 --save |
安装好之后,我们需要开启组件,在 configuration.ts 中进行如下配置:
1 | // src/configuration.ts |
组件开始之后,我们还需要在 config 中配置 RabbitMQ 地址
1 | // src/config/config.default.ts |
一般消费者,我们习惯写在 consumer 中,比如:src/consumer/user.consumer.ts
1 | // src/consumer/user.consumer.ts |
@RabbitMQListener() 装饰器第二个参数
1 | export interface RabbitMQListenerOptions { |
到此 RabbitMQ 就介绍完了。