android-使用RxJava与取消订阅之间的活动之间可以共享观察
作者:互联网
我正在尝试实现在我的应用程序的活动之间共享的计时器Observable.我正在实现一个Dagger单例类的实现,我将其注入到每个不同Activity的每个Presenter中.
我以这种方式创建了一次Observable:
Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
我使用以下功能从Presenter进行订阅:
public Observable<Status> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mBasketStatus));
mObservable.subscribe(subject);
basketCounterCallback.onStatusChanged(status));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
我将该主题存储为可观察到的每个演示者,并呼吁:
obs.unsubscribeOn(AndroidSchedulers.mainThread())
取消订阅(在onPause()方法中).我还尝试使用Scheduler Schedulers.immediate()退订
但是无论如何,该回调都会被调用X次(其中X是我已订阅的所有Presenter,因此它并没有取消订阅).还有日志“取消订阅主题!”没有接到电话.
如何正确退订每个主题?
提前致谢
编辑:
由于评论,添加了更多的实现细节:
这是我创建Observable并将其存储在Singleton类StatusManager的成员中的状态(状态也是singleton):
private Observable<BasketStatus> mObservable;
private Status mStatus;
public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {
if (mObservable == null) mObservable = createObservable(milliseconds, status);
return register(callback);
}
private Observable<BasketStatus> createObservable(long milliseconds, Status status) {
mStatus = status;
return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
}
public Observable<BasketStatus> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mStatus));
mObservable.subscribe(subject);
callback.onStatusChanged(mStatus));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
从启动计时器的Presenter调用方法start(…)之后,我从下一个Presenters调用register(…)方法:
class Presenter implements Callback {
private Observable<BasketStatus> mRegister;
@Inject
public Presenter(Status status, StatusManager statusManager) {
mRegister = statusManager.start(20000, status, this);
}
// Method called from onPause()
public void unregisterFromBasketStatus() {
mRegister.unsubscribeOn(Schedulers.immediate());
}
}
下一位主持人
@Inject
public NextPresenter(StatusManager statusManager) {
mBasketStatusManager.register(this);
}
解决方法:
正如我在评论中提到的那样,您没有得到unsubscribeOn运算符的行为.它不会取消任何订阅,而是告诉每个订阅者在取消订阅发生时应该在哪里进行工作.这也没有意义,因为您不是从Observable取消订阅,而是从表示观察者与流本身之间的连接的订阅退订.
回到你的问题.现在设计代码的方式,返回的主题完全没有用.您正在使用注册方法中的可观察计时器,但是您将这些通知转发给主题,此主题此后再也没有任何人订阅.如果确实需要它们,例如在我们未看到的代码的某些部分中,您需要存储subscribe()方法的结果,该方法是一个Subscription对象,并在需要时在其上调用unsubscribe().我认为是unregisterFromBasketStatus).
但是代码中还有更多问题.例如 :
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
在第一行中,您不会在任何地方存储该操作的结果.由于Observables是不可变的结构,因此每个运算符都创建一个新流,该流接收来自上一个的通知.由此可见,很明显,当您在第二行中返回subject.asObservable()时,您不会从第一行中返回已修改的observable,而是返回未修改的旧状态.要更正此问题,只需将结果替换为主题变量:subject = subject.doOnUnsubscribe(()-> L.LOGD(TAG,“取消订阅主题!”)));
其次,使用流的原因之一是完全替换回调.当然,它将起作用,但是您正在减轻Rx带给代码库的许多好处.但是,它的可读性要差得多,任何需要处理您的代码的人都会把您诅咒到地狱,以至于他实际上可能会成功:-))我知道,Rx从一开始就很难学习,但是尝试尽可能避免这种情况.
标签:observable,rx-java,rx-android,subject-observer,android 来源: https://codeday.me/bug/20191026/1935211.html