编程语言
首页 > 编程语言> > rocketmq源码解析消息拉取处理器②

rocketmq源码解析消息拉取处理器②

作者:互联网

说在前面

消息拉取处理器

源码解析
进入到这个方法,根据offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)上面介绍过了。

往上返回到这个方法,根据offset找到下个映射文件,org.apache.rocketmq.store.CommitLog#rollNextFile

 public long rollNextFile(final long offset) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        return offset + mappedFileSize - offset % mappedFileSize;
    }

往上返回到这个方法,执行消息消费的钩子方法在消费消息之前,org.apache.rocketmq.broker.processor.PullMessageProcessor#executeConsumeMessageHookBefore

 public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
        if (hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
//                    用户可以实现自己在消费之前的钩子方法
                    hook.consumeMessageBefore(context);
                } catch (Throwable e) {
                }
            }
        }
    }

往上返回到这个方法,暂停消息拉取服务,org.apache.rocketmq.broker.longpolling.PullRequestHoldService#suspendPullRequest

 public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }

        mpr.addPullRequest(pullRequest);
    }

往上返回到这个方法,提交offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset(java.lang.String, java.lang.String, java.lang.String, int, long)

 public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
//        =》
        this.commitOffset(clientHost, key, queueId, offset);
    }

进入这个方法,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset(java.lang.String, java.lang.String, int, long)

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

往上返回到这个方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)结束。

说在最后
本次解析仅代表个人观点,仅供参考。

在这里插入图片描述

钉钉技术群
在这里插入图片描述

标签:queueId,String,拉取,源码,key,offset,final,rocketmq
来源: https://blog.csdn.net/qq_23283355/article/details/100075445