编程语言
首页 > 编程语言> > 线程池 ThreadPoolExecutor 源码详细分析

线程池 ThreadPoolExecutor 源码详细分析

作者:互联网

1、线程池的作用

一方面当执行大量一步任务的时候线程池能够提供较好的性能,在不使用线程池的时候,每当需要执行异步的时候都是直接 new 一线程进行运行,而线程的创建和销毁都是需要开销的。使用线程池的时候,线程池里面的线程是可复用的,不会每次执行异步任务的时候都重新创建和销毁线程。

另一方面线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等,每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

2、ThreadPoolExecutor 继承体系

下面介绍一下线程池的拒绝策略

当线程池的任务缓存队列已经满了并且线程池中的线程数目达到 maximumPoolSize 的时候,如果还有任务到来就会采取任务拒绝策略,通常有以下四种拒绝策略:

阻塞队列

BlockingQueue

BlockingQueuejava.util.concurrent 包下,其他阻塞类都实现自 BlockingQueue 接口,BlockingQueue 提供了线程安全的队列访问方式,当向队列中插入数据的时候,如果队列已经满了,线程则会阻塞等待队列中的元素被取出后再插入;当从队列中取数据的时候,如果队列为空,则线程会阻塞等待队列中有新元素再获取。

LinkedBlockingQueue

LinkedBlockingQueue 是一个由链表实现的线程安全的有界阻塞队列,容量默认值为 Integer.MAX_VALUE,也可以自定义容量,建议指定容量大小,默认大小在添加速度大于删除速度的情况下有造成内存溢出的风险,LinkedBlockingQueue 是先进先出的方式存储元素。

ArrayBlockingQueue

ArrayBlockingQueue 是一个有边界的阻塞队列,它的内部实现是一个数组。它的容量是有限的,我们在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue 也是先进先出的方式存储数据,ArrayBlockingQueue 内部的阻塞队列通过 ReenterLockCondition 条件队列实现的,因此 ArrayBlockingQueue 中的元素存在公平访问和非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用的时候,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口,在创建元素的时候可以指定延迟时间,只有到达了延迟的时间之后,才能获取到该元素。实现了 Delayed 接口必须重写两个方法,getDelay(TimeUnit)compareTo(Delayed)

PriorityQueue

PriorityQueue 是一个基于优先级堆的无界优先级队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列的时候提供的 Comparator 进行排序,具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。

PriorityQueue 需要注意的点

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素的时候,如果队列不为空,则直接取走数据,若是队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队的时候发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为 匹配 方式。

总结

队列有界性数据结构
ArrayBlockingQueuebounded加锁arraylist
LinkedBlockingQueueoptionally-bounded加锁linkedlist
ConcurrentLinkedQueueunbounded无锁linkedlist
LinkedTransferQueueunbounded无锁linkedlist
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap

五种常用线程池

3、ThreadPoolExecutor 源码分析

3.1、成员属性

// 高三位:表示当前线程池运行状态,除去高三位的低位29位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 表示在ctl 中,低 COUNT_BITS 位是用于存放当前线程数量的位数
// 这里为什么不直接使用29,而是使用 Integer.SIZE - 3 表示?
// 小概率情况,防止 Integer在JDK版本变更中所占的字节位数不是4个字节了
private static final int COUNT_BITS = Integer.SIZE - 3; // 29

// 低 COUNT_BITS 位 所能表达的最大数值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits  在ctl中的高三位所代表的运行状态
// -1 负数在计算机中以补码的形式存在 1110 0000 0000 0000 0000 0000 0000 0000 转换成整数是一个负数
private static final int RUNNING    = -1 << COUNT_BITS;

// 0000 0000 0000 0000 0000 0000 0000 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;

// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;

// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

// 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
// workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
private final BlockingQueue<Runnable> workQueue;

// 线程池的全局锁,增加worker,减少worker 的时候需要持有mainLock,修改线程池运行状态的时候也需要
private final ReentrantLock mainLock = new ReentrantLock();

// 线程池真正存放worker -> thread的地方
private final HashSet<Worker> workers = new HashSet<Worker>();

// 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止
// 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态)
// 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock
// 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行
// 简单理解,termination.await() 会将线程阻塞在这里
//         termination.signAll() 会将阻塞在这里的线程依次唤醒
private final Condition termination = mainLock.newCondition();

// 记录线程池生命周期内线程数最大值
private int largestPoolSize;

// 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount
private long completedTaskCount;

// 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
// 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
// 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
// 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
private volatile ThreadFactory threadFactory;

// 拒绝策略:juc包中默认提供了四种方式,默认采用 AbortPolicy,直接抛出异常的方式
private volatile RejectedExecutionHandler handler;

// 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
// allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
private volatile long keepAliveTime;

// 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以
private volatile boolean allowCoreThreadTimeOut;

// 核心线程数量限制
private volatile int corePoolSize;

// 线程池最大线程数量限制
private volatile int maximumPoolSize;

// 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

构造方法

// 构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数量限制
                          int maximumPoolSize, // 最大线程数限制
                          // 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
                          // allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
                          long keepAliveTime,
                          TimeUnit unit, // 时间单位 seconds nano..
                          // 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
                          // workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
                          BlockingQueue<Runnable> workQueue,
                          // 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
                          // 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
                          // 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
                          // 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
                          ThreadFactory threadFactory,
                          // 四种拒绝策略
                          RejectedExecutionHandler handler) {
    // 判断参数是否越界
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 工作队列和线程工厂和拒绝策略都不能为空
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3.2、成员预热方法

// Packing and unpacking ctl
// 获取当前线程池运行状态
// ~CAPACITY & c = ctl => ~0001 1111 1111 1111 1111 1111 1111 1111 & 1110 0000 0000 0000 0000 0000 0000 0011(假设是RUNNING状态)
// ~CAPACITY =>      1110 0000 0000 0000 0000 0000 0000 0011
// ctl =>            1110 0000 0000 0000 0000 0000 0000 0000
// ~CAPACITY & c =>  1110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取当前线程池线程数量
// c == ctl =>      1110 0000 0000 0000 0000 0000 0000 0111
// CAPACITY =>      0001 1111 1111 1111 1111 1111 1111 1111
// c & CAPACITY =>  0000 0000 0000 0000 0000 0000 0000 0111
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 用在重置当前线程池 ctl 值的时候会用到
// rs:代表线程池状态  wc:代表当前线程池 worker(线程)数量
// rs:     1110 0000 0000 0000 0000 0000 0000 0000 (假设位RUNNING状态)
// wc:     0000 0000 0000 0000 0000 0000 0000 0111
// rs | wc:1110 0000 0000 0000 0000 0000 0000 0111 == 重置为ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 比较当前线程池ctl所表示的状态,是否小于某个状态s
// c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true
// 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// 将ctl 的值减-1,这个方法一定成功
private void decrementWorkerCount() {
    // 这里会一直进行重试,直到成功为止
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

3.3、Worker 内部类分析

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // Worker 采用了AQS 的独占模式
    // 独占模式:两个重要属性 state 和 ExclusiveOwnerThread
    // state:0的时候,表示未被占用,>0 的时候表示被占用,<0 的时候,表示初始状态,这种情况下不能被抢锁
    // ExclusiveOwnerThread:表示独占锁线程
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // Worker 内部封装的工作线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务
    // 初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行
    Runnable firstTask;
    /** Per-thread task counter */
    // 记录当前所完成的任务数量
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    // firstTask 可以为 null,为null启动后会到 queue中获取
    Worker(Runnable firstTask) {
        // 设置 AQS 独占模式为初始化状态,不能被抢占锁
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    // 当worker 启动的时候,会执行run()方法
    public void run() {
        // ThreadPoolExector -> runWork(Worker w) 这个是核心方法,等后面分析worker启动后逻辑的时候会以这里为切入点
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    // 判断当前worker的独占锁是否被独占
    // 0 表示未被占用
    // 1 表示已经占用
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试去占用当前 worker 的独占锁
    //
    protected boolean tryAcquire(int unused) {
        // 使用 CAS 修改 AQS 中的state,期望值为0(0的时候表示未被占用),修改成功表示当前线程修改成功
        // 那么设置 ExclusiveOwnerThread 为当前线程
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放当前 worker 的独占锁
    // 外部不会直接调用该方法,这个方法是 AQS的,外部调用 unlock的时候,unlock -> AQS.release -> tryRelease
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // 加锁,加锁失败的时候,会阻塞当前线程,直到获取到锁位置
    public void lock()        { acquire(1); }

    // 尝试去加锁,如果当前锁是未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false
    public boolean tryLock()  { return tryAcquire(1); }

    // 一般情况下,咱们调用unlock 要保证当前线程是持有锁的
    // 特殊情况,当 worker 的 state == -1的时候,调用 unlock 表示初始化state,设置 state == 0
    // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
    public void unlock()      { release(1); }

    // 就是返回当前 worker 的lock 是否被占用
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

3.4、execute(Runnable command) 方法分析

// 线程池提交方法
// Runnable command:可以是普通的 Runnable 实现类,也可以是 FutureTask
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取ctl 最新值 ctl:高三位 表示线程池状态,低位表示当前线程池线程数量
    int c = ctl.get();
    // workerCountOf(c):获取当前线程池数量
    // 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新线程
    if (workerCountOf(c) < corePoolSize) {
        // addWorker 即为创建线程的过程,会创建worker对象,并且将command作为fistTask
        // core == true:表示采用核心线程数量限制,false:表示采用 maximumPoolSize
        if (addWorker(command, true))
            // 创建成功后直接返回,addWorker 方法里面会启动新创建的worker,将firstTask执行
            return;
        // 执行到这条语句,说明addWorker 一定是失败了
        // 有几种可能?
        // 1.存在并发现象,execute 方法是有可能有多个线程同时调用的,当workerCountOf(c)<corePoolSize成立后
        // 其他线程可能也成立了,并且向线程池中创建了worker,这个时候线程池中线程数量已经达到了核心线程数,所以当前线程失败了
        // 2.当前线程池状态发生改变了,RUNNING SHUTDOWN STOP TIDYING TERMINATION
        // 当线程池状态是非 RUNNING 状态的时候,addWorker(firstWorker != null,true | false) 一定会失败
        // SHUTDOWN 状态下,也有可能创建成功,前提 firstTask == null 而且当前queue不为空,特殊情况,在addWorker方法中有说明
        c = ctl.get();
    }

    // 执行到这里有几种情况?
    // 1.当前线程数量已经达到了 corePoolSize
    // 2.addWorker 失败,并发导致
    // 条件成立:说明当前线程池处于RUNNING 状态,则尝试将 task 放入到workqueue中
    if (isRunning(c) && workQueue.offer(command)) {
        // 进入到内部的前提条件:当前线程池中的线程数量大于等于corePoolSize,并且当前线程池处于RUNNING状态,而且当前线程将command放入到workqueue成功
        // 再次获取ctl的值
        int recheck = ctl.get();
        // 条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改了,比如:shutdown() shutdownNow()
        // 这种情况成立的话,说明线程池状态被外部线程给修改了,需要把刚刚提交的任务给删除掉
        // 条件二:remove(command) :有可能成功,也有可能失败
        // 成功:提交之后,线程池中的线程还未消费(处理)
        // 失败:提交之后,在shutdown() shutdownNow() 之前,就被线程池中的线程给处理了
        if (! isRunning(recheck) && remove(command))
            // 提交之后,线程状态为非RUNNING了且移除任务队列成功,走个拒绝策略
            reject(command);

        // 有几种情况会走到这里?
        // 1.当前线程池是RUNNING 状态
        // 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败
        // 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬,任务没线程去跑了
        // 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 执行到这里,有几种情况?
    // 1.workqueue.offer(command)失败,说明当前队列满了
    // 2.当前线程是非RUNNING 状态

    // 1.offer失败,需要做什么?说明当前queue满了,这个时候,如果当前线程数量尚未达到maximumPoolSize得话,尝试创建新得worker,直接执行command
    // 假设线程数量达到 maximumPoolSize 得话,这里会也会失败,也会走拒绝策略
    // 2.线程池状态为非RUNNING 状态,这个时候因为 command != null addWorker 一定是返回false
    else if (!addWorker(command, false))
        reject(command);
}

3.4、addWorker(Runnable firstTask, boolean core) 方法分析(重点)

// 返回值总结:
// true:表示worker 创建成功,且线程启动成功
// false:表示创建失败
// 1.线程池状态 rs > SHUTDOWN (STOP TIDYING TERMINATION)
// 2.rs == shutdown 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN 且队列未空,但是当前任务的firstTask不为null
// 3. 当前线程池已经达到指定指标(corePoolSize / maximumPoolSize)
// 4.threadFactory 实现类创建的线程为null
// Runnable firstTask:可以为null,表示启动worker之后,worker 自动到queue中去获取任务,如果不是null,则worker优先执行 firstTask
// boolean core:表示采用的线程池的线程数限制,true:采用核心线程数限制,false:采用maximumPoolSize线程数限制
private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋操作:判断当前线程池状态是否允许创建线程
    retry:
    for (;;) {
        // 获取当前ctl值保存到c中
        int c = ctl.get();
        // 获取当前线程池运行状态保存到rs中
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态
        // 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
        // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
        // 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空
        // 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 什么情况下会返回false?
            // 线程池状态 rs >= SHUTDOWN
            // rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
            return false;

        // 上面这些代码,就是判断当前线程池状态是否允许添加worker

        // 内部自旋操作:获取创建线程令牌的过程
        for (;;) {
            // 获取当前线程池中的线程数量,保存到wc中
            int wc = workerCountOf(c);
            // 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字
            // 条件二:wc >= (core ? corePoolSize : maximumPoolSize)
            // core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
            // core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了
                return false;

            // 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌
            // 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突
            // 可能发生过什么冲突?
            // 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败
            // 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态
            // 线程池状态改变后,CAS 操作也会失败
            if (compareAndIncrementWorkerCount(c))
                // 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作
                break retry;
            // CAS 失败,没有成功的申请到令牌
            // 获取最新的 ctl 值
            c = ctl.get();  // Re-read ctl
            // 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化
            if (runStateOf(c) != rs)
                // 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // CAS 成功,跳出外部自旋操作来到了这里

    // workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动
    boolean workerStarted = false;
    // workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加
    boolean workerAdded = false;
    // w:表示后面创建worker 的一个引用
    Worker w = null;
    try {
        // 创建worker
        w = new Worker(firstTask);
        // 将新创建的worker 节点的线程赋值给t
        final Thread t = w.thread;
        // 为什么这里还要做 t != null 这个判断?
        // 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现
        // 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null
        if (t != null) {
            // 将全局锁的引用保存到mainLock变量中
            final ReentrantLock mainLock = this.mainLock;
            // 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁
            mainLock.lock();

            // 从这里加锁之后,其他线程是无法修改线程池状态的
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 获取最新的线程池运行状态保存到rs中
                int rs = runStateOf(ctl.get());

                // 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态
                // 条件二: 前置条件:当前线程池状态不是RUNNING 状态
                // (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null
                // 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // t.isAlive() 当前线程start 后,线程isAlive 会返回true
                    // 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将咱们创建的worker 添加到线程池中
                    workers.add(w);
                    // 获取最新当前线程池的线程数量
                    int s = workers.size();
                    // 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 表示线程已经加入到线程池中了
                    workerAdded = true;
                }
            } finally {
                // 释放线程池全局锁
                mainLock.unlock();
            }
            // 条件成立:说明当前添加worker成功
            // 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败
            if (workerAdded) {
                // 成功后则将创建的worker启动
                t.start();
                // 启动标记设置为true
                workerStarted = true;
            }
        }
    } finally {
        // 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作
        // 1.释放令牌
        // 2.将当前worker 清理出workers集合
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回新创建的线程是否启动
    return workerStarted;
}

addWorkerFailed(Worker w) 方法

// addWorker添加线程到线程池中失败或者线程启动失败的后续工作,两个操作
// 1.释放令牌
// 2.将当前worker 清理出workers集合
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 持有线程池全局锁,因为操作的是线程池相关的东西
    mainLock.lock();
    try {
        // 条件成立:需要将worker 在workers 中清理除去
        if (w != null)
            workers.remove(w);
        // 线程池计数恢复-1,前面+1获取令牌了,这里因为失败,所以要-1,相当于归还令牌
        decrementWorkerCount();
        // 回头讲,shutdown shutdownNow再说
        tryTerminate();
    } finally {
        // 释放线程池全局锁
        mainLock.unlock();
    }
}

3.5、runWorker(Worker w) 方法分析

// w:就是启动worker
final void runWorker(Worker w) {
    // wt == w.thread
    Thread wt = Thread.currentThread();
    // 将初始执行的task赋值给task
    Runnable task = w.firstTask;
    // 清空当前w.firstTask的引用
    w.firstTask = null;
    // 这里为什么先调用unlock? 就是为了初始化worker state = 0 和 ExclusiveOwnerThread = null
    // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
    w.unlock(); // allow interrupts

    // 是否是突然退出?如果为true,表示发生异常了,当前线程突然退出的,回头需要做一些处理 false:表示正常退出的
    boolean completedAbruptly = true;

    try {
        // 条件一:task != null 指的就是firstTask 是否为null 如果是null,直接执行循环体里面
        // 条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask是会阻塞线程的方法
        // getTask()如果返回null,说明当前线程需要执行退出逻辑
        while (task != null || (task = getTask()) != null) {
            // worker 设置独占锁为当前线程
            // 为什么要设置独占锁呢?就是怕shutdown()的时候会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt

            // 条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池状态大于等于STOP,线程池目前处于STOP/TIDYING/TERMINATION,此时线程一定要给它一个中断信号
            // 条件一成立:(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
            // 上面条件如果成立,说明当前线程池状态是大于等于STOP且当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断

            // 假设:(runStateAtLeast(ctl.get(), STOP) == false
            // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
            // Thread.interrupted() 获取当前中断状态,且设置中断位为false,连续调用两次interrupted()方法,第二次一定是返回false
            // runStateAtLeast(ctl.get(), STOP):大概率这里还是false
            // 其实它在强制刷新当前线程的中断标记为false,因为有可能上一次执行task的时候,业务代码里面将线程的中断标记位设置为了true且没有做处理
            // 所以这里一定要强制刷新一下,不会再影响到后面的task了
            // 假如:Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true
            // 这种情况有可能发生嘛?
            // 有可能,因为外部线程再当前线程第一次(runStateAtLeast(ctl.get(), STOP) == false后,有机会调用shutdown shutdownNow方法将线程池状态修改
            // 这个时候也会将当前线程的中断标记位再次设置为中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // 钩子方法,留给子类实现的
                beforeExecute(wt, task);
                // 表示异常情况,如果thrown不为空,表示task运行过程中发生了异常,向上层抛出了异常
                Throwable thrown = null;
                try {
                    // task可能是firstTask,也可能是普通的runnable接口实现类
                    // 如果前面是通过submit()提交的runnable/callable会被封装成FutureTask对象
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 钩子方法,留给子类实现的
                    afterExecute(task, thrown);
                }
            } finally {
                // 将局部变量task设置为 null
                task = null;
                // 更新worker完成任务数量
                w.completedTasks++;
                // worker 处理完一个任务后会释放独占锁,然后再次到queue中获取任务
                // 1.正常情况下会再次回到getTask()那里获取任务 while(getTask())
                // 2.task.run()内部抛出异常了
                w.unlock();
            }
        } // while
        // 什么情况下会来到这里?
        // getTask()返回null的时候,说明当前线程应该执行退出逻辑了
        completedAbruptly = false;
    } finally {
        // task.run() 内部抛出异常的时候,当上面的finally执行完以后跳到这里
        // 正常提出 completedAbruptly = false 异常退出:completedAbruptly = true
        // 这个方法以后再说
        processWorkerExit(w, completedAbruptly);
    }
}

3.6、getTask() 方法分析(重点)

// 什么情况下会返回null?
// 1.rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
// 2.说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
// 3.线程池中的线程数量超过最大限制的时候,会有一部分线程返回为null
// 4.线程池中的线程数超过corePoolSize的时候,会有一部分线程超时后返回null
// 到当前的任务队列queue中去获取任务
private Runnable getTask() {
    // 表示当前线程获取任务是否超时 默认false true表示已经超时
    boolean timedOut = false; // Did the last poll() time out?

    // 自旋
    for (;;) {

        // 获取最新ctl的值保存到c中
        int c = ctl.get();
        // 获取当前线程池的运行状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING 状态,可能是SHUTDOWN/STOP/TIDYING/TERMINATION//
        // 条件二:rs >= STOP || workQueue.isEmpty()
        // 2.1:rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
        // 2.2:前置条件:状态是SHUTDOWN,workqueue.isEmpty() 条件成立:说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
        // 返回null,runworker()方法就会将返回null的线程执行退出线程池的逻辑
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 使用CAS + 死循环的方式将ctl的值-1
            decrementWorkerCount();
            return null;
        }

        // 执行到这里,有几种情况:
        // 1.线程池是RUNNING 状态
        // 2.线程池是SHUTDOWN 状态,但是队列queue还未为空,此时可以创建线程(添加worker)

        // 获取线程池中的线程数量
        int wc = workerCountOf(c);

        // timed == true:表示当前这个线程获取task的时候是支持超时机制的,使用queue.poll(时间单位,数量)
        // 当获取task超时的情况下,下一次自旋就可能返回null了
        // timed == false:表示当前这个线程获取task的时候是不支持超时机制的,当前线程会使用 queue.take()

        // 情况一:allowCoreThreadTimeOut == true 表示核心线程数量内的空闲线程可以被回收,所有线程都是使用queue.poll()超时机制这种方式获取task
        // 情况二:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程

        // wc > corePoolSize:条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程都使用poll,超时的方式去获取任务
        // 这样就可能会有一部分线程获取不到任务,返回null,然后runworker会执行线程退出逻辑
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 条件一:(wc > maximumPoolSize || (timed && timedOut)
        // 1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize() 方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
        // 1.2:timed && timedOut 条件成立:前置条件,当前线程使用poll()的方式从队列queue中获取worker执行,上一次循环的时候,使用poll的方式获取任务的时候,超时了
        // 条件一为true:表示线程可以被回收,达到回收的标准,当确实需要回收的时候再回收
        // 条件二:wc > 1 || workQueue.isEmpty()
        // 2.1:wc > 1:条件成立,说明当前线程池中还有其他线程,当前线程不需要从队列中获取任务可以直接回收
        // 2.2:workQueue.isEmpty():前置条件,条件成立:wc == 1 ,说明当前任务队列已经空了,最后一个线程,也可以放心的退出
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 使用 CAS 机制,将ctl的值减去1,减一成功的线程返回null
            // CAS 失败?为什么会CAS 失败?
            // 1.其他线程先你一步推出了
            // 2.线程池状态发生变化了
            if (compareAndDecrementWorkerCount(c))
                return null;
            // 再次自旋的时候,timed有可能就是false了,因为当前线程CAS失败,很有可能是因为其他线程成功退出导致的,再次查询的时候
            // 检查发现,当前线程就可能属于不需要回收范围内了
            continue;
        }

        // 获取任务的逻辑
        try {
            // poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
            // 队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // take 方法:获取队列中的第一个元素,如果被阻塞,则等待
                workQueue.take();

            // 条件成立:返回任务
            if (r != null)
                return r;
            // 说明当前线程超时了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3.7、processWorkerExit(Worker w, boolean completedAbruptly) 方法分析

// 线程退出的逻辑
// w:线程执行的worker要获取任务
// completedAbruptly:线程是否发生异常 true:表示线程发生了异常 false:表示线程没有发生异常
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 条件成立:代表当前w这个worker 是发生异常退出的 task任务执行过程中向上抛出了异常
    // 异常退出的时候,ctl的线程数量并没有-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 获取线程池的全局锁引用
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
        // 将当前worker完成的task数量,汇总到线程池的completedTaskCount
        completedTaskCount += w.completedTasks;
        // 将worker 从任务中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 回头再说
    tryTerminate();

    // 获取最新的ctl值
    int c = ctl.get();
    // 条件成立:说明当前线程池状态为RUNNING 或者 SHUTDOWN 状态
    if (runStateLessThan(c, STOP)) {
        // 条件成立:说明当前线程是正常退出的
        if (!completedAbruptly) {
            // min:表示线程池最低持有的线程数量
            // allowCoreThreadTimeOut == true:说明核心线程数内的线程,也会超时被回收 min = 0
            // allowCoreThreadTimeOut == false:说明核心线程数内的线程,不会超时被回收 min = corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            // 前提状态:线程池状态为RUNNING SHUTDOWN
            // 条件一: min == 0成立
            // 条件二:! workQueue.isEmpty() 说明任务队列中还有任务,最起码留一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;

            // 条件成立:线程池中还拥有足够的线程
            // 考虑一个问题:workerCountOf(c) >= min => (0 >= 0)
            // 有可能!
            // 什么情况下,当线程池中的核心线程数是可以被回收的,会出现这种情况,这种情况下,当前线程池中的线程数会变为0
            // 下次再提交任务的时候,会再次创建线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }

        // 1.当前线程再执行task的时候发生异常,这里一定要创建一个新的worker顶上去
        // 2.! workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程,当前状态为 RUNNING | SHUTDOWN
        // 3.线程数量小于corePoolSize值,此时会创建线程,维护线程池中的线程数量在corePollSize左右
        addWorker(null, false);
    }
}

3.8、shutdown() 方法分析

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 获取线程池的全局锁
    mainLock.lock();
    try {
        // 判断是否有权限
        checkShutdownAccess();

        // 设置线程池的状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断空闲线程
        interruptIdleWorkers();
        // 空方法,子类可以扩展
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        // 释放线程池全局锁
        mainLock.unlock();
    }
    // 后面再说
    tryTerminate();
}

// 中断空闲线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 通过自旋操作设置线程池的状态为SHUTDWON
private void advanceRunState(int targetState) {
    // 自旋
    for (;;) {
        int c = ctl.get();
        // 条件一成立:假设targetState == SHUTDOWN,说明当前线程池状态是 >= SHUTDOWN
        // 条件一不成立:假设targetState == SHUTDOWN,说明当前线程池状态是 RUNNING
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

3.9、tryTerminate() 方法分析

final void tryTerminate() {
    // 自旋操作
    for (;;) {
        // 获取最新ctl的值
        int c = ctl.get();
        // 条件一:isRunning(c) 成立:直接返回就行了,线程池很正常
        // 条件二:runStateAtLeast(c, TIDYING) 说明已经有其他线程在执行 TIDYING 转到 TERMIATION 状态了,当前线程直接回去
        // 条件三:runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()
        // SHUTDOWN 特殊情况,如果是这种情况,直接回去,得等队列中得任务处理完后再转化状态
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        // 什么情况会执行到这里?
        // 1.线程池状态为 STOP 得时候
        // 2.线程池状态为 SHUTDWON并且队列已经空了
        // 条件成立:当前线程池中得线程数量 > 0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // 中断一个空闲线程
            // 空闲线程在哪空闲呢?queue.take() 或者 queue.poll()
            // 1.唤醒后的线程会在getTask方法返回null
            // 2.执行退出逻辑得时候,会再次调用 tryTerminate 方法,唤醒下一个空闲线程
            // 3.因为线程池状态是(线程池状态为 STOP 得时候 | 线程池状态为 SHUTDWON并且队列已经空了) 最终调用addWorker得时候会失败得
            // 最终空闲线程都会在这里退出,非空闲线程当执行完当前task得时候,也会调用tryTerminate方法,有可能会走到这里
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // 执行到这里得线程是谁?
        // workerCountOf(c) == 0得时候,会来到这里
        // 最后一个退出得线程,咱们知道,在(线程池状态 == STOP | 线程池状态为 SHUTDWON并且队列已经空了)
        // 线程唤醒后,都会执行退出逻辑,退出过程中会先将 workerCount 计数 -1 => ctl中得计数-1
        // 调用 tryTerminate方法之前,已经减过了,所以 0 得时候,表示这是最后一个退出得线程了
        final ReentrantLock mainLock = this.mainLock;
        // 获取全局锁
        mainLock.lock();
        try {
            // 设置线程池状态为 TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 调用钩子方法
                    terminated();
                } finally {
                    // 设置线程池状态为 TERMINATED 状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒调用 awaitTermination() 方法得线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 释放全局锁
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

3.10、shutdownNow() 方法分析

public List<Runnable> shutdownNow() {
    // 返回值引用
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    // 获取线程池的全局锁
    mainLock.lock();
    try {
        // 校验权限
        checkShutdownAccess();
        // 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断线程池中的所有线程
        interruptWorkers();
        // 导出未处理的task
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    // 返回当前任务队列中未处理的任务
    return tasks;
}

至此,线程池 ThreadPoolExecutor 类源码就告一段落了,下面会开始更新 AQS 部分的内容,如果有错误,请指正!

标签:0000,worker,源码,线程,当前,ctl,null,ThreadPoolExecutor
来源: https://blog.csdn.net/weixin_46410481/article/details/122864251