注册

Kotlin中 Flow、SharedFlow与StateFlow区别

一、简介


了解过协程Flow 的同学知道是典型的冷数据流,而SharedFlowStateFlow则是热数据流。



  • 冷流:只有当订阅者发起订阅时,事件的发送者才会开始发送事件。
  • 热流:不管订阅者是否存在,只要发送了事件就会被消费,意思是不管接受方是否能够接收到,在这一点上有点像我们Android的LiveData

解释:LiveData新的订阅者不会接收到之前发送的事件,只会收到之前发送的最后一条数据,这个特性和SharedFlow的参数replay设置为1相似


二、使用分析


最好的分析是从使用时入手冷流flow热流SharedFlow和StateFlow热流的具体的实现类分别是MutableSharedFlow和MutableStateFlow


用一个简单的例子来说明什么是冷流,什么是热流。



  • 冷流flow:

private fun testFlow() {
val flow = flow<Int> {
(1..5).forEach {
delay(1000)
emit(it)
}
}
mBind.btCollect.setOnClickListener {
lifecycleScope.launch {
flow.collect {
Log.d(TAG, "testFlow 第一个收集器: 我是冷流:$it")
}
}
lifecycleScope.launch {
delay(5000)
flow.collect {
Log.d(TAG, "testFlow:第二个收集器 我是冷流:$it")
}
}
}

}

我点击收集按钮响应事件后,打印结果如下图:
image.png
这就是冷流,需要去触发收集,才能接收到结果。


从上图时间可知flow每次重新订阅收集都会将所有事件重新发送一次



  • 热流MutableSharedFlow和

private fun testSharedFlow() {

val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {

sharedFlow.collect {
println("collect1 received ago shared flow $it")

}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a 100
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

image.png


第二个流收集被延迟,晚了100毫秒后就收不到了,想当于不管是否订阅,流都会发送,只管发,而collect1能够收集到是因为他在发送之前进行了订阅收集。


三、分析MutableSharedFlow中参数的具体含义


以上面testSharedFlow()方法中对象为例,上面的配置就是,当前对象的默认配置
源码如下图:


image.png


val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND //产生背压现象后的,执行策略
)

3.1、 reply:事件粘滞数


reply:事件粘滞数以testSharedFlow方法为例如果设置了数目的话,那么其他订阅者不管什么时候订阅都能够收到replay数目的最新的事件,reply=1的话有点类似Android中使用的livedata。


eg:和testSharedFlow方法区别在于 replay = 2


private fun testSharedFlowReplay() {

val sharedFlow = MutableSharedFlow<Int>(
replay = 2,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {

sharedFlow.collect {
println("collect1 received ago shared flow $it")

}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

按照上面的解释collect2会收集到最新的4,5两个事件如下图:


image.png


3.2 extraBufferCapacity:缓存容量


extraBufferCapacity:缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,这种只管发不管消费者消费能力的情况就会出现背压,参数onBufferOverflow就是用于处理背压问题


eg:和testSharedFlow方法区别在于 extraBufferCapacity = 2


private fun testSharedFlowCapacity() {

val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {

sharedFlow.collect {
println("collect1 received ago shared flow $it")

}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

结果如下图:


优先发送将其缓存起来,testSharedFlow测试中发送与接收在没有干扰(延时之类的干扰)的情况下 是一条顺序链,而设置了extraBufferCapacity优先发送两条,不管消费情况,不设置的话(extraBufferCapacity = 0)这时如果在collect1里面设置延时delay(100),send会被阻塞(因为默认是 onBufferOverflow = BufferOverflow.SUSPEND的策略)
image.png


3.3、onBufferOverflow


onBufferOverflow:由背压就有处理策略,sharedflow默认为BufferOverflow.SUSPEND
,也即是如果当事件数量超过缓存,发送就会被挂起,上面提到了一句,DROP_OLDEST销毁最旧的值,DROP_LATEST销毁最新的值


三种参数含义


public enum class BufferOverflow {
/**
* 在缓冲区溢出时挂起。
*/
SUSPEND,

/**
* 在缓冲区溢出时删除** *旧的**值,添加新的值到缓冲区,不挂起。
*/
DROP_OLDEST,

/**
* 在缓冲区溢出时,删除当前添加到缓冲区的最新的**值\
*(使缓冲区内容保持不变),不要挂起。
*/
DROP_LATEST
}

eg:和testSharedFlowCapacity方法区别在于 多了个delay(100)



  • SUSPEND模式

private fun testSharedFlow2() {

val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {

sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

image.png


image.png


SUSPEND情况下从第一张图知道collect1都收集了,第二张图发现collect2也打印了两次,为什么只有两次呢?


因为 extraBufferCapacity = 2,等于2,错过了两次的事件发送的接收,不信的话可以试一下extraBufferCapacity = 0,这时候肯定打印了4次,可能有人问为什么是4次呢,因为collect2的订阅者延时了100毫秒才开始订阅,



  • DROP_LATEST模式

private fun testSharedFlow2() {

val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.DROP_LATEST

)
lifecycleScope.launch {
launch {

sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

发送过快的话,销毁最新的,只保留最老的两条事件,我们可以知道1,2,肯定保留其他丢失


image.png


要想不丢是怎么办呢,很简单不要产生背压现象就行,在emit中延时delay(200),比收集耗时长就行。



  • DROP_OLDEST模式
    该模式同理DROP_LATEST模式,保留最新的extraBufferCapacity = 2(多少)的数据就行

四、StateFlow


初始化


val stateFlow = MutableStateFlow<Int>(value = -1)

image.png


image.png


由上图的继承关系可知stateFlow其实就是一种特殊的SharedFlow,它多了个初始值value


image.png
由上图可知:每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。


SharedFlow和StateFlow的侧重点



  • StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。
  • StateFlow重点在状态,ui永远有状态,所以StateFlow必须有初始值,同时对ui而言,过期的状态毫无意义,所以stateFLow永远更新最新的数据(和liveData相似),所以必须有粘滞度=1的粘滞事件,让ui状态保持到最新。另外在一个时间内发送多个事件,不会管中间事件有没有消费完成都会执行最新的一条.(中间值会丢失)
  • SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。

eg测试如下中间值丢失:


    private fun testSharedFlow2() {
val stateFlow = MutableStateFlow<Int>(value = -1)

lifecycleScope.launch {
launch {

stateFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
stateFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
stateFlow.collect {
println("collect2 received shared flow $it")
}
}
}

}

由下图可知,中间值丢失,collect2结果可知永远有状态
image.png
好了到这里文章就结束了,源码分析后续再写。


作者:五问
链接:https://juejin.cn/post/7142038525997744141
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册