RxJava异步订阅
作者:互联网
我有一个任务列表,应该在新线程中一个接一个地处理这些任务,然后结果应该由某个主线程显示在方法中.
但是,这似乎不起作用,在主线程中调用了flatMap方法.
在这种情况下,为什么subscriptionOn方法不处理“线程切换”?
在另一个线程中执行某些工作的更好模式是什么? (除了使用Observable.create和手动创建新线程外,这非常冗长)
List<Task> tasks = ...;
Observable.from(tasks)
.flatMap(task -> {
// should be handled in a new thread
try {
return Observable.just(task.call());
} catch (Exception e) {
log.error("Error", e);
}
return Observable.empty();
})
.subscribeOn(Schedulers.newThread())
.observeOn(MySchedulers.main())
.subscribe(this::show); // subscribe called from main thread
解决方法:
警告:我不是Java程序员,而是C#,所以所有奇怪的驼峰案例方法名称都让我感到迷惑不解.
如果您要为flatMap操作使用新线程,则subscriptionOn放置在错误的位置.将其插入from和flatMap之间.请参见this answer,以了解SubscribeOn和observeOn的完整说明-它是为.NET编写的,但原理是相同的.
我不熟悉Java中的任务,因此不确定您的Task是否像.NET的Task,以及task.call()是否异步并启动自己的线程-我猜不是因为您说的“. ..应该在新线程中一对一处理的任务列表”.
newThread调度程序为每个订阅者使用一个新线程-由于flatMap将进行单个预订,因此所有task.call调用都将在同一线程上进行,尽管与from运算符的线程不同.
如果task.call实际上是异步的,则结果将根据返回,但是它会引入并发性,并且与Rx的语义无关.
无论哪种方式,(正确放置的)observeOn都将导致结果传递给主线程上的this :: show.
标签:multithreading,system-reactive,rx-java,asynchronous,java 来源: https://codeday.me/bug/20191120/2046591.html