摘要:阿里开源的一个队列模型的消息中间件,有高性能、高吞吐量、高可靠、高实时、分布式的特点。为了解决 Kafka 的缺点:在低延迟和高可靠性方面的表现不能满足要求。
目录
[TOC]
RocketMQ
阿里开源的一个队列模型的消息中间件,有高性能、高吞吐量、高可靠、高实时、分布式的特点。
- 包括了数据存储、高可用(主从同步)、RPC 通信、注册中心、配置中心等方面的知识与代码实现。
- NameServer 的端口是 9876,Broker 的端口是 10911,自动主从切换 Controller 端口是 9878。dashboard 控制台端口8082。
- 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼,并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性,近年来已经也被越来越多的国内企业使用。
- 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
- 常用用法有:基本消息(消息发送、消费),顺序消息,延时消息,批量消息,过滤消息,消息事务,Logappender 日志,OpenMessaging。
- Apache 官方文档
背景
淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们。
-
Kafka 缺点:在低延迟和高可靠性方面的表现不能满足要求。
-
但是,同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也 OK)。
目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。
开发者指南
这个开发者指南旨在帮助您快速了解并使用 Apache RocketMQ
1. 概念和特性
-
概念(Concept):介绍RocketMQ的基本概念模型。
-
特性(Features):介绍RocketMQ实现的功能特性。
2. 架构设计
-
架构(Architecture):介绍RocketMQ部署架构和技术架构。
-
设计原理(Design):介绍RocketMQ关键机制的设计原理,主要包括消息存储、通信机制、消息过滤、负载均衡、事务消息等。
3. 样例
- 样例(Example) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事务消息样例等。
4. 最佳实践
- 最佳实践(Best Practice):介绍RocketMQ的最佳实践,包括生产者、消费者、Broker以及NameServer的最佳实践,客户端的配置方式以及JVM和linux的最佳参数配置。
- 消息轨迹指南(Message Trace):介绍RocketMQ消息轨迹的使用方法。
- 权限管理(Auth Management):介绍如何快速部署和使用支持权限控制特性的RocketMQ集群。
- 自动主从切换快速开始:RocketMQ 5.0 自动主从切换快速开始。
- 自动主从切换部署升级指南:RocketMQ 5.0 自动主从切换部署升级指南。
- Proxy 部署指南:介绍如何部署Proxy (包括
Local模式和Cluster模式).
5. 运维管理
- 集群部署(Operation):介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。
集群搭建
在 conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:
broker.conf:单主,异步刷盘。2m/:双主,异步刷盘。2m-2s-async/:两主两从,异步复制,异步刷盘。2m-2s-sync/:两主两从,同步复制,异步刷盘。dledger/:Dledger 集群,至少三节点。
6. RocketMQ 5.0 新特性
功能特性
样例代码
见 RocketMQ 样例文档。
快速入门
定时消息
消费重试
消费异常处理机制
广播消费
顺序消息
消息过滤
事务消息
监控端点
更多的配置项信息
接入阿里云的消息队列 RocketMQ
总结
- RocketMQ 提供的高级消息类型:
- 定时/延时消息:消息被发送至服务端后,在指定时间后才能被消费。通过设置定时时间,可以实现分布式场景的延时调度触发效果。
- 顺序消息:支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
- 事务消息:支持在分布式场景下保障消息生产和本地事务的最终一致性。
- 消息过滤:消费者可以通过订阅指定消息标签对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在服务端完成。
- 消息标签(
MessageTag):细粒度消息分类属性,可以在主题层级之下做消息类型的细分。
- 消息标签(
- 消费者负载均衡
- 消费进度管理:
- 消息位点(
MessageQueueOffset):消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,即消息位点。 - 消费位点(
ConsumerOffset):一条消息被某个消费者消费完成后不会立即从队列中删除,RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。 - 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到服务端的消息。
- 消息位点(
- 消费重试
- 消息存储和清理机制
- 消息堆积:生产者已经将消息发送到服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
- RocketMQ 5.0 开始支持自动主从切换的模式。
高级消息类型
2 顺序消息
消息有序:指的是一类消息消费时,能按照发送的顺序来消费。
- 例如:一个订单产生了三条消息分别是订单创建、付款、完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。
- RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
- 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。
- 分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。
- Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
- 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
7 事务消息
事务消息(Transactional Message):指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
- RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
8 定时消息
定时消息(延迟队列):指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
- broker有配置项messageDelayLevel,默认值为“
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。 - 注意,messageDelayLevel是broker的属性,不属于某个topic。
- 发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。
level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
1 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
3 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的.
- 优点:减少了对于Consumer无用消息的网络传输,
- 缺点:增加了Broker的负担、而且实现相对复杂。
4 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
- Broker非正常关闭
- Broker异常Crash
- OS Crash
- 机器掉电,但是能立即恢复供电情况
- 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
- 磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。
- 通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
- 注:RocketMQ从3.0版本开始支持同步双写。
5 至少一次
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
消费
广播消费
集群消费
消费异常处理机制
重试机制
6 回溯消费
回溯消费:指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
- 并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。
- RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
9 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
- 这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。
- 遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
- 考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
- RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。
- 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。- 不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
- 超过重投次数,抛出异常,由客户端保证消息不丢。
- 当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
11 流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
- commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
- broker通过拒绝send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控:
- 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
12 死信队列
死信队列:用于处理无法被正常消费的消息。
- 当一条消息初次消费失败,消息队列会自动进行消息重试;
- 达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
- 在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
监控端点
更多的配置项信息
基本概念
-
消息模型(
Message Model):RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。- Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。
- Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
-
消息生产者(
Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。-
RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
-
生产者组(
Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
-
-
消息消费者(
Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:- 拉取式消费(
Pull Consumer):应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 - 推动式消费(
Push Consumer):应用不需要主动调用Consumer的拉消息方法,在底层已经封装了拉取的调用逻辑,在用户层面看来是broker把消息推送过来的,其实底层还是consumer去broker主动拉取消息。
- 拉取式消费(
-
代理服务器(
Broker Server):消息中转角色,负责存储消息、转发消息。负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。- 消费者组(
Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:- 集群消费(
Clustering):集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。同一条消息只会被相同消费者分组的一个消费者所消费。 - 广播消费(
Broadcasting):广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。同一条消息会被相同消费者分组的所有消费者所消费。
- 集群消费(
- 主题(
Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 - 消息(
Message):消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。- 标签(
Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 - 普通顺序消息(
Normal Ordered Message):普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。 - 严格顺序消息(
Strictly Ordered Message):严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
- 标签(
- 消费者组(
-
名字服务(
Name Server):名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
技术架构


1 技术架构四大角色


RocketMQ架构上主要分为四部分,如上图所示:
Producer
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer
NameServer 名称服务器(注册中心):是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要提供两个功能:
- Broker 管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理 :每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
- NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。
- 当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
Broker
Broker 消息(队列)服务器:主要负责消息的存储、投递和查询及服务高可用保证(用多个 Broker 来保证负载均衡)。
- 即,生产者生产消息到
Broker,消费者从Broker拉取消息并消费。 - 一个
Topic分布在多个Broker上,一个Broker可配置多个Topic,是多对多的关系。 Broker会将路由表注册到NameServer中,消费者和生产者就从中获取路由表、然后按照路由表的信息和对应的Broker进行通信。
一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。
- 例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
为了实现这些功能,Broker包含了以下几个重要子模块。
Remoting Module:整个Broker的实体,负责处理来自Client端的请求。Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

存储机制
在
Topic中的队列是以什么样的形式存在?
2 网络部署架构
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂:- Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。
- Master也可以部署多个。每个Broker与
NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 - 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。- Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

集群工作流程
结合部署架构图,描述集群工作流程:
- 启动 Namesrv:起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
- Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。
- 心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。
- 注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。
- 收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic。
- Producer 启动并发送消息:
- 启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上。
- 然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。
- Consumer 启动并消费消息:
- Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上。
- 然后直接跟 Broker 建立连接通道,开始消费消息。

实现高可用的原理
1. Producer
- Producer 自身在应用中,所以无需考虑高可用。
- Producer 配置多个 Namesrv 列表,从而保证 Producer 和 Namesrv 的连接高可用。并且,会从 Namesrv 定时拉取最新的 Topic 信息。
- Producer 会和所有 Consumer 直连,在发送消息时,会选择一个 Broker 进行发送。如果发送失败,则会使用另外一个 Broker 。
- Producer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Producer 异常下线。
2. Consumer
- Consumer 需要部署多个节点,以保证 Consumer 自身的高可用。当相同消费者分组中有新的 Consumer 上线,或者老的 Consumer 下线,会重新分配 Topic 的 Queue 到目前消费分组的 Consumer 们。
- Consumer 配置多个 Namesrv 列表,从而保证 Consumer 和 Namesrv 的连接高可用。并且,会从 Consumer 定时拉取最新的 Topic 信息。
- Consumer 会和所有 Broker 直连,消费相应分配到的 Queue 的消息。如果消费失败,则会发回消息到 Broker 中。
- Consumer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Consumer 异常下线。
3. Namesrv
- Namesrv 需要部署多个节点,以保证 Namesrv 的高可用。
- Namesrv 本身是无状态,不产生数据的存储,是通过 Broker 心跳将 Topic 信息同步到 Namesrv 中。
- 多个 Namesrv 之间不会有数据的同步,是通过 Broker 向多个 Namesrv 多写。
4. Broker
- 多个 Broker 可以形成一个 Broker 分组。每个 Broker 分组存在一个 Master 和多个 Slave 节点。
- Master 节点,可提供读和写功能。Slave 节点,可提供读功能。
- Master 节点会不断发送新的 CommitLog 给 Slave节点。Slave 节点不断上报本地的 CommitLog 已经同步到的位置给 Master 节点。
- Slave 节点会从 Master 节点拉取消费进度、Topic 配置等等。
- 多个 Broker 分组,形成 Broker 集群。
- Broker 集群和集群之间,不存在通信与数据同步。
- Broker 可以配置同步刷盘或异步刷盘,根据消息的持久化的可靠性来配置。

消息模型
RocketMQ 中的消息模型和 Kafaka 类似,把 Kafaka 中的分区换成了队列。不过虽然消息模型类似,但是实现方式还是有很大差别。
消息
消息(Message):是最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
- 消息类型(
MessageType):按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。 - 从5.0版本开始,支持强制校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认开启。
主题模式 / 发布订阅模式
RocketMQ 通过在一个 Topic 中配置多个队列、且每个队列维护每个消费者组的消费位置,实现了主题模式/发布订阅模式 。
-
Producer(生产者): 将业务消息按照要求封装成消息并发送至服务端。代表某一类的生产者,如多个秒杀系统合在一起,一般生产相同的消息。生产消息后指定向主题中的某个队列发送消息。 Topic(主题): 消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息,如订单消息、物流消息等。一个主题中需维护多个队列,提高并发能力。主题通过TopicName来做唯一标识和区分。- 消息队列(
MessageQueue):是消息存储和传输的实际容器,也是消息的最小存储单元。 主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
- 消息队列(
-
Con'sumer Group(消费者组): 承载多个消费行为一致的消费者的负载均衡分组。代表某一类的消费者,如多个短信系统合在一起,一般消费相同的消息。每个消费组在每个队列上维护一个消费位移 。-
和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
- 消费者(
Consumer):用来接收并处理消息的运行实体。通从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。 - 订阅关系:是消费者获取消息、处理消息的规则和状态配置。由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则,进行消息匹配和消费进度维护。
-


其它基本概念
Apache RocketMQ 中的基本概念:
- 消息视图(
MessageView):面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。 - 消息轨迹:在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由服务端,投递给消费者的完整链路,方便定位排查问题。
- 消息索引(
MessageKey):面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。 - 事务检查器(
TransactionChecker):生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。 - 事务状态(
TransactionResolution):事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。 - 消费结果(
ConsumeResult):PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
消息消费问题
- 消息丢失:
- 消息堆积:
- 重复消费:
- 保证顺序消费:
- 保证数据一致性:分布式事务消息 + 事务反查机制
- 保证高可用:
消息丢失
- 生产者:事务消息
- Broker:刷盘机制
- 消费者:
刷盘机制
队列中的消息是如何进行存储持久化?
在单个结点层面:
- 同步刷盘:生产者消息发过来时,只有持久化到磁盘,Broker 存储端(MQ)才返回一个成功的
ACK。- 能保证消息不丢失,但是影响了性能。
- 适用于金融等特定业务场景。
异步刷盘:采用后台异步线程提交的方式执行刷盘操作。只要消息写入PageCache 缓存,就返回一个成功的ACK响应。- 提高了MQ的性能,但是如果这时候机器断电了,就会丢失消息。
- 适用于(如发验证码等)对于消息保证要求不太高的业务场景。
集群主从模式下:Borker 一般是集群部署的,根据主节点返回消息给客户端时是否需要同步从节点分类:
- 同步复制(同步双写):只有主节点和从节点都写入成功,才反馈成功的ack给生产者。 保证了消息不丢失,但是降低了系统的吞吐量。
- 异步复制: 只要消息写入主节点成功,就返回成功的ack。速度快,但是会有性能问题。
消息堆积
重复消费
保证顺序消费
保证数据一致性
分布式事务消息 + 事务反查机制
分布式事务消息
事务消息是的实现:
- 生产者产生消息,发送一条半事务消息到MQ服务器;
- MQ收到消息后,将消息持久化到存储系统,【这条消息的状态是待发送状态】;
- MQ服务器返回ACK确认给生产者,此时MQ不会触发消息推送事件;
- 事务:生产者执行业务操作、本地事务;
- 事务:如果本地事务执行成功,即
commit执行结果到MQ服务器;如果执行失败,发送rollback; - 如果是正常的
commit,【MQ服务器更新消息状态为可发送】;如果是rollback,即删除消息; - 如果消息状态更新为可发送,则MQ服务器会
push消息给消费者。 - 消费者消费完返回 ACK 给 MQ;
- 事务:如果MQ服务器长时间没有收到生产者的
commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态。
保证高可用
安装
启动
1 | |
问题
Please set the ROCKETMQ_HOME variable in your environment!- 解决:修改runserver.sh文件,将71行和76行的Xms和Xmx等改小一点
- 找不到或无法加载主类
org.apache.rocketmq.namesrv.namesrvstartup- 解决:修改broker的配置文件
dashboard 控制台
是 RocketMQ 的图形化管理控制台,提供 Broker 集群信息查看,Topic 管理,Producer、Consumer 信息展示,消息查询等等常用功能。
安装搭建
下载 rocketmq-dashboard 可视化控制台,并打包
1 | |