注册

Kotlin 协程调度切换线程是时候解开真相了

在前面的文章里,通过比较基础的手段演示了如何开启协程、如何挂起、恢复协程。并没有涉及到如何切换线程执行,而没有切换线程功能的协程是没有灵魂的。

本篇将重点分析协程是如何切换线程执行以及如何回到原来的线程执行等知识。

通过本篇文章,你将了解到:




  1. 如何指定协程运行的线程?
  2. 协程调度器原理
  3. 协程恢复时线程的选择


1. 如何指定协程运行的线程?


Android 切换线程常用手法


常规手段


平常大家用的切换到主线程的手段:Activity.runOnUiThread(xx),View.post(xx),Handler.sendMessage(xx) 等简单方式。另外还有一些框架,如AsyncTask、RxJava、线程池等。
它们本质上是借助了Looper+Handler功能。

先看个Demo,在子线程获取学生信息,拿到结果后切换到主线程展示:


    private inner class MyHandler : Handler(Looper.getMainLooper()) {
override fun handleMessage(msg: Message) {
//主线程弹出toast
Toast.makeText(context, msg.obj.toString(), Toast.LENGTH_SHORT).show()
}
}

//获取学生信息
fun showStuInfo() {
thread {
//模拟网络请求
Thread.sleep(3000)
var handler = MyHandler()
var msg = Message.obtain()
msg.obj = "我是小鱼人"
//发送到主线程执行
handler.sendMessage(msg)
}
}

我们知道Android UI 刷新是基于事件驱动的,主线程一直尝试从事件队列里拿到待执行的事件,没拿到就等待,拿到后就执行对应的事件。这也是Looper的核心功能,不断检测事件队列,而往队列里放事件即是通过Handler来操作的。



子线程通过Handler 往队列里存放事件,主线程在遍历队列,这就是一次子线程切换到主线程运行的过程。



当然了,因为主线程有消息队列,若想要抛事件到子线程执行,在子线程构造消息队列即可。


协程切换到主线程


同样的功能,用协程实现:


    fun showStuInfoV2() {
GlobalScope.launch(Dispatchers.Main) {
var stuInfo = withContext(Dispatchers.IO) {
//模拟网络请求
Thread.sleep(3000)
"我是小鱼人"
}

Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show()
}
}

很明显,协程简洁太多。

相较于常规手段,协程无需显示构造线程,也无需显示通过Handler发送,在Handler里接收信息并展示。

我们有理由猜测,协程内部也是通过Handler+Looper实现切换到主线程运行的。


协程切换线程


当然协程不只能够从子线程切换到主线程,也可以从主线程切换到子线程,甚至在子线程之间切换。


    fun switchThread() {
println("我在某个线程,准备切换到主线程")
GlobalScope.launch(Dispatchers.Main) {
println("我在主线程,准备切换到子线程")
withContext(Dispatchers.IO) {
println("我在子线程,准备切换到子线程")
withContext(Dispatchers.Default) {
println("我在子线程,准备切换到主线程")
withContext(Dispatchers.Main) {
println("我在主线程")
}
}
}
}
}

无论是launch()函数还是withContext()函数,只要我们指定了运行的线程,那么协程将会在指定的线程上运行。


2. 协程调度器原理


指定协程运行的线程


接下来从launch()源码出发,一步步探究协程是如何切换线程的。

launch()简洁写法:


    fun launch1() {
GlobalScope.launch {
println("launch default")
}
}

launch()函数有三个参数,前两个参数都有默认值,第三个是我们的协程体,也即是 GlobalScope.launch 花括号里的内容。


#Builders.common.kt
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//构造新的上下文
val newContext = newCoroutineContext(context)
//构造completion
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//开启协程
coroutine.start(start, coroutine, block)
return coroutine
}

接着看newCoroutineContext 实现:


#CoroutineContext.kt
actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//在Demo 环境里 coroutineContext = EmptyCoroutineContext
val combined = coroutineContext + context
//DEBUG = false
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//没有指定分发器,默认使用的分发器为:Dispatchers.Default
//若是指定了分发器,就用指定的
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

这块涉及到CoroutineContext 一些重载运算符的操作,关于CoroutineContext 本次不会深入,只需理解其意思即可。


只需要知道:

CoroutineContext 里存放着协程的分发器。


协程有哪些分发器呢?


Dispatchers.Main



UI 线程,在Android里为主线程



Dispatchers.IO



IO 线程,主要执行IO 操作



Dispatchers.Default



主要执行CPU密集型操作,比如一些计算型任务



Dispatchers.Unconfined



不特意指定使用的线程



指定协程在主线程运行


不使用默认参数,指定协程的分发器:


    fun launch1() {
GlobalScope.launch(Dispatchers.Main) {
println("我在主线程执行")
}
}

以此为例,继续分析其源码。

上面提到过,开启协程使用coroutine.start(start, coroutine, block)函数:



#AbstractCoroutine.kt
fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
//start 为CoroutineStart里的函数
//最终会调用到invoke
start(block, receiver, this)
}
#CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
//this 指的是StandaloneCoroutine,默认走default
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

CoroutineStart.DEFAULT、CoroutineStart.ATOMIC 表示的是协程的启动方式,其中DEFAULT 表示立即启动,也是默认启动方式。


接下来就是通过block去调用一系列的启动函数,这部分我们之前有详细分析过,此处再简单过一下:



block 代表的是协程体,其实际编译结果为:匿名内部类,该类继承自SuspendLambda,而SuspendLambda 间接实现了Continuation 接口。



继续看block的调用:


#Cancellable.kt
//block 的扩展函数
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
//runSafely 为高阶函数,里边就是调用了"{}"里的内容
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

流程流转到createCoroutineUnintercepted()函数了,在少年,你可知 Kotlin 协程最初的样子? 里有重点分析过:该函数是真正创建协程体的地方。


直接上代码:


#IntrinsicsJvm.kt
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
//包装completion
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//创建协程体类
//receiver completion 皆为协程体对象 StandaloneCoroutine
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}

该函数的功能为创建一个协程体类,我们暂且称之为MyAnnoy。


class MyAnnoy extends SuspendLambda implements Function2 {
@Nullable
@Override
protected Object invokeSuspend(@NotNull Object o) {
//...协程体逻辑
return null;
}
@NotNull
@Override
public Continuation<Unit> create(@NotNull Continuation<?> completion) {
//...创建MyAnnoy
return null;
}
@Override
public Object invoke(Object o, Object o2) {
return null;
}
}

新的MyAnnoy 创建完成后,调用intercepted(xx)函数,这个函数很关键:


#Intrinsics.Jvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
//判断如果是ContinuationImpl,则转为ContinuationImpl 类型
//继而调用intercepted()函数
(this as? ContinuationImpl)?.intercepted() ?: this

此处为什么要将MyAnnoy 转为ContinuationImpl ?

因为它要调用ContinuationImpl里的intercepted() 函数:


#ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
intercepted
//1、如果intercepted 为空则从context里取数据
//2、如果context 取不到,则返回自身,最后给intercepted 赋值
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

先看intercepted 变量类型:


#ContinuationImpl.kt
private var intercepted: Continuation<Any?>? = null

还是Continuation 类型,初始时intercepted = null。

context[ContinuationInterceptor] 表示从CoroutineContext里取出key 为ContinuationInterceptor 的Element。

既然要取出,那么得要放进去的时候,啥时候放进去的呢?


答案是:



newCoroutineContext(context) 构造了新的CoroutineContext,里边存放了分发器。



又因为我们设定的是在主线程进行分发:Dispatchers.Main,因此context[ContinuationInterceptor] 取出来的是Dispatchers.Main。


Dispatchers.Main 定义:


#Dispatchers.kt
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
#MainCoroutineDispatcher.kt
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}

MainCoroutineDispatcher 继承自 CoroutineDispatcher,而它里边有个函数:


#CoroutineDispatcher.kt
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)

而 Dispatchers.Main 调用的就是interceptContinuation(xx)函数。

该函数入参为Continuation 类型,也就是MyAnnoy 对象,函数的内容很简单:




  • 构造DispatchedContinuation 对象,传入的参数分别是Dispatchers.Main和MyAnnoy 对象。
  • Dispatchers.Main、MyAnnoy 分别赋值给成员变量dispatcher和continuation。


DispatchedContinuation 继承自DispatchedTask,它又继承自SchedulerTask,本质上就是Task,Task 实现了Runnable接口:


#Tasks.kt
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
//...
}

至此,我们重点关注其实现了Runnable接口里的run()函数即可。


再回过头来看构造好DispatchedContinuation 之后,调用resumeCancellableWith()函数:


#DispatchedContinuation.kt
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
//需要分发
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
//调用分发器分发
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}

而Demo里此处的dispatcher 即为Dispatchers.Main。


好了,总结一下launch()函数的功能:



image.png


Dispatchers.Main 实现


接着来看看Dispatchers.Main 如何分发任务的,先看其实现:


#MainDispatcherLoader.java
internal object MainDispatcherLoader {

//默认true
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)

@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
//构造主线程分发
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
//加载分发器工厂①
FastServiceLoader.loadMainDispatcherFactory()
} else {
...
}
//通过工厂类,创建分发器②
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
...
}
}
}

先看①:


#FastServiceLoader.kt
internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
val clz = MainDispatcherFactory::class.java
//...
return try {
//反射构造工厂类:AndroidDispatcherFactory
val result = ArrayList<MainDispatcherFactory>(2)
FastServiceLoader.createInstanceOf(clz,
"kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
FastServiceLoader.createInstanceOf(clz,
"kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
//...
}
}

该函数返回的工厂类为:AndroidDispatcherFactory。


再看②,拿到工厂类后,就该用它来创建具体的实体了:


#HandlerDispatcher.kt
internal class AndroidDispatcherFactory : MainDispatcherFactory {
//重写createDispatcher 函数,返回HandlerContext
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
//...
}

//定义
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
}

最终创建了HandlerContext。

HandlerContext 继承自类:HandlerDispatcher


#HandlerDispatcher.kt
sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
//重写分发函数
override fun dispatch(context: CoroutineContext, block: Runnable) {
//抛到主线程执行,handler为主线程的Handler
handler.post(block)
}
}

很明显了,DispatchedContinuation里借助dispatcher.dispatch()进行分发,而dispatcher 是Dispatchers.Main,最终的实现是HandlerContext。

因此dispatch() 函数调用的是HandlerDispatcher.dispatch()函数,该函数里将block 抛到了主线程执行。

block 为啥是呢?

block 其实是DispatchedContinuation 对象,从上面的分析可知,它间接实现了Runnable 接口。

查看其实现:


#DispatchedTask.kt
override fun run() {
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
//delegate 为DispatchedContinuation 本身
val delegate = delegate as DispatchedContinuation<T>
//delegate.continuation 为我们的协程体 MyAnnoy
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
//...
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
//...
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
//执行协程体
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
//...
} finally {
//...
}
}

continuation 变量是我们的协程体:MyAnnoy。

MyAnnoy.resume(xx) 这函数我们很熟了,再重新熟悉一下:


#ContinuationImpl.kt
override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
with(current) {
//completion 即为开始时定义的StandaloneCoroutine
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//执行协程体里的代码
val outcome = invokeSuspend(param)
if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
kotlin.Result.success(outcome)
} catch (exception: Throwable) {
kotlin.Result.failure(exception)
}
//...
}
}
}

invokeSuspend(param) 调用的是协程体里的代码,也就是launch 花括号里的内容,因此这里面的内容是主线程执行的。


再来看看launch(Dispatchers.Main)函数执行步骤如下:




  1. 分发器HandlerContext 存储在CoroutineContext(协程上下文)里。
  2. 构造DispatchedContinuation 分发器,它持有变量dispatcher=HandlerContext,continuation=MyAnnoy。
  3. DispatchedContinuation 调用dispatcher(HandlerContext) 进行分发。
  4. HandlerContext 将Runnable(DispatchedContinuation) 抛到主线程。


经过上面几步,launch(Dispatchers.Main) 任务算是完成了,至于Runnable什么时候执行与它无关了。


当Runnable 在主线程被执行后,从DispatchedContinuation 里取出continuation(MyAnnoy),并调用continuation.resume()函数,进而执行MyAnnoy.invokeSuspend()函数,最后执行了launch{}协程体里的内容。

于是协程就愉快地在主线程执行了。


老规矩,结合代码与函数调用图:



image.png


3. 协程恢复时线程的选择


以主线程为例,我们知道了协程指定线程运行的原理。

想象另一种场景:



在协程里切换了子线程执行,子线程执行完毕后还会回到主线程执行吗?



对上述Demo进行改造:


    fun launch2() {
GlobalScope.launch(Dispatchers.Main) {
println("我在主线程执行")
withContext(Dispatchers.IO) {
println("我在子线程执行")//②
}
println("我在哪个线程执行?")//③
}
}

大家先猜猜③ 的答案是什么?是主线程还是子线程?


withContext(xx)函数上篇(讲真,Kotlin 协程的挂起没那么神秘(原理篇))已经深入分析过了,它是挂起函数,主要作用:



切换线程执行协程。




image.png


MyAnnoy1 对应协程体1,为父协程体。

MyAnnoy2 对应协程体2,为子协程体。

当② 执行完成后,会切换到父协程执行,我们看看切换父协程的流程。

每个协程的执行都要经历下面这个函数:


#BaseContinuationImpl.kt
override fun resumeWith(result: Result<Any?>) {
//...
while (true) {
//..
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//执行协程体
val outcome = invokeSuspend(param)
if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
kotlin.Result.success(outcome)
} catch (exception: Throwable) {
kotlin.Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
//...
} else {
//如果上一步的协程体不阻塞,则执行completion
completion.resumeWith(outcome)
return
}
}
}
}

此处以withContext(xx)函数协程体执行为例,它的completion 为何物?

上面提到过launch()开启协程时,它的协程体的completion 为StandaloneCoroutine,也就是说MyAnnoy1.completion = StandaloneCoroutine。

从withContext(xx)源码里得知,它的completion 为DispatchedCoroutine,DispatchedCoroutine,它继承自ScopeCoroutine,ScopeCoroutine 有个成员变量为:uCont: Continuation。

当构造DispatchedCoroutine 时,传入的协程体赋值给uCont。
也就是DispatchedCoroutine.uCont = MyAnnoy1,MyAnnoy2.completion = DispatchedCoroutine。



此时,子协程体与父协程 通过DispatchedCoroutine 关联起来了。



因此completion.resumeWith(outcome)==DispatchedCoroutine.resumeWith(outcome)。
直接查看 后者实现即可:


#AbstractCoroutine.kt
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}

#Builders.common.kt
#DispatchedCoroutine 类里
override fun afterResume(state: Any?) {
//uCont 为父协程体
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}

到此就豁然开朗了,uCont.intercepted() 找到它的拦截器,因为uCont为MyAnnoy1,它的拦截器就是HandlerContext,又来了一次抛回到主线程执行。


因此,上面Demo里③ 的答案是:



它在主线程执行。



小结来看,就两步:




  1. 父协程在主线程执行,中途遇到挂起的方法切换到子线程(子协程)执行。
  2. 当子协程执行完毕后,找到父协程的协程体,继续让其按照原有规则分发。


老规矩,有代码有图有真相:



image.png


至此,切换到主线程执行的原理已经分析完毕。


好奇的小伙伴可能会问:你这举例都是子线程往主线程切换,若是子线程往子线程切换呢?

往主线程切换依靠Handler,而子线程切换依赖线程池,这块内容较多,单独拎出来分析。

既然都提到这个点了,那这里再提一个问题:


    fun launch3() {
GlobalScope.launch(Dispatchers.IO) {
withContext(Dispatchers.Default) {
println("我在哪个线程运行")
delay(2000)
println("delay 后我在哪个线程运行")
}
println("我又在哪个线程运行")
}
}

你知道上面的答案吗?


我们下篇将重点分析协程线程池的调度原理,通过它你将会知道上面的答案。


本文基于Kotlin 1.5.3,文中完整Demo请点击


作者:小鱼人爱编程
链接:https://juejin.cn/post/7113706345190129700
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册