1. Builder
Retrofit
通过Retrofit.Builder
创建,主要是配置各种工厂Factory
。
val retrofit = Retrofit.Builder()
.baseUrl("this is baseUrl")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
Builder.builder()
主要有五个步骤:
- 获取
Platform
对象,Platform
是平台适配器,主要为了跨平台使用,类如安卓平台实现就是Android
。 - 初始化
CallFactroy
,CallFactroy
的作用是生产realCall
,网络请求由他发出。默认值是OkHttp
的OKHttpClient
。 - 初始化
Executor
,默认通过Platform
获取。 - 初始化
CallAdapter.Factroy
,CallAdapter.Factroy
生产CallAdapter
,CallAdapter
是realCall
的适配器,通过对realCall
的包装,实现Executor
,java8
的Futrue
,rxjava
等调度方法。存在多个,顺序是自定义配置-->默认配置。默认配置通过Platform
获得。 - 初始化
Converter.Factory
,Converter.Factory
生产Converter
,Converter
是数据转换器,将返回的数据转换为需要的数据,一般转换为我们要用的对象。存在多个,顺序是内置配置-->自定义配置-->默认配置。默认配置通过Platform
获得。
class Builder{
Builder(Platform platform) {
this.platform = platform;
}
public Builder() {
//step 0
this(Platform.get());
}
public Retrofit build() {
if (baseUrl == null) {
throw new IllegalStateException("Base URL required.");
}
// step1
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}
//step2
Executor callbackExecutor = this.callbackExecutor;
if (callbackExecutor == null) {
callbackExecutor = platform.defaultCallbackExecutor();
}
//step3
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
//step4
List<Converter.Factory> converterFactories = new ArrayList<>(
1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());
converterFactories.add(new BuiltInConverters());
converterFactories.addAll(this.converterFactories);
converterFactories.addAll(platform.defaultConverterFactories());
return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}
}
到这里,Retrofit对象创建完成,一个大致的结构如下图:
2. 创建API对象
Retrofit通过定义网络请求的接口设置请求参数和返回类型,通过调用retrofit.create()
创建这个接口的对象,调用这个对象的方法生成最终的网络请求。
interface MyService {
@GET("/user")
fun getUser(): Observable<Response<User>>
}
val myServiceClass: MyService = retrofit.create(MyService::class.java)
进入到create
方法,可以看到是直接调用Proxy.newProxyInstance()
方法创建出对象。这是标准库提供的动态代理机制,在运行时创建接口的实例对象。
Proxy.newProxyInstance()
方法有三个参数:
classLoader
: 类加载器
interfaces
:需要实现的接口
InvocationHandler
:代理方法
public <T> T create(final Class<T> service) {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
private final Object[] emptyArgs = new Object[0];
@Override public @Nullable Object invoke(Object proxy, Method method,
@Nullable Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}
为了更好的理解动态代理,可以当做动态代理自动帮我们生成一个接口的实现类。将所有的方法都通过handler.invoke()
代理,如下面的代码所示。只不过动态代理是运行时生成的这个类,并且是直接生成了字节码。
interface MyService {
@GET("/user")
fun getUser(): Observable<Response<User>>
@GET("/name")
fun getName(userId: String): Observable<Response<String>>
}
//自动生成的代码示例
class SuspendServiceProxy implements SuspendService {
InvocationHandler handler;
@NonNull
@Override
public Observable<Response<User>> getUser() {
return handler.invoke(
this,
SuspendService.class.getMethod("getUser", String.class),
new Object[]{}
);
}
@NonNull
@Override
public Observable<Response<String>> getName(@NonNull String userId) {
return handler.invoke(
this,
SuspendService.class.getMethod("getName", String.class),
new Object[]{userId}
);
}
}
3. 创建请求对象
创建好API对象之后,就可以调用它的方法创建请求对象。
val observable = myServiceClass.getUser()
根据前面可以知道,这个方法代理给了InvocationHandler
,在这个方法首先判断这个方法对象是不是实体对象,如果是的话就直接调用就行。
如果不是一个对象,就把调用再代理给ServiceMethod
。首先调用loadServiceMethod()
创建ServiceMethod
对象,然后调用invoke()
方法得到返回值。
public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
// 判断这个方法对象是不是实体对象,如果是的话就直接调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
// 判断这个方法对象是不是实体对象,如果是的话就直接调用
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
3.1. 创建ServiceMethod对象
ServiceMethod
是Retrofit
内部的自定义的代理方法,实际的逻辑是交给它处理。
abstract class ServiceMethod<T> {
abstract @Nullable T invoke(Object[] args);
}
loadServiceMethod()
方法获取ServiceMethod
对象。
方法对象Method
作为Key
缓存ServiceMethod
对象在Retrofit
中,因为创建ServiceMethod
是一个耗时过程,所以弄成单例模式。
如果拿不到缓存,就调用ServiceMethod.parseAnnotations()
创建一个ServiceMethod
对象。
private final Map<Method, ServiceMethod<?>> serviceMethodCache = new ConcurrentHashMap<>();
ServiceMethod<?> loadServiceMethod(Method method) {
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = ServiceMethod.parseAnnotations(this, method);
serviceMethodCache.put(method, result);
}
}
return result;
}
ServiceMethod.parseAnnotations()
中只做了一件事,创建RequestFactory
然后将创建ServiceMethod
的工作又交给了子类HttpServiceMethod
处理。
RequestFactory
是接口的参数配置,通过解析接口的注解,返回值,入参及其注解等获得这些参数。
之后将解析完成的数据传递给HttpServiceMethed
,由它继续创建ServiceMethod
。
abstract class ServiceMethod<T> {
static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
...
return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}
abstract @Nullable T invoke(Object[] args);
}
3.1.1. 创建RequstFactory
RequestFactory
内部也是一个Builder
模式。主要做了两件事:
- 遍历接口注解,初始化配置参数,这里读取的是
POST
、GET
等注释。 - 遍历入参及其注解,将入参转换为
ParameterHandler
对象,将每个参数的设置配置的逻辑代理给了它处理。
final class RequestFactory {
static RequestFactory parseAnnotations(Retrofit retrofit, Method method) {
return new RequestFactory.Builder(retrofit, method).build();
}
}
}
3.1.2.2. OkHttpCall
最终创建的网络请求对象是OkHttpCall
。
OkHttpCall
实现了Retrofit.Call
接口,这个接口与OkHttp.Call
基本一致,这里只介绍它的三个方法:
execute()
同步发起请求并且返回请求体Response
。enqueue()
异步发起请求,通过Callback
通信,需要注意的是处理的回调也是在异步中调用的。cancel()
取消请求。
查看OkHttpCall
的实现,可以发现所有的Call
接口方法的具体实现都是代理给了rawCall
。
cancel()
直接代理给rawCall.cancel()
。
execute()
代理给rawCall.execute()
,将返回值交给parseResponse()
转换了一次。
enqueue()
代理给rawCall.enqueue()
,多加了一层Callback
回调,在成功回调中也是交给parseResponse()
转换之后再回调给原始的Callback
。
也就是说OkHttpCall
把所有的逻辑静态代理给了rawCall
,这样做的好处是可以在对应的地方做一下额外的处理,也就是获得返回值通过parseResponse()
转换数据。
fAndroid
平台默认的CallBackExecutor
在Platform
的实现类Android
中,将Runnable
抛到MainHandler
中,实现回调到主线程。
static final class Android extends Platform {
@Override public Executor defaultCallbackExecutor() {
return MainThreadExecutor();
}
static class MainThreadExecutor implements Executor {
private final Handler handler = new Handler(Looper.getMainLooper());
@Override public void execute(Runnable r) {
handler.post(r);
}
}
}
3.1.2.3.2. RxJava2CallAdapterFactory
RxJava2CallAdapterFactory
只在返回类型是Observable
之类的时候创建CallAdapter
。
public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
@Override
public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
if (rawType != Observable.class) { //省略了其他类型
return null;
}
boolean isResult = false;
boolean isBody = false;
Type responseType;
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
}
...
return new RxJava2CallAdapter(...);
}
}
RxJava2CallAdapter
中的adapt()
将Call
封装成observable
返回。
final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> {
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> observable = new CallExecuteObservable<>(call);
...
return RxJavaPlugins.onAssembly(observable);
}
}
最后进到CallExecuteObservable
,在启动的时候调用call.execute()
并将结果抛给观察者。
final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
Call<T> call = originalCall.clone();
try {
...
Response<T> response = call.execute();
observer.onNext(response);
}
}
}
3.2. 回顾
Api
接口实例动态代理给HttpServiceMethod
。- 网络请求的真实执行者是
OkHttpClient
创建的RealCall
。 Api
接口的调用到RealCall
直接有两层静态代理 OkHttpCall
和CallAdapter
。OkHttpCall
代理了RealCall
,额外调用Converter
做序列化处理。CallAdapter
代理了OkHttpCall
,可以在这里做扩展,将Call
转换为实际的返回类型。
4. 协程实现
在看Retrofit
如何实现协程之前,先梳理一下协程的基本概念。
当有一个延迟任务,后续的逻辑又需要等待这个任务执行完成返回数据,能继续执行,为了不阻塞线程,一般就需要就需要通过线程调度和传递回调来通信。在使用了协程之后,却只需要像同步代码那样书写,就可以完成这些操作。
但这并不是什么黑魔法,并不是用了协程之后不需要线程调度和传递回调,而是将这些繁琐的事进行复杂的封装并且为我们自动生成。将回调封装成Continuation
,将线程的调度封装成调度器。
Continuation
,调用它的 resume
或者 resumeWithException
来返回结果或者抛出异常,跟我们所说的回调一模一样。
调度器的本质是一个协程拦截器,它拦截的对象就是Continuation
,进而在其中实现回调的调度。调度器一般使用现成的,类如Dispatchers.Main
,如果去挖它的源码,你会发现到了最后,还是使用的handler.post()
,也跟我们所说的线程调度一模一样。
而前面有讲到Retrofit
的实现很多时候需要依据返回类型做不同的处理,所以就需要了解协程是如何自动生成的回调代码和如何传递回调。写一个简单的协程接口,看一下转换后的Java
代码,以及尝试在Java
代码中调用协程接口。
可以看到返回值String
被封装成了Continuation<String>
作为入参传递,思考一下回调不也是这样实现的。
真实的返回值成了Object
(用于状态机状态切换)。
//Kotlin代码
interface SuspendService {
suspend fun C(c1: Long): String
}
//字节码转化的Java代码
public interface SuspendService {
@Nullable
Object C(long var1, @NotNull Continuation var3);
}
//尝试在Java中调用suspend方法
class MMM {
SuspendService service;
public static void main(String[] args) {
MMM mmm = new MMM();
mmm.service.C(1L, new Continuation<String>() {
@NonNull
@Override
public CoroutineContext getContext() {
return null;
}
@Override
public void resumeWith(@NonNull Object o) {
}
});
}
}
接着回到动态代理那部分,因为suspend
生成的代码会多加一个回调参数Continuation
,那么动态代理的时候这个参数就会传入到代理的handler
中。
Continuation
的创建和使用十分的繁琐,最好的处理方法应该是把它再丢进一个kotlin
的suspend
方法中,让编译器去处理这些东西,而这个也就是Retrofit
实现协程的原理。
interface SuspendService {
@GET("/user")
suspend fun getUser(): Response<User>
@GET("/name")
suspend fun getName(userId: String): Response<String>
}
//动态代理生成字节码示例
class SuspendServiceProxy implements SuspendService {
InvocationHandler handler;
@Nullable
@Override
public Object getUser(@NonNull Continuation<? super Response<User>> $completion) {
return handler.invoke(
this,
SuspendService.class.getMethod("getUser", Continuation.class),
new Object[]{$completion}
);
}
@Nullable
@Override
public Object getName(@NonNull String userId, @NonNull Continuation<? super Response<String>> $completion) {
return handler.invoke(
this,
SuspendService.class.getMethod("getName", String.class, Continuation.class),
new Object[]{userId, $completion}
);
}
}
接着再回到HttpServiceMethod
,看看刚才被省略的代码。
在这里面会判断是不是suspend
方法,判断的逻辑在RequestFactory
中,判断的方法就是判断参数有没有Continuation
对象,感兴趣可以去RequestFactory
源码瞅瞅。
现在如果是suspend
方法,会直接自定义一个类型adapterType
,它的实际类型是Call
,泛型是实际的返回类型(Response<T>
里面的T
)。之后将他作为返回类型去创建CallAdapter
,而这里实际创建的就是DefaultCallAdapterFactory
的ExecutorCallbackCall
。最后创建SuspendForResponse
对象返回。
abstract class HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT> {
static <ResponseT, ReturnT> retrofit2.HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
Retrofit retrofit, Method method, RequestFactory requestFactory) {
/**
* 通过判断参数是不是Continuation,标志函数是不是suspend
* 在requestFactory内部处理
*/
boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
...
Annotation[] annotations = method.getAnnotations();
Type adapterType;
if (isKotlinSuspendFunction) {
Type[] parameterTypes = method.getGenericParameterTypes();
Type responseType = Utils.getParameterLowerBound(0,
(ParameterizedType) parameterTypes[parameterTypes.length - 1]);
if (getRawType(responseType) == Response.class && responseType instanceof ParameterizedType) {
// Unwrap the actual body type from Response<T>.
responseType = Utils.getParameterUpperBound(0, (ParameterizedType) responseType);
continuationWantsResponse = true;
}
/**
* 自己新建一个返回类型,将实际的返回例行包装给Call
*/
adapterType = new Utils.ParameterizedTypeImpl(null, Call.class, responseType);
} else {
adapterType = method.getGenericReturnType();
}
CallAdapter<ResponseT, ReturnT> callAdapter =
createCallAdapter(retrofit, method, adapterType, annotations);
Type responseType = callAdapter.responseType();
Converter<ResponseBody, ResponseT> responseConverter =
createResponseConverter(retrofit, method, responseType);
okhttp3.Call.Factory callFactory = retrofit.callFactory;
if (!isKotlinSuspendFunction) {
return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
} else {
return (HttpServiceMethod<ResponseT, ReturnT>) new SuspendForResponse<>(requestFactory,
callFactory, responseConverter, (CallAdapter<ResponseT, Call<ResponseT>>) callAdapter);
}
}
}
在SuspendForResponse
中,adapt()
的返回类型对应到了suspend
的返回类型Object
。并且其中的逻辑就是解析出Call
和Continuation
对象,然后有调用KotlinExtensions.awaitResponse()
,就如之前说的,它是一个suspend
方法,在代理中不处理Continuation
,而是交给编译器去处理。
static final class SuspendForResponse<ResponseT> extends HttpServiceMethod<ResponseT, Object> {
private final CallAdapter<ResponseT, Call<ResponseT>> callAdapter;
SuspendForResponse(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
Converter<ResponseBody, ResponseT> responseConverter,
CallAdapter<ResponseT, Call<ResponseT>> callAdapter) {
super(requestFactory, callFactory, responseConverter);
this.callAdapter = callAdapter;
}
@Override
protected Object adapt(Call<ResponseT> call, Object[] args) {
call = callAdapter.adapt(call);
Continuation<Response<ResponseT>> continuation =
(Continuation<Response<ResponseT>>) args[args.length - 1];
...
return KotlinExtensions.awaitResponse(call, continuation);
}
}
KotlinExtensions.awaitResponse()
是Call
的扩展函数,扩展函数的实现是通过静态方法传入this
的方法实现的,所以前面传入awaitResponse()
的参数有两个,分别是Call
对象和Continuation
对象。
KotlinExtensions.awaitResponse()
的主体是suspendCancellableCoroutine
方法,suspendCancellableCoroutine
运行在协程当中并且帮我们获取到当前协程的 CancellableContinuation
实例,CancellableContinuation
是一个可取消的Continuation
。通过调用它的 invokeOnCancellation
方法可以设置一个取消事件的回调,一旦这个回调被调用,那么意味着调用所在的协程被取消了,这时候我们也要相应的做出取消的响应,也就是把OkHttp
发出去的请求给取消掉。这段建议多读几遍。
之后调用Call.enqueue()
发送网络请求,在Callback
中调用CancellableContinuation
的 resume
或者 resumeWithException
来返回结果或者抛出异常。
这里的Callback
也是经过了callAdapter
和OkHttpCall
处理,乏了。
suspend fun <T> Call<T>.awaitResponse(): Response<T> {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
//扩展函数示例
fun <T> Call<T>.awaitResponse(){
toString()
}
//扩展函数示例转换为Java代码
public static final void awaitResponse(@NotNull Call $this$awaitResponse) {
$this$awaitResponse.toString();
}