编程语言
首页 > 编程语言> > RocketMQ 源码分析 —— Message 拉取与消费(上)

RocketMQ 源码分析 —— Message 拉取与消费(上)

作者:互联网

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!


阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1、概述

本章主要解析 消费 逻辑涉及到的源码。因为篇幅较长,分成上下两篇:

  1. 上篇:Broker 相关源码。
  2. 下篇:Consumer 相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

消费逻辑图

再看消费逻辑精简的顺序图(实际情况会略有差别):

Consumer&Broker消费精简图.png

2、ConsumeQueue 结构

ConsumeQueueMappedFileQueueMappedFile 的关系如下:

ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:52 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 1
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 2
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r--  1 yunai  staff  6000000  4 27 21:55 00000000000000000000

ConsumeQueueMappedFileQueueMappedFile 的定义如下:

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE ),有两种内容类型:

  1. MESSAGE_POSITION_INFO :消息位置信息。
  2. BLANK : 文件前置空白占位。当历史 Message 被删除时,需要用 BLANK占位被删除的消息。

MESSAGE_POSITION_INFO 在 ConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1offset消息 CommitLog 存储位置Long8
2size消息长度Int4
3tagsCode消息tagsCodeLong8

BLANK 在 ConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1
0Long8
2
Integer.MAX_VALUEInt4
3
0Long8

3、ConsumeQueue 存储

CommitLog重放ConsumeQueue图

主要有两个组件:

ReputMessageService

ReputMessageService顺序图

  1: class ReputMessageService extends ServiceThread {
  2: 
  3:     /**
  4:      * 开始重放消息的CommitLog物理位置
  5:      */
  6:     private volatile long reputFromOffset = 0;
  7: 
  8:     public long getReputFromOffset() {
  9:         return reputFromOffset;
 10:     }
 11: 
 12:     public void setReputFromOffset(long reputFromOffset) {
 13:         this.reputFromOffset = reputFromOffset;
 14:     }
 15: 
 16:     @Override
 17:     public void shutdown() {
 18:         for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
 19:             try {
 20:                 Thread.sleep(100);
 21:             } catch (InterruptedException ignored) {
 22:             }
 23:         }
 24: 
 25:         if (this.isCommitLogAvailable()) {
 26:             log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
 27:                 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
 28:         }
 29: 
 30:         super.shutdown();
 31:     }
 32: 
 33:     /**
 34:      * 剩余需要重放消息字节数
 35:      *
 36:      * @return 字节数
 37:      */
 38:     public long behind() {
 39:         return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
 40:     }
 41: 
 42:     /**
 43:      * 是否commitLog需要重放消息
 44:      *
 45:      * @return 是否
 46:      */
 47:     private boolean isCommitLogAvailable() {
 48:         return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
 49:     }
 50: 
 51:     private void doReput() {
 52:         for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
 53: 
 54:             // TODO 疑问:这个是啥
 55:             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
 56:                 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
 57:                 break;
 58:             }
 59: 
 60:             // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
 61:             SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
 62:             if (result != null) {
 63:                 try {
 64:                     this.reputFromOffset = result.getStartOffset();
 65: 
 66:                     // 遍历MappedByteBuffer
 67:                     for (int readSize = 0; readSize < result.getSize() && doNext; ) {
 68:                         // 生成重放消息重放调度请求
 69:                         DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
 70:                         int size = dispatchRequest.getMsgSize(); // 消息长度
 71:                         // 根据请求的结果处理
 72:                         if (dispatchRequest.isSuccess()) { // 读取成功
 73:                             if (size > 0) { // 读取Message
 74:                                 DefaultMessageStore.this.doDispatch(dispatchRequest);
 75:                                 // 通知有新消息
 76:                                 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
 77:                                     && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
 78:                                     DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
 79:                                         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
 80:                                         dispatchRequest.getTagsCode());
 81:                                 }
 82:                                 // FIXED BUG By shijia
 83:                                 this.reputFromOffset += size;
 84:                                 readSize += size;
 85:                                 // 统计
 86:                                 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
 87:                                     DefaultMessageStore.this.storeStatsService
 88:                                         .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
 89:                                     DefaultMessageStore.this.storeStatsService
 90:                                         .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
 91:                                         .addAndGet(dispatchRequest.getMsgSize());
 92:                                 }
 93:                             } else if (size == 0) { // 读取到MappedFile文件尾
 94:                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
 95:                                 readSize = result.getSize();
 96:                             }
 97:                         } else if (!dispatchRequest.isSuccess()) { // 读取失败
 98:                             if (size > 0) { // 读取到Message却不是Message
 99:                                 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100:                                 this.reputFromOffset += size;
101:                             } else { // 读取到Blank却不是Blank
102:                                 doNext = false;
103:                                 if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104:                                     log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105:                                         this.reputFromOffset);
106: 
107:                                     this.reputFromOffset += result.getSize() - readSize;
108:                                 }
109:                             }
110:                         }
111:                     }
112:                 } finally {
113:                     result.release();
114:                 }
115:             } else {
116:                 doNext = false;
117:             }
118:         }
119:     }
120: 
121:     @Override
122:     public void run() {
123:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
124: 
125:         while (!this.isStopped()) {
126:             try {
127:                 Thread.sleep(1);
128:                 this.doReput();
129:             } catch (Exception e) {
130:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131:             }
132:         }
133: 
134:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
135:     }
136: 
137:     @Override
138:     public String getServiceName() {
139:         return ReputMessageService.class.getSimpleName();
140:     }
141: 
142: }

DefaultMessageStore#doDispatch(...)

  1: /**
  2:  * 执行调度请求
  3:  * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
  4:  * 2. 建立 索引信息 到 IndexFile
  5:  *
  6:  * @param req 调度请求
  7:  */
  8: public void doDispatch(DispatchRequest req) {
  9:     // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
 10:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
 11:     switch (tranType) {
 12:         case MessageSysFlag.TRANSACTION_NOT_TYPE:
 13:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
 14:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
 15:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
 16:             break;
 17:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
 18:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
 19:             break;
 20:     }
 21:     // 建立 索引信息 到 IndexFile
 22:     if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
 23:         DefaultMessageStore.this.indexService.buildIndex(req);
 24:     }
 25: }
 26: 
 27: /**
 28:  * 建立 消息位置信息 到 ConsumeQueue
 29:  *
 30:  * @param topic 主题
 31:  * @param queueId 队列编号
 32:  * @param offset commitLog存储位置
 33:  * @param size 消息长度
 34:  * @param tagsCode 消息tagsCode
 35:  * @param storeTimestamp 存储时间
 36:  * @param logicOffset 队列位置
 37:  */
 38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
 39:     long logicOffset) {
 40:     ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
 41:     cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
 42: }

ConsumeQueue#putMessagePositionInfoWrapper(...)

  1: /**
  2:  * 添加位置信息封装
  3:  *
  4:  * @param offset commitLog存储位置
  5:  * @param size 消息长度
  6:  * @param tagsCode 消息tagsCode
  7:  * @param storeTimestamp 消息存储时间
  8:  * @param logicOffset 队列位置
  9:  */
 10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
 11:     long logicOffset) {
 12:     final int maxRetries = 30;
 13:     boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
 14:     // 多次循环写,直到成功
 15:     for (int i = 0; i < maxRetries && canWrite; i++) {
 16:         // 调用添加位置信息
 17:         boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
 18:         if (result) {
 19:             // 添加成功,使用消息存储时间 作为 存储check point。
 20:             this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
 21:             return;
 22:         } else {
 23:             // XXX: warn and notify me
 24:             log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
 25:                 + " failed, retry " + i + " times");
 26: 
 27:             try {
 28:                 Thread.sleep(1000);
 29:             } catch (InterruptedException e) {
 30:                 log.warn("", e);
 31:             }
 32:         }
 33:     }
 34: 
 35:     // XXX: warn and notify me 设置异常不可写入
 36:     log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
 37:     this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
 38: }
 39: 
 40: /**
 41:  * 添加位置信息,并返回添加是否成功
 42:  *
 43:  * @param offset commitLog存储位置
 44:  * @param size 消息长度
 45:  * @param tagsCode 消息tagsCode
 46:  * @param cqOffset 队列位置
 47:  * @return 是否成功
 48:  */
 49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
 50:     final long cqOffset) {
 51:     // 如果已经重放过,直接返回成功
 52:     if (offset <= this.maxPhysicOffset) {
 53:         return true;
 54:     }
 55:     // 写入位置信息到byteBuffer
 56:     this.byteBufferIndex.flip();
 57:     this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
 58:     this.byteBufferIndex.putLong(offset);
 59:     this.byteBufferIndex.putInt(size);
 60:     this.byteBufferIndex.putLong(tagsCode);
 61:     // 计算consumeQueue存储位置,并获得对应的MappedFile
 62:     final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
 63:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
 64:     if (mappedFile != null) {
 65:         // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
 66:         if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
 67:             this.minLogicOffset = expectLogicOffset;
 68:             this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
 69:             this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
 70:             this.fillPreBlank(mappedFile, expectLogicOffset);
 71:             log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
 72:                 + mappedFile.getWrotePosition());
 73:         }
 74:         // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
 75:         if (cqOffset != 0) {
 76:             long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
 77:             if (expectLogicOffset != currentLogicOffset) {
 78:                 LOG_ERROR.warn(
 79:                     "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
 80:                     expectLogicOffset,
 81:                     currentLogicOffset,
 82:                     this.topic,
 83:                     this.queueId,
 84:                     expectLogicOffset - currentLogicOffset
 85:                 );
 86:             }
 87:         }
 88:         // 设置commitLog重放消息到ConsumeQueue位置。
 89:         this.maxPhysicOffset = offset;
 90:         // 插入mappedFile
 91:         return mappedFile.appendMessage(this.byteBufferIndex.array());
 92:     }
 93:     return false;
 94: }
 95: 
 96: /**
 97:  * 填充前置空白占位
 98:  *
 99:  * @param mappedFile MappedFile
100:  * @param untilWhere consumeQueue存储位置
101:  */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103:     // 写入前置空白占位到byteBuffer
104:     ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105:     byteBuffer.putLong(0L);
106:     byteBuffer.putInt(Integer.MAX_VALUE);
107:     byteBuffer.putLong(0L);
108:     // 循环填空
109:     int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110:     for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111:         mappedFile.appendMessage(byteBuffer.array());
112:     }
113: }

FlushConsumeQueueService

  1: class FlushConsumeQueueService extends ServiceThread {
  2:     private static final int RETRY_TIMES_OVER = 3;
  3:     /**
  4:      * 最后flush时间戳
  5:      */
  6:     private long lastFlushTimestamp = 0;
  7: 
  8:     private void doFlush(int retryTimes) {
  9:         int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
 10: 
 11:         // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
 12:         if (retryTimes == RETRY_TIMES_OVER) {
 13:             flushConsumeQueueLeastPages = 0;
 14:         }
 15:         // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
 16:         long logicsMsgTimestamp = 0;
 17:         int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
 18:         long currentTimeMillis = System.currentTimeMillis();
 19:         if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
 20:             this.lastFlushTimestamp = currentTimeMillis;
 21:             flushConsumeQueueLeastPages = 0;
 22:             logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
 23:         }
 24:         // flush消费队列
 25:         ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 26:         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
 27:             for (ConsumeQueue cq : maps.values()) {
 28:                 boolean result = false;
 29:                 for (int i = 0; i < retryTimes && !result; i++) {
 30:                     result = cq.flush(flushConsumeQueueLeastPages);
 31:                 }
 32:             }
 33:         }
 34:         // flush 存储 check point
 35:         if (0 == flushConsumeQueueLeastPages) {
 36:             if (logicsMsgTimestamp > 0) {
 37:                 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
 38:             }
 39:             DefaultMessageStore.this.getStoreCheckpoint().flush();
 40:         }
 41:     }
 42: 
 43:     public void run() {
 44:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
 45: 
 46:         while (!this.isStopped()) {
 47:             try {
 48:                 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
 49:                 this.waitForRunning(interval);
 50:                 this.doFlush(1);
 51:             } catch (Exception e) {
 52:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
 53:             }
 54:         }
 55: 
 56:         this.doFlush(RETRY_TIMES_OVER);
 57: 
 58:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
 59:     }
 60: 
 61:     @Override
 62:     public String getServiceName() {
 63:         return FlushConsumeQueueService.class.getSimpleName();
 64:     }
 65: 
 66:     @Override
 67:     public long getJointime() {
 68:         return 1000 * 60;
 69:     }
 70: }

4、Broker 提供[拉取消息]接口

PullMessageRequestHeader

  1: public class PullMessageRequestHeader implements CommandCustomHeader {
  2:     /**
  3:      * 消费者分组
  4:      */
  5:     @CFNotNull
  6:     private String consumerGroup;
  7:     /**
  8:      * Topic
  9:      */
 10:     @CFNotNull
 11:     private String topic;
 12:     /**
 13:      * 队列编号
 14:      */
 15:     @CFNotNull
 16:     private Integer queueId;
 17:     /**
 18:      * 队列开始位置
 19:      */
 20:     @CFNotNull
 21:     private Long queueOffset;
 22:     /**
 23:      * 消息数量
 24:      */
 25:     @CFNotNull
 26:     private Integer maxMsgNums;
 27:     /**
 28:      * 系统标识
 29:      */
 30:     @CFNotNull
 31:     private Integer sysFlag;
 32:     /**
 33:      * 提交消费进度位置
 34:      */
 35:     @CFNotNull
 36:     private Long commitOffset;
 37:     /**
 38:      * 挂起超时时间
 39:      */
 40:     @CFNotNull
 41:     private Long suspendTimeoutMillis;
 42:     /**
 43:      * 订阅表达式
 44:      */
 45:     @CFNullable
 46:     private String subscription;
 47:     /**
 48:      * 订阅版本号
 49:      */
 50:     @CFNotNull
 51:     private Long subVersion;
 52: }

PullMessageProcessor#proce***equest(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

MessageStore#getMessage(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

DefaultMessageFilter#isMessageMatched(...)

  1: public class DefaultMessageFilter implements MessageFilter {
  2: 
  3:     @Override
  4:     public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
  5:         // 消息tagsCode 空
  6:         if (tagsCode == null) {
  7:             return true;
  8:         }
  9:         // 订阅数据 空
 10:         if (null == subscriptionData) {
 11:             return true;
 12:         }
 13:         // classFilter
 14:         if (subscriptionData.isClassFilterMode())
 15:             return true;
 16:         // 订阅表达式 全匹配
 17:         if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
 18:             return true;
 19:         }
 20:         // 订阅数据code数组 是否包含 消息tagsCode
 21:         return subscriptionData.getCodeSet().contains(tagsCode.intValue());
 22:     }
 23: 
 24: }

PullRequestHoldService

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

PullMessageProcessor#executeRequestWhenWakeup(...)

  1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
  2:     Runnable run = new Runnable() {
  3:         @Override
  4:         public void run() {
  5:             try {
  6:                 // 调用拉取请求。本次调用,设置不挂起请求。
  7:                 final RemotingCommand response = PullMessageProcessor.this.proce***equest(channel, request, false);
  8: 
  9:                 if (response != null) {
 10:                     response.setOpaque(request.getOpaque());
 11:                     response.markResponseType();
 12:                     try {
 13:                         channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
 14:                             @Override
 15:                             public void operationComplete(ChannelFuture future) throws Exception {
 16:                                 if (!future.isSuccess()) {
 17:                                     LOG.error("Proce***equestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
 18:                                     LOG.error(request.toString());
 19:                                     LOG.error(response.toString());
 20:                                 }
 21:                             }
 22:                         });
 23:                     } catch (Throwable e) {
 24:                         LOG.error("Proce***equestWrapper process request over, but response failed", e);
 25:                         LOG.error(request.toString());
 26:                         LOG.error(response.toString());
 27:                     }
 28:                 }
 29:             } catch (RemotingCommandException e1) {
 30:                 LOG.error("ExecuteRequestWhenWakeup run", e1);
 31:             }
 32:         }
 33:     };
 34:     // 提交拉取请求到线程池
 35:     this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
 36: }

5、Broker 提供[更新消费进度]接口

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json.bak
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json.bak
8 -rw-r--r--  1 yunai  staff  1401  4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
 "offsetTable":{
  "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
  },
  "TopicRead3@please_rename_unique_group_name_4":{1:5
  }
 }
}

BrokerController#initialize(...)

  1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2:    @Override
  3:    public void run() {
  4:        try {
  5:            BrokerController.this.consumerOffsetManager.persist();
  6:        } catch (Throwable e) {
  7:            log.error("schedule persist consumerOffset error.", e);
  8:        }
  9:    }
 10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

ConfigManager

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅

MixAll#string2File(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

ConsumerOffsetManager

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

6、Broker 提供[发回消息]接口

大部分逻辑和 Broker 提供[接收消息]接口 类似,可以先看下相关内容。

SendMessageProcessor#consumerSendMsgBack(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

7、结尾

感谢同学们对本文的阅读、收藏、点赞。

如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以一起沟通下。让我们来一场 1 :1 交流(搞基)。

再次表示十分感谢。


标签:...,MappedFile,队列,DefaultMessageStore,拉取,源码,消息,ConsumeQueue,Message
来源: https://blog.51cto.com/15079076/2595016