其他分享
首页 > 其他分享> > Netty-组件(一)EventLoop

Netty-组件(一)EventLoop

作者:互联网

EventLoop

事件循环对象&事件循环组

EventLoop 本质上是一个单线程执行器(同时维护了一个selector),里面的run方法处理channel 上源源不断的IO事件,也就是集成NIO中负责分配任务和处理accept请求的boss线程

它的继承关系比较复杂:

事件循环组:

EventLoopGroup 就是一组EventLoop,channel 一般会调用EventLoopGroup 的register 方法来绑定其中的一个EventLoop,后续这个Channel 上的IO事件都由此EventLoop 来处理(保证了IO事件处理时的线程安全)

处理普通事件

执行普通任务的意义在于:

  • 异步处理
  • 事件分发的时候可以通过这种方式,将接下来一段代码的执行从一个线程传递给另一个线程
@Slf4j
public class TestEventLoop {

    public static void main(String[] args) {

        /**
         * 1、创建事件循环组
         *  NioEventLoopGroup 可以用于处理IO事件,定时事件,普通任务
         *  DefaultEventLoopGroup 只能处理定时事件和普通任务,并不能处理IO事件
         *
         *  一个事件循环是一个线程来维护的
         *  这里传入的参数是创建线程的个数,如果没有传入参数则线程默认数为1
         */
        EventLoopGroup group = new NioEventLoopGroup(2);

        /*2、group.next
        * 循环获取下一个事件循环对象,这样就保证负载均衡
        * 依次将不同的channel 注册到不同eventLoop 上
        * 就类似于NIO对于两个worker 的处理
        * */
        group.next().submit(() -> {
            /**
             * 将事件提交给某个事件循环对象去处理(交给一个线程去处理)
             * 实现一个异步处理
             */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        log.debug("main");
    }
}

image-20211110152447645

处理定时任务

使用意义:

  • 实现keep-live 的时候可以作为连接的保护而使用
  • 通过另一条线程规律执行某个线程
        /**
         * 启动一个定时任务,并以一定的频率来执行
         * 1、参数一是一个Runnable 接口类型的任务对象
         * 2、参数二是初始延时事件
         * 3、参数三是间隔时间
         * 4、执行单位
         */
        group.next().scheduleAtFixedRate(()->{
            log.info("ok");
        },0,1, TimeUnit.SECONDS);

image-20211110153213782

处理IO事件

@Slf4j
public class EventLoopServer {

    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                /*初始化器*/
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    /**
                     * 这个方法会在connection 建立后调用
                     * @param nioSocketChannel 泛型对象
                     * @throws Exception
                     */
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {

                        /*向流水线中添加处理,这里添加一个自定义的处理器*/
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            /*这里表示关注的是读操作
                            没有StringDecoder的时候
                            Object msg 是ByteBuf类型的数据*/
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

image-20211111135551286

当服务器和客户端两个channel 建立连接之后,那么服务器会用同一个EventLoop 或者说同一个线程来处理这个客户端,建立一个绑定关系

image-20211111135947864

但是并不是说有多少个channel 就会建立多少了eventLoop,eventLoop的数量是既定的,所以会将channel 循环的分给eventLoop,以达到负载均衡的效果,其次就是上面说的每一个eventLoop 对一个channel 都会负责到底

分工细化一

将eventLoop 的职责划分可以划的更细一些,划分为boss 和worker

.channel 会将ServerSocketChannel 绑定到第一个EventLoop上

.childHandler 会将SocketChannel 绑定到参数二的EventLoopGroup 中的EventLoop 上,初始化器针对的是已经绑定后的SocketChannel

        new ServerBootstrap()
                /**
                 * 划分为boss 和 worker
                 * 第一个boss 只负责ServerSocketChannel accept 事件
                 * 第二个worker 只负责socketChannel 上的读写事件 线程数默认是CPU核心数*2
                 * 这里并不需要将第一个EventLoopGroup 的线程数设为一
                 * 因为这里的serverSocketChannel默认只会和第一个EventLoopGroup 中的一个进行绑定
                 * 而并没有多个ServerSocketChannel
                 * 也只会占用一个线程
                 */
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)

分工细化二

  public static void main(String[] args) {
        /*创建处理特殊事件的Group*/
        EventLoopGroup group = new DefaultEventLoop();
        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                                /*将消息传递给下一个handler*/
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080);
    }

切换线程

在流水线执行过程中,handler 执行是如何切换EventLoop的,例如:如何从NioEventLoop 切换到DefaultEventLoop 上执行的(换人)

image-20211111144854012

如果两个handler 绑定的是同一个线程(EventLoop)就可以直接调用,如果绑定的是不同线程,而是会将要调用的代码封装为一个Runnable 对象中然后传递给下一个handler的线程去处理

标签:Netty,group,EventLoop,线程,事件,组件,new,channel
来源: https://blog.csdn.net/weixin_46248230/article/details/122200657