【Flutter 异步编程 - 叁】 | 初步认识 Stream 类的使用
一、分析 Stream 对象
要了解一个事物,最好去思考它存在的 价值
。当你可以意识到某个事物的作用,缺少它会有什么弊端,自然会有兴趣去了解它。而不是稀里糊涂的看别人怎么用,自己死记硬背 API
有哪些,分别表示什么意思。一味的堆砌知识点,这样无论学什么都是流于表面,不得要领。
1. Stream 存在的必要性
可能很多朋友都没有在开发中使用过 Stream
对象,知道它挺重要,但又不知道他的具体的用途。有种只可远观,不可亵玩的距离感。Stream
可以弥补 Future
的短板,它对于异步来说是一块很重要的版块。
一个 Future
对象诞生的那一刻,无论成败,它最终注定只有一个结果。就像一个普通的网络接口,一次请求只会有一个响应结果。应用开发在绝大多数场景是一个 因
,对应一个 果
,所以和 Future
打交道比较多。
但有些场景,任务无法一次完成,对于 一次
请求,会有 若干次
响应。比如现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会通知你。在这个场景下,小说完结代表任务结束,期间会触发多次响应通知,这是 Future
无法处理的。
另外,事件通知的时间不确定的,作者创作的过程也是非常耗时的,所以机体没有必要处于同步等待
的阻塞状态。像这种 异步事件序列
被形象的称之为 Stream 流
。
在人类科学中,一件重要事物的存在,必然有其发挥效用的场所,在这片领域之下,它是所向披靡的王。在接触新知识、新概念时,感知这片领域非常重要,一个工具只有在合适的场景下,才能发挥最大的效力。
2.从读取文件认识 Stream 的使用
File
对象可以通过 readAsString
异步方法读取文件内容,返回 Future<String>
类型对象。而 Future
异步任务只有一次响应机会,通过 then
回调,所以该方法会将文件中的 所有字符
读取出来。
---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});
但有些场景中没有必要
或 不能
全部读取。比如,想要在一个大文件中寻找一些字符,找到后就 停止读取
;想要在读取文件时 显示
读取进度。这时,只能响应一次事件的 Future
就爱莫能助了,而这正是 Stream
大显身手的领域。在 File
类中有 openRead
方法返回 Stream
对象,我们先通过这个方法了解一下 Stream
的使用方式。
Stream<List<int>> openRead([int? start, int? end]);
现在的场景和上面 追更小说
是很相似的:
小说作者
无需一次性向读者
提供所有的章节;小说是一章章
进行更新的,每次更新章节,都需要通知读者
进行阅读。操作系统
不用一次性读取全部文件内容,返回给请求的机体
;文件是一块块
进行读取的,每块文件读取完,需要通知机体
进行处理。
在对 Stream
的理解中,需要认清两个角色: 发布者
和 订阅者
。其中发布者是真正处理任务的机体,是结果的生产者,比如 作者
、操作系统
、服务器
等,它们有 发送通知
的义务。订阅者是发送请求的机体,对于异步任务,其本身并不参与到执行过程中,可以监听通知来获取需要的结果数据。
代码处理中 Stream
对象使用 listen
方法 监听通知
,该方法的第一入参是回调函数,每次通知时都会被触发。回调函数的参数类型是 Stream
的泛型,表示此次通知时携带的结果数据。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下是通过 Stream
事件读取文件,显示读取进度的处理逻辑。当 openRead
任务分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送通知。Dart
代码中通过 _onData
函数进行监听,回调的 bytes
就是读取的字节数组结果。
在 _onData
函数中根据每次回调的字节数,就可以很轻松地计算出读取的进度。 onDone
指定的函数,会在任务完成时被触发,任务完成也就表示不会再有事件通知了。
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) {
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}
void _onDone() {
print("读取 Jane Eyre.txt 结束");
}
3.初步认识 StreamSubscription
Stream#listen
方法监听后,会返回一个 StreamSubscription
对象,表示此次对流的订阅。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
通过这个订阅对象,可以暂停 pause
或恢复 resume
对流的监听,以及通过 cancel
取消对流的监听。
---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();
比如下面当进度大于 50
时,取消对流的订阅:通过打印日志可以看出 54.99%
时,订阅取消,流也随之停止,可以注意一个细节。此时 onDone
回调并未触发,表示当 Stream
任务被取消订阅时,不能算作完成。
late StreamSubscription<List<int>> subscription;
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
// listen 方法返回 StreamSubscription 对象
subscription = stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) async{
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
if(progress > 50){
subscription.cancel(); // 取消订阅
}
}
二、结合应用理解 Stream 的使用
单看 Dart
代码在控制台打印,实在有些不过瘾。下面通过一个有趣的小例子,介绍 Stream
在 Flutter
项目中的使用。这样可以更形象地认识 Stream
的用途,便于进一步理解。
1. 场景分析
现实生活中如果细心观察,会发现很多 Stream
概念的身影。比如在银行办理业务时,客户可以看作 Stream
中的一个元素,广播依次播报牌号,业务员需要对某个元素进行处理。在餐馆中,每桌的客人可以看作 Stream
中的一个元素,客人下单完成,厨师根据请求准备饭菜进行处理。这里,通过模拟 红绿灯
的状态变化,来说明 Stream
的使用。
可以想象,在一个时间轴上,信号灯的变化是一个连续不断的事件。我们可以将每次的变化视为 Stream
中的一个元素,信号灯每秒的状态信息都会不同。也就是说,这个 Stream
每秒会产出一个状态,要在应用中模拟红绿灯,只需要监听每次的通知,更新界面显示即可。
这里将信号灯的状态信息通过 SignalState
类来封装,成员变量有当前秒数 counter
和信号灯类型 type
。 其中信号灯类型通过 SignalType
枚举表示,有如下三种类型:
const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;
class SignalState {
final int counter;
final SignalType type;
SignalState({
required this.counter,
required this.type,
});
}
enum SignalType {
allow, // 允许 - 绿灯
denial, // 拒绝 - 红灯
wait, // 等待 - 黄灯
}
2. 信号灯组件的构建
如下所示,信号灯由三个 Lamp
组件和数字构成。三个灯分别表示 红、黄、绿
,某一时刻只会量一盏,不亮的使用灰色示意。三个灯水平排列,有一个黑色背景装饰,和文字呈上下结构。
先看灯 Lamp
组件的构建:逻辑非常简单,使用 Container
组件显示圆形,构造时可指定颜色值,为 null
时显示灰色。
class Lamp extends StatelessWidget {
final Color? color;
const Lamp({Key? key, required this.color}) : super(key: key);
@override
Widget build(BuildContext context) {
return Container(
width: 40,
height: 40,
decoration: BoxDecoration(
color: color ?? Colors.grey.withOpacity(0.8),
shape: BoxShape.circle,
),
);
}
}
如下是 SignalLamp
组件的展示效果,其依赖于 SignalState
对象进行显示。根据 SignalType
确定显示的颜色和需要点亮的灯,状态中的 counter
成员用于展示数字。
class SignalLamp extends StatelessWidget {
final SignalState state;
const SignalLamp({Key? key, required this.state}) : super(key: key);
Color get activeColor {
switch (state.type) {
case SignalType.allow:
return Colors.green;
case SignalType.denial:
return Colors.red;
case SignalType.wait:
return Colors.amber;
}
}
@override
Widget build(BuildContext context) {
return Column(
children: [
Container(
padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
decoration: BoxDecoration(
color: Colors.black, borderRadius: BorderRadius.circular(30),),
child: Wrap(
alignment: WrapAlignment.center,
crossAxisAlignment: WrapCrossAlignment.center,
spacing: 15,
children: [
Lamp(color: state.type == SignalType.denial ? activeColor : null),
Lamp(color: state.type == SignalType.wait ? activeColor : null),
Lamp(color: state.type == SignalType.allow ? activeColor : null),
],
),
),
Text(
state.counter.toString(),
style: TextStyle(
fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
),
)
],
);
}
}
4. Stream 事件的添加与监听
这样,指定不同的 SignalState
就会呈现相应的效果,如下是黄灯的 2 s
:
SignalLamp(
state: SignalState(counter: 2, type: SignalType.wait),
)
在使用 Stream
触发更新之前,先说一下思路。Stream
可以监听一系列事件的触发,每次监听会获取新的信号状态,根据新状态渲染界面即可。如下在 SignalState
中定义 next
方法,便于产出下一状态。逻辑很简单,如果数值大于一,类型不变,数值减一,比如 红灯 6
的下一状态是 红灯 5
; 如果数值等于一,会进入下一类型的最大数值,比如 红灯 1
的下一状态是 黄灯 3
。
---->[SignalState]----
SignalState next() {
if (counter > 1) {
return SignalState(type: type, counter: counter - 1);
} else {
switch (type) {
case SignalType.allow:
return SignalState(
type: SignalType.denial, counter: _kDenialMaxCount);
case SignalType.denial:
return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
case SignalType.wait:
return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
}
}
}
把每个事件通知看做元素,Stream
应用处理事件序列,只不过序列中的元素在此刻是未知的,何时触发也是不定的。Stream
基于 发布-订阅
的思想通过监听来处理这些事件。 其中两个非常重要的角色: 发布者
是元素的生产者,订阅者
是元素的消费者。
在引擎中的 async
包中封装了 StreamController
类用于控制元素的添加操作,同时提供 Stream
对象用于监听。代码处理如下,tag1
处,监听 streamController
的 stream
对象。事件到来时触发 emit
方法 ( 方法名任意
),在 emit
中会回调出 SignalState
对象,根据这个新状态更新界面即可。然后延迟 1s
继续添加下一状态。
---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
@override
void initState() {
super.initState();
streamController.stream.listen(emit); // tag1
streamController.add(_signalState);
}
@override
void dispose() {
super.dispose();
streamController.close();
}
void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
streamController.add(state.next());
}
这样 streamController
添加元素,作为 发布者
;添加的元素可以通过 StreamController
的 stream
成员进行监听。
5. Stream 的控制与异常监听
在前面介绍过 Stream#listen
方法会返回一个 StreamSubscription
的订阅对象,通过该对象可以暂停、恢复、取消对流的监听。如下所示,通过点击按钮执行 _toggle
方法,可以达到 暂停/恢复
切换的效果:
---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;
@override
void initState() {
super.initState();
_subscription = streamController.stream.listen(emit);
streamController.add(_signalState);
}
void _toggle() {
if(_subscription.isPaused){
_subscription.resume();
}else{
_subscription.pause();
}
setState(() {});
}
另外,StreamController
在构造时可以传入四个函数来监听流的状态:
final StreamController<SignalState> streamController = StreamController(
onListen: ()=> print("=====onListen====="),
onPause: ()=> print("=====onPause====="),
onResume: ()=> print("=====onResume====="),
onCancel: ()=> print("=====onCancel====="),
);
onListen
会在 stream
成员被监听时触发一次;onPause
、onResume
、onCancel
分别对应订阅者的 pause
、 resume
、cancel
方法。如下是点击暂停和恢复的日志信息:
在 Stream#listen
方法中还有另外两个可选参数用于异常的处理。 onError
是错误的回调函数,cancelOnError
标识用于控制触发异常时,是否取消 Stream
。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下所示,在 emit
中故意在 红 7
时通过 addError
添加一个异常元素。这里界面简单显示错误信息,在 3 s
后异常被修复,继续添加新元素。
void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
SignalState nextState = state.next();
if (nextState.counter == 7 && nextState.type == SignalType.denial) {
streamController.addError(Exception('Error Signal State'));
} else {
streamController.add(nextState);
}
}
在 listen
方法中使用 onError
监听异常事件,进行处理:其中逻辑是渲染错误界面,三秒后修复异常,继续产出下一状态:
_subscription = streamController.stream.listen(
emit,
onError: (err) async {
print(err);
renderError();
await Future.delayed(const Duration(seconds: 3));
fixError();
emit(_signalState.next());
},
cancelOnError: false,
);
关于异常的处理,这里简单地提供 hasError
标识进行构建逻辑的区分:
bool hasError = false;
void renderError(){
hasError = true;
setState(() {});
}
void fixError(){
hasError = false;
}
最后说一下 listen
中 cancelOnError
的作用,它默认是 false
。如果 cancelOnError = true
,在监听到异常之后,就会取消监听 stream
,也就是说之后控制器添加的元素就会监听了。这样异常时 StreamController
会触发 onCancel
回调:
三、异步生成器函数与 Stream
前面介绍了通过 StreamController
获取 Stream
进行处理的方式,下面再来看另一种获取 Stream
的方式 - 异步生成器函数
。
1. 思考 Stream 与 Iterable
通过前面对 Stream
的认识,我们知道它是在 时间线
上可拥有若干个可监听的事件元素。而 Iterable
也可以拥有多个元素,两者之间是有很大差距的。Iterable
在 时间
和 空间
上都对元素保持持有关系;而 Stream
只是在时间上监听若干元素的到来,并不在任意时刻都持有元素,更不会在空间上保持持有关系。
对于一个 Type
类型的数据,在异步任务中,Stream<T>
是 Future<T>
就是多值和单值的区别,它们的结果都不能在 当前时刻
得到,只能通过监听在 未来
得到值。 与之相对的就是 Iterable<Type>
和 Type
,它们代表此时此刻,实实在在的对象,可以随时使用。
单值 | 多值 | |
---|---|---|
同步 | Type | Iterable<Type> |
异步 | Future<Type> | Stream<Type> |
2. 通过异步生成器函数获取 Stream 对象
Future
对象可以通过 async/awiat
关键字,简化书写,更方便的获取异步任务结果。 对于 Stream
也有类似的 async*/yield
关键字。 如下所示, async*
修饰的方法需要返回一个 Stream
对象。
在方法体中通过 yield
关键字 产出
泛型结果对象,如下是对 信号状态流
元素产生出的逻辑:遍历 count
次,每隔 1 s
产出一个状态。
class SignalStream{
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
Stream<SignalState> createStream({int count = 100}) async*{
for(int i = 0 ; i < count; i++){
await Future.delayed(const Duration(seconds: 1));
_signalState = _signalState.next();
yield _signalState;
}
}
}
这样,在 _MyHomePageState
中通过 signalStream.createStream()
就可以创建一个有 100
个元素的流,进行监听。每次接收到新状态时,更新界面,也可以达到目的:
---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();
_subscription = signalStream.createStream().listen(
emit,
);
void emit(SignalState state) async {
_signalState = state;
setState(() {});
}
到这里,关于 Stream
的初步认识就结束了,当然 Stream
的知识还有很多,在后面会陆续介绍。通过本文,你只需要明白 Stream
是什么,通过它我们能干什么就行了。下一篇我们将分析一下 FutureBuilder
和 StreamBuilder
组件的使用和源码实现。它们是 Flutter
对异步对象的封装组件,通过对它们的认识,也能加深我们对 Future
和 Stream
的立即。 那本文就到这里,谢谢观看 ~
作者:张风捷特烈
链接:https://juejin.cn/post/7147881475688366093
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。