其他分享
首页 > 其他分享> > ConsumeQueue构建过程分析

ConsumeQueue构建过程分析

作者:互联网

1. 前言

理论上来说,RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢?

ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文件中检索数据,无疑性能是非常差的。有了ConsumeQueue,消费者就可以根据消息在CommitLog文件中的偏移量快速定位到消息进行消费了。

之前的文章已经说过,Broker会将客户端发送的消息写入CommitLog文件,持久化存储。但是整个流程并没有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被构建的呢?

2. ReputMessageService

ReputMessageService是「消息重放服务」,请允许我这么命名。Broker在启动的时候,会开启一个线程每毫秒执行一次doReput()方法。

它的目的就是对写入CommitLog文件里的消息进行「重放」,它有一个属性reputFromOffset,记录的是消息重放的偏移量,MessageStore启动的时候会对其进行赋值。

它的工作原理是,根据重放偏移量reputFromOffset去读取CommitLog里的待重放的消息,并构建DispatchRequest对象,然后将DispatchRequest对象分发出去,交给各个CommitLogDispatcher处理。

MessageStore维护了CommitLogDispatcher对象集合,目前只有三个处理器:

  1. CommitLogDispatcherBuildConsumeQueue:构建ConsumeQueue索引。
  2. CommitLogDispatcherBuildIndex:构建Index索引。
  3. CommitLogDispatcherCalcBitMap:构建布隆过滤器,加速SQL92过滤效率。

本篇文章主要分析CommitLogDispatcherBuildConsumeQueue,看看RocketMQ是如何构建ConsumeQueue的。

3. 源码分析

笔者画了一下ConsumeQueue构建过程的时序图,整个构建过程并不算复杂。
在这里插入图片描述

1.doReput()方法1毫秒执行一次,它的方法体是一个for循环,只要reputFromOffset没有到达CommitLog文件的最大偏移量,就会一直继续重放消息。

private boolean isCommitLogAvailable() {
    return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

它首先会根据reputFromOffset去CommitLog文件中截取一段ByteBuffer,这个缓冲区里就是待重放的消息数据。

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // CommitLog单个文件的大小
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    // 根据索引构建进度找到等待构建的文件,文件名就是起始Offset,遍历文件即可找到
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        // 计算Offset在当前文件的读指针位置
        int pos = (int) (offset % mappedFileSize);
        /**
         * 基于MappedFile的MappedByteBuffer派生出一个ByteBuffer对象
         * 共享同一块内存,但是拥有自己的指针
         */
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}

SelectMappedBufferResult类属性如下:

// 起始偏移量
private final long startOffset;
// 缓冲区
private final ByteBuffer byteBuffer;
// 长度
private int size;
// 关联的MappedFile对象
private MappedFile mappedFile;

2.有了SelectMappedBufferResult,就可以读取消息数据了。由于消息重放并不需要知道消息主体内容,因此不会读取消息Body,只是读取相关属性,并构建DispatchRequest对象。读取的属性如下:

// 消息所属Topic
private final String topic;
// 消息所属队列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希码
private final long tagsCode;
// 消息存盘时间
private final long storeTimestamp;
// 逻辑消费队列位点
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一键
private final String uniqKey;
// 消息系统标记
private final int sysFlag;
// 事务消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;

3.有了DispatchRequest对象,接下来就是调用doDispatch方法将请求分发出去了。此时CommitLogDispatcherBuildConsumeQueue将被触发,它会将请求转交给DefaultMessageStore执行。

DefaultMessageStore.this.putMessagePositionInfo(request);

4.MessageStore先根据消息Topic和QueueID定位到ConsumeQueue文件,然后将索引追加到文件中。

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // 根据Topic和QueueID定位到ConsumeQueue文件
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // 追加索引到文件
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

写索引之前,会先确保消息仓库是可写状态:

boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

然后,初始化一个ByteBuffer,容量为20字节,依次往里面写入:消息Offset、size、tagsCode。

// 每个索引的长度是20字节,byteBufferIndex是循环使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引结构:Offset+size+tagsCode
* 8字节 4字节 8字节
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

根据消费队列位点和单个索引的长度计算索引应该写入的文件位置,因为是顺序写的嘛,所以获取最新的ConsumeQueue文件,如果文件写满会创建新的继续写。

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

写之前,校验预期的偏移量和逻辑偏移量是否相等,正常情况下两者应该相等,如果不等说明数据构建错乱了,需要重新构建了。

if (cqOffset != 0) {
    // 偏移量:当前文件的写指针位置+文件起始偏移量(文件名)
    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

    // 正常情况下,expectLogicOffset和currentLogicOffset应该相等
    if (expectLogicOffset < currentLogicOffset) {
        log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                 expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
        return true;
    }
    if (expectLogicOffset != currentLogicOffset) {
        LOG_ERROR.warn(
            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
            expectLogicOffset,
            currentLogicOffset,
            this.topic,
            this.queueId,
            expectLogicOffset - currentLogicOffset
        );
    }
}

检验通过后,就可以正常写了。先更新当前ConsumeQueue记录消息的最大偏移量maxPhysicOffset,再将20个字节的索引数据写入到文件。

// 更新当前ConsumerQueue记录的消息在CommitLog中的最大偏移量
this.maxPhysicOffset = offset + size;
// 将20字节的索引数据写入文件
return mappedFile.appendMessage(this.byteBufferIndex.array());

至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。

ConsumeQueue索引条目结构:

长度说明
8消息在CommitLog文件中的偏移量
4消息长度
8消息Tag哈希码,根据Tag过滤消息

4. 总结

ConsumeQueue是RocketMQ用来加速消费者消费效率的索引文件,它是一个逻辑消费队列,并不保存消息本身,只是一个消息索引。索引长度为20个字节,记录了消息在CommitLog文件里的偏移量,消息长度,和消息Tag的哈希值。Consumer消费消息时可以根据Tag哈希值快速过滤消息,然后根据偏移量快速定位到消息,再根据消息长度读取出一条完整的消息。

Broker将消息写入CommitLog后并不会马上写ConsumeQueue,而是由一个异步线程ReputMessageService将消息进行重放,重放的过程中由CommitLogDispatcherBuildConsumeQueue将消息构建到ConsumeQueue文件,构建的频率为1毫秒一次,几乎是近实时的,不用担心消费会延迟。

标签:分析,文件,private,偏移量,构建,消息,ConsumeQueue,final
来源: https://blog.csdn.net/qq_32099833/article/details/120167005