注册

RxJava - 自己动手实现 RxJava

先看看大致实现的样式:

// Usage sketch: emit one Integer, map it to a String, and log every observer callback.
// The explicit type arguments let the compiler check the whole chain:
// Integer source -> Function<Integer, String> -> Observer<String>.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
    }
}).map(new Function<Integer, String>() {

    @Override
    public String apply(Integer integer) {
        return integer + "arrom";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d("arrom", "onSubscribe 成功");
    }

    @Override
    public void onNext(String s) {
        // Label the message with the callback that actually produced it.
        Log.d("arrom", "onNext===" + s);
    }

    @Override
    public void onError(Throwable throwable) {
        Log.d("arrom", "onError");
    }

    @Override
    public void onComplete() {
        Log.d("arrom", "onComplete");
    }
});

被观察者

/**
 * The observable (the side being observed).
 *
 * <p>Subclasses implement {@link #subscribeActual(Observer)}; {@link #subscribe(Observer)}
 * simply forwards to it.
 *
 * @param <T> the type of the items delivered to observers
 */
public abstract class Observable<T> implements ObserverbleSource<T> {

    /**
     * Creation operator: wraps the given callback in an {@code ObservableCreate}.
     *
     * @param source the callback that receives an emitter when an observer subscribes
     * @param <T> the type of the items the callback emits
     * @return a new observable backed by {@code source}
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }

    /**
     * Subscribes the observer by delegating to {@link #subscribeActual(Observer)}.
     *
     * @param observer the observer that should receive this observable's events
     */
    @Override
    public void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }

    /**
     * Performs the actual subscription work for a concrete observable.
     *
     * @param observer the observer that should receive this observable's events
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

    /**
     * Transformation operator: wraps this observable in an {@code ObservableMap}
     * that applies {@code function} to the items.
     *
     * @param function the conversion applied to each item
     * @param <R> the type of the converted items
     * @return a new observable of the converted items
     */
    public <R> Observable<R> map(Function<? super T, ? extends R> function) {
        return new ObservableMap<T, R>(this, function);
    }

}

观察者

public interface Observer {

void onSubscribe(Disposable d);

void onNext(T t);

void onError(Throwable throwable);

void onComplete();

}

订阅

public interface ObserverbleSource {

//订阅
void subscribe(Observer<? extends T> observer);

}

发射器

/**
 * Callback that produces events through the emitter it is handed.
 *
 * @param <T> the type of the items pushed into the emitter
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each observer that subscribes.
     *
     * @param observableEmitter the emitter used to push items and the terminal signal
     * @throws Exception if producing the events fails
     */
    void subscribe(ObservableEmitter<T> observableEmitter) throws Exception;
}
/**
 * The emitter type handed to an {@code ObservableOnSubscribe}; it declares no
 * members of its own beyond those inherited from {@link Emitter}.
 *
 * @param <T> the type of the items emitted
 */
public interface ObservableEmitter<T> extends Emitter<T> {
}
/**
 * Emitter: the push-side interface used to send signals.
 *
 * @param <T> the type of the values passed to {@link #onNext(Object)}
 */
public interface Emitter<T> {

    /**
     * Signals a normal value.
     *
     * @param value the value to emit
     */
    void onNext(T value);

    /**
     * Signals a throwable (error).
     *
     * @param throwable the error to emit
     */
    void onError(Throwable throwable);

    /** Signals completion. */
    void onComplete();
}

订阅方法的实现

/**
 * Observable produced by {@code Observable.create}: on subscription it hands a fresh
 * {@link CreateEmitter} to the user-supplied {@link ObservableOnSubscribe} callback.
 *
 * @param <T> the type of the items emitted
 */
public class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;

    /**
     * @param source the callback invoked with an emitter for every subscribing observer
     */
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    /**
     * Wraps the observer in a {@link CreateEmitter}, hands that emitter to the observer as
     * its {@link Disposable}, then runs the source callback. An exception thrown by the
     * callback is forwarded through the emitter's {@code onError}.
     *
     * @param observer the downstream observer
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent); // notify the observer that the subscription succeeded
        try {
            source.subscribe(parent);
        } catch (Exception e) {
            // Kept so the failure stays visible even when the emitter is already
            // disposed/terminated and onError below drops it.
            e.printStackTrace();
            parent.onError(e);
        }
    }

    /**
     * Bridges the source callback to the observer and doubles as the subscription handle.
     * Events are forwarded only while the disposed flag is unset; the flag is also set
     * once a terminal event (error or completion) has been delivered, so nothing is
     * forwarded after the stream has ended.
     *
     * @param <T> the type of the items emitted
     */
    static final class CreateEmitter<T> implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        // true once disposed by the caller or after a terminal event was delivered
        private boolean flag;

        public CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        /** Stores the given disposed state. */
        @Override
        public void disposa(boolean flag) {
            this.flag = flag;
        }

        /** Returns the current disposed state. */
        @Override
        public boolean isDisposad() {
            return flag;
        }

        /** Forwards the value unless disposed. */
        @Override
        public void onNext(T value) {
            if (!flag) {
                observer.onNext(value);
            }
        }

        /** Forwards the error unless disposed, then stops all further delivery. */
        @Override
        public void onError(Throwable throwable) {
            if (!flag) {
                try {
                    observer.onError(throwable);
                } finally {
                    // terminal event: nothing may follow it
                    flag = true;
                }
            }
        }

        /** Forwards completion unless disposed, then stops all further delivery. */
        @Override
        public void onComplete() {
            if (!flag) {
                try {
                    observer.onComplete();
                } finally {
                    // terminal event: nothing may follow it
                    flag = true;
                }
            }
        }
    }

}

Disposable

/**
 * Handle exposing a boolean "disposed" state for a subscription.
 */
public interface Disposable {

    /**
     * Reports the disposed state.
     *
     * @return the current disposed state
     */
    boolean isDisposad();

    /**
     * Updates the disposed state.
     *
     * @param flag the new disposed state
     */
    void disposa(boolean flag);

}

create 操作符大致就这几个类。转换操作和它有点类似，只是有一些不一样的地方。

被观察者

/**
 * Base class for an observable that is built on top of another (upstream) source.
 *
 * @param <T> the item type of the upstream source
 * @param <U> the item type this observable delivers
 */
public abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {

    /** The upstream source this observable subscribes to. */
    protected final ObserverbleSource<T> source;

    /**
     * @param source the upstream source
     */
    public AbstractObservableWithUpstream(ObserverbleSource<T> source) {
        this.source = source;
    }

}

观察者

/**
 * Base observer for operators: sits between an upstream source and the downstream
 * observer, forwarding the subscription handle and the terminal signals. Subclasses
 * supply {@code onNext}.
 *
 * @param <T> the item type received from upstream
 * @param <R> the item type accepted by the downstream observer
 */
public abstract class BaseFuseableObserver<T, R> implements Observer<T>, Disposable {

    // the downstream observer
    protected final Observer<? super R> actual;

    // the handle received in onSubscribe; unset until onSubscribe has been called
    protected Disposable disposable;

    /**
     * @param actual the downstream observer
     */
    public BaseFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }

    /** Delegates to the handle received in {@link #onSubscribe(Disposable)}. */
    @Override
    public void disposa(boolean flag) {
        disposable.disposa(flag);
    }

    /** Delegates to the handle received in {@link #onSubscribe(Disposable)}. */
    @Override
    public boolean isDisposad() {
        return disposable.isDisposad();
    }

    /** Remembers the upstream handle and passes the same handle downstream. */
    @Override
    public void onSubscribe(Disposable d) {
        this.disposable = d;
        actual.onSubscribe(d);
    }

    /** Forwards the error downstream unchanged. */
    @Override
    public void onError(Throwable throwable) {
        actual.onError(throwable);
    }

    /** Forwards completion downstream. */
    @Override
    public void onComplete() {
        actual.onComplete();
    }
}
/**
 * Observable behind the {@code map} operator: subscribes to the upstream source with a
 * {@link MapObserver} that converts each item before passing it downstream.
 *
 * @param <T> the upstream item type
 * @param <U> the converted item type
 */
public class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

    // assigned once in the constructor
    final Function<? super T, ? extends U> function;

    /**
     * @param source the upstream source
     * @param function the conversion applied to every upstream item
     */
    public ObservableMap(ObserverbleSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    /**
     * Subscribes to the upstream source with a {@link MapObserver} wrapping the observer.
     *
     * @param observer the downstream observer
     */
    @Override
    protected void subscribeActual(Observer<? super U> observer) {
        source.subscribe(new MapObserver<T, U>(observer, function));
    }

    /**
     * Observer that applies the mapper to each upstream item and forwards the result.
     *
     * @param <T> the upstream item type
     * @param <U> the converted item type
     */
    static final class MapObserver<T, U> extends BaseFuseableObserver<T, U> {

        final Function<? super T, ? extends U> mapper;

        public MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        /** Converts the item and hands the converted value to the downstream observer. */
        @Override
        public void onNext(T t) {
            U u = mapper.apply(t);
            actual.onNext(u);
        }

    }

}

转换函数

/**
 * Conversion function used by the {@code map} operator.
 *
 * @param <T> the input type
 * @param <R> the result type
 */
public interface Function<T, R> {

    /**
     * Converts a value.
     *
     * @param t the input value
     * @return the converted value
     */
    R apply(T t);

}

自己撸完一遍之后感觉其实没有那么绕。


0 个评论

要回复文章请先登录注册