其他分享
首页 > 其他分享> > 【并发】ExecutorCompletionService使用

【并发】ExecutorCompletionService使用

作者:互联网

介绍

ExecutorCompletionService是线程池和队列的配合使用,可以将线程池执行完成的结果存入队列当中,通过take或者poll方法获取执行完成的结果

遇到的场景

举例需求是将Excel表中的学生信息导入到数据库当中,在将Excel数据读取出来之后,需要对某些列的数据进行计算处理,在将表数据全部读取出来之后,遍历每一行数据,提交给线程池执行,执行完成的任务将返回结果存入ExecutorCompletionService默认的BlockingQueue队列当中,通过take方法拿到我们需要的数据,再做数据批量插入

源码分析

一、属性

// 线程池
private final Executor executor;
// 将Runnable任务和Callable任务包装成FutureTask任务
private final AbstractExecutorService aes;
// 用于存放任务执行完成后的结果,如果任务执行失败,不会存入
private final BlockingQueue<Future<V>> completionQueue;

二、构造方法

ExecutorCompletionService提供了两个构造方法,必传参数是线程池,可选是否传入用于存放结果的队列,不传入默认使用BlockingQueue

public ExecutorCompletionService(Executor executor) {
	// 未传入线程池,抛出空指针
	if (executor == null)
	throw new NullPointerException();
	// 赋值
	this.executor = executor;
	// 如果线程池对象是AbstractExecutorService的子类,则给ase赋值
	this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;
	// 未传入队列,初始化BlockingQueue
	this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
	// 同上
	if (executor == null || completionQueue == null)
		throw new NullPointerException();
	this.executor = executor;
	this.aes = (executor instanceof AbstractExecutorService) ?
		(AbstractExecutorService) executor : null;
	this.completionQueue = completionQueue;
}

三、内部类

/*
 * QueueingFuture是FutureTask的子类,重写了done方法。
 * FutureTask中的done方法是个空方法,模板方法,可以进行重写
 */
private class QueueingFuture extends FutureTask<Void> {
	QueueingFuture(RunnableFuture<V> task) {
		super(task, null);
		this.task = task;
	}
	// 任务执行完成执行该方法,将执行完成的任务添加到队列当中
	protected void done() { completionQueue.add(task); }
	private final Future<V> task;
}

四、任务的封装

// 将传入的Callable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
	if (aes == null)
		// 使用构造方法创建
		return new FutureTask<V>(task);
	else
		// aes的newTaskFor方法内部使用的也是构造方法创建
		return aes.newTaskFor(task);
}
// 将传入的Runnable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
	if (aes == null)
		return new FutureTask<V>(task, result);
	else
		return aes.newTaskFor(task, result);
}

五、任务的执行

// 提交Callable任务
public Future<V> submit(Callable<V> task) {
	// 判空
	if (task == null) throw new NullPointerException();
	// 封装成RunnableFuture类型
	RunnableFuture<V> f = newTaskFor(task);
	// 线程池执行
	executor.execute(new QueueingFuture(f));
	return f;
}
public Future<V> submit(Runnable task, V result) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<V> f = newTaskFor(task, result);
	executor.execute(new QueueingFuture(f));
	return f;
}

六、结果获取

// 获取完成的任务,获取不到会阻塞,直到获取到或者等待线程被中断抛出中断异常
public Future<V> take() throws InterruptedException {
	return completionQueue.take();
}
// 获取完成的任务,获取不到不会阻塞,不支持中断,从队列中获取不到完成的任务直接返回null
public Future<V> poll() {
	return completionQueue.poll();
}
// 获取完成的任务,循环的获取已完成的任务,直到获取到,或者超时,或者获取线程被中断,支持中断异常
public Future<V> poll(long timeout, TimeUnit unit)
	throws InterruptedException {
	return completionQueue.poll(timeout, unit);
}

参考文章:事故总结集锦-多线程使用不当导致的OOM -ExecutorCompletionService的 “套路”
参考文章:ExecutorCompletionService源码分析

标签:task,executor,并发,任务,线程,使用,null,completionQueue,ExecutorCompletionService
来源: https://www.cnblogs.com/hujh2022/p/16444908.html