20211018-ThreadPoolExecutor
作者:互联网
成员变量
ctl变量
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等
private static int workerCountOf(int c) { return c & CAPACITY; } // worker即工人数量
private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和
mainLock+works
private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁
private final HashSet<Worker> workers = new HashSet<Worker>();
ThreadPoolExecutor
execute
1、worker<coreSize,新增worker
2、worker>=coreSize,queue未满,加入任务队列
3、worker>=coreSize,queue满了,但是worker<maxSize,新增worker
4、worker>=maxSize,queue满了,拒绝策略拒绝
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
1、第一个for
自旋+CAS:增加 ctl内 worker数量
2、第二个for
new Worker,再加入 works
worker.start
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker
public void run() {
runWorker(this);
}
runWorker
1、取任务,来自 firstTask 或者 getTask()
2、有任务,task.run(),进入下一个while
3、无任务,processWorkerExit
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)
2、是淘汰-超时时间内获取任务
3、不淘汰-不限时阻塞获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
1、完成任务计数 更新
2、移除当前worker
3、非正常完成,新增worker
4、正常完成,worker数量满足最小要求,直接退出;不满足min要求,新增worker
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;// 完成任务计数 更新
workers.remove(w);// 移除当前worker
} finally {
mainLock.unlock();
}
tryTerminate();// 不太清楚有什么用
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {// 正常完成
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
参数配置
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1、需要配置哪些
int corePoolSize 核心线程数 int maximumPoolSize 最大线程数 queueSize 队列长度
2、如何配置
依照
每秒请求数(QPS,如 100~1000)+
每个请求耗时(COST,0.5s)+
系统最大响应时间(MAXRSP,2s)
corePoolSize = QPS/(1/COST) = QPS/2 = 50~500
其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求
哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400
queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300
太大:接入了无法满足最大响应时间的请求
太小:能满足最大响应时间的请求又拒绝了
队列大小应该满足最大响应时间,目前看是队列满时,最后一个任务出队完成刚好满足最大响应时间
最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数
maximumPoolSize = max(QPS)/(1/COST) = 500
太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500
原则上最大线程数与队列都满负荷运作,应该满足最大请求数,此处QPS=1000
3、问
进入队列的请求与下一秒新的请求,谁会先执行
队列内的请求由以往work完成
新的请求看情况是
1、入队-新请求后执行
2、新增worker-新请求应该会先执行
3、拒绝
标签:int,20211018,worker,private,ctl,null,final,ThreadPoolExecutor 来源: https://www.cnblogs.com/zpq5935/p/16298960.html