编程语言
首页 > 编程语言> > java – Reactor compose vs flatMap

java – Reactor compose vs flatMap

作者:互联网

我继续玩Reactor,现在我看到compose运算符的行为与flatMap完全一样,我想知道是否有任何我不理解的差异.

    @Test
public void compose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .compose(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

@Test
public void flatMapVsCompose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .flatMap(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

这两个示例表现并返回相同的结果.

问候.

解决方法:

An excellent explanation by Dan Lew

不同之处在于compose()是一个更高级别的抽象:它在整个流上运行,而不是单独发出的项目.更具体的说法:

> compose()是获得原始Observable< T>的唯一方法.从溪流.因此,影响整个流的运算符(如subscribeOn()和observeOn())需要使用compose().

相反,如果将subscribeOn()/ observeOn()放在flatMap()中,它只会影响您在flatMap()中创建的Observable,而不会影响流的其余部分.
> compose()在您创建Observable流时立即执行,就像您已编写内联运算符一样. flatMap()在每次调用onNext()时都会执行.换句话说,flatMap()转换每个项目,而compose()转换整个流.
> flatMap()的效率必然较低,因为每次调用onNext()时都必须创建一个新的Observable. compose()按原样对流进行操作.如果要使用可重用代码替换某些运算符,请使用compose(). flatMap()有很多用途,但这不是其中之一.

标签:project-reactor,java,spring
来源: https://codeday.me/bug/20190724/1521956.html