消息队列学习笔记7——生产、消费源码分析
作者:互联网
文章目录
一、RocketMQ 生产者源码分析
版本 release-4.5.1
客户端是一个单独的 Module,在 rocketmq/client
目录中。
源码分析,可以从测试用例入手,一步一步跟踪其方法调用链路,理清实现过程。
Producer 的所有测试用例都在同一个测试类中org.apache.rocketmq.client.producer.DefaultMQProducerTest
这个测试类的主要测试方法如下:
init
terminate
testSendMessage_ZeroMessage
testSendMessage_NoNameSrv
testSendMessage_NoRoute
testSendMessageSync_Success
testSendMessageSync_WithBodyCompressed
testSendMessageAsync_Success
testSendMessageAsync
testSendMessageAsync_BodyCompressed
testSendMessageSync_SuccessWithHook
RockectMQ 的 Producer 入口类为org.apache.rocketmq.client.producer.DefaultMQProducer
这里面 RocketMQ 使用了一个设计模式:门面模式(Facade Pattern)。接口 MQProducer 就是这个模式中的门面。
类 DefaultMQProducer 实现了接口 MQProducer,它里面的方法实现大多没有任何的业务逻辑,只是封装了对其他实现类的方法调用,也可以理解为是门面的一部分。Producer 的大部分业务逻辑的实现都在类 DefaultMQProducerImpl 中。
1.启动过程
首先从测试用例的方法 init() 入手:
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
//省略构造测试消息的代码
producer.start();
//省略用于测试构造mock的代码
}
DefaultMQProducerImpl#start() :
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 省略参数检查和异常情况处理的代码
// 获取MQClientInstance的实例mQClientFactory,没有则自动创建新的实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 在mQClientFactory中注册自己
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
// 省略异常处理代码
// 启动mQClientFactory
if (startFactory) {
mQClientFactory.start();
}
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
// 省略异常处理代码
default:
break;
}
// 给所有Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。
与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为,在管理比较简单的状态时,使用这种设计会让代码更加简洁。这种模式非常广泛地用于管理有状态的类,推荐你在日常开发中使用。
在设计状态的时候,有两个要点是需要注意的,第一是,不仅要设计正常的状态,还要设计中间状态和异常状态,否则,一旦系统出现异常,你的状态就不准确了,你也就很难处理这种异常状态。比如在这段代码中,RUNNING
和 SHUTDOWN_ALREADY
是正常状态,CREATE_JUST
是一个中间状态,START_FAILED
是一个异常状态。
第二个要点是,将这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。比如,在这里,只有处于 CREATE_JUST 状态才能转换为 RUNNING 状态,这样就可以确保这个服务是一次性的,只能启动一次。从而避免了多次启动服务而导致的各种问题。
接下来看一下启动过程的实现:
- 通过一个单例模式(Singleton Pattern)的 MQClientManager 获取 MQClientInstance 的实例 mQClientFactory,没有则自动创建新的实例;
- 在 mQClientFactory 中注册自己;就是往Map里插一条。
- 启动 mQClientFactory;
- 给所有 Broker 发送心跳。
其中实例 mQClientFactory 对应的类 MQClientInstance 是 RocketMQ 客户端中的顶层类,大多数情况下,可以简单地理解为每个客户端对应类 MQClientInstance 的一个实例。这个实例维护着客户端的大部分状态信息,以及所有的 Producer、Consumer 和各种服务的实例。
MQClientInstance#start() 中的代码:
// 启动请求响应通道
this.mQClientAPIImpl.start();
// 启动各种定时任务
this.startScheduledTask();
// 启动拉消息服务
this.pullMessageService.start();
// 启动Rebalance服务
this.rebalanceService.start();
// 启动Producer服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
- 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法;
- 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务;
- 启动拉取消息服务;
- 启动 Rebalance 服务;
- 启动默认的 Producer 服务。
2.消息发送过程
在 Producer 的接口 MQProducer 中,定义了 19 个不同参数的发消息的方法,按照发送方式不同可以分成三类:
- 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
- 同步发送(Sync):发送消息后等待响应;
- 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。
DefaultMQProducerImpl#send()(对应源码中的 1132 行):
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
}
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("exector rejected ", e);
}
}
RocketMQ 使用了一个 ExecutorService 来实现异步发送:使用 asyncSenderExecutor 的线程池,异步调用方法 sendSelectImpl(),继续发送消息的后续工作,当前线程把发送任务提交给 asyncSenderExecutor 就可以返回了。单向发送和同步发送的实现则是直接在当前线程中调用方法 sendSelectImpl()。
sendSelectImpl() 的实现:
// 省略部分代码
MessageQueue mq = null;
// 选择将消息发送到哪个队列(Queue)中
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
// 省略部分代码
// 发送消息
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
// 省略部分代码
方法 sendSelectImpl() 中主要的功能就是选定要发送的队列,然后调用方法 sendKernelImpl() 发送消息。
选择哪个队列发送由 MessageQueueSelector#select 方法决定。在这里 RocketMQ 使用了策略模式(Strategy Pattern),来解决不同场景下需要使用不同的队列选择算法问题。
sendKernelImpl() 主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage(),将消息发送给队列所在的 Broker。
核心类:
- DefaultMQProducerImpl:Producer 的内部实现类,大部分 Producer 的业务逻辑,也就是发消息的逻辑,都在这个类中。
- MQClientInstance:这个类中封装了客户端一些通用的业务逻辑,无论是 Producer 还是 Consumer,最终需要与服务端交互时,都需要调用这个类中的方法;
- MQClientAPIImpl:这个类中封装了客户端服务端的 RPC,对调用者隐藏了真正网络通信部分的具体实现;
- NettyRemotingClient:RocketMQ 各进程之间网络通信的底层实现类。
二、Kafka 消费者源码分析
版本 2.2
Kafka 消费模型的几个要点:
- Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
- 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
- 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
- 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
- Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。
Kafka 的 Consumer Demo :
// 设置必要的配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Consumer实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅Topic
consumer.subscribe(Arrays.asList("foo", "bar"));
// 循环拉消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
整个消费流程主要聚焦在三个问题上:
- 订阅过程是如何实现的?
- Consumer 是如何与 Coordinator 协商,确定消费哪些 Partition 的?
- 拉取消息的过程是如何实现的?
1.订阅过程
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
// 省略部分代码
// 重置订阅状态
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 更新元数据
metadata.setTopics(subscriptions.groupSubscription());
} finally {
release();
}
}
订阅的主流程主要更新了两个属性:一个是订阅状态 subscriptions,另一个是更新元数据中的 topic 信息。
- 订阅状态 subscriptions 主要维护了订阅的 topic 和 patition 的消费位置等状态信息。
- 属性 metadata 中维护了 Kafka 集群元数据的一个子集,包括集群的 Broker 节点、Topic 和 Partition 在节点上分布,以及Coordinator 给 Consumer 分配的 Partition 信息。
注:开始的 acquireAndEnsureOpen() 和 try-finally release(),作用就是保护这个方法只能单线程调用。
metadata.setTopics()
里面,这个方法的实现除了更新元数据类 Metadata 中的 topic 相关的一些属性以外,还调用了 Metadata.requestUpdate()
方法请求更新元数据。
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.updateVersion;
}
这里面并没有真正发送更新元数据的请求,只是将需要更新元数据的标志位 needUpdate 设置为 true 就结束了。
总结:在订阅的实现过程中,Kafka 更新了订阅状态 subscriptions 和元数据 metadata 中的相关 topic 的一些属性,将元数据状态置为“需要立即更新”,但是并没有真正发送更新元数据的请求,整个过程没有和集群有任何网络数据交换。
2.拉取消息
这个流程的时序图如下:
在 KafkaConsumer.poll()
方法 (对应源码 1179 行) 里面,先后调用了 2 个私有方法:
- updateAssignmentMetadataIfNeeded(): 更新元数据。
- pollForFetches():拉取消息。
方法 updateAssignmentMetadataIfNeeded()
中,调用了 coordinator.poll()
方法,poll() 方法里面又调用了 client.ensureFreshMetadata()
方法,又调用了 client.poll() 方法,实现了与 Cluster 通信,在 Coordinator 上注册 Consumer 并拉取和更新元数据。至此,“元数据会在什么时候真正做一次更新”这个问题也有了答案。
类 ConsumerNetworkClient 封装了 Consumer 和 Cluster 之间所有的网络通信的实现,这个类是一个非常彻底的异步实现。它没有维护任何的线程,所有待发送的 Request 都存放在属性 unsent 中,返回的 Response 存放在属性 pendingCompletion 中。每次调用 poll() 方法的时候,在当前线程中发送所有待发送的 Request,处理所有收到的 Response。
方法 pollForFetches() 的实现:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
// 省略部分代码
// 如果缓存里面有未读取的消息,直接返回这些消息
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// 构造拉取消息请求,并发送
fetcher.sendFetches();
// 省略部分代码
// 发送网络请求拉取消息,等待直到有消息返回或者超时
client.poll(pollTimer, () -> {
return !fetcher.hasCompletedFetches();
});
// 省略部分代码
// 返回拉到的消息
return fetcher.fetchedRecords();
}
在方法 fetcher.sendFetches()
的实现里面,Kafka 根据元数据的信息,构造到所有需要的 Broker 的拉消息的 Request,然后调用 client.Send()
方法将这些请求异步发送出去。并且,注册了一个回调类来处理返回的 Response,所有返回的 Response 被暂时存放在 Fetcher.completedFetches
中。需要注意的是,这时的 Request 并没有被真正发给各个 Broker,而是被暂存在了 client.unsend 中等待被发送。
然后,在调用client.poll()
方法时,会真正将之前构造的所有 Request 发送出去,并处理收到的 Response。
最后,fetcher.fetchedRecords()
方法中,将返回的 Response 反序列化后转换为消息列表,返回给调用者。
参考资料:李玥——消息队列高手课
耶律妙月 发布了25 篇原创文章 · 获赞 0 · 访问量 1983 私信 关注标签:状态,队列,mQClientFactory,笔记,方法,发送,源码,消息,Consumer 来源: https://blog.csdn.net/qq_36089832/article/details/103990059