编程语言
首页 > 编程语言> > 全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十二)延时队列

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十二)延时队列

作者:互联网

this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来通知ConsumeQueue的Dispatcher是CommitLogDispatcherBuildConsumeQueue

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

当ReputMessageService调用了CommitLogDispatcherBuildConsumeQueue的dispatch()后,CommitLogDispatcherBuildConsumeQueue便会调用 DefaultMessageStore.this.putMessagePositionInfo(request):

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

putMessagePositionInfo()逻辑有两步:

1:调用findConsumeQueue(),根据消息的topic以及消息所属的ConsumeQueueId,找到对应的ConsumeQueue。

findConsumeQueue()会先从consumeQueueTable中查询topic的ConsumeQueueMap,如果未找到,便会为Topic创建一个新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。

接着在从Topic的ConcurrentMap中,根据QueueId,查询ConsumeQueue,如果未找到,便也会创建一个新的ConsumeQueue,存放到Map中。ConsumeQueue便是此时被创建的。

2:当找到消息对应的ConsumeQueue后,便调用ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。

ConsumeQueue的更新

上面主要讲了ReputMessageService是如何通知ConsumeQueue的,现在我们就要看看ConsumeQueue收到通知后是如何更新的,更新逻辑就在putMessagePositionInfoWrapper()中。

putMessagePositionInfoWrapper()中调用了putMessagePositionInfo(),并引入了重试机制。

我们来看看putMessagePositionInfo()中的主要逻辑:

1:判断消息是否已经被处理过

 if (offset <= this.maxPhysicOffset) {
            return true;
 }

maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量,如果本次消息偏移量小于maxPhysicOffset,则表明消息已经被更新过,直接返回。

标签:全站,CommitLogDispatcherBuildConsumeQueue,调用,TRANSACTION,putMessagePositionInfo,Me
来源: https://blog.csdn.net/GBS20200720/article/details/122758150