编程语言
首页 > 编程语言> > android-使用RxJava与取消订阅之间的活动之间可以共享观察

android-使用RxJava与取消订阅之间的活动之间可以共享观察

作者:互联网

我正在尝试实现在我的应用程序的活动之间共享的计时器Observable.我正在实现一个Dagger单例类的实现,我将其注入到每个不同Activity的每个Presenter中.

我以这种方式创建了一次Observable:

Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();

我使用以下功能从Presenter进行订阅:

public Observable<Status> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mBasketStatus));

    mObservable.subscribe(subject);
    basketCounterCallback.onStatusChanged(status));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

我将该主题存储为可观察到的每个演示者,并呼吁:
    obs.unsubscribeOn(AndroidSchedulers.mainThread())
取消订阅(在onPause()方法中).我还尝试使用Scheduler Schedulers.immediate()退订

但是无论如何,该回调都会被调用X次(其中X是我已订阅的所有Presenter,因此它并没有取消订阅).还有日志“取消订阅主题!”没有接到电话.

如何正确退订每个主题?

提前致谢

编辑:

由于评论,添加了更多的实现细节:

这是我创建Observable并将其存储在Singleton类StatusManager的成员中的状态(状态也是singleton):

private Observable<BasketStatus> mObservable;
private Status mStatus;

public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {

    if (mObservable == null) mObservable = createObservable(milliseconds, status);

    return register(callback);
}

private Observable<BasketStatus> createObservable(long milliseconds, Status status) {

    mStatus = status;

    return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();
}

public Observable<BasketStatus> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mStatus));

    mObservable.subscribe(subject);
    callback.onStatusChanged(mStatus));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

从启动计时器的Presenter调用方法start(…)之后,我从下一个Presenters调用register(…)方法:

class Presenter implements Callback {

    private Observable<BasketStatus> mRegister;

    @Inject
    public Presenter(Status status, StatusManager statusManager) {
        mRegister = statusManager.start(20000, status, this);
    }

    // Method called from onPause()
    public void unregisterFromBasketStatus() {
         mRegister.unsubscribeOn(Schedulers.immediate());
    }
}

下一位主持人

@Inject
public NextPresenter(StatusManager statusManager) {
    mBasketStatusManager.register(this);
}

解决方法:

正如我在评论中提到的那样,您没有得到unsubscribeOn运算符的行为.它不会取消任何订阅,而是告诉每个订阅者在取消订阅发生时应该在哪里进行工作.这也没有意义,因为您不是从Observable取消订阅,而是从表示观察者与流本身之间的连接的订阅退订.

回到你的问题.现在设计代码的方式,返回的主题完全没有用.您正在使用注册方法中的可观察计时器,但是您将这些通知转发给主题,此主题此后再也没有任何人订阅.如果确实需要它们,例如在我们未看到的代码的某些部分中,您需要存储subscribe()方法的结果,该方法是一个Subscription对象,并在需要时在其上调用unsubscribe().我认为是unregisterFromBasketStatus).

但是代码中还有更多问题.例如 :

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable(); 

在第一行中,您不会在任何地方存储该操作的结果.由于Observables是不可变的结构,因此每个运算符都创建一个新流,该流接收来自上一个的通知.由此可见,很明显,当您在第二行中返回subject.asObservable()时,您不会从第一行中返回已修改的observable,而是返回未修改的旧状态.要更正此问题,只需将结果替换为主题变量:subject = subject.doOnUnsubscribe(()-> L.LOGD(TAG,“取消订阅主题!”)));

其次,使用流的原因之一是完全替换回调.当然,它将起作用,但是您正在减轻Rx带给代码库的许多好处.但是,它的可读性要差得多,任何需要处理您的代码的人都会把您诅咒到地狱,以至于他实际上可能会成功:-))我知道,Rx从一开始就很难学习,但是尝试尽可能避免这种情况.

标签:observable,rx-java,rx-android,subject-observer,android
来源: https://codeday.me/bug/20191026/1935211.html