RxJava源码解析
作者:互联网
转自:https://blog.csdn.net/sted_zxz/article/details/82317400
本文基于RxJava2.2.1版本分析。
简介
官方介绍:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是一个处理异步操作的库、基于事件流的库。
其核心的东西不外乎两个, Observable(被观察者)和 Observer(观察者)。当观察者订阅了被观察者之后,被观察者可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后将结果交给观察者处理。
create
create()是创建Observable对象的方法之一,还有其他诸多的创建方法,这里主要了解下Observable的创建流程:
// 创建被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
});
进入create():
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// requireNonNull()很多地方会用到,顾名思义就是用来判空的
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
```
进入onAssembly():
```java
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
// 这里hook
return apply(f, source);
}
return source;
}
onAssembly()主要是用来hook的,默认使用中都没有hook,返回的就是入参ObservableCreate。我们来看一下ObservableCreate:
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
这里我们看到ObservableCreate将最初我们创建的ObservableOnSubscribe包装了一层,用成员变量source记住。
至此,创建流程结束,我们得到了Observable< T >对象,其实就是ObservableCreate< T >.
subscribe
承接上文的示例代码:
// 创建观察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void one rror(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
// 观察者和被观察者建立连接
observable.subscribe(observer);
进入subscribe():
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call one rror because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
这里的关键代码只有一句subscribeActual(observer);,被观察者被订阅时真正被执行的方法:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 这里首先调用了observer的onSubscribe()
observer.onSubscribe(parent);
try {
// 然后调用了source的subscribe()
// 这里的sourec就是之前保存的ObservableCreate
// subscribe()就是我们自己一开始实现的方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
CreateEmitter是啥,它是一个事件的发射器。当我们在ObservableCreate的subscribe()中调用CreateEmitter的onNext()、onError()或者onComplete()方法时,它会去调用observer的onNext()、onError()或者onComplete()方法。
从CreateEmitter parent = new CreateEmitter(observer);一句中,大概可以猜到,CreateEmitter持有了observer,确实如此:
@Override
public void onNext(T t) {
if (t == null) {
one rror(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void one rror(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
我们可以看到CreateEmitter的onNext()等方法被调用时,其中会调用对应的observer的onNext()等方法。
以上是最简单的流程,那在其中加入一个操作符会怎么样呢?我们来看一下map是做什么的。
map
map起到类型转换的作用,例子如下,源头observable发送的是String类型的数字,利用map转换成int型,最终在终点observer接受到的也是int类型数据:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void one rror(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
来看它的源码:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
抛开onAssembly(),好像只是创建了一个ObservableMap并且返回:
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
// 将传入的observable记住
super(source);
// 将变换的方法记住
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这里和之前看到的ObservableCreate如出一辙,将上一个observable包装了起来。
然后当subscribeActual()执行的时候,去调用source的subscribe(),这里的source指的是上游的observable。并且创建了一个新的observer,MapObserver将下游的observer包装了一层。
如果多次map()的话,我们可以将函数调用的流程分为三步:
首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
当observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。
假设当前的observer是MapObserver,在它的onNext()被调用时会调用它的变换方法,然后继续调用下游observer的onNext(),如下:
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 调用变换方法mapper.apply(t)
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 调用下游observer的onNext()
downstream.onNext(v);
}
最终调用最后一个observer的onNext()。
subscribeOn
用来指定subscribe()发生的线程的:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
这里返回了一个ObservableSubscribeOn:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里做法其实和以上的提到过的ObservableCreate和ObservableMap中的是一样的,包装了上游的observable,并且额外记录了subscribe()的线程调度器。
当subscribeActual()被执行的时候,会切换到线程调度器所指定的线程。
那么如果subscribeOn(Schedulers.xxx())切换线程N次,总是以第一次为准,或者说离源observable最近的那次为准,并且对其上面的代码生效。
observeOn
指定onNext()等方法执行的线程:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
还是似曾相识的场景,创建了一个ObservableObserveOn:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
自然少不了对原来observable的包装,同时记录了线程调度器。
当从上游push消息过来的时候,我们会看到ObservableObserveOn会切换到所记录的线程调度器指定的线程:
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
以上是RxJava2最基本方法的一些源码分析。
链式调用原理
我们可以将函数调用的流程分为三步:
首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
当observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。
标签:observer,subscribe,onNext,source,源码,void,RxJava,解析,public 来源: https://blog.csdn.net/m0_37698652/article/details/101030534