动手实现Kotlin协程同步切换线程,以及Kotlin协程是如何实现线程切换的
前言
突发奇想想搞一个同步切换线程的Kotlin协程,而不用各种withContext(){},可以减少嵌套且逻辑更清晰,想实现的结果如下图:
分析
实现我们想要的结果,首先需要知道协程为什么可以控制线程的切换以及在挂起函数恢复的时候回到原来设定的线程中
ps:挂起函数比普通函数多出了两个操作:挂起和恢复,具体参考:Kotlin协程在项目中的实际应用_lt的博客-CSDN博客_kotlin协程使用
其实控制线程切换是协程库内内置的一个拦截器类:ContinuationInterceptor
拦截器是一个协程上下文元素(ContinuationInterceptor实现了Element接口 ,Element实现了CoroutineContext接口)
拦截器的作用是将协程体包装一层后,拦截其恢复功能(resumeWith),这样就可以在协程恢复的时候将包装在其内部的协程体在相应的线程中恢复执行(执行resumeWith方法)
比如协程自带的Dispatchers.Main,Dispatchers.IO等都是协程拦截器,下面简单分析下Dispatchers.IO拦截器
我们点进去IO的定义,兜兜转转的找到其实现类LimitingDispatcher,其继承了ExecutorCoroutineDispatcher ,ExecutorCoroutineDispatcher继承了CoroutineDispatcher ,CoroutineDispatcher实现了ContinuationInterceptor接口,也就是其最终实现了协程拦截器的接口
CoroutineDispatcher重写了拦截器的interceptContinuation方法,该方法就是用来包装并拦截的
然后我们在看看DispatchedContinuation的resumeWith方法(也就是如何拦截并将包装的协程体运行在子线程的)
ps:其实走的是resumeCancellableWith方法,因为协程内部做了一个判断,IO的拦截器是继承了DispatchedContinuation的
第一个红框IO那是写死的true,所以只会走第一个流程,而第二个红框你可以简单的理解为将后续任务(下面的代码逻辑)放在这个dispatcher的线程池中运行(就相当于将协程中的代码的线程放到了IO子线程中运行了)
通过上面的分析,其实IO的拦截器可以简单理解为如下代码:
实现
那我们是不是可以将拦截器从协程上下文中移除呢?我试了下并不行,发现是在launch的时候会自动判断,如果没有拦截器则默认附加Dispatchers.Default拦截器用于将操作置于子线程中
ps:可以通过遍历来查看当前协程上下文中都有哪些协程元素(下面是反射的实现,可以使用系统提供的fold来遍历):
/**
* 通过反射遍历协程上下文中的元素
*/
fun CoroutineContext.forEach(action: (CoroutineContext.Element) -> Unit) {
val modeClass = Class.forName("kotlin.coroutines.CombinedContext")
val elementField = modeClass.getDeclaredField("element")
elementField.isAccessible = true
val leftField = modeClass.getDeclaredField("left")
leftField.isAccessible = true
var context: CoroutineContext? = this
while (context?.javaClass == modeClass) {
(elementField.get(context) as? CoroutineContext.Element)?.let(action)
context = leftField.get(context) as? CoroutineContext
}
(context as? CoroutineContext.Element)?.let(action)
}
coroutineContext.forEach {
it.toString().e()
}
pps:协程上下文其实是以链表形式来存储的,CombinedContext就相当于链表的Node节点,其element相当于数据,其left相当于下一个节点,而存储的最后一个节点是未经过CombinedContext包装的协程上下文元素(可能是节省空间);而不管协程上下文,或者CombinedContext的element和left,都是val的,这样上下文都是只读的,避免了并发修改的危险.
那现在看来其实我们只要自己写一个拦截器,然后在运行协程的时候附加上去,线程切换就可以由我们来控制了,那实现起来其实也很简单,代码如下:
/**
* 同步切换线程的协程元素,需要注意所有挂起函数都有可能影响到后面的线程,所以需要注意:最好内部使用不会切换线程的挂起函数(或者你清楚使用的后果)
*/
fun CoroutineScope.launchSyncSwitchThread(block: suspend SyncSwitchThreadCoroutineScope.() -> Unit): Job =
launch(SyncSwitchThreadContinuationInterceptor) { SyncSwitchThreadCoroutineScope(this@launch).block() }
//拦截器
object SyncSwitchThreadContinuationInterceptor : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = SyncSwitchThreadContinuation(continuation)
//协程体包装类
class SyncSwitchThreadContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> {
override val context: CoroutineContext = continuation.context
//这里我们不进行拦截恢复方法,直接使用恢复者(谁调用了resume)的线程
override fun resumeWith(result: Result<T>): Unit = continuation.resumeWith(result)
}
//协程作用域包装类,用于控制toMain等方法不会被别的地方使用
class SyncSwitchThreadCoroutineScope(coroutineScope: CoroutineScope) : CoroutineScope by coroutineScope {
private suspend inline fun suspendToThread(crossinline threadFunction: (()->Unit) -> Unit) =
suspendCoroutine<Unit> {
threadFunction {
if (it.context.isActive)
it.resume(Unit)
}
}
//切换到主线程,[force]是否强制post到主线程(false:如果当前是主线程则不会post)
suspend fun toMain(force: Boolean = false) {
if (!force && isMainThread) return
suspendToThread(HandlerPool::postEmptyListener)//Handler#post方法
}
//切换到单例子线程
suspend fun toSingle(): Unit = suspendToThread(ThreadPool::submitToSingleThreadPool)//提交到单线程线程池
suspend fun toIO(): Unit = suspendToThread(ThreadPool::submitToCacheThreadPool)//提交到子线程池
suspend fun toCPU(): Unit = suspendToThread(ThreadPool::submitToCPUThreadPool)//提交到CPU密集型子线程池
//或者为了理解起来简单减少封装写成如下方式
suspend fun toCPU(): Unit = suspendCoroutine {
ThreadPool.submitToCacheThreadPool {
if(it.context.isActive)
it.resume(Unit)
}
}
}
}
ps:第一次运行的线程是启动它的线程
pps:这里我们包装了一下协程作用域CoroutineScope,可以防止toMain这些方法用在别的地方造成歧义且无用
然后我们就可以像开头那样使用了
或者通过协程上下文的plus(+)方法来替换掉默认的拦截器:
最后在封装一下:
fun CoroutineScope.launchSyncSwitchThread(block: suspend SyncSwitchThreadCoroutineScope.() -> Unit): Job =
launch(SyncSwitchThreadContinuationInterceptor) { SyncSwitchThreadCoroutineScope(this@launch).block() }
使用方式就和最开始的图一样了
结语
这样就ok了,其实任何事情只要了解原理,就可以很快的想到方案,如果不清楚原理,就会对一些事情一头雾水
ps:其实使用Dispatchers.Unconfined也可以实现相同的效果2333,但是了解原理还是更重要
pps:如果想限制launchSyncSwitchThread的lambda范围内只能使用自己定义的几个挂起函数,只需要给SyncSwitchThreadCoroutineScope类加上@RestrictsSuspension注解即可,这样在SyncSwitchThreadCoroutineScope的作用域内,只能使用他内部定义的挂起函数,可以有效减少别的挂起函数意外切换掉线程的情况
end
链接:https://juejin.cn/post/7122653681714987022
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。