编程语言
首页 > 编程语言> > RxJava2.0 源码解析

RxJava2.0 源码解析

作者:互联网

对于一些比较主流的开源框架,其中的一些源码还是需要去看,去了解的。

以下基于个人视角,简单解读一下RxJava2.0的源码,挑一些比较常见的重要的地方进行解读。

这一段常见的rxjava使用代码。

从中可以看到一些基础的知识点。比如观察者与被观察者的绑定。

rxjava的底层是基于观察者模式实现的。

Observable是被观察者,Observer是观察者。

Observer比较简单,主要是包含了这几个事件响应方法。

而Observable则相对复杂些。

Observable继承自ObservableSource,可以看到里面只有一个方法,就是订阅subscribe。

而我选取了代码中的create方法来看,可以看到create方法是用来新建一个被观察者,之后返回。

 

这两张代码截图比较重要,第一张里告诉我们observable建立时,需要传入一个ObservableOnSubscribe参数,这个东西其实只做了一件事,就是subscribe方法里应该做什么,也就是上游对下游有哪些操作。

之后的subscribeActual方法也很重要,它真正实现了观察者与被观察者的绑定。

之后开始调用source的subscribe方法开始发送事件了。

而第二张图中的CreateEmitter可以看出是把观察者observer包了一层,里面仍然调用了其中的onNext,onComplete以及onError方法。

 

 

值得注意的是,disposable是一个接口,而CreateEmitter实现了这个接口,所以在绑定时,实际上是观察者Observer把自己绑定给了自己,调用了自己里面的disposable方法。

 

这两张图揭示了disposed的用法。看过源码后发现还是挺巧的。

其实是发射器自己维护了一个变量,这个变量如果为true,则停止观察者onNext等方法的调用。而在代码中在某些条件下设置这个值为true的正是观察者(外面包了发射器)本身。

 

接下来的这张图可谓是重点之中的重点了。

subscribeOn和observerOn分别指定了发送事件的线程以及响应事件的线程。

 

 

先看上游线程设置,这里调了一个两个参数的ObservableSubscribeOn构造方法。之后看其构造方法内的代码,可以看出持有了一个scheduler,这个东西无疑是控制线程的。另外subscribeActual方法比原来多了一项工作是对观察者设置了disposable。

而且从setDispoasble方法可以看出,这一步实际上是完成了异步线程启动发送事件。

追其源码,最终是在这里调用了run方法。而这个Worker是线程启动的关键。

 

让我们回过头来看一看rx框架新起线程的实现方式。

可以看出rxjava为我们提供了大概5种线程启动方式,有单线程,IO线程,新线程等等。

再深入看新线程启动的源码,最终发现代码中用到了这么一个东西,onInitNewThreadHandler。看到这差不多能明白了,其实线程切换最终用的还是handler的方式。但是这个handler是在哪里设置的呢。

直接追源码没有找到,那我们看一下下游响应事件的线程切换方法有没有答案。

ObserverOn方法最终调用了这里,scheduler是下游响应的线程。

我们可以看到会先判断scheduler是否是当前线程,如果是则直接订阅上游。如果不是,则开启一个worker。

 

这几张图揭示了切换线程的真正操作。

每一种切换线程的类型,都对应了一个方法。这一系列步骤以新起线程为例。

可以看出,NewThreadScheduler是一个单例,其继承了Scheduler,重写了createWorker方法。

在createWorker方法里调用了NewThreadWorker方法,里面通过参数Thread_Factory其实也就是一个线程池,最终生成了线程,开始执行任务。

由此我们可以推出,其实是不同的Scheduler实现了不同的createWorker方法。当检测到线程不统一时,就新建立线程或使用已有的线程,从而实现发送或接收事件时在目标线程。

 

 

 

 

 

 

 

 

标签:RxJava2.0,代码,调用,观察者,源码,线程,解析,方法
来源: https://blog.csdn.net/Kongou/article/details/96606068