java – Reactor compose vs flatMap
作者:互联网
我继续玩Reactor,现在我看到compose运算符的行为与flatMap完全一样,我想知道是否有任何我不理解的差异.
@Test
public void compose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.compose(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
@Test
public void flatMapVsCompose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.flatMap(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
这两个示例表现并返回相同的结果.
问候.
解决方法:
An excellent explanation by Dan Lew:
不同之处在于compose()是一个更高级别的抽象:它在整个流上运行,而不是单独发出的项目.更具体的说法:
> compose()是获得原始Observable< T>的唯一方法.从溪流.因此,影响整个流的运算符(如subscribeOn()和observeOn())需要使用compose().
相反,如果将subscribeOn()/ observeOn()放在flatMap()中,它只会影响您在flatMap()中创建的Observable,而不会影响流的其余部分.
> compose()在您创建Observable流时立即执行,就像您已编写内联运算符一样. flatMap()在每次调用onNext()时都会执行.换句话说,flatMap()转换每个项目,而compose()转换整个流.
> flatMap()的效率必然较低,因为每次调用onNext()时都必须创建一个新的Observable. compose()按原样对流进行操作.如果要使用可重用代码替换某些运算符,请使用compose(). flatMap()有很多用途,但这不是其中之一.
标签:project-reactor,java,spring 来源: https://codeday.me/bug/20190724/1521956.html