编程语言
首页 > 编程语言> > RxJava异步订阅

RxJava异步订阅

作者:互联网

我有一个任务列表,应该在新线程中一个接一个地处理这些任务,然后结果应该由某个主线程显示在方法中.
但是,这似乎不起作用,在主线程中调用了flatMap方法.

在这种情况下,为什么subscriptionOn方法不处理“线程切换”?

在另一个线程中执行某些工作的更好模式是什么? (除了使用Observable.create和手动创建新线程外,这非常冗长)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }

                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread

解决方法:

警告:我不是Java程序员,而是C#,所以所有奇怪的驼峰案例方法名称都让我感到迷惑不解.

如果您要为flatMap操作使用新线程,则subscriptionOn放置在错误的位置.将其插入from和flatMap之间.请参见this answer,以了解SubscribeOn和observeOn的完整说明-它是为.NET编写的,但原理是相同的.

我不熟悉Java中的任务,因此不确定您的Task是否像.NET的Task,以及task.call()是否异步并启动自己的线程-我猜不是因为您说的“. ..应该在新线程中一对一处理的任务列表”.

newThread调度程序为每个订阅者使用一个新线程-由于flatMap将进行单个预订,因此所有task.call调用都将在同一线程上进行,尽管与from运算符的线程不同.

如果task.call实际上是异步的,则结果将根据返回,但是它会引入并发性,并且与Rx的语义无关.

无论哪种方式,(正确放置的)observeOn都将导致结果传递给主线程上的this :: show.

标签:multithreading,system-reactive,rx-java,asynchronous,java
来源: https://codeday.me/bug/20191120/2046591.html