注册

Kotlin协程实现原理概述

协程的顶层实现-CPS


现有如下代码:


fun test(a: Int, b: Int) {
// 求和
var result = a + b
// 乘以2
result = result shl 1
// 加2
result += 2
// 打印结果
println(result)
}

我们来将代码SRP一下(单一职责):


// 加法
fun sum(a: Int,b: Int) = a + b
// x乘以2
fun double(x: Int) = x shl 1
// x加2
fun add2(x: Int) = x + 2

// 最终的test
fun test(a: Int, b: Int) {
// 从内层依次调用,最终打印
println(add2(double(sum(a,b))))
}

可以看到,我们将原来一坨的方法,抽离成了好几个方法,每个方法干一件事,虽然提高了可读性和可维护性,但是代码复杂了,我们来让它更复杂一点。


上述代码是 让内层方法的返回值 作为参数 传递给外层方法,现在我们 把外层方法作为接口回调 传递给 内层方法:


// 加法,next是加法做完的回调,会传入相加的结果
fun sum(a: Int, b: Int, next: (Int) -> Unit) = a + b
// x乘以2
fun double(x: Int, next: (Int) -> Unit) = x shl 1
// x加2
fun add2(x: Int, next: (Int) -> Unit) = x + 2

// 最终的test
fun test2(a: Int, b: Int) {
// 执行加法
sum(a, b) { sum ->
// 加完执行乘法
double(sum) { double ->
// 乘完就加2
add2(double) { result ->
// 最后打印
println(result)
}
}
}
}

这就是CPS的代码风格:通过接口回调的方式来实现的


假设: 我们上述的几个方法: sum()/double()/add2()都是挂起函数,那么最终也会编译为CPS风格的回调函数方式,也就是:原来看起来同步的代码,经过编译器的"修改",变成了异步的方法,也就是:CPS化了,这就是kotlin协程的顶层实现逻辑。


现在,让我们来验证一下,我们定义一个suspend函数,反编译看下是否真的CPS化了。


// 定义挂起函数
suspend fun test(id: String): String = "hello"

反编译结果如下:


// 参数添加了一个Continuation参数
public final Object test(@NotNull String id, @NotNull Continuation $completion) {
return "hello";
}

可以看到,多了个Continuation参数,这是个接口,是在本次函数执行完毕后执行的回调,内容如下:


public interface Continuation<in T> {
// 保存上下文(比如变量状态)
public val context: CoroutineContext

// 方法执行结束的回调,参数是个范型,用来传递方法执行的结果
public fun resumeWith(result: Result<T>)
}

好,现在我们知道了suspend函数 是通过添加Continuation来实现的,我们来看个具体的业务:


// 根据id获取token
suspend fun getToken(id: String): String = "token"

// 根据token获取info
suspend fun getInfo(token: String): String = "info"

// 测试
suspend fun test() {
// 先获取token,这是耗时请求
val token = getToken("123")
// 再根据token获取info,这也是个耗时请求
val info = getInfo(token)
// 打印
println(info)
}

上述的业务代码很简单,但是前两步都是耗时操作,线程会卡在那里wait吗?显然不会,既然是suspend函数,那么就可以CPS化,等价的CPS代码如下:


// 跟上述相同,传递了Continuation回调
fun getToken(id: String, callback: Continuation<String>): String = "token"

// 跟上述相同,传递了Continuation回调
fun getInfo(token: String, callback: Continuation<String>): String = "info"

// 测试(只写了主线代码)
fun test() {
// 先获取token,传入回调
getToken("123", object : Continuation<String> {
override fun resumeWith(result: Result<String>) {
// 用token获取info,传入回调
val token = result.getOrNull()
getInfo(token!!, object : Continuation<String> {
override fun resumeWith(result: Result<String>) {
// 打印结果
val info = result.getOrNull()
println(info)
}
})
}
})
}

上述就是无suspend的CPS风格代码,通过传入接口回调来实现协程的同步代码风格。


接下来我们来反编译suspend风格代码,看下它里面是怎么调度的。


协程的底层实现-状态机


我们先来简单修改下suspend test函数:


// 没变化
suspend fun getToken(id: String): String = "token"
// 没变化
suspend fun getInfo(token: String): String = "info"

// 添加了局部变量a,看下suspend怎么保存a这个变量
suspend fun test() {
val token = getToken("123") // 挂起点1
var a = 10 // 这里是10
val info = getInfo(token) // 挂起点2,需要将前面的数据保存(比如a),在挂起点之后恢复
println(info)
println(a
}

每个suspend函数调用点,都会生成一个挂起点,在挂起点我们要保存当前的运行状态,比如局部变量等。


反编译后的代码大致如下:


public final Object getToken(String id, Continuation completion) {
return "token";
}

public final Object getInfo(String token, Continuation completion) {
return "info";
}

// 重点函数(伪代码)
public final Object test(Continuation<String>: continuation) {
Continuation cont = new ContinuationImpl(continuation) {
int label; // 保存状态
Object result; // 保存中间结果,还记得那个Result<T>吗,是个泛型,因为泛型擦除,所以为Object,用到就强转
int tempA; // 保存上下文a的值,这个是根据具体代码产生的
};
switch(cont.label) {
case 0 : {
cont.label = 1; //更新label

getToken("123",cont) // 执行对应的操作,注意cont,就是传入的回调
break;
}

case 1 : {
cont.label = 2; // 更新label

// 这是一个挂起点,我们要保存上下文数据,这里就保存a的值
int a = 10;
cont.tempA = a; // 保存a的值

// 获取上一步的结果,因为泛型擦除,需要强转
String token = (Object)cont.result;
getInfo(token, cont); // 执行对应的操作
break;
}

case 2 : {
String info = (Object)cont.result; // 获取上一步的结果
println(info); // 执行对应的操作

// 在挂起点之后,恢复a的值
int a = cont.tempA;
println(a);

return;
}
}
}

我们可以将每个case理解为一个状态,每个case分支对应的语句,理解为一个Continuation实现。


上述伪代码大致描述了协程的调度流程:



  • 1 调用test函数时,需要传入一个Continuation接口,我们会对它进行二次装饰。
  • 2 装饰就是根据函数具体逻辑,在内部添加额外的上下文数据和状态信息(也就是label)。
  • 3 每个状态对应一个Continuation接口,里面会执行对应的业务逻辑。
  • 4 每个状态都会: 保存上下文信息 -> 获取上一个状态的结果 -> 执行本状态业务逻辑 -> 恢复上下文信息。
  • 5 直到最后一个状态对应的逻辑执行完毕。

总结


综上,我们可以归纳以下几点:



  • 1 Kotlin协程没有很"频繁"的切换线程,它是在顶层通过调度方式实现的,所以效率是比较高的。
  • 2 Kotlin中,每个suspend方法,都需要一个Continuation接口实现,用来执行下一个状态的操作;并且,每个suspend方法的调用点都会产生一个挂起点。
  • 3 每个挂起点,都会产生一个label,对应于状态机的一个状态,不同的状态之间,通过Continuation来切换。
  • 4 Kotlin协程会在每个挂起点保存当前的上下文数据,并且在挂起点之后进行恢复。这样,每个状态之间就是相互独立的,可以独立调度。
  • 5 协程的切换,只不过是从一种状态切换到另一种状态,因为不同状态是相互独立的,所以在合适的时机,再切换回来也不会对结果造成影响。

作者:奔波儿灞取经
链接:https://juejin.cn/post/7011011123814072327
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册