reactive
作者:互联网
从CompletableFuture到Reactor编程
通过 CompletableFuture 和 Lambda 表达式,可以快速实现轻量业务异步封装与编排,与 Callback 相比可以避免方法多层嵌套问题,但面对相对复杂业务逻辑时仍存在以下局限:
-
难以简单优雅实现多异步任务编排;
-
难以处理实时流式场景;
-
难以支持高级异常处理;
-
不支持任务延迟执行。
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello" ); // f2依赖f1的结果做转换 CompletableFuture<String> f2 = f1.thenApplyAsync(t -> t + " world" ); System.out.println("异步结果:" + f2.get());
相比 Future,基于 Reactive 模型丰富的操作符组合(filter/map/flatMap/zip/onErrorResume 等高阶函数)代码清晰易读,搭配 Lamda 可以轻松实现复杂业务场景任务编排。
Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler
。
默认情况下, Reactor提供三种核心调度程序接口实现:
1. SingleScheduler:能为一个专用工作单元安排所有可能的任务,具有时间性,因此可以延迟安排定期事件。此调度程序可以使用Scheduler.single()进行引用。
2, ParallelScheduler: 适用于固定大小的工作单元池(默认情况下,收CPU内核数量限制)适合CPU密集任务,Scheduler.parallel()进行引用。
3. ElasticScheduler:可以动态创建工作单元池并缓存线程池,由于其所创建的线程池没有最大数量限制,因此此调度程序非常适合用于IO密集操作的调度,Scheduler.elastic()进行引用。
可以使用 Schedulers.fromExecutorService(ExecutorService)
基于现有的 ExecutorService
创建 Scheduler
。
Scheduler.parallel()
创建一个基于单线程 ExecutorService
的固定大小的任务线程池。 因为可能会有一个或两个线程导致问题,它总是至少创建 4 个线程。然后 publishOn 方法便共享了这些任务线程, 当 publishOn
请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。
Scheduler.elastic()
也能创建线程,它能够很方便地创建专门的线程(以便跑一些可能会阻塞资源的任务, 比如一个同步服务),请见 如何包装一个同步阻塞的调用?。
Reactive编程核心是背压,即nothing happens until you subscribe,在使用Reactor构建反应式流时,数据会从发布者流向订阅者,同时也会从订阅者向上传播到发布者
https://projectreactor.io/docs/core/release/reference/
http://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html
标签:Reactor,创建,reactive,任务,线程,Scheduler,CompletableFuture 来源: https://www.cnblogs.com/BaymaxHH/p/16350331.html