编程语言
首页 > 编程语言> > 【Netty源码分析摘录】(七)新连接的接入

【Netty源码分析摘录】(七)新连接的接入

作者:互联网

文章目录

1.问题

当 netty 的服务端启动以后,就可以开始接收客户端的连接了。那么在 netty 中,服务端是如何来进行新连接的创建的呢?在开始进行源码阅读之前,可以先思考以下三个问题。

2.检测新连接接入

在上一篇文章Netty 源码分析系列之 NioEventLoop 的执行流程中,分析了 NioEventLoop 线程在启动后,会不停地去循环处理网络 IO 事件、普通任务和定时任务。在处理网络 IO 事件时,当轮询到 IO 事件类型为 OP_ACCEPT 时(如下代码所示),就表示有新客户端来连接服务端了,也就是检测到了新连接。这个时候,服务端 channel 就会进行新连接的读取。

public final class NioEventLoop  {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
      //注意入参ch,ch是NioServerSocketChannel,即服务器端通道,得到的unsafe是NioMessageUnsafe 实例
       final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
       .....
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
		    unsafe.read();
		}

可以看到,当是 OP_ACCEPT 事件时,就会调用unsafe.read() 方法来进行新连接的接入。此时 unsafe 对象是 NioMessageUnsafe(服务端通道的Unsafe) 类型的实例,为什么呢?因为只有服务端 channel 才会对 OP_ACCEPT 事件感兴趣,而服务端 channel 中 unsafe 属性保存的是 NioMessageUnsafe 类型的实例。

注意processSelectedKey()方法的入参ch,ch是NioServerSocketChannel,即服务器端通道,unsafe 对象也是由NioServerSocketChannel得到的。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
  
    private final class NioMessageUnsafe extends AbstractNioUnsafe {  //内部类

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
      
            final ChannelPipeline pipeline = pipeline();
            try {
                try {
                    do {
                        //【1】调用 doReadMessages()方法来读取连接
                        int localRead = doReadMessages(readBuf);
                       .....
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //【2】通过服务端 channel 中的 pipeline 来进行传播
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

read()方法的源码很长,但它主要干了两件事:

3.创建客户端 channel

服务端 channel 在监听到 OP_ACCEPT 事件后,会为新连接创建一个客户端 channel,后面数据的读写均是通过这个客户端 channel 来进行的。而这个客户端 channel 是通过 doReadMessages()方法来创建的,该方法是定义在 NioServerSocketChannel 中的,下面是其源码。

public class NioServerSocketChannel  {

   protected int doReadMessages(List<Object> buf) throws Exception {
    //javaChannel()获取原生的服务端channel,再调用SocketUtils.accept获取客户端的channel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 将原生的客户端channel包装成netty中的客户端channel:NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // 异常日志打印等...
    }
    return 0;
}

在该方法中,首先会通过 javaChannel()获取到 JDK 原生的服务端 channel,即 ServerSocketChannel,这个原生的服务端 channel 是被保存在 NioServerSocketChannelch属性中,在初始化 NioServerSocketChannel 时会对ch属性赋值(可以参考这篇文章:【Netty源码分析摘录】(三)服务端Channel初始化)。

创建完 JDK 原生的服务端 channel 后,会通过 SocketUtils 这个工具类来创建一个 JDK 原生的客户端 channel,即 SocketChannel。SocketUtils 这个工具类的底层实现,实际上就是调用 JDK 原生的 API,即 ServerSocketChannel.accept()

在创建完原生的 SocketChannel 后,netty 需要将其包装成 netty 中定义的服务端 channel 类型,即:NioSocketChannel。如何包装的呢?通过 new 关键字调用 NioSocketChannel 的构造方法来进行包装。在构造方法中,做了很多初始化工作。跟踪源码,发现会调用到 AbstractNioChannel 类的如下构造方法。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 【4】此时的parent = NioServerSocketChannel,ch = SocketChannel(JDK原生的客户端channel),readInterestOp = OP_READ
    super(parent);
    // 【1】保存channel
    this.ch = ch;
    //【2】保存channe感兴趣的事件
    this.readInterestOp = readInterestOp;
    try {
        //【3】 设置为非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        // 异常处理...
    }
}

在该构造方法中:

AbstractChannel 类的构造方法源码如下:

protected AbstractChannel(Channel parent) {
    // parent的值为NioServerSocketChannel
    this.parent = parent;
    id = newId();
    // 对于客户端channel而言,创建的unsafe是NioSocketChannelUnsafe
    unsafe = newUnsafe();
    // DefaultChannelPipeline
    pipeline = newChannelPipeline();
}

在该构造方法中,对于客户端 channel 而言,parent 的值为 NioServerSocketChannel,也就是 netty 服务端启动时创建的服务端 channel。然后创建的 unsafe 是 NioSocketChannelUnsafe,最后会为客户端 channel 创建一个默认的 pipeline,此时 pipeline 的结构如下。(如果看过前几篇文章,可能会发现,服务端 channel 在创建时也会调用到该构造方法)

注意:服务端通道和客户端通道的相关类:
在这里插入图片描述
客户端通道内的pipeline (还记得服务端的pipeline 吗,因此存在2个pipeline ):
在这里插入图片描述

最终还会为 NioSocketChannel 创建一个 NioSocketChannelConfig 对象,这个对象是用来保存用户为客户端 channel 设置的一些 TCP 配置和属性,在创建这个 config 对象时,会将 TCP 的 TCP_NODELAY 参数设置为 true。TCP 在默认情况下,会将小的数据包积攒成大的数据包以后才发出去,而 netty 为了及时地 i 将较小的数据报发送出去,因此将 TCP_NODELAY 参数设置为 true,表示不延迟发送。

至此,新连接对应的客户端 channel 就创建完成了,后面网络数据的读写,都是基于这个 NioSocketChannel 来进行的。

4. 绑定 NioEventLoop

当客户端的 channel 创建完成后,在 read()方法中,就会通过 pipeline.fireChannelRead(socketChannel)这一行代码,将客户端 channel 通过 pipeline 进行传播,依次执行 pipeline 中每一个 handler 的 channelRead()方法。(注意,这儿的 pipeline 是服务端 channel 中保存的 pipeline,在创建客户端 channel 时,也会为每个新建的客户端 channel 创建一个 pipeline,这里千万不要搞混了)

在服务端启动的时候,服务端 channel 中 pipeline 的结构图如下(详细解释可以参考【Netty源码分析摘录】(三)服务端Channel初始化)。
在这里插入图片描述
该 pipeline 中,对于 head 和 tail 而言,它俩的 channelRead()方法没做什么实际意义的工作,直接是向下一个节点传播了,这里重要的是 ServerBootstrapAcceptor 节点的 channelRead()方法。该方法的源码如下:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

   public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // 【4.1】向客户端的channel中添加用户自定义的childHandler
    child.pipeline().addLast(childHandler);

    // 保存用户为客户端channel配置的属性
    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 【4.2】将客户端channel注册到工作线程池,即从workerGroup中选择出一个NioEventloop,再将客户端channel绑定到NioEventLoop上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

在该方法中,【4.1】首先向客户端 channel 的 pipeline 中的节点中添加了一个 childHandler,这个 childHandler 是用户自己定义的,什么意思呢?如下图所示,用户通过 childHandler()方法自定义了一个 ChannelInitializer 类型的 childHandler,这个此时就会向客户端 channel 的 pipeline 中的节点中添加该 childHandler(这个地方很重要,后面会用到)。然后通过 setChannelOptions 保存用户为客户端 channel 配置的 TCP 参数和属性。

在这里插入图片描述
最重要的一步在【4.2】 childGroup.register(child),这一行代码会将客户端 channel 注册到 workerGroup 线程池中的某一个 NioEventLoop 上。(在服务端端口绑定的过程中,也是类似于调用 NioEventLoopGroup 的 register()方法,将服务端 channel 注册到 bossGroup 中的某一个 NioEventLoop 中)。

此时的 childGroup 是 workerGroup(Reactor 主从多线程线程模型中的从线程池),调用 register()方法时,会调用到如下方法。

public ChannelFuture register(Channel channel) {
    // next()方法会从workerGroup中选择出一个NioEventLoop
    return next().register(channel);
}

next()方法会从 workerGroup中选择出一个 NioEventLoop(关于 next()方法的详细介绍请参考: 《【Netty源码分析摘录】(五)NioEventLoop的创建与启动》,当然,那篇文章是bossGroup取一个NioEventLoop,逻辑是一样的),由于 NioEventLoop 继承了 SingleThreadEventLoop,所以这儿最后调用的是 SingleThreadEventLoop 中的如下的 register()方法。

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    /**
     *  对于客户端channel而言
     *  promise是DefaultChannelPromise
     *  promise.channel()获取到的是NioSocketChannel
     *  promise.channel().unsafe()得到的是NioSocketChannelUnsafe
     *  由于NioSocketChannelUnsafe继承了AbstractUnsafe,所以当调用unsafe.register()时,会调用到AbstractUnsafe类的register()方法
     */
    // this为NioEventLoop
    promise.channel().unsafe().register(this, promise);
    return promise;
}

这里的 unsafe()获取到的是 NioSocketChannelUnsafe 对象,由于 NioSocketChannelUnsafe 继承了 AbstractUnsafe,所以当调用 unsafe.register()时,会调用到 AbstractUnsafe 类的 register()方法。

该方法精简后的源码如下:

    protected abstract class AbstractUnsafe  {
  
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略部分代码....

    // 【4.3】对客户端channel而言,这一步是给NioSocketChannel的eventLoop属性赋值
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断是同步执行register0(),还是异步执行register0()
    if (eventLoop.inEventLoop()) {
        // 同步执行
        register0(promise);
    } else {
        try {
        	// 提交到NioEventLoop线程中,异步执行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 省略部分代码
        }
    }
}

实际上,服务端 channel 注册到 NioEventLoop 上时,也是调用的到了该方法

对于客户端而言,在该方法中,通过如下一行代码【4.3】,就将客户端 channel 与一个 NioEventLoop 进行了绑定,这就回答了文章开头的第三个问题:

AbstractChannel.this.eventLoop = eventLoop;

接着会判断当前线程是否等于传入的 eventLoop 中保存的线程,这里肯定不是。为什么呢?因为当前线程是 bossGroup 线程组中的线程,而 eventLoop 是 workerGroup 线程组中的线程,所以这里会返回 false,那么就会异步执行 register0()方法。register0()方法的源码如下。

4.1 register0

    protected abstract class AbstractUnsafe   {

   private void register0(ChannelPromise promise) {
      try {
        // 省略部分代码...
        boolean firstRegistration = neverRegistered;
        /**
         * 【4.10】对于客户端的channel而言,doRegister()方法做的事情就是将服务端Channel注册到多路复用器上
         */
        doRegister();
        neverRegistered = false;
        registered = true;

        //【4.11】会执行handlerAdded方法
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //【4.12】通过在pipeline传播来执行每个ChannelHandler的channelRegistered()方法
        pipeline.fireChannelRegistered();

           // 如果客户端channel已经激活,就执行下面逻辑。
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 省略部分代码...
    }
}

在 register0()方法中,有三步重要的逻辑:

下面分别来看看这三步都干了哪些事情:

4.1.1 doRegister()

doRegister()就是真正将客户端 channel 注册到多路复用器上的一步。doRegister()调用的是 AbstractNioChannel 类中的 doRegister()方法,删减后源码如下:

public class AbstractNioChannel {
  protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
          //javaChannel()得到的是客户端channel
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            // 异常处理......
        }
    }
}

AbstractNioChannel 是客户端channel和服务端channel的公共父类,因此方法逻辑是同一个,只不过实际执行时,根据子类的成员变量或重载的方法,实现各自的注册逻辑。

注意这里在调 JDK 原生的 register()方法时,第三个参数传入的是 this,此时 this 代表的就是当前的 NioSocketChannel 对象。将 this 作为一个 attachment 保存到多路复用器 Selector 上,这样做的好处就是,后面可以通过多路复用器 Selector 获取到客户端的 channel。

后面会单独讲述nio 技术中attachment 的作用

第二个参数传入的是 0,表示此时将客户端 channel 注册到多路复用器上,客户端 chennel 感兴趣的事件标识符是 0,即此时对任何事件都不感兴趣(在后面才会将感兴趣的事件设置为 OP_READ)。

和服务器的通道注册一样,此时尚未设置最主要的感兴趣的事件,都是稍后再设置

4.1.2 pipeline.invokeHandlerAddedIfNeeded()

当 doRegister()方法执行完以后,就会执行第二步:pipeline.invokeHandlerAddedIfNeeded()这一步做的事情就是回调 pipeline 中 handler 的 handlerAdded()方法。

4.1.3 pipeline.fireChannelRegistered()

往下执行,代码会执行到 pipeline.fireChannelRegistered(),也就是前面我们提到的第三步。这一步做的事情就是传播 Channel 注册事件,如何传播呢?就是沿着 pipeline 中的头结点这个 handler 开始,往后依次执行每个 handler 的 channelRegistered()方法。

在前面我们提到过,会向客户端 channel 的 pipeline 中添加一个 ChannelInitializer 类型的匿名类,因此在传播执行 channelRegistered()方法的时候,就会执行到该匿名类的 channelRegistered()方法,从而最终会执行该匿名类中重写的 initChannel(channel)方法,即如下图所示的代码。关于是如何调用到 initChannel(channel)方法中的,可以参考这篇文章:Netty 源码分析系列之服务端 Channel 注册,里面进行了很详细的分析。不过读源码最佳方式还是亲自动手,Debug 调试一下你也许会体会更深,更容易理解。

在这里插入图片描述

4.1.3 pipeline.fireChannelActive()

再次回到 register0()方法中,最后会判断 isActive()是否为 true,此时由于客户端 channel 已经注册到多路复用器上了,因此会返回 true,而且由于此时客户端 channel 是第一次注册,所以会 pipeline.fireChannelActive()这一行代码,也就是又会通过客户端 channel 的 pipeline 向下传播执行所有 handler 的 channelActive()方法,最终会调用到 AbstractChannel 的 doBeginRead()方法(这一步的调用过程很复杂,建议直接 DEBUG)。doBeginRead 方法的源码如下。

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
    /**
     * 在客户端channel注册到多路复用器上时,将selectionKey的interestOps属性设置为了0
     * selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
     */
    final int interestOps = selectionKey.interestOps();
    /**
     * readInterestOp属性的值,是在NioSocketChannel的构造器中,被设置为SelectionKey.OP_READ
     */
    if ((interestOps & readInterestOp) == 0) {
        // 对于客户端channel而言,interestOps | readInterestOp运算的结果为OP_READ
        // 所以最终selectionKey感兴趣的事件为OP_READ事件,至此,客户端channel终于可以开始接收客户端的链接了。
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

至此,客户端 channel 感兴趣的就变成了 OP_READ 事件,那么接下来就可以进行数据的读写了。

设置感兴趣的事件,客户端通道的逻辑和服务端通道逻辑相似,都是在构造函数中提前设置的readInterestOp变量。

5.总结

本文主要分析了当一个新连接进来后,netty 服务端是如何为这个新连接创建客户端 channel 的,又是如何将其绑定到 NioEventLoop 线程中的。客户端 channel 注册过程与服务端 channel 的注册过程非常相似,调用过程几乎一样,所以建议先阅读这篇文章Netty 源码分析系列之服务端 Channel 注册。

参考
《Netty源码分析系列之新连接的接入》

标签:NioEventLoop,Netty,pipeline,源码,服务端,摘录,方法,channel,客户端
来源: https://blog.csdn.net/m0_45406092/article/details/117786834