注册

Kotlin Flow入门

Flow作为Android开发中的重要的作用。尤其在Jetpack Compose里左一个collect,右一个collect。不交接Flow而开发Android是寸步难行。作为一个入门文章,如果你还不是很了解Flow的话,本文可以带你更进一步的了解Flow。


Flow是一个异步数据流,它会发出数据给收集者,最终带或者不带异常的完成任务。下面我们通过例子来学习。


假设我们正在下载一幅图片。在下载的时候,还要把下载的百分比作为值发出来,比如:1%,2%,3%,等。收集者(collector)会接收到这些值并在界面上以合适的方式显示出来。但是如果出现网络问题,任务也会因此终止。


现在我们来看一下Flow里的几个API:



  • 流构建器(Flow builder)
  • 操作符(Operator)
  • 收集器(Collector)

流构建器


简单来说,它会执行一个任务并把值发出来,有时也会只发出值而不会执行什么任务。比如简单的发出一些数字值。你可以把流构建器当做一个发言人。这个发言人会思考(做任务)和说(发出值).


操作符


操作符可以帮助转化数据。


我们可以把操作符当做是一个翻译。一个发言人说了法语,但是听众(收集器)只能听懂英语。这就需要一个翻译来帮忙了。它可以把法语都翻译成英语让听众理解。


当然,操作符可以做的远不止这些。以上的例子只是帮助理解。


收集器


Flow发出的值经过操作符的处理之后会被收集器收集。


收集器可以当做是收听者。实际上收集器也是一种操作符,它有时被称作终端操作符


第一个例子


flow { 
(0..10).forEach {
emit(it)
}
}.map {
it * it
}.collect {
Log.d(TAG, it.toString())
}

flow {}->流构建器
map {}->操作符
collect {}->收集器

我们来过一下上面的代码:



  • 首先,流构建器会发出从0到10的值
  • 之后,一个map操作符会把每个值计算(it * it)
  • 之后,收集器收集这些发出来的值并打印出来:0,1,4,9,16,25,36,49,64,81,100.

注意:collect方法把流构建器和收集器连到了一起,这个方法调用之后流就开始执行了。


流构建器的不同类型


流构建器有四种:



  1. flowOf():从一个给定的数据集合生成流
  2. asFlow(): 一个扩展方法,可以把某个类型转化成流
  3. flow{}: 我们例子中使用的方法
  4. channelFlow{}:使用构造器自带的send方法发送的元素构建流

例如:


flowOf()


flowOf(4, 2, 5, 1, 7) 
.collect {
Log.d(TAG, it.toString())
}

asFlow()


(1..5).asFlow()
.collect {
Log.d(TAG, it.toString())
}

flow{}


flow {
(0..10).forEach {
emit(it)
}
}
.collect {
Log.d(TAG, it.toString())
}

channelFlow{}


channelFlow {
(0..10).forEach {
send(it)
}
}
.collect {
Log.d(TAG, it.toString())
}

flowOn操作符


flowOn这个操作符可以控制flow任务执行的线程的类型。在Android里一般是在一个后台线程执行任务,之后在界面上更新结果。


下面的例子里加了一个500毫秒的延迟来模拟实际任务。


val flow = flow {
// Run on Background Thread (Dispatchers.Default)
(0..10).forEach {
// emit items with 500 milliseconds delay
delay(500)
emit(it)
}
}
.flowOn(Dispatchers.Default)

CoroutineScope(Dispatchers.Main).launch {
flow.collect {
// Run on Main Thread (Dispatchers.Main)
Log.d(TAG, it.toString())
}
}

本例,流的任务就会在Dispatchers.Default这个“线程”里执行。接下来就是要在UI线程里更新UI了。为了做到这一点就需要在UI线程里collect


flowOn操作符就是用来控制任务执行的线程的。它的作用和RxJava的subscribeOn类似。


Dispatchers主要有这些类型:IODefaultMain。flowOn和CoroutineScope都可以使用Dispatchers来执行任务执行的“线程”(暂且这么理解)。


使用流构造器


我们通过几个例子学习。


移动文件


这里我们用流构造器新建一个流,让流任务在后台线程执行。完成后在UI线程显示状态。


val moveFileflow = flow {
// move file on background thread
FileUtils.move(source, destination)
emit("Done")
}
.flowOn(Dispatchers.IO)

CoroutineScope(Dispatchers.Main).launch {
moveFileflow.collect {
// when it is done
}
}

下载图片


这个例子构造一个流在后台线程下载图片,并且不断的在UI线程更新下载的百分比。


val downloadImageflow = flow {
// start downloading
// send progress
emit(10)
// downloading...
// ......
// send progress
emit(75)
// downloading...
// ......
// send progress
emit(100)
}
.flowOn(Dispatchers.IO)

CoroutineScope(Dispatchers.Main).launch {
downloadImageflow.collect {
// we will get the progress here
}
}

现在你对kotlin的流也有初步的了解了,在项目中可以使用简单的流来处理异步任务。


什么是终端操作符


上文已经提到过collect()方法是一个终端操作符。所谓的终端操作符就是让流跑起来的挂起方法(suspend function)。在以上的例子中,流构造器构造出来的流是不动的,让这个流动起来的操作符就是终端操作符。比如collect


还有:



  • 转化为各种集合的,toList, toSet
  • 获取第一个first,与确保流发射单个值的操作符single
  • 使用reduce, fold这类的把流的值规约到单个值的操作符。

比如:


val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)

冷热流


前面的例子里的流都是冷流。我们来对比一下流的不同:


冷流热流
收集器调用的时候开始发出值没有收集器也会发出值
不存储数据可以存储数据
不支持多个收集器可以支持多个收集器

冷流,如果带上了多个收集器,流会每次遇到一个收集器就从头把完整的数据发送一次。


热流遇到多个收集器的时候,流会一直发出数据,收集器开始收集数据的时候遇到的是什么数据就收集什么数据。热流的多个收集器共享一份数据。


冷流是推模式,热流是拉模式。


下面看几个例子:


冷流实例


fun getNumbersColdFlow(): ColdFlow<Int> {
return someColdflow {
(1..5).forEach {
delay(1000)
emit(it)
}
}
}

开始收集


val numbersColdFlow = getNumbersColdFlow()

numbersColdFlow
.collect {
println("1st Collector: $it")
}

delay(2500)

numbersColdFlow
.collect {
println("2nd Collector: $it")
}

输出:


1st Collector: 1
1st Collector: 2
1st Collector: 3
1st Collector: 4
1st Collector: 5

2nd Collector: 1
2nd Collector: 2
2nd Collector: 3
2nd Collector: 4
2nd Collector: 5

两个收集器都从头获取到流的数据,在每次收集的时候都相当于遇到了一个全新的流。


热流实例。本例会设置一个热流每隔一秒发出一个1到5的数值。


fun getNumbersHotFlow(): HotFlow<Int> {
return someHotflow {
(1..5).forEach {
delay(1000)
emit(it)
}
}
}

现在开始收集:


val numbersHotFlow = getNumbersHotFlow()

numbersHotFlow
.collect {
println("1st Collector: $it")
}

delay(2500)

numbersHotFlow
.collect {
println("2nd Collector: $it")
}

输出:


1st Collector: 1
1st Collector: 2
1st Collector: 3
1st Collector: 4
1st Collector: 5

2nd Collector: 3
2nd Collector: 4
2nd Collector: 5

StateFlow


在Android开发中,热流的一个很重要的应用就是StateFlow


StateFlow是一种特殊的热流,它可以允许多个订阅者。如果你使用了jetpack compose来开发app的话,StateFlow可以简单而高效的在app的不同地方享状态(state)。因为热流只发送当前的状态(而不像冷流那样从开始发送值)。


要新建一个StateFlow,可以使用MutableStateFlow,然后给它一个初始值:


val count = MutableStateFlow(0)

在这里新建了一个叫做count的StateFlow,初始值为0。要更新它的值可以使用update方法,或者value属性:


this.count.update { v -> v + 1 }
this.count.value = 10

这时,订阅了count状态的订阅者就可以收到更新之后的值了。要订阅可以这样:


count.collect {
//...
}

在冷热流之外还有两种流:回调流和通道流。这个后面会详细讲到。


SharedFlow


SharedFlow也是一种热流,主要用于事件流。它会对所有的活的收集器发送事件。不同的消费者可以在同一时间收到同一个事件。


可以使用MutableSharedFlow()方法来创建一个SharedFlow对象。可以通过replay参数指明多少个已经发送的事件可以再发送给新的收集器,默认的是0。也即是在默认情况下,收集器只会接收到开始收集之后发送过来的事件。


这个时候可以来一个例子了:


class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0) // 1
    val tickFlow: SharedFlow<Event<String>> = _tickFlow // 2

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit) // 3
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler, // 4
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // Listen for tick updates
            tickHandler.tickFlow.collect { // 5
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

示例解析:



  1. MutableSharedFlow声明了一个变量_tickFlow
  2. 定义了属性tickFlow
  3. 在初始化的时候使用SharedFlow成员变量_tickFlow每隔一段时间发送一个空事件
  4. NewsRepository类里声明成员变量tickHandler
  5. NewsRepository初始化之后开始收集事件,并在收集到事件之后调用refreshLatestNews方法来更新新闻。

看完这个例子再结合上面的介绍就会更加深入的了解SharedFlow了。


注意



  • 这SharedFlow是用于事件流处理的,可不是用来维护状态(state)的。
  • SharedFlow的另外一个重要的参数是extraBufferCapacity,它决定了流要在缓存里保留多少个发送过的事件。缓存满了之后会把缓存里面的一个值清理掉,并放入新的值。
  • 要处理缓存溢出的问题可以给onBufferOverflow指定一个方法。比如当缓存满了之后,并遇到新的事件的时候清理掉最旧的值或者暂停发送新事件一直到缓存有空余。
  • 可以使用tryEmit方法来检测是否存在一个活的收集器。这样可以避免无效的事件发送。

热流的坑


如果在同一个协成里订阅了多个热流,只有第一个才会被收集。其他的永远不会得到数据。


所以,要在同一个协成里订阅多个热流可以使用combine或者zip操作符把这些热流都合成到同一个流里。或者分别在每个协程订阅一个热流。


例如:


coroutineScope.launch {
hotFlow1.collect { value ->
// 处理收到的数据
}
hotFlow2.collect { value ->
// 永远不会执行到
}
}

在本例中,第二个collect不会收到数据。因为第一个collect会运行一个无限循环。


背压 (Backpressure)


背压,顾名思义,当消费者消费的速度没有生产者生产的速度快了。在Flow遇到这个情况的时候,生产者就会挂起直到消费者可以消费更多的数据。


runBlocking {
getFastFlow().collect { value ->
delay(1000) // simulate a slow collector
process(value)
}
}

在这个例子中,getFastFlow()会生成数据的速度比process(value)的速度快。因为collect是一个挂起函数,在process(value)数据处理不过来的时候getFastFlow()就会自动挂起。这样就防止了没有处理的数据的堆积。


使用缓存处理背压


有的时候,即使消费者处理速度已经慢于生产者产生数据的速度的时候,你还是想让生产者继续生产数据。这时就可以引入缓存了。Flow可以使用buffer操作符。如:


runBlocking {
getFastFlow().buffer().collect { value -> process(value) }
}

这个例子里使用了buffer操作符,这样在process(value)还在处理旧数据的时候getFastFlow()可以接着生产新的数据。


今日份先更新到这了。to be continued...


作者:小红星闪啊闪
来源:juejin.cn/post/7271153372793946168

0 个评论

要回复文章请先登录注册