编程语言
首页 > 编程语言> > 使用多线程RxJava进行反应式拉取

使用多线程RxJava进行反应式拉取

作者:互联网

我正在尝试在RxJava中构建反应式拉式观察器.

我的观察者是这样的:

Observable<Command> myObs = Observable.create(s -> {
   Command command;
   int i = 0;
   do {
      command = NetworkOperation1.call(i);
      logger.info("Init command " + i);
      s.onNext(command);
      i++;
   } while (!command.isLast() && i < MAX);
   s.onCompleted();
});

我想以4个并发批处理(缓冲区)进行处理,如下所示:

myObs
    .buffer(10)
    .flatMap(batch -> {
          return Observable
                   .from(batch)
                   .subscribeOn(Schedulers.io())
                   .map(c -> {
                       Intermediate m = NetworkOperation2.call(c));
                       logger.info("Done intermediate " + m.id);
                       return m;
                   }
          }, 4);

然后,我需要以不同的大小批处理结果,如下所示:

    .buffer(25)
    .subscribeOn(Schedulers.newThread())
    .subscribe(list ->
         logger.info("Finished batch with " + list.size());

问题在于Observable中的命令是一次全部处理的,而我希望它们根据需要进行处理.

这是发生的情况的日志:(请注意,同时运行所有1000条命令,而不是根据需要调用)

Init command 0
Init command 1
Init command 2
...
Init command 999
Done intermediate 0
Done intermediate 1
...
Done intermediate 24
Finished batch with 25
Done intermediate 25
Done intermediate 26
...
Done intermediate 49
Finished batch with 25
...

问题:是否有一种方法可以暂停观察者的线程,以便它不会一次发出所有命令或类似的东西?我试过了request()运算符,但无法正常工作.

谢谢.

解决方法:

您需要背压感知源和运算符.您正在使用的运算符支持背压,但您的源不支持.

改为这样做:

myObs = Observable.range(1,1000)
    .map(i -> NetworkOperation1.call(i));

Observable.range支持背压,因此仅在需要时才会发出.

标签:java-8,multithreading,rx-java,reactive-programming,java
来源: https://codeday.me/bug/20191028/1948949.html