Java CompletableFuture.complete()块
作者:互联网
在Java中使用CompletableFuture时遇到问题.
我有2个选择请求,这些请求从服务器收到响应时就会被填充.
在连接线程(THREAD-1)(使用电抗器)中,我使用:
if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}
在其他线程(THREAD-2)中,我使用:
CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});
我的情况是系统打印出“ Receive all future”,但是在调用future.complete(result)时THREAD-1被阻止了.它无法摆脱该命令.
如果在THREAD-2中,我使用CompletableFuture.allOf(allOfSelect).get(),则THREAD-1将正确运行.但是使用CompletableFuture.get()会降低性能,因此我想使用CompletableFuture.whenComplete().
有人可以帮我解释阻塞的原因吗?
谢谢!
解决方法:
完成调用会触发所有相关的CompletionStages.
因此,如果您之前已经向whenComplete注册了BiConsumer,则完成将在其调用线程中调用它.在您的情况下,完成传递将在您传递给whenConlete的BiConsumer完成时返回.这在class javadoc中有描述
Actions supplied for dependent completions of non-async methods may be
performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.
(另一个调用者则是相反的情况,如果目标CompletableFuture已完成,则调用whenComplete的线程实际上将应用BiConsumer.)
这是一个说明行为的小程序:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}
这将打印
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
显示BiConsumer应用于主线程,该线程称为complete.
您可以使用whenCompleteAsync
在单独的线程中强制执行BiConsumer.
[…] that executes the given action using this stage’s default
asynchronous execution facility when this stage completes.
例如,
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}
将打印
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
表明BiConsumer是在单独的线程中应用的.
标签:reactor,completable-future,multithreading,future,java 来源: https://codeday.me/bug/20191118/2024825.html