RX-* 系列的库是一款开源的并发流程控制的框架,有多种语言的实现[1]。用户可以通过它使用流式的编程风格,写出高可读性的并发流程控制代码。以下是针对RX-Java中,observable的各种变换(如map、flatmap)的内部实现的分析。
如果使用过RX-Java,我们知道,map可以使一种类型的可订阅者被另一种类型的订阅者订阅:
Observable<Integer>
可被Subscriber<Integer>
订阅Observable<Integer>
–>map<Integer, String>
:可被Subscriber<String>
订阅
不论是map、还是flatmap,在底层都是通过life实现。那么通过life转换后的的observable如何将原来的类型参数发射到新的subscriber上(observable –> observable ): 1: 调用operator<R, T>的call方法(R为目标类型, T为原始类型)
- (1)call中传入新的Subscriber
(将来新的subscriber订阅observable 时会自动传入 ) , - (2)实例化旧的的Subscriber
(和原本旧的observer 绑定 ),通过旧的Subscriber中的onNext、onCompleted,将T类型的数据处理并转为R类型,发送通知给subscriber
- (1)call中传入新的Subscriber
- 2:
* 实例化一个新的observable<R>,绑定新的OnSubscribe * 通过 newOnSubscribe<R>.call()调用oldOnSubscribe<T>.call() * oldOnSubscribe<T> 通知 subscriber<T> ,subscriber<T> 在onNext onError onComplate中调用subscriber<R>
通过代码来理解:
定义一个Integer 的 observable
1 | Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() { |
通过life将Integer的observable转为String的observable
注意看其中subsubscriber
1 | observable = observable.lift(new Observable.Operator<String, Integer>() { |
注册Subscriber
1 | observable.observeOn(Schedulers.newThread()) |
我们可以看到:
- RX-Java在life的实现中,将observable
转为observable ,并没有去考虑将observable 中的call方法中发出的事件克隆过来,而是直接将observable 的创建与observable 的OnSubscribe相关联,直接通过observable 产生事件通知observable 。 - 在subscriber
与observable 的订阅事件(call)注册过程中,也很巧妙,通过observable 产生事件,通知observable -->
然后observable通知subscriber 执行事件 -->
subscriber中再通知subscriber 执行订阅事件
这一整套过程,就优雅地使用了设计模式中的适配器模式
和代理
模式