编程语言
首页 > 编程语言> > rxjava2的使用(1)

rxjava2的使用(1)

作者:互联网

Rxjava 的使用

  github地址:https://github.com/ReactiveX/RxJava

参考自:

https://mcxiaoke.gitbooks.io/rxdocs/content/

https://blog.csdn.net/weixin_36709064/article/details/82919785

https://www.jianshu.com/p/25682d620320

https://blog.csdn.net/jdsjlzx/article/details/54845517

rxjava :一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

了解 ReactiveX :

ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想,RxJava 是 ReactiveX 在 Java 上的开源的实现。Observable(观察者) 和 Subscriber(订阅者)是两个主要的类。


 

http://gank.io/post/560e15be2dca930e00da1083

目录

Rxjava 的使用

一、操作符

1、创建 被观察者 的操作符

2、功能型的操作符(依赖于被观察者)

1、转换类操作符 (流操作符,作用于被观察者)

2、合并类操作符(流操作符)

3、过滤类操作符(流操作符)

二、线程控制Scheduler

 Scheduler :   

内置的Scheduler

默认调度器

 

三、重要的方法和接口

1、重要的接口

 1)ObservableOnSubscribe:

2)ObservableEmitter:

3)Observer

4)Consumer

5)Disposable

6)Function系列的接口

4、具体使用注意事项:

 


一、操作符

1、创建 被观察者 的操作符

Observable 通过静态方法创建被观察者

1、  Observable.create()    :  最常用的操作符,用于创建一个具有发射事件能力的被观察者 

EP:
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("World");
                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("TAG", s);
            }
        });

 2、Observable.just();只是简单的原样发射数据(接收的是什么类型的数据,发射的就是什么类型的数据)

String[] arr = new String[]{"ss1", "ss2", "ss3"};

   Observable.just(arr).subscribe(new Consumer<String[]>() {
              @Override
               public void accept(String[] s) throws Exception {
                 Log.e("TAG", "" + s.length);
              }
                });

//--------------------------------------------------

Observable.just("1","2","3","4")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                
            }
        });   

 

   3、Observable.from(): from系列可以将数组或Iterable的元素拿出来,做成多个事件进行发射

   Observable.fromArray(arr);//fromArray里面放数组 (将数组元素一个个发射) 
  Observable.fromIterable(stringList)//里面放集合(将集合元素一个个发射)


  //--注意:int常量类型的数组要转换成Interger对象,否则fromArray会将数组当作一个数组对象数据处理,

ep:

//--它将int常量类型的数组当作一个数组对象发射了,没有进行拆分
int[] ints={1,2,3,4,5};

Observable.fromArray(ints)
        .subscribe(new Consumer<int[]>() {
            @Override
            public void accept(int[] ints) throws Exception {
                
            }
        })

 

4、Observable.fromCallable():该方法的参数是一个回调方法,将该回调方法的返回值装入事件进行发送

EP:

Observable.fromCallable(new Callable<ArrayList<String>>() {

    @Override
    public ArrayList<String> call() throws Exception {
        ArrayList<String> aa=new ArrayList<>();
        for(int i=0;i<5;i++){
            aa.add("这是发送的数据");
        }
        return aa;
    }
});

5、Observable.interval()按照固定时间间隔发送(无线循环)

ep: 每隔500毫秒执行一次
Observable.interval(500, TimeUnit.MILLISECONDS,  Schedulers.trampoline())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("当前:"+aLong);

            }
        });
//--period--间隔时间,unit--时间单位,scheduler--线程调度器
interval(long period, TimeUnit unit, Scheduler scheduler) 
//--period--间隔时间,unit--时间单位
interval(long period, TimeUnit unit)
//initialDelay--初始开始执行的时间,--period--间隔时间,unit--时间单位,scheduler--线程调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) 

6、Observable.intervalRange(); 按照固定时间间隔发送(循环有限次数)

EP:

//从5开始循环,循环15次,从0秒开始执行,每次间隔500毫秒。
Observable.intervalRange(5,15,0,500,TimeUnit.MILLISECONDS,Schedulers.trampoline())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("当前:"+aLong);
            }
        });
//start--开始循环范围,count--循环次数,initialDelay--开始时间,period--间隔时间
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

2、功能型的操作符(依赖于被观察者)

1、转换类操作符 (流操作符,作用于被观察者)

转化类操作符,可以将一个被观察者通过某种方法,转化为另外的被观察者! 变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy。

1)Observable.map:map的作用是将每一个发射的事件,应用于一个函数,从而可以对事件做相应处理,事件数量不会变化.一对一的关系,map实质就是拦截这个被观察者对象,然后进行处理,转换成另一个被观察者对象,订阅者收到的对象就是转换后的对象

Observable.just("this is test")
        .map(new Function<String, String>() {

            @Override
            public String apply(String s) throws Exception {
                String tt="这是map后的结果"+s;
                return tt;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                

            }
        });

 

2)Observable.flatMap():flatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。(合并的数据不保证数据的顺序)

*注意: 转换类操作符都是借用function系列方法进行变换操作,flatmap 的不同点是function 函数的输出对象也是一个Observable.      

ep:

ArrayList<Student> students=new ArrayList<>();
        //--发射学生集合数据,得到每个学生
Observable.fromIterable(students) //--fromIterable 作用是拆分集合 
         //--- 将学生对象转换为课程对象
        .flatMap(new Function<Student, ObservableSource<Course>>() {
            @Override
            public ObservableSource<Course> apply(Student student) throws Exception {
                //--发射每个学生的每门课程
                return Observable.fromIterable(student.getCourses());
            }
        })
        .subscribe(new Consumer<Course>() { //--接收到的是所有学生的所有课程(平铺了数据)
            @Override
            public void accept(Course course) throws Exception {

            }
        });

//---.flatMap 的特点就是使用的Function系列的转换函数,里面的输出对象 是一个本身也发射数据的Observable

flatmap的使用特点:

1、他可以简单遍历二位数据,处理象数组或者是集合这类观察者对象(就是可以进行二次拆分数据)
          https://www.jianshu.com/p/0524d7914429
           2、可解决网络请求嵌套的问题  (可以进行多层次发射数据---因为flatmap的输出对象是一个本身也发射数据的Observable)
           https://blog.csdn.net/jdsjlzx/article/details/51493552

                                                                                                                                                          

 3)Observable.concatMap();与上面的flatMap作用基本一样,与flatMap唯一不同的是concat能保证Observer接收到Observable集合发送事件的顺序。

 

4)Observable.buffer();将发射数据分成若干段,然后把每一段发射数据组成一个新的Observable,然后发射这个新的observable

EP:
    按照每段2位数,发射数据
Observable.just(1,2,3,4,5)
        .buffer(2)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                for (Integer integer : integers) {
                    System.out.println("buffer:"+String.valueOf(integer));
                }
                System.out.println("============================");
            }
        })   
输出结果:
buffer:1
buffer:2
buffer:3
============================
buffer:4
buffer:5
============================

   //-------------------------------------------------------------------
  按照每段3位数,发射数据,新开始的数据跳过前面2位数
   Observable.just(1,2,3,4,5)
        .buffer(3,2)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                for (Integer integer : integers) {
                    System.out.println("buffer:"+String.valueOf(integer));
                }
                System.out.println("============================");
            }
        })
;
输出结果:

buffer:1
buffer:2
buffer:3
============================
buffer:3
buffer:4
buffer:5
============================
buffer:5
============================

2、合并类操作符(流操作符)

合并类操作符的作用就是将多个Observable 合并 做为自己的Observable ,发射事件。 

包含:concatMap(),concat(), merge(), mergeArray    (),concateArray(),reduce(),collect(),startWith(),zip(),count()。

1)Observable.combineLatest:会合并 被合并的每一个Observable  的最新值

EP:

Observable ob1= Observable.just("1","2","3","4","5");
Observable ob2= Observable.just("6","7","8","9","10");

Observable.combineLatest(ob1, ob2, new BiFunction<String, String, String>() {
    @Override
    public String apply(String s, String s2) throws Exception {
        System.out.println("当前的值:"+s+"----"+s2);
        return s+s2;
    }
})
.subscribe(new Consumer<String>() {
    @Override
    public void accept(String ss) throws Exception {

    }
})
;

输出结果:

当前的值:5----6
当前的值:5----7
当前的值:5----8
当前的值:5----9
当前的值:5----10

 2)Observable.zip():使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合!

Observable<Integer>  observable1=Observable.just(1,2,3,4);
Observable<Integer>  observable2=Observable.just(1,2);
Observable<String>  observable3=Observable.just("s1", "s2", "s3");
Observable.zip(observable1, observable2, observable3, new Function3<Integer, Integer, String, String>() {
    @Override
    public String apply(Integer integer, Integer integer2, String s) throws Exception {
        return integer.intValue()+"---" + integer2.intValue() +"----"+ s;
    }
}).subscribe(s -> System.out.println(s));

输出结果:

1---1----s1
2---2----s2

3、Observable.merge | Observable.mergeWith:将两个Observable合并成一个Observable,发射数据,数据没有进行任何变化

EP:

Observable<Integer>  observable1=Observable.just(1,2,3,4);
Observable<Integer>  observable2=Observable.just(9,10);
Observable.merge(observable1,observable2)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer.intValue());
            }
        });

observable1.mergeWith(observable2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
 System.out.println(integer.intValue());
    }
});

输出结果:上面的merge()和mergeWith()输出结果一样

1
2
3
4
9
10

4、Observable.startWith | Observable.startWithArray():在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列,startWithArray通过传递一个数组先发射数组里面的数据。最后发射Observable的数据。

EP:

Observable.just("10","11")
        .startWithArray("1","2")
        .startWith("5")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

输出结果: 

5
1
2
10
11

3、过滤类操作符(流操作符)

过滤类操作符,按照某个条件过滤掉不符合该条件的的发射事件,下游就会接收不到被过滤的事件。

1)Observable.distinct() : 过滤掉重复的事件 

EP:

Observable.just(1,1,2,1,3,2)
        .distinct()
        .subscribe(integer -> System.out.println(integer.intValue()));

输出结果:
1
2
3

2)Observable.filter():设置过滤条件,过滤掉不符合条件的。 

EP:

Observable.just(1,1,2,1,3,2)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer.intValue()==2;
            }
        })
        .subscribe(integer -> System.out.println(integer.intValue()));

输出结果:

2
2

3)Observable.skip():跳过前n项数据,或者是跳过开始的某一段时间的数据

 EP:

1、跳过前3项的数据

Observable.just(1,2,3,4,5,6,7)
        .skip(3)
        .subscribe(integer -> System.out.println(integer.intValue()));

输出结果:

4
5
6
7

2、跳过开始某一段时间的数据

Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) //--每500毫秒执行一次,无线循环        .skip(3,TimeUnit.SECONDS) //跳过前3s的数据
.subscribe(aLong -> System.out.println(aLong));

输出结果:

5
6
7
8
9

....

 

 4)Observable.take() / takeUntil() / takeWhile()  / takelast(): 

Observable.take():take方法传递一个long型参数,表示执行的次数--也就是保留前n项数据(功能与skip相反)

EP:

Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline())
        .take(5)
        .subscribe(aLong -> System.out.println(aLong));

结果:

0
1
2
3
4

Observable.takeUntil():当发返回值为true的时候终止 (包含临界条件)

Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline())
        .takeUntil(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) throws Exception {
                return aLong==3;
            }
        })
        .subscribe(aLong -> System.out.println(aLong));

结果:

0
1
2
3

Observable.takeWhile():当 返回值是false 的时候终止(并且不包含临界条件的item ) 

Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline())
        .takeWhile(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) throws Exception {
                return aLong!=3;
            }
        })
        .subscribe(aLong -> System.out.println(aLong));

结果:

0
1
2

Observable.takelast():传递的参数是long型,发射最后n项数据(与take()相反)

Observable.just("1","2","3","4","5","6","7")
        .takeLast(3)
        .subscribe(aLong -> System.out.println(aLong));

结果:

5
6
7

二、线程控制Scheduler

 Scheduler :   

Scheduler :RxJava 通过线程调度器来指定每一段代码应该运行在什么样的线程,利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。

创建类操作符 ----事件的产生,由subscribeOn来控制运行在哪种线程里。subscribeOn不论出于何种位置,都只会执行一次。

非创建类操作符---事件的消费 ,由observeOn来控制 运行在哪种线程里。可以通过多次设定observeOn,来指定不同的代码运行在不同的线程里。

observeOn 切换线程原理:observeOn 切换的是observeOn之后执行的代码,如果后面没有observeOn进行再次切换的话,后面的所有代码都运行在首个observeOn切换的线程里。

subscribeOn 主要控制事件发射运行的线程(默认是在主线程),subscribeOn 之所以只起一次作用,因为它作用于事件发射(被观察者)。--发射事件后不可能再次发射这次事件,所以只有一次起作用

 

EP:

 

Observable.just(1, 2, 3, 4) 
        .subscribeOn(Schedulers.io()) //指定创建类操作符(just)运行线程
        .observeOn(Schedulers.newThread()) //指定map(mapOperator)运行线程
        .map(mapOperator) 
        .observeOn(Schedulers.io())//指定map(mapOperator2)运行线程
        .map(mapOperator2) 
        .observeOn(AndroidSchedulers.mainThread) //指定subscribe(subscriber)运行线程
        .subscribe(subscriber); 

 

 

内置的Scheduler

RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

已经内置的调度器的种类

下表展示了RxJava中可用的调度器种类:

调度器类型 效果
Schedulers.computation( ) 用于计算的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如xml,json文件的解析,Bitmap图片的压缩取样等。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。(rxjava2 已废弃)
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 启用新线程,并在新线程执行操作
Schedulers.trampoline( ) 在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。(等价于Schedulers.immediate( ))

默认调度器

在RxJava中,某些Observable操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:

操作符 调度器
buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan, timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
repeat trampoline
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retry trampoline
sample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) computation
takeLastBuffer(time, unit) computation
takeLastBuffer(count, time, unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector, timeoutSelector) immediate
timeout(timeoutSelector, other) immediate
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) immediate
timeout(timeout, timeUnit, other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

 

 

 

三、重要的方法和接口

1、重要的接口

 1)ObservableOnSubscribe:发射器回调接口

public interface ObservableOnSubscribe<T>{

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;

}

:ObservableOnSubscribe接口只有一个方法,这个方法提供了发射数据的功能,做为Observable.create() 传递的参数

2)ObservableEmitter:数据发射器接口

 ObservableEmitter 是数据发射器接口:

ObservableEmitter 继承自Emitter接口,Emitter是一个发射器接口,是具体实现发射数据的接口,

例如:Emitter中的onNext()方法就是发射数据用的。

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    boolean tryOnError(@NonNull Throwable t);

}

 

 

3)Observer:订阅者接口

Observer是订阅者接口:
           Observer 是用于接收被观察者的数据
           public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

           void one rror(@NonNull Throwable e);

    void onComplete();

}

 

4)Consumer:消费者接口(功能类似于订阅者接口)

Consumer是消费者接口

Consumer用于接收被观察者的数据。

public interface Consumer<T> {
  
    void accept(T t) throws Exception;
}

 

 

5)Disposable:代表整个观测组序列事件

Disposable 代表一个可支配的资源

Disposable接口是代表一个可支配的支援,当被观察者被订阅,执行 .subscribe()方法,该方法返回的就是一个Disposable,即就是一个可支配的支援。

public interface Disposable {
 
    void dispose();


    boolean isDisposed();
}

6)Function系列的接口:对象转换接口

Function用于变换对象,使用于map,flatmap这种需要对象转换的情况。

例如 function接口:

public interface Function<T, R> {  //--T--输入对象,R--输出对象
   
    R apply(@NonNull T t) throws Exception;
}

function系列的接口按照输入对象的个数排序为:

IntFunction<T>    ---这个没有输出对象
Function<T, R>
BiFunction<T1, T2, R>
Function3<T1, T2, T3, R>
Function4<T1, T2, T3, T4, R>
Function5<T1, T2, T3, T4, T5, R>
Function6<T1, T2, T3, T4, T5, T6, R>
Function7<T1, T2, T3, T4, T5, T6, T7, R>
Function8<T1, T2, T3, T4, T5, T6, T7, T8, R>
Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R>

 

;

 

 

 

 

4、具体使用注意事项:

1、内存溢出问题 :在使用rxjava的时候,如果没有及时解除订阅,在退出activity的时候,异步线程还在执行。
对activity还存在引用,此时activity不能被回收,因此出现内存溢出问题。

rxjava执行异步任务时,通常需要在Activity/Fragment销毁时,及时关闭异步任务,否则就会有内存泄漏。

方式一: 使用CompositeDisposable 管理Disposable对象,在Activity/Fragment销毁时 调用CompositeDisposable.clear()关闭所有异步任务。

compositeDisposable.add(disposable);

@Override
public void onDestroy() {
    iView=null;
    if(compositeDisposable!=null){
        compositeDisposable.clear();
        compositeDisposable=null;
    }

}

       方式二:RxLifecycle 取消订阅  (了解 lifecycle) 

       

 

 

 

 

 

 

 

 

 

 

 

标签:Observable,rxjava2,subscribe,线程,computation,使用,操作符,public
来源: https://blog.csdn.net/liu835704070/article/details/88566298