其他分享
首页 > 其他分享> > 聊聊 Kafka: Producer 的网络模型

聊聊 Kafka: Producer 的网络模型

作者:互联网

一、Producer 的网络模型

我们前面几篇有说 Producer 发送流程的源码分析,但那个是大的轮廓,涉及到发送很多相关的内容,比如:

那这篇老周主要来说下 Producer 的网络模型,这里直接给出 Producer 的网络模型图,如下:

在这里插入图片描述

从图中可以看出,KafkaProducer 相当于客户端,与 Sender 调用层交互,Sender 调用 NetworkClient,NetworkClient 调用 Selector,而 Selector 底层封装了 Java NIO 的相关接口。心中有了 Producer 的网络模型大致轮廓后,我们接下来就可以来分析 Producer 的网络模型。

二、Producer 与 Broker 的交互流程

我们在业务代码通过生产者 producer 调用 send 方法来发送消息,不难发现都是通过走 Producer 的实现类 KafkaProducer 的 send 方法:

在这里插入图片描述

2.1 org.apache.kafka.clients.producer.KafkaProducer#doSend

上面的两个 send 方法最终会走到 doSend 方法里来:

在这里插入图片描述

这块的源码老周在前两篇的 Producer 源码解析那一篇分析了的哈,这里主要说下与 Broker 通信的交互分析。主要有两点:

主要看下 sender.wakeup() 方法,主要作用就是将 Sender 线程从阻塞中唤醒。

2.2 org.apache.kafka.clients.producer.internals.Sender#wakeup

/**
 * Wake up the selector associated with this send thread
 */
public void wakeup() {
    this.client.wakeup();
}

/**
 * Interrupt the client if it is blocked waiting on I/O.
 */
@Override
public void wakeup() {
	this.selector.wakeup();
}

/**
 * Interrupt the nioSelector if it is blocked waiting to do I/O.
 */
@Override
public void wakeup() {
    this.nioSelector.wakeup();
}

不难发现,调用链是:

Sender -> NetworkClient -> Selector(Kafka 封装的)-> Selector(java.nio.channels.Selector Java NIO)

wakeup() 的主要作用就是唤醒阻塞在 select()/select(long) 上的线程,为什么要唤醒?因为注册了新的 channel 或者事件。

再回到 Kafka 这里,KafkaProducer 中 dosend() 方法调用 sender.wakeup() 方法作用就很明显了。作用就是:当有新的 RecordBatch 创建后,旧的 RecordBatch 就可以发送了,如果线程阻塞在 select() 方法中,就将其唤醒,Sender 重新开始运行 run() 方法,在这个方法中,旧的 RecordBatch 将会被选中,进而可以及时将这些请求发送出去。

2.3 org.apache.kafka.clients.producer.internals.Sender#run

在这里插入图片描述

跟到 runOnce 方法里去:

在这里插入图片描述

继续跟,核心是 Sender 线程每次循环具体执行的地方,即 sendProducerData() 方法:

在这里插入图片描述

最后调用 client.poll() 方法,关于 socket 的一些实际的读写操作。

我们来小结一下 Sender.run() 方法的大致流程,主要分为以下五步:

2.4 org.apache.kafka.clients.NetworkClient#poll

在这里插入图片描述

主要分为以下三步:

2.5 org.apache.kafka.common.network.Selector#poll

Kafka 中的 Selector 类主要是 Java NIO 相关接口的封装,Socket 相关 IO 操作都是在这个类中完成的。主要操作都是在下面这个方法中调用的:

在这里插入图片描述

2.5.1 org.apache.kafka.common.network.Selector#clear

在这里插入图片描述

2.5.2 org.apache.kafka.common.network.Selector#select

Selector.select() 方法底层还是调用的 Java NIO 的原生接口,这里的 nioSelector 其实就是 java.nio.channels.Selector 的实例对象,这个方法最坏情况下,会阻塞 ms 的时间,如果在一次轮询,只要有一个 Channel 的事件就绪,它就会立马返回。

/**
 * Check for data, waiting up to the given timeout.
 *
 * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
 * @return The number of keys ready
 */
private int select(long timeoutMs) throws IOException {
    if (timeoutMs < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (timeoutMs == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(timeoutMs);
}

2.5.3 org.apache.kafka.common.network.Selector#pollSelectionKeys

这部分代码是 socket IO 的主要部分,发送 Send 及接收 Receive 都是在这里完成的,在 poll() 方法中,这个方法会调用三次:

在这里插入图片描述

2.5.4 org.apache.kafka.common.network.Selector#addToCompletedReceives

/**
 * adds a receive to completed receives
 */
private void addToCompletedReceives(KafkaChannel channel, NetworkReceive networkReceive, long currentTimeMs) {
    if (hasCompletedReceive(channel))
        throw new IllegalStateException("Attempting to add second completed receive to channel " + 											channel.id());
		// 添加到 completedReceives 中
    this.completedReceives.put(channel.id(), networkReceive);
    sensors.recordCompletedReceive(channel.id(), networkReceive.size(), currentTimeMs);
}

接收到的所有 Receive 都会被放入到 completedReceives 的集合中等待后续处理。

三、总结

总结的话我就不复述了,上面的源码流程说的很清楚了。最后再来一张 Producer 网络模型的时序图:

在这里插入图片描述

希望对你有所帮助,我们下期再见。

标签:调用,Producer,Selector,发送,聊聊,wakeup,Kafka,连接
来源: https://blog.csdn.net/riemann_/article/details/121217620