rocketmq源码解析之管理请求获取最大的offset
作者:互联网
说在前面
本次继续解析管理请求,GET_MAX_OFFSET 获取最大的offset
源码解析
进入到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset获取最大的offset
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
// 根据topic和queueId获取最大的offset =》
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
进入这个方法org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue 根据topic和queueId获取最大的offset
public long getMaxOffsetInQueue(String topic, int queueId) {
// 根据topic和queueId找到消费者队列=》
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
// 获取最大的offset =》
long offset = logic.getMaxOffsetInQueue();
return offset;
}
// 如果不存在指定topic和queueId的消费队列直接返回0
return 0;
}
进入这个方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue 根据topic和queueId查询消费队列
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
// 按queue id查找消费者队列
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
// 消费者队列存储地址 user.home/store/consumequeue
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
// 每个文件存储默认30W
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
往上返回到这个方法org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue 查询最大的offset
public long getMaxOffsetInQueue() {
// =》
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}
进入这个方法org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() {
// 获取存储映射文件队列中索引位置最大的映射文件
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
// 映射文件的起始offset+映射文件的可读取的索引位置
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
// 如果队列中没有存储映射文件直接返回0
return 0;
}
进入这个方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile() 获取映射文件集合中最后一个映射文件
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
可以看到映射文件集合是用CopyOnWriteArrayList实现
// 并发线程安全队列存储映射文件
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset结束
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群
标签:queueId,return,logic,topic,源码,offset,rocketmq 来源: https://blog.csdn.net/qq_23283355/article/details/100054531