将异常处理交给订阅者来做,Observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onError()方法。
使用RxJava,Observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。
结合多个 Observable 的发射项,取每个 Observable 的发射项,然后组合后发出一个。
RxJava 2.x 最大的改动就是对于 backpressure 的处理 。为此将原来的 Observable 拆分成了新的 Observable 和 Flowable,。
不过此次更新中,出现了两种观察者模式:
- Observable ( 被观察者 ) / Observer ( 观察者 )
- Flowable (被观察者)/ Subscriber (观察者)
对于I / O绑定工作,例如阻塞I / O的异步性能,此调度程序由一个将根据需要增长的线程池支持;默认情况下,Schedulers.io()是一个CachedThreadScheduler,它类似于带有线程缓存的新线程调度程序。
meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
用于计算工作,如事件循环和回调处理;不要将此调度程序用于I / O(改为使用Schedulers.io());默认情况下,线程数等于处理器数。
meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors
- Emitter:onNext,onError,onComplete方法
- ObservableEmitter: 实现了Emitter,并绑定Disposable对象,实现订阅和取消订阅功能
- CreateEmitter:实现ObservableEmitter,包装了Observer。由这个类调用Observer的onNext,onError等方法。
- ObservableOnSubscribe:实现了ObservableEmitter到Observable的绑定
- ObservableSource:Observable抽象类的接口
- Observable:抽象类
- ObservableCreate:实现类
- Observer:观察者接口
RxJava 中上游是怎么发送事件的,下游又是怎样接收到的?
Observable持有ObservableOnSubscribe对象,ObservableOnSubscribe持有ObservableEmitter对象,ObservableEmitter持有Observer对象。Observable调用ObservableEmitter的onNext方法时,会由ObservableEmitter通知到Observer。
ObservableEmitter通知给Observer之前会检查null值、订阅关系是否存在等
基本工作原理: https://juejin.im/post/5a521d68f265da3e4e25750e
线程切换实现:https://juejin.im/post/5a6751af6fb9a01cb2571794
https://www.jianshu.com/p/88aa273d37be
我写的rxjava的几个例子:https://github.com/jliang981/commonJava/tree/master/jliang-midware/src/main/java/com/step/jliang/rxjava
rxjava适用于大量的任务之间没有依赖关系,可以并发执行的情况。回调方法 (onNext, onCompleted, onError)Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:onNext(T item)Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现...
zip函数允许你传入多个请求,然后合并出另外的结果传出来,普通的用法就不多说了,网上一堆介绍的
然后做项目时有个疑问点,Observable.zip如果传入一个列表,合并列表里的所有请求的时候,请求回来的顺序是未知的,返回回来的数组是否会按传入时的顺序返回回来呢。于是做了以下实验:
Integer[] skuSerials={1,2,3,4,5};
ArrayList<Observable<Integer>> requestList=new Array
RxJava 在 GitHub 的介绍:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
使用zip方法可以把多个类型不同的数据源Observable合并为一个类型的数据源Observable。
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction)
举个例子:
* 把多个Observable合并后,并且把这些Observable的数据进行转换再发射出去。转换之后的数据数目由最短数据长度的那个Observable决定。发射完最终会自动调用观察者的onComplete方法()
* 如以下代码: 数据长度为6的observable1和数据长度为4的observable2进行合并转换后,观察者只接收到4个数据
Observable observable1 = Observabl...
1、zip
Observable ob1 = Observable.just(1,2,3);
Observable ob2 = Observable.just(10,20,30,40);
Observable.zip(ob1,ob2, (int1, int2) -> int1+"-"+int2).subscribe(o -> Log.v("TAG", "zip:"+o
zip操作符允许接受多个ObservableSource发射的数据,并能够将他们重新组合并重新发射的一种操作符。常见的使用方式:
首先传入若干个ObservableSource,然后每个ObservableSource都能通过observer.onNext来发射数据,最终在Function接口实现方法中可以获取到这些数据,并做一个重新整合或其他操作,然后返回。比如这里我将获取的值相加然后返回,最终在Observer.onNext中能获取到值123:
看看zip的实现:
将传入的Functi