kt协程 | suspend非阻塞挂起魔术解密
一 前言
kotin协程,一种轻量级用户态线程,能通过suspend函数避免回调地狱以及快速实现线程的切换等,已经普及到大量实际项目中。这里将解析协程核心功能suspend的「非阻塞式挂起」实现原理,从而避免管中窥豹,使得在后续使用kotlin协程开发能更加得心应手。
二 callback代码同步化
kotlin官方宣传的非阻塞式挂起,即用同步代码写异步操作。其实可以理解,就是通过suspend将代码“用阻塞式的写法实现非阻塞式线程调度”,就是内部帮我们切线程。 先简述同步化写法,下面会逐步分析,kotlin协程是如何通过suspend来实现非阻塞式的代码同步化
2.1 callback代码同步化
假设一个简单的需求,先从网络获取最新的消息,再与本地数据库做增量操作,获取完整消息。 在原生中,回调代码如下:
fun fetch(){
fetchRemote { msg->
fetchLocal(msg) { result ->
//实际业务操作
println("result:$result")
}
}
}
fun fetchRemote(onNext:(Int)->Unit){
Thread.sleep(300)
val value = 1
onNext(value)
}
fun fetchLocal(id:Int,onNext:(Int)->Unit){
Thread.sleep(300)
val value = 2
onNext(id + value)
利用了kotlin协程,可以直接以同步方式:
suspend fun fetch():Int{//用正常同步写法,消除回调
val msg = fetchRemote()
val result = fetchLocal(msg)
println("result:$result")
return result
}
//简单起见,采用suspendCoroutine
suspend fun fetchRemote() = suspendCoroutine {
it.resume(1)
}
suspend fun fetchLocal(id:Int) = suspendCoroutine {
it.resume(id + 2)
}
ok,上面的 suspendFetch
函数写法,就是传说中的 “同步代码实现异步操作” 了,简称「代码同步化」
三 suspend解密
备注:为方便理解,下面展示的是伪代码,与实际字节码翻译可能存在不同;
3.1 suspend函数解体
这里先讲解一个声明为suspend
的函数,如suspend fun fetch():Int
,会如何被kotlin编译器解体,再讲述执行过程。
先总结:kotlin编译器会使用状态机实现回调功能,每一个suspend函数都是状态机的一个状态,suspend就是声明一个状态而已,再往简单地说,编译器会注入代码,内部帮我们实现了回调!
- 编译器首先会在suspend函数,加入一个额外的参数
completion: Continuation
,比如会将上述的suspend fun fetch():Int
变成fun fetch(completion: Continuation):Any
,这也额外解释了为何suspend只能被suspend函数或协程内调用。
注意,这里的返回值变成any,是因为除了我们定义的返回值以外,还可能返回挂起标识CoroutineSingletons.COROUTINE_SUSPENDED
,也就是用于实现挂起的逻辑
- kotlin编译器用状态机机制判断当前会执行哪个代码块,每一个挂起函数都会被当成一个状态点,用label来表示,如
fetchRemote
是一个label,一个可能会存在挂起的状态,伪代码:
fun fetch(completion: Continuation):Any{
...
when(label){
0 -> {//label 为默认值0 ,即fetch函数被第一次调用运行,函数代码此时正常运行,还没接触到任何其他挂起函数
...
label = 1 //下面会运行一个挂起函数,所以状态label立即加1,也就是说label==1,表示代码运行到了第一个挂起函数,此处是fetchRemote()
val state = fetchRemote()
...
return COROUTINE_SUSPENDED
}
1 -> { //label 1 ,表示在遇到第一个挂起函数fetchRemote() 之后,调用resume等方式恢复了调度
...
label = 2 //下面会运行另一个挂起函数,所以状态label立即加1,也就是说label==2,表示代码运行到了第二个挂起函数,此处是fetchLocal()
val state = fetchLocal(id)
...
return COROUTINE_SUSPENDED
}
2 -> {//label 2 ,表示在遇到第二个挂起函数fetchLocal() 之后,调用resume等方式恢复了调度
...
println("result:$result")
return result
}
}
}
再次提下总结:每一个suspend函数
都是状态机的一个状态,suspend就是声明一个状态,体现到代码层次就是一个label值来表示。
- 到这里,还需要在状态之间分发上一个状态机的执行结果「即,上一个suspend的返回值」。kotlin通过生成一个状态机管理类,存储label和结果值,解决这个问题:
这里的类命名只是为了方便理解
class FetchStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Result? = null
var label: Int = 0
override fun invokeSuspend(result: Any?) {
this.result = result
fetch(this)
}
}
先注意这里的invokeSuspend包裹了真实的要执行的协程体,并保存了传进来的执行结果result,负责存储每个suspend函数执行结果以共享。
4.一个小点,就是如何判断它是第一次执行这个suspend函数,也就是初始状态label==0
。这里比较简单,直接通过判断completion是不是生成的状态机类就知道了,不是状态机类就代表第一次执行,包裹起来:
val continuation = completion as? FetchStateMachine ?: FetchStateMachine(completion)
- 再接上最开始提到的挂起逻辑。是否特别好奇过,究竟协程是如何知道该挂起,该怎么做了?答案很简单,当某个挂起函数,如
fetchRemote()
,没有调resume时,编译器会让它返回一个CoroutineSingletons.COROUTINE_SUSPENDED
结果,这也是为什么返回值会变成Any,然后只要判断result == 挂起标志,代码直接return,就实现挂起了!!是不是很朴实??
val result = fetchRemote(continuation)
if (result == CoroutineSingletons.COROUTINE_SUSPENDED){
return result
}
到了这里,就可以看到编译器对fetch()
解体的代码的模样了:
fun fetch(completion: Continuation): Any {
class FetchStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Result? = null //执行结果的共享
var label: Int = 0 //判断执行到哪个代码快,挂起函数
override fun invokeSuspend(result: Any?) {//触发状态机运行,调用resumeWith时会触发
this.result = result
suspendFetch(this)
}
}
//第一次执行,包裹成状态机类
val continuation = completion as? FetchStateMachine ?: FetchStateMachine(completion)
val result = continuation.result
val suspended = COROUTINE_SUSPENDED
when (continuation.label) {
0 -> {
//检查是否异常
result.throwOnFailure()
//立即修改label+1
continuation.label = 1
val var0 = fetchRemote(continuation)
if (var0 == suspended){ //表示suspendRemote挂起
return var0
}
//再次触发状态机跑下一个 label1,正常情况不会跑这里。只有当suspendRemote实现是普通函数 suspend fun suspendRemote() = 1,才会触发
fetch(continuation)
}
1 -> {
result.throwOnFailure()
continuation.label = 2
val var0 = fetchLocal(result.value,continuation)
if (var0 == suspended){//这里就相当于一次挂起了
return var0
}
fetch(continuation)
}
2 -> {
result.throwOnFailure()
return result.value
}
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
}
3.2 执行流程
ok,这里针对编译器解体的代码,讲一下状态机执行过程;
- launch协程后,会触发协程体执行,从而第一次调用到
fetch()
方法,开始执行状态机; - 第一次进来,将completion包装成状态机类,此时label为0,执行到第一个挂起函数
fetchRemote()
; - 当
fetchRemote()
是个普通函数,类似suspend fun a()=1
这种只是简单声明suspend的函数,会直接返回函数结果值,递归调度fetch(continuation)
;
//Decompilerkotlin to java by cfr
public static final Object a(@NotNull Continuation $completion) {
return Boxing.boxInt((int)1);
}
- 当
fetchRemote()
是实现了suspendCoroutine/suspendCoroutine
的正经挂起函数时,函数会返回一个挂起标志CoroutineSingletons.COROUTINE_SUSPENDED
,这也是会什么suspend函数返回值是Any类型,到这里会发生一次挂起;
- 对于
fetchRemote
,当调用resumeWith
恢复调度时,会递归循环调用我们一开始生成的状态机包裹类的invokeSuspend方法
,而invokeSuspend方法
就是会再次触发自身函数,即fetch()
- 此时触发状态机接着跑此时的label为1,会跑到
fetchLocal
挂起方法。然后循环递归步骤3 4,直到结束。
这里的执行流程核心就是一个循环递归,从而帮我们内部实现回调。
作者:CYQ
链接:https://juejin.cn/post/7016636661400338463
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。