java CompletionService ExecutorCompletionSerivce
作者:互联网
我们来想一个问题:
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么我们可以保留与每个任务关联的Future,然后反复使用get方法,从而通过轮询来拿到返回结果,但是这样有些繁琐。
废话不说,上代码。
1 package com.citi.test.mutiplethread.demo0511; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.Callable; 6 import java.util.concurrent.ExecutionException; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.Future; 10 11 public class TestCompletionService { 12 public static void main(String[] args) { 13 ExecutorService service=Executors.newFixedThreadPool(10); 14 List<Future<Integer>> list=new ArrayList<Future<Integer>>(); //跟CompletionService比,多余的步骤 15 for(int i=0;i<9;i++){ 16 final int index=i; 17 Future<Integer> submit = service.submit(new Callable<Integer>() { //这句话前面部分也是多余的步骤 18 @Override 19 public Integer call() throws Exception { 20 // int a=1/0; 21 return index; 22 } 23 }); 24 list.add(submit); 25 } 26 for(Future<Integer> temp: list){ 27 try { 28 Integer integer = temp.get(); 29 System.out.println(Thread.currentThread().getName()+" "+integer); 30 } catch (InterruptedException | ExecutionException e) { 31 // TODO Auto-generated catch block 32 e.printStackTrace(); 33 } 34 } 35 service.shutdown(); 36 } 37 }
可以看到上面的方法也是可以的,但是有些繁琐,java提供了一种更好的方法:完成服务(CompletionSerivce)
不多说,上代码,
1 package com.citi.test.mutiplethread.demo0511; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.CompletionService; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.ExecutorCompletionService; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.Future; 10 11 public class TestCompletionService2 { 12 public static void main(String[] args) { 13 ExecutorService executor=Executors.newCachedThreadPool(); 14 CompletionService<Integer> completionService= new ExecutorCompletionService<Integer>(executor); 15 for(int i=0;i<10;i++){ 16 final int index=i; 17 completionService.submit(new Callable<Integer>() { 18 @Override 19 public Integer call() throws Exception { 20 return index; 21 } 22 }); 23 } 24 for(int i=0;i<10;i++){ 25 try { 26 Future<Integer> take = completionService.take(); //可以看到我们可以直接用completionService 来拿到Future对象。非常简便。 27 Integer integer = take.get(); 28 System.out.println(Thread.currentThread().getName()+" "+integer); 29 } catch (InterruptedException e) { 30 // TODO Auto-generated catch block 31 e.printStackTrace(); 32 } catch (ExecutionException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } 36 } 37 executor.shutdown(); 38 } 39 }
可以看到,我们不用像原来那样,
在向ExecutorService中提交任务之后,都去拿Future对象,
而是在提交完所有任务后,直接通过completionService.take来拿future对象了。
说一下原理,通过表象可以看出
CompletionService将Executor和BlockingQueue的功能融合在一起,你可以将callable任务提交给它执行,然后使用类似队列的操作的take和poll等方法来获得已完成的结果。
ExecutorCompletionSerivce实现了CompletionService,将计算部分委托给一个Executor。
通过源码可以看到,
CompletionService 这是个接口,定义了一些规范。
ExecutorCompletionSerivce是CompletionService的实现类,在构造函数中创建了一个BlockingQueue来保存计算完成的结果。
当提交某个任务时,该任务首先被包装为一个QueueingFuture,这是FutureTask的一个子类,然后改写子类的done方法,并将结果放到这个BlockingQueue中,之后想拿结果时,可以直接用BlockingQueue的take和poll方法来获取结果,这里而且是阻塞的。
成员变量和内部类
//成员变量。 private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //继承自FutureTask,重写done方法,来实现在任务完成后,把结果加入到BlockingQueue中 /** * FutureTask extension to enqueue upon completion */ private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
构造方法有两个
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } //可以传入想用的阻塞队列 public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
submit方法具体实现
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); //将callable提交给ExecutorCompletionSerivce之后,任务会被包装成QueueingFuture,而QueueingFuture继承自FutureTask,只不过重写了done方法,重写之后的done方法,可以看到是把包装之后的task放入到了阻塞队列中,这样在提交完所有任务之后,我们可以直接调用ExecutorCompletion.take方法来获得结果了 RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
标签:task,java,CompletionService,ExecutorCompletionSerivce,util,concurrent,executor,i 来源: https://www.cnblogs.com/liumy/p/11645154.html