ConsumeQueue构建过程分析
作者:互联网
1. 前言
理论上来说,RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢?
ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文件中检索数据,无疑性能是非常差的。有了ConsumeQueue,消费者就可以根据消息在CommitLog文件中的偏移量快速定位到消息进行消费了。
之前的文章已经说过,Broker会将客户端发送的消息写入CommitLog文件,持久化存储。但是整个流程并没有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被构建的呢?
2. ReputMessageService
ReputMessageService是「消息重放服务」,请允许我这么命名。Broker在启动的时候,会开启一个线程每毫秒执行一次doReput()
方法。
它的目的就是对写入CommitLog文件里的消息进行「重放」,它有一个属性reputFromOffset
,记录的是消息重放的偏移量,MessageStore启动的时候会对其进行赋值。
它的工作原理是,根据重放偏移量reputFromOffset
去读取CommitLog里的待重放的消息,并构建DispatchRequest对象,然后将DispatchRequest对象分发出去,交给各个CommitLogDispatcher处理。
MessageStore维护了CommitLogDispatcher对象集合,目前只有三个处理器:
- CommitLogDispatcherBuildConsumeQueue:构建ConsumeQueue索引。
- CommitLogDispatcherBuildIndex:构建Index索引。
- CommitLogDispatcherCalcBitMap:构建布隆过滤器,加速SQL92过滤效率。
本篇文章主要分析CommitLogDispatcherBuildConsumeQueue,看看RocketMQ是如何构建ConsumeQueue的。
3. 源码分析
笔者画了一下ConsumeQueue构建过程的时序图,整个构建过程并不算复杂。
1.doReput()
方法1毫秒执行一次,它的方法体是一个for循环,只要reputFromOffset没有到达CommitLog文件的最大偏移量,就会一直继续重放消息。
private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
它首先会根据reputFromOffset去CommitLog文件中截取一段ByteBuffer,这个缓冲区里就是待重放的消息数据。
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
// CommitLog单个文件的大小
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
// 根据索引构建进度找到等待构建的文件,文件名就是起始Offset,遍历文件即可找到
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
// 计算Offset在当前文件的读指针位置
int pos = (int) (offset % mappedFileSize);
/**
* 基于MappedFile的MappedByteBuffer派生出一个ByteBuffer对象
* 共享同一块内存,但是拥有自己的指针
*/
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
SelectMappedBufferResult类属性如下:
// 起始偏移量
private final long startOffset;
// 缓冲区
private final ByteBuffer byteBuffer;
// 长度
private int size;
// 关联的MappedFile对象
private MappedFile mappedFile;
2.有了SelectMappedBufferResult,就可以读取消息数据了。由于消息重放并不需要知道消息主体内容,因此不会读取消息Body,只是读取相关属性,并构建DispatchRequest对象。读取的属性如下:
// 消息所属Topic
private final String topic;
// 消息所属队列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希码
private final long tagsCode;
// 消息存盘时间
private final long storeTimestamp;
// 逻辑消费队列位点
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一键
private final String uniqKey;
// 消息系统标记
private final int sysFlag;
// 事务消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;
3.有了DispatchRequest对象,接下来就是调用doDispatch
方法将请求分发出去了。此时CommitLogDispatcherBuildConsumeQueue将被触发,它会将请求转交给DefaultMessageStore执行。
DefaultMessageStore.this.putMessagePositionInfo(request);
4.MessageStore先根据消息Topic和QueueID定位到ConsumeQueue文件,然后将索引追加到文件中。
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// 根据Topic和QueueID定位到ConsumeQueue文件
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// 追加索引到文件
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
写索引之前,会先确保消息仓库是可写状态:
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
然后,初始化一个ByteBuffer,容量为20字节,依次往里面写入:消息Offset、size、tagsCode。
// 每个索引的长度是20字节,byteBufferIndex是循环使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引结构:Offset+size+tagsCode
* 8字节 4字节 8字节
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
根据消费队列位点和单个索引的长度计算索引应该写入的文件位置,因为是顺序写的嘛,所以获取最新的ConsumeQueue文件,如果文件写满会创建新的继续写。
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
写之前,校验预期的偏移量和逻辑偏移量是否相等,正常情况下两者应该相等,如果不等说明数据构建错乱了,需要重新构建了。
if (cqOffset != 0) {
// 偏移量:当前文件的写指针位置+文件起始偏移量(文件名)
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 正常情况下,expectLogicOffset和currentLogicOffset应该相等
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
检验通过后,就可以正常写了。先更新当前ConsumeQueue记录消息的最大偏移量maxPhysicOffset,再将20个字节的索引数据写入到文件。
// 更新当前ConsumerQueue记录的消息在CommitLog中的最大偏移量
this.maxPhysicOffset = offset + size;
// 将20字节的索引数据写入文件
return mappedFile.appendMessage(this.byteBufferIndex.array());
至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。
ConsumeQueue索引条目结构:
长度 | 说明 |
---|---|
8 | 消息在CommitLog文件中的偏移量 |
4 | 消息长度 |
8 | 消息Tag哈希码,根据Tag过滤消息 |
4. 总结
ConsumeQueue是RocketMQ用来加速消费者消费效率的索引文件,它是一个逻辑消费队列,并不保存消息本身,只是一个消息索引。索引长度为20个字节,记录了消息在CommitLog文件里的偏移量,消息长度,和消息Tag的哈希值。Consumer消费消息时可以根据Tag哈希值快速过滤消息,然后根据偏移量快速定位到消息,再根据消息长度读取出一条完整的消息。
Broker将消息写入CommitLog后并不会马上写ConsumeQueue,而是由一个异步线程ReputMessageService将消息进行重放,重放的过程中由CommitLogDispatcherBuildConsumeQueue将消息构建到ConsumeQueue文件,构建的频率为1毫秒一次,几乎是近实时的,不用担心消费会延迟。
标签:分析,文件,private,偏移量,构建,消息,ConsumeQueue,final 来源: https://blog.csdn.net/qq_32099833/article/details/120167005