【Flutter】【Dart】StreamとStreamControllerをざっと理解する

DartにおけるStreamとStreamControllerの使い方をざっとまとめます。

Flutter 3.19.4

Streamの概念

Streamとは、値やイベントが何回かに分けてバラバラに送り出されるものです。

例えば「ボタン」を複数クリックすることを考えると、これはクリックイベントが何回かに分けてバラバラに送り出されているとみなすことができます。
Streamはこういうものを処理するための仕組みです。

Stream

また、重いファイルなどを少しずつバラバラにロードすることで、メモリ負荷やスパイクを抑えたりするのにも使われます。

Streamの基本的な使い方

次に簡単な例でStreamの作り方とハンドリングの仕方を学びます。

void main() async {
  // 1秒間隔で1から10までの値を流すStreamを作成
  var stream = countStream(10);
  // Streamから流れてきた値を出力
  await logStream(stream);
}

// Streamの作り方
Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i; // yieldで値を流す(イベントを発行する)
    await Future.delayed(const Duration(seconds: 1));
  }
}

// Streamのハンドリングの仕方
Future logStream(Stream<int> stream) async {
  // await for でstreamに流れてきた値を処理できる
  await for (final value in stream) {
    print(value);
  }
}

これを実行すると、一秒間隔で1から10までの数値がコンソールに出力されます。

動かしたい人は↓のサイトで実行確認できます。Flutter便利・・

dartpad.dev

また、以下のようにlistenメソッドを使うと無名関数を使って流れてきた値を処理できます。

void main() async {
  var stream = countStream(10);
  // listenを使うと無名関数を使って値を受け取ることができる
  stream.listen((value) {
    print(value);
  });
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

Streamのハンドリング色々

前節の最後でlistenメソッドを使いましたが、Streamにはこれ以外にもメソッドが用意されているのでこれらを少しみていきます。

以下のelementAtは指定した要素のStreamにおけるインデックスを取得するメソッドです。

void main() async {
  var stream = countStream(10);
  var index = await stream.elementAt(5);
  print(index); // 6
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

このようにStreamに処理するメソッドは以下にまとまっているので、ざっと把握しておくと良さそうです。

dart.dev

これとは別に、Streamを変換するメソッドもあります。
以下は最初の三つの値だけを流すStreamに変換している例です。

void main() async {
  var stream = countStream(10);
  // 最初の三つだけを取得
  await for (var value in stream.take(3)) {
    print(value);
  }
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

Streamを変換するメソッドも以下にまとまっているので、こちらも軽くみておくと良さそうです。

dart.dev

Stream Controllerを使ってStreamを作る

ここまでは async*(非同期ジェネレータ関数)を使ってStreamを作ってきました。
これとは別に、StreamControllerクラスを使う方法があります。

StreamControllerを以下のように、任意のタイミングで値を流すStreamを作ることができます。

import 'dart:async';

void main() async {
  var stream = createStream();
  var subscription = stream.listen((value) {
    print(value);
  }, onDone: () {
    print('done');
  });

  // 3.5秒後に一時停止
  await Future.delayed(const Duration(milliseconds: 3500));
  subscription.pause();

  // 5秒後に再開
  await Future.delayed(const Duration(seconds: 5));
  subscription.resume();
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

// StreamControllerによるStreamの作成方法
Stream<int> createStream() {
  late StreamController<int> controller;
  Timer? timer;
  var count = 0;

  // タイマーのコールバック
  void tick(_) {
    count++;
    controller.add(count);
    if (count == 10) {
      timer?.cancel();
      controller.close(); // closeを呼んだらストリームの終了
    }
  }

  // タイマー開始(一秒ごとにtickを呼ぶ)
  void startTimer() {
    timer = Timer.periodic(const Duration(seconds: 1), tick);
  }

  // タイマー停止
  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  // StreamControllerを作成
  // コンストラクタ引数に、このStreamがlistenされたとき、pauseされたとき、resumeされたとき、cancelされたときのコールバックを指定できる
  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  // Streamを返す
  return controller.stream;
}

上記を実行すると、まず1,2,3の値がコンソールに表示され、5秒ちょい経ってから残りの値が順次表示されることが確認できます。

ちなみに上記でcontroller.addしているところをcontroller.sink.addとしているコードを(なぜか)よく見かけますが、やってることは同じです。

stackoverflow.com

StreamControllerのStreamを複数回Listenする

なお、前節の方法でStreamControllerを作った場合、そのStreamを2回Listenすることはできません(例外が発生します)。
複数回ListenできるStreamを作る場合には、以下のようにStreamController<>.broadcastコンストラクタを使います。

import 'dart:async';

void main() async {
  var stream = createStream();

  stream.listen((value) {
    print("1-$value");
  }, onDone: () {
    print('1-done');
  });

  stream.listen((value) {
    print("2-$value");
  }, onDone: () {
    print('2-done');
  });
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

Stream<int> createStream() {
  late StreamController<int> controller;
  Timer? timer;
  var count = 0;

  void tick(_) {
    count++;
    controller.add(count);
    if (count == 10) {
      timer?.cancel();
      controller.close();
    }
  }

  void startTimer() {
    timer = Timer.periodic(const Duration(seconds: 1), tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  // broadcastを指定すると複数のリスナーを持てる
  controller = StreamController<int>.broadcast(
      onListen: startTimer, onCancel: stopTimer);

  return controller.stream;
}

複数Listenされている際にonListenonCancelが呼ばれる条件など、詳細な使用に関しては以下のドキュメントを参照してください。

api.flutter.dev

参考

dart.dev

dart.dev

api.flutter.dev