添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
快乐的马铃薯  ·  Webex Contact Center ...·  4 月前    · 
踢足球的皮蛋  ·  使用Beautiful ...·  6 月前    · 
安静的黄豆  ·  mysql中order by ...·  11 月前    · 
直爽的乒乓球  ·  Android 源码 ...·  1 年前    · 
  • 转变已有的Stream
  • 使用 async* 关键字创建流
  • 使用 StreamController 来创建流

1.转变已有的Stream

在已经有了一个流,想基于原始流事件创建一个新的流。例如,希望通过UTF-8对输入进行解码,将一个字节流转换为字符串流。最常用的方法是创建一个新流,该流等待原始流上的事件,然后输出新的事件。

Stream<String> lines(Stream<String> source) async* {
  var partial = '';
  await for (var chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0];
    partial = lines.removeLast();
    for (var line in lines) {
      yield line;
  if (partial.isNotEmpty) yield partial;

例如,假设你有一个流,counterStream,每秒释放一个递增的计数器。下面是如何实现的:

var counterStram =
      Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
  counterStram.forEach(print);

要转换流事件,您可以在监听流之前在流上调用转换方法,如map()。该方法返回一个新的流。

var counterStram =
      Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
var doubleCounterStream = counterStram.map((event) => event * 2);
doubleCounterStream.forEach(print);

通常,你只需要一个转换方法。但是,如果您需要对转换进行更多的控制,您可以使用Stream 的 transform()方法指定StreamTransformer。平台库为许多常见任务提供流转换器。例如,下面的代码使用了由dart:convert库提供的utf8.decoder和LineSplitter转换器。

main(List<String> args) async {
  Stream<List<int>> content = File(r'.\t6.dart').openRead();
  List<String> lines =
      await content.transform(utf8.decoder).transform(LineSplitter()).toList();
  lines.forEach((element) {
    print(element);
  });

2.从头开始创建流

创建新流的一种方法是使用异步生成器(async*)函数。流是在调用函数时创建的,当侦听流时,函数的主体开始运行。当函数返回时,流关闭。在函数返回之前,它可以使用yieldyield*语句在流上发出事件。
下面是一个基本的示例,它定期发出数字.

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
main(List<String> args) async{
  var stream = timedCounter(Duration(seconds: 2), 5);
  await for (var i in stream) {
    print(i);

当侦听器取消(通过在listen()方法返回的StreamSubscription对象上调用cancel())时,那么下一次主体到达yield语句时,yield将充当返回语句。执行任何封闭的finally块,函数退出。如果函数试图在退出前生成一个值,则会失败。
当该函数最终退出时,cancel()方法返回的future就完成了。如果函数因错误而退出,则将来会因该错误而结束;否则,它以null结束。
以下示例是将Future序列转化为一个Stream

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
	for(var future in futures){
		var result = await future;
		yield result;

这个函数请求可迭代的future来获得一个新的future,等待那个future,发出结果值,然后进行循环。如果一个future完成时出现了错误,那么流也会随着该错误完成。
用一个async函数从零开始构建流是很少见的。它需要从某个地方获取数据,而这个地方通常是另一个流。在某些情况下,如上面的Futures序列,数据来自其他异步事件源。然而,在许多情况下,async函数过于简单,无法轻松处理多个数据源, 这就是StreamController(流控制器)类存在的原因。

3.流控制器

如果流的事件来自程序的不同部分,而不只是来自可以由异步函数遍历的Stream或Future,则使用StreamController创建和填充流。
StreamController提供了一个新流,以及在任何点和任何地方向流中添加事件的方法。流具有处理侦听器和暂停所需的所有逻辑。返回Stream或者保持Controller完全可以自己控制。
下面的示例(来自流控制器bad.dart)展示了流控制器的基本用法(尽管有缺陷),以实现前面示例中的timedCounter()函数。这段代码创建一个要返回的流,然后根据计时器事件(既不是Future事件也不是Stream
事件)将数据提供给它。

import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close();
  Timer.periodic(interval, tick);
  return controller.stream;
main(List<String> args) {
  var countStream = timedCounter(Duration(seconds: 1), 10);
  countStream.listen(print);

以上代码存的timedCounter()存在两个问题。

  • 在订阅者接收之前已经开始生产事件
  • 当订阅者暂停,它仍然持续生产事件

在接下来的内容中,将使用onListenonPause来解决上述问题。

3.1等待订阅

作为一个规则,流应该等待订阅者开工作才开始生产事件。

import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close();
  Timer.periodic(interval, tick);
  return controller.stream;
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));
  await for (int n in counterStream) {
    print(n);
main(List<String> args) {
  listenAfterDelay();

当这段代码运行时,在前5秒内没有打印任何东西,但是流已经在工作中。所以当暂停5秒结束,流中已经缓存了5个事件。所以观察输出结果可以发现前5个结果几乎同时输出。

3.2遵守暂停状态

为了避免在侦听器请求暂停时产生事件,async* 函数自动在yield 处暂停。但是StreamController在订阅者暂停时也持续生产事件,这样可能会导致流的缓冲区无限增长。

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;
  subscription = counterStream.listen((counter) {
    print(counter);
    if (counter == 5) {
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
  });

以上例子中,当subscription暂停5秒时,Stream仍然在持续生产。
以下版本的timedCounter()通过使用StreamController上的onListen、onPause、onResume和onCancel回调来实现暂停。

import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
  StreamController<int> controller;
  Timer timer;
  int counter = 0;
  void tick(_) {
    counter++;
    controller.add(counter);
    if (counter == maxCount) {
      timer.cancel();
      controller.close();
  void startTimer() {
    timer = Timer.periodic(interval, tick);
  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);
  return controller.stream;
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;
  subscription = counterStream.listen((counter) {
    print(counter);
    if (counter == 5) {
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
  });
main(List<String> args) {
  listenWithPause();

上述例子中,当订阅者执行pause时,Stream同时也会执行onPause方法实现暂停生产事件。

当不使用async*函数来创建Stream时,需要记住以下几点:

  • 避免使用同步controllerStreamController(sync: true)。当向未暂停的同步控制器上发送事件时,改事件会立即送到所有的流侦听器上。在添加侦听器代码完全返回之前是不能调用侦听器的,但是同步控制器可能会破坏这一规则。
  • 如果使用StreamController,则在listen调用返回StreamSubscription之前调用onListen回调。不要让onListen回调依赖于已经存在的订阅。
  • StreamController定义的onListen、onPause、onResume和onCancel回调在流侦听器状态改变时被流调用,但不会在别的改变状态的回调函数执行期间调用。在这些情况下,状态更改回调被延迟,直到前一个回调完成。
  • 不要尝试自己实现流接口。在事件、回调以及添加和删除侦听器之间进行交互很容易出错。始终使用现有流(可能来自StreamController)来实现新流的listen调用。
  • 尽管可以通过扩展Stream类并在上面实现listen方法和额外功能来创建具有更多功能的扩展Stream的类,但通常不建议这样做,因为它引入了用户必须考虑的新类型。通常的做法是创建一个类,该类中又一个stream成员变量。
文章目录Dart中创建Stream1.转变已有的Stream2.从头开始创建流3.流控制器3.1等待订阅3.2遵守暂停状态Final hintsDart中创建Stream可以从以下几个方法来创建流转变已有的Stream使用async* 关键字创建流使用StreamController来创建流1.转变已有的Stream在已经有了一个流,想基于原始流事件创建一个新的流。例如,希望通过UTF-8对输入进行解码,将一个字节流转换为字符串流。最常用的方法是创建一个新流,该流等待原始流上的事件,然后输 像这样使用string-to-stream : var str = require ( 'string-to-stream' ) str ( 'hi there' ) . pipe ( process . stdout ) // => 'hi there' main(List<String> arguments) { var systemTempDir = Directory.systemTemp; //在系统临时目录下创建两个目录一个文件 new File('${systemTempDir.path}/dir/su...
文章目录简介File读取整个文件以流的形式读取文件随机访问文件的写入处理异常总结 文件操作是IO非常常见的一种操作,那么对应dart语言来说,操作文件是不是很简单呢?实际上dart提供了两种读取文件的方式,一种是一次性全部读取,一种是将文件读取为流。 一次性读取的缺点是需要将文件内容一次性全部载入到内存,如果遇到文件比较大的情况,就会比较尴尬。所以还需要流式读取文件的方式。一起来看看dart这两种文件的读取方式吧。 事实上dart有很多地方都有File这个类,这里我们要讲解的File
简介:Stream是一个异步的事件队列, 常见的同步的队列会提供迭代接口让你在外部循环从队列拿到一个事件并处理。这得基于你的事件队列里面已经有一些待处理的事件,处理事件的节奏是由外部循环控制的。 当你不知道下一个事件什么时候会发生时,就可以对这个队列设定一个监听,当有事件发生的时由队列来调用预先的处理程序。 其实就是 一个观察者模型 当处理事件的时候是否要切换线程? 队列支持单个订阅者还是多个订阅者? 1.创建
番石榴事件总线 这个库提供了一个谷歌番石榴风格的事件总线。 EventBus 允许组件之间发布订阅式的通信,而无需组件显式地相互注册(从而相互了解)。 它专为使用显式注册取代传统的进程内事件分发而设计。 它不是一个通用的发布-订阅系统,也不是用于进程间通信的。 // Class is typically registered by the container. class EventBusChangeRecorder { @subscribe void recordCustomerChange(ChangeEvent e) { recordChange(e.getChange()); // somewhere during initialization eventBus.register(new EventBusChangeRecorder());
Dart是基于事件循环机制的单线程模型 一条执行线上,同时且只能执行一个任务(事件),其他任务都必须在后面排队等待被执行。也就是说,在一条执行线上,为了不阻碍代码的执行,每遇到的耗时任务都会被挂起放入任务队列,待执行结束后再按放入顺序依次执行队列上的任务,从而达到异步效果。 单线程模型按照代码编写的顺序,自上而下运行,这是我们所认知的,但是当遇到耗时操作(IO/网络请求)等,会给UI造成卡顿阻...
Stream 顾名思义就是流,简单理解,其实就是一个异步数据队列而已。我们知道队列的特点是先进先出的,Stream也正是如此。 Stream 分为两种,单订阅流(single subscription) 和 广播流(broadcast)。 单订阅流的特点是只允许存在一个监听器,即使该监听器被取消后,也不允许再次注册监听器。 如何创建Stream Stream.periodic perio...
直接上代码 StreamController<String> _streamController = StreamController();//创建Stream 控制器 @override //重写方法,销毁流式通道 void dispose() { _streamController.close(); super.dispose(); //接收消息 StreamBuilder<String>() //初始值 initialData