摘要:基于 AMQP 协议实现的消息中间件。默认端口是15672。
目录
[TOC]
RabbitMQ
RabbitMQ 是最受欢迎的开源消息中间件之一,在全球范围内被广泛应用。
- 是轻量级且易于部署的,能支持多种消息协议(模式)。可以部署在分布式系统中,以满足大规模、高可用的要求。
- RabbitMQ 使用的还是消息队列这种消息模型,不过引入了一个
exchange的概念。 - RabbitMQ 是基于 AMQP 协议实现的消息中间件。
AMQP
AMQP(Advanced Message Queuing Protocol 高级消息队列协议):用于在应用程序之间传递消息的网络协议,通常用于消息队列系统。定义了消息的格式、传递方法和消息队列的行为,允许不同应用程序之间进行异步通信。
- 在RabbitMQ中,AMQP是主要的通信协议,用于生产者将消息发送到队列,消费者从队列中接收消息,以及在消息代理(如RabbitMQ)中进行消息路由和处理。
- 提供统一消息服务的应用层标准(二进制应用层协议),面向消息的中间件设计,兼容 JMS。有跨平台、跨语言特性。
端口
- AMQP 默认端口是5672。
| 端口 | 作用 | |
|---|---|---|
| 4369 | epmd(代表 Erlang端口映射守护进程),erlang发现口 | |
| 5671 | AMQP 默认端口。非加密的,默认情况下消息在网络上传输时不会被加密。 | |
| 5672 | AMQPS(AMQP over SSL/TLS)协议默认端口。提供加密的通信。加密连接需要配置证书等安全设置。 | |
| 15671 | 管理监听端口 | |
| 15672 | 管理界面ui使用的端口 | |
| 25672 | ( Erlang distribution) server间内部通信口 |
Spring AMQP 依赖和配置
Spring AMQP 实现:
- 首先需要在
pom.xml中添加 Spring AMQP 的相关依赖;
1 | |
- 然后修改
application.yml,添加RabbitMQ的相关配置;
1 | |
消息(队列)模型

以5种消息模式中的路由模式为例。

| 标志 | 中文名 | 英文名 | 描述 |
|---|---|---|---|
| P | 生产者 | Producer | 消息的发送者,可以将消息发送到交换机 |
| X | 交换机 | Exchange | 接收生产者发送的消息,并根据路由键发送给指定队列 |
| Q | 队列 | Queue | 存储从交换机发来的消息 |
| C | 消费者 | Consumer | 消息的接收者,从队列中获取消息并进行消费 |
| type | 交换机类型 | type | 不同类型的交换机转发消息方式不同 |
| 3. fanout | 发布/订阅模式 | fanout | 广播消息给所有绑定交换机的队列 |
| 4. direct | 路由模式 | direct | 根据路由键发送消息 |
| 5. topic | 通配符模式 | topic | 根据路由键的匹配规则发送消息 |
如何实现路由?
- 消费者(指定
BindingKey)将交换器和队列绑定。 - 生产者先指定一个
RoutingKey路由键,然后将消息发送到交换机。 - 交换机(根据
exchange type按照策略)将消息发送到(相同BindingKey)对应的队列中去。
Broker
Broker(消息中间件的服务节点):可简单地看作一个 RabbitMQ 服务实例(服务器)。
- 包括
Exchange和Queue。
Queue
Queue(消息队列):用来保存消息直到发送给消费者。是消息的容器。
- RabbitMQ 中消息只能存储在 队列 中,这和 Kafka 这种消息中间件相反。
- Kafka 将消息存储在 topic 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。
- 一个消息可投入一个或多个队列。
- 多个消费者可订阅同一个队列,队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理。
而不是每个消费者都收到所有的消息并处理,避免消息被重复消费。 - RabbitMQ 不支持队列层面的广播消费。
消息队列的重要属性:
- 持久性:broker重启前都有效。
- 自动删除:在所有消费者停止使用之后自动删除。
- 惰性:没有主动声明队列,调用会导致异常。
- 排他性:一旦启用,声明它的消费者才能使用。
Exchange
Exchange(交换器):用来接收生产者发送的消息、(根据配置的策略)将这些消息路由到对应的(一个或多个)队列。如果路由不到,会返回给生产者、或直接丢弃 。
- 可将交换器理解成一个由绑定构成的路由表,绑定是多对多的关系。
4 种消息模式
这4种消息模式是构建基于RabbitMQ的消息应用的基础。
这些消息模式基于 Java API 的实现,也可以通过 Spring AMQP 来实现。
Exchange 有 4 种 Exchange Types,对应着不同的路由转发策略:
fanout(扇出)发布/订阅模式:该交换器收到的信息会被发送到所有与该交换器绑定的队列中。不需做任何判断,所以速度最快。常用来广播消息。direct直连路由模式(默认):把消息路由到那些Bindingkey与RoutingKey完全匹配的 Queue 中。- 常用在处理有优先级的任务,根据任务的优先级把消息发送到对应队列,这样可指派更多的资源去处理高优先级的队列。
- 例如,如果发送消息的路由键(RoutingKey)为 log 时,两个消息队列都会收到消息,如果路由键为 debug ,exchange 只会把消息发送到消息队列1中。
topic主题通配符模式:在 direct 基础之上(做了扩展),引入了模糊匹配机制。使用通配符。:不推荐,不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。性能很差,也不实用,基本上不用。headers
简单模式
简单模式:是最简单的消息模式。包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。
模式示意图:

Spring AMQP 实现:
- 添加
简单模式相关Java配置,创建一个名为simple.hello的队列、一个生产者和一个消费者;
1 | |
- 生产者通过
send方法向队列simple.hello中发送消息;
1 | |
- 消费者从队列
simple.hello中获取消息;
1 | |
- 在controller中添加测试接口,调用该接口开始发送消息;
1 | |
- 运行后结果如下,可以发现生产者往队列中发送消息,消费者从队列中获取消息并消费。


工作队列模式
工作模式:指向多个互相竞争的消费者发送消息的模式。包含一个生产者、多个消费者和一个队列。
- 两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

Spring AMQP 实现:
- 添加工作模式相关Java配置,创建一个名为
work.hello的队列、一个生产者和两个消费者;
1 | |
- 生产者通过
send方法向队列work.hello中发送消息,消息中包含一定数量的.号;
1 | |
- 两个消费者从队列
work.hello中获取消息,名称分别为instance 1和instance 2,消息中包含.号越多,耗时越长;
1 | |
- 在controller中添加测试接口,调用该接口开始发送消息;
1 | |
- 运行后结果如下,可以发现生产者往队列中发送包含不同数量
.号的消息,instance 1和instance 2消费者互相竞争,分别消费了一部分消息。



fanout 发布/订阅模式
发布/订阅模式:指同时向多个消费者发送消息的模式(类似广播的形式)。包含一个生产者、多个消费者、多个队列和一个交换机。
- 两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费相同消息。

Spring AMQP实现:
- 添加
发布/订阅模式相关Java配置,创建一个名为exchange.fanout的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;
1 | |
- 生产者通过
send方法向交换机exchange.fanout中发送消息,消息中包含一定数量的.号;
1 | |
- 消费者从绑定的匿名队列中获取消息,消息中包含
.号越多,耗时越长。由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;
1 | |
- 在controller中添加测试接口,调用该接口开始发送消息;
1 | |
- 运行后结果如下,可以发现生产者往队列中发送包含不同数量
.号的消息,instance 1和instance 2同时获取并消费了相同消息。



MQTT 协议实现即时通讯
有时候项目中会用到
即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时需要通知前端支付成功。RabbitMQ可以很方便的实现
即时通讯功能,如果没有特殊的业务需求,甚至可以不写后端代码。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的轻量级通讯协议,该协议构建于TCP/IP协议上。
- 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念
- Publisher(发布者):消息的发出者,负责发送消息。
- Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
- Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
- Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
- Payload(负载);可以理解为发送消息的内容。
- QoS(消息质量):全称
Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:- QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
- QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
- QoS 2(Exactly Once):只有一次,确保消息只到达一次。
direct 直连路由模式
路由模式:根据路由键选择性给多个消费者发送消息。
- 多个消费者、多个队列,每个队列通过
routing key全文匹配。 - 消费者绑定唯一队列,队列绑定到交换机上时需要指定路由key,仅消费指定路由key的消息。
- 生产者发送消息到交换机并且指定路由键,交换机通过
路由键转发到对应多个队列,队列绑定的消费者接收并消费消息。
应用场景:
- 在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,
- 只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。


Spring AMQP实现:
- 添加
路由模式相关Java配置,创建一个名为exchange.direct的交换机、一个生产者、两个消费者和两个匿名队列, - 队列通过
路由键都绑定到交换机,队列1的路由键为orange和black,队列2的路由键为green和black;
1 | |
- 生产者通过
send方法向交换机exchange.direct中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;
1 | |
- 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为
instance 1和instance 2;
1 | |
- 在controller中添加测试接口,调用该接口开始发送消息;
1 | |
- 运行后结果如下,可以发现生产者往队列中发送包含不同
路由键的消息,instance 1获取到了orange和black消息,instance 2获取到了green和black消息。



topic 主题通配符模式
通配符模式:根据路由键匹配规则选择性给多个消费者发送消息。包含一个生产者、两个消费者、两个队列和一个交换机。
-
两个消费者同时绑定到不同的队列上去,两个队列通过
路由键匹配规则绑定到交换机上去, -
生产者发送消息到交换机,交换机通过
路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。
应用场景:
iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

特殊匹配符号:
*:只能匹配一个单词;#:可以匹配零个或多个单词。
Spring AMQP实现:
- 添加
通配符模式相关Java配置,创建一个名为exchange.topic的交换机、一个生产者、两个消费者和两个匿名队列,匹配*.orange.*和*.*.rabbit发送到队列1,匹配lazy.#发送到队列2;
1 | |
- 生产者通过
send方法向交换机exchange.topic中发送消息,消息中包含不同的路由键;
1 | |
- 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为
instance 1和instance 2;
1 | |
- 在controller中添加测试接口,调用该接口开始发送消息;
1 | |
- 运行后结果如下,可以发现生产者往队列中发送包含不同
路由键的消息,instance 1和instance 2分别获取到了匹配的消息。



header 首部
忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则。
根据应用程序消息的特定属性进行匹配。
RPC

远程过程调用:在远程计算机上运行功能并等待结果。
应用场景:
需要等待接口返回数据,如订单支付。
Publisher Confirms
发布者确认:与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。
在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。
应用场景:
对于消息可靠性要求较高,比如钱包扣款。
延迟消息
学习本文需要对RabbitMQ有所了解,还不了解的朋友可以看下:《花了3天总结的RabbitMQ实用技巧,有点东西!》
RabbitMQ实现延迟消息的方式有两种,实现原理:
-
死信队列:如果消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性可以实现延迟消息。具体参考《mall整合RabbitMQ实现延迟消息》。
-
延迟插件:通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。
由于死信队列方式需要创建两个交换机(死信队列交换机 + 处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。
死信队列
整合RabbitMQ实现延迟消息,以发送延迟消息取消超时订单为例。
业务场景说明
用于解决用户下单以后,订单超时如何取消订单的问题。
- 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
- 生成订单,获取订单的id;
- 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
- 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
- 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。
添加消息队列的枚举配置类
用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称。
1 | |
添加RabbitMQ的配置类
用于配置交换机、队列、队列与交换机的绑定关系。
1 | |
管理页面
在RabbitMQ管理页面可以看到以下交换机和队列


交换机及队列说明
mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
添加延迟消息的发送者
用于向订单延迟消息队列(mall.order.cancel.ttl)里发送消息。
1 | |
添加取消订单消息的接收者
用于从取消订单的消息队列(mall.order.cancel)里接收消息。
1 | |
添加OmsPortalOrderService接口
1 | |
添加OmsPortalOrderService的实现类
1 | |
添加 OmsPortalOrderController 定义接口
1 | |
进行接口测试
调用下单接口
- 注意:已经将延迟消息时间设置为30秒



项目源码地址
https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-08
使用插件实现延迟消息
插件安装
首先需要下载并安装RabbitMQ的延迟插件。
-
去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/community-plugins.html
-
直接搜索
rabbitmq_delayed_message_exchange即可找到要下载的插件,下载和RabbitMQ配套的版本,不要弄错; -
将插件文件复制到RabbitMQ安装目录的
plugins目录下; -
进入RabbitMQ安装目录的
sbin目录下,使用如下命令启用延迟插件;
1 | |
- 启用插件成功后就可以看到如下信息,之后重新启动RabbitMQ服务即可。

实现延迟消息
接下来需要在SpringBoot中实现延迟消息功能,这次依然沿用商品下单的场景。比如说有个用户下单了,60分钟不支付订单,订单就会被取消,这就是一个典型的延迟消息使用场景。
- 创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
1 | |
- 创建一个取消订单消息的发出者,通过给消息设置
x-delay头来设置消息从交换机发送到队列的延迟时间;
1 | |
- 创建一个取消订单消息的接收者,用于处理订单延迟插件队列中的消息。
1 | |
- 然后在订单业务实现类中添加如下逻辑,当下单成功之前,往消息队列中发送一个取消订单的延迟消息,这样如果订单没有被支付的话,就能取消订单了;
1 | |
-
启动项目后,在Swagger中调用下单接口;
-
调用完成后查看控制台日志可以发现,从消息发送和消息接收处理正好相差了
30s,即设置的延迟时间。
1 | |
Web管理界面
rabbitmq_management