其他分享
首页 > 其他分享> > 线程池_ThreadPoolExecutor原理分析

线程池_ThreadPoolExecutor原理分析

作者:互联网

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。

线程池简介

什么是线程池

线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

为什么要使用线程池

因为 Java 中创建一个线程,需要调用操作系统内核的 API,操作系统要为线程分配一系列的资源,成本很高,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

使用线程池就能很好地避免频繁创建和销毁,所以有必要引入线程池。使用 线程池的好处 有以下几点:

线程池状态和工作线程数量

这本来是两个不同的概念,但是在ThreadPoolExecutor中我们使用一个变量ctl来存储这两个值,这样我们只需要维护这一个变量的并发问题,提高运行效率。

/**
 * 记录线程池中Worker工作线程数量和线程池的状态
 * int类型是32位,它的高3位,表示线程池的状态,低29位表示Worker的数量
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示线程池中创建Worker工作线程数量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

怎么使用一个变量ctl存储两个值呢?

就是利用int变量的高3位来储存线程池状态,用int变量的低29位来储存工作线程数量。

这样就有两个需要注意的地方:

  • 工作线程数量最大值不能超过int类型29位的值CAPACITY 即0b0001.....1(29位1)
  • 因为线程池状态都是高3位储存的,所以工作线程数量不会影响状态值大小关系。

线程池状态

// 高3位值是111
private static final int RUNNING    = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 高3位值是001
private static final int STOP       =  1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING    =  2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态分析:

① RUNNING:运行状态。接受新任务,并且也能处理阻塞队列中的任务。

② SHUTDOWN:关闭状态。不接受新任务,但可以处理阻塞队列中的任务。

③ STOP:停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。

④ TIDYING:整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。

⑤ TERMINATED:已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:

操作ctl的方法

获取线程池的状态

/**
 * 获取线程池的状态。因为线程池的状态是使用高3位储存,所以屏蔽低29位就行了。
 * 所以就c与~CAPACITY(0b1110..0)进行&操作,屏蔽低29位的值了。
 * 注意:这里是屏蔽低29位的值,而不是右移29位。
 */
private static int runStateOf(int c)     { return c & ~CAPACITY; }

获取工作线程数量

/**
 * 获取线程池中Worker工作线程的数量,
 * 因为只使用低29位保存Worker的数量,只要屏蔽高3位的值就行了
 * 所以就c与CAPACITY(0b0001...1)进行&操作,屏蔽高3位的值了。
 */
private static int workerCountOf(int c)  { return c & CAPACITY; }

合并ctl的值

/**
 * 得到ctl的值。
 * 接受两个参数rs和wc。rs表示线程池的状态,wc表示Worker工作线程的数量。
 * 对于rs来说我们只需要高3位的值,对于wc来说我们需要低29位的值。
 * 所以我们将rs | wc就可以得到ctl的值了。
 */
private static int ctlOf(int rs, int wc) { return rs | wc; }

其他方法

// 因为RUNNING状态高三位是111,所以状态值rs与工作线程数量ws相与的结果值c一定是个负数,
// 而其他状态值都是大于等于0的数,所以c是负数,那么表示当前线程处于运行状态。
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * 使用CAS函数将ctl值自增
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * 使用CAS函数将ctl值自减
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
/**
 * 使用CAS函数加循环方法这种乐观锁的方式,解决并发问题。
 * 保证使ctl值减一
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

重要成员变量

// 记录线程池中Worker工作线程数量和线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 任务线程的阻塞队列,因为是阻塞队列,所以它是并发安全的
private final BlockingQueue<Runnable> workQueue;

// 独占锁,用来保证操作成员变量的并发安全问题
private final ReentrantLock mainLock = new ReentrantLock();

// 等待线程池完全终止的条件Condition,
private final Condition termination = mainLock.newCondition();

//-----------------  需要mainLock来保证并发安全-------------------------//
// 线程池中工作线程集合。Worker中持有线程thread变量
private final HashSet<Worker> workers = new HashSet<Worker>();

// 线程池中曾拥有过的最大工作线程个数
private int largestPoolSize;

// 线程池完成过任务的总个数
private long completedTaskCount;
//-----------------  需要mainLock来保证并发安全-------------------------//

// 创建线程的工厂类
private volatile ThreadFactory threadFactory;

// 当任务被拒绝时,用来处理这个被拒绝的任务
private volatile RejectedExecutionHandler handler;
// 工作线程空闲的超时时间keepAliveTime
private volatile long keepAliveTime;

// 是否允许核心池线程超时释放
private volatile boolean allowCoreThreadTimeOut;

// 线程池核心池线程个数
private volatile int corePoolSize;

// 线程池最大的线程个数
private volatile int maximumPoolSize;

成员变量说明:

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
}

参数说明:

1、corePoolSize:核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:

所以,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize。

2、maximumPoolSize:最大线程数量。

值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。

3、keepAliveTime:线程保持活动的时间(非核心线程的存活时间)。

当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime。

所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

4、unit:keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

5、workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

(1)ArrayBlockingQueue - 有界阻塞队列。

此队列是基于数组的先进先出队列(FIFO)。

此队列创建时必须指定大小。

(2)LinkedBlockingQueue - 无界阻塞队列。

此队列是基于链表的先进先出队列(FIFO)。
如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE。
吞吐量通常要高于 ArrayBlockingQueue。
使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是无界队列。
Executors.newFixedThreadPool 使用了这个队列。

(3)SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
吞吐量通常要高于 LinkedBlockingQueue。
Executors.newCachedThreadPool 使用了这个队列。

(4)PriorityBlockingQueue - 具有优先级的无界阻塞队列。

6、threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

7、handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:

AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
DiscardPolicy - 丢弃任务,但不抛出异常。
DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
CallerRunsPolicy - 只用调用者所在的线程来运行任务。

如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

常用方法

 

标签:队列,private,int,任务,线程,ctl,原理,ThreadPoolExecutor
来源: https://www.cnblogs.com/xfeiyun/p/15868784.html