RocketMQ
1. 架构模型

- Broker 做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该 Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息 (后面我还会提到哦)。
- 为了保证 HA ,NameServer 也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info 。
- 在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。 第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。
2. 顺序消费
2.1 局部有序
所谓局部有序是指 消费者通过 同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间) 。
同时需要保证一个队列同时只会被同一个线程处理消息。
1
2
3
4
5
@RocketMQMessageListener(
consumerGroup = "spring-consumer1",
topic = "spring-topic1",
consumeMode = ConsumeMode.ORDERLY //顺序消息需要修改的地方
)2.2 全局有序
将所有消息全部发送到一个队列。
3. 分布式事务

1、Producer 向 broker 发送半消息
2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
3、Producer 端执行本地事务。
4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
6、Producer 端查询本地事务的状态
7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
8、消费者段消费到消息之后,执行本地事务,执行本地事务。
生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@GetMapping("/test")
public void test() {
Sc sc = new Sc();
sc.setCno("001");
sc.setSno("001");
sc.setGrade(100);
Message<Sc> msg = MessageBuilder
.withPayload(sc)
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TOPIC, msg, null);
//发送状态
String sendStatus = result.getSendStatus().name();
// 本地事务执行状态
String localTxState = result.getLocalTransactionState().name();
log.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
}消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@RocketMQMessageListener(
consumerGroup = "tx-consumer",
topic = "tx-topic"
)
@Slf4j
public class TransactionConsumer implements RocketMQListener<Sc>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(Sc sc) {
log.info("事务消息消费者收到消息{}:",sc);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setMaxReconsumeTimes(3);
}
}生产者消息监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@RocketMQTransactionListener
@Slf4j
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
log.info("开始执行本地事务");
log.info("content:{}",new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8));
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
//处理业务
String jsonStr = msg.getPayload().toString();
log.info("invoke msg content:{}",jsonStr);
}
catch (Exception e)
{
log.error("invoke local mq trans error",e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
/**
* 检查本地事务的状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
{
log.info("start check Local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
String jsonStr = msg.getPayload().toString();
log.info("check trans msg content:{}",jsonStr);
}
catch (Exception e)
{
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
}4. 消息发送重试机制(生产者到 Broker)
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但 oneway 消息发送方式
发送失败是没有重试机制的 - 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在 RocketMQ 中是无法避免的问题
- 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件
producer 主动重发、consumer 负载变化(发生 Rebalance,不会导致消息重复,但可能出现重复
消费)也会导致重复消息 - 消息重复无法避免,但要避免消息的重复消费。
- 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息 key),使消费者对消息进行消费判断来避免重复消费
其中,消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
- 同步发送失败策略(retryTimesWhenSendFailed)
对于普通消息,消息发送默认采用轮询策略来选择所发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的 Broker,而是选择其它 Broker。当然,若只有一个 Broker 其也只能发送到该 Broker,但其会尽量发送到该 Broker 上的其它 Queue。同时,Broker 还具有失败隔离功能,使 Producer 尽量选择未发生过发送失败的 Broker 作为目标 Broker。其可以保证其它消息尽量不发送到问题 Broker,为了提升消息发送效率,降低消息发送耗时。 - 异步发送失败策略(retryTimesWhenSendAsyncFailed)
异步发送失败重试时,默认2次,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,所以该策略无法保证消息不丢。 - 消息刷盘失败策略(retryAnotherBrokerWhenNotStoreOK)
消息刷盘超时(Master 或 Slave)或 slave 不可用(slave 在做数据同步时向 master 返回状态不是
SEND_OK)时,默认是不会将消息尝试发送到其他 Broker 的。不过,对于重要消息可以通过在 Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为 true 来开启。
5. 消费重试机制
- 顺序消息的消费重试
对于顺序消息,当 Consumer 消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为 1000 毫秒。重试期间应用会出现消息消费被阻塞的情况 - 无序消息的消费重试
对于无序消息(普通消息、延时消息、事务消息),当 Consumer 消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。对于无序消息集群消费下的重试消费,每条消息默认最多重试 16 次,但每次重试的间隔时间是不同的,会逐渐变长。在到达最大重试次数后,会将消息投递到死信队列
5.1 重试队列
对于需要重试消费的消息,并不是 Consumer 在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊 Topic 的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。当出现需要进行重试消费的消息时,Broker 会为每个消费组都设置一个 Topic 名称为%RETRY%consumerGroup@consumerGroup 的重试队列
- 这个重试队列是针对消费者组的,而不是针对每个 Topic 设置的(一个 Topic 的消息可以让多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)
- 只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列

6. 如何保证消息不丢失
6.1 消息发送流程

- 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
- 存储阶段,消息将会存储在 Broker 端磁盘中
- 消息阶段, Consumer 将会从 Broker 拉取消息
6.2 生产阶段
生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。
消息发送成功仅代表消息已经到了 Broker 端,Broker 在不同配置下,可能会返回不同响应状态:
SendStatus.SEND_OKSendStatus.FLUSH_DISK_TIMEOUTSendStatus.FLUSH_SLAVE_TIMEOUTSendStatus.SLAVE_NOT_AVAILABLE
6.3 Broker 存储阶段
默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。
这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。
若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
1
2
## 默认情况为 ASYNC_FLUSH
flushDiskType = SYNC_FLUSH若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。
6.3.1 集群部署
为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。
默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。
此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。
为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。
结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:
1
2
3
4
5
6
7
## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。
6.4 消费阶段
消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。
如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
7. 延迟消息
7.1 延迟等级
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在
RocketMQ 服务端的 MessageStoreConfig 类中的如下变量中:
延迟等级是从下标 1 开始算起
7.2 原理
- Producer 将消息发送到 Broker 后,Broker 会首先将消息写入到 commitlog 文件,然后需要将其分发到相应的 consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:
- 修改消息的 Topic 为
SCHEDULE_TOPIC_XXXX - 根据延时等级,在
consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)。 - 修改消息索引单元内容。索引单元中的
Message Tag HashCode部分原本存放的是消息的 Tag 的 Hash 值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原 Topic 后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到 Broker 时的时间戳。 - 将消息索引写入到
SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
- 修改消息的 Topic 为
- Broker 内部有⼀个延迟消息服务类
ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标 Topic 中。不过,在投递之前会从 commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为 0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标 Topic 中。 - 延迟消息服务类 ScheuleMessageService 将延迟消息再次发送给了 commitlog,并再次形成新的消息索引条目,分发到相应 Queue。
8. 零拷贝
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换。
- 从磁盘复制数据到内核态内存;
- 从内核态内存复制到用户态内存;
- 然后从用户态内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复制到网卡中进行传输。
其中一共有 4 次状态切换+4 次拷贝
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升 I/O 的性能。零拷贝比较常见的实现方式是 mmap,这种机制在 Java 中是通过 MappedByteBuffer 实现的。

基于 mmap IO 读写其实就变成 mmap + write 的操作,也就是用 mmap 替代传统 IO 中的 read 操作。当用户发起 mmap 调用的时候会发生上下文切换 1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap 返回,发生上下文切换 2;随后用户调用 write,发生上下文切换 3,将内核缓冲区的数据拷贝到 Socket 缓冲区,write 返回,发生上下文切换 4。
发生 4 次上下文切换和 3 次 IO 拷贝操作,
9. 死信队列
当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。
死信队列具有如下特征:
- 死信队列中的消息不会再被消费者正常消费,即 DLQ 对于消费者是不可见的
- 死信存储有效期与正常消息相同,均为 3 天(commitlog 文件的过期时间),3 天后会被自动删除
- 死信队列就是一个特殊的 Topic,名称为%DLQ%consumerGroup@consumerGroup ,即每个消
费者组都有一个死信队列如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列
10. 为什么不使用 zookeeper
- 基于可用性的考虑,根据 CAP 理论,同时最多只能满足两个点,而 Zookeeper 满足的是 CP,也就是说 Zookeeper 并不能保证服务的可用性,Zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
- 基于性能的考虑,NameServer 本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而 Zookeeper 的写是不可扩展的,Zookeeper 要解决这个问题只能通过划分领域,划分多个 Zookeeper 集群来解决,首先操作起来太复杂,其次这样还是又违反了 CAP 中的 A 的设计,导致服务之间是不连通的。
- 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
- 消息发送应该弱依赖注册中心,而 RocketMQ 的设计理念也正是基于此,生产者在第一次发送消息的时候从 NameServer 获取到 Broker 地址后缓存到本地,如果 NameServer 整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。
11. 存储机制
RocketMQ 主要的存储文件包括 CommitLog 文件、ConsumeQueue 文件、Indexfile 文件。
- CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
- ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。
ConsumeQueue 文件可以看成是基于 Topic 的 CommitLog 索引文件,故 ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

- IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是: {fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 hash 索引。

总结一下:RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。
RocketMQ 的混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。
只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。
12. offset 管理
消费进度 offset 是用来记录每个 Queue 的不同消费组的消费进度的。根据消费进度记录器的不同,可以分为两种模式:本地模式和远程模式。

- offset 本地管理模式
当消费模式为广播消费时,offset 使用本地模式存储。因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。
Consumer 在广播消费模式下 offset 相关数据以 json 的形式持久化到 Consumer 本地磁盘文件中,默认文件路径为当前用户主目录下的.rocketmq_offsets/${clientId}/${group}/Offsets.json。
其中${clientId}为当前消费者 id,默认为ip@DEFAULT;${group}为消费者组名称。 - offset 远程管理模式
当消费模式为集群消费时,offset 使用远程模式管理。因为所有 Cosnumer 实例对消息采用的是均衡消费,所有 Consumer 共享 Queue 的消费进度。Consumer 在集群消费模式下 offset 相关数据以 json 的形式持久化到 Broker 磁盘文件中,文件路径为当前用户主目录下的 store/config/consumerOffset.json 。Broker 启动时会加载这个文件,并写入到一个双层 Map(ConsumerOffsetManager)。外层 map 的 key 为 topic@group,value 为内层 map。内层 map 的 key 为 queueId,value 为 offset。当发生 Rebalance 时,新的 Consumer 会从该 Map 中获取到相应的数据来继续消费。集群模式下 offset 采用远程管理模式,主要是为了保证 Rebalance 机制。
12.1 Rebalance 机制
Rebalance 即再均衡,指的是,将⼀个 Topic 下的多个 Queue 在同⼀个 Consumer Group 中的多个 Consumer 间进行重新分配的过程。
在 Broker 中维护着多个 Map 集合,这些集合中动态存放着当前 Topic 中 Queue 的信息、Consumer Group 中 Consumer 实例的信息。一旦发现消费者所订阅的 Queue 数量发生变化,或消费者组中消费者的数量发生变化,立即向 Consumer Group 中的每个实例发出 Rebalance 通知。
Rebalance 机制的本意是为了提升消息的并行消费能力。例如,⼀个 Topic 下 5 个队列,在只有 1 个消费者的情况下,这个消费者将负责消费这 5 个队列的消息。如果此时我们增加⼀个消费者,那么就可以给其中⼀个消费者分配 2 个队列,给另⼀个分配 3 个队列,从而提升消息的并行消费能力。
Rebalance 限制
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。
Rebalance 危害
Rebalance 的在提升消费能力的同时,也带来一些问题:
消费暂停:在只有一个 Consumer 时,其负责消费所有队列;在新增了一个 Consumer 后会触发
Rebalance 的发生。此时原 Consumer 就需要暂停部分队列的消费,等到这些队列分配给新的 Consumer 后,这些暂停消费的队列才能继续被消费。消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前 Consumer 提交的消费进度的 offset 继续消费。然而默认情况下,offset 是异步提交的,这个异步性导致提交到 Broker 的 offset 与 Consumer 实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。
- 同步提交:consumer 提交了其消费完毕的一批消息的 offset 给 broker 后,需要等待 broker 的成功 ACK。当收到 ACK 后,consumer 才会继续获取并消费下一批消息。在等待 ACK 期间 consumer
是阻塞的。 - 异步提交:consumer 提交了其消费完毕的一批消息的 offset 给 broker 后,不需要等待 broker 的成功 ACK。consumer 可以直接获取并消费下一批消息。对于一次性读取消息的数量,需要根据具体业务场景选择一个相对均衡的是很有必要的。因为数量过大,系统性能提升了,但产生重复消费的消息数量可能会增加;数量过小,系统性能会下降,但被重复消费的消息数量可能会减少。
- 同步提交:consumer 提交了其消费完毕的一批消息的 offset 给 broker 后,需要等待 broker 的成功 ACK。当收到 ACK 后,consumer 才会继续获取并消费下一批消息。在等待 ACK 期间 consumer
消费突刺:由于 Rebalance 可能导致重复消费,如果需要重复消费的消息过多,或者因为 Rebalance 暂停时间过长从而导致积压了部分消息。那么有可能会导致在 Rebalance 结束之后瞬间需要消费很多消息。
Rebalance的触发时机
- 消费者启动时主动进行一次Rebalance
- 消费者启动后设置定时进行Rebalance,20s/次
- 消费者组实例数量发生变化时,broker通知消费者进行Rebalance
- 所订阅的topic的messageQueue数量发生变化时、订阅关系变化时,broker通知消费者进行Rebalance
Rebalance的触发场景
- 消费者启动
- 消费者扩缩容
- 消费者宕机
- broker扩缩容
- messageQueue数量调整
- 网络问题导致客户端
- broker连接断开
13 队列负载均衡策略
AllocateMessageQueueAveragely:默认均衡规则
AllocateMessageQueueAveragelyByCircle:循环平均分配,是第1种方式的变种。针对queue数量多余Consumer数量的情况下,使用循环分配规则。如有3个Consumer、5个queue,则Consumer0消费queue0和queue3、Consumer1消费queue1和queue4、Consumer2消费queue2。
AllocateMessageQueueByMachineRoom:机房分配策略

AllocateMessageQueueConsistentHash:一致性Hash方式分配。

14. 消费者获取消息流程
每个消费者实例属于一个消费者组(Consumer Group)。一个消费者组内的多个消费者实例会共同消费一个主题(Topic)的消息,通常一个消费者实例会负责多个消息队列(Message Queue),@RocketMQMessageListener 注解用于标注消费者类,指定该消费者的主题、消费者组、消费模式等信息。每个消费者实例会根据这些信息进行初始化,包括拉取任务队列的创建和管理。
14.1 消费者初始化阶段
初始化时,会创建一个阻塞队列,LinkedBlockingQueue<PullRequest> pullRequestQueue,根据负载均衡策略,从 Broker 获取需要消费的消息队列,并为每个消息队列创建 PullRequest。将这个PullRequest放进阻塞队列中。
启动过程中,会初始化核心组件:PullMessageService(拉取服务)、RebalanceService(负载均衡服务)、消费线程池。
负载均衡服务(RebalanceService)
- 周期性运行:默认每20秒触发一次队列重平衡(可通过参数调整)。
- 队列分配策略:根据消费者组内实例数,采用分配策略(如平均分配、一致性哈希等)为当前消费者分配 MessageQueue。
- 创建 ProcessQueue:为每个分配的 MessageQueue 创建对应的 ProcessQueue,用于暂存拉取到的消息并跟踪消费进度。
- 生成 PullRequest:为每个分配的 MessageQueue 生成初始的 PullRequest,包含 Topic、Broker 名称、队列 ID、拉取偏移量(初始为
-1,表示从 Broker 建议的位置开始)。
14.2 消息拉取阶段
PullMessageService 工作机制
- 单线程模型:每个消费者实例仅有一个 PullMessageService 线程(非多线程)。
- 阻塞队列操作:从
LinkedBlockingQueue<PullRequest>中取出 PullRequest,若队列为空则阻塞等待。
拉取消息流程
构建拉取请求:根据
PullRequest中的 Topic、队列 ID、拉取偏移量(首次为-1)向对应 Broker 发送拉取请求。Broker 响应:
- 若拉取成功,返回消息数据及建议的下一次拉取偏移量(
nextBeginOffset)。 - 若拉取失败(如队列不存在),触发延迟重试机制(如 3秒后重新入队)。
- 若拉取成功,返回消息数据及建议的下一次拉取偏移量(
更新 PullRequest:将
nextBeginOffset更新到 PullRequest 中,并重新放入拉取队列尾部(实现持续轮询拉取)。
消息暂存到 ProcessQueue
- 写入 ProcessQueue:拉取到的消息按顺序存入对应
MessageQueue的ProcessQueue中,并维护以下状态:msgTreeMap:有序消息集合(Key 为队列偏移量)。consuming:当前是否正在消费。locked:队列是否被锁定(集群模式下)。
- 流控机制:若
ProcessQueue中消息总数或大小超过阈值(默认 1000条),暂停该队列的拉取任务。
- 写入 ProcessQueue:拉取到的消息按顺序存入对应
队列锁定(
Locked状态)的核心目的是避免重复消费,具体场景包括:
- Rebalance 期间:当消费者实例增减时,Broker 通过队列锁定确保同一队列仅被一个实例消费。
- 顺序消息严格性:顺序消费要求独占队列,锁定防止其他实例干扰消费顺序。
- 故障转移:若消费者实例宕机,Broker 检测到心跳超时后解除锁定,其他实例可接管队列。
14.3 消息消费阶段
提交消费任务到线程池:
异步消费模式:PullMessageService 将消息提交到消费线程池后立即返回,不等待消费完成。
消费任务封装:每个消费任务包含一批消息,提交时携带 ProcessQueue 引用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class ConsumeRequest { private final List<MessageExt> messageExts; private final MessageQueue messageQueue; private final ProcessQueue processQueue; public ConsumeRequest(List<MessageExt> messageExts, MessageQueue messageQueue, ProcessQueue processQueue) { this.messageExts = messageExts; this.messageQueue = messageQueue; this.processQueue = processQueue; } public List<MessageExt> getMessageExts() { return this.messageExts; } public MessageQueue getMessageQueue() { return this.messageQueue; } public ProcessQueue getProcessQueue() { return this.processQueue; } }
消费线程池处理:
并发消费:线程池中的线程调用
ConsumeMessageConcurrentlyService处理消息,支持批量消费。顺序消费:若为顺序消费模式,使用
ConsumeMessageOrderlyService,确保同一队列的消息串行处理。
消费结果处理
- 消费成功:
- 从 ProcessQueue 的
msgTreeMap中移除已消费消息。 - 更新消费位点:记录 ProcessQueue 中最小偏移量(
minOffset),并异步提交到 Broker(默认每5秒提交一次)。
- 从 ProcessQueue 的
- 消费失败:
- 并发模式:返回
RECONSUME_LATER,消息将进入重试队列(%RETRY% 主题),延迟后重新投递。 - 顺序模式:持续重试当前消息,直到成功或超过最大重试次数。
- 并发模式:返回
- 消费成功:
Tips:
PullMessageService根据PullRequest中的信息,构建拉取消息请求并发送给 Broker。请求到达 broker 的PullMessageProcessor后,如果有新消息立即返回,如果没有新消息,会将请求放入到PullRequestHoldService中进行等待,等待期间PullRequestHoldService会每 1s 轮训一次检查有没有新消息到达。等待有新消息到达或者等待 30s 后进行返回。线程池中的消息会被并行消费,线程数可以通过
@RocketMQMessageListener的consumeThreadNumber属性设置,默认是20
拉取请求(PullRequest)什么时候被放到阻塞队列中?
RebalanceService会定时 20s 轮询一次查看消费者组中的消费者是否有变化(增/减),例如新建消费者节点时就属于增行为,会重平衡为当前消费者节点分配 queueId ,包装PullRequest拉取请求放入到阻塞队列中。- 在上一次拉取请求完成后并且消费者消费完成后将拉取请求再次放入到阻塞队列中。



