java – Flux没有在Spring 5反应堆中订阅
作者:互联网
我可能错过了一些东西,但我无法弄清楚它是什么.
以下代码什么都不做:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();
如果我试图阻止呼叫它工作正常:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
奇怪的是,如果我“手动”创建一个Flux(即不是来自spring webClient),这很好用:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();
有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像它在上一次中一样?
谢谢!
解决方法:
简答
subscribe不阻止当前线程,这意味着应用程序主线程可以比Flux发出任何元素更早完成.因此要么使用块,要么在主线程中使用等待.
细节
调用no-args subscribe()只需在Flux上发出请求(无限制)而无需设置任何订阅者.它通常在单独的线程中触发操作,但不阻止当前线程.最有可能的是,您的主线程在WebClient在该单独的线程中收到响应并且passive side effect doOnNext(...)
发生之前结束.
为了说明/测试操作是否已开始,请在主线程中等待一段时间.只需在subscribe()调用后输入以下行:
Thread.sleep(1000);
现在,在播放超时值后,您将能够看到打印结果.
现在让我们隐式地为异步操作发送一个自定义调度程序,并等待其所有任务完成.另外,让我们将System.out :: println作为subscribe(…)参数而不是doOnNext传递,以便完整代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
此示例使用略有不同的subscribe(Consumer).最重要的是,它添加了由ExecutorService支持的publishOn(Scheduler).然后使用后者在主线程中等待终止.
当然,更容易实现相同结果的方法是使用最初提到的block():
06002
最后,注意你的第三个例子与Flux.just(…)… subscribe() – 似乎它只是在主线程终止之前快速完成.这是因为与单个GetLocationsResponse元素的发射相比,它需要更少的时间来发出一些String元素(这意味着写请求读取响应的时序解析为POJO).但是,如果你使这个Flux延迟元素,你会得到相同的行为:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again
标签:project-reactor,java,reactive-programming 来源: https://codeday.me/bug/20190823/1697243.html