编程语言
首页 > 编程语言> > Java CompletableFuture.complete()块

Java CompletableFuture.complete()块

作者:互联网

Java中使用CompletableFuture时遇到问题.
我有2个选择请求,这些请求从服务器收到响应时就会被填充.

在连接线程(THREAD-1)(使用电抗器)中,我使用:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}

在其他线程(THREAD-2)中,我使用:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do sth here
});

我的情况是系统打印出“ Receive all future”,但是在调用future.complete(result)时THREAD-1被阻止了.它无法摆脱该命令.
如果在THREAD-2中,我使用CompletableFuture.allOf(allOfSelect).get(),则THREAD-1将正确运行.但是使用CompletableFuture.get()会降低性能,因此我想使用CompletableFuture.whenComplete().

有人可以帮我解释阻塞的原因吗?

谢谢!

解决方法:

完成调用会触发所有相关的CompletionStages.

因此,如果您之前已经向whenComplete注册了BiConsumer,则完成将在其调用线程中调用它.在您的情况下,完成传递将在您传递给whenConlete的BiConsumer完成时返回.这在class javadoc中有描述

Actions supplied for dependent completions of non-async methods may be
performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.

(另一个调用者则是相反的情况,如果目标CompletableFuture已完成,则调用whenComplete的线程实际上将应用BiConsumer.)

这是一个说明行为的小程序:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    future.whenComplete((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
}

这将打印

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done

显示BiConsumer应用于主线程,该线程称为complete.

您可以使用whenCompleteAsync在单独的线程中强制执行BiConsumer.

[…] that executes the given action using this stage’s default
asynchronous execution facility when this stage completes
.

例如,

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
    done.get();
}

将打印

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]

表明BiConsumer是在单独的线程中应用的.

标签:reactor,completable-future,multithreading,future,java
来源: https://codeday.me/bug/20191118/2024825.html