java-按需执行热的Observable
作者:互联网
举一个冷酷的例子:
Observable<Integer> cold = Observable.create(subscriber -> {
try {
for (int i = 0; i <= 42; i++) {
// avoid doing unnecessary work
if (!subscriber.isUnsubscribed()) {
break;
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable cause) {
subscriber.onError(cause);
}
});
它从头开始为每个新订户执行:
// starts execution
cold.subscribe(...)
并可以在订户提早退订的情况下停止执行:
// stops execution
subscription.unsubscribe();
现在,如果不是代替样本进行循环,而是进行了一些实际的业务逻辑(不需要为每个订户重播,而是实时的),那么我们正在处理可观察的热点…
PublishSubject<Integer> hot = PublishSubject.create();
Thread thread = new Thread(() -> {
try {
for (int i = 0; i < 42; i++) {
// how to avoid unnecessary work when no one is subscribed?
hot.onNext(i);
}
hot.onCompleted();
} catch (Throwable cause) {
hot.onError(cause);
}
});
当我们希望它开始时,我们可能会做
// stats work (although no one is subscribed)
thread.start();
因此,第一个问题是:只有在第一个观察者订阅时才能开始工作? (也许可以观察到吗?)
还有一个重要的问题:最后一个订户退订时如何停止工作? (我无法弄清楚如何访问该主题的当前订阅,如果存在这样的解决方案,我想找到没有共享全局状态的干净解决方案)
我能想到的一种解决方案是使用可管理订户的自定义运算符来提升主题…
解决方法:
请参阅运算符refCount-http://reactivex.io/documentation/operators/refcount.html.此运算符将您的Observable转换为ConnectableObservable,并在第一个订阅者订阅时将其连接,而在没有更多订阅时断开连接
标签:reactive-streams,rx-java,reactive-programming,java 来源: https://codeday.me/bug/20191027/1945293.html