编程语言
首页 > 编程语言> > java – Flux没有在Spring 5反应堆中订阅

java – Flux没有在Spring 5反应堆中订阅

作者:互联网

我可能错过了一些东西,但我无法弄清楚它是什么.

以下代码什么都不做:

webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();

如果我试图阻止呼叫它工作正常:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

奇怪的是,如果我“手动”创建一个Flux(即不是来自spring webClient),这很好用:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();

有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像它在上一次中一样?

谢谢!

解决方法:

简答

subscribe不阻止当前线程,这意味着应用程序主线程可以比Flux发出任何元素更早完成.因此要么使用块,要么在主线程中使用等待.

细节

调用no-args subscribe()只需在Flux上发出请求(无限制)而无需设置任何订阅者.它通常在单独的线程中触发操作,但不阻止当前线程.最有可能的是,您的主线程在WebClient在该单独的线程中收到响应并且passive side effect doOnNext(...)发生之前结束.

为了说明/测试操作是否已开始,请在主线程中等待一段时间.只需在subscribe()调用后输入以下行:

Thread.sleep(1000);

现在,在播放超时值后,您将能够看到打印结果.

现在让我们隐式地为异步操作发送一个自定义调度程序,并等待其所有任务完成.另外,让我们将System.out :: println作为subscribe(…)参数而不是doOnNext传递,以便完整代码如下所示:

ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
    .bodyToMono(GetLocationsResponse.class)
    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
    .subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 

此示例使用略有不同的subscribe(Consumer).最重要的是,它添加了由ExecutorService支持的publishOn(Scheduler).然后使用后者在主线程中等待终止.

当然,更容易实现相同结果的方法是使用最初提到的block():

06002

最后,注意你的第三个例子与Flux.just(…)… subscribe() – 似乎它只是在主线程终止之前快速完成.这是因为与单个GetLocationsResponse元素的发射相比,它需要更少的时间来发出一些String元素(这意味着写请求读取响应的时序解析为POJO).但是,如果你使这个Flux延迟元素,你会得到相同的行为:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
    .doOnNext(System.out::println)
    .subscribe(); 


Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500))
    .doOnNext(System.out::println)
    .blockLast(); //and that makes it printing back again

标签:project-reactor,java,reactive-programming
来源: https://codeday.me/bug/20190823/1697243.html