RocketMQ 源码分析 —— Message 拉取与消费(上)
作者:互联网
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!
- 1、概述
- 2、ConsumeQueue 结构
- 3、ConsumeQueue 存储
- 4、Broker 提供[拉取消息]接口
- 5、Broker 提供[更新消费进度]接口
- 6、Broker 提供[发回消息]接口
- 7、结尾
阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。
胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。阅读源码很孤单,加入源码交流群,一起坚持!
1、概述
本章主要解析 消费 逻辑涉及到的源码。因为篇幅较长,分成上下两篇:
- 上篇:
Broker
相关源码。 - 下篇:
Consumer
相关源码。
本文即是上篇。
ok,先看第一张关于消费逻辑的图:
消费逻辑图
再看消费逻辑精简的顺序图(实际情况会略有差别):
Consumer&Broker消费精简图.png
2、ConsumeQueue 结构
ConsumeQueue
、MappedFileQueue
、MappedFile
的关系如下:
ConsumeQueue
:MappedFileQueue
:MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000
ConsumeQueue
、MappedFileQueue
、MappedFile
的定义如下:
MappedFile
:00000000000000000000等文件。MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。- 每个
MappedFile
统一文件大小。 - 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
ConsumeQueue
里默认为 6000000B。
- 每个
ConsumeQueue
:针对MappedFileQueue
的封装使用。Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>
。
ConsumeQueue
存储在 MappedFile
的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE
),有两种内容类型:
MESSAGE_POSITION_INFO
:消息位置信息。BLANK
: 文件前置空白占位。当历史Message
被删除时,需要用BLANK
占位被删除的消息。
MESSAGE_POSITION_INFO
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | offset | 消息 CommitLog 存储位置 | Long | 8 |
2 | size | 消息长度 | Int | 4 |
3 | tagsCode | 消息tagsCode | Long | 8 |
BLANK
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | 0 | Long | 8 | |
2 | Integer.MAX_VALUE | Int | 4 | |
3 | 0 | Long | 8 |
3、ConsumeQueue 存储
CommitLog重放ConsumeQueue图
主要有两个组件:
ReputMessageService
:write ConsumeQueue。FlushConsumeQueueService
:flush ConsumeQueue。
ReputMessageService
ReputMessageService顺序图
1: class ReputMessageService extends ServiceThread {
2:
3: /**
4: * 开始重放消息的CommitLog物理位置
5: */
6: private volatile long reputFromOffset = 0;
7:
8: public long getReputFromOffset() {
9: return reputFromOffset;
10: }
11:
12: public void setReputFromOffset(long reputFromOffset) {
13: this.reputFromOffset = reputFromOffset;
14: }
15:
16: @Override
17: public void shutdown() {
18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
19: try {
20: Thread.sleep(100);
21: } catch (InterruptedException ignored) {
22: }
23: }
24:
25: if (this.isCommitLogAvailable()) {
26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
28: }
29:
30: super.shutdown();
31: }
32:
33: /**
34: * 剩余需要重放消息字节数
35: *
36: * @return 字节数
37: */
38: public long behind() {
39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
40: }
41:
42: /**
43: * 是否commitLog需要重放消息
44: *
45: * @return 是否
46: */
47: private boolean isCommitLogAvailable() {
48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
49: }
50:
51: private void doReput() {
52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
53:
54: // TODO 疑问:这个是啥
55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
57: break;
58: }
59:
60: // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
62: if (result != null) {
63: try {
64: this.reputFromOffset = result.getStartOffset();
65:
66: // 遍历MappedByteBuffer
67: for (int readSize = 0; readSize < result.getSize() && doNext; ) {
68: // 生成重放消息重放调度请求
69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
70: int size = dispatchRequest.getMsgSize(); // 消息长度
71: // 根据请求的结果处理
72: if (dispatchRequest.isSuccess()) { // 读取成功
73: if (size > 0) { // 读取Message
74: DefaultMessageStore.this.doDispatch(dispatchRequest);
75: // 通知有新消息
76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
80: dispatchRequest.getTagsCode());
81: }
82: // FIXED BUG By shijia
83: this.reputFromOffset += size;
84: readSize += size;
85: // 统计
86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
87: DefaultMessageStore.this.storeStatsService
88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
89: DefaultMessageStore.this.storeStatsService
90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
91: .addAndGet(dispatchRequest.getMsgSize());
92: }
93: } else if (size == 0) { // 读取到MappedFile文件尾
94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
95: readSize = result.getSize();
96: }
97: } else if (!dispatchRequest.isSuccess()) { // 读取失败
98: if (size > 0) { // 读取到Message却不是Message
99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100: this.reputFromOffset += size;
101: } else { // 读取到Blank却不是Blank
102: doNext = false;
103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105: this.reputFromOffset);
106:
107: this.reputFromOffset += result.getSize() - readSize;
108: }
109: }
110: }
111: }
112: } finally {
113: result.release();
114: }
115: } else {
116: doNext = false;
117: }
118: }
119: }
120:
121: @Override
122: public void run() {
123: DefaultMessageStore.log.info(this.getServiceName() + " service started");
124:
125: while (!this.isStopped()) {
126: try {
127: Thread.sleep(1);
128: this.doReput();
129: } catch (Exception e) {
130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131: }
132: }
133:
134: DefaultMessageStore.log.info(this.getServiceName() + " service end");
135: }
136:
137: @Override
138: public String getServiceName() {
139: return ReputMessageService.class.getSimpleName();
140: }
141:
142: }
- 说明:重放消息线程服务。
- 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
- 该服务不断生成 消息索引 到 索引文件(IndexFile)
- ReputMessageService用例图
- 第 75 至 81 行 :当
Broker
是主节点 &&Broker
开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener
会 调用PullRequestHoldService#notifyMessageArriving(...)
方法,详细解析见:PullRequestHoldService - 第 61 行 :获取
reputFromOffset
开始的CommitLog
对应的MappedFile
对应的MappedByteBuffer
。 - 第 67 行 :遍历
MappedByteBuffer
。 - 第 69 行 :生成重放消息重放调度请求 (
DispatchRequest
) 。请求里主要包含一条消息 (Message
) 或者 文件尾 (BLANK
) 的基本信息。 - 第 72 至 96 行 :请求是有效请求,进行逻辑处理。
- 第 73 至 92 行 :请求对应的是
Message
,进行调度,生成ConsumeQueue
和IndexFile
对应的内容。详细解析见: - 第 93 至 96 行 :请求对应的是
Blank
,即文件尾,跳转指向下一个MappedFile
。 - 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG。
- 第 75 至 81 行 :当
- 第 127 至 128 行 :每 1ms 循环执行重放逻辑。
- 第 18 至 30 行 :
shutdown
时,多次sleep(100)
直到CommitLog
回放到最新位置。恩,如果未回放完,会输出警告日志。
DefaultMessageStore#doDispatch(...)
1: /**
2: * 执行调度请求
3: * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
4: * 2. 建立 索引信息 到 IndexFile
5: *
6: * @param req 调度请求
7: */
8: public void doDispatch(DispatchRequest req) {
9: // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
11: switch (tranType) {
12: case MessageSysFlag.TRANSACTION_NOT_TYPE:
13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
16: break;
17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
19: break;
20: }
21: // 建立 索引信息 到 IndexFile
22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
23: DefaultMessageStore.this.indexService.buildIndex(req);
24: }
25: }
26:
27: /**
28: * 建立 消息位置信息 到 ConsumeQueue
29: *
30: * @param topic 主题
31: * @param queueId 队列编号
32: * @param offset commitLog存储位置
33: * @param size 消息长度
34: * @param tagsCode 消息tagsCode
35: * @param storeTimestamp 存储时间
36: * @param logicOffset 队列位置
37: */
38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
39: long logicOffset) {
40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
42: }
ConsumeQueue#putMessagePositionInfoWrapper(...)
1: /**
2: * 添加位置信息封装
3: *
4: * @param offset commitLog存储位置
5: * @param size 消息长度
6: * @param tagsCode 消息tagsCode
7: * @param storeTimestamp 消息存储时间
8: * @param logicOffset 队列位置
9: */
10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
11: long logicOffset) {
12: final int maxRetries = 30;
13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
14: // 多次循环写,直到成功
15: for (int i = 0; i < maxRetries && canWrite; i++) {
16: // 调用添加位置信息
17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
18: if (result) {
19: // 添加成功,使用消息存储时间 作为 存储check point。
20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
21: return;
22: } else {
23: // XXX: warn and notify me
24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
25: + " failed, retry " + i + " times");
26:
27: try {
28: Thread.sleep(1000);
29: } catch (InterruptedException e) {
30: log.warn("", e);
31: }
32: }
33: }
34:
35: // XXX: warn and notify me 设置异常不可写入
36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
38: }
39:
40: /**
41: * 添加位置信息,并返回添加是否成功
42: *
43: * @param offset commitLog存储位置
44: * @param size 消息长度
45: * @param tagsCode 消息tagsCode
46: * @param cqOffset 队列位置
47: * @return 是否成功
48: */
49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
50: final long cqOffset) {
51: // 如果已经重放过,直接返回成功
52: if (offset <= this.maxPhysicOffset) {
53: return true;
54: }
55: // 写入位置信息到byteBuffer
56: this.byteBufferIndex.flip();
57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
58: this.byteBufferIndex.putLong(offset);
59: this.byteBufferIndex.putInt(size);
60: this.byteBufferIndex.putLong(tagsCode);
61: // 计算consumeQueue存储位置,并获得对应的MappedFile
62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
64: if (mappedFile != null) {
65: // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
67: this.minLogicOffset = expectLogicOffset;
68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
70: this.fillPreBlank(mappedFile, expectLogicOffset);
71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
72: + mappedFile.getWrotePosition());
73: }
74: // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
75: if (cqOffset != 0) {
76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
77: if (expectLogicOffset != currentLogicOffset) {
78: LOG_ERROR.warn(
79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
80: expectLogicOffset,
81: currentLogicOffset,
82: this.topic,
83: this.queueId,
84: expectLogicOffset - currentLogicOffset
85: );
86: }
87: }
88: // 设置commitLog重放消息到ConsumeQueue位置。
89: this.maxPhysicOffset = offset;
90: // 插入mappedFile
91: return mappedFile.appendMessage(this.byteBufferIndex.array());
92: }
93: return false;
94: }
95:
96: /**
97: * 填充前置空白占位
98: *
99: * @param mappedFile MappedFile
100: * @param untilWhere consumeQueue存储位置
101: */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103: // 写入前置空白占位到byteBuffer
104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105: byteBuffer.putLong(0L);
106: byteBuffer.putInt(Integer.MAX_VALUE);
107: byteBuffer.putLong(0L);
108: // 循环填空
109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111: mappedFile.appendMessage(byteBuffer.array());
112: }
113: }
#putMessagePositionInfoWrapper(...)
说明 :添加位置信息到ConsumeQueue
的封装,实际需要调用#putMessagePositionInfo(...)
方法。- 第 13 行 :判断
ConsumeQueue
是否允许写入。当发生Bug时,不允许写入。 - 第 17 行 :调用
#putMessagePositionInfo(...)
方法,添加位置信息。 - 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。
StoreCheckpoint
的详细解析见:Store初始化与关闭。 - 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
- 第 35 至 37 行 :写入失败时,标记
ConsumeQueue
写入异常,不允许继续写入。
- 第 13 行 :判断
#putMessagePositionInfo(...)
说明 :添加位置信息到ConsumeQueue
,并返回添加是否成功。- 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
- 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个
Topic
长期无消息产生,突然N天后进行发送,Topic
对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile
需要前置占位。 - 第 51 至 54 行 :如果
offset
(存储位置) 小于等于maxPhysicOffset
(CommitLog
消息重放到ConsumeQueue
最大的CommitLog
存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。 - 第 55 至 60 行 :写 位置信息到byteBuffer。
- 第 62 至 63 行 :计算
ConsumeQueue
存储位置,并获得对应的MappedFile。 - 第 65 至 73 行 :当
MappedFile
是ConsumeQueue
当前第一个文件 &&MappedFile
未写入内容 && 重放消息队列位置大于0,则需要进行MappedFile
填充前置BLANK
。 - 第 74 至 87 行 :校验
ConsumeQueue
存储位置是否合法,不合法则输出日志。 - 第 89 行 :设置
CommitLog
重放消息到ConsumeQueue
的最大位置。 - 第 91 行 :插入消息位置到
MappedFile
。
FlushConsumeQueueService
1: class FlushConsumeQueueService extends ServiceThread {
2: private static final int RETRY_TIMES_OVER = 3;
3: /**
4: * 最后flush时间戳
5: */
6: private long lastFlushTimestamp = 0;
7:
8: private void doFlush(int retryTimes) {
9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
10:
11: // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
12: if (retryTimes == RETRY_TIMES_OVER) {
13: flushConsumeQueueLeastPages = 0;
14: }
15: // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
16: long logicsMsgTimestamp = 0;
17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
18: long currentTimeMillis = System.currentTimeMillis();
19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
20: this.lastFlushTimestamp = currentTimeMillis;
21: flushConsumeQueueLeastPages = 0;
22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
23: }
24: // flush消费队列
25: ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
26: for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
27: for (ConsumeQueue cq : maps.values()) {
28: boolean result = false;
29: for (int i = 0; i < retryTimes && !result; i++) {
30: result = cq.flush(flushConsumeQueueLeastPages);
31: }
32: }
33: }
34: // flush 存储 check point
35: if (0 == flushConsumeQueueLeastPages) {
36: if (logicsMsgTimestamp > 0) {
37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
38: }
39: DefaultMessageStore.this.getStoreCheckpoint().flush();
40: }
41: }
42:
43: public void run() {
44: DefaultMessageStore.log.info(this.getServiceName() + " service started");
45:
46: while (!this.isStopped()) {
47: try {
48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
49: this.waitForRunning(interval);
50: this.doFlush(1);
51: } catch (Exception e) {
52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
53: }
54: }
55:
56: this.doFlush(RETRY_TIMES_OVER);
57:
58: DefaultMessageStore.log.info(this.getServiceName() + " service end");
59: }
60:
61: @Override
62: public String getServiceName() {
63: return FlushConsumeQueueService.class.getSimpleName();
64: }
65:
66: @Override
67: public long getJointime() {
68: return 1000 * 60;
69: }
70: }
- 说明 :flush
ConsumeQueue
(消费队列) 线程服务。 - 第 11 至 14 行 :当
retryTimes == RETRY_TIMES_OVER
时,进行强制flush。用于shutdown
时。 - 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
- 第 24 至 33 行 :flush
ConsumeQueue
(消费队列)。- flush 逻辑:MappedFile#落盘。
- 第 34 至 40 行 :flush
StoreCheckpoint
。StoreCheckpoint
的详细解析见:Store初始化与关闭。 - 第 43 至 59 行 :每 1000ms 执行一次
flush
。如果 wakeup() 时,则会立即进行一次flush
。目前,暂时不存在 wakeup() 的调用。
4、Broker 提供[拉取消息]接口
PullMessageRequestHeader
1: public class PullMessageRequestHeader implements CommandCustomHeader {
2: /**
3: * 消费者分组
4: */
5: @CFNotNull
6: private String consumerGroup;
7: /**
8: * Topic
9: */
10: @CFNotNull
11: private String topic;
12: /**
13: * 队列编号
14: */
15: @CFNotNull
16: private Integer queueId;
17: /**
18: * 队列开始位置
19: */
20: @CFNotNull
21: private Long queueOffset;
22: /**
23: * 消息数量
24: */
25: @CFNotNull
26: private Integer maxMsgNums;
27: /**
28: * 系统标识
29: */
30: @CFNotNull
31: private Integer sysFlag;
32: /**
33: * 提交消费进度位置
34: */
35: @CFNotNull
36: private Long commitOffset;
37: /**
38: * 挂起超时时间
39: */
40: @CFNotNull
41: private Long suspendTimeoutMillis;
42: /**
43: * 订阅表达式
44: */
45: @CFNullable
46: private String subscription;
47: /**
48: * 订阅版本号
49: */
50: @CFNotNull
51: private Long subVersion;
52: }
- 说明:拉取消息请求Header
- topic + queueId + queueOffset + maxMsgNums
- sysFlag :系统标识。
- 第 0 位
FLAG_COMMIT_OFFSET
:标记请求提交消费进度位置,和commitOffset
配合。 - 第 1 位
FLAG_SUSPEND
:标记请求是否挂起请求,和suspendTimeoutMillis
配合。当拉取不到消息时,Broker
会挂起请求,直到有消息。最大挂起时间:suspendTimeoutMillis
毫秒。 - 第 2 位
FLAG_SUBSCRIPTION
:是否过滤订阅表达式,和subscription
配置。
- 第 0 位
- subVersion :订阅版本号。请求时,如果版本号不对,则无法拉取到消息,需要重新获取订阅信息,使用最新的订阅版本号。
PullMessageProcessor#proce***equest(...)
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
- 说明:处理拉取消息请求,返回响应。
- 第 14 至 19 行 :校验
Broker
是否可读。 - 第 21 至 33 行 :校验
SubscriptionGroupConfig
(订阅分组配置) 是否存在 && 可以消费。 - 第 35 至 38 行 :处理
PullMessageRequestHeader.sysFlag
对应的标志位。 - 第 40 至 62 行 :校验
TopicConfig
(主题配置) 是否存在 && 可读 && 队列编号正确。 - 第 64 至 110 行 :校验
SubscriptionData
(订阅信息) 是否正确。 - 第 113 行 :调用
MessageStore#getMessage(...)
获取GetMessageResult
(消息)。详细解析见:MessageStore#getMessage(...)。 - 第 122 至 152 行 :计算建议拉取消息
brokerId
。 - 第 154 至 201 行 :
- 第 204 至 244 行 :
Hook
逻辑,#executeConsumeMessageHookBefore(...)
。 - 第 247 至 283 行 :拉取消息成功,即拉取到消息。
- 第 255 至 263 行 :方式一 :调用
readGetMessageResult(...)
获取消息内容到堆内内存,设置到 响应body
。 - 第 265 至 281 行 :方式二 :基于
zero-copy
实现,直接响应,无需堆内内存,性能更优。TODO :此处等对zero-copy有研究,再补充一些。
- 第 255 至 263 行 :方式一 :调用
- 第 284 至 300 行 :拉取不到消息,当满足条件 (
Broker
允许挂起 && 请求要求挂起),执行挂起请求。详细解析见:PullRequestHoldService。 - 第 304 至 328 行 :TODO :此处等对
tools
模块研究后再补充。 - 第 339 至 346 :持久化消费进度,当满足 (
Broker
非主 && 请求要求持久化进度)。详细解析见:更新消费进度。
MessageStore#getMessage(...)
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
- 说明 :根据 消息分组(
group
) + 主题(Topic
) + 队列编号(queueId
) + 队列位置(offset
) + 订阅信息(subscriptionData
) 获取 指定条数(maxMsgNums
) 消息(Message
)。 - 第 14 至 18 行 :判断
Store
是否处于关闭状态,若关闭,则无法获取消息。 - 第 19 至 23 行 :判断当前运行状态是否可读,若不可读,则无法获取消息。
- 第 37 行 :根据 主题(
Topic
) + 队列编号(queueId
) 获取 消息队列(ConsumeQueue
)。#findConsumeQueue(...)
:第 159 至 196 行。
- 第 43 至 58 行 :各种队列位置(
offset
) 无法读取消息,并针对对应的情况,计算下一次Client
队列拉取位置。- 第 43 至 45 行 :消息队列无消息。
- 第 46 至 48 行 :查询的消息队列位置(
offset
) 太小。 - 第 49 至 51 行 :查询的消息队列位置(
offset
) 恰好等于 消息队列最大的队列位置。该情况是正常现象,相当于查询最新的消息。 - 第 52 至 58 行 :查询的消息队列位置(
offset
) 超过过多。 #nextOffsetCorrection(...)
:第 198 至 212 行。
- 第 61 行 :根据 消费队列位置(
offset
) 获取 对应的MappedFile
。 - 第 72 至 128 行 :循环获取
消息位置信息
。#checkInDiskByCommitOffset(...)
:第 214 至 224 行。#isTheBatchFull(...)
:第 226 至 264 行。- 第 74 至 76 行 :读取每一个
消息位置信息
。 - 第 79 至 83 行 :当
offsetPy
小于nextPhyFileStartOffset
时,意味着对 应的Message
已经移除,所以直接continue,直到可读取的Message
。 - 第 84 至 90 行 :判断是否已经获得足够的消息。
- 第 92 行 :判断消息是否符合条件。详细解析见:DefaultMessageFilter#isMessageMatched(...)。
- 第 94 行 :从
CommitLog
获取对应 消息的MappedByteBuffer
。 - 第 95 至 99 行 :获取
消息MappedByteBuffer
成功。 - 第 100 至 106 行 :获取
消息MappedByteBuffer
失败。从CommitLog
无法读取到消息,说明 该消息对应的文件(MappedFile
) 已经删除,此时计算下一个MappedFile
的起始位置。该逻辑需要配合(第 79 至 83 行)一起理解。 - 第 117 至 120 行 :统计剩余可拉取消息字节数。
- 第 123 行 :计算下次拉取消息的消息队列编号。
- 第 124 至 128 行 :根据剩余可拉取消息字节数与内存判断是否建议读取从节点。
- 第 130 行 :释放
bufferConsumeQueue
对MappedFile
的指向。此处MappedFile
是ConsumeQueue
里的文件,不是CommitLog
下的文件。 - 第 133 至 136 行 :获得消费队列位置(
offset
) 获取 对应的MappedFile
为空,计算ConsumeQueue
从offset
开始的下一个MappedFile
对应的位置。 - 第 143 至 150 行 :记录统计信息:消耗时间、拉取到消息/未拉取到消息次数。
- 第 151 至 156 行 :设置返回结果并返回。
DefaultMessageFilter#isMessageMatched(...)
1: public class DefaultMessageFilter implements MessageFilter {
2:
3: @Override
4: public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
5: // 消息tagsCode 空
6: if (tagsCode == null) {
7: return true;
8: }
9: // 订阅数据 空
10: if (null == subscriptionData) {
11: return true;
12: }
13: // classFilter
14: if (subscriptionData.isClassFilterMode())
15: return true;
16: // 订阅表达式 全匹配
17: if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
18: return true;
19: }
20: // 订阅数据code数组 是否包含 消息tagsCode
21: return subscriptionData.getCodeSet().contains(tagsCode.intValue());
22: }
23:
24: }
- 说明 :消息过滤器默认实现。
PullRequestHoldService
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
PullRequestHoldService
说明 :拉取消息请求挂起维护线程服务。- 当拉取消息请求获得不了消息时,则会将请求进行挂起,添加到该服务。
- 当有符合条件信息时 或 挂起超时时,重新执行获取消息逻辑。
#suspendPullRequest(...)
说明 :添加拉取消息挂起请求到集合(pullRequestTable
)。#run(...)
说明 :定时检查挂起请求是否有需要通知重新拉取消息并进行通知。- 第 65 至 70 行 :根据
长轮训
or短轮训
设置不同的等待时间。 - 第 71 至 77 行 :检查挂起请求是否有需要通知的。
- 第 65 至 70 行 :根据
#checkHoldRequest(...)
说明 :遍历挂起请求,检查是否有需要通知的。#notifyMessageArriving(...)
说明 :检查指定队列是否有需要通知的请求。- 第 139 至 143 行 :如果
maxOffset
过小,重新获取一次最新的。 - 第 144 至 155 行 :有新的匹配消息,唤醒请求,即再次拉取消息。
- 第 156 至 165 行 :超过挂起时间,唤醒请求,即再次拉取消息。
- 第 148 || 159 行 :唤醒请求,再次拉取消息。原先担心拉取消息时间过长,导致影响整个挂起请求的遍历,后面查看
#executeRequestWhenWakeup(...)
,实际是丢到线程池进行一步的消息拉取,不会有性能上的问题。详细解析见:PullMessageProcessor#executeRequestWhenWakeup(...)。 - 第 166 至 172 行 :不符合唤醒的请求重新添加到集合(
pullRequestTable
)。
- 第 139 至 143 行 :如果
PullMessageProcessor#executeRequestWhenWakeup(...)
1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
2: Runnable run = new Runnable() {
3: @Override
4: public void run() {
5: try {
6: // 调用拉取请求。本次调用,设置不挂起请求。
7: final RemotingCommand response = PullMessageProcessor.this.proce***equest(channel, request, false);
8:
9: if (response != null) {
10: response.setOpaque(request.getOpaque());
11: response.markResponseType();
12: try {
13: channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
14: @Override
15: public void operationComplete(ChannelFuture future) throws Exception {
16: if (!future.isSuccess()) {
17: LOG.error("Proce***equestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
18: LOG.error(request.toString());
19: LOG.error(response.toString());
20: }
21: }
22: });
23: } catch (Throwable e) {
24: LOG.error("Proce***equestWrapper process request over, but response failed", e);
25: LOG.error(request.toString());
26: LOG.error(response.toString());
27: }
28: }
29: } catch (RemotingCommandException e1) {
30: LOG.error("ExecuteRequestWhenWakeup run", e1);
31: }
32: }
33: };
34: // 提交拉取请求到线程池
35: this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
36: }
- 说明 :执行请求唤醒,即再次拉取消息。该方法调用线程池,因此,不会阻塞。
- 第 7 行 :调用拉取消息请求。本次调用,设置即使请求不到消息,也不挂起请求。如果不设置,请求可能被无限挂起,被
Broker
无限循环。 - 第 35 行 :提交拉取消息请求到线程池。
5、Broker 提供[更新消费进度]接口
Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
}
consumerOffset.json
:消费进度存储文件。consumerOffset.json.bak
:消费进度存储文件备份。- 每次写入
consumerOffset.json
,将原内容备份到consumerOffset.json.bak
。实现见:MixAll#string2File(...)。
BrokerController#initialize(...)
1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- 说明 :每 5s 执行一次持久化逻辑。
ConfigManager
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅
MixAll#string2File(...)
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
ConsumerOffsetManager
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
- 说明 :消费进度管理器。
6、Broker 提供[发回消息]接口
大部分逻辑和 Broker
提供[接收消息]接口 类似,可以先看下相关内容。
SendMessageProcessor#consumerSendMsgBack(...)
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
- 说明 :当
Consumer
消费某条消息失败时,会调用该接口发回消息。Broker
会存储发回的消息。这样,下次Consumer
拉取该消息,能够从CommitLog
和ConsumeQueue
顺序读取。 - [x] 因为大多数逻辑和
Broker
接收普通消息 很相似,时候TODO
标记成独有的逻辑。 - 第 4 至 7 行 :初始化响应。
- [x] 第 9 至 20 行 :Hook逻辑。
- [x] 第22 至 30 行 :判断消费分组是否存在。
- 第 32 至 37 行 :检查
Broker
是否有写入权限。 - [x] 第 39 至 44 行 :检查重试队列数是否大于0。
- 第 47 行 :计算 retry topic。
- [x] 第 50 行 :随机分配队列编号,依赖
retryQueueNums
。 - [x] 第 52 至 56 行 :计算
sysFlag
。 - 第 58 至 72 行 :获取
TopicConfig
。如果不存在,则创建。 - [x] 第 74 至 80 行 :查询消息。若不存在,返回异常错误。
- [x] 第 82 至 86 行 :设置
retryTopic
到消息拓展属性。 - [x] 第 89 行 :设置消息不等待存储完成。
当
Broker
刷盘方式为同步,会导致同步落盘不能批量提交,这样会不会存在问题?有知道的同学麻烦告知下。- [x] 第 91 至 116 行 :处理
delayLevel
。 - 第 118 至 131 行 :创建
MessageExtBrokerInner
。 - [x] 第 133 至 135 行 :设置原始消息编号到拓展属性。
- 第 137 至 161 行 :添加消息。
7、结尾
感谢同学们对本文的阅读、收藏、点赞。
如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以一起沟通下。让我们来一场 1 :1 交流(搞基)。
再次表示十分感谢。
标签:...,MappedFile,队列,DefaultMessageStore,拉取,源码,消息,ConsumeQueue,Message 来源: https://blog.51cto.com/15079076/2595016