Netty网络框架学习笔记-18(NioEventLoop源码与处理器异步任务分析_2020.06.25)
作者:互联网
前言:
编写netty网络服务器的时候, 第一行代码, 就是创建线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup()
下面就来分析下, 其中一个
NioEventLoop
NioEventLoop关系
- 说明
- ScheduledExecutorService 接口表示是一个定时任务接口,EventLoop 可以接受定时任务。
- EventLoop 接口:Netty 接口文档说明该接口作用:一旦 Channel 注册了,就处理该 Channel 对应的所有 I/O 操作。
- SingleThreadEventExecutor 表示这是一个单个线程的线程池
- EventLoop 是一个单例的线程池,里面含有一个死循环的线程不断的做着 3 件事情:监听端口,处理端口事件,处理队列事件。每个 EventLoop 都可以绑定多个 Channel,而每个 Channel 始终只能由一个 EventLoop 来处理
NioEventLoop的创建
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
super => 创建
SingleThreadEventLoop
单线程protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); }
NioEventLoop-execute方法
NioEventLoop本身并没有实现该方法, 所以调用的是父类
SingleThreadEventExecutor # execute
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是不是 EventLoop 线程
boolean inEventLoop = inEventLoop();
// 添加到任务队列里面
addTask(task);
// 如果不是当前线程, 则新启动一个线程
if (!inEventLoop) {
startThread();
// 如果线程已经停止,并且删除任务成功,则执行拒绝策略,默认是抛出RejectedExecutionException异常。
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// 如果 addTaskWakesUp 是 false,并且任务不是 NonWakeupRunnable 类型的,
// 就尝试唤醒 selector。这 个时候,阻塞在 selecor 的线程就会立即返回
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
doStartThread
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
- 首先调用 executor 的 execute 方法,这个 executor 就是在创建 Event LoopGroup 的时候创建的 ThreadPerTaskExecutor 类。该 execute 方法会将 Runnable 包装成 Netty 的 FastThreadLocalThread。
- 任务中,首先判断线程中断状态,然后设置最后一次的执行时间。
- 执行当前 NioEventLoop 的 run 方法,注意:这个方法是个死循环,是整个 EventLoop 的核心
- 在 finally 块中,使用 CAS 不断修改 state 状态,改成 ST_SHUTTING_DOWN。也就是当线程 Loop 结束的时候。关闭线程。最后还要死循环确认是否关闭,否则不会 break。然后,执行 cleanup 操作,更新状态为ST_TERMINATED,并释放当前线程锁。如果任务队列不是空,则打印队列中还有多少个未完成的任务。 并回调 terminationFuture 方法。
- 其实最核心的就是 Event Loop 自身的 run 方法。再继续深入 run 方法
EventLoop 作为 Netty 的核心的运行机制总结
- 每次执行
execute
方法都是向队列中添加任务。当第一次添加时就启动线程,执行 run 方法,而 run 方法是整个 EventLoop 的核心,就像 EventLoop 的名字一样,Loop Loop ,不停的 Loop ,Loop 做什么呢?做 3 件 事情。
-
调用 selector 的 select 方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上 0.5 秒进行阻塞。当执行 execute 方法的时候,也就是添加任务的时候,唤醒 selecor,防止 selecotr 阻塞时间过长。
-
当 selector 返回的时候,回调用 processSelectedKeys 方法对 selectKey 进行处理。
-
当 processSelectedKeys 方法执行结束后,则按照 ioRatio 的比例执行 runAllTasks 方法,默认是 IO 任务时间和非 IO 任务时间是相同的,你也可以根据你的应用特点进行调优 。比如 非 IO 任务比较多,那么你就将
ioRatio
调小一点,这样非 IO 任务就能执行的长一点。防止队列积攒过多的任务。
handler中加入线程池和Context 中添加线程池分析
在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。
而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有 2 种方式,而且这 2 种方式实现的区别也蛮大的。
第一种handler中加入线程池:
服务端与客户端启动都是一样的, 处理器中内容不一样, 这里只显示不同处
假设在自定义服务端处理器中这样写:
将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。
private static final EventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(16);
private static final EventExecutor unorderedThreadPoolEventExecutor = new UnorderedThreadPoolEventExecutor(16);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("NettyGroupServiceHandle-读取到信息:{}", msg);
log.info("NettyGroupServiceHandle-读取到多少次信息:{}", ++count);
defaultEventExecutorGroup.execute(() -> {
String name = Thread.currentThread().getName();
log.info("NettyGroupServiceHandle===线程名称:{}, 进行一个耗时10秒的任务",name);
ThreadUtil.sleep(10, TimeUnit.SECONDS);
log.info("NettyGroupServiceHandle===线程名称:{}, 一个耗时10秒的任务完成",name);
ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s",name,System.currentTimeMillis()));
});
unorderedThreadPoolEventExecutor.execute(() -> {
String name = Thread.currentThread().getName();
log.info("NettyGroupServiceHandle===线程名称:{}, 进行一个耗时10秒的任务", name);
ThreadUtil.sleep(10, TimeUnit.SECONDS);
log.info("NettyGroupServiceHandle===线程名称:{}, 一个耗时10秒的任务完成",name);
ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s",name,System.currentTimeMillis()));
});
}
客户端处理器
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("NettyGroupClientHandle-读取到信息:{}", msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 通道激活, 模拟3个几乎同时发送消息的请求
EventExecutor executor = ctx.executor();
executor.schedule(()-> sendUUID(ctx),1000, TimeUnit.MILLISECONDS);
executor.schedule(()-> sendUUID(ctx),900, TimeUnit.MILLISECONDS);
executor.schedule(()-> sendUUID(ctx),800, TimeUnit.MILLISECONDS);
}
private void sendUUID(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(UUID.fastUUID() +"||");
}
}
结果:
## 服务端日志
15:12:55.639 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:172f832a-f7a0-42ae-9dc0-54f866e0cbc8
15:12:55.639 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:1
15:12:55.665 [defaultEventExecutorGroup-4-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-1, 进行一个耗时10秒的任务
15:12:55.667 [unorderedThreadPoolEventExecutor-5-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-1, 进行一个耗时10秒的任务
15:12:55.704 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:b9f07858-92c6-47d7-b1b4-0b2835398641
15:12:55.704 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:2
15:12:55.704 [defaultEventExecutorGroup-4-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-2, 进行一个耗时10秒的任务
15:12:55.704 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-2, 进行一个耗时10秒的任务
15:12:55.814 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:1a16bfdb-fc0d-47a3-a7e3-525db2b93606
15:12:55.814 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:3
15:12:55.814 [defaultEventExecutorGroup-4-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-3, 进行一个耗时10秒的任务
15:12:55.814 [unorderedThreadPoolEventExecutor-5-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-3, 进行一个耗时10秒的任务
### --------------------------------------------------------------------------------------
15:13:05.670 [unorderedThreadPoolEventExecutor-5-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-1, 一个耗时10秒的任务完成
15:13:05.670 [defaultEventExecutorGroup-4-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-1, 一个耗时10秒的任务完成
15:13:05.716 [defaultEventExecutorGroup-4-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-2, 一个耗时10秒的任务完成
15:13:05.716 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-2, 一个耗时10秒的任务完成
15:13:05.824 [defaultEventExecutorGroup-4-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-3, 一个耗时10秒的任务完成
15:13:05.824 [unorderedThreadPoolEventExecutor-5-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-3, 一个耗时10秒的任务完成
### 客户端日志
15:13:05.676 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-5-1, 10秒任务完成了!!!, 时间戳: 1656400385670线程defaultEventExecutorGroup-4-1, 10秒任务完成了!!!, 时间戳: 1656400385670
15:13:05.717 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-4-2, 10秒任务完成了!!!, 时间戳: 1656400385716线程unorderedThreadPoolEventExecutor-5-2, 10秒任务完成了!!!, 时间戳: 1656400385716
15:13:05.825 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-5-3, 10秒任务完成了!!!, 时间戳: 1656400385824线程defaultEventExecutorGroup-4-3, 10秒任务完成了!!!, 时间戳: 1656400385824
在 channelRead 方法,模拟了一个耗时 10 秒的操作,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 处理请求的 IO 线程。
当IO线程轮询到一个socket事件,然后,IO线程开始处理,当走到耗时handler的时候,将耗时任务交给业务线程池。
当耗时任务执行完毕再执行
ctx.writeAndFlush
方法的时候,会将这个任务交给处理 IO 线程。
第二种添加处理器时添加线程池
private static final EventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(16);
private static final EventExecutor unorderedThreadPoolEventExecutor = new UnorderedThreadPoolEventExecutor(16);
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap = bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // boosGroup服务器处理程序, 日志
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加框架提供的字符串编解码处理器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 添加自己的处理器
pipeline.addLast(defaultEventExecutorGroup,"one",new NettyGroupServiceHandleTwo());
//pipeline.addLast(unorderedThreadPoolEventExecutor,"two",new NettyGroupServiceHandleTwo());
}
});
try {
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress("127.0.0.1", 8888)).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
服务端处理器
public class NettyGroupServiceHandleTwo extends SimpleChannelInboundHandler<String> {
private static int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String name = Thread.currentThread().getName();
log.info("NettyGroupServiceHandleTwo===线程名称:{}, 进行一个耗时10秒的任务", name);
ThreadUtil.sleep(10, TimeUnit.SECONDS);
log.info("NettyGroupServiceHandleTwo===线程名称:{}, 一个耗时10秒的任务完成", name);
ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s", name, System.currentTimeMillis()));
// 传递给下一个处理器
ctx.fireChannelRead(msg);
}
}
使用
DefaultEventExecutorGroup
结果:
15:52:52.149 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:02.161 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成
15:53:02.163 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:12.176 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成
15:53:12.176 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:22.186 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成
### 客户端日志
15:53:02.167 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402782161
15:53:12.176 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402792176
15:53:22.186 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402802186
从结果看出, 客户端是同一个的情况下, 无论是同时并发写的情况下, 都是使用线程池中的一个线程进行处理, 并且需等上一个通道中的线程执行完毕, 在处理下一个, 这就很尴尬了!
使用
pipeline.addLast(unorderedThreadPoolEventExecutor,"two",new NettyGroupServiceHandleTwo());
结果:
15:56:35.712 [unorderedThreadPoolEventExecutor-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-1, 进行一个耗时10秒的任务
15:56:35.776 [unorderedThreadPoolEventExecutor-3-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-3, 进行一个耗时10秒的任务
15:56:35.886 [unorderedThreadPoolEventExecutor-3-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-2, 进行一个耗时10秒的任务
15:56:45.722 [unorderedThreadPoolEventExecutor-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-1, 一个耗时10秒的任务完成
15:56:45.785 [unorderedThreadPoolEventExecutor-3-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-3, 一个耗时10秒的任务完成
15:56:45.896 [unorderedThreadPoolEventExecutor-3-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-2, 一个耗时10秒的任务完成
### 客户端日志
15:56:45.728 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-1, 10秒任务完成了!!!, 时间戳: 1656403005723
15:56:45.786 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-3, 10秒任务完成了!!!, 时间戳: 1656403005785
15:56:45.897 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-2, 10秒任务完成了!!!, 时间戳: 1656403005896
从结果可以看出来, 在客户端多次调用写的时候, 服务端并不会把 线程池中的某一个线程和管道绑定, 而是每次客户端新写的时候, 都会使用一个不同的线程来处理, 由于使用的是不同的线程, 根据不存在需要等待上一个线程处理完毕在处理下一个, 大大提高了系统的并发能力。
然后就是,
ctx.writeAndFlush()
方法, 和上面debug的结果是一样的,不同的是读取消息的时候, 会不同下面可以debug看看
当我们在调用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用处理请求的 IO线程池
当走到AbstractChannelHandlerContext的invokeChannelRead方法的时候,executor.inEventLoop()是不会通过的,因为当前线程是IO线程context(t也就是Handler)的executor是业务线程,所以会异步执行
两种方式比较
- 第一种
handler中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进mpscTask中。如果IO时间很短,task很多,可能一个循环下来,都没时间执行整个task,导致响应时间达不到指标。
- 第二种
是Netty标准方式(即加入到队列),但是,这么做会将整个handler都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
最终根据业务方案自定义选择。
- 问题: 为什么不使用JDK提供的线程池来做额外耗时的业务处理?
因为netty设计线程组的目的就是去掉JDK锁竞争, 无锁化运行
netty中文档也建议使用对应提供的线程组
1
标签:NioEventLoop,Netty,NettyGroupServiceHandle,netty,INFO,源码,线程,com,10 来源: https://www.cnblogs.com/zhihao-plus/p/16419982.html