RxJava - 自己动手实现 RxJava
先看看大致实现的样式:
// Usage sketch: emit a single Integer, map it to a String, and log every observer callback.
// The explicit type arguments keep the chain type-safe (Integer upstream, String downstream).
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) {
        return integer + "arrom";
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("arrom", "onSubscribe 成功");
    }

    @Override
    public void onNext(String s) {
        // Tag the message with the callback that actually fired (was mislabelled "onSubscribe").
        Log.d("arrom", "onNext===" + s);
    }

    @Override
    public void onError(Throwable throwable) {
        Log.d("arrom", "onError");
    }

    @Override
    public void onComplete() {
        Log.d("arrom", "onComplete");
    }
});
被观察者
/**
 * The observable (producer) side of the pipeline.
 *
 * <p>Concrete subclasses implement {@link #subscribeActual(Observer)}; the public
 * {@link #subscribe(Observer)} entry point simply delegates to it.
 *
 * @param <T> the type of the items delivered to observers
 */
public abstract class Observable<T> implements ObserverbleSource<T> {

    /**
     * Creation operator: wraps the given callback in an {@code ObservableCreate}.
     *
     * @param source callback invoked for every observer that subscribes
     * @param <T>    the type of the items the callback emits
     * @return a new observable backed by {@code source}
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }

    /**
     * Subscribes the observer by forwarding it to {@link #subscribeActual(Observer)}.
     *
     * @param observer the observer to subscribe; must not be {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    @Override
    public void subscribe(Observer<? super T> observer) {
        // Fail at the call site instead of deep inside an operator chain.
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    /**
     * Operator-specific subscription logic.
     *
     * @param observer the observer that should receive this observable's events
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

    /**
     * Transformation operator: returns an observable that applies {@code function}
     * to every item of this observable.
     *
     * @param function the mapping applied to each item
     * @param <R>      the type of the mapped items
     * @return a new {@code ObservableMap} with this observable as its upstream
     */
    public <R> Observable<R> map(Function<? super T, ? extends R> function) {
        return new ObservableMap<T, R>(this, function);
    }
}
观察者
public interface Observer {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable throwable);
void onComplete();
}
订阅
public interface ObserverbleSource {
//订阅
void subscribe(Observer<? extends T> observer);
}
发射器
/**
 * User-supplied callback that produces events through an emitter.
 *
 * @param <T> the type of the items emitted
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for every observer that subscribes.
     *
     * @param observableEmitter the emitter used to push events to that observer
     * @throws Exception if producing the events fails
     */
    void subscribe(ObservableEmitter<T> observableEmitter) throws Exception;
}
/**
 * The emitter type handed to {@code ObservableOnSubscribe.subscribe}; it currently adds
 * nothing on top of {@link Emitter}.
 *
 * @param <T> the type of the items emitted
 */
public interface ObservableEmitter<T> extends Emitter<T> {
}
/**
 * Emitter: the push-side API used to signal values, an error, or completion.
 *
 * @param <T> the type of the values emitted
 */
public interface Emitter<T> {

    /**
     * Signals a normal value.
     *
     * @param value the value to emit
     */
    void onNext(T value);

    /**
     * Signals a throwable (error).
     *
     * @param throwable the error to emit
     */
    void onError(Throwable throwable);

    /** Signals completion. */
    void onComplete();
}
订阅方法的实现
/**
 * Observable returned by {@code Observable.create}: on each subscription it wraps the
 * observer in a {@link CreateEmitter} and runs the user callback against that emitter.
 *
 * @param <T> the type of the emitted items
 */
public class ObservableCreate<T> extends Observable<T> {

    /** Callback that produces the events for each subscriber. */
    final ObservableOnSubscribe<T> source;

    /**
     * @param source callback invoked once per subscription
     */
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    /**
     * Hands the observer its disposable, then runs the callback. Any exception thrown
     * by the callback is routed to the emitter's {@code onError}.
     *
     * @param observer the observer being subscribed
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Notify the observer that the subscription succeeded and give it the dispose handle.
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Exception e) {
            // Kept from the original: the trace stays visible even when the emitter is
            // already disposed and onError below therefore drops the error.
            e.printStackTrace();
            parent.onError(e);
        }
    }

    /**
     * Bridges the user callback to the observer and doubles as the subscription's
     * {@link Disposable}: while the flag is set, no event is forwarded.
     *
     * @param <T> the type of the emitted items
     */
    static final class CreateEmitter<T> implements ObservableEmitter<T>, Disposable {

        /** Downstream observer that receives the forwarded events. */
        final Observer<? super T> observer;

        /** {@code true} once disposed or after a terminal event has been delivered. */
        private boolean flag;

        public CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void disposa(boolean flag) {
            this.flag = flag;
        }

        @Override
        public boolean isDisposad() {
            return flag;
        }

        @Override
        public void onNext(T value) {
            if (!flag) {
                observer.onNext(value);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!flag) {
                try {
                    observer.onError(throwable);
                } finally {
                    // onError is terminal: nothing may be delivered after it.
                    flag = true;
                }
            }
        }

        @Override
        public void onComplete() {
            if (!flag) {
                try {
                    observer.onComplete();
                } finally {
                    // onComplete is terminal: nothing may be delivered after it.
                    flag = true;
                }
            }
        }
    }
}
Disposable
/**
 * Handle for a subscription's disposed state.
 *
 * <p>The emitter implementation in this file stores the flag and stops forwarding
 * events while it is {@code true}.
 */
public interface Disposable {

    /**
     * Sets the disposed state.
     *
     * @param flag {@code true} to mark the subscription as disposed
     */
    void disposa(boolean flag);

    /**
     * Reports the disposed state.
     *
     * @return the current disposed flag
     */
    boolean isDisposad();
}
create 操作符大致就这几个类。转换操作符和它有点类似，只是有一些不一样的地方。
被观察者
/**
 * Base class for operators: an observable of {@code U} that keeps a reference to an
 * upstream source of {@code T}.
 *
 * @param <T> the item type of the upstream source
 * @param <U> the item type this observable delivers downstream
 */
public abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {

    /** The upstream source this operator subscribes to. */
    protected final ObserverbleSource<T> source;

    /**
     * @param source the upstream source
     */
    public AbstractObservableWithUpstream(ObserverbleSource<T> source) {
        this.source = source;
    }
}
观察者
/**
 * Base observer for operators: receives upstream items of type {@code T} and forwards
 * signals to a downstream observer of {@code R}. Subclasses supply {@code onNext}.
 *
 * <p>It is also a {@link Disposable} that delegates to the upstream disposable received
 * in {@link #onSubscribe(Disposable)}.
 *
 * @param <T> the upstream item type
 * @param <R> the downstream item type
 */
public abstract class BaseFuseableObserver<T, R> implements Observer<T>, Disposable {

    /** The downstream observer. */
    protected final Observer<? super R> actual;

    /** Upstream disposable; assigned in {@link #onSubscribe(Disposable)}. */
    protected Disposable disposable;

    /**
     * @param actual the downstream observer that receives the forwarded signals
     */
    public BaseFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }

    @Override
    public void disposa(boolean flag) {
        disposable.disposa(flag);
    }

    @Override
    public boolean isDisposad() {
        return disposable.isDisposad();
    }

    @Override
    public void onSubscribe(Disposable d) {
        // Remember the upstream handle, then pass the same handle downstream.
        this.disposable = d;
        actual.onSubscribe(d);
    }

    @Override
    public void onError(Throwable throwable) {
        actual.onError(throwable);
    }

    @Override
    public void onComplete() {
        actual.onComplete();
    }
}
/**
 * Map operator: subscribes to the upstream source and delivers
 * {@code function.apply(item)} downstream for every upstream item.
 *
 * @param <T> the upstream item type
 * @param <U> the mapped (downstream) item type
 */
public class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

    /** Mapping applied to every upstream item. */
    final Function<? super T, ? extends U> function;

    /**
     * @param source   the upstream source
     * @param function the mapping applied to each upstream item
     */
    public ObservableMap(ObserverbleSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    /**
     * Subscribes a {@link MapObserver} wrapping the downstream observer to the upstream.
     *
     * @param observer the downstream observer
     */
    @Override
    protected void subscribeActual(Observer<? super U> observer) {
        source.subscribe(new MapObserver<T, U>(observer, function));
    }

    /**
     * Observer that maps each upstream item and forwards the result downstream;
     * the remaining signals are forwarded by {@link BaseFuseableObserver}.
     *
     * @param <T> the upstream item type
     * @param <U> the mapped item type
     */
    static final class MapObserver<T, U> extends BaseFuseableObserver<T, U> {

        final Function<? super T, ? extends U> mapper;

        public MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            U u = mapper.apply(t);
            actual.onNext(u);
        }
    }
}
转换函数
/**
 * Transformation function used by the map operator.
 *
 * @param <T> the input type
 * @param <R> the result type
 */
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Converts a value.
     *
     * @param t the input value
     * @return the converted value
     */
    R apply(T t);
}
自己撸完一遍之后感觉其实没有那么绕。