线程池 ThreadPoolExecutor 源码详细分析
作者:互联网
1、线程池的作用
一方面当执行大量一步任务的时候线程池能够提供较好的性能,在不使用线程池的时候,每当需要执行异步的时候都是直接 new 一线程进行运行,而线程的创建和销毁都是需要开销的。使用线程池的时候,线程池里面的线程是可复用的,不会每次执行异步任务的时候都重新创建和销毁线程。
另一方面线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等,每个 ThreadPoolExecutor
也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
2、ThreadPoolExecutor 继承体系
- Executor:只定义了一个方法
execute()
,用于执行提交的任务 - ExecutorService:定义了一些线程池管理、任务提交、线程检测的方法
- AbstractExecutorService:提供了
ExecutorService
接口执行方法的默认实现,用于同一处理Callable
任务和Runnable
任务。
下面介绍一下线程池的拒绝策略
当线程池的任务缓存队列已经满了并且线程池中的线程数目达到 maximumPoolSize
的时候,如果还有任务到来就会采取任务拒绝策略,通常有以下四种拒绝策略:
- AbortPolicy:直接抛出一个
RejectedExecutionException
的异常,让你感知到任务被拒绝了,下面你可以根据业务逻辑选择重试或者放弃提交。 - DisCardPolicy:当新任务被提交后直接被丢弃,也不会给你任何的通知,可能造成数据丢失的现象。
- DisCardOldestPolicy:如果线程池没有关闭且没有能力执行,则会丢弃任务队列中的头结点,然后重新提交被拒绝的任务。通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的任务不是最新提交的,而是队列中存活时间最长的,这样就可以腾出资源给新提交的任务,但是同理也存在一定的数据丢失风险。
- CallerRunsPolicy:当有新任务提交的时候,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:
- 提交的新任务不会被丢弃,这样也就不会造成业务损失。
- 由于谁提交任务谁就要负责执行,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时得,在这段期间,提交任务得线程被占用,也就不会提交新的任务,减缓了任务提交得速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
阻塞队列
BlockingQueue
BlockingQueue 在 java.util.concurrent 包下,其他阻塞类都实现自 BlockingQueue 接口,BlockingQueue 提供了线程安全的队列访问方式,当向队列中插入数据的时候,如果队列已经满了,线程则会阻塞等待队列中的元素被取出后再插入;当从队列中取数据的时候,如果队列为空,则线程会阻塞等待队列中有新元素再获取。
LinkedBlockingQueue
LinkedBlockingQueue 是一个由链表实现的线程安全的有界阻塞队列,容量默认值为 Integer.MAX_VALUE,也可以自定义容量,建议指定容量大小,默认大小在添加速度大于删除速度的情况下有造成内存溢出的风险,LinkedBlockingQueue 是先进先出的方式存储元素。
ArrayBlockingQueue
ArrayBlockingQueue 是一个有边界的阻塞队列,它的内部实现是一个数组。它的容量是有限的,我们在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue 也是先进先出的方式存储数据,ArrayBlockingQueue 内部的阻塞队列通过 ReenterLock 和 Condition 条件队列实现的,因此 ArrayBlockingQueue 中的元素存在公平访问和非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用的时候,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口,在创建元素的时候可以指定延迟时间,只有到达了延迟的时间之后,才能获取到该元素。实现了 Delayed 接口必须重写两个方法,getDelay(TimeUnit) 和 compareTo(Delayed)。
PriorityQueue
PriorityQueue 是一个基于优先级堆的无界优先级队列
。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列的时候提供的 Comparator 进行排序,具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。
PriorityQueue 需要注意的点:
- PriorityQueue 是非线程安全的,在多线程情况下可使用
PriorityBlockingQueue
类替代。 - PriorityQueue 不允许插入 null 元素。
LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素的时候,如果队列不为空,则直接取走数据,若是队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队的时候发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为 匹配 方式。
总结
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
五种常用线程池
- newCachedThreadPool:可缓存的线程池
- newCachedThreadPool 用于创建一个可缓存的线程池,之所以叫可缓存线程池,是因为它在创建新线程的时候如果有可重用的线程,则重用它们,否则创建一个新线程并将其添加到线程池中。
- 在线程池的 keepAliveTime 时间超过默认的 60秒后,该线程会被终止并从缓存中移除,因此在没有线程任务运行的时候,newCachedThreadPool 将不会占用系统的线程资源
- newFixedThreadPool:固定大小的线程池
- newFixedThreadPool 可用于创建一个固定的线程数量的线程池
- 如果任务数量大于等于指定线程池中线程的数量,则新提交的任务将在阻塞队列中排队,直到有可用的线程资源。
- newScheduledThreadPool:可做任务调度的线程池
- newScheduledThreadPool 用于创建可定时调度的线程池,可设置在给定延迟时间后执行或者定期执行某个线程任务。
- newSingleThreadPool :单个线程的线程池
- newSingleThreadPool:创建的线程池会确保池中永远有且只有一个可用的线程
- 在该线程停止或者发生异常的时候,newSingleThreadPool 线程池会启动一个新的线程代替该线程继续执行任务
- newWorkStealingPool:足够大小的线程池
- newWorkStealingPool 用于创建持有足够多线程的线程池来达到快速运算的目的。
- 在内部通过使用多个队列来减少各个线程调度产生的竞争
- 足够的线程指的是 JDK 根据当前线程的运行需求向操作系统中申请足够多的线程,以保障线程的快速执行
3、ThreadPoolExecutor 源码分析
3.1、成员属性
// 高三位:表示当前线程池运行状态,除去高三位的低位29位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示在ctl 中,低 COUNT_BITS 位是用于存放当前线程数量的位数
// 这里为什么不直接使用29,而是使用 Integer.SIZE - 3 表示?
// 小概率情况,防止 Integer在JDK版本变更中所占的字节位数不是4个字节了
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
// 低 COUNT_BITS 位 所能表达的最大数值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits 在ctl中的高三位所代表的运行状态
// -1 负数在计算机中以补码的形式存在 1110 0000 0000 0000 0000 0000 0000 0000 转换成整数是一个负数
private static final int RUNNING = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
// workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
private final BlockingQueue<Runnable> workQueue;
// 线程池的全局锁,增加worker,减少worker 的时候需要持有mainLock,修改线程池运行状态的时候也需要
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池真正存放worker -> thread的地方
private final HashSet<Worker> workers = new HashSet<Worker>();
// 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止
// 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态)
// 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock
// 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行
// 简单理解,termination.await() 会将线程阻塞在这里
// termination.signAll() 会将阻塞在这里的线程依次唤醒
private final Condition termination = mainLock.newCondition();
// 记录线程池生命周期内线程数最大值
private int largestPoolSize;
// 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount
private long completedTaskCount;
// 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
// 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
// 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
// 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
private volatile ThreadFactory threadFactory;
// 拒绝策略:juc包中默认提供了四种方式,默认采用 AbortPolicy,直接抛出异常的方式
private volatile RejectedExecutionHandler handler;
// 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
// allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
private volatile long keepAliveTime;
// 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数量限制
private volatile int corePoolSize;
// 线程池最大线程数量限制
private volatile int maximumPoolSize;
// 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
构造方法
// 构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数量限制
int maximumPoolSize, // 最大线程数限制
// 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
// allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
long keepAliveTime,
TimeUnit unit, // 时间单位 seconds nano..
// 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
// workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
BlockingQueue<Runnable> workQueue,
// 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
// 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
// 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
// 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
ThreadFactory threadFactory,
// 四种拒绝策略
RejectedExecutionHandler handler) {
// 判断参数是否越界
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 工作队列和线程工厂和拒绝策略都不能为空
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.2、成员预热方法
// Packing and unpacking ctl
// 获取当前线程池运行状态
// ~CAPACITY & c = ctl => ~0001 1111 1111 1111 1111 1111 1111 1111 & 1110 0000 0000 0000 0000 0000 0000 0011(假设是RUNNING状态)
// ~CAPACITY => 1110 0000 0000 0000 0000 0000 0000 0011
// ctl => 1110 0000 0000 0000 0000 0000 0000 0000
// ~CAPACITY & c => 1110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取当前线程池线程数量
// c == ctl => 1110 0000 0000 0000 0000 0000 0000 0111
// CAPACITY => 0001 1111 1111 1111 1111 1111 1111 1111
// c & CAPACITY => 0000 0000 0000 0000 0000 0000 0000 0111
private static int workerCountOf(int c) { return c & CAPACITY; }
// 用在重置当前线程池 ctl 值的时候会用到
// rs:代表线程池状态 wc:代表当前线程池 worker(线程)数量
// rs: 1110 0000 0000 0000 0000 0000 0000 0000 (假设位RUNNING状态)
// wc: 0000 0000 0000 0000 0000 0000 0000 0111
// rs | wc:1110 0000 0000 0000 0000 0000 0000 0111 == 重置为ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 比较当前线程池ctl所表示的状态,是否小于某个状态s
// c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true
// 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 将ctl 的值减-1,这个方法一定成功
private void decrementWorkerCount() {
// 这里会一直进行重试,直到成功为止
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
3.3、Worker 内部类分析
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// Worker 采用了AQS 的独占模式
// 独占模式:两个重要属性 state 和 ExclusiveOwnerThread
// state:0的时候,表示未被占用,>0 的时候表示被占用,<0 的时候,表示初始状态,这种情况下不能被抢锁
// ExclusiveOwnerThread:表示独占锁线程
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// Worker 内部封装的工作线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务
// 初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行
Runnable firstTask;
/** Per-thread task counter */
// 记录当前所完成的任务数量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
// firstTask 可以为 null,为null启动后会到 queue中获取
Worker(Runnable firstTask) {
// 设置 AQS 独占模式为初始化状态,不能被抢占锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// 当worker 启动的时候,会执行run()方法
public void run() {
// ThreadPoolExector -> runWork(Worker w) 这个是核心方法,等后面分析worker启动后逻辑的时候会以这里为切入点
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 判断当前worker的独占锁是否被独占
// 0 表示未被占用
// 1 表示已经占用
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试去占用当前 worker 的独占锁
//
protected boolean tryAcquire(int unused) {
// 使用 CAS 修改 AQS 中的state,期望值为0(0的时候表示未被占用),修改成功表示当前线程修改成功
// 那么设置 ExclusiveOwnerThread 为当前线程
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放当前 worker 的独占锁
// 外部不会直接调用该方法,这个方法是 AQS的,外部调用 unlock的时候,unlock -> AQS.release -> tryRelease
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 加锁,加锁失败的时候,会阻塞当前线程,直到获取到锁位置
public void lock() { acquire(1); }
// 尝试去加锁,如果当前锁是未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false
public boolean tryLock() { return tryAcquire(1); }
// 一般情况下,咱们调用unlock 要保证当前线程是持有锁的
// 特殊情况,当 worker 的 state == -1的时候,调用 unlock 表示初始化state,设置 state == 0
// 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
public void unlock() { release(1); }
// 就是返回当前 worker 的lock 是否被占用
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
3.4、execute(Runnable command) 方法分析
// 线程池提交方法
// Runnable command:可以是普通的 Runnable 实现类,也可以是 FutureTask
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl 最新值 ctl:高三位 表示线程池状态,低位表示当前线程池线程数量
int c = ctl.get();
// workerCountOf(c):获取当前线程池数量
// 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新线程
if (workerCountOf(c) < corePoolSize) {
// addWorker 即为创建线程的过程,会创建worker对象,并且将command作为fistTask
// core == true:表示采用核心线程数量限制,false:表示采用 maximumPoolSize
if (addWorker(command, true))
// 创建成功后直接返回,addWorker 方法里面会启动新创建的worker,将firstTask执行
return;
// 执行到这条语句,说明addWorker 一定是失败了
// 有几种可能?
// 1.存在并发现象,execute 方法是有可能有多个线程同时调用的,当workerCountOf(c)<corePoolSize成立后
// 其他线程可能也成立了,并且向线程池中创建了worker,这个时候线程池中线程数量已经达到了核心线程数,所以当前线程失败了
// 2.当前线程池状态发生改变了,RUNNING SHUTDOWN STOP TIDYING TERMINATION
// 当线程池状态是非 RUNNING 状态的时候,addWorker(firstWorker != null,true | false) 一定会失败
// SHUTDOWN 状态下,也有可能创建成功,前提 firstTask == null 而且当前queue不为空,特殊情况,在addWorker方法中有说明
c = ctl.get();
}
// 执行到这里有几种情况?
// 1.当前线程数量已经达到了 corePoolSize
// 2.addWorker 失败,并发导致
// 条件成立:说明当前线程池处于RUNNING 状态,则尝试将 task 放入到workqueue中
if (isRunning(c) && workQueue.offer(command)) {
// 进入到内部的前提条件:当前线程池中的线程数量大于等于corePoolSize,并且当前线程池处于RUNNING状态,而且当前线程将command放入到workqueue成功
// 再次获取ctl的值
int recheck = ctl.get();
// 条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改了,比如:shutdown() shutdownNow()
// 这种情况成立的话,说明线程池状态被外部线程给修改了,需要把刚刚提交的任务给删除掉
// 条件二:remove(command) :有可能成功,也有可能失败
// 成功:提交之后,线程池中的线程还未消费(处理)
// 失败:提交之后,在shutdown() shutdownNow() 之前,就被线程池中的线程给处理了
if (! isRunning(recheck) && remove(command))
// 提交之后,线程状态为非RUNNING了且移除任务队列成功,走个拒绝策略
reject(command);
// 有几种情况会走到这里?
// 1.当前线程池是RUNNING 状态
// 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败
// 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬,任务没线程去跑了
// 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 执行到这里,有几种情况?
// 1.workqueue.offer(command)失败,说明当前队列满了
// 2.当前线程是非RUNNING 状态
// 1.offer失败,需要做什么?说明当前queue满了,这个时候,如果当前线程数量尚未达到maximumPoolSize得话,尝试创建新得worker,直接执行command
// 假设线程数量达到 maximumPoolSize 得话,这里会也会失败,也会走拒绝策略
// 2.线程池状态为非RUNNING 状态,这个时候因为 command != null addWorker 一定是返回false
else if (!addWorker(command, false))
reject(command);
}
3.4、addWorker(Runnable firstTask, boolean core) 方法分析(重点)
// 返回值总结:
// true:表示worker 创建成功,且线程启动成功
// false:表示创建失败
// 1.线程池状态 rs > SHUTDOWN (STOP TIDYING TERMINATION)
// 2.rs == shutdown 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN 且队列未空,但是当前任务的firstTask不为null
// 3. 当前线程池已经达到指定指标(corePoolSize / maximumPoolSize)
// 4.threadFactory 实现类创建的线程为null
// Runnable firstTask:可以为null,表示启动worker之后,worker 自动到queue中去获取任务,如果不是null,则worker优先执行 firstTask
// boolean core:表示采用的线程池的线程数限制,true:采用核心线程数限制,false:采用maximumPoolSize线程数限制
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋操作:判断当前线程池状态是否允许创建线程
retry:
for (;;) {
// 获取当前ctl值保存到c中
int c = ctl.get();
// 获取当前线程池运行状态保存到rs中
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态
// 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
// 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空
// 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 什么情况下会返回false?
// 线程池状态 rs >= SHUTDOWN
// rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
return false;
// 上面这些代码,就是判断当前线程池状态是否允许添加worker
// 内部自旋操作:获取创建线程令牌的过程
for (;;) {
// 获取当前线程池中的线程数量,保存到wc中
int wc = workerCountOf(c);
// 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字
// 条件二:wc >= (core ? corePoolSize : maximumPoolSize)
// core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
// core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了
return false;
// 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌
// 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突
// 可能发生过什么冲突?
// 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败
// 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态
// 线程池状态改变后,CAS 操作也会失败
if (compareAndIncrementWorkerCount(c))
// 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作
break retry;
// CAS 失败,没有成功的申请到令牌
// 获取最新的 ctl 值
c = ctl.get(); // Re-read ctl
// 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化
if (runStateOf(c) != rs)
// 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// CAS 成功,跳出外部自旋操作来到了这里
// workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动
boolean workerStarted = false;
// workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加
boolean workerAdded = false;
// w:表示后面创建worker 的一个引用
Worker w = null;
try {
// 创建worker
w = new Worker(firstTask);
// 将新创建的worker 节点的线程赋值给t
final Thread t = w.thread;
// 为什么这里还要做 t != null 这个判断?
// 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现
// 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null
if (t != null) {
// 将全局锁的引用保存到mainLock变量中
final ReentrantLock mainLock = this.mainLock;
// 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁
mainLock.lock();
// 从这里加锁之后,其他线程是无法修改线程池状态的
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取最新的线程池运行状态保存到rs中
int rs = runStateOf(ctl.get());
// 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态
// 条件二: 前置条件:当前线程池状态不是RUNNING 状态
// (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null
// 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// t.isAlive() 当前线程start 后,线程isAlive 会返回true
// 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将咱们创建的worker 添加到线程池中
workers.add(w);
// 获取最新当前线程池的线程数量
int s = workers.size();
// 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量
if (s > largestPoolSize)
largestPoolSize = s;
// 表示线程已经加入到线程池中了
workerAdded = true;
}
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
// 条件成立:说明当前添加worker成功
// 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败
if (workerAdded) {
// 成功后则将创建的worker启动
t.start();
// 启动标记设置为true
workerStarted = true;
}
}
} finally {
// 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作
// 1.释放令牌
// 2.将当前worker 清理出workers集合
if (! workerStarted)
addWorkerFailed(w);
}
// 返回新创建的线程是否启动
return workerStarted;
}
addWorkerFailed(Worker w) 方法
// addWorker添加线程到线程池中失败或者线程启动失败的后续工作,两个操作
// 1.释放令牌
// 2.将当前worker 清理出workers集合
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 持有线程池全局锁,因为操作的是线程池相关的东西
mainLock.lock();
try {
// 条件成立:需要将worker 在workers 中清理除去
if (w != null)
workers.remove(w);
// 线程池计数恢复-1,前面+1获取令牌了,这里因为失败,所以要-1,相当于归还令牌
decrementWorkerCount();
// 回头讲,shutdown shutdownNow再说
tryTerminate();
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
}
3.5、runWorker(Worker w) 方法分析
// w:就是启动worker
final void runWorker(Worker w) {
// wt == w.thread
Thread wt = Thread.currentThread();
// 将初始执行的task赋值给task
Runnable task = w.firstTask;
// 清空当前w.firstTask的引用
w.firstTask = null;
// 这里为什么先调用unlock? 就是为了初始化worker state = 0 和 ExclusiveOwnerThread = null
// 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
w.unlock(); // allow interrupts
// 是否是突然退出?如果为true,表示发生异常了,当前线程突然退出的,回头需要做一些处理 false:表示正常退出的
boolean completedAbruptly = true;
try {
// 条件一:task != null 指的就是firstTask 是否为null 如果是null,直接执行循环体里面
// 条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask是会阻塞线程的方法
// getTask()如果返回null,说明当前线程需要执行退出逻辑
while (task != null || (task = getTask()) != null) {
// worker 设置独占锁为当前线程
// 为什么要设置独占锁呢?就是怕shutdown()的时候会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池状态大于等于STOP,线程池目前处于STOP/TIDYING/TERMINATION,此时线程一定要给它一个中断信号
// 条件一成立:(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
// 上面条件如果成立,说明当前线程池状态是大于等于STOP且当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断
// 假设:(runStateAtLeast(ctl.get(), STOP) == false
// (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
// Thread.interrupted() 获取当前中断状态,且设置中断位为false,连续调用两次interrupted()方法,第二次一定是返回false
// runStateAtLeast(ctl.get(), STOP):大概率这里还是false
// 其实它在强制刷新当前线程的中断标记为false,因为有可能上一次执行task的时候,业务代码里面将线程的中断标记位设置为了true且没有做处理
// 所以这里一定要强制刷新一下,不会再影响到后面的task了
// 假如:Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true
// 这种情况有可能发生嘛?
// 有可能,因为外部线程再当前线程第一次(runStateAtLeast(ctl.get(), STOP) == false后,有机会调用shutdown shutdownNow方法将线程池状态修改
// 这个时候也会将当前线程的中断标记位再次设置为中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法,留给子类实现的
beforeExecute(wt, task);
// 表示异常情况,如果thrown不为空,表示task运行过程中发生了异常,向上层抛出了异常
Throwable thrown = null;
try {
// task可能是firstTask,也可能是普通的runnable接口实现类
// 如果前面是通过submit()提交的runnable/callable会被封装成FutureTask对象
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,留给子类实现的
afterExecute(task, thrown);
}
} finally {
// 将局部变量task设置为 null
task = null;
// 更新worker完成任务数量
w.completedTasks++;
// worker 处理完一个任务后会释放独占锁,然后再次到queue中获取任务
// 1.正常情况下会再次回到getTask()那里获取任务 while(getTask())
// 2.task.run()内部抛出异常了
w.unlock();
}
} // while
// 什么情况下会来到这里?
// getTask()返回null的时候,说明当前线程应该执行退出逻辑了
completedAbruptly = false;
} finally {
// task.run() 内部抛出异常的时候,当上面的finally执行完以后跳到这里
// 正常提出 completedAbruptly = false 异常退出:completedAbruptly = true
// 这个方法以后再说
processWorkerExit(w, completedAbruptly);
}
}
3.6、getTask() 方法分析(重点)
// 什么情况下会返回null?
// 1.rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
// 2.说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
// 3.线程池中的线程数量超过最大限制的时候,会有一部分线程返回为null
// 4.线程池中的线程数超过corePoolSize的时候,会有一部分线程超时后返回null
// 到当前的任务队列queue中去获取任务
private Runnable getTask() {
// 表示当前线程获取任务是否超时 默认false true表示已经超时
boolean timedOut = false; // Did the last poll() time out?
// 自旋
for (;;) {
// 获取最新ctl的值保存到c中
int c = ctl.get();
// 获取当前线程池的运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING 状态,可能是SHUTDOWN/STOP/TIDYING/TERMINATION//
// 条件二:rs >= STOP || workQueue.isEmpty()
// 2.1:rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
// 2.2:前置条件:状态是SHUTDOWN,workqueue.isEmpty() 条件成立:说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
// 返回null,runworker()方法就会将返回null的线程执行退出线程池的逻辑
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 使用CAS + 死循环的方式将ctl的值-1
decrementWorkerCount();
return null;
}
// 执行到这里,有几种情况:
// 1.线程池是RUNNING 状态
// 2.线程池是SHUTDOWN 状态,但是队列queue还未为空,此时可以创建线程(添加worker)
// 获取线程池中的线程数量
int wc = workerCountOf(c);
// timed == true:表示当前这个线程获取task的时候是支持超时机制的,使用queue.poll(时间单位,数量)
// 当获取task超时的情况下,下一次自旋就可能返回null了
// timed == false:表示当前这个线程获取task的时候是不支持超时机制的,当前线程会使用 queue.take()
// 情况一:allowCoreThreadTimeOut == true 表示核心线程数量内的空闲线程可以被回收,所有线程都是使用queue.poll()超时机制这种方式获取task
// 情况二:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程
// wc > corePoolSize:条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程都使用poll,超时的方式去获取任务
// 这样就可能会有一部分线程获取不到任务,返回null,然后runworker会执行线程退出逻辑
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 条件一:(wc > maximumPoolSize || (timed && timedOut)
// 1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize() 方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
// 1.2:timed && timedOut 条件成立:前置条件,当前线程使用poll()的方式从队列queue中获取worker执行,上一次循环的时候,使用poll的方式获取任务的时候,超时了
// 条件一为true:表示线程可以被回收,达到回收的标准,当确实需要回收的时候再回收
// 条件二:wc > 1 || workQueue.isEmpty()
// 2.1:wc > 1:条件成立,说明当前线程池中还有其他线程,当前线程不需要从队列中获取任务可以直接回收
// 2.2:workQueue.isEmpty():前置条件,条件成立:wc == 1 ,说明当前任务队列已经空了,最后一个线程,也可以放心的退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 使用 CAS 机制,将ctl的值减去1,减一成功的线程返回null
// CAS 失败?为什么会CAS 失败?
// 1.其他线程先你一步推出了
// 2.线程池状态发生变化了
if (compareAndDecrementWorkerCount(c))
return null;
// 再次自旋的时候,timed有可能就是false了,因为当前线程CAS失败,很有可能是因为其他线程成功退出导致的,再次查询的时候
// 检查发现,当前线程就可能属于不需要回收范围内了
continue;
}
// 获取任务的逻辑
try {
// poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
// 队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// take 方法:获取队列中的第一个元素,如果被阻塞,则等待
workQueue.take();
// 条件成立:返回任务
if (r != null)
return r;
// 说明当前线程超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.7、processWorkerExit(Worker w, boolean completedAbruptly) 方法分析
// 线程退出的逻辑
// w:线程执行的worker要获取任务
// completedAbruptly:线程是否发生异常 true:表示线程发生了异常 false:表示线程没有发生异常
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 条件成立:代表当前w这个worker 是发生异常退出的 task任务执行过程中向上抛出了异常
// 异常退出的时候,ctl的线程数量并没有-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 获取线程池的全局锁引用
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 将当前worker完成的task数量,汇总到线程池的completedTaskCount
completedTaskCount += w.completedTasks;
// 将worker 从任务中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 回头再说
tryTerminate();
// 获取最新的ctl值
int c = ctl.get();
// 条件成立:说明当前线程池状态为RUNNING 或者 SHUTDOWN 状态
if (runStateLessThan(c, STOP)) {
// 条件成立:说明当前线程是正常退出的
if (!completedAbruptly) {
// min:表示线程池最低持有的线程数量
// allowCoreThreadTimeOut == true:说明核心线程数内的线程,也会超时被回收 min = 0
// allowCoreThreadTimeOut == false:说明核心线程数内的线程,不会超时被回收 min = corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 前提状态:线程池状态为RUNNING SHUTDOWN
// 条件一: min == 0成立
// 条件二:! workQueue.isEmpty() 说明任务队列中还有任务,最起码留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 条件成立:线程池中还拥有足够的线程
// 考虑一个问题:workerCountOf(c) >= min => (0 >= 0)
// 有可能!
// 什么情况下,当线程池中的核心线程数是可以被回收的,会出现这种情况,这种情况下,当前线程池中的线程数会变为0
// 下次再提交任务的时候,会再次创建线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.当前线程再执行task的时候发生异常,这里一定要创建一个新的worker顶上去
// 2.! workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程,当前状态为 RUNNING | SHUTDOWN
// 3.线程数量小于corePoolSize值,此时会创建线程,维护线程池中的线程数量在corePollSize左右
addWorker(null, false);
}
}
3.8、shutdown() 方法分析
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取线程池的全局锁
mainLock.lock();
try {
// 判断是否有权限
checkShutdownAccess();
// 设置线程池的状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 空方法,子类可以扩展
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
// 后面再说
tryTerminate();
}
// 中断空闲线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// 通过自旋操作设置线程池的状态为SHUTDWON
private void advanceRunState(int targetState) {
// 自旋
for (;;) {
int c = ctl.get();
// 条件一成立:假设targetState == SHUTDOWN,说明当前线程池状态是 >= SHUTDOWN
// 条件一不成立:假设targetState == SHUTDOWN,说明当前线程池状态是 RUNNING
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
3.9、tryTerminate() 方法分析
final void tryTerminate() {
// 自旋操作
for (;;) {
// 获取最新ctl的值
int c = ctl.get();
// 条件一:isRunning(c) 成立:直接返回就行了,线程池很正常
// 条件二:runStateAtLeast(c, TIDYING) 说明已经有其他线程在执行 TIDYING 转到 TERMIATION 状态了,当前线程直接回去
// 条件三:runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()
// SHUTDOWN 特殊情况,如果是这种情况,直接回去,得等队列中得任务处理完后再转化状态
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 什么情况会执行到这里?
// 1.线程池状态为 STOP 得时候
// 2.线程池状态为 SHUTDWON并且队列已经空了
// 条件成立:当前线程池中得线程数量 > 0
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中断一个空闲线程
// 空闲线程在哪空闲呢?queue.take() 或者 queue.poll()
// 1.唤醒后的线程会在getTask方法返回null
// 2.执行退出逻辑得时候,会再次调用 tryTerminate 方法,唤醒下一个空闲线程
// 3.因为线程池状态是(线程池状态为 STOP 得时候 | 线程池状态为 SHUTDWON并且队列已经空了) 最终调用addWorker得时候会失败得
// 最终空闲线程都会在这里退出,非空闲线程当执行完当前task得时候,也会调用tryTerminate方法,有可能会走到这里
interruptIdleWorkers(ONLY_ONE);
return;
}
// 执行到这里得线程是谁?
// workerCountOf(c) == 0得时候,会来到这里
// 最后一个退出得线程,咱们知道,在(线程池状态 == STOP | 线程池状态为 SHUTDWON并且队列已经空了)
// 线程唤醒后,都会执行退出逻辑,退出过程中会先将 workerCount 计数 -1 => ctl中得计数-1
// 调用 tryTerminate方法之前,已经减过了,所以 0 得时候,表示这是最后一个退出得线程了
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 设置线程池状态为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 调用钩子方法
terminated();
} finally {
// 设置线程池状态为 TERMINATED 状态
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用 awaitTermination() 方法得线程
termination.signalAll();
}
return;
}
} finally {
// 释放全局锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
3.10、shutdownNow() 方法分析
public List<Runnable> shutdownNow() {
// 返回值引用
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取线程池的全局锁
mainLock.lock();
try {
// 校验权限
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断线程池中的所有线程
interruptWorkers();
// 导出未处理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回当前任务队列中未处理的任务
return tasks;
}
至此,线程池 ThreadPoolExecutor 类源码就告一段落了,下面会开始更新 AQS 部分的内容,如果有错误,请指正!
标签:0000,worker,源码,线程,当前,ctl,null,ThreadPoolExecutor 来源: https://blog.csdn.net/weixin_46410481/article/details/122864251