其他分享
首页 > 其他分享> > [09] Reactor 线程模型解析

[09] Reactor 线程模型解析

作者:互联网

摘自《Netty 即时聊天实战与底层原理》

1. NioEventLoopGroup 创建

这部分,我们着重分析下面两行代码。

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

1.1 确定 NioEventLoop 的个数

在 NioEventLoopGroup 构造方法中,如果没有传递构造参数,那么默认构造参数为 0,这个 0 在后面决定要创建多少个线程的时候会用上。

NioEventLoopGroup

public NioEventLoopGroup() {
  this(0);
}

public NioEventLoopGroup(int nThreads) {
  this(nThreads, (Executor) null);
}

// 省略中间调用过程...

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
  super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
  DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
    SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

从这里其实可以分析出来,在 Demo 代码中,如果没有传递构造参数,那么这里的 nThreads 就是 DEFAULT_EVENT_LOOP_THREADS,而如果传递了 1,那么这里的 nThreads 就是 1。

nThreads 标识了最终线程池最多会创建多少个线程,默认最多会创建 DEFAULT_EVENT_LOOP_THREADS 个线程(CPU 核数 * 2)。

1.2 NioEventLoopGroup 的创建总体框架

我们已经知道,Netty 是如何确定最终创建多少个线程的。接下来分析 NioEventLoopGroup 的线程创建过程。接着上一部分的调用过程,我们来到如下代码。

MultithreadEventExecutorGroup

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    // 这里的第 3 个参数需重点关注↘
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

// =============== 入口 ===============
protected MultithreadEventExecutorGroup(int nThreads,
        Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {

    // 1. 创建 ThreadPerTaskExecutor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 2. 创建 NioEventLoop
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        // ...
        children[i] = newChild(executor, args);
        // ...
    }

    // 3. 创建线程选择器
    chooser = chooserFactory.newChooser(children);

    // ...
}

重点就在于这 3 部分:

  1. 创建 ThreadPerTaskExecutor:ThreadPerTaskExecutor 标识每次调用 execute() 方法的时候,就会创建一个线程;
  2. 创建 NioEventLoop:NioEventLoop 对应线程池里线程的概念,这里其实就是用一个 for 循环创建的;
  3. 创建线程选择器:线程选择器的作用是确定每次如何从线程池中选择一个线程,也就是每次如何从 NioEventLoopGroup 中选择一个 NioEventLoop;

a. 创建 ThreadPerTaskExecutor

MultithreadEventExecutorGroup

protected MultithreadEventExecutorGroup(int nThreads,
        Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {

    // 1. 创建 ThreadPerTaskExecutor
    if (executor == null) {
        // newDefaultThreadFactory() 调用的不是下面这个 protected 方法的,
        // 实际调用的是 MultithreadEventLoopGroup 中的(方法重写)。
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 2. 创建 NioEventLoop

    // 3. 创建线程选择器

    // ...
}

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

先来看下 ThreadPerTaskExecutor,这个类的代码不多。它的作用是,每次执行 execute 方法的时候,它都会调用 threadFactory 来创建一个线程,把需要执行的命令传递进去,然后执行。

public final class ThreadPerTaskExecutor implements Executor {
  private final ThreadFactory threadFactory;

  public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
  }

  @Override
  public void execute(Runnable command) {
    threadFactory.newThread(command).start();
  }
}

接下来,再来看下 threadFactory 的创建。

public abstract class MultithreadEventLoopGroup
        extends MultithreadEventExecutorGroup implements EventLoopGroup {

    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        // io.netty.channel.nio.NioEventLoopGroup, 10
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }
}

DefaultThreadFactory

public DefaultThreadFactory(Class<?> poolType, int priority) {
    this(poolType, false, priority);
}

public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
  this(toPoolName(poolType), daemon, priority);
}

/**
 * 把 NioEventLoopGroup 的首字母变成小写
 */
public static String toPoolName(Class<?> poolType) {
  if (poolType == null) {
    throw new NullPointerException("poolType");
  }

  String poolName = StringUtil.simpleClassName(poolType);
  switch (poolName.length()) {
    case 0:
      return "unknown";
    case 1:
      return poolName.toLowerCase(Locale.US);
    default:
      if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
        return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
      } else {
        return poolName;
      }
  }
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
  this(poolName, daemon, priority, System.getSecurityManager() == null ?
      Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
  prefix = poolName + '-' + poolId.incrementAndGet() + '-';
  this.daemon = daemon;
  this.priority = priority;
  this.threadGroup = threadGroup;
}

private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;

省去其他无关信息,我们看到,最终 DefaultThreadFactory 会用一个成员变量 prefix 来标记线程名的前缀,其中 poolId 是全局的自增 ID。不难分析,prefix 的最终格式为“nioEventLoopGroup-线程池编号-”。

@Override
public Thread newThread(Runnable r) {
  Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
  // ...
  return t;
}

protected Thread newThread(Runnable r, String name) {
  return new FastThreadLocalThread(threadGroup, r, name);
}

可以看到,最终创建出来的线程名是 prefix 加一个自增的 nextId。这里的 nextId 是对象级别的成员变量,只在一个 NioEventLoopGroup 里递增。所以,最终看到,Netty 里的线程名字都类似于 nioEventLoopGroup-2-3,表示这个线程是属于第几个 NioEventLoopGroup 的第几个 NioEventLoop。

我们还注意到,DefaultThreadFacotry 创建出来的线程实体是经过 Netty 优化过的 FastThreadLocalThread,也可以理解为,这个类型的线程实体在操作 ThreadLocal 的时候,要比 JDK 快,这部分内容的分析,我们放到后续章节中。

到这里,我们已经了解到,Netty 的线程实体是由 ThreadPerTaskExecutor 创建的,ThreadPerTaskExecutor 每次执行 execute 的时候都会创建一个 FastThreadLocalThread 的线程实体。接下来,我们就分析一下 NioEventLoopGroup 创建总体框架的第二个过程。

b. 创建 NioEventLoop

MultithreadEventExecutorGroup

protected MultithreadEventExecutorGroup(int nThreads,
        Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {

    // 1. 创建 ThreadPerTaskExecutor

    // 2. 创建 NioEventLoop
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        // ...
        children[i] = newChild(executor, args);
        // ...
    }

    // 3. 创建线程选择器

    // ...
}

Netty 使用 for 循环来创建 nThreads 个 NioEventLoop,通过前面的分析,我们可能已经猜到,一个 NioEventLoop 对应一个线程实体,这个线程实体是 FastThreadLocalThread。

先来分析 newChild() 方法。

NioEventLoopGroup

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

newChild() 方法传递一个 executor 参数,这个参数就是前面分析的 ThreadPerTaskExecutor,而 args 参数是我们通过层层调用传递过来的一系列参数。

我们看到,newChild() 方法最终创建的是一个 NioEventLoop 对象,这里的 this 指的是 NioEventLoopGroup,表示归属于哪个 NioEventLoopGroup。

继续分析 NioEventLoop 的创建过程。

NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
            SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {

  super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

  // ...

  final SelectorTuple selectorTuple = openSelector();

  // ...
}

我们同样略去了非关键代码。首先,继续调用父类构造方法;然后,通过调用 openSelector() 方法来创建一个 Selector。Selector 是 NIO 编程里最核心的概念,一个 Selector 可以将多个连接绑定在一起,负责监听这些连接的读写事件,即多路复用。

在 openSelector() 方法中,Netty 通过反射对 Selector 底层的数据结果进行了优化。

继续分析 NioEventLoop,往上层层调用父类构造方法,最终来到以下逻辑。

SingleThreadEventExecutor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
  // ...
  taskQueue = newTaskQueue(this.maxPendingTasks);
  // ...
}

这里最关键的代码其实只有一行,其作用是创建一个任务队列,Netty 中所有的异步执行,本质上都是通过这个任务队列来协调完成的。

这里的 newTaskQueue 是一个 protected 方法,在 NioEventLoop 中被重写。

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
  // This event loop never calls takeTask()
  return maxPendingTasks == Integer.MAX_VALUE
          ? PlatformDependent.<Runnable>newMpscQueue()
          : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

这里创建的是一个高性能的 MPSC 队列,即多生产者单消费者队列,单消费者指某个 NioEventLoop 对应的线程,而多生产者就是此 NioEventLoop 对应的线程之外的线程,通常情况下就是我们的业务线程。比如,我们在调用 writeAndFlush() 的时候,可以不用考虑线程安全,随意调用,这些线程指的就是多生产者。

如果我们继续向下跟踪,会发现 Netty 的 MPSC 队列直接使用的 JCTools,可以说 Netty 的高性能,很大程度上功劳要归功于这个工具包。

关于 NioEventLoop 的创建,最关键的其实就是两部分:创建一个 Selector 和创建一个 MPSC 队列,这三者均为一对一的关系(注意,此时并没有创建关联的线程)。

c. 创建线程选择器

关于 NioEventLoopGroup 的最后一部分内容,就是创建〈线程选择器〉,那么〈线程选择器〉的作用是什么呢?

在传统的 NIO 编程中,一个新连接被创建后,通常需要给这个连接绑定一个 Selector,之后这个连接的整个生命周期都有这个 Selector 管理。而从上面的代码中,我们分析到,Netty 中一个 Selector 对应一个 NioEventLoop,线程选择器的作用正是为“一个连接”在一个 EventLoopGroup 中选择一个 NioEventLoop,从而将这个连接绑定到某个 Selector 上。

下面是线程选择器的接口描述。

interface EventExecutorChooser {
    EventExecutor next();
}

接下来分析 NioEventLoopGroup 创建总体框架的最后一个过程 —— 创建〈线程选择器〉。

protected MultithreadEventExecutorGroup(int nThreads,
        Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {

    // 1. 创建 ThreadPerTaskExecutor

    // 2. 创建 NioEventLoop

    // 3. 创建线程选择器
    chooser = chooserFactory.newChooser(children);

    // ...
}

DefaultEventExecutorChooserFactory

@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

  public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

  private DefaultEventExecutorChooserFactory() { }

  @SuppressWarnings("unchecked")
  @Override
  public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 根据是否是 2 的幂次来创建不同的选择器
    if (isPowerOfTwo(executors.length)) {
      return new PowerOfTwoEventExecutorChooser(executors);
    } else {
      return new GenericEventExecutorChooser(executors);
    }
  }

  private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
  }

  // --- 通过简单的累加取模来实现循环的逻辑
  private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
      this.executors = executors;
    }

    @Override
    public EventExecutor next() {
      return executors[idx.getAndIncrement() & executors.length - 1];
    }
  }

  // --- 通过位运算实现的
  private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
      this.executors = executors;
    }

    @Override
    public EventExecutor next() {
      return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
  }
}

Netty 通过判断 NioEventLoopGroup 中的 NioEventLoop 是否是 2 的幂来创建不同的线程选择器,不管是哪一种选择器,最终效果都是从第一个 NioEventLoop 遍历到最后一个 NioEventLoop,再从第一个开始,如此循环。

比如,CPU 核数为 4,默认创建 8 个 NioEventLoop,符合 2 的幂,这里 executors.length-1 计算出来是 7,也就是二进制数 111,那么,只要使用一个自增的 ID,和 111 进行与运算,就能实现循环的效果,而与运算是 OS 底层支持的,比取模运算效率高很多。

1.3 小结

  1. 在默认情况下,NioEventLoopGroup 会创建两倍 CPU 核数个 NioEventLoop,一个 NioEventLoop 和一个 Selector 及一个 MPSC 任务队列一一对应;
  2. NioEventLoop 线程的命名规则是“nioEventLoopGroup-xx-yy”,xx 表示全局第 xx 个 NioEventLoopGroup,yy 表示这个 NioEventLoop 在 NioEventLoopGroup 中是第 yy 个;
  3. 线程选择器的作用是为“一个连接”选择一个 NioEventLoop,如果 NioEventLoop 的个数为 2 的幂,则 Netty 会使用与运算进行优化。

2. NioEventLoop 对应线程的创建和启动

2.1 NioEventLoop 的启动入口

在上一篇《服务端启动》中,#5-注册服务端 Channel 到 Selector 的过程中会经过以下逻辑。

AbstractChannel$AbstractUnsafe

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    AbstractChannel.this.eventLoop = eventLoop;

    // ~ 第一个 Channel 过来时创建
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // ===== Step Into [execute] =====
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}

这里只展示核心代码部分,当代码调用到这里的时候,当前线程是 main 方法对应的主线程。

了解到当前线程是什么之后,接下来分析一下 inEventLoop() 这个方法。

AbstractEventExecutor

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

首先调用重载方法,将当前线程,也就是 main 方法对应的主线程传递进来,然后将这个线程与 this.thread 进行比较。由于 this.thread 此时并未赋值,所以为空,因而返回 false。

另外,我们在 Netty 的源码中,会在很多地方看到 inEventLoop() 这样的判断,这个方法的本质含义就是:判断当前线程是否是 Netty 的 Reactor 线程,也就是 NioEventLoop 对应的线程实体。后面会看到,创建一个线程之后,会将这个线程实体保存到 thread 这个成员变量中。

因为这个地方返回 false,所以接下来调用 eventLoop.execute() 这个逻辑。

2.2 创建线程并启动

SingleThreadEventExecutor

@Override
public void execute(Runnable task) {

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

由于 execute 方法是 public 的,因此可能被用户代码使用。比如,我们经常使用 ctx.executor().execute(...),所以,这里又进行了一次外部线程判断逻辑,确保执行 task 不会遇到线程安全问题。

这里是 main 方法对应的线程,所以接下来执行 startThread() 方法。

SingleThreadEventExecutor

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

可以看到,Netty 会判断 Reactor 线程有没有被启动。如果没有被启动,则调用 doStartThread() 方法启动线程。

SingleThreadEventExecutor

private void doStartThread() {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            // ...
            // NioEventLoop 继承自 SingleThreadEventExecutor
            // 所以这个 run 实际会去调用 NioEventLoop#run -> #3
            SingleThreadEventExecutor.this.run();
            // ...
        }
    });
}

// ThreadPerTaskExecutor
@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

在执行 doStartThread() 的时候,会调用内部成员变量 executor 的 execute() 方法,而根据我们在 #1.2.a 的分析,executor 就是 ThreadPerTaskExecutor,这个对象的作用就是每次执行 Runnable 的时候,就会先创建一个线程再执行。

在这个 Runnable 中,通过一个成员变量 thread 来保存 ThreadPerTaskExecutor 创建出来的线程,这个线程就是我们在 #1.2.a 中分析的 FastThreadLocalThread。至此,我们终于知道,一个 NioEventLoop 是如何与一个线程实体绑定的:NioEventLoop 通过 ThreadPerTaskExecutor 创建一个 FastThreadLocalThread,然后通过一个成员变量来指向这个线程

NioEventLoop 保存完线程的引用之后,随即调用 run() 方法,这个 run() 方法就是 Netty Reactor 的核心所在。

2.3 小结

通过这部分的学习,基本理清了 Netty 在服务端启动过程中,boss 对应的 NioEventLoopGroup 是如何创建第一个 NioEventLoop 线程的,又是如何开始线程的 Reactor 循环的,而在后面的客户端新连接接入过程中,我们会继续分析 worker 对应的 NioEventLoop 又是如何创建线程并启动的。

3. NioEventLoop 的执行流程

先来看下 NioEventLoop 的执行总体流程:

@Override
protected void run() {
    for (;;) {
        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
            case SelectStrategy.CONTINUE:
                continue;
            case SelectStrategy.SELECT:
                // =====> 1. 执行一次事件轮询
                select(wakenUp.getAndSet(false));

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }
                // fall through
            default:
        }

        cancelledKeys = 0;
        // #3.2 会用到这个变量!
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        if (ioRatio == 100) {
            try {
                // =====> 2. 处理产生 IO 事件的 Channel
                processSelectedKeys();
            } finally {
                // Ensure we always run tasks.
                // =====> 3. 执行任务
                runAllTasks();
            }
        } else {
            final long ioStartTime = System.nanoTime();
            try {
                // =====> 2. 处理产生 IO 事件的 Channel
                processSelectedKeys();
            } finally {
                // Ensure we always run tasks.
                final long ioTime = System.nanoTime() - ioStartTime;
                // =====> 3. 执行任务
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        }
    }
}

我们抽取主干,发现 Reactor 线程做的事情其实很简单,用下图就可以说明。

Reactor 线程大概做的事情不断循环以下 3 个步骤:

  1. 执行一次事件循环。首先轮询注册到 Reactor 线程对应的 Selector 上的所有 Channel 的 IO 事件;
    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
        selector.wakeup();
    }
    
  2. 处理产生 IO 事件的 Channel。如果有读写或者新连接接入事件,则处理:
    processSelectedKeys();
    
  3. 处理任务队列。
    runAllTasks(...);
    

3.1 执行一次事件轮询

private void select(boolean oldWakenUp) throws IOException {
  for (;;) {
    // 1. 定时任务截至时间快到了,中断本次轮询;
    // 2. 轮询过程中发现有任务加入,中断本次轮询;
    // 3. 阻塞式 select 操作;
    // 4. 解决 JDK 的 NIO BUG (Epoll空轮询)。
  }
}

接下来,详细分析一下 Netty 关于事件轮询的 4 段主要逻辑。

(1)定时任务截至时间快到了,中断本次轮询。

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;

  int selectCnt = 0;
  long currentTimeNanos = System.nanoTime();
  long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

  for (;;) {
    // 1. 定时任务截至时间快到了,中断本次轮询
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
      if (selectCnt == 0) {
        selector.selectNow();
        selectCnt = 1;
      }
      break;
    }

    // ...

  }

我们可以看到,NioEventLoop 中 Reactor 线程的 select 操作也是一个 for 循环。在 for 循环第一步中,如果发现当前的定时任务队列中有任务的截至时间快到了(<=0.5ms),就跳出循环。此外,跳出之前,如果发现目前为止还没有进行过 select 操作,即 if (selectCnt == 0),那么就调用一次 selectNow(),该方法会立即返回,不会阻塞。

这里说明一点,Netty 里的定时任务队列是按照延时时间从小到大进行排列的,delayNanos(currentTimeNanos) 方法即取出第一个定时任务的延迟时间。

SingleThreadEventExecutor

/**
 * Returns the amount of time left until the scheduled task
 * with the closest dead line is executed.
 */
protected long delayNanos(long currentTimeNanos) {
  ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  if (scheduledTask == null) {
    return SCHEDULE_PURGE_INTERVAL;
  }

  return scheduledTask.delayNanos(currentTimeNanos);
}

关于 Netty 的定时任务(包括普通队列、定时任务)相关的细节我们在接下来的小节会详细分析,这里就不过多展开了。

(2)轮询过程中发现有任务加入,中断本次轮询。

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;

  int selectCnt = 0;
  long currentTimeNanos = System.nanoTime();
  long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

  for (;;) {
    // 1. 定时任务截至时间快到了,中断本次轮询
    // ...

    // 2. 轮询过程中发现有任务加入,中断本次轮询
    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
    // Selector#wakeup. So we need to check task queue again before executing select operation.
    // If we don't, the task might be pended until select operation was timed out.
    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
      selector.selectNow();
      selectCnt = 1;
      break;
    }

    // ...

  }
}

Netty 为了保证任务队列里的任务能够及时执行,在进行阻塞 select 操作的时候会判断任务队列是否为空。如果不为空,就进行一次非阻塞 select 操作,跳出循环;否则,继续执行下面的操作。

(3)阻塞式 select 操作

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;

  int selectCnt = 0;
  long currentTimeNanos = System.nanoTime();
  long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

  for (;;) {
    // 1. 定时任务截至时间快到了,中断本次轮询;
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    // ...

    // 2. 轮询过程中发现有任务加入,中断本次轮询;
    // ...

    // 3. 阻塞式 select 操作;
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;

    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
      // - Selected something                             检测到 IO 事件
      // - waken up by user                               被用户主动唤醒
      // - the task queue has a pending task              任务队列里面有任务需要执行
      // - a scheduled task is ready for processing       第 1 个定时任务即将要被执行
      break;
    }

    // ...
  }
}

执行到这一步,说明 Netty 任务队列为空,并且所有定时任务的延迟时间还未到(大于 0.5 ms)。于是,进行一次阻塞 select 操作,截至到第一个定时任务的截止时间。

这里,我们可以问自己一个问题,如果第一个定时任务的延迟非常长,比如 1h,那么有没有可能线程一直阻塞在 select 操作上?

答案是当然有可能,但是,只要在这段时间内有新任务加入,该阻塞就会被释放。我们接下来简单分析一下当有外部线程执行 EventLoop 的 execute() 方法时会发生什么情况。

回过头去看下 #2.2 的 SingleThreadEventExecutor#execute() { ... wakeup(inEventLoop); ...},接下来进入 wakeup 实现(在 NioEventLoop 中)。

// NioEventLoop extends SingleThreadEventExecutor
@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

可以看到,在外部线程添加任务的的时候,会调用 wakeup() 方法来唤醒 selector.select(timeoutMillis)。

阻塞 select 操作结束之后,Netty 又做了一系列状态判断来决定是否中断本次轮询,中断本次轮询的条件有如下几种,对应着 if 判断条件里面的一系列或条件。

(4)解决 JDK 的 NIO BUG

select 操作的最后一步,就是解决 JDK 的 NIO Bug,该 Bug 会导致 Selector 一直在空轮询,最终导致 CPU 100%,NIO Server 不可用。严格意义上来说,Netty 没有解决 JDK 的 Bug,而是通过一种方式巧妙地避开了这个 Bug,具体做法如下。

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;

  int selectCnt = 0;
  long currentTimeNanos = System.nanoTime();
  long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

  for (;;) {
    // ...

    // 3. 阻塞式 select 操作;
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;

    // ...

    // 4. 解决 JDK 的 NIO BUG
    long time = System.nanoTime();
    // 判断 select 操作是否至少持续了 timeoutMillis 秒
    // (time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis))
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
      // timeoutMillis elapsed without anything selected.
      selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
      // The selector returned prematurely many times in a row.
      // Rebuild the selector to work around the problem.
      logger.warn(
          "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
          selectCnt, selector);

      rebuildSelector();
      selector = this.selector;

      // Select again to populate selectedKeys.
      selector.selectNow();
      selectCnt = 1;
      break;
    }

    currentTimeNanos = time;
  }

  // ...

}

Netty 会在一开始记录下开始时间 currentTimeNanos,在阻塞 select 之后记录下结束时间,判断 select 操作是否至少持续了 timeoutMillis 秒,如果持续时间大于等于 timeoutMillis,说明这是一次有效地轮询,重置 selectCnt 标志;否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了 JDK 的空轮询 Bug(说可能是因为还有可能是外部线程执行任务中断了本次 select 操作),当空轮询的次数超过一定阈值(默认 512)的时候,就开始重建 Selector。

NioEventLoop

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);

if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
    selectorAutoRebuildThreshold = 0;
}

SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

由此可以看到,默认 SELECTOR_AUTO_REBUILD_THRESHOLD 为 512。

下面我们简单描述一下 Netty 通过 rebuildSelector 来绕过空轮询 Bug 的过程。

rebuildSelector 操作其实很简单:创建一个新的 Selector,将之前注册到老的 Selector 上的 Channel 重新转移到新的 Selector 上。抽取完主要代码后的骨架如下。

/**
 * Replaces the current {@link Selector} of this event loop with newly created
 * {@link Selector}s to work around the infamous epoll 100% CPU bug.
 */
public void rebuildSelector() {
  if (!inEventLoop()) {
    execute(() -> rebuildSelector0());
    return;
  }
  rebuildSelector0();
}

private void rebuildSelector0() {
  final Selector oldSelector = selector;
  final SelectorTuple newSelectorTuple;

  newSelectorTuple = openSelector();

  // Register all channels to the new Selector.
  int nChannels = 0;
  for (SelectionKey key: oldSelector.keys()) {
    Object a = key.attachment();

    // 1. 拿到有效的key
    if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
      continue;
    }
    int interestOps = key.interestOps();
    // 2. 取消该 key 在旧的 Selector 上的事件注册
    key.cancel();
    // 3. 将该 key 对应的 Channel 注册到新的 Selector 上
    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
    // 4. 重新绑定 Channel 和新的 key
    if (a instanceof AbstractNioChannel) {
      // Update SelectionKey
      ((AbstractNioChannel) a).selectionKey = newKey;
    }
    nChannels ++;
  }

  selector = newSelectorTuple.selector;
  unwrappedSelector = newSelectorTuple.unwrappedSelector;

  // time to close the old selector as everything else is registered to the new one
  oldSelector.close();

  logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}

首先,通过 openSelector() 方法创建一个新的 Selector,然后执行具体的转移步骤:

  1. 拿到有效的 key;
  2. 取消该 key 在旧的 Selector 上的事件注册;
  3. 将该 key 对应的 Channel 注册到新的 Selector 上;
  4. 重新绑定 Channel 和新的 key;

转移完成后,就可以将原有的 Selector 废弃,后面所有的轮询都在新的 Selector 上进行。

最后,我们总结 Reactor 线程 select 操作做的事情:不断地轮询是否有 IO 事件发生,并且在轮询过程中不断检查是否有任务需要执行,保证 Netty 任务队列中的任务能够及时执行,轮询过程使用一个计数器避开了 JDK 的空轮询 Bug。

3.2 处理产生 IO 事件的 Channel

我们已经了解到 Netty Reactor 线程执行总体框架的第一步是轮询出注册在 Selector 上的 IO 事件,那么接下来就要处理这些 IO 事件(process selected keys)。下面来看下 Netty 处理 IO 事件的细节。

进入 Reactor 线程的 run() 方法,找到处理 IO 事件的代码,如下所示:

private void processSelectedKeys() {
  if (selectedKeys != null) {
    processSelectedKeysOptimized();
  } else {
    processSelectedKeysPlain(selector.selectedKeys());
  }
}

我们发现处理 IO 事件,Netty 有两种选择,从名字上看,一种是处理优化过的 SelectedKeys,一种是正常处理。

我们把对优化过的 SelectedKeys 的处理稍微展开一下,看看 Netty 是如何优化的。我们查看 SelectedKeys 被引用过的地方,有如下代码。

NioEventLoop

private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private static final class SelectorTuple {
  final Selector unwrappedSelector;
  final Selector selector;
}

private SelectorTuple openSelector() {
  final Selector unwrappedSelector = provider.openSelector();
  // ...
  final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

  // selectorImplClass -> sun.nio.ch.SelectorImpl
  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  selectedKeysField.setAccessible(true);
  publicSelectedKeysField.setAccessible(true);
  selectedKeysField.set(unwrappedSelector, selectedKeySet);
  publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

  // ...

  selectedKeys = selectedKeySet;

  // ...

  return new SelectorTuple(...);
}

首先,selectedKeys 是一个 SelectedSelectionKeySet 类对象,在 NioEventLoop 的 openSelector() 方法中创建,之后通过反射将 selectedKeys 与 sun.nio.ch.SelectorImpl 中的两个成员变量绑定。

在 sun.nio.ch.SelectorImpl 中,我们可以看到,其实这两个成员变量是两个 HashSet。

public abstract class SelectorImpl extends AbstractSelector {
  protected Set<SelectionKey> selectedKeys = new HashSet();
  protected HashSet<SelectionKey> keys = new HashSet();
  private Set<SelectionKey> publicKeys;
  private Set<SelectionKey> publicSelectedKeys;

  protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    if (Util.atBugLevel("1.4")) {
      this.publicKeys = this.keys;
      this.publicSelectedKeys = this.selectedKeys;
    } else {
      this.publicKeys = Collections.unmodifiableSet(this.keys);
      this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
    }

  }

}

Selector 在调用 select() 方法的时候,如果有 IO 事件发生,就会往里面的两个成员变量中塞相应的 SelectionKey,即相当于往 HashSet 中添加元素,既然 Netty 通过反射将 JDK 中的两个成员变量替换掉,那我们就应该意识到,是不是 SelectedSelectionKeySet 在 add() 方法中做了某些优化呢?

SelectedSelectionKeySet

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}

这个类继承了 AbstractSet,说明该类可以当作一个 set 来用,但底层实际使用的是一个数组。add() 方法分为 3 步:① 将 SelectionKey 塞到该数组的尾部;② 如果该数组的长度等于数组的物理长度,就将该数组扩容。

可以看到,待程序运行一段时间后,等数组的长度足够长,每次在轮询到 NIO 事件时,Netty 只需要 O(1) 的时间复杂度就能将 SelectionKey 塞到 set 中去,而 JDK 底层使用的 HashSet put 的时间复杂度最少是 O(1),最差是 O(N),使用数组替换掉 HashSet 还有一个好处是遍历的时候非常高效。

关于 Netty 对 SelectionKeySet 的优化,就到这了。下面我们继续分析 Netty 对 IO 事件的处理,转到 processSelectedKeysOptimized()。

private void processSelectedKeysOptimized() {
  for (int i = 0; i < selectedKeys.size; ++i) {
    // 1. 取出 IO 事件及其对应的 Channel
    final SelectionKey k = selectedKeys.keys[i];
    // null out entry in the array to allow to have it GC'ed once the Channel close
    selectedKeys.keys[i] = null;

    final Object a = k.attachment();

    // 2. 处理该 Channel
    if (a instanceof AbstractNioChannel) {
      processSelectedKey(k, (AbstractNioChannel) a);
    } else {
      @SuppressWarnings("unchecked")
      NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
      processSelectedKey(k, task);
    }

    // 3. 判断是否该再进行一次轮询
    if (needsToSelectAgain) {
      // null out entries in the array to allow to have it GC'ed once the Channel close
      selectedKeys.reset(i + 1);

      selectAgain();
      i = -1;
    }
  }
}

我们可以将该过程分为以下 3 个步骤:

(1)取出 IO 事件及对应的 Channel。

这里其实也能体会到优化过的 SelectedSelectionKeySet 的好处,遍历时便利的是数组,相对 JDK 原生的 HashSet,效率有所提高。

拿到当前的 SelectionKey 之后,将 selectedKey[i] 置为 null。这里简单解释以下这么做的理由:假设一个 NioEventLoop 平均每次轮询出 N 个 IO 事件,高峰期轮询出 3N 个事件,那么 selectedKeys 的物理长度要 >= 3N,如果每次处理这些 key,不置 selectedKeys[i] 为空,那么高峰期一过,这些保存在数组末尾的 selectedKeys[i] 对应的 SelectionKey 将一直无法被回收,SelectionKey 对应的对象可能不大,但是要知道,它可是有 attachment 的。这里的 attachment 具体是什么下面会讲到,但是有一点我们必须清楚,attachment 可能很大,这样一来,这些对象就一直存活,造成 JVM 无法回收,内存就泄露了。

(2)处理该 Channel。拿到对应的 attachment 之后,Netty 做了如下判断。

if (a instanceof AbstractNioChannel) {
  processSelectedKey(k, (AbstractNioChannel) a);
}

源码读到这,我们需要思考为什么会有这么一条判断,为什么说 attachment 可能会是 AbstractNioChannel 对象?这个问题其实在之前已经说过了,这里再来说一下。

我们的思路应该是找到底层 Selector,然后在 Selector 调用 register() 方法的时候,看一下注册到 Selector 上的对象是什么,我们在 AbstractNioChannel 中搜索到如下方法。

@Override
protected void doRegister() throws Exception {
  // ...
  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  // ...
}

javaChannel() 方法返回 Netty 类 AbstractChannel 对应的 JDK 底层 Channel 对象。

protected SelectableChannel javaChannel() {
  return ch;
}

由上不难推论出,Netty 的轮询注册机制其实是将 AbstractNioChannel 内部的 JDK 类 SelectableChannel 对象注册到 JDK 类 Selector 对象上,并且将 AbstractNioChannel 作为 SelectableChannel 对象的一个 attachment 附属上,这样在 JDK 轮询出某条 SelectableChannel 有 IO 事件发生时,就可以直接取出 AbstractNioChannel 进行后续操作。

在 Netty 的 Channel 中,有两大类型的 Channel,一个是 NioServerSocketChannel,由 boss NioEventLoop 负责处理;一个是 NioSocketChannel,由 worker NioEventLoop 负责处理,所以:

这两部分逻辑体现在如下方法里:

NioEventLoop

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

  // ...

  int readyOps = k.readyOps();
  // We first need to call finishConnect() before try to trigger a read(...) or write(...)
  // as otherwise the NIO JDK channel implementation may throw a NotYetConnectedException.
  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
  }

  // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
  if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
  }

  // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
  }

}

processSelectedKeysOptimized() 方法处理 attachment 的时候,还有一个 else 分支,我们也来分析一下。

if (a instanceof AbstractNioChannel) {
  processSelectedKey(k, (AbstractNioChannel) a);
} else {
  @SuppressWarnings("unchecked")
  NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  processSelectedKey(k, task);
}

说明注册到 Selector 上的 attachment 还有另外一种类型,就是 NioTask,NioTask 主要用于当一个 SelectableChannel 注册到 Selector 的时候,执行一些任务。

NioTask 的定义如下。

public interface NioTask<C extends SelectableChannel> {
  void channelReady(C ch, SelectionKey key) throws Exception;
  void channelUnregistered(C ch, Throwable cause) throws Exception;
}

由于 NioTask 在 Netty 内部没有使用的地方,这里不过多展开介绍。

(3)判断是否该再进行一次轮询。

NioEventLoop

private boolean needsToSelectAgain;

private void processSelectedKeysOptimized() {
  for (int i = 0; i < selectedKeys.size; ++i) {
    // 1. 取出 IO 事件及其对应的 Channel
    // ...

    // 2. 处理该 Channel
    // ...

    // 3. 判断是否该再进行一次轮询
    if (needsToSelectAgain) {
      // null out entries in the array to allow to have it GC'ed once the Channel close
      selectedKeys.reset(i + 1);

      selectAgain();
      i = -1;
    }
  }
}

回忆一下 Netty Reactor 线程的前两个步骤,分别是抓取 IO 事件及处理 IO 事件。每次在抓取到 IO 事件之后,都会将 needsToSelectAgain 重置为 false,那么什么时候 needsToSelectAgain 会重新被设置成 true 呢?

全局搜索 needsToSelectAgain = true,只有下面一处。

NioEventLoop

private static final int CLEANUP_INTERVAL = 256;

void cancel(SelectionKey key) {
  key.cancel();
  cancelledKeys ++;
  if (cancelledKeys >= CLEANUP_INTERVAL) {
    cancelledKeys = 0;
    needsToSelectAgain = true;
  }
}

继续查看 cancel() 方法被调用的地方。

AbstractNioChannel

@Override
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

不难看出,Channel 从 Selector 上移除的时候,调用 cancel() 方法将 key 取消,并且在被取消的 key 到达 CLEANUP_INTERVAL 的时候,设置 needsToSelectAgain 为 true。也就是说,对于每个 NioEventLoop 而言,每隔 256 个 Channel 从 Selector 上移除的时候,就标记 needsToSelectAgain 为 true。

重新看回 NioEventLoop#processSelectedKeysOptimized() 的实现,也就是说每满 256 次,就会进入 if 代码块。首先,将数组中下标从 i+1 开始往后 size 个位置置空,方便 JVM 垃圾回收,然后调用 selectAgain 重新装填 SelectionKey 数组。

NioEventLoop

private void selectAgain() {
  needsToSelectAgain = false;
  selector.selectNow();
}

Netty 这么做的目的应该是每隔 256 次连接断开,重新清理一下 selectionKeys,这相当于用批量删除替代了 Selector 原 HashSet 数据结构的删除。

最后,对处理 IO 事件部分的内容总结一下:

  1. Netty 使用数组替代 JDK 原生的 HashSet 来提升处理 IO 事件的效率;
  2. 每个 SelectionKey 上都绑定了 Netty 类 AbstractNioChannel 对象作为 attachment,在处理每个 SelectionKey 的时候,都可以找到 AbstractNioChannel,然后通过 Pipeline 将处理串行到 ChannelHandler,回调到用户方法。

3.3 添加任务

前面我们已经知道,每一次事件轮询,首先都会检测出 IO 事件,如果有 IO 事件,那么就去处理。IO 事件主要包含新连接接入事件和连接的数据读写事件,这两步处理完之后,还有最后一步:处理任务队列,即 runAllTasks()。

这一部分,我们先分析在用户代码里添加任务的逻辑,后面再分析任务的执行。我们取 3 种典型的 Task 使用场景来分析。

(1)用户自定义普通任务

ctx.channel().eventLoop().execute(() -> {
    // ...
});

我们跟进 execute() 方法。

SingleThreadEventExecutor

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    // -------------------------------------------
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    // -------------------------------------------

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

我们看到,不管是外部线程还是 Reactor 线程,execute 方法都会调用 addTask 方法。

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    // LinkedBlockingQueue<Runnable>(maxPendingTasks)
    return taskQueue.offer(task);
}

我们跟到 offerTask 方法,这个 Task 就落地了,Netty 内部使用一个 taskQueue 将 Task 保存起来。上面我们已经分析过这个 taskQueue,它其实是一个 MPSC Queue,每一个 NioEventLoop 都与它一一对应。

Netty 使用 MPSC Queue,方便将外部线程的异步任务聚集,在 Reactor 线程内部用单线程来批量执行以提升性能。我们可以借鉴 Netty 的任务执行模式来处理类似多线程数据聚合,定时上报应用。

(2)非当前 Reactor 线程调用 Channel 的各类方法

// 当前线程为业务线程
channel.write(...)

这个场景是:在业务线程里,根据用户的标识,找到对应的 Channel,然后调用 Channel 的 write 类方法向该用户推送消息。

关于 channel.write 方法的调用,后面会详细剖析。这里我们只需要知道,最终 write 方法调用以下方法。

AbstractChannelHandlerContext

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    // ...
    executor.execute(runnable); // 回退到 <场景1>
    // ...
}

外部线程在调用 write 的时候,executor.inEventLoop() 会返回 false,接下来进入 else 分支,将 write 操作封装成一个 WriteTask(这里仅仅是 write 而没有 flush,因此 flush 参数为 false),然后调用 safeExecute 方法来执行。

接下来的调用链就进入第一种场景了,但是和第一种场景有一个明显的区别就是,第一种场景的调用链的发起线程是 Reactor 线程,第二种线程的调用链的发起线程是用户线程,用户线程可能会有很多个,在这种场景下,Netty 的 MPSC Queue 就有了用武之地!

(3)用户自定义定时任务

这段代码的作用是,在一定时间之后执行某个任务。

ctx.channel().eventLoop().schedule(() -> {
    // ...
}, 1101, TimeUnit.SECONDS);

跟进 schedule 方法。

AbstractScheduledEventExecutor

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // ...

    // 通过 ScheduledFutureTask 将用户自定义任务再次包装成一个 Netty 内部的任务
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

// 完整逻辑见再下一个代码块
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    // ...
    scheduledTaskQueue().add(task);
    // ...
    return task;
}

到了这,对于 scheduledTaskQueue,我们觉得有点似曾相识,在非定时任务的处理中,Netty 通过一个 MPSC 队列将任务落地。这里,是否也有一个类似的队列来承载这类定时任务呢?带着这个疑问,我们继续向前探索。

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                // Use same initial capacity as java.util.PriorityQueue
                SCHEDULED_FUTURE_TASK_COMPARATOR, 11);
    }
    return scheduledTaskQueue;
}

果不其然,scheduledTaskQueue() 方法会返回一个优先级队列,然后调用 add 方法将定时任务对象 ScheduledFutureTask 加入队列,但是,为什么可以直接使用优先级队列,而不需要考虑多线程的并发?

如果是在外部线程调用 schedule 方法,Netty 会将添加定时任务这个逻辑封装成一个普通的 task,这个 task 的任务是一个添加“添加定时任务”的任务,而不是添加定时任务,所以其实就退回到第二种场景。这样,对 PriorityQueue 的访问就变成单线程,即只有 Reactor 线程会访问,因此,不存在多线程并发问题。

以下是如何保证定时任务并发的完整逻辑。

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    // --- 如果是 Reactor 线程,则直接往优先级队列中添加任务
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        // --- 如果是外部线程,则将添加定时任务这个逻辑进一步封装,退回到 <场景2>
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在阅读源码过程中,我们应该多问几个为什么。比如这里,为什么定时任务要保存在优先级队列中,我们可以先不看源码,来思考一下优先级队列的特性:优先级队列按照一定顺序来排列内部元素,内部元素必须是可以比较的,联系到这里的每个元素都是定时任务,那就说明定时任务是可以比较的,那么到底有哪些地方可以比较呢?

每个任务都有一个下一次执行的截止时间,截止时间是可以比较的。在截至时间相同的情况下,任务添加的顺序也是可以比较的。就像这样,在阅读源码过程中,一定要多和自己对话,多问几个为什么。

带着猜想,我们研究一下 ScheduledFutureTask,抽取出关键代码部分。

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
    // 每个任务都有一个唯一的 ID
    private static final AtomicLong nextTaskId = new AtomicLong();
    // 基准时间
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() { return System.nanoTime() - START_TIME; }

    private final long id = nextTaskId.getAndIncrement();

    /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
    /* 标识一个任务是否重复执行及以何种方式来定期执行 */
    private final long periodNanos;

    @Override
    public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }

        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }

    @Override
    public void run() { /* 见下文 */ }
}

这里,我们一眼就找到了 compareTo 方法,这个方法实现自 Comparable 接口。接下来,我们分析一下这个方法的实现。首先,两个定时任务的比较,确实是先比较任务的截止时间,在截至时间相同的情况下,再比较 ID,即任务添加的顺序;如果 ID 再相同,就抛出 Error。

这样,在执行任务定时任务的时候,就能保证截止时间最近的任务先执行。

Netty 里的定时任务机制,除了我们前面提到的 schedule 方法,还有以下两种:

  1. scheduleAtFixedRate:每隔一段时间执行一次;
  2. scheduleWithFixedDelay:隔相同时间再执行一次;

对于这 3种方法,调用的时候逻辑非常类似,唯一的区别就是,在构造 ScheduledFutureTask 的时候,某个参数不一样,这个参数就是 periodNanos。

/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
/* 标识一个任务是否重复执行及以何种方式来定期执行 */
private final long periodNanos;
  1. schedule 方法:传递的 periodNanos = 0,表示这个任务不会被重复执行;
  2. scheduleAtFixedRate 方法:传递的 periodNanos > 0,表示以固定速度来执行任务,与任务执行的耗时无关;
  3. scheduleWithFixedDelay 方法:传递的 periodNanos < 0,表示以固定的延时时间来执行,即每次任务执行完毕之后,隔相同的时间再次执行。

如下代码,就是 Netty 处理这 3 种定时任务的逻辑。

ScheduledFutureTask

@Override
public void run() {
    assert executor().inEventLoop();

    // 1. 对应 schedule 方法,表示一次性任务;
    if (periodNanos == 0) {
        if (setUncancellableInternal()) {
            V result = task.call();
            setSuccessInternal(result);
        }
    } else {
        // check if is done as it may was cancelled
        if (!isCancelled()) {
            task.call();
            if (!executor().isShutdown()) {
                long p = periodNanos;
                if (p > 0) {
                    // 2. 对应 scheduleAtFixedRate 方法,表示以固定速度执行任务;
                    deadlineNanos += p;
                } else {
                    // 3. 对应 scheduleWithFixedDelay 方法,表示以固定的延时执行任务
                    deadlineNanos = nanoTime() - p; // 这里的 p 为负数,相当于加上一个正数
                }
                if (!isCancelled()) {
                    // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                    Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                            ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                    assert scheduledTaskQueue != null;
                    scheduledTaskQueue.add(this);
                }
            }
        }
    }
}

if (periodNanos == 0) 对应若干时间后执行一次的定时任务类型,执行完了该任务就结束了。否则,进入 else 代码块,再区分是哪种类型的任务。

  1. periodNanos > 0,表示以固定速度执行某个任务,和任务的持续时间无关,所以该任务下一次执行的截止时间为本次截止时间加上时间间隔 p;
  2. periodNanos < 0,表示每次任务执行完毕之后,间隔多长时间之后再次执行,所以该任务下一次执行的截止时间为当前时间加上间隔时间,-p 就表示加上一个正的间隔时间。

其实这里可以看出,Netty 的 3 种定时任务逻辑就是通过调整下一次任务的截至时间来运行的。修改完下一次执行的截至时间,把当前任务再次加入队列,就能够确保任务的适时执行。

Netty 内部的任务添加机制了解的差不多之后,我们就可以分析 Netty Reactor 线程执行总体框架的第三个步骤,即 runAllTasks 了。

3.4 执行任务

首先,我们将目光转向最外层的外观代码。

SingleThreadEventExecutor

runAllTasks(long timeoutNanos);

顾名思义,这行代码表示尽量在一定时间内将所有的任务都取出来执行一遍。timeoutNanos 表示该方法最多执行多长时间。

Netty 为什么要这么做?我们可以想一想,Reactor 线程如果在此停留的时间过长,那么将积攒许多的 IO 事件无法处理(见 Reactor 线程的前两个步骤),最终导致大量客户端请求阻塞。因此,在默认情况下,Netty 会精细控制内部任务队列的执行代码。

接下来详细分析任务执行逻辑,我们依然会抽取出关键代码。

/**
 * Poll all tasks from the task queue and run them via Runnable#run() method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than timeoutNanos.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 转移定时任务至 MPSC Queue
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();

    // 2. 计算本轮任务执行的截止时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;

    // 3. 执行任务
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

以上这段代码便是 Reactor 执行任务的逻辑,可以拆解成下面 3 个步骤:

  1. 从定时任务队列 scheduleTaskQueue 转移定时任务到普通任务队列 taskQueue;
  2. 计算本轮任务执行的截止时间;
  3. 执行任务;

执行这 3 个步骤,我们一步步来分析下。

(1)转移定时任务

转移定时任务至 MPSC Queue。首先调用 fetchFromScheduledTaskQueue() 方法,将快到期的定时任务转移到 MPSC Queue 里面。

SingleThreadEventExecutor

private boolean fetchFromScheduledTaskQueue() {
    // nanoTime 计算方法见下一个代码块
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

可以看到,Netty 在把任务从 scheduleTaskQueue 转移到 taskQueue 的时候还是非常小心的。当 taskQueue#offer 失败的时候,需要把从 scheduleTaskQueue 里取出来的任务重新添加回去。

从 scheduleTaskQueue 中拉取一个定时任务的逻辑如下,传入的参数 nanoTime 为当前纳秒减去 scheduleTaskQueue 类被加载时的纳秒。

AbstractScheduledEventExecutor

protected static long nanoTime() {
	return ScheduledFutureTask.nanoTime();
}

ScheduledFutureTask

private static final long START_TIME = System.nanoTime();

static long nanoTime() {
    return System.nanoTime() - START_TIME;
}

这个时间就表示当前时间相对 ScheduledFutureTask 类加载的时间。前面关于 ScheduledFutureTask,其实我们没有展开介绍,在创建 ScheduledFutureTask 的时候,deadlineNanos 也被设置为相对于 ScheduledFutureTask 类加载的时间。

Netty 使用当前的相对时间与任务的相对截至时间进行比较。

/**
 * Return the Runnable which is ready to be executed with the given nanoTime.
 * You should use #nanoTime() to retrieve the the correct nanoTime.
 */
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }

    // 任务的相对截止时间与当前的相对时间比较
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    return null;
}

可以看到,只有在当前任务的截止时间已经到了时,该任务才会被移除队列。

(2)计算本轮任务执行的截止时间

到了这一步,所有截止时间已经到达的定时任务均被填充到普通任务队列。接下来,Netty 会计算一下本轮任务最多可以执行到什么时候。

SingleThreadEventExecutor

/**
 * Poll all tasks from the task queue and run them via Runnable#run() method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than timeoutNanos.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 转移定时任务至 MPSC Queue
    // ...

    // 2. 计算本轮任务执行的截止时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;

    // 3. 执行任务
    // ...
}

Netty 使用 Reactor 线程传入的超时时间 timeoutNanos 来计算当前任务循环的截止时间,并且使用 runTasks、lastExecutionTime 来时刻记录任务的状态。

(3)执行任务

/**
 * Poll all tasks from the task queue and run them via Runnable#run() method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than timeoutNanos.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 转移定时任务至 MPSC Queue
    // ...

    // 2. 计算本轮任务执行的截止时间
    // ...

    // 3. 执行任务
    for (;;) {
        // 3.1 不抛异常则执行任务
        safeExecute(task);
        // 3.2 累计当前已执行任务
        runTasks ++;
        // 3.3 每隔 64 次计算当前时间是否已过截止时间
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // 3.4 判断本轮任务是否执行完毕
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

这一步便是 Netty 里面执行所有任务的核心代码了。首先调用 safeExecute 来确保任务安全执行,忽略任何异常。

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

然后,将已运行任务 runTasks++。接着,每隔 0x3F(64) 个任务,判断当前时间是否超过本次 Reactor 任务循环的截止时间。如果超过,那就停止本轮任务的执行;如果没有超过,那就继续执行。最后,如果任务全部执行完毕,则记录下最后一次任务执行时间。

我们可以看到,Netty 对性能的优化考虑得相当周到:假设任务队列里有海量小任务,如果每次执行完任务都要判断是否到截止时间,那么效率是比较低的,而通过批量的方式,效率要高很多。

Netty 很多性能优化用的都是批量策略,后面还会看到。

3.5 小结

  1. NioEventLoop 在执行过程中不断检测是否有事件发生,如果有事件发生就处理,处理完事件之后再处理外部线程提交过来的异步任务;
  2. 在检测是否有事件发生的时候,为了保证异步任务的及时处理,只要有任务要处理,就立即停止事件检测,随即处理任务;
  3. 外部线程异步执行的任务分为两种:定时任务和普通任务,分别落地到 MPSC Queue 和 PriorityQueue,而 PriorityQueue 中的任务最终都会填充到 MPSC Queue 中处理;
  4. Netty 每隔 64 个任务检查一次是否该退出任务循环。

4. 总结

  1. NioEventLoopGroup 在用户代码中被创建,默认情况下会创建两倍 CPU 核数个 NioEventLoop;
  2. NioEventLoop 是懒启动的,boss NioEventLoop 在服务端启动的时候启动,worker NioEventLoop 在新连接接入的时候启动;
  3. 当 CPU 核数为 2 的幂时,为每一个新连接绑定 NioEventLoop 之后都会做一个或转与的优化;
  4. 每一个连接都对应一个 Channel,每个 Channel 都绑定唯一一个 NioEventLoop,每个 NioEventLoop 都对应一个 FastThreadLocalThread 线程实体和一个 Selector。因此,单个连接的所有操作都在一个线程上执行,是线程安全的;
  5. 每个 NioEventLoop 都对应一个 Selector,这个 Selector 可以批量处理注册到它上面的 Channel;
  6. 每个 NioEventLoop 的执行都包含事件检测、处理,以及异步任务的执行;
  7. 用户线程池在对 Channel 进行一些操作的时候,均为线程安全的,这是因为 Netty 会把外部线程的操作都封装成一个 Task 塞到这个 Channel 绑定的 NioEventLoop 中的 MPSC Queue,在该 NioEventLoop 事件循环的第 3 个过程中进行串行执行。

标签:NioEventLoop,Netty,task,Reactor,...,09,任务,线程
来源: https://www.cnblogs.com/liujiaqi1101/p/16133422.html