java – CompletableFuture没有被执行.如果我使用ExecutorService池按预期工作,但不使用默认的forkJoin公共池
作者:互联网
我试图运行以下类,它终止而不执行CompletableFuture.
public class ThenApplyExample {
public static void main(String[] args) throws Exception {
//ExecutorService es = Executors.newCachedThreadPool();
CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
})// If I put executorservice created n commented above, programme work as expected.
.thenApply(i -> {
for (int j = 0; j <= i; j++) {
System.out.println("Inside first then apply");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("First then apply is finished");
return ++i;
})
.thenApply(i -> {
System.out.println("Inside 2nd then apply");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Inside 2nd then apply stopped");
return i++;
})
.thenApply(i -> {
System.out.println("Inside 3nd then apply");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Inside 3nd then apply stopped");
return "The i is ::: " + i;
})
.thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
//es.shutdown();
}
}
我得到的输出是
Executing..
Executing..
Executing..
Executing..
Executing..
而预期产量是
Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped
Note : In the above programme, I am not using studentCompletableFuture.get(). I dont want to use it since it blocks the code.
如果我在程序的最后添加studentCompletableFuture.get(),它会按预期工作,或者如果我在supplyAsync第二个参数中添加executorservice(检查程序中的注释),它会按预期再次工作.
我的问题是,当程序使用默认的ForkJoin公共池时,为什么它会终止?
解决方法:
ForkJoinPool
中的线程是守护程序线程(至少默认情况下).这是相关的Javadoc(强调我的):
A
ForkJoinPool
differs from other kinds ofExecutorService
mainly by
virtue of employing work-stealing: all threads in the pool attempt to
find and execute tasks submitted to the pool and/or created by other
active tasks (eventually blocking waiting for work if none exist).
This enables efficient processing when most tasks spawn other subtasks
(as do mostForkJoinTask
s), as well as when many small tasks are
submitted to the pool from external clients. Especially when setting
asyncMode to true in constructors,ForkJoinPool
s may also be
appropriate for use with event-style tasks that are never joined. All
worker threads are initialized withThread.isDaemon()
set true.
Executors创建的ExecutorService中的线程是非守护程序线程.我相信例外是Executors.newWorkStealingPool.我找不到关于此的文档,但这至少是它目前的实现方式.您可以通过提供自定义ThreadFactory来更改此设置.
守护程序线程不会保持JVM活着,如Thread
的Javadoc(强调我的)中所述:
When a Java Virtual Machine starts up, there is usually a single
non-daemon thread (which typically calls the method namedmain
of some
designated class). The Java Virtual Machine continues to execute
threads until either of the following occurs:
- The
exit
method of classRuntime
has been called and the security manager has permitted the exit operation to take place.- All threads that are not daemon threads have died, either by returning from the call to the
run
method or by throwing an exception
that propagates beyond therun
method.
在您的代码中,您有以下(简化):
public static void main(String[] args) {
CompletableFuture<Student> future = CompletableFuture.supplyAsync(/* Supplier */)
.andThen(/* Function One */)
.andThen(/* Function Two */)
.andThen(/* Function Three */)
.andThen(/* Final Function */);
}
这使用了常见的ForkJoinPool,如上所述,它使用守护进程线程.异步代码从主线程启动,但您不必等待它完成.这意味着主线程退出,因此JVM也存在.这在您的异步代码有机会完成之前发生.
然后你试了一个get()的调用:
public static void main(String[] args) throws Exception {
Student student = CompletableFuture.supplyAsync(/* Supplier */)
.andThen(/* Function One */)
.andThen(/* Function Two */)
.andThen(/* Function Three */)
.andThen(/* Final Function */)
.get();
}
这是有效的,因为get()是一个阻塞调用;意味着主线程现在等待异步代码完成后再继续.换句话说,您保持主线程保持活动状态,从而使JVM保持活动状态.
当您从Executors.newCachedThreadPool()使用自己的ExeuctorService时,执行的线程是非守护进程.这意味着异步代码正在这些非守护程序线程上运行,这使得JVM保持活动状态,直到所述代码完成.事实上,如果不调用ExecutorService.shutdown(),即使异步代码完成,它也会使JVM保持活动状态(尽管缓存的线程池可能允许所有线程在一定时间后死亡).
在问题评论中,你会问是否有一种更“优雅”的方式来保持主线程活着(除了get()).我不确定你对“优雅”的定义是什么,但有CompletableFuture.join()
方法.它像get()一样等待将来在返回之前完成(正常或异常).但是,与get()不同,它不会抛出已检查的异常;但是等待也不能中断.
你还说,
I just checked ForkJoinPool common threads are not daemon thread by
usingSystem.out.println("The thread is :: "+Thread.currentThread().getName() + Thread.currentThread().isDaemon());
我不知道为什么会这样.运行此代码:
public static void main(String[] args) {
CompletableFuture.runAsync(() -> {
Thread t = Thread.currentThread();
System.out.printf("Thread_Name: %s, Daemon: %s%n", t.getName(), t.isDaemon());
}).join();
}
给我这个:
Thread_Name: ForkJoinPool.commonPool-worker-9, Daemon: true
标签:concurrent-futures,completable-future,java,multithreading,java-util-concurrent 来源: https://codeday.me/bug/20190828/1746522.html