Netty-NioEventLoop线程工作机制
作者:互联网
I/O读写操作原理
异步任务执行原理
定时任务执行原理
Netty多线程最佳实践
I/O读写操作原理
NioEventLoop作为Reactor线程,负责TCP连接的创建和接入,以及TCP消息的读写,Reactor线程职责如下:
- 作为NIO服务端,接受客户端的TCP连接
- 作为NIO客户端,向服务端发起TCP连接
- 读取通信对端的请求或者应答信息
- 向通信对端发送消息请求或者应答消息
由于Reactor模式使用的是异步非阻塞I/O,因此所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关操作。但对于高负载、大并发的应用场景却不适合。原因如下:
- 一个NIO线程同时处理成百上千条链路,在性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送需求。
- 当NIO线程负载过重时,处理速度将变慢,这回导致大量客户端连接超时,超时之后往往会进行重发消息,这更加加重NIO线程的负荷,最终将导致大量消息积压和处理超时,成为系统的性能瓶颈。
- 可靠性问题:一旦NIO线程出现意外,会导致整个系统通信模块不可用,不能接受和处理外部消息,造成节点故障。
对于Netty,在创建NioEventLoopGroup时可以指定工作的I/O线程数,通常为"CPU内核书X2"或者"CPU内核书+1",这样可提升网络的读写性能,需要指出的是,不要把I/O线程数设置得过大,除了会导致线程竞争加剧,还会带来其他副作用。
NioEventLoop线程处理网络读写等操作的关键是聚合了一个Selector,代码如下:
public final class NioEventLoop extends SingleThreadEventLoop{
private Selector selector;
private SelectorTuple openSelector(){
final Selector unwrappedSelector;
try{
unwrappedSelector = provider.openSelector();
} catch(IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if(DISABLE_KEYSET_OPTIMIZATION){
return new SelectorTuple(unwrappedSelector);
}
}
}
除了支持JDK原生的Selector,Netty也支持创建其他SPI提供的Selector,同时Netty对Selector的遍历也做了性能优化,对于网络消息处理,通过轮询Selector的SelectorSelectionKeySet实现,代码如下(NioEventLoop类):
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch){
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if(!k.isValid()){
final EventLoop eventLoop;
}
try{
int readyOps = k.readyOps();
if((readyOps & SelectionKey.OP_CONNECT) != 0){
int ops = k.interestOps();
ops &= -SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if((readyOps & SelectionKey.OP_WRITE) != 0){
ch.unsafe().forceFlush();
}
if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps = 0){
unsafe.read();
}
} catch(CancellKeyException ignored){
unsafe.close(unsafe.voidPromise());
}
}
通过对SelectionKey的取值进行判断,完成对应的I/O操作。
- 如果为OP_CONNECT,则代表客户端异步连接操作执行结果。
- 如果为OP_WRITE,说明发生了写半包,发送队列尚有消息未完成发送,需要继续执行发送操作。
- 如果为OP_READ,说明SocketChannel上有消息可以读取,执行read ByteBuffer操作。
- 如果为OP_ACCEPT,说明ServerSocketChannel上有新的客户端TCP连接接入,需要执行accept操作,完成TCP握手和客户端TCP连接的接入。
异步任务执行原理
除了一些标准的网络I/O操作,NioEventLoop也支持各种Runnable类型的任务执行,任务的使用有两种场景。
- Netty系统任务,主要用于任务的异步执行,或者用于用户线程切换到Netty的NioEventLoop线程,避免业务ChannelHaneler加锁。
- 用户自定义用来辅助I/O操作的业务任务。
AbstractWriteTask就是比较典型的Netty系统任务,它将write操作封装成任务,放入NioEventLoop任务队列异步执行,代码如下(AbstractWriteTask类):
abstract static class AbstractWriteTask implements Runnable{
public final void run(){
try{
if(ESTIMATE_TASK_SIZE_ON_SUBMIT){
ctx.pipleline.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally{
ctx = null;
mas = null;
promise = null;
handle.recycle(this);
}
}
}
任务存放在SingleThreadEventExecutor类的成员变量Queue taskQueue中,每次Selector轮询完,执行taskQueue中的任务,代码如下(SingleThreadEventExecutor类):
protected boolean runAllTasks(){
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do{
fetchedAll = fetchFromScheduledTaskQueue();
if(runAllTasksFrom(taskQueue)){
ranAtLeastOne = true;
}
} while(!fetchedAll);
if(ranAtLeastOne){
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
由于NioEventLoop需要同时处理I/O事件和非I/O任务,为了保证两者都能得到足够的CPU时间,Netty提供了I/O比例供用户定制。如果I/O操作多余定时任务和其他任务,则可以将I/O比例调大,默认为50%。
当限制的执行时间到期时,无论当前积压的任务是否执行完,都需要退出循环,防止长时间执行任务而阻塞网络I/O操作。
定时任务执行原理
除了不同的Runnable类型的任务,NioEventLoop还支持执行定时任务,通过调用schedule接口,可以实现定时任务的执行。通过调用fetchFromScheduledTaskQueue(),将到期的定时任务加入taskQueue并随taskQueue执行,实际上可以理解为taskQueue本身就是需要立即执行的定时任务队列,相关代码如下(SingleThreadEventExecutor类):
private boolean fetchFromScheduledTaskQueue(){
long nanoTime = AbstractSchduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null){
if(!taskQueue.offer(scheduledTask)){
scheduledQueue().add(scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
在Netty中,定时任务最经典的使用场景是链路空闲状态监测,在初始化IdleStateHandler时,同步创建ReaderIdleTimeoutTask、WriterIdleTimeoutTask和AllIdleTimeoutTask三个定时任务,负责链路空闲状态检测,相关代码:
private void initialize(ChannelHandlerContext ctx){
lastReadTime = lastWriteTime = ticksInNanos();
if(readerIdleTimeNanos > 0){
readerIdleTimeout = schdule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if(writerIdleTimeNanos > 0){
writerIdleTimeout = schdule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if(allIdleTimeNanos > 0){
allIdleTimeNanos = schdule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
对于用户而言如果需要执行一些周期性任务,不需要自己创建定时器或者使用JDK的ScheduledExecutorService,可以直接使用Netty的NioEventLoop定时任务,实现诸如心跳发送之类的功能。
Netty多线程最佳实践
- 创建两个NioEventLoopGroup,用于逻辑隔离NIO Acceptor和NIO I/O线程。
- 尽量不要在ChannelHandler中启动用户线程,解码后用于将POJO消息派发到后端业务线程除外。
- 解码要放在NIO线程调用的解码Handler中进行,不要切换到用户线程完成消息的解码。
- 如果业务逻辑操作非常简单,没有复杂的业务逻辑,也没有可能导致线程被阻塞的磁盘操作、数据库操作、网络操作等,可以直接在NIO线程上完成业务逻辑编排,不需要切换到用户线程。
- 如果业务逻辑复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成任务,派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其他I/O操作。
推荐线程数量计算公式:
- 线程数量 = (线程总时间 / 瓶颈资源时间) x 瓶颈资源的线程并行数
- QPS = 1000 / 线程总时间 x 线程数
标签:NioEventLoop,Netty,NIO,ctx,任务,线程 来源: https://blog.csdn.net/MarchRS/article/details/104546014