编程语言
首页 > 编程语言> > 消息队列学习笔记7——生产、消费源码分析

消息队列学习笔记7——生产、消费源码分析

作者:互联网

文章目录


一、RocketMQ 生产者源码分析

版本 release-4.5.1

客户端是一个单独的 Module,在 rocketmq/client 目录中。

源码分析,可以从测试用例入手,一步一步跟踪其方法调用链路,理清实现过程。

Producer 的所有测试用例都在同一个测试类中org.apache.rocketmq.client.producer.DefaultMQProducerTest

这个测试类的主要测试方法如下:

init
terminate
testSendMessage_ZeroMessage
testSendMessage_NoNameSrv
testSendMessage_NoRoute
testSendMessageSync_Success
testSendMessageSync_WithBodyCompressed
testSendMessageAsync_Success
testSendMessageAsync
testSendMessageAsync_BodyCompressed
testSendMessageSync_SuccessWithHook

RockectMQ 的 Producer 入口类为org.apache.rocketmq.client.producer.DefaultMQProducer
在这里插入图片描述
这里面 RocketMQ 使用了一个设计模式:门面模式(Facade Pattern)。接口 MQProducer 就是这个模式中的门面。

类 DefaultMQProducer 实现了接口 MQProducer,它里面的方法实现大多没有任何的业务逻辑,只是封装了对其他实现类的方法调用,也可以理解为是门面的一部分。Producer 的大部分业务逻辑的实现都在类 DefaultMQProducerImpl 中。

1.启动过程

首先从测试用例的方法 init() 入手:

  @Before
  public void init() throws Exception {
      String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
      producer = new DefaultMQProducer(producerGroupTemp);
      producer.setNamesrvAddr("127.0.0.1:9876");
      producer.setCompressMsgBodyOverHowmuch(16);

      //省略构造测试消息的代码

      producer.start();

      //省略用于测试构造mock的代码
  }

DefaultMQProducerImpl#start() :

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // 省略参数检查和异常情况处理的代码

            // 获取MQClientInstance的实例mQClientFactory,没有则自动创建新的实例
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 在mQClientFactory中注册自己
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            // 省略异常处理代码

            // 启动mQClientFactory
            if (startFactory) {
                mQClientFactory.start();
            }
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            // 省略异常处理代码
        default:
            break;
    }
    // 给所有Broker发送心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。

与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为,在管理比较简单的状态时,使用这种设计会让代码更加简洁。这种模式非常广泛地用于管理有状态的类,推荐你在日常开发中使用。

在设计状态的时候,有两个要点是需要注意的,第一是,不仅要设计正常的状态,还要设计中间状态和异常状态,否则,一旦系统出现异常,你的状态就不准确了,你也就很难处理这种异常状态。比如在这段代码中,RUNNINGSHUTDOWN_ALREADY 是正常状态,CREATE_JUST 是一个中间状态,START_FAILED 是一个异常状态。

第二个要点是,将这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。比如,在这里,只有处于 CREATE_JUST 状态才能转换为 RUNNING 状态,这样就可以确保这个服务是一次性的,只能启动一次。从而避免了多次启动服务而导致的各种问题。

接下来看一下启动过程的实现:

其中实例 mQClientFactory 对应的类 MQClientInstance 是 RocketMQ 客户端中的顶层类,大多数情况下,可以简单地理解为每个客户端对应类 MQClientInstance 的一个实例。这个实例维护着客户端的大部分状态信息,以及所有的 Producer、Consumer 和各种服务的实例。

MQClientInstance#start() 中的代码:

// 启动请求响应通道
this.mQClientAPIImpl.start();
// 启动各种定时任务
this.startScheduledTask();
// 启动拉消息服务
this.pullMessageService.start();
// 启动Rebalance服务
this.rebalanceService.start();
// 启动Producer服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

2.消息发送过程

在 Producer 的接口 MQProducer 中,定义了 19 个不同参数的发消息的方法,按照发送方式不同可以分成三类:

DefaultMQProducerImpl#send()(对应源码中的 1132 行):

@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        try {
                            sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
                                timeout - costTime);
                        } catch (MQBrokerException e) {
                            throw new MQClientException("unknownn exception", e);
                        }
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("exector rejected ", e);
    }
}

RocketMQ 使用了一个 ExecutorService 来实现异步发送:使用 asyncSenderExecutor 的线程池,异步调用方法 sendSelectImpl(),继续发送消息的后续工作,当前线程把发送任务提交给 asyncSenderExecutor 就可以返回了。单向发送和同步发送的实现则是直接在当前线程中调用方法 sendSelectImpl()。

sendSelectImpl() 的实现:

// 省略部分代码
MessageQueue mq = null;

// 选择将消息发送到哪个队列(Queue)中
try {
    List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
    Message userMessage = MessageAccessor.cloneMessage(msg);
    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
    userMessage.setTopic(userTopic);

    mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
    throw new MQClientException("select message queue throwed exception.", e);
}

// 省略部分代码

// 发送消息
if (mq != null) {
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
    throw new MQClientException("select message queue return null.", null);
}
// 省略部分代码

方法 sendSelectImpl() 中主要的功能就是选定要发送的队列,然后调用方法 sendKernelImpl() 发送消息。

选择哪个队列发送由 MessageQueueSelector#select 方法决定。在这里 RocketMQ 使用了策略模式(Strategy Pattern),来解决不同场景下需要使用不同的队列选择算法问题。

sendKernelImpl() 主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage(),将消息发送给队列所在的 Broker。

核心类:


二、Kafka 消费者源码分析

版本 2.2

Kafka 消费模型的几个要点:

Kafka 的 Consumer Demo :

     // 设置必要的配置信息
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     // 创建Consumer实例
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     // 订阅Topic
     consumer.subscribe(Arrays.asList("foo", "bar"));

     // 循环拉消息
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

整个消费流程主要聚焦在三个问题上:

1.订阅过程

  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
      acquireAndEnsureOpen();
      try {
          // 省略部分代码

          // 重置订阅状态
          this.subscriptions.subscribe(new HashSet<>(topics), listener);

          // 更新元数据
          metadata.setTopics(subscriptions.groupSubscription());
      } finally {
          release();
      }
  }

订阅的主流程主要更新了两个属性:一个是订阅状态 subscriptions,另一个是更新元数据中的 topic 信息。

:开始的 acquireAndEnsureOpen() 和 try-finally release(),作用就是保护这个方法只能单线程调用。

metadata.setTopics() 里面,这个方法的实现除了更新元数据类 Metadata 中的 topic 相关的一些属性以外,还调用了 Metadata.requestUpdate()方法请求更新元数据。

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.updateVersion;
    }

这里面并没有真正发送更新元数据的请求,只是将需要更新元数据的标志位 needUpdate 设置为 true 就结束了。

总结:在订阅的实现过程中,Kafka 更新了订阅状态 subscriptions 和元数据 metadata 中的相关 topic 的一些属性,将元数据状态置为“需要立即更新”,但是并没有真正发送更新元数据的请求,整个过程没有和集群有任何网络数据交换。

2.拉取消息

这个流程的时序图如下:
在这里插入图片描述
KafkaConsumer.poll() 方法 (对应源码 1179 行) 里面,先后调用了 2 个私有方法:

方法 updateAssignmentMetadataIfNeeded() 中,调用了 coordinator.poll()方法,poll() 方法里面又调用了 client.ensureFreshMetadata()方法,又调用了 client.poll() 方法,实现了与 Cluster 通信,在 Coordinator 上注册 Consumer 并拉取和更新元数据。至此,“元数据会在什么时候真正做一次更新”这个问题也有了答案。

类 ConsumerNetworkClient 封装了 Consumer 和 Cluster 之间所有的网络通信的实现,这个类是一个非常彻底的异步实现。它没有维护任何的线程,所有待发送的 Request 都存放在属性 unsent 中,返回的 Response 存放在属性 pendingCompletion 中。每次调用 poll() 方法的时候,在当前线程中发送所有待发送的 Request,处理所有收到的 Response。

方法 pollForFetches() 的实现:

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        // 省略部分代码
        // 如果缓存里面有未读取的消息,直接返回这些消息
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        // 构造拉取消息请求,并发送
        fetcher.sendFetches();
        // 省略部分代码
        // 发送网络请求拉取消息,等待直到有消息返回或者超时
        client.poll(pollTimer, () -> {
            return !fetcher.hasCompletedFetches();
        });
        // 省略部分代码
        // 返回拉到的消息
        return fetcher.fetchedRecords();
    }

在方法 fetcher.sendFetches() 的实现里面,Kafka 根据元数据的信息,构造到所有需要的 Broker 的拉消息的 Request,然后调用 client.Send() 方法将这些请求异步发送出去。并且,注册了一个回调类来处理返回的 Response,所有返回的 Response 被暂时存放在 Fetcher.completedFetches 中。需要注意的是,这时的 Request 并没有被真正发给各个 Broker,而是被暂存在了 client.unsend 中等待被发送。

然后,在调用client.poll() 方法时,会真正将之前构造的所有 Request 发送出去,并处理收到的 Response。

最后,fetcher.fetchedRecords() 方法中,将返回的 Response 反序列化后转换为消息列表,返回给调用者。


参考资料:李玥——消息队列高手课

耶律妙月 发布了25 篇原创文章 · 获赞 0 · 访问量 1983 私信 关注

标签:状态,队列,mQClientFactory,笔记,方法,发送,源码,消息,Consumer
来源: https://blog.csdn.net/qq_36089832/article/details/103990059