如何使用Java RX Observable链接异步操作?
作者:互联网
我想反复发出一个HTTP请求并根据结果采取行动.我从public Observable< NewsItem>开始fetchItems(NewsFeed提要).一个请求得到一些新闻,但我决定将其展平.
这个想法是使用Observable.interval()多次发出请求,然后将结果Observable合并为一个.
Observable
.interval(timePerItem, TimeUnit.MILLISECONDS)
.map(i -> feed)
.map(feed -> fetchItems(feed))
.subscribe(result -> System.out.println(result));
但是结果是可观察的< Observable< NewsItem>>.不可观察< NewsItem>.如何嫁给他们?
我发现marge()运算符(RX-Java doc: Marge).但是它似乎不适合用例.
在以前的版本中,我使用了CompletableFuture< List< NewsItem>> fetchNewsItems(),但我无法将其放入Observable链中.
解决方法:
不知道我是否理解这个问题,但是您不只是在寻找flatMap吗?
Observable
.interval(timePerItem, TimeUnit.MILLISECONDS)
.flatMap(i -> fetchItems(feed))
.subscribe(result -> System.out.println(result));
标签:rx-java,java 来源: https://codeday.me/bug/20191121/2048746.html