其他分享
首页 > 其他分享> > rocketMq-消息存储-buildIndex

rocketMq-消息存储-buildIndex

作者:互联网

前言

上篇文章介绍了,在 dispatcherList 有两个 dispatcher ,其中一个就是buildIndex ,这个顾名思义就是建立索引,方便研发人员通过关键字查询消息,判断消息内容是否正确或者丢失,这个也是比kafka 的优势点。

原理

buildIndex 整体设计采用hashMap 数据结构设计的,即数据和链表,一个索引文件mappedFile默认有500w个hash槽,2000w个索引,还有文件头(40个字节),一个索引为20个字节,一个索引文件大概有400m;根据关键字求得hash 值,取模的方式确定在哪个hash槽,如果hash槽有值,说明有消息了,接着通过链表解决冲突,每个 hash 槽维护着当前key 最新的索引下标值(indexCount ),indexCount 是全局递增,header 存储的当前索引文件最新的,每个hash 槽存储的对应链表最新indexCount ;

 关键字--key : 有两种一种是topic 与 业务唯一值组成;另外一种是msgId 组成;

简单讲一个索引的建立;根据key的hash 值定位到那个hash槽,把其中的值取出来(hash_indexCount);在根据header的indexCount ,取名为header_indexCount ;索引数据存储定位位置=header(40)+hash槽大小(4)*hash槽数量(500w)+header_indexCount * 一个索引大小(20);位置求出了,我们就可以存储索引数据,数据主要有两个,一个是physicsOffset,一个就是hash_indexCount;physicsOffset:可以根据这个值去commintLog查找对应的消息;而hash_indexCount;可以根据key 查询到链条上所有的索引值,就是通过hash_indexCount反向查找上一个索引值,类似反向链表;

建议:针对类似hashMap 的结构,建议去看看数组和链表的数据结构与算法;更好的理解,本人也是看了,理解会更到位一一些;

整个流程图如下

  BuildIndex源码解析

入口为indexService,接下来我们就对其方法分析

 

    public void buildIndex(DispatchRequest req) {
        //获取最新indexFile 没有或者满了就创建
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            //小于表示已经有了,也是防止重复
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                    //消息状态为回滚的就不用构建索引了
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }
             //通过uniqkey + topic 创建索引
           if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
            // 通过key+ topic 创建索引,如果key为多个用逗号分隔
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }

先获取到indexFile ,就是取indexFileList 最后一个,判断其是否满了,满了,就创建一个;接下来主要讲构建索引,用两种入参构建都是调用putKey,我们重点讲解这个方法;

 这个可以理解为死循环创建索引

 根据key 取到hash值,定位那个槽,以及计算出槽在当前文件的位置

 这是获取keyHash 的实现

 获取到槽的值,这个slotValue 其实就是存的索引的index ,跟header存的一样,不一样的是header 存的是当前文件最新的索引index,而slotValue 存的是当前key 对应链条的最新索引的index

   //计算出要存储索引的开始位置
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //追加keyHash
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                //追加physicsOffset
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                //时间差
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                //上一个链值 ,slotValue 等于0表示此值为链路第一个值
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                //存储当前索引的下坐标
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                // 如果是1表明是第一个index
                if (this.indexHeader.getIndexCount() <= 1) {
                    //开始的physicsOffset
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }
                //设置索引数量+1
                this.indexHeader.incIndexCount();
                //结尾的physicsOffset
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;

 定位存储索引位置后,就添加索引数据,其中主要有keyhash值,physicsOffset,上一个slotValue,更新hash槽为当前索引的下坐标,更新头部信息,索引数量+1;索引的存储就这样结束了,是不是很简单。

总结

 建立索引的过程可以理解为hashMap的一个put 流程;其索引值有physicsOffset,和链表上一个节点的索引下坐标,这样就形成了反向链表;根据这个原则就可以通过topic + key 很快定位有哪里physicsOffset,通过physicsOffset就能很快查询到消息的具体内容。

标签:存储,hash,req,索引,rocketMq,key,indexFile,indexCount,buildIndex
来源: https://blog.csdn.net/qq_30834791/article/details/118581077