Reactor3 功能介绍二十四: ConnectableFlux
作者:互联网
Flux<Long> flux = Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100))
.take(5)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
flux.subscribe(x -> {
System.out.println("____" + x);
});
Thread.sleep(100*3l);
flux.subscribe(x -> {
System.out.println("*****" + x);
});
Thread.sleep(1000*3l);
执行结果:
subscribed to source
____0
____1
____2
subscribed to source
____3
*****0
____4
*****1
*****2
*****3
*****4
从执行结果,发现“subscribed to source”输出了两次,即订阅了两次,每一次订阅其实都生成了一个新的Worker,然后从0开始生产元素并向下游传递。现实中,更多的场景是,一个worker在源源不断的生产元素,新订阅的消费者无须消费历史元素,而是消费订阅之后生产的元素。在这种背景下,ConnectableFlux登场了,当然ConnectableFlux的作用并不仅仅于此。
ConnectableFlux可以在订阅者和源产生订阅后推迟整 个订阅逻辑的执行时间,另外也可以做到当订阅者达到一定数量或激活某个策略条件的时候才触发真正的订阅操作及数据、元素的产生和下放。
Flux<Long> source = Flux.interval(Duration.ofSeconds(2))
.doOnSubscribe(s -> System.out.println("subscribed to source"));//@1
//ConnectableFlux<Long> co = source.publish();
ConnectableFlux<Long> co = source.replay(2); //@2
co.subscribe(t -> System.out.println(t + " One"), e -> { //@3
}, () -> {
});
co.subscribe(t -> System.out.println(t + " Two"), e -> {
}, () -> {
});
System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");
co.connect(); //@4
// Thread.sleep(5000);
Thread.sleep(10000);
co.subscribe(t -> System.out.println(t + " Three"), e -> { //@5
}, () -> {
});
Thread.sleep(5000);
执行结果:
done subscribing
will now connect
subscribed to source
0 One
0 Two
1 One
1 Two
2 One
2 Two
3 One
3 Two
4 One
4 Two
3 Three
4 Three
5 One
5 Two
5 Three
6 One
6 Two
6 Three
@1 从执行结果,也可以发现“subscribed to source”仅仅只执行了一次,而且是co.connect()触发的。
@2 reply操作将源序列从“冷”序列转换为“热”序列,即从Flux转换为ConnectableFlux,参数history,是ConnectableFlux缓存的最近的数据的条数,缓存的数据会被下发给后续订阅的消费者
@3 这里的subscribe其实仅仅是建立订阅关系,并不是真正意义上的订阅(前序课程介绍的),仅在 co.connect执行后,才会触发真正的subscribe操作
@4 co.connect真正的触发subscribe操作
@5 后订阅的消费者在本例中从元素3开始消费。
与冷序列不同,在本例中,其实在底层只生成一个线程worker,负责生产元素。ConnectableFlux内部维持着一个订阅者的数组,开启订阅(connect)后,每生产一个元素都会传递给所有的消费者。
标签:订阅,System,ConnectableFlux,source,二十四,println,Reactor3,out 来源: https://blog.csdn.net/qian_348840260/article/details/112624155