编程语言
首页 > 编程语言> > 结合多个可观察对象的RxJava弹性方式

结合多个可观察对象的RxJava弹性方式

作者:互联网

我有多个返回Observable的模块:O1,O2,O3 …
所有模块的结果应合并为一个可观察到的Ocomb,以使单个任务可能失败,但是合并不会因单个问题而终止或影响.
使用当前的解决方案,我遇到了以下示例中的各种问题:

这段代码结合了我的模块的输出:

public Observable<Data> getModuleData(){
    List<Observable<Data>> tasks = new ArrayList<>();
    for(MyModule module : modules){
        tasks.add(module.getData());
    }
    return Observable
            .mergeDelayError(Observable.from(tasks))
            .onBackpressureBuffer(MAX_BUFFER)
            .observeOn(AndroidSchedulers.mainThread());
}

现在,我想显示属性X,例如所有发出的数据对象的“名称”:

public List<String> getNames() {
    return  getModuleData() 
            .map(new Func1<Data, String>() {
                @Override
                public String call(Data data) {
                    return data.getName();
                }
            })
            .timeout(600, TimeUnit.MILLISECONDS)
            .toList()
            .toBlocking()
            .firstOrDefault(new ArrayList<String>());
}

getNames()方法应该返回一个列表,因此会阻止执行.

问题1
似乎RxJava中有一个issue,如果我调用observeOn()并将其阻止,则无论超时如何,它都不会返回.

问题2
如果将onObserve()删除,则代码将起作用,但是在应用程序的其他位置,我正在UI中呈现可观察到的非阻塞结果.将会显示数据,但之后我的UI会发疯.每当数据更改时,我都必须触摸我的UI列表组件以刷新屏幕.

问题3
一些模块可能会产生内部错误,或者不会调用onCompleted().我认为mergeDelayError()和timeout()的组合可以处理这些情况,并为无响应的模块调用onCompleted().但是,如果其中一个模块未调用onCompleted()而timeout()语句已删除,则阻塞调用将永远不会返回.

问题:

组合多个可观察对象的最佳方法是什么,以便单个可观察对象可能失败,但是将其作为onCompleted()/被忽略并且不影响组合的可观察对象?

什么是使组合的可观察的阻塞和处理超时而不停止执行或结束循环的最佳解决方案?

解决方法:

What is the best way to combine multiple observable so that individual observables can fail but it’s handled as onCompleted() / ignored and does not affect the combined observable?

          Observable.from(modules)
            .flatMap(MyModule::getData)
                .onErrorResumeNext(Observable.empty())
                .timeout(600,TimeUnit.MILLISECONDS, Observable.empty())
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println);

可以通过在subscribe()之前添加toBlocking来将上述流转换为阻塞,但是除了测试外,它没有多大意义

标签:rx-java,reactive-programming,android
来源: https://codeday.me/bug/20191118/2029731.html