RxJava装饰者模式
1.装饰者模式
- 装饰者模式时在保留原有结构的前提下添加新的功能,这些功能作为其原有结构的包装。
2.RxJava的装饰者模式
1.被观察者Observable
- 根据
Observerable
的源码可知Observable
的结构接口是Observerablesource<T>
,里面有一个方法subscribe
用于和观察者实现订阅,源码如下
/**
* Represents a basic, non-backpressured {@link Observable} source base interface,
* consumable via an {@link Observer}.
*
* @param <T> the element type
* @since 2.0
*/
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
- 然后需要一个包装类,就是实现
ObservableSource
接口的类,就是Observable<T>
,它实现了ObservableSource
并在subscribe方法中调用了subscribeActual
方法与观察者实现订阅关系,源码如下
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
- 第三步就是包装类了,包装类有很多有一百多个,如
ObservableAll
、ObservableAny
、ObservableCache
2.观察者Observer
- 第一步,
Observer
的结构的接口有Emitter
和Observer
,两个接口中的方法差不多,都是onNext
、OnError
、OnComplete
,用于被观察者的回调 - 第二步,实现
Emitter
或者Observer
接口的包装类,观察者中没有实现这两个接口的基础包装类,而是直接封装了很多包装类
3.被观察者和观察者的包装类有在创建的时候进行包装也有在调用的时候包装,那么他们的结构又是怎么样的
以RxJava的最基础用法来分析,Observable.create().subscribeOn().observeOn().subscribe()
为例,层层调用后它的结构如下:
- 首先是
Observable.create
,通过创建ObservableCreate
对象进行第一层包装,把ObservableOnSubscribe
包在了里面
- 然后是
Observable.create().subscribeOn()
,调用时又进行了一层包装,把ObservableCreate包进去了
- 再然后就分别是
observeOn()
了,结构如下
- 总共进行了4层包装,可以理解为每调用一次操作符就会进行一层被观察者的包装,这样包装的好处就是为了添加额外的功能,那么每一层又添加了哪些额外的功能呢
4.被观察者的subscribe
方法
调用subscribe
方法后会从最外层的包装类一步一步的往里面调用,从被观察者的subscribe
方法中可以得知额外功能的实现是在subscribeActual
方法中,那么上面几层包装的subscribeActual
方法中又做了什么呢,分析如下
- 先看最外层的包装
observerOn
的subscribeActual
方法做了什么,先看源码:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
- 源码中有一个
source
,这个source
是上一层包装类的实例,在source.subscribe()
中对观察者进行了一层包装,也就是ObserveOnObserver
,它在onNext
方法里面实现了线程切换,这个onNext
是在被观察者在通知观察者时会被回调,然后通过包装类实现额外的线程切换,这里是切换到了主线程执行。此时观察者的结构如下:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
- 再看下一层的包装
subscribeOn
的subscribeActual
方法做了什么,先看源码
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
...
}
这里又对观察者进行了一层包装,也就是SubscribeOnObserver
,这里面的额外功能就是资源释放,包装完后的结构如下
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
...
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
在subscribeActual
方法中有一个调用是source.subscribe(parent)
,这个source
就是它的上一层的包装类ObservableCreate
,那么ObservableCreate
的subscribeActual
方法就会在子线程执行。
ObservableCreate
的subscribeActual
方法做了什么,先看源码
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
源码中的source
就是创建最原始的ObservableOnSubscribe
,这里会回调到ObservableOnSubscribe
的subscribe方法
,在subscribeActual
方法中又对观察者进行了一层包装也就是CreateEmitter
,这个类里面做的事情是判断线程是否被释放,如果释放了则不再进行回调,这时候结构如下图
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
这里由于上面的包装类已经切换到了子线程所以ObservableOnSubscribe
的subscribe
方法的执行也是在子线程;
3.总结
在创建被观察者的时候会对被观察者进行层层的包装,创建几次就包装几次,然后在被观察者调用subscribe
方法时,一层层回调被观察者的subscribeActual
方法,而在被观察者subscribeActual
方法中会对观察者做一层包装。也就是说被观察者是创建的时候包装,在subscribeActual
方法中实现额外的功能,观察者是在被观察者调用subscribeActual
方法时进行包装的,然后针对观察者实现自己的额外的功能,流程图如下:
最终的结构如下:
- 第一步:创建被观察者时或者使用操作符时会对被观察者进行包装
- 第二步:当被观察者和观察者产生订阅关系后,被观察者会一层层的回调被观察者的
subscribeActual
方法,在这个方法中对观察者进行包装,此时被观察者的功能实现是在subscribeActual
中,观察者的实现是在包装类里
- 第三步:被观察者和观察者不同的是,被观察者是在订阅成功后就执行了包装类相应的功能,而观察者是在事件回调的时候,会在观察者的包装类里实现相应的功能
- 最终流程图
链接:https://juejin.cn/post/7180695827252248633
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。