编程语言
首页 > 编程语言> > Netty学习笔记(4) Netty源码 - accept 和 read流程

Netty学习笔记(4) Netty源码 - accept 和 read流程

作者:互联网

文章目录


前言

笔记基于黑马的Netty教学讲义加上自己的一些理解,感觉这是看过的视频中挺不错的,基本没有什么废话,视频地址:黑马Netty。下面是。

还是这一段代码:

public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                //EventLoop有一个线程和执行器selector,用于关注事件,解决一些任务
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>(){

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                }).bind(8080);
    }
}



[1] 标记

1. nio中的accept回顾

  1. selector.selecr() 阻塞直到事件发生
  2. 遍历处理 selectedKeys
  3. 拿到一个 key,判断类型是不是 accept
  4. 创建 SocketChannel,设置非阻塞
  5. 将 SocketChannel注册到 selector
  6. 设置 SocketChannel 关注 read 事件



2. netty中的accept流程

接着上一篇文章 NioServerSocketChannel 的第10点,下面这里就是进入 accept事件,可以说到了这里完成了nio 的1,2,3点,而unsafe.read(); 完成剩下的三点
在这里插入图片描述
下面就是 read 方法中的代码,这里面我们主要观察4、5、6三点在哪会被执行

@Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                    	//4、创建 SocketChannel,设置非阻塞
                    	//下面看了源码后这里是 localRead =1 
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //pipeline:拿到NioServertSocketChannel的流水线
                    //调用上面的handler处理
                    //这一步其实上面的处理器只有三个 head-accept-end
                    //都是前面的文章说过的
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

下面是重要的方法,作用写在上面了


1. int localRead = doReadMessages(readBuf)

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
	//建立连接,创建SocketChannel返回
     SocketChannel ch = SocketUtils.accept(javaChannel());

     try {
         if (ch != null) {
         	 //创建了 NioSocketChannel,下面就是把NioSocketChannel当成一个消息放到结果里面
         	 //到时候pipeline上的处理器会获取到这些信息并进行处理
             buf.add(new NioSocketChannel(this, ch));
             return 1;
         }
     } catch (Throwable t) {
         logger.warn("Failed to create a new channel from an accepted socket.", t);

         try {
             ch.close();
         } catch (Throwable t2) {
             logger.warn("Failed to close a socket.", t2);
         }
     }

     return 0;
 }

//accept
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                	//调用serverSocketChannel.accept把连接建立完成
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }


总结:这个方法主要是



2. pipeline.fireChannelRead(readBuf.get(i))

运行到这一步,意思就是调用 pipeline 上的 handler 来处理消息,一旦调用就会跳转到下面 ServerBootstrapAcceptor 这个 accept 处理器的 read 方法中,下面就是这个方法的流程

 @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
			//设置处理器
            child.pipeline().addLast(childHandler);
			//下面设置一些参数
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
            	//这时比较重要的一些流程,其实就是把一个新的 eventLoop
            	//在里面找到一个 selector 来和channel进行绑定,并设置一个
            	//线程监听绑定的channel的事件
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }



1. childGroup.register(child).addListener(new ChannelFutureListener()

我们一层层进入这个方法
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


注意下面这个方法为什么不走 if ,因为我们运行到这里的时候走的线程是 NioEventGroup 里面的 ServerSocketChannel 的线程,而我们新建的 SocketChannel 和 当前的线程应该不能是同一个才对
在这里插入图片描述


似曾相识的 doRegister(),这个方法在之前的文章(netty的源码中)也说过,里面的作用就是 把 nioServerSocketChannel 和 selector 绑定起来,并且没有关注事件,到这里,第五步完成,5. 将 SocketChannel注册到 selectornio步骤
在这里插入图片描述


我们在这个方法中继续运行,在 diRegister() 方法下面有一个方法,这个方法的作用就是触发我们新创建的 channel 上面的初始化事件我们继续运行之后就会来到我们编写的客户端的 initChannel 方法里面,主要的作用看名字也知道,就是添加处理器handler 的

在这里插入图片描述


我们继续沿着上面的代码向下走,到下面的 fireChannelActive() 方法中,这个方法就是用来关注 read 事件的

在这里插入图片描述
看这个方法的调用链
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
来到最终的调用方法,可以看到就是在这里调用了关注 read事件,至此,第六步完成 nio步骤,当然中间的调用链不用管,只是说看到这里能意识到最终确实是完成了 nio 中accept 流程的这步,只不过 netty 中对这六步做了层层的封装。

在这里插入图片描述



3. netty 中的 read 流程

还是这个方法,客户端连接上之后发送一条数据给服务端,注意第一次进入是accept,第二次进入才是 read,可以看到这里 readyOps 变成了 1
在这里插入图片描述

@Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            //获取 pipeline,要用到里面的 handler 处理器来处理
            final ChannelPipeline pipeline = pipeline();
            // 获取 ByteBuf,因为消息在这里面
            final ByteBufAllocator allocator = config.getAllocator();
            //allocHandle :动态调整上面的ByteBuf大小,使用直接内存,因为是 io操作,使用直接内存效率高
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
           
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                	//分配具体的 ByteBuf,分完就可以读数据了
                    byteBuf = allocHandle.allocate(allocator);
                    //这个方法是读取客户端发送过来的数据
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //证明读完了
                    if (allocHandle.lastBytesRead() <= 0) {
                        //没有东西读了,就把 ByteBuf 释放掉
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
					//读一次消息就增加一次
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //这也是个重要的方法,意思是调用我们服务端的handler来处理发送的消息
                    //这个方法调用之后就会进入我们写的handler那里
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }




如有错误,欢迎指出!!!!

标签:Netty,pipeline,read,allocHandle,accept,源码,new,SocketChannel
来源: https://blog.csdn.net/laohuangaa/article/details/122751100