Kotlin协程:MutableStateFlow的实现原理
一.MutableStateFlow接口的实现
1.MutableStateFlow方法
在Kotlin协程:StateFlow的设计与使用中,讲到可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> =
StateFlowImpl(value ?: NULL)
...
@JvmField
@SharedImmutable
internal val NULL = Symbol("NULL")
在MutableStateFlow方法中,根据参数value创建并返回一个类型为StateFlowImpl的对象,如果参数value为空,则传入对应的标识NULL。
二.StateFlowImpl类
StateFlowImpl类是MutableStateFlow接口的核心实现,它的继承关系与SharedFlowImpl类的继承关系类似,如下图所示:
- AbstractSharedFlow类:提供了对订阅者进行管理的方法。
- CancellableFlow接口:用于标记StateFlowImpl类型的Flow对象是可取消的。
- MutableStateFlow接口:表示StateFlowImpl类型的Flow对象是一个单数据更新的热流。
- FusibleFlow接口:表示StateFlowImpl类型的Flow对象是可融合的。
1.发射数据的管理
在StateFlowImpl中,当前的数据被保存在名为_state的全局变量中,_state表示StateFlowImpl类型对象的状态,当前代码如下:
private class StateFlowImpl<T>(
initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
// 当前状态
private val _state = atomic(initialState)
...
}
除此之外,StateFlowImpl不对其他的数据做缓存。
2.订阅者的管理
由于StateFlowImpl类与SharedFLowImpl类都继承自AbstractSharedFlow类,因此二者订阅者管理的核心逻辑相同,这里不再赘述,详情可参考Kotlin协程:MutableSharedFlow的实现原理。
唯一不同的地方在,在SharedFlowImpl类中,订阅者数组中存储的对象类型为SharedFlowSlot,而在StateFlowImpl类中,订阅者数组存储的对象类型为StateFlowSlot。
1)StateFlowSlot类
StateFlowSlot类与SharedFlowSlot类类似,都继承自AbstractSharedFlowSlot类。但相比于SharedFlowImpl类型的对象,StateFlowImpl类型的对象是有状态的。
在StateFlowSlot类中,有一个名为_state的全局变量,代码如下:
@SharedImmutable
private val NONE = Symbol("NONE") // 状态标识
@SharedImmutable
private val PENDING = Symbol("PENDING") // 状态标识
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
// _state中默认值为null
private val _state = atomic<Any?>(null)
...
}
根据_state保存对象的不同,可以确定StateFlowSlot类型的对象的状态。StateFlowSlot类型的对象共有四种状态:
null:如果_state保存的对象为空,表示当前StateFlowSlot类型的对象没有被任何订阅者使用。
NONE:如果_state保存的对象为NONE标识,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,但既没有挂起,也没有在处理当前的数据。
PENDING:如果_state保存的对象为PENDING标识,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,并且将开始处理当前的数据。
CancellableContinuationImpl<Unit>:如果_state保存的对象为续体,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,但是订阅者已处理完当前的数据,所在的协程已被挂起,等待新的数据到来。
a)订阅者状态的管理
在StateFlowSlot类中,重写了AbstractSharedFlowSlot类的allocateLocked方法与freeLocked方法,顶用两个方法会对订阅者的初始状态和最终状态进行改变,代码如下:
// 新订阅者申请使用当前的StateFlowSlot类型的对象
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
// 如果_state保存的对象不为空,
// 说明当前StateFlowSlot类型的对象已经被其他订阅者使用
// 返回false
if (_state.value != null) return false
// 走到这里,说明没有被其他订阅者使用,分配成功
// 修改状态值为NONE
_state.value = NONE
// 返回true
return true
}
// 订阅者释放已经使用的StateFlowSlot类型的对象
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
// 修改状态值为null
_state.value = null
// 返回空数组
return EMPTY_RESUMES
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
为了实现上述对订阅者状态的管理,在StateFlowSlot类中,还额外提供了三个方法用于实现对订阅者的状态的切换,代码如下:
// 当有状态更新成功时,会调用makePending方法,通知订阅者可以开始处理新数据
@Suppress("UNCHECKED_CAST")
fun makePending() {
// 根据当前状态判断
_state.loop { state ->
when {
// 如果未被订阅者使用,则直接返回
state == null -> return
// 如果已经处于PENDING状态,则直接返回
state === PENDING -> return
// 如果当前状态为NONE
state === NONE -> {
// 通过CAS的方式,将状态修改为PENDPENDING,并返回
if (_state.compareAndSet(state, PENDING)) return
}
// 如果为挂起状态
else -> {
// 通过CAS的方法,将状态修改为NONE
if (_state.compareAndSet(state, NONE)) {
// 如果修改成功,则恢复对应续体的执行,并返回
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
// 当订阅者每次处理完新数据(不一定处理成功)后,会调用takePending方法,表示完成处理
// 获取当前的状态,并修改新状态为NONE
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
assert { state !is CancellableContinuationImpl<*> }
// 如果之前的状态为PENDING,则返回true
return state === PENDING
}
// 当订阅者没有新数据需要处理时,会调用awaitPending方法挂起
@Suppress("UNCHECKED_CAST")
// 直接挂起,获取续体
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
assert { _state.value !is CancellableContinuationImpl<*> }
// 通过CAS的方式,将当前的状态修改为挂起,并返回
if (_state.compareAndSet(NONE, cont)) return@sc
// 走到这里代表状态修改失败,说明又发射了新数据,当前的状态被修改为PENDING
assert { _state.value === PENDING }
// 唤起订阅者续体的执行
cont.resume(Unit)
}
3.数据的接收
当调用StateFlow类型对象的collect方法,会触发订阅过程,接收emit方法发送的数据,这部分在
StateFlowImpl中实现,代码如下:
override suspend fun collect(collector: FlowCollector<T>) {
// 为当前的订阅者分配一个StateFlowSlot类型的对象
val slot = allocateSlot()
try {
// 如果collector类型为SubscribedFlowCollector,
// 说明订阅者监听了订阅过程的启动,则先回调
if (collector is SubscribedFlowCollector) collector.onSubscription()
// 获取订阅者所在的协程
val collectorJob = currentCoroutineContext()[Job]
// 局部变量,保存上一次发射的数据,初始值为null
var oldState: Any? = null
// 死循环
while (true) {
// 获取当前的数据
val newState = _state.value
// 判断订阅者所在协程是否是存活的,如果不是则抛出异常
collectorJob?.ensureActive()
// 如果订阅者是第一次处理数据或者当前数据与上一次数据不同
if (oldState == null || oldState != newState) {
// 将数据发送给下游
collector.emit(NULL.unbox(newState))
// 保存当前发射数据到局部变量
oldState = newState
}
// 修改状态,如果之前不是PENGDING状态
if (!slot.takePending()) {
// 则挂起等待新数据更新
slot.awaitPending()
}
}
} finally {
// 释放已分配的StateFlowSlot类型的对象
freeSlot(slot)
}
}
在上述代码中,假设当前订阅者处于PENGDING状态,并在处理数据后,通过takePending方法,将自身状态修改为NONE,由于之前为PENGDING状态,因此不会执行awaitPending方法进行挂起。因此进行了第二次循环,而在第二次调用takePending方法之前,如果数据没有更新,则订阅者将一直处于NONE状态,当再次调用takePending方法时,会调用awaitPending方法,将订阅者所在协程挂起。
4.数据的发射
在StateFlowImpl类中,当需要发射数据时,可以调用emit方法、tryEmit方法、compareAndSet方法,代码如下:
override fun tryEmit(value: T): Boolean {
this.value = value
return true
}
override suspend fun emit(value: T) {
this.value = value
}
override fun compareAndSet(expect: T, update: T): Boolean =
updateState(expect ?: NULL, update ?: NULL)
compareAndSet方法内部调用updateState方法对数据进行更新,而emit方法与tryEmit方法内部通过value属性对数据进行更新,代码如下:
@Suppress("UNCHECKED_CAST")
public override var value: T
// 拆箱
get() = NULL.unbox(_state.value)
// 更新数据
set(value) { updateState(null, value ?: NULL) }
// 拆箱操作
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
可以发现,无论是通过emit方法、tryEmit方法还是compareAndSet方法,最终都是通过updateState方法实现数据的更新,代码如下:
// sequence是一个全局变量,当新的数据更新时,sequence会发生变化
// 当sequence为奇数时,表示当前数据正在更新
private var sequence = 0
// CAS方式更新当前数据的值
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
// 获取所有的订阅者
var curSlots: Array<StateFlowSlot?>? = this.slots
// 加锁
synchronized(this) {
// 获取当前数据的值
val oldState = _state.value
// 如果期待数据不为空,同时当前数据不等于期待数据,则返回false
if (expectedState != null && oldState != expectedState) return false
// 如果新数据与老数据相同,即前后数据没有发生变化,则直接返回true
if (oldState == newState) return true
// 更新当前数据
_state.value = newState
// 获取全局变量
curSequence = sequence
// 如果为偶数,说明updateState方法没有被其他协程调用,没有并发
if (curSequence and 1 == 0) {
// 自增加1,表示当前正在更新数据
curSequence++
// 将新值保存到全局变量中
sequence = curSequence
} else { // 如果为奇数,说明updateState方法正在被其他协程调用,处于并发中
// 加2后不改变奇偶性,只是表示当前数据发生了变化
sequence = curSequence + 2
// 返回true
return true
}
// 获取当前所有的订阅者
curSlots = slots
}
// 走到这里,说明上面不是并发调用updateState方法的情况
// 循环,通知订阅者
while (true) {
// 遍历,修改订阅者的状态,通知订阅者
curSlots?.forEach {
it?.makePending()
}
// 加锁,判断在通知订阅者的过程中,数据是否又被更新了
synchronized(this) {
// 如果数据没有被更新
if (sequence == curSequence) {
// 加1,让sequence变成偶数,表示更新完毕
sequence = curSequence + 1
// 返回true
return true
}
// 如果数据有被更新,则获取sequence和订阅者
// 再次循环
curSequence = sequence
curSlots = slots
}
}
}
5.新订阅者获取缓存数据
当新订阅者出现时,StateFlow会将当前最新的数据发送给订阅者。可以通过调用StateFlowImpl类重写的常量replayCache获取当前最新的数据,代码如下:
override val replayCache: List<T>
get() = listOf(value)
在StateFlow中,清除replayCache是无效的,因为StateFlow中必须持有一个数据,因此调用
resetReplayCache方法会抛出异常,代码如下:
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
6.热流的融合
SharedFlowImpl类实现了FusibleFlow接口,重写了其中的fuse方法,代码如下:
// 内部调用了fuseStateFlow方法
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseStateFlow(context, capacity, onBufferOverflow)
...
internal fun <T> StateFlow<T>.fuseStateFlow(
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
): Flow<T> {
assert { capacity != Channel.CONFLATED }
// 如果容量为0、1、BUFFERED,同时溢出策略为DROP_OLDEST
if ((capacity in 0..1 || capacity == Channel.BUFFERED) && onBufferOverflow == BufferOverflow.DROP_OLDEST) {
// 返回自身
return this
}
// 调用fuseSharedFlow方法
return fuseSharedFlow(context, capacity, onBufferOverflow)
}
7.只读热流
调用MutableStateFlow方法,可以得到一个类型为MutableStateFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅接收,也可以调用它的emit方法来发射数据。但大多数的时候,我们需要统一数据的发射过程,因此需要对外暴露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接暴露MutableStateFlow类型的对象。
根据上面代码的介绍,订阅的过程实际上是对数据的获取,而发射的过程实际上是数据的修改,因此如果一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。
事实上,在Kotlin协程:StateFlow的设计与使用分析接口的时候可以发现,MutableStateFlow接口继承了MutableSharedFlow接口,MutableSharedFlow接口继承了FlowCollector接口,emit方法定义在FlowCollector中。StateFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableStateFlow接口指向的对象转换为StateFlow接口指向的对象就可以将读写流转换为只读流。
在代码中,对MutableStateFlow类型的对象调用asStateFlow方法恰好可以实现将读写流转换为只读流,代码如下:
// 该方法调用了ReadonlyStateFlow方法,返回一个类型为StateFlow的对象
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
// 传入当前的MutableStateFlow类型的对象
ReadonlyStateFlow(this)
// 实现了FusibleFlow接口,
// 实现了StateFlow接口,并且使用上一步传入的MutableStateFlow类型的对象作为代理
private class ReadonlyStateFlow<T>(
flow: StateFlow<T>
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
// 用于流融合,也是通过fuseStateFlow方法实现
fuseStateFlow(context, capacity, onBufferOverflow)
}
作者:李萧蝶
链接:https://juejin.cn/post/7147236984963432456
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。