Java多线程(三):线程组与线程池
作者:互联网
目录:
1. 线程组
在一个系统中,如果线程数量较多,且功能分配比较明确,就可以将相同功能的线程放在同一个线程组(ThreadGroup) 中。
ThreadGroup有两个比较重要的功能:
activeCount()
方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值list()
方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助
线程组使用举例:
public class TestThreadGroup {
public static void main(String[] args) {
ThreadGroup applesThread = new ThreadGroup("apples");
ThreadGroup bananasThread = new ThreadGroup("bananas");
Apple apple = new Apple();
Banana banana = new Banana();
for (int i = 0; i < 3; i++) {
new Thread(applesThread, apple, "apple" + i).start();
}
for (int i = 0; i < 3; i++) {
new Thread(bananasThread, banana, "banana" + i).start();
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
// activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值
System.out.println("当前活动线程总数: " + applesThread.activeCount());
// list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助
System.out.println("bananas组中的所有线程信息: ");
bananasThread.list();
}
}
class Apple implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": I'm an apple");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Banana implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": I'm a banana");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果:
2. 线程池
线程池是Java并发编程中一项重要的工具。
- 虽然与进程相比,线程是一种相对轻量级的工具,但其创建、关闭依然需要消费相当的资源。
- 如果高频率的创建、关闭线程,对系统性能的影响依旧十分明显的。
为解决此问题,我们可以使用JDK提供的工具——线程池。
- 正如他的名字所表达的,线程池就像一个大池子,里面存放着一些线程;
- 当有需要的时候,就从里面取出线程以完成指定的工作;
- 使用完在将线程重新放回线程池中。
JDK为线程池提供了一套Executor
框架,以帮助开发人员有效的进行线程控制
Executor框架的核心成员:
-
Interface Executor
-
interface ExecutorService extends Executor
-
abstract class AbstractExecutorService implements ExecutorService
-
class ThreadPoolExecutor extends AbstractExecutorService
:- 实现了
Executor
接口,任何Runnable
的对象都可以被ThreadPoolExecutor
线程池调用 - 是线程池工厂(Executors)重要的内部实现支持类。
- 实现了
-
interface ScheduledExecutorService extends ExecutorService
:- 在
ExecutorService
接口之上扩展了在给定时间执行某任务的功能。 - 如固定延时后执行、周期性执行等。
- 在
-
class Executors
:- 线程池工厂,可以利用此工厂构建线程池
2.1 Executors(线程池工厂)
主要的工厂方法:
工厂方法 | 功能 |
---|---|
ExecutorService newFixedThreadPool(int nThreads) | 返回一个固定线程数量的线程池,当有新任务提交时,如有空闲线程则立即执行,否则暂存在任务队列中进行等待(先进先出)。 |
ExecutorService newSingleThreadExecutor() | 相当于newFixedThreadPool(1) 。 |
ExecutorService newCachedThreadPool() | 返回一个可根据实际情况调整的线程池,优先复用线程,线程不够时创建。 |
ScheduledExecutorService newSingleThreadScheduledExecutor() | 返回一个ScheduledExecutorService 对象,线程池大小为1。ScheduledExecutorService 接口在ExecutorService 接口之上扩展了在给定时间执行某任务的功能。 |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 与newSingleThreadScheduledExecutor() 类似,但此方法可以指定线程数量。 |
以 newFixedThreadPool(int nThreads) 为例,介绍线程池的使用:
public class TestPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + ": hello thread pool!");
};
for (int i = 0; i < 4; i++) {
executorService.submit(task);
}
// 执行完所有任务后,关闭线程池(不再接受新任务)。如果线程池已经关闭,则调用没有其他效果。
executorService.shutdown();
}
}
2.2 计划任务(延时执行,周期执行)
Java线程池还提供了计划执行的功能,主要包括以下三个方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
方法会在给定的时间,对任务进行一次调度
... scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率是唯一的(从第一次开始执行到第二次开始执行的时间是一定的)。
... scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率不是唯一的(从第一次执行结束到第二次开始执行的时间是一定的)。
schedule举例:
public class ScheduledTask {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
Runnable runnable = () -> {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
// 任务需要执行1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
};
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");
/*
延时2s后调用任务:
schedule(Runnable command, long delay, TimeUnit unit)
*/
scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
}
}
scheduleAtFixedRate举例:
public class ScheduledTask {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
Runnable runnable = () -> {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
// 任务需要执行1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
};
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");
/*
每隔2s调用一次runnable任务:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
*/
scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
}
}
scheduleWithFixedDelay举例:
package com.ju.pool;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledTask {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
Runnable runnable = () -> {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
// 任务需要执行1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
};
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");
/*
一开始延时5s开始执行任务,从前一次任务结束到下一次任务开始间隔2s:
scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit)
*/
scheduledExecutorService.scheduleWithFixedDelay(runnable, 5, 2, TimeUnit.SECONDS);
}
}
2.3 核心线程池的内部实现
除了计划执行外,其余三个工厂方法(newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
)的内部实现均使用了ThreadPoolExcutor
类。
ThreadPoolExcutor 类的一个参数较全的构造器如下所示:
public ThreadPoolExecutor(int corePoolSize, // 指定线程池中固定存在的线程数量
int maximumPoolSize, // 指定了线程池的最大线程数量
long keepAliveTime, // 指定了线程池中超过corePoolSize的空闲线程,在多长时间内会被销毁
TimeUnit unit, // keepAliveTime的单位
BlockingQueue<Runnable> workQueue, // 任务队列,由于存储已被提交但未被执行的任务
ThreadFactory threadFactory, // 线程工厂,由于创建线程,一般用默认的即可
RejectedExecutionHandler handler) // 拒绝策略:任务太多来不及处理时,如何拒绝任务
其中大部分的参数均比较容易理解,需要解释类型的有三个:
- 任务队列(BlockingQueue)
- 线程工厂(ThreadFactory)
- 拒绝策略(RejectedExecutionHandler)
2.3.1 任务队列(BlockingQueue)
-
直接提交的队列(SynchronousQueue):
- 是一个特殊的
BlockingQueue
,没有容量,每一个插入操作都要等待一个相应的删除操作 - 反之,每一个删除操作也都要等待一个相应的插入操作
- 提交的任务并不会被真正的保存,而是总是将新的任务提交给线程
- 有空闲线程则直接使用
- 没有空闲线程,则尝试创建
- 线程数量达到最大(
maximumPoolSize
),则执行拒绝策略
- 是一个特殊的
-
有界的任务队列(ArrayBlockingQueue):
-
队列遵循先进先出规则
-
corePoolSize
有剩余则直接获得线程处理任务 -
corePoolSize
已满则将任务加入队列 -
如等待队列已满,在总线程不大于
maximumPoolSize
的前提下创建新的线程执行任务 -
如大于
maximumPoolSize
,则执行拒绝策略
-
-
无界的任务队列(LinkedBlockingQueue):
- 队列遵循先进先出规则
corePoolSize
有剩余则直接获得线程处理任务corePoolSize
已满则将任务加入队列- 由于队列无界,所以不会出现队列已满的情况
- 无界队列会一直增长(有需要的话),直至耗尽系统的内存
-
优先任务队列(PriorityBlockingQueue):
- 带有执行优先级,可以控制任务执行的先后顺序
- 是特殊的无界队列,根据任务自身的优先级顺序先后执行
2.3.2 线程工厂(ThreadFactory)
-
用于创建线程池中的线程
-
可以自定义实现自己的线程工厂
public interface ThreadFactory { Thread newThread(Runnable r); }
2.3.3 拒绝策略(RejectedExecutionHandler)
AbortPolicy
: 直接抛出异常,阻之系统正常工作(默认的拒绝策略)CallerRunsPolicy
: 只要线程池未关闭,就直接在调用者线程中,运行当前被丢弃的任务DiscardOldestPolicy
: 丢弃 最“老” 的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务DiscardPolicy
:丢弃无法处理的任务,不予任何处理
2.4 Executors中线程池的实现
对于newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
方法,其方法内部均调用了下面的构造器。
- 即使用了默认的线程工厂和拒绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
2.4.1 newFixedThreadPool 的实现
public static ExecutorService newFixedThreadPool(int nThreads) { // 方法参数 nThreads,线程池中固定存在的线程数量
return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize 和 maximumPoolSize 均为 nThreads
0L, TimeUnit.MILLISECONDS, // 存活时间为0ms
new LinkedBlockingQueue<Runnable>()); // 使用无界的任务队列(LinkedBlockingQueue)
}
2.4.2 newSingleThreadExecutor 的实现
// 基本与 newFixedThreadPool 相同,只是限定了线程池中固定存在的线程数量为1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2.4.3 newCachedThreadPool 的实现
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // corePoolSize = 0; maximumPoolSize = Integer.MAX_VALUE;
60L, TimeUnit.SECONDS, // keepAliveTime = 60s;
new SynchronousQueue<Runnable>()); // 直接提交的队列(SynchronousQueue)
}
2.5 扩展线程池
在默认的ThreadPoolExecutor 实现中,提供了空的beforeExecute()
、afterExecute()
和terminated()
方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。
beforeExecute()
:任务执行前被调用afterExecute()
:任务执行后被调用terminated()
:线程池退出前被调用
public class MyPool {
public static void main(String[] args) {
ExecutorService myPool = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getName() + ": 任务执行前");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("任务执行结束");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
myPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "任务执行中...");
});
myPool.shutdown();
}
}
2.6 分而治之(Fork & Join)
“分而治之”是一个非常有效地处理大量数据的方法。
利用JDK提供的 Fork & Join 框架,我们可以十分方便的将一个大任务分解为多个子任务。当所有的子任务都执行完成后,在收集它们各自的结果,从而得到最终的结果。
- Fork & Join 框架中,任务数和线程数并不是一一对应的
- 大多数情况下,一个物理线程需要执行多个逻辑任务
- 每个线程都会有一个任务队列
- 如果线程A执行结束了,而线程B还剩下许多任务没有执行,A就会从B中”拿来“一个任务进行处理,以尽可能有效的利用物理资源
使用示例:
// 利用fork&join,实现start到end的累加
public class TestForkJoinPool extends RecursiveTask<Long> {
private static final int THRESHOLD = 30000; // 划分门槛
private static final int GROUP_SIZE = 10; // 每次划分的组数
private long start; // 计数起始值
private long end; // 计数终止值
public TestForkJoinPool(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start < THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 将任务划分位多个子任务
long step = (end - start + 1) / GROUP_SIZE;
ArrayList<TestForkJoinPool> subTasks = new ArrayList<>();
long head = start;
long tail;
for (int i = 0; i < GROUP_SIZE; i++) {
tail = head + step;
if (tail > end) {
tail = end;
}
TestForkJoinPool subTask = new TestForkJoinPool(head, tail);
head = tail + 1;
subTasks.add(subTask);
subTask.fork();
}
// 收集子任务的计算结果
for (TestForkJoinPool subTask : subTasks) {
sum += subTask.join();
}
}
return sum;
}
public static void main(String[] args) {
long startTime = 0;
long endTime = 0;
long result = 0;
ForkJoinPool forkJoinPool = new ForkJoinPool();
startTime = System.currentTimeMillis();
ForkJoinTask<Long> taskResult = forkJoinPool.submit(new TestForkJoinPool(0, 2000000000L));
try {
result = taskResult.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
endTime = System.currentTimeMillis();
System.out.println("fork&join计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");
startTime = System.currentTimeMillis();
result = 0;
for (int i = 0; i <= 2000000000L; i++) {
result += i;
}
endTime = System.currentTimeMillis();
System.out.println("普通计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");
}
}
参考书籍: Java高并发程序设计(第二版)
标签:组与,System,long,任务,线程,new,多线程,public 来源: https://blog.csdn.net/weixin_42041195/article/details/117606743