注册

RxJava的并发实现

我们在开发App过程中,常常遇见这种需求,例如首页,仅一个界面就要请求3个甚至更多的接口,更变态的是这些接口必须按顺序请求,来以此展示返回结果,那么这样我们就无法用普通的并发去同时请求接口了,因为我们无法预知各个接口的请求完成时间,普通的也是最简单的办法就是依次请求接口了,A接口请求完成->B接口请求完成->C接口...简单粗暴有木有?并且在加载效率上(接口请求时间)会差很多,那么有没有更优雅的办法去解决这种需求呢?那必须有,利用RxJava的Observable.zip方法即可实现并发请求!

假如ApiService中有两个接口:

    @GET("test1")
Observable<HttpResult<TestModel1>> test1(@QueryMap HashMap<String, String> options);

@GET("test2")
Observable<HttpResult<TestModel2>> test2(@QueryMap HashMap<String, String> options);

HttpResult为自定义数据结构:

public class HttpResult<T> {

public int status;

public String msg;

public T data;

}

TestModel1和TestModel2则分别为两个返回的数据结构!

接口封装后的请求方法: test1:

    Observable o1 = Observable.create((ObservableOnSubscribe<TestModel1>) emitter ->
//接口请求
ApiUtil.getInstance()
.getApiService()
.test1()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HttpResult<TestModel1>>() {

@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(HttpResult<TestModel1> httpResult) {
emitter.onNext(httpResult.data);
emitter.onComplete();
}

@Override
public void onError(Throwable e) {
emitter.onNext(null);
emitter.onComplete();
}

@Override
public void onComplete() {

}
}));

注意: ObservableOnSubscribe的参数是o1 中emitter要传递的参数类型,也就是你接口得到的数据类型:TestModel1!

test2:

 Observable o2 = Observable.create((ObservableOnSubscribe<TestModel2>) emitter ->
//接口请求
ApiUtil.getInstance()
.getApiService()
.test2()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HttpResult<TestModel2>>() {

@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(HttpResult<TestModel2> httpResult) {
emitter.onNext(httpResult.data);
emitter.onComplete();
}

@Override
public void onError(Throwable e) {
emitter.onNext(null);
emitter.onComplete();
}

@Override
public void onComplete() {

}
}));

两个接口请求,得到两个Observable:o1和o2!

合并:

   Observable.zip(o1, o2, new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object o, Object o2) throws Exception {
TestModel1 t1 = (TestModel1) o;//o1得到的结果
TestModel2 t2 = (TestModel2) o2;//o2得到的结果
FinalData f=new FinalData();//最终结果合并
f.t1=t1;
f.t2=t2;
return f;
}
}).subscribeOn(Schedulers.io()).subscribe(o -> {
FinalData f=(FinalData)o;//获取最终结果
//处理数据...
});

注意: BiFunction中的3个Obj参数,前两个对应接口返回数据类型,最后一个对应apply方法返回的数据类型(最终结果)!

如果是3个或以上接口,那么合并时可以根据接口数量使用Function3,Function4...

   Observable.zip(o1, o2,o3, new Function3<Object, Object, Object,Object>() {
@Override
public Object apply(Object o, Object o2,Object o3) throws Exception {

}
}).subscribeOn(Schedulers.io()).subscribe(o -> {

});

除了zip操作符,rxjava还提供了concat,merge,join等其它合并操作符,但它们又各有不同,有兴趣的可以去多了解一下!

0 个评论

要回复文章请先登录注册