编程语言
首页 > 编程语言> > 如何使用Java RX Observable链接异步操作?

如何使用Java RX Observable链接异步操作?

作者:互联网

我想反复发出一个HTTP请求并根据结果采取行动.我从public Observable< NewsItem>开始fetchItems(NewsFeed提要).一个请求得到一些新闻,但我决定将其展平.

这个想法是使用Observable.interval()多次发出请求,然后将结果Observable合并为一个.

       Observable
            .interval(timePerItem, TimeUnit.MILLISECONDS)
            .map(i -> feed)
            .map(feed -> fetchItems(feed))
            .subscribe(result -> System.out.println(result));

但是结果是可观察的< Observable< NewsItem>>.不可观察< NewsItem>.如何嫁给他们?

我发现marge()运算符(RX-Java doc: Marge).但是它似乎不适合用例.

在以前的版本中,我使用了CompletableFuture< List< NewsItem>> fetchNewsItems(),但我无法将其放入Observable链中.

解决方法:

不知道我是否理解这个问题,但是您不只是在寻找flatMap吗?

Observable
    .interval(timePerItem, TimeUnit.MILLISECONDS)
    .flatMap(i -> fetchItems(feed))
    .subscribe(result -> System.out.println(result));

标签:rx-java,java
来源: https://codeday.me/bug/20191121/2048746.html