How to correctly handle onError on an EventBus with RxJava

Author: Internet

I am using RxJava on Android, and my event bus class looks like this:

public class EventBus {
private final Subject<Event, Event> subject = new SerializedSubject<>(PublishSubject.<Event>create());
private Observable<Map<Type, Event>> stickyObservable;

public EventBus() {
    createStickyObservable();
}

private void createStickyObservable() {
    final List<Observable<Event>> observables = new ArrayList<>();

    final Observable<Map<Type, Event>> so = subject
            .filter(event -> event.sticky)
            .groupBy(event -> event.type)
            .switchMap(groupedObservable -> {
                BehaviorSubject<Event> bs = BehaviorSubject.create();
                groupedObservable.subscribe(bs);
                observables.add(bs);
                return Observable.combineLatest(observables, args -> {
                    Map<Type, Event> map = new HashMap<>();
                    for (Object arg : args) {
                        Event event = (Event) arg;
                        map.put(event.type, event);
                    }

                    return map;
                });
            });

    final BehaviorSubject<Map<Type, Event>> bs = BehaviorSubject.create();
    so.subscribe(bs);
    stickyObservable = bs;
}

public Observable<Event> filter(final String pathExpression) {
    final Pattern pattern = Pattern.compile(pathExpression);

    return subject.filter(event -> {
        if (event.path == null) {
            return pathExpression == null;
        }
        return pattern.matcher(event.path).matches();
    });
}

public Observable<Map<Type, Event>> getStickyObservable() {
    return stickyObservable;
}


public void event(Event event) {
    subject.onNext(event);
}

}

I am seeing many error logs containing rx.exceptions.OnErrorNotImplementedException.

How can I fix this? Please suggest some ways to solve this problem.

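Solution:

When you subscribe to getStickyObservable(), you need to implement onError. For example, do not use only the single-argument .subscribe(action) overload, which supplies no error handler.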
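
As a rough illustration of the answer above, here is a minimal sketch that assumes RxJava 1.x on the classpath (the `rx.` packages used in the question). The class name `OnErrorSketch`, the `run()` helper, and the `String` payload are hypothetical stand-ins for the question's `EventBus` and `Event` types. It subscribes with the two-argument `subscribe(onNext, onError)` overload, so an error is delivered to a handler rather than surfacing as `OnErrorNotImplementedException`.

```java
import rx.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class OnErrorSketch {

    // Subscribes with both an onNext and an onError handler
    // and returns a log of everything the subscriber received.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        // Hypothetical stand-in for the bus's internal subject.
        PublishSubject<String> subject = PublishSubject.create();

        subject.subscribe(
                value -> log.add("next:" + value),                 // onNext
                error -> log.add("error:" + error.getMessage()));  // onError

        subject.onNext("a");
        // Simulate a failure somewhere upstream of the subscriber.
        subject.onError(new IllegalStateException("boom"));

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:a, error:boom]
    }
}
```

The same pattern would apply to a caller of the question's bus, for example `eventBus.getStickyObservable().subscribe(map -> { ... }, error -> { ... })`, where the body of the error handler (logging, recovery, and so on) is up to the application.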

Tags: rx-java, rx-android, java, android
Source: https://codeday.me/bug/20191119/2038283.html