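ForkJoinPool's source code relies heavily on bit manipulation. This section walks through the core parts; to go deeper you will need to trace through the source yourself, step by step.

With the groundwork above, you should already know the three important roles in ForkJoinPool:

  • ForkJoinWorkerThread
  • WorkQueue
  • ForkJoinTask

The whole source walkthrough revolves around the methods of these classes. Before getting to the three roles, though, we need to see what ForkJoinPool itself sets up for them.

The story starts with ForkJoinPool's constructors.

ForkJoinPool constructors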

public ForkJoinPool() {
  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
       defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
  this(checkParallelism(parallelism),
       checkFactory(factory),
       handler,
       asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
       "ForkJoinPool-" + nextPoolId() + "-worker-");
  checkPermission();
}

Besides the three constructors above, JDK 1.8 added another way to obtain a ForkJoinPool instance (quick question: which design pattern is this?):

static final ForkJoinPool common;

/**
 * @return the common pool instance
 * @since 1.8
 */
public static ForkJoinPool commonPool() {
  // assert common != null : "static init error";
  return common;
}

common is initialized in a static block (which runs only once):

common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});

private static ForkJoinPool makeCommonPool() {
  int parallelism = -1;

  // ... other default initialization (factory, handler, system properties) elided ...

  if (parallelism < 0 && // default 1 less than #cores
      (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
  if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;

  // delegate to the private constructor
  return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                          "ForkJoinPool.commonPool-worker-");
}

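Because this is a single, shared ForkJoinPool, always remember:

If you use the common ForkJoinPool, it is best to run only CPU-bound computation in it. Do not put I/O of unpredictable duration inside the tasks, or one slow task can drag down everything else that shares the pool.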
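To make the advice above concrete, here is a minimal sketch of keeping potentially blocking work away from the common pool. The class name PoolChoice, its helper methods and the pool size 4 are illustrative assumptions, not something taken from the JDK source.

```java
import java.util.concurrent.ForkJoinPool;

class PoolChoice {
    // commonPool() always hands back the same statically initialized instance.
    static boolean commonPoolIsSingleton() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    // A dedicated pool for tasks that may block; the size 4 is an arbitrary example value.
    static ForkJoinPool newDedicatedPool() {
        return new ForkJoinPool(4);
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism = "
                + ForkJoinPool.commonPool().getParallelism());
        ForkJoinPool dedicated = newDedicatedPool();
        System.out.println("dedicated pool parallelism = " + dedicated.getParallelism());
        dedicated.shutdown(); // a pool you create yourself should also be shut down by you
    }
}
```

A slow task in the dedicated pool can only starve that pool, while everything else that relies on the common pool keeps its workers.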


private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
  this.workerNamePrefix = workerNamePrefix;
  this.factory = factory;
  this.ueh = handler;
  this.config = (parallelism & SMASK) | mode;
  long np = (long)(-parallelism); // offset ctl counts
  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}


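  • parallelism: the parallelism level. This is not a fixed thread count; the actual number of threads, the length of the WorkQueue array and so on are all derived from it. As makeCommonPool above shows, the common pool's default parallelism is the number of available processors minus 1 (the no-argument public constructor uses the number of available processors itself)
  • factory: nothing unusual, the factory interface used to create ForkJoinWorkerThread instances
  • mode: the WorkQueue mode mentioned above, LIFO or FIFO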


To understand what ForkJoinPool's config field expresses, we have to take it apart carefully.

static final int SMASK        = 0xffff;        // short bits == max index

this.config = (parallelism & SMASK) | mode;

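parallelism & SMASK simply guarantees that the parallelism value cannot exceed SMASK. The public constructors above all pass parallelism through checkParallelism to validate it (the no-argument and one-argument constructors do so by delegating to the four-argument one):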

static final int MAX_CAP      = 0x7fff;        // max #workers - 1

private static int checkParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > MAX_CAP)
            throw new IllegalArgumentException();
        return parallelism;
}

So the largest legal parallelism is MAX_CAP, and 0x7fff is certainly smaller than 0xffff. The mask therefore changes nothing, and config is effectively:

this.config = parallelism | mode;

Assume parallelism is MAX_CAP. It is then ORed with mode; three mode constants are defined:


 // Mode bits for ForkJoinPool.config and WorkQueue.config
 static final int MODE_MASK    = 0xffff << 16;  // top half of int
 static final int LIFO_QUEUE   = 0;
 static final int FIFO_QUEUE   = 1 << 16;
 static final int SHARED_QUEUE = 1 << 31;       // must be negative

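The pool constructors only ever pass LIFO_QUEUE or FIFO_QUEUE (SHARED_QUEUE is used in WorkQueue.config), so parallelism | mode takes one of two shapes depending on mode. Either way, one thing is certain:

In config, bit 17 (counting from 1, that is 1 << 16) carries the mode, and the low 15 bits carry the parallelism

To read the mode back out of config, just AND config with the mode mask (MODE_MASK).

In short, config stores the mode in its upper 16 bits and the parallelism in its lower 16 bits, of which at most 15 are used. With parallelism = MAX_CAP and FIFO mode, for example, config is 0x00017fff.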
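The packing and unpacking of config can be tried out in isolation. The sketch below copies the constant values quoted in this article; the class ConfigBits and its helper methods are hypothetical names introduced only for illustration.

```java
class ConfigBits {
    static final int SMASK      = 0xffff;        // low 16 bits: parallelism
    static final int MODE_MASK  = 0xffff << 16;  // high 16 bits: mode
    static final int LIFO_QUEUE = 0;
    static final int FIFO_QUEUE = 1 << 16;

    // Same expression as in the private ForkJoinPool constructor.
    static int pack(int parallelism, int mode) {
        return (parallelism & SMASK) | mode;
    }

    static int parallelismOf(int config) { return config & SMASK; }

    static int modeOf(int config) { return config & MODE_MASK; }

    public static void main(String[] args) {
        int config = pack(0x7fff, FIFO_QUEUE);
        System.out.println(Integer.toHexString(config));   // 17fff
        System.out.println(parallelismOf(config));         // 32767
        System.out.println(modeOf(config) == FIFO_QUEUE);  // true
    }
}
```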

long np = (long)(-parallelism); // offset ctl counts

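This line negates parallelism and widens the result to a long, stored in two's complement. Taking MAX_CAP as the parallelism, np is -32767, that is 0xFFFFFFFFFFFF8001.

np is then used to compute ForkJoinPool's ctl field: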

// Active counts: number of active threads
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;

// Total counts: total number of threads
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

// compute ctl
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);

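Each shifted copy of np is masked down to its own 16-bit field, and the two fields are then ORed together. With parallelism still MAX_CAP, the final ctl is 0x8001800100000000: AC = TC = 0x8001 (-32767 as a signed 16-bit value), and the low 32 bits are zero.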
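The arithmetic is easy to verify by running it. This is a small sketch that reuses the shift and mask values listed above; CtlBits, initialCtl, activeCount and totalCount are hypothetical helper names, not JDK methods.

```java
class CtlBits {
    static final int  AC_SHIFT = 48;
    static final long AC_MASK  = 0xffffL << AC_SHIFT;
    static final int  TC_SHIFT = 32;
    static final long TC_MASK  = 0xffffL << TC_SHIFT;

    // Same expression as in the private ForkJoinPool constructor.
    static long initialCtl(int parallelism) {
        long np = (long)(-parallelism); // offset ctl counts
        return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    // A signed shift keeps the sign of the topmost 16-bit field.
    static int activeCount(long ctl) { return (int)(ctl >> AC_SHIFT); }

    // A cast to short restores the sign of the TC field.
    static int totalCount(long ctl) { return (short)(ctl >>> TC_SHIFT); }

    public static void main(String[] args) {
        long ctl = initialCtl(0x7fff);
        System.out.println(Long.toHexString(ctl)); // 8001800100000000
        System.out.println(activeCount(ctl));      // -32767
        System.out.println(totalCount(ctl));       // -32767
    }
}
```

Both counts start at minus the parallelism and move toward zero as workers are added, which is what makes the sign tests described in the source comment possible.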

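At this point we have only finished reading one constructor. The result shows that after initialization AC = TC and ctl is negative. ctl is a 64-bit long; how its low 32 bits are built is not visible in the constructor, but the source comment spells it out: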

* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl.  The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers.  When sp is non-zero, there are waiting workers.  To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join.  Other updates entail multiple subfields
* and masking, requiring CAS.

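This comment mainly explains the role of the low 32 bits (the source code later makes this concrete; a first impression now will help). Following the comment, the full layout of ctl is, from high bits to low: AC (bits 48 to 63), TC (bits 32 to 47), SS (bits 16 to 31) and ID (bits 0 to 15).

The comment also says to let sp = (int)ctl, that is, take the low 32 bits of the 64-bit ctl (SS | ID). The low 32 bits only become non-zero once worker threads exist and have gone idle, so if sp != 0 there are waiting workers: wake one up rather than create a new thread. In this way ctl alone provides everything needed to make thread-management decisions.

Besides the fields set in the constructor, ForkJoinPool has another very important field, runState. Just as with the thread pools you already know, this pool needs state to manage its lifecycle.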

volatile int runState;               // lockable status

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;       // pool state is locked
private static final int  RSIGNAL    = 1 << 1;  // a thread is waiting to be woken up
private static final int  STARTED    = 1 << 2;  // pool has been initialized
private static final int  STOP       = 1 << 29; // pool is stopping
private static final int  TERMINATED = 1 << 30; // pool has terminated
private static final int  SHUTDOWN   = 1 << 31; // pool is shut down

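runState switches among the six state bits above. As the comment says, only SHUTDOWN is negative; all the others are positive. Changing state under concurrency requires a lock, and ForkJoinPool locks and unlocks the pool through lockRunState and unlockRunState (you do not need to understand these two methods in depth yet and can skip them for now; just keep in mind that they are the lock that makes runState changes safe).

You may skip them, but I can't skip writing them down... you'll be coming back to them later, won't you?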


/**
 * Acquires the runState lock; returns current (locked) runState.
 */
// per the Javadoc, this method always returns a locked runState, so locking always succeeds
private int lockRunState() {
  int rs;
  return ((((rs = runState) & RSLOCK) != 0 ||
           !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
          awaitRunStateLock() : rs);
}

/**
 * Spins and/or blocks until runstate lock is available.  See
 * above for explanation.
 */
private int awaitRunStateLock() {
  Object lock;
  boolean wasInterrupted = false;
  for (int spins = SPINS, r = 0, rs, ns;;) {
    if (((rs = runState) & RSLOCK) == 0) {
      // acquire the lock with CAS
      if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
        if (wasInterrupted) {
          try {
            // restore the thread's interrupt flag
            Thread.currentThread().interrupt();
          } catch (SecurityException ignore) {
            // deliberately ignored: the catch block is empty
          }
        }
        // lock acquired: return the latest runState; the only normal exit of the loop
        return ns;
      }
    }
    else if (r == 0)
      r = ThreadLocalRandom.nextSecondarySeed();
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
      if (r >= 0)
        --spins;
    }
    // Flag1: another thread holds the lock while initializing; yield the CPU so it can finish quickly
    else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
      Thread.yield();   // initialization race
    // Flag2: another thread holds the lock and the pool is initialized; set the signal bit to 1
    else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
      // enter the monitor
      synchronized (lock) {
        // re-check: if RSIGNAL is already clear, the holder released the lock just before
        // we entered the monitor, so do not wait; otherwise wait to be notified
        if ((runState & RSIGNAL) != 0) {
          try {
            lock.wait();
          } catch (InterruptedException ie) {
            if (!(Thread.currentThread() instanceof
                  ForkJoinWorkerThread))
              wasInterrupted = true;
          }
        }
        else
          lock.notifyAll();
      }
    }
  }
}

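The Flag1 and Flag2 branches in the code above are hard to follow if you haven't read the later code, so here is an explanation in advance:

Flag1: full initialization of the ForkJoinPool happens on first use (when externalSubmit is called), and that is also where stealCounter, an atomic variable reused here as the lock object, gets assigned. So the logic is: if the state is not yet STARTED or stealCounter is still null, yield and retry. In other words, another thread has not finished initializing, so let it keep the lock and finish.

Flag2: as we said when discussing the wait/notify model, do not spin forever: wait when the resource is unavailable, and notify when it becomes available. Here the thread first sets RSIGNAL and enters the monitor. If (runState & RSIGNAL) == 0 at that point, the lock holder has already released the lock and cleared the bit, so there is no need to wait: call notifyAll and go back to competing for the lock. Otherwise, rather than burning CPU, wait until notified.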


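Q1: Since this is a lock, why not use the existing wheel, ReentrantLock?

PS: If you have read the earlier article in this concurrency series on the Java AQS synchronizer and ReentrantLock, you will know that ReentrantLock dedicates an entire field, state, to the synchronization state. Here, though, contending threads also look at the pool's status and, if the pool is still initializing, voluntarily yield the CPU to reduce contention. Packing the lock into one bit of runState alongside the status bits also gives finer-grained control.

Q2: synchronized is handy, but we all know it is a relatively heavyweight lock. Why use it here?

PS: First, synchronized has been optimized steadily and is nowhere near as heavy as when it was introduced. Second, judging by the Flag2 code, the synchronized block is entered only rarely, so the simplest approach makes a solid fallback (Occam's razor?).

Where there is locking there is unlocking, so next comes unlockRunState.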



    /**
     * Unlocks and sets runState to newRunState.
     * @param oldRunState a value returned from lockRunState
     * @param newRunState the next value (must have lock bit clear).
     */
    private void unlockRunState(int oldRunState, int newRunState) {
        if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
            Object lock = stealCounter;
            runState = newRunState;              // clears RSIGNAL bit
            if (lock != null)
                synchronized (lock) { lock.notifyAll(); }
        }
    }

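These two methods run through all of the code analysis that follows; just pay attention to the arguments passed to unlockRunState. You have also seen that the notification uses notifyAll rather than notify. We covered this point in detail before; do you remember why? If not, revisit the earlier article on the wait/notify mechanism in concurrent programming.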
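The idea of carving a lock bit out of a status word can be sketched with AtomicInteger instead of Unsafe. This is a deliberately simplified illustration, not the JDK implementation: it only spins with Thread.yield() and leaves out the RSIGNAL and wait/notifyAll path. StateLock, LOCK, STARTED and demo are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class StateLock {
    static final int LOCK    = 1;       // plays the role of RSLOCK
    static final int STARTED = 1 << 2;  // an example status bit living in the same int

    private final AtomicInteger state = new AtomicInteger();

    // Spins (yielding) until the lock bit is acquired; returns the locked state value.
    int lock() {
        for (;;) {
            int s = state.get();
            if ((s & LOCK) == 0 && state.compareAndSet(s, s | LOCK))
                return s | LOCK;
            Thread.yield();
        }
    }

    // Publishes newState, which must have the lock bit clear, and thereby releases the lock.
    void unlock(int newState) { state.set(newState); }

    int get() { return state.get(); }

    // Several threads increment a plain counter, each increment guarded by the lock bit.
    static int demo(int threads, int perThread) {
        StateLock l = new StateLock();
        int[] counter = {0};
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    int s = l.lock();
                    try {
                        counter[0]++;
                    } finally {
                        l.unlock((s & ~LOCK) | STARTED); // same shape as unlockRunState(rs, (rs & ~RSLOCK) | ns)
                    }
                }
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```

Note how unlocking doubles as a state update: the caller passes the next status value with the lock bit cleared, which is why the arguments of unlockRunState deserve attention.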



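Back to the demo with the main function at the start of this article: we submitted the task to ForkJoinPool with invoke. ForkJoinPool also supports submit and execute. The usage is very similar, and the three kinds of methods are easy to tell apart: invoke submits the task and blocks until its result is available, submit returns the task itself so the result can be fetched later, and execute submits the task without returning anything: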


public <T> T invoke(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task.join();
}

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task;
}

public void execute(ForkJoinTask<?> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
}

As you have surely noticed, every submission method calls externalPush(task). The star of the source code is finally about to appear.
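From the caller's side, the three submission styles look like this. The sketch uses only public java.util.concurrent API; RangeSum is a hypothetical demo task (it is not the demo from the start of the article), and the threshold 1_000 and pool size 2 are arbitrary.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmitStyles {
    // Hypothetical demo task: sums the integers in [lo, hi) by splitting the range in half.
    static class RangeSum extends RecursiveTask<Long> {
        final int lo, hi;
        RangeSum(int lo, int hi) { this.lo = lo; this.hi = hi; }

        @Override
        protected Long compute() {
            if (hi - lo <= 1_000) {
                long s = 0;
                for (int i = lo; i < hi; i++) s += i;
                return s;
            }
            int mid = (lo + hi) >>> 1;
            RangeSum left = new RangeSum(lo, mid);
            left.fork();                                   // becomes a worker task
            long right = new RangeSum(mid, hi).compute();  // computed in the current thread
            return left.join() + right;
        }
    }

    static long[] runAll(int n) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke: submit and wait for the result
            long viaInvoke = pool.invoke(new RangeSum(0, n));
            // submit: get the task back and fetch the result later
            ForkJoinTask<Long> pending = pool.submit(new RangeSum(0, n));
            long viaSubmit = pending.join();
            // execute: nothing is returned, so keep your own reference to the task
            RangeSum task = new RangeSum(0, n);
            pool.execute(task);
            long viaExecute = task.join();
            return new long[] { viaInvoke, viaSubmit, viaExecute };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (long v : runAll(10_000)) System.out.println(v);
    }
}
```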


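If you look at externalPush, its very first line declares a WorkQueue array variable. To make the rest go smoothly, we need a little more groundwork on WorkQueue (yes, more groundwork).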



static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor: WorkQueue[] index of the previous idle queue, forming a stack
int nsteals;               // number of steals
int hint;                  // randomization and stealer index hint; records the stealer's index for later tracing
int config;                // pool index and mode
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push
ForkJoinTask<?>[] array;   // the elements (initially unallocated); the task array
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker;    // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer; the task stolen from another queue

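As mentioned above, WorkQueue is a double-ended queue. The pool has runState, and WorkQueue has scanState.

Operating on the pool needs a lock, and so does operating on a queue; that is what qlock is for.

WorkQueue also has a config, but it is not the same as the one in ForkJoinPool: WorkQueue's config records the queue's index in the WorkQueue[] array together with the mode.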



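As said earlier in the article, tasks are divided into submission tasks and worker tasks. Worker tasks are the ones produced by fork, so tasks coming in through this entry point are naturally submission tasks. In other words:

  • Tasks submitted via invoke() | submit() | execute() are submission tasks and go into even-indexed slots of the WorkQueue array
  • Tasks created by calling fork() are worker tasks and go into odd-indexed slots of the WorkQueue array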


    /**
     * Tries to add the given task to a submission queue at
     * submitter's current queue. Only the (vastly) most common path
     * is directly handled in this method, while screening for need
     * for externalSubmit.
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        // Flag1: read the thread's probe (a per-thread random value), used below to pick a slot index
        int r = ThreadLocalRandom.getProbe();
        int rs = runState; // 0 in the initial state
        // Flag2: the WorkQueue array is initialized, a WorkQueue exists at the chosen index,
        // and the CAS that locks that queue succeeds
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&       // the task array has been allocated
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {  // and has room, no resize needed
                int j = ((am & s) << ASHIFT) + ABASE; // offset of slot top in the task array
                U.putOrderedObject(a, j, task); // put the task into the queue
                U.putOrderedInt(q, QTOP, s + 1);// top + 1
                U.putIntVolatile(q, QLOCK, 0);  // unlock the WorkQueue
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        // Flag3: the conditions above are not met (for example WorkQueue[] does not exist yet),
        // so take the full path, which creates everything from scratch
        externalSubmit(task);
    }

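Three Flags are marked above; they deserve a bit more explanation:

Flag1: ThreadLocalRandom keeps its per-thread state in the Thread object itself. Every thread's probe is 0 by default; when a thread calls ThreadLocalRandom.current(), its seed and probe are initialized and kept inside the thread. Here it is enough to know that a random number is obtained; the details are well worth reading on your own.

Flag2: A lot is packed into this one condition: the WorkQueue array must already be initialized, the slot selected by the random number must hold a WorkQueue, the probe must be non-zero, runState must be positive (started and not shut down), and the CAS that locks the queue must succeed. The slot index is m & r & SQMASK; because the lowest bit of SQMASK is 0, the index is always even, which is why submission queues live in even slots:

// binary: 0000 0000 0000 0000 0000 0000 0111 1110
static final int SQMASK       = 0x007e;        // max 64 (even) slots

Flag3: Having read the description of Flag2, Flag3 is easy to understand: the first task submission always goes through externalSubmit at Flag3.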
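The effect of SQMASK on the slot index can be checked directly. In this sketch, SubmissionSlot and slotFor are hypothetical names; only the mask value and the expression m & r & SQMASK come from the source shown above.

```java
class SubmissionSlot {
    static final int SQMASK = 0x007e; // bit 0 is clear, so the result is always even

    // probe: the caller's per-thread random value; queuesLength: a power of two
    static int slotFor(int probe, int queuesLength) {
        int m = queuesLength - 1;
        return m & probe & SQMASK;
    }

    public static void main(String[] args) {
        System.out.println(slotFor(0x12345, 16)); // 4
        System.out.println(slotFor(-1, 16));      // 14
        System.out.println(slotFor(7, 4));        // 2
    }
}
```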


This method is long, though under 80 lines. See the comments in the method for details.

    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            // the pool has been shut down: help terminate it
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
            // Flag1: the pool has not been initialized yet
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                // Flag1.1: acquire the lock
                rs = lockRunState();
                try {
                    // check again: another thread may have changed the state while we were in lockRunState
                    if ((rs & STARTED) == 0) {
                        // initialize stealCounter (the steal counter, an atomic variable)
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                        int p = config & SMASK; // ensure at least 2 slots
                        // Flag1.2: familiar if you have read the HashMap source: round up to a power of two
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                        // allocate the WorkQueue array
                        workQueues = new WorkQueue[n];
                        // mark initialization as done
                        ns = STARTED;
                    }
                } finally {
                    // unlock
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            // Flag2: as analyzed above, pick an even slot and put the task there
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                // lock the WorkQueue
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    // flag recording whether the task was submitted
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    // the task was submitted successfully
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                   // move on failure
            }
            // Flag3: continuing from Flag2, the chosen slot is empty, so create a WorkQueue for it
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                // set the queue's steal hint
                q.hint = r;
                // as described for WorkQueue.config: the queue's index in WorkQueue[] plus the queue mode
                q.config = k | SHARED_QUEUE;
                // start out inactive
                q.scanState = INACTIVE;
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }

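Flag1.1: One detail worth mentioning. In the article on the Java AQS synchronizer and ReentrantLock we described the standard idiom for using a lock and why it is used that way; ForkJoinPool follows the same idiom here:

Lock lock = new ReentrantLock();
lock.lock();
try {
  // critical section
} finally {
  lock.unlock();
}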

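Flag1.2: Put simply, the WorkQueue[] array is sized according to the parallelism, and the size must be a power of two. Here is a table showing how parallelism maps to array length:

Parallelism    WorkQueue[] length
1 ~ 2          4
3 ~ 4          8
5 ~ 8          16
9 ~ 16         32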
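The mapping can be reproduced by lifting the sizing expression out of externalSubmit into a standalone method. QueueArraySize and sizeFor are hypothetical names used only for this sketch; the bit-smearing steps are copied from the source above.

```java
class QueueArraySize {
    // Mirrors the sizing code in externalSubmit: round (p - 1) up to 2^k - 1, add 1, then double.
    static int sizeFor(int parallelism) {
        int p = parallelism & 0xffff;      // config & SMASK
        int n = (p > 1) ? p - 1 : 1;       // ensure at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16;
        return (n + 1) << 1;
    }

    public static void main(String[] args) {
        for (int p : new int[] { 1, 2, 3, 4, 5, 8, 9, 16 })
            System.out.println(p + " -> " + sizeFor(p));
    }
}
```

The final doubling means the array has at least twice as many slots as the parallelism rounded up to a power of two, leaving room for both the even (submission) and odd (worker) slots.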

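Flag 1, 2, 3: If you have understood the method above, it is clear that on the very first call the logic runs in the order Flag1 -> Flag3 -> Flag2.

Once externalSubmit has successfully submitted a task, it calls signalWork.


All the groundwork laid earlier is about to be used at scale (a big wave of zombies is coming). Are you ready?

If you have forgotten what ForkJoinPool's ctl field does, scroll back up and refresh your memory.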

static final int SS_SEQ       = 1 << 16;       // version count

final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        // ctl is negative: the active count AC is below the target
        while ((c = ctl) < 0L) {                       // too few active
            // take the low 32 bits of ctl; 0 means there are no waiting (idle) workers
            if ((sp = (int)c) == 0) {                  // no idle workers
                // test the sign bit of TC; non-zero means the worker count is still below the parallelism
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    // add a worker, that is, create a thread
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            // reaching here means there is an idle worker: compute its next scanState,
            // with the version bumped and the INACTIVE bit cleared
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            // compute the next ctl: AC + 1, and pop the stack by putting v.stackPred
            // (the previous idle queue) back into sp
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            // update ctl
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
}

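Suppose the program has only just started. Neither the active thread count nor the total thread count has reached the parallelism, so tryAddWorker gets called.


tryAddWorker's logic is very simple. Because it operates on the pool, it likewise uses lockRunState/unlockRunState for locking.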

    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    // update ctl
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                if (add) {
                    createWorker();
                    break;
                }
            }
        // re-read ctl; keep looping while there are too few total workers and no idle worker
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

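If all goes well, createWorker is called to create the actual worker. Things are getting clearer.


WorkQueue and ForkJoinTask have been introduced; the last of the three important roles, ForkJoinWorkerThread, finally makes its entrance.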

    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);
        return false;
    }

How a worker thread gets paired with a WorkQueue is hidden inside fac.newThread(this). The code below shows the call chain.

public ForkJoinWorkerThread newThread(ForkJoinPool pool);

static final class DefaultForkJoinWorkerThreadFactory
  implements ForkJoinWorkerThreadFactory {
  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
  }
}

protected ForkJoinWorkerThread(ForkJoinPool pool) {
  // Use a placeholder until a useful name can be set in registerWorker
  super("aForkJoinWorkerThread");
  this.pool = pool;
  this.workQueue = pool.registerWorker(this);
}

Clearly, the core is in registerWorker.


WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
  this.pool = pool;
  this.owner = owner;
  // Place indices in the center of array (that is not yet allocated)
  base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  UncaughtExceptionHandler handler;
  wt.setDaemon(true);                           // configure thread
  if ((handler = ueh) != null)
    wt.setUncaughtExceptionHandler(handler);
  WorkQueue w = new WorkQueue(this, wt);
  int i = 0;                                    // assign a pool index
  int mode = config & MODE_MASK;
  int rs = lockRunState();
  try {
    WorkQueue[] ws; int n;                    // skip if no array
    if ((ws = workQueues) != null && (n = ws.length) > 0) {
      int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
      int m = n - 1;
      i = ((s << 1) | 1) & m;               // odd-numbered indices
      if (ws[i] != null) {                  // collision
        int probes = 0;                   // step by approx half n
        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
        while (ws[i = (i + step) & m] != null) {
          if (++probes >= n) {
            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
            m = n - 1;
            probes = 0;
          }
        }
      }
      w.hint = s;                           // use as random seed
      w.config = i | mode;
      w.scanState = i;                      // publication fence
      ws[i] = w;
    }
  } finally {
    unlockRunState(rs, rs & ~RSLOCK);
  }
  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
  return w;
}
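The index expression in registerWorker is what keeps worker queues in odd slots. Here is a small sketch of just that expression; WorkerSlot, firstIndex and nameSuffix are hypothetical names, and the seed values passed in are arbitrary rather than the ones the pool would generate.

```java
class WorkerSlot {
    // Same expression as in registerWorker: shifting left and setting bit 0 forces an odd index.
    static int firstIndex(int seed, int queuesLength) {
        int m = queuesLength - 1; // queuesLength is a power of two
        return ((seed << 1) | 1) & m;
    }

    // The worker's thread name uses i >>> 1, so odd slots 1, 3, 5... become workers 0, 1, 2...
    static int nameSuffix(int index) {
        return index >>> 1;
    }

    public static void main(String[] args) {
        System.out.println(firstIndex(0, 8));  // 1
        System.out.println(firstIndex(3, 8));  // 7
        System.out.println(firstIndex(5, 8));  // 3
        System.out.println(nameSuffix(7));     // 3
    }
}
```

On a collision the probing step in registerWorker is even, so adding it to an odd index keeps the index odd.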

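At this point the thread has been created successfully. If thread creation fails, however, deregisterWorker has to clean up.


deregisterWorker takes the reference to the just-created thread and the exception as arguments and undoes the work done by registerWorker.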

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  WorkQueue w = null;
  if (wt != null && (w = wt.workQueue) != null) {
    WorkQueue[] ws;                           // remove index from array
    int idx = w.config & SMASK;
    int rs = lockRunState();
    if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
      ws[idx] = null;
    unlockRunState(rs, rs & ~RSLOCK);
  }
  long c;                                       // decrement counts
  do {} while (!U.compareAndSwapLong
               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                     (TC_MASK & (c - TC_UNIT)) |
                                     (SP_MASK & c))));
  if (w != null) {
    w.qlock = -1;                             // ensure set
    w.cancelAll();                            // cancel remaining tasks
  }
  for (;;) {                                    // possibly replace
    WorkQueue[] ws; int m, sp;
    if (tryTerminate(false, false) || w == null || w.array == null ||
        (runState & STOP) != 0 || (ws = workQueues) == null ||
        (m = ws.length - 1) < 0)              // already terminating
      break;
    if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
      if (tryRelease(c, ws[sp & m], AC_UNIT))
        break;
    }
    else if (ex != null && (c & ADD_WORKER) != 0L) {
      tryAddWorker(c);                      // create replacement
      break;
    }
    else                                      // don't need replacement
      break;
  }
  if (ex == null)                               // help clean on way out
    ForkJoinTask.helpExpungeStaleExceptions();
  else                                          // rethrow
    ForkJoinTask.rethrow(ex);
}

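In short, deregisterWorker removes the thread from the pool, clears its WorkQueue, updates ctl, and finally, depending on the pool's state, decides whether to find a replacement for itself.

deregisterWorker was explained to give you the complete picture, but the path after a successful registerWorker is not finished yet, so let's continue. Now that there is a worker, call wt.start() and put it to work.


ForkJoinWorkerThread extends Thread, so after start() is called, its overridden run() method is what runs.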

public void run() {
  if (workQueue.array == null) { // only run once
    Throwable exception = null;
    try {
      onStart();
      pool.runWorker(workQueue);
    } catch (Throwable ex) {
      exception = ex;
    } finally {
      try {
        onTermination(exception);
      } catch (Throwable ex) {
        if (exception == null)
          exception = ex;
      } finally {
        pool.deregisterWorker(this, exception);
      }
    }
  }
}

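The key step, of course, is the call to runWorker.


runWorker is a routine three-step loop: scan for a task, run it with runTask, and wait in awaitWork when nothing is found.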


    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

Let's look at scan first.


Here comes ForkJoinPool's work-stealing mechanism. How stealing works is hidden in the scan method.

private ForkJoinTask<?> scan(WorkQueue w, int r) {
  WorkQueue[] ws; int m;
  if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
    int ss = w.scanState;                     // initially non-negative
    for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
      WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
      int b, n; long c;
      if ((q = ws[k]) != null) {
        if ((n = (b = q.base) - q.top) < 0 &&
            (a = q.array) != null) {      // non-empty
          long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
          if ((t = ((ForkJoinTask<?>)
                    U.getObjectVolatile(a, i))) != null &&
              q.base == b) {
            if (ss >= 0) {
              if (U.compareAndSwapObject(a, i, t, null)) {
                q.base = b + 1;
                if (n < -1)       // signal others
                  signalWork(ws, q);
                return t;
              }
            }
            else if (oldSum == 0 &&   // try to activate
                     w.scanState < 0)
              tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
          }
          if (ss < 0)                   // refresh
            ss = w.scanState;
          r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
          origin = k = r & m;           // move and rescan
          oldSum = checkSum = 0;
          continue;
        }
        checkSum += b;
      }
      if ((k = (k + 1) & m) == origin) {    // continue until stable
        if ((ss >= 0 || (ss == (ss = w.scanState))) &&
            oldSum == (oldSum = checkSum)) {
          if (ss < 0 || w.qlock < 0)    // already inactive
            break;
          int ns = ss | INACTIVE;       // try to inactivate
          long nc = ((SP_MASK & ns) |
                     (UC_MASK & ((c = ctl) - AC_UNIT)));
          w.stackPred = (int)c;         // hold prev stack top
          U.putInt(w, QSCANSTATE, ns);
          if (U.compareAndSwapLong(this, CTL, c, nc))
            ss = ns;
          else
            w.scanState = ss;         // back out
        }
        checkSum = 0;
      }
    }
  }
  return null;
}
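scan takes tasks from the base end of a queue (q.base), while push adds them at the top end. The following single-threaded sketch only illustrates what the two ends of a double-ended queue mean; it uses java.util.ArrayDeque rather than the CAS-based array in WorkQueue, and TwoEndedQueue and demo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TwoEndedQueue {
    // Returns { task taken from top, task taken from base, tasks left }.
    static int[] demo() {
        Deque<Integer> q = new ArrayDeque<>();
        q.addLast(1);                    // pushes go to the top end
        q.addLast(2);
        q.addLast(3);
        int fromTop  = q.pollLast();     // top end: the most recently pushed task
        int fromBase = q.pollFirst();    // base end: the oldest task, the one a scan would take
        return new int[] { fromTop, fromBase, q.size() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 3 1 1
    }
}
```

Because the two ends are far apart whenever the queue holds several tasks, the thread pushing at the top and a thread taking from the base rarely touch the same slot.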

If a task is found, runTask is called to actually run it.


We are close to the truth now. A task has been stolen, so let's get some real work done.

        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                // Flag1: record that the current task was stolen; how the task runs is what we
                // wrote in compute, which we will reach through doExec() in a moment
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

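Flag1: doExec is the key to actually executing the task; it is the link to our custom compute method. Let's look at doExec.


Things are looking great. Hang in there: lift the veil on exec and you see the essence.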

// abstract method in ForkJoinTask; both RecursiveTask and RecursiveAction override it
protected abstract boolean exec();

final int doExec() {
  int s; boolean completed;
  if ((s = status) >= 0) {
    try {
      completed = exec();
    } catch (Throwable rex) {
      return setExceptionalCompletion(rex);
    }
    if (completed)
      s = setCompletion(NORMAL);
  }
  return s;
}

// RecursiveTask's override; at last we see the compute from the demo at the start of the article
protected final boolean exec() {
  result = compute();
  return true;
}

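At this point we have seen the essence. After this long detour we have finally connected to the compute method we wrote ourselves. But the last of runWorker's three steps, awaitWork, is still missing, so let's take a look.


Everything above assumed scan found a task. If scan finds nothing, the current thread has to block for a while. The details are in the code comments; a rough understanding is enough.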

private boolean awaitWork(WorkQueue w, int r) {
  if (w == null || w.qlock < 0)                 // w is terminating
    return false;
  for (int pred = w.stackPred, spins = SPINS, ss;;) {
    if ((ss = w.scanState) >= 0)
      break;
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
      if (r >= 0 && --spins == 0) {         // randomize spins
        WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
        if (pred != 0 && (ws = workQueues) != null &&
            (j = pred & SMASK) < ws.length &&
            (v = ws[j]) != null &&        // see if pred parking
            (v.parker == null || v.scanState >= 0))
          spins = SPINS;                // continue spinning
      }
    }
    else if (w.qlock < 0)                     // recheck after spins
      return false;
    else if (!Thread.interrupted()) {
      long c, prevctl, parkTime, deadline;
      int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
      if ((ac <= 0 && tryTerminate(false, false)) ||
          (runState & STOP) != 0)           // pool terminating
        return false;
      if (ac <= 0 && ss == (int)c) {        // is last waiter
        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
        int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
          return false;                 // else use timed wait
        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
      }
      else
        prevctl = parkTime = deadline = 0L;
      Thread wt = Thread.currentThread();
      U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
      w.parker = wt;
      if (w.scanState < 0 && ctl == c)      // recheck before park
        U.park(false, parkTime);
      U.putOrderedObject(w, QPARKER, null);
      U.putObject(wt, PARKBLOCKER, null);
      if (w.scanState >= 0)
        break;
      if (parkTime != 0L && ctl == c &&
          deadline - System.nanoTime() <= 0L &&
          U.compareAndSwapLong(this, CTL, c, prevctl))
        return false;                     // shrink pool
    }
  }
  return true;
}

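At this point you should have a basic grasp of ForkJoinPool's complete flow. Everything so far, however, started from a submission task. In the compute method we just discussed, we wrote our own logic following the divide-and-conquer pattern (see the demo at the start of the article). The key point is that compute calls fork, which gives us a chance to look at worker tasks. On to the fork method.


fork's logic is simple. If the current thread is a ForkJoinWorkerThread, that is, a worker registered as described above, push is called directly to put the task into the WorkQueue owned by the current thread. Otherwise externalPush is called on the common pool and everything we covered above is walked through again (dare you walk through it once more?).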

public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}

// push is simple, so it is not explained further here
final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}

Where there is fork there is join. Let's continue with the join method.


join's core call is doJoin, but seeing so many chained ternary operators, I panicked.

public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}

private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  // status: the task's run state
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}

Rewrite doJoin with the familiar if/else and it suddenly becomes clear.

private int doJoin() {
  int s;
  Thread t;
  ForkJoinWorkerThread wt;
  ForkJoinPool.WorkQueue w;

  if ((s = status) < 0) { // already has a result, return it directly
    return s;
  } else {
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
      // the caller is a ForkJoinWorkerThread worker
      if ((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // similar to scan above, but specifically tries to take this waiting task from the worker's own queue
          // the task was taken, so execute it and return the result
          && (s = doExec()) < 0) {
        return s;
      } else {
        // another thread may have stolen the task, so use the internal wait method
        return wt.pool.awaitJoin(w, this, 0L);
      }
    } else {
      // not a ForkJoinWorkerThread: use the external wait method
      return externalAwaitDone();
    }
  }
}


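Both awaitJoin and externalAwaitDone use two strategies, helping and compensating. You are fully equipped to read them on your own; awaitJoin in particular is strongly recommended. The pop of tasks from the queue also happens there, so it is not expanded on here.

That wraps up the ForkJoinPool material. To help you understand the fork/join mechanism better, a few diagrams follow.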

