编程语言
首页 > 编程语言> > RxJava Observable.fromEmitter奇数个反压行为

RxJava Observable.fromEmitter奇数个反压行为

作者:互联网

我一直在使用Observable.fromEmitter()作为Observable.create()的绝佳替代品.我最近遇到了一些奇怪的行为,我无法完全弄清楚为什么会这样.我真的很感谢一个对背压有一定了解的人,以及调度程序来看看这一点.

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

该应用程序的输出是

Received 1
Received 2
...
Received 128

然后它仍然停留在128(可能是因为这是RxJava的默认缓冲区大小).

如果我将fromEmitter()中指定的模式更改为BackpressureMode.NONE,则代码将按预期工作.如果我删除对observeOn()的调用,它也将按预期工作.有谁能解释为什么会这样?

解决方法:

这是同一个池的死锁情况. subscriptionOn在正在使用的同一线程上调度下游请求,但是如果该线程正忙于睡眠/发射循环,则该请求将永远不会传递到fromEmitter,因此在一段时间之后LATEST开始删除元素,直到最后如果主源等待足够长的时间,则将传递最后一个值(999). (与我们删除的onBackpressureBlock相似的情况.)

如果subscribeOn没有执行此请求调度,则该示例将正常工作.

我已经打开an issue来制定解决方案.

现在的解决方法是使用更大的缓冲区大小与observeOn(有重载)或使用fromEmitter(f,NONE).subscribeOn().onBackpressureLatest().observeOn()

标签:rx-java,java
来源: https://codeday.me/bug/20191118/2025161.html