一文搞明白协程的挂起和恢复
协程是使用非阻塞式挂起的方式来实现线程运行的。那协程又是如何挂起和恢复的,这里面的概念又是什么,带着这些问题就让我们重新探究下协程的挂起和恢复。
我们先创建个协程:
override fun initView() {
lifecycleScope.launch {
val num = dealA()
dealB(num)
}
}
private suspend fun dealA():Int {
withContext(Dispatchers.IO) {
delay(3000)
}
return 1
}
private suspend fun dealB(num:Int) {
withContext(Dispatchers.IO) {
delay(1000)
}
}
可以看到写协程的时候要在函数前面加上suspend
修饰,这也是常说的挂起函数,那挂起函数又是什么?
挂起函数
了解之前,我们先将上面的挂起函数dealA()
反编译成 Java,简单的看看编译后是什么样的?(省略了后面会着重解释的一些代码,主要先看挂起函数的方法)
private final Object dealA(Continuation var1) {
......
Object $result = ((<undefinedtype>)$continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
Function2 var10001 = (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
.......
return Unit.INSTANCE;
}
......
if (BuildersKt.withContext(var10000, var10001, (Continuation)$continuation) == var4) {
return var4;
}
break;
......
}
return Boxing.boxInt(1);
}
可以看到suspend
经过反编译后,会出现Continuation
类型的参数传进去,并且返回的是Object
对象。
是不是对Continuation
是什么很好奇,这也是协程的核心部分`:
public interface Continuation<in T> {
//对应于这个延续的协程的上下文
public val context: CoroutineContext
//继续执行相应的协程,传递一个成功或失败的 [result] 作为最后一个暂停点的返回值。
public fun resumeWith(result: Result<T>)
}
从定义上可以看出Continuation
其实就是一个带有泛型参数的callback
,而resumeWith
也就相当于onSuccess的成功回调,来恢复执行后面的代码,除这个之外,还有一个ContineContext
,它就是协程的上下文。
回到dealA
方法中,当执行到withContext
方法的时候,会返回CoroutineSingletons.COROUTINE_SUSPENDED
,表示函数被挂起了,到这里你是不是觉得就结束了,其实还没有。
在查看过程中是不是看到有个invokeSuspend
的回调方法还没有被调用,这又是在什么时候会被触发的?
那我们就从刚才执行到的withContext
那里进一步查看,写过协程的都知道这就是用来切换线程:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- 新上下文与旧上下文相同
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 新的调度程序与旧的调度程序相同
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 上下文有变化,所以这个线程需要更新
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- 使用新的调度程序
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
在withContext
方法中,传入了两个参数,一个是协程的上下文,另一个就是协程里的代码。可以看到不管新的调度和旧的调度一样最后都是会调用startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
而在startCoroutineCancellable
方法中,创建了Coroutination
,之后会调用resumeCancelableWith
方法:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
在这里是不是看到了我们之前提到过的resumeWith
方法,之前也解释了下它就相当于一个回调。然后我们再来看下它的具体实现,是在ContinuationImpl
类中:
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
.......
}
}
}
在resumeWith
方法中执行到了我们一直在找的invokeSuspend
,通过这个方法将result
回调了出去,并判断当前是不是COROUTINE_SUSPENDED
(挂起),是挂起直接退出,去执行上面说到的invokeSuspend
里面的内容。
在这里我们了解到invokeSuspend
是由resumeWith
所触发的,那接下来我们看看真正的挂起和恢复如何被执行的。
协程的启动
了解挂起和恢复的过程,要从协程的启动执行开始,我们还是跟刚才一样反编译启动协程的代码:
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
......
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
在协程启动的反编译代码我们又看到了ininvokeSuspend
方法,这个方法又是在最下面创建了Continuation
,之后在invoke
中被调用,更多的信息是看不出来了。我们还是回到launch
源码内部里面去寻找答案。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
//coroutine.start
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
当查看到start
这里的时候,你会发现跟进不下去了。那我们就换种方法,还是将这个类反编译下,你会看到变成了这样:
public final void start(@NotNull CoroutineStart start, Object receiver, @NotNull Function2 block) {
Intrinsics.checkNotNullParameter(start, "start");
Intrinsics.checkNotNullParameter(block, "block");
start.invoke(block, receiver, (Continuation)this);
}
//使用此协程启动策略将带有接收器的相应块作为协程启动
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
在这里又看到了我们熟悉的startCoroutineCancellable
,由于默认值为CoroutineStart.DEFAULT
,所以该方法会被调用。后面会怎么调用,应该很清楚了,最后会一路调用到invokeSuspend
方法,所以这时候就会执行到suspend{}
代码块里面,协程启动!
协程的挂起
调用到invokeinvokeSuspend
函数里面的代码的时候,我们单拎出出来看下:
//launch
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
ButtonTextActivity var4;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var4 = ButtonTextActivity.this;
this.label = 1;
var10000 = var4.dealA(this);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
ResultKt.throwOnFailure($result);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int num = ((Number)var10000).intValue();
var4 = ButtonTextActivity.this;
this.label = 2;
if (var4.dealB(num, this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}
}
这里涉及到了label
状态机的分析,当label
为0时,会调用case为0下面的代码。在里面label
被设置为了1,又调用了var4.dealA(this)
这个挂起函数,从前面挂起函数的分析知道其会返回COROUTINE_SUSPENDED
标志,所以var10000
也就会得到COROUTINE_SUSPENDED
标志,此时会被判断相等,协程会被挂起。
协程的恢复
挂起后就要恢复了。在前面执行到的dealA
方法中,在withContext
的时候会触发dealA
中的invokeSuspend
方法。此时label
被设置为1,所以会被调用到case为1的代码:
//dealA
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(3000L, this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Unit.INSTANCE;
}
return Boxing.boxInt(1);
执行了ResultKt.throwOnFailure($result)
,最后返回int的值。同时launch中的invokeSuspend
也被执行,上面已经将label设置为1,这里就会执行到case 1下的代码:
//launch invokeSuspend
switch(this.label) {
......
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
}
int num = ((Number)var10000).intValue();
var4 = ButtonTextActivity.this;
this.label = 2;
if (var4.dealB(num, this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}
对结果进行了失败处理,此时var10000也就是刚刚得到的int值,接着执行suspend
剩余的代码,在下面将lable
设置为了2,开始执行dealB的方法。
在dealB
方法中,跟之前分析的步骤一样,也会回到invokeSuspend
中:
private final Object dealB(int num, Continuation $completion) {
Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
......
return Unit.INSTANCE;
}
......
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
最后当没有挂起函数的时候,会返回Unit.INSTANCE
,结束协程执行。
小结
协程通过suspend
来标识挂起点,但真正的挂起点还需要通过是否返回COROUTINE_SUSPENDED
来判断,而代码体现是通过状态机来处理协程的挂起与恢复。
在挂起和恢复的过程中,当判断挂起函数到返回值是COROUTINE_SUSPENDED
标志时,会挂起,在需要挂起的时候,状态机会把之前的结果以成员变量的方式保存在 continuation
中。在挂起函数恢复的时候,会调用Continuation的resumeWith
方法,继而触发invokeSuspend
。根据保存在Continuation
中的label,进入不同的 分支恢复之前保存的状态,进入下一个状态。
在挂起的时候并不会阻塞当前的线程,是因为挂起是在invokeSuspend方法中return出去的,而invokeSuspend之外的函数当然还是会继续执行。
作者:罗恩不带土
链接:https://juejin.cn/post/7103311646591811598
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。