其他分享
首页 > 其他分享> > 20211018-ThreadPoolExecutor

20211018-ThreadPoolExecutor

作者:互联网

成员变量

ctl变量

/**
* The main pool control state, ctl, is an atomic integer packing
    * two conceptual fields
    *   workerCount, indicating the effective number of threads
    *   runState,   indicating whether running, shutting down etc
   
* RUNNING: Accept new tasks and process queued tasks
    *   SHUTDOWN: Don't accept new tasks, but process queued tasks
    *   STOP:     Don't accept new tasks, don't process queued tasks,
    *             and interrupt in-progress tasks
    *   TIDYING: All tasks have terminated, workerCount is zero,
    *             the thread transitioning to state TIDYING
    *             will run the terminated() hook method
    *   TERMINATED: terminated() has completed
    */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等
private static int workerCountOf(int c) { return c & CAPACITY; } // worker即工人数量
private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和

mainLock+works

private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁
private final HashSet<Worker> workers = new HashSet<Worker>();

 

 

ThreadPoolExecutor

execute

1、worker<coreSize,新增worker

2、worker>=coreSize,queue未满,加入任务队列

3、worker>=coreSize,queue满了,但是worker<maxSize,新增worker

4、worker>=maxSize,queue满了,拒绝策略拒绝

public void execute(Runnable command) {
   if (command == null)
       throw new NullPointerException();
   int c = ctl.get();
   if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
  }
   if (isRunning(c) && workQueue.offer(command)) {
       int recheck = ctl.get();
       if (! isRunning(recheck) && remove(command))
           reject(command);
       else if (workerCountOf(recheck) == 0)
           addWorker(null, false);
  }
   else if (!addWorker(command, false))
       reject(command);
}

 

addWorker

1、第一个for

自旋+CAS:增加 ctl内 worker数量

2、第二个for

new Worker,再加入 works

worker.start

private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

           // Check if queue empty only if necessary.
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
               int wc = workerCountOf(c);
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker
                   return false;
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               c = ctl.get();  // Re-read ctl
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
          }
      }

       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if
                   // shut down before lock acquired.
                   int rs = runStateOf(ctl.get());

                   if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                  }
              } finally {
                   mainLock.unlock();
              }
               if (workerAdded) {
                   t.start();
                   workerStarted = true;
              }
          }
      } finally {
           if (! workerStarted)
               addWorkerFailed(w);
      }
       return workerStarted;
  }

 

Worker

public void run() {
runWorker(this);
}

 

runWorker

1、取任务,来自 firstTask 或者 getTask()

2、有任务,task.run(),进入下一个while

3、无任务,processWorkerExit

final void runWorker(Worker w) {
   Thread wt = Thread.currentThread();
   Runnable task = w.firstTask;
   w.firstTask = null;
   w.unlock(); // allow interrupts
   boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的
   try {
       while (task != null || (task = getTask()) != null) {
           w.lock();
           // If pool is stopping, ensure thread is interrupted;
           // if not, ensure thread is not interrupted. This
           // requires a recheck in second case to deal with
           // shutdownNow race while clearing interrupt
           if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
           try {
               beforeExecute(wt, task);
               Throwable thrown = null;
               try {
                   task.run();
              } catch (RuntimeException x) {
                   thrown = x; throw x;
              } catch (Error x) {
                   thrown = x; throw x;
              } catch (Throwable x) {
                   thrown = x; throw new Error(x);
              } finally {
                   afterExecute(task, thrown);
              }
          } finally {
               task = null;
               w.completedTasks++;
               w.unlock();
          }
      }
       completedAbruptly = false;
  } finally {
       processWorkerExit(w, completedAbruptly);
  }
}

 

getTask

1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)

2、是淘汰-超时时间内获取任务

3、不淘汰-不限时阻塞获取任务

private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?

   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);

       // Check if queue empty only if necessary.
       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           decrementWorkerCount();
           return null;
      }

       int wc = workerCountOf(c);

       // Are workers subject to culling?
       boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰

       if ((wc > maximumPoolSize || (timed && timedOut))
           && (wc > 1 || workQueue.isEmpty())) {
           if (compareAndDecrementWorkerCount(c))
               return null;
           continue;
      }

       try {
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间
           workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
      } catch (InterruptedException retry) {
           timedOut = false;
      }
  }
}

 

 

processWorkerExit

1、完成任务计数 更新

2、移除当前worker

3、非正常完成,新增worker

4、正常完成,worker数量满足最小要求,直接退出;不满足min要求,新增worker

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整

   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       completedTaskCount += w.completedTasks;// 完成任务计数 更新
       workers.remove(w);// 移除当前worker
  } finally {
       mainLock.unlock();
  }

   tryTerminate();// 不太清楚有什么用

   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {// 正常完成
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
      }
       addWorker(null, false);
  }
}

 

参数配置

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

1、需要配置哪些

int corePoolSize		核心线程数
int maximumPoolSize		最大线程数
queueSize				队列长度

 

2、如何配置

依照

每秒请求数(QPS,如 100~1000)+

每个请求耗时(COST,0.5s)+

系统最大响应时间(MAXRSP,2s)

corePoolSize = QPS/(1/COST) = QPS/2 = 50~500

其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求

哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400

 

queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300

太大:接入了无法满足最大响应时间的请求

太小:能满足最大响应时间的请求又拒绝了

 

队列大小应该满足最大响应时间,目前看是队列满时,最后一个任务出队完成刚好满足最大响应时间

最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数

 

maximumPoolSize = max(QPS)/(1/COST) = 500

太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500

原则上最大线程数与队列都满负荷运作,应该满足最大请求数,此处QPS=1000

3、问

进入队列的请求与下一秒新的请求,谁会先执行

队列内的请求由以往work完成

新的请求看情况是

1、入队-新请求后执行

2、新增worker-新请求应该会先执行

3、拒绝

标签:int,20211018,worker,private,ctl,null,final,ThreadPoolExecutor
来源: https://www.cnblogs.com/zpq5935/p/16298960.html