【并发】ExecutorCompletionService使用
作者:互联网
介绍
ExecutorCompletionService是线程池和队列的配合使用,可以将线程池执行完成的结果存入队列当中,通过take或者poll方法获取执行完成的结果
遇到的场景
举例需求是将Excel表中的学生信息导入到数据库当中,在将Excel数据读取出来之后,需要对某些列的数据进行计算处理,在将表数据全部读取出来之后,遍历每一行数据,提交给线程池执行,执行完成的任务将返回结果存入ExecutorCompletionService默认的BlockingQueue队列当中,通过take方法拿到我们需要的数据,再做数据批量插入
源码分析
一、属性
// 线程池
private final Executor executor;
// 将Runnable任务和Callable任务包装成FutureTask任务
private final AbstractExecutorService aes;
// 用于存放任务执行完成后的结果,如果任务执行失败,不会存入
private final BlockingQueue<Future<V>> completionQueue;
二、构造方法
ExecutorCompletionService提供了两个构造方法,必传参数是线程池,可选是否传入用于存放结果的队列,不传入默认使用BlockingQueue
public ExecutorCompletionService(Executor executor) {
// 未传入线程池,抛出空指针
if (executor == null)
throw new NullPointerException();
// 赋值
this.executor = executor;
// 如果线程池对象是AbstractExecutorService的子类,则给ase赋值
this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;
// 未传入队列,初始化BlockingQueue
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
// 同上
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
三、内部类
/*
* QueueingFuture是FutureTask的子类,重写了done方法。
* FutureTask中的done方法是个空方法,模板方法,可以进行重写
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 任务执行完成执行该方法,将执行完成的任务添加到队列当中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
四、任务的封装
// 将传入的Callable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
// 使用构造方法创建
return new FutureTask<V>(task);
else
// aes的newTaskFor方法内部使用的也是构造方法创建
return aes.newTaskFor(task);
}
// 将传入的Runnable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
五、任务的执行
// 提交Callable任务
public Future<V> submit(Callable<V> task) {
// 判空
if (task == null) throw new NullPointerException();
// 封装成RunnableFuture类型
RunnableFuture<V> f = newTaskFor(task);
// 线程池执行
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
六、结果获取
// 获取完成的任务,获取不到会阻塞,直到获取到或者等待线程被中断抛出中断异常
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 获取完成的任务,获取不到不会阻塞,不支持中断,从队列中获取不到完成的任务直接返回null
public Future<V> poll() {
return completionQueue.poll();
}
// 获取完成的任务,循环的获取已完成的任务,直到获取到,或者超时,或者获取线程被中断,支持中断异常
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
参考文章:事故总结集锦-多线程使用不当导致的OOM -ExecutorCompletionService的 “套路”
参考文章:ExecutorCompletionService源码分析
标签:task,executor,并发,任务,线程,使用,null,completionQueue,ExecutorCompletionService 来源: https://www.cnblogs.com/hujh2022/p/16444908.html