Reactive Spring实战 -- 理解Reactor的设计与实现
作者:互联网
转:
Reactive Spring实战 -- 理解Reactor的设计与实现
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。 它提供了可组合的异步序列API,例如Flux(用于[N]个元素)和Mono(用于[0 | 1]个元素)。
Reactor Netty项目还支持非阻塞式网络通信,非常适用于微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。
本文通过例子展示和源码阅读,分析Reactor中核心设计与实现机制。
文本Reactor源码基于Reactor 3.3
名词解析
响应式编程,维基百科解析为
reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s)
响应式编程是一个专注于数据流和变化传递的异步编程范式。 这意味着使用编程语言可以很容易地表示静态(例如数组)或动态(例如事件发射器)数据流。
下面简单解释一下相关名词。
数据流与变化传递,我的理解,数据流就如同一条车间流水线,数据在上面传递,经过不同的操作台(我们定义的操作方法),可以被观测,被过滤,被调整,或者与另外一条数据流合并为一条新的流,而操作台对数据做的改变会一直向下传递给其他操作台。
java 8 lambda表达式就是一种数据流形式
lists.stream().filter(i -> i%2==0).sorted().forEach(handler);
lists.stream(),构建一个数据流,负责生产数据。
filter,sorted方法以及handler匿名类,都可以视为操作台,他们负责处 理数据。
这里还涉及两个概念
声明式编程,通过表达式直接告诉计算机我们要的结果,具体操作由底层实现,我们并不关心,如sql,html,spring spel。
对应的命令式编程,一步一步告诉计算机先做什么再做什么。我们平时编写java,c等代码就是命令式编程。
上例中通过filter,sorted等方法直接告诉计算机(Spring)执行过滤,排序操作,可以理解为声明式编程。
注意,我的理解是,声明式,命令式编程并没有明确的界限。
越是可以直接通过声明表达我们要什么,就越接近声明式编程,反之,越是需要我们编写操作过程的,就越接近命令式编程。
如Spring中的声明式事务和编程式事务。
可参考:https://www.zhihu.com/question/22285830
函数式编程,就是将函数当做一个数据类型,函数作为参数,返回值,属性。
Java不支持该模式,通过匿名类实现,如上例中forEach方法。
注意,函数式编程还有很多学术性,专业性的概念,感兴趣的同学可以自行了解。
响应式编程,主要是在上面概念加了异步支持。
这个异步支持非常有用,它可以跟Netty这些基于事件模型的异步网络框架很好地结合,下一篇文章我们通过WebFlux来说明这一点。
数据流转
下面我们来简单看一下Reactor的设计与实现吧。
首先通过一个小用例,来看一个Reactor中如何生产数据,又如何传递给订阅者。
@Test
public void range() {
// [1]
Flux flux = Flux.range(1, 10);
// [2]
Subscriber subscriber = new BaseSubscriber() {
protected void hookOnNext(Integer value) {
System.out.println(Thread.currentThread().getName() + " -> " + value);
request(1);
}
};
// [3]
flux.subscribe(subscriber);
}
Reactor中,发布者Publisher负责生产数据,有两种发布者,Flux可以生产N个数据,Mono可以生产0~1个数据。
订阅者Subscriber负责处理,消费数据。
1
构建一个发布者Flux
注意,这时发布者还没开始生产数据。
2
构建一个订阅者Subscriber
3
创建订阅关系,这时,生产者开始生产数据,并传递给订阅者。
Flux.range,fromArray等静态方法都会返回一个Flux子类,如FluxRange,FluxArray。
Publisher#subscribe,该方法很重要,它负责创建发布者与订阅者的订阅关系。
Flux#subscribe
public final void subscribe(Subscriber actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
...
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
获取内部的CorePublisher,CoreSubscriber。
Flux子类都是一个CorePublisher。
我们编写的订阅者,都会转化为一个CoreSubscriber。
CorePublisher也有一个内部的subscribe方法,由Flux子类实现。
FluxRange#subscribe
public void subscribe(CoreSubscriber actual) {
...
actual.onSubscribe(new RangeSubscription(actual, st, en));
}
Subscription代表了发布者与订阅者之间的一个订阅关系,由Publisher端实现。
Flux子类subscribe方法中通常会使用CoreSubscriber创建为Subscription,并调用订阅者的onSubscribe方法,这时订阅关系已完成。
下面来看一下Subscriber端的onSubscribe方法
BaseSubscriber#onSubscribe -> hookOnSubscribe
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(9223372036854775807L);
}
Subscription#request由Publisher端实现,也是核心方法,订阅者通过该方法向发布者拉取特定数量的数据。
注意,这时发布者才开始生产数据。
RangeSubscription#request -> RangeSubscription#slowPath -> Subscriber#onNext
void slowPath(long n) {
Subscriber a = this.actual;
long f = this.end;
long e = 0L;
long i = this.index;
while(!this.cancelled) {
// [1]
while(e != n && i != f) {
a.onNext((int)i);
if (this.cancelled) {
return;
}
++e;
++i;
}
...
}
}
1
RangeSubscription负责生产指定范围内的整数,并调用Subscriber#onNext将数据推送到订阅者。
可以看到,
Publisher#subscribe完成订阅操作,生成Subscription订阅关系,并触发订阅者钩子方法onSubscribe。
订阅者的onSubscribe方法中,订阅者开始调用Subscription#request请求数据,这时发布者才开始生产数据,并将数据推给订阅者。
操作符方法
跟java 8 lambda表达式一样,Reactor提供了很多的声明式方法,这些方法类似于操作符,直接操作数据(下文称为操作符方法)。
合理利用这些方法,可以大量简化我们的工作。
数据处理,如skip,distinct,sort,filter
钩子方法,如doOnNext,doOnSuccess
组合操作,flatMap,zipWhen
阻塞等待,blockLast
流量控制,limitRate
数据缓存,buffer,cache
可参考官方文档:https://projectreactor.io/docs/core/release/reference/#which-operator
注意,这些操作符方法虽然是添加到Publisher端,但Reactor会将逻辑会转移到Subscriber端。
看一个简单例子
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.skip(1)
.subscribe(myHandler);
myHandler即我们实现的Subscriber。
每调用一次操作符方法,Flux都会生成一个新的Flux子类(装饰模式),最后Flux类为FluxSkip[FluxPeek[FluxRange]]
。
我们来看一下完整的Flux#subscribe方法代码
public final void subscribe(Subscriber actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
// [1]
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator)publisher;
while(true) {
// [2]
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
return;
}
// [3]
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
// [4]
publisher.subscribe(subscriber);
} catch (Throwable var6) {
Operators.reportThrowInSubscribe(subscriber, var6);
}
}
1
判断Flux是否由操作符方法产生。
2
OptimizableOperator#subscribeOrReturn会生成新的Subscriber,以执行操作符逻辑。如上面例子中,FluxPeek会生成PeekSubscriber,FluxSkip生成SkipSubscriber。这里将操作符逻辑转移到Subscriber端。
OptimizableOperator#subscribeOrReturn也可以直接调用被装饰Publisher的subscribe方法,从而改变流程。如下面说的FluxSubscribeOn。
3
取出上一层被装饰的Publisher作为新的Publisher,如上例的FluxSkip[FluxPeek[FluxRange]]
,会依次取出FluxPeek,FluxRange。
这个操作一直执行,直到取出真正生产数据的Publisher。
4
使用真正生产数据的Publisher,和最后包装好的Subscriber,再调用subscribe方法。
上面例子中,流程如下
push/pull
Reactor提供了push和pull两种模式。
先看一下pull模式
Flux.generate(sink -> {
int k = (int) (Math.random()*10);
if(k > 8)
sink.complete();
sink.next(k);
})
.subscribe(i -> {
System.out.println("receive:" + i);
});
Sink可以理解为数据池,负责存储数据,根据功能不同划分,如IgnoreSink,BufferAsyncSink,LatestAsyncSink。
Sink#next会将数据放入池中,由Sink缓存或直接发送给订阅者。
Flux#generate(Consumer<synchronoussink
标签:订阅,Reactor,Flux,--,Spring,编程,Subscriber,subscribe 来源: https://www.cnblogs.com/wangtcc/p/14630431.html