【kotlin从摸索到探究】- delay函数实现原理
简介
这片文章主要讲解kotlin
中delay
函数的实现原理,delay
是一个挂起函数。kotlin携程使用过程中,经常使用到挂起函数,在我学习kotlin携程
的时候,一些现象让我很是困惑,所以打算从源码角度来逐一分析。
说明
在分析delay
源码实现过程中,由于对kotlin有些语法还不是很熟悉,所以并不会把每一步将得很透彻,只会梳理一个大致的流程,如果讲解有误的地方,欢迎指出。
例子先行
fun main() = runBlocking {
println("${treadName()}======start")
launch {
println("${treadName()}======delay 1s start")
delay(1000)
println("${treadName()}======delay 1s end")
}
println("${treadName()}======delay 3s start")
delay(3000)
println("${treadName()}======delay 3s end")
// 延迟,保活进程
Thread.sleep(500000)
}
输出如下:
main======start
main======delay 3s start
main======delay 1s start
main======delay 1s end
main======delay 3s end
根据日志可以看出:
- 日志输出环境是在主线程。
- 执行3s延迟函数后,切换到了**
launch
**携程体执行。 - delay挂起函数恢复后执行各自的打印函数。
疑问:
如果真像打印日志输出一样,所以的操作都是在一个线程(主线程)完成,那么问题来了。**
第一:
按照Java线程知识,单线程执行是按照顺序的,是单条线的。那么不管delay
里是何等骚操作,只要没有重新起线程,应该不能够像上面输入的那样吧,你说sleep
,wait
,如果你这么想,那么你可以去补一补Java多线程基础知识了。猜想
:**1. 难得真有什么我不知道的骚操作可以在一个线程里面同时执行delay
和其它代码,真像很多人说的,携程性能很好,使用挂起函数可以不用启动新的线程,就可以异步执行,那真的就很不错
。2.delay
启动了新的线程,上面的现象只不过是进行了线程切换,那么如果多次调用delay
那么岂不是要创建很多线程,这性能问题和资源问题怎么解决。3.delay
基于某种任务调度策略。
delay源码
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
cancellable
是一个CancellableContinuationImpl
对象,执行 block(cancellable),回到下面函数。
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
看一下cont.context.delay
的get
方法
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
如果get(ContinuationInterceptor)
是Delay
类型对象,那么直接返回该对象,如果不是返回DefaultDelay
变量,看一下DefaultDelay
初始化可以知道,它是一个DefaultExecutor
对象,继承了EventLoopImplBase
类。
runBlocking
执行过程中有这样一行代码createCoroutineUnintercepted(receiver, completion).intercepted()
会被ContinuationInterceptor
进行包装。所以上面cont.context.delay
返回的就是被包装的携程体上下文。
查看scheduleResumeAfterDelay方法。
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
schedule(now, task)
}
}
}
创建DelayedResumeTask对象,在also执行相关计划任务,看一下schedule
方法。
public fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
这里返回SCHEDULE_OK
,执行unpark
函数,这里用到了Java提供的LockSupport
线程操作相关知识。
读取线程
val thread = thread
如果delay是当前携程的上下文
那么把延时任务加入到队列后,那么又是怎么达到线程延迟呢。回到runBlocking
执行流程,会执行coroutine.joinBlocking()
这样一行代码。fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}执行:
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
看一下
processNextEvent
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
val task = dequeue()
if (task != null) {
task.run()
return 0
}
return nextTime
}从延迟队列取任务
val delayed = _delayed.value
挂起当前线程
parkNanos(this, parkNanos)
这里是一个
while
循环,当挂起时间到,线程唤醒,继续从任务队列中取任务执行。如果还是延迟任务,这根据当前时间点,计算线程需要挂起的时间,这也是为什么多个延迟任务好像是同时执行的。如果delay是DefaultExecutor
比如这个例子:携程上下文没有像CoroutineStart.DEFAULT
那样进行包装。fun main() {
GlobalScope.launch(start = CoroutineStart.UNDISPATCHED){
println("${treadName()}======我开始执行了~")
delay(1000)
println("${treadName()}======全局携程~")
}
println("${treadName()}======我要睡觉~")
Thread.sleep(3000)
}然后调用
DefaultExecutor
类中thread的get
方法:override val thread: Thread
get() = _thread ?: createThreadSync()看一下
createThreadSync
函数private fun createThreadSync(): Thread {
return _thread ?: Thread(this, THREAD_NAME).apply {
_thread = this
isDaemon = true
start()
}
}创建一个叫
"kotlinx.coroutines.DefaultExecutor
的新线程,并且开始运行。这时候会执行DefaultExecutor
中的run
方法。在run
方法中有这样一行代码:parkNanos(this, parkNanos)
点进去看看:
internal inline fun parkNanos(blocker: Any, nanos: Long) {
timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos)
}调用Java提供的LockSupport.parkNanos(blocker, nanos)方法,阻塞当前线程,实现挂起,当达到阻塞的时间,恢复线程执行。
查看进行中线程情况方法
fun main() {
println("${treadName()}======doSuspendTwo")
Thread.sleep(500000)
}
运行main
,通过命令jps
找到对应Java进程(没有特别指定,进程名为文件名)号。
...
3406 KotlinCoreutinesSuspendKt
...
执行jstack 进程号
查看进程对应的线程资源。
作者:Coolbreeze
链接:https://juejin.cn/post/7007769804505350152
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。