RxJava2 事件分发&消费绑定逻辑 简析
前言
重温RxJava2源码,做个简单的记录,本文仅分析事件的发射与消费简单逻辑,从源码角度分析被观察者(上游事件)是如何与观察者(下游事件)进行关联的。
事件发射
Observable.just(1,2,3)
.subscribe();
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).subscribe();
上述两种方式都是由被观察者发出3个事件,交给观察者(下游事件)去处理。这里分析一下Observable.just
与Observable.create
方法的区别
Observable被观察者(上游事件)
just方式
public static <T> Observable<T> just(T item1, T item2, T item3) {
return fromArray(item1, item2, item3);
}
这里将传入的item…
继续传入fromArray
方法
public static <T> Observable<T> fromArray(T... items) {
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
最终将参数传入实例化的ObservableFromArray
对象中,并将该对象返回,此处可先不关注RxJavaPlugins
类,继续探索ObservableFromArray
类都做了什么;
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
}
作为Observable
的子类,每个被观察者都要实现自己的subscribeActual
方法,这里才是真正与观察者进行绑定的具体实现,其中实例化了FromArrayDisposable
对象,并将observer
(观察者)与array
传入,方法结尾调用了其run
方法。
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
可以看到其中对于最初传入的1、2、3,以此进行了onNext
方法的调用,分发结束后调用了onComplete
,事件结束。
create方式
首先从上面的实例代码可以看到,create
方法中还需要传入ObservableOnSubscribe
的实例对象,暂且不管,我们来挖掘一下create
方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
最终将上述我们创建的ObservableOnSubscribe
对象传入新实例化的ObservableCreate
对象中,并将该对象返回;
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
看到在subscribeActual
方法中,创建了CreateEmitter
对象,接着分别调用observer#onSubscribe
方法和source#subscribe
方法,这里要搞清楚其中的3个变量分别是什么
source
:被观察者(上游事件),最初我们create
方法中传入的接口对象,我们就是在source
中进行事件分发的observer
:观察者(下游事件),我们的事件最终交给observer
去处理,这里将observer
传入了CreateEmitter
,就是要在Emitter
中进行中转分发事件给observer
parent
:理解为一个上下游的中转站,上游事件发射后在这里交给下游去处理
最后我们看一下CreateEmitter
类中的实现
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
}
这里只贴出了onNext
方法,可以看到当onNext
方法被调用后,其中就会去调用observer
的onNext
方法,而onNext
最初的触发就是在实例代码中我们实例化的ObservableOnSubscribe
其中的subscribe
方法中
事件消费
...
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
...
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
上述两种方式都是接收被观察者(上游事件)发出的事件,进行处理消费。这里分析一下Consumer
与Observer
的区别
Observer观察者(下游事件)
Consumer
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
Consumer
仅为一个接口类,其中accept
方法接收事件并消费,我们需要去到上游事件订阅下游事件时的subscribe
方法,根据下游事件的参数类型与数量,会进入不同的subscribe
重载方法中;
subscribe(Consumer<? super T> onNext) : Diposable
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
该方法中包装了一个LambdaObserver
,将我们传入的onNext
方法再传入其中
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
可以看到LambdaObserver
实际上就是Observer
的实现类,其中实现了onSubscribe
onNext
onError
onComplete
方法,上述代码中我们看到我们最初的Consumer
对象实际上就是其中的onNext
变量,在LambdaObserver
收到onNext
事件消费时,再将事件交给Consumer
去处理。Consumer
相当于一种简易模式的观察者,根据被观察者的subscribe
订阅方法消费特定的事件(onNext
或onError
等)。
Observer
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
Observer
是最原始的观察者,是所有Observer
的顶层接口,其中方法为观察者可以消费的四个事件
subscribe(Observer<? super T> observer)
该方法也是其他所有订阅观察者方法最终会进入的方法
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
...
} catch (Throwable e) {
...
}
}
最终在subscribeActual
方法中进行被观察者与观察者(上游与下游事件)的绑定。
写在结尾
抛开所有的操作符、线程切换来说,RxJava的上下游事件绑定逻辑还是十分清晰易读的,可以通过源码了解每个事件是如何从上游传递至下游的。至于其他逻辑,另起篇幅分析。
链接:https://juejin.cn/post/7184749810484772923
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。