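RocketMQ Source Code Analysis: The Admin Request for Getting the Minimum Offset
Author: Internet
Preface
The admin request GET_MIN_OFFSET returns the minimum offset of a queue.
Source code analysis
The entry point is org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which decodes the request header, asks the message store for the minimum offset, and writes it into the response header.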
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
    final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
    final GetMinOffsetRequestHeader requestHeader =
        (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
    // Look up the minimum offset by topic and queueId (examined next)
    long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
    responseHeader.setOffset(offset);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
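To make the three steps of the handler easier to see in isolation (read the request fields, query the store, fill in the response), here is a minimal sketch that runs without RocketMQ on the classpath. Request, Response, and the store parameter are hypothetical stand-ins, not the real RemotingCommand or MessageStore types, and SUCCESS = 0 is assumed to mirror ResponseCode.SUCCESS.

```java
import java.util.function.ToLongBiFunction;

final class MinOffsetHandlerSketch {
    // Assumed to mirror ResponseCode.SUCCESS in RocketMQ.
    static final int SUCCESS = 0;

    /** Hypothetical stand-in for GetMinOffsetRequestHeader. */
    static final class Request {
        final String topic;
        final int queueId;

        Request(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Hypothetical stand-in for the response command plus its header. */
    static final class Response {
        int code;
        String remark;
        long offset;
    }

    /** Mirrors the handler's flow: query the store, then fill in the response. */
    static Response handle(Request request, ToLongBiFunction<String, Integer> store) {
        Response response = new Response();
        // The store lookup is the only real work; the handler itself does no validation.
        response.offset = store.applyAsLong(request.topic, request.queueId);
        response.code = SUCCESS;
        response.remark = null;
        return response;
    }

    public static void main(String[] args) {
        // A fake store that always reports 42 as the minimum offset.
        Response response = handle(new Request("TopicTest", 0), (topic, queueId) -> 42L);
        System.out.println(response.code + " " + response.offset);
    }
}
```

Note that the handler always answers with SUCCESS; whatever value the store returns is passed through to the caller unchanged.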
Next, org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue looks up the minimum offset by topic and queueId.
public long getMinOffsetInQueue(String topic, int queueId) {
    // Find the consume queue by topic and queueId (examined next)
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        // Return the minimum offset in that queue
        return logic.getMinOffsetInQueue();
    }
    return -1;
}
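ConsumeQueue#getMinOffsetInQueue itself is not shown in this article. As far as I recall from the RocketMQ 4.x source, it divides the queue's minimum byte position (minLogicOffset) by the fixed entry size (CQ_STORE_UNIT_SIZE, 20 bytes), so the returned value is an entry index rather than a byte position. The sketch below reproduces only that arithmetic; the class and method names are hypothetical, and the constant is an assumption about the real class.

```java
final class ConsumeQueueOffsetSketch {
    // Assumed to match ConsumeQueue.CQ_STORE_UNIT_SIZE:
    // 8-byte commit log offset + 4-byte message size + 8-byte tags hash code.
    static final int CQ_STORE_UNIT_SIZE = 20;

    /** Converts a byte position inside the consume queue into a logical entry index. */
    static long minOffsetInQueue(long minLogicOffset) {
        return minLogicOffset / CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        // A queue whose first 6,000,000 bytes were deleted starts at entry 300000.
        System.out.println(minOffsetInQueue(6_000_000L));
    }
}
```

Under this assumption, a queue that has never had data removed reports a minimum offset of 0.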
Next, org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue looks up the consume queue by topic and queueId. This method was covered in an earlier article. If no queue exists yet for the pair, it creates one, so the caller never receives null and the null check in getMinOffsetInQueue acts only as a safeguard.
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }
    // Look up the consume queue by queue id
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage path, by default ${user.home}/store/consumequeue
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // Size of each consume queue file; by default sized to hold 300,000 entries
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }
    return logic;
}
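Both levels of the lookup above use the same get-or-create idiom: read the map, build a new value if nothing is there, then let putIfAbsent decide which thread's value wins. The sketch below isolates that idiom in a generic helper and checks that concurrent callers end up sharing one instance. GetOrCreateSketch, getOrCreate, and distinctInstances are hypothetical names for illustration and do not exist in RocketMQ.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

final class GetOrCreateSketch {
    /** The putIfAbsent idiom used by findConsumeQueue, written once for any key and value type. */
    static <K, V> V getOrCreate(ConcurrentMap<K, V> map, K key, Supplier<V> factory) {
        V value = map.get(key);
        if (value == null) {
            V created = factory.get();
            // putIfAbsent returns the existing value if another thread inserted first.
            V raced = map.putIfAbsent(key, created);
            value = (raced != null) ? raced : created;
        }
        return value;
    }

    /** Counts how many distinct instances several threads observe for the same key. */
    static int distinctInstances(int threads) {
        ConcurrentMap<Integer, Object> map = new ConcurrentHashMap<>();
        Set<Object> seen = Collections.synchronizedSet(
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>()));
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> seen.add(getOrCreate(map, 0, Object::new)));
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances(8));
    }
}
```

With this idiom a losing thread may construct a value that is then discarded. On Java 8 and later, ConcurrentMap#computeIfAbsent expresses the same get-or-create step in one call and, for ConcurrentHashMap, runs the factory at most once per key.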
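Control then returns to org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which finishes handling the request.
Closing remarks
This analysis reflects only my personal understanding and is for reference only.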
Source: https://blog.csdn.net/qq_23283355/article/details/100054596