rocketMq-消息存储-buildIndex
作者:互联网
前言
上篇文章介绍了,在 dispatcherList 有两个 dispatcher ,其中一个就是buildIndex ,这个顾名思义就是建立索引,方便研发人员通过关键字查询消息,判断消息内容是否正确或者丢失,这个也是比kafka 的优势点。
原理
buildIndex 整体设计采用hashMap 数据结构设计的,即数据和链表,一个索引文件mappedFile默认有500w个hash槽,2000w个索引,还有文件头(40个字节),一个索引为20个字节,一个索引文件大概有400m;根据关键字求得hash 值,取模的方式确定在哪个hash槽,如果hash槽有值,说明有消息了,接着通过链表解决冲突,每个 hash 槽维护着当前key 最新的索引下标值(indexCount ),indexCount 是全局递增,header 存储的当前索引文件最新的,每个hash 槽存储的对应链表最新indexCount ;
关键字--key : 有两种一种是topic 与 业务唯一值组成;另外一种是msgId 组成;
简单讲一个索引的建立;根据key的hash 值定位到那个hash槽,把其中的值取出来(hash_indexCount);在根据header的indexCount ,取名为header_indexCount ;索引数据存储定位位置=header(40)+hash槽大小(4)*hash槽数量(500w)+header_indexCount * 一个索引大小(20);位置求出了,我们就可以存储索引数据,数据主要有两个,一个是physicsOffset,一个就是hash_indexCount;physicsOffset:可以根据这个值去commintLog查找对应的消息;而hash_indexCount;可以根据key 查询到链条上所有的索引值,就是通过hash_indexCount反向查找上一个索引值,类似反向链表;
建议:针对类似hashMap 的结构,建议去看看数组和链表的数据结构与算法;更好的理解,本人也是看了,理解会更到位一一些;
整个流程图如下
BuildIndex源码解析
入口为indexService,接下来我们就对其方法分析
public void buildIndex(DispatchRequest req) {
//获取最新indexFile 没有或者满了就创建
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
//小于表示已经有了,也是防止重复
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
//消息状态为回滚的就不用构建索引了
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
//通过uniqkey + topic 创建索引
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
// 通过key+ topic 创建索引,如果key为多个用逗号分隔
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
先获取到indexFile ,就是取indexFileList 最后一个,判断其是否满了,满了,就创建一个;接下来主要讲构建索引,用两种入参构建都是调用putKey,我们重点讲解这个方法;
这个可以理解为死循环创建索引
根据key 取到hash值,定位那个槽,以及计算出槽在当前文件的位置
这是获取keyHash 的实现
获取到槽的值,这个slotValue 其实就是存的索引的index ,跟header存的一样,不一样的是header 存的是当前文件最新的索引index,而slotValue 存的是当前key 对应链条的最新索引的index
//计算出要存储索引的开始位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//追加keyHash
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//追加physicsOffset
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
//时间差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//上一个链值 ,slotValue 等于0表示此值为链路第一个值
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//存储当前索引的下坐标
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 如果是1表明是第一个index
if (this.indexHeader.getIndexCount() <= 1) {
//开始的physicsOffset
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
//设置索引数量+1
this.indexHeader.incIndexCount();
//结尾的physicsOffset
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
定位存储索引位置后,就添加索引数据,其中主要有keyhash值,physicsOffset,上一个slotValue,更新hash槽为当前索引的下坐标,更新头部信息,索引数量+1;索引的存储就这样结束了,是不是很简单。
总结
建立索引的过程可以理解为hashMap的一个put 流程;其索引值有physicsOffset,和链表上一个节点的索引下坐标,这样就形成了反向链表;根据这个原则就可以通过topic + key 很快定位有哪里physicsOffset,通过physicsOffset就能很快查询到消息的具体内容。
标签:存储,hash,req,索引,rocketMq,key,indexFile,indexCount,buildIndex 来源: https://blog.csdn.net/qq_30834791/article/details/118581077