Android进阶系列之第三方库知识点整理。
知识点总结,整理也是学习的过程,如有错误,欢迎批评指出。
第一篇:
Rxjava2(一)、基础概念及使用
第二篇:
Rxjava2(二)、五种观察者模式及背压
终于到操作符了,我觉得
rxjava2
如此好用,绝对少不了操作符的功劳,下面这张图你就简单的扫一眼,别慌,我们慢慢啃。
上一篇讲了,
rxjava
有五种观察者创建模式,其中
Observable
和
Flowable
差不多,只是
Flowable
支持背压,而其它三种,都是简化版的
Observable
,所以,本篇以
Observable
方式来讲操作符的使用。
Observable
源码
Observable
是一个抽象类,继承
ObservableSource
ObservableSource
:
一、创建操作符
这类操作符,创建直接返回
Observable
1.1、嵌套回调事件
1.1.1、create
create
是最常用的一个操作符,该操作符的参数中产生的
emitter
发射器,通过
onNext
不断给下游发送数据,也可以发送
onComplete
、
onError
事件给下游。
需要发送给下游的数据,就通过emitter.onNext()给下游发送。
当发送了
onComplete
或者
onError
事件后,下游停止接收剩下的
onNext
事件
static <T> Observable<T> create(ObservableOnSubscribe<T> source)
demo:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
1.2、复杂数据遍历
这类操作符,直接将一个
数组
、
集合
拆分成单个ObJect数据依次发送给下游,也可以直接发送Object数据。
1.2.1、just
转换一个或多个
Object
数据,依次将这些数据发射到下游。
最多接收十个
Object
参数。
A : 最多只接收十个参数。
Demo:
Observable.just("A", "B", "C", "D")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
1.2.2、fromArray
直接传入一个数组数据,操作符将数组里面的元素按先后顺序依次发送给下游,可以发送十个以上的数据。
static <T> Observable<T> fromArray(T... items)
Demo:
String[] data = new String[]{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"};
Observable.fromArray(data)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromArray--accept: s=" + s
);
}
});
1.2.3、fromIterable
直接传入一个集合数据,操作符将集合里面的元素按先后顺序依次发送给下游,可以发送十个以上的数据。
static <T> Observable<T> fromIterable(Iterable<? extends T> source)
Demo:
List<String> mData = new ArrayList<>();
mData.add("A");
mData.add("B");
mData.add("C");
Observable.fromIterable(mData)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromIterable--accept: s=" + s);
}
});
1.2.4、range
快速创建一个被观察者对象,连续发送一个指定开始和总数的事件序列
立即发送,无延时
static Observable<Integer> range(final int start, final int count)
Demo:
Observable.range(3, 10).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "range--accept: integer=" + integer);
}
});
1.3、定时任务
1.3.1、interval
快速创建一个被观察者,延迟一定时间后再每隔指定的一个时间发送一个事件(从0开始的整数)给下游。
发送数据从0开始,依次+1整数递增
延迟时间可以为0,重载方法不设置默认使用第二个参数数值。
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
public static Observable<Long> interval(long period, TimeUnit unit) {
return interval(period, period, unit, Schedulers.computation());
}
static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
demo:
Observable.interval(2, 5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
1.3.2、intervalRange
快速创建1个被观察者对象,每隔指定时间发送1个事件,可以指定事件发送开始的值和总的值。
static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
demo:
Observable.intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
1.4、延迟任务
1.4.1、defer
创建一个
Observable
对象,被观察者逻辑真正执行的时机是在其被订阅的时候。
当下游订阅后,上游才开始处理逻辑。
static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
demo:
String[] mStrings = new String[]{"A", "B", "C", "D"};
Observable observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.fromArray(mStrings);
}
});
mStrings = new String[]{"defer,订阅时候才创建"};
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
1.4.2、timer
创建一个被观察者对象,上游延时指定的时间后发送一个事件到下游。
发送的数值为Long型的0
static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
demo:
public void timer() {
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: aLong=" + aLong);
}
});
}
二、变换操作符
2.1、变换
2.1.1、map
对上游发送的每一个事件都进行指定的函数处理,从而变换成另一个事件再发送给下游。
常使用场景:用作数据类型转换
final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
demo:
public void map() {
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer + "变换";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
2.1.2、flatMap
将一个发送事件的上游
Observable
变换为多个发送事件的
Observables
,然后将它们发射的事件单独做处理后再合并放进一个单独的
Observable
里发送给下游。
可以看到上游发送了三个事件(注意颜色),中间对每个事假数据进行处理后(每一个圆变成两个矩形),再合并成包含六个矩形事件的Observable对象发送给下游,注意矩形颜色,他是无规律,无序的,并不是严格按照上游发送的顺序来发送给下游。
final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
demo:
public void flatMap() {
Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("变换后的数据" + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
2.1.3、concatMap
同
flatMap
一样的功能,只是
flatMap
不能保证转换后发送给下游事件的时序,concatMap转换后能严格按照上游发送的顺序再发送给下游。
同
flatMap
一样,重点注意颜色,转换后颜色和上游发送的顺序一致,有序发送
final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
demo:
public void concatMap() {
Observable.just(1, 2, 3).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("变换后的数据" + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
三、合并操作符
3.1、concat
组合多个被观察者一起发送数据,合并后
按发送顺序串行执行
组合的被观察者数量要求小于等于4个,从提供的方法参数里面可以得知。
public static <T> Observable<T> concat(
ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
demo:
public void concat() {
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("hello", "rxjava");
Observable.concat(observable1
, observable2, observable3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
3.2、concatArray
同
concat
一样,组合多个被观察者一起发送数据,合并后
按发送顺序串行执行
concatArray对组合的被观察者对象没有个数限制,可以大于4个。
上游发送的是一个组合的观察者数组,没有数量限制(注意颜色)
转换后串行发送(颜色和上游发送顺序对应)
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
demo:
public void concatArray() {
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("D", "E");
Observable<String> observable4 = Observable.just("F");
Observable<String> observable5 = Observable.just("G");
Observable.concatArray(observable1, observable2, observable3, observable4, observable5)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
3.3、concatDelayError、concatArrayDelayError
使用concat操作符时,如果遇到其中一个被观察者发出
onError
事件则会马上终止其他被观察者的事件,如果希望
onError
事件推迟到其他被观察者都结束后才触发,可以使用对应的concatDelayError。
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return concatDelayError(fromIterable(sources));
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
return concatDelayError(sources, bufferSize(), true);
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd)
demo:
public void concatArrayDelayError() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onError(new NullPointerException(""));
emitter.onNext("D");
}
});
Observable.concatArrayDelayError(observable, Observable.just("E", "F"))
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s="+s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: e" + e.getMessage(), e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
可以看到,第一个observable发送到c后,手动抛出一个错误,但是并灭有影响到Observable.just("E", "F")的执行,我们依旧打印出了 E,F两个参数后才去执行我们手动抛出的NullPointerException错误
操作符这部分内容比较多,先整理这部分,后面会对其他操作符再做整理。