编程语言
首页 > 编程语言> > rocketmq源码解析之管理请求获取最小的offset

rocketmq源码解析之管理请求获取最小的offset

作者:互联网

说在前面

管理请求 GET_MIN_OFFSET 获取最小的offset

源码解析
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset 获取最小的offset

 private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
        final GetMinOffsetRequestHeader requestHeader =
            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
//        根据topic和queueId查找最小的offset =》
        long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

进入这个方法org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue 根据topic和queueId查询最小的offset

public long getMinOffsetInQueue(String topic, int queueId) {
//        根据topic和queueId查询消费者队列 =》
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
//            获取队列中的最小offset
            return logic.getMinOffsetInQueue();
        }

        return -1;
    }

进入这个方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue 按topic和queueId查询消费队列,这个方法前面已介绍过

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

//        按queue id查找消费者队列
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
//                消费者队列存储地址 user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//                每个文件存储默认30W
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset结束

说在最后

本次解析仅代表个人观点,仅供参考。

加入技术微信群
在这里插入图片描述

钉钉技术群
在这里插入图片描述

标签:queueId,ConsumeQueue,offset,topic,源码,logic,null,rocketmq
来源: https://blog.csdn.net/qq_23283355/article/details/100054596