DartにおけるStreamとStreamControllerの使い方をざっとまとめます。
- Streamの概念
- Streamの基本的な使い方
- Streamのハンドリング色々
- Stream Controllerを使ってStreamを作る
- StreamControllerのStreamを複数回Listenする
- 参考
Flutter 3.19.4
Streamの概念
Streamとは、値やイベントが何回かに分けてバラバラに送り出されるものです。
例えば「ボタン」を複数クリックすることを考えると、これはクリックイベントが何回かに分けてバラバラに送り出されているとみなすことができます。
Streamはこういうものを処理するための仕組みです。
また、重いファイルなどを少しずつバラバラにロードすることで、メモリ負荷やスパイクを抑えたりするのにも使われます。
Streamの基本的な使い方
次に簡単な例でStreamの作り方とハンドリングの仕方を学びます。
void main() async { // 1秒間隔で1から10までの値を流すStreamを作成 var stream = countStream(10); // Streamから流れてきた値を出力 await logStream(stream); } // Streamの作り方 Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; // yieldで値を流す(イベントを発行する) await Future.delayed(const Duration(seconds: 1)); } } // Streamのハンドリングの仕方 Future logStream(Stream<int> stream) async { // await for でstreamに流れてきた値を処理できる await for (final value in stream) { print(value); } }
これを実行すると、一秒間隔で1から10までの数値がコンソールに出力されます。
動かしたい人は↓のサイトで実行確認できます。Flutter便利・・
また、以下のようにlisten
メソッドを使うと無名関数を使って流れてきた値を処理できます。
void main() async { var stream = countStream(10); // listenを使うと無名関数を使って値を受け取ることができる stream.listen((value) { print(value); }); } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; await Future.delayed(const Duration(seconds: 1)); } }
Streamのハンドリング色々
前節の最後でlisten
メソッドを使いましたが、Streamにはこれ以外にもメソッドが用意されているのでこれらを少しみていきます。
以下のelementAt
は指定した要素のStreamにおけるインデックスを取得するメソッドです。
void main() async { var stream = countStream(10); var index = await stream.elementAt(5); print(index); // 6 } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; await Future.delayed(const Duration(seconds: 1)); } }
このようにStreamに処理するメソッドは以下にまとまっているので、ざっと把握しておくと良さそうです。
これとは別に、Streamを変換するメソッドもあります。
以下は最初の三つの値だけを流すStreamに変換している例です。
void main() async { var stream = countStream(10); // 最初の三つだけを取得 await for (var value in stream.take(3)) { print(value); } } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; await Future.delayed(const Duration(seconds: 1)); } }
Streamを変換するメソッドも以下にまとまっているので、こちらも軽くみておくと良さそうです。
Stream Controllerを使ってStreamを作る
ここまでは async*
(非同期ジェネレータ関数)を使ってStreamを作ってきました。
これとは別に、StreamController
クラスを使う方法があります。
StreamControllerを以下のように、任意のタイミングで値を流すStreamを作ることができます。
import 'dart:async'; void main() async { var stream = createStream(); var subscription = stream.listen((value) { print(value); }, onDone: () { print('done'); }); // 3.5秒後に一時停止 await Future.delayed(const Duration(milliseconds: 3500)); subscription.pause(); // 5秒後に再開 await Future.delayed(const Duration(seconds: 5)); subscription.resume(); } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; await Future.delayed(const Duration(seconds: 1)); } } // StreamControllerによるStreamの作成方法 Stream<int> createStream() { late StreamController<int> controller; Timer? timer; var count = 0; // タイマーのコールバック void tick(_) { count++; controller.add(count); if (count == 10) { timer?.cancel(); controller.close(); // closeを呼んだらストリームの終了 } } // タイマー開始(一秒ごとにtickを呼ぶ) void startTimer() { timer = Timer.periodic(const Duration(seconds: 1), tick); } // タイマー停止 void stopTimer() { timer?.cancel(); timer = null; } // StreamControllerを作成 // コンストラクタ引数に、このStreamがlistenされたとき、pauseされたとき、resumeされたとき、cancelされたときのコールバックを指定できる controller = StreamController<int>( onListen: startTimer, onPause: stopTimer, onResume: startTimer, onCancel: stopTimer); // Streamを返す return controller.stream; }
上記を実行すると、まず1,2,3の値がコンソールに表示され、5秒ちょい経ってから残りの値が順次表示されることが確認できます。
ちなみに上記でcontroller.add
しているところをcontroller.sink.add
としているコードを(なぜか)よく見かけますが、やってることは同じです。
StreamControllerのStreamを複数回Listenする
なお、前節の方法でStreamController
を作った場合、そのStreamを2回Listenすることはできません(例外が発生します)。
複数回ListenできるStreamを作る場合には、以下のようにStreamController<>.broadcast
コンストラクタを使います。
import 'dart:async'; void main() async { var stream = createStream(); stream.listen((value) { print("1-$value"); }, onDone: () { print('1-done'); }); stream.listen((value) { print("2-$value"); }, onDone: () { print('2-done'); }); } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; await Future.delayed(const Duration(seconds: 1)); } } Stream<int> createStream() { late StreamController<int> controller; Timer? timer; var count = 0; void tick(_) { count++; controller.add(count); if (count == 10) { timer?.cancel(); controller.close(); } } void startTimer() { timer = Timer.periodic(const Duration(seconds: 1), tick); } void stopTimer() { timer?.cancel(); timer = null; } // broadcastを指定すると複数のリスナーを持てる controller = StreamController<int>.broadcast( onListen: startTimer, onCancel: stopTimer); return controller.stream; }
複数Listenされている際にonListen
やonCancel
が呼ばれる条件など、詳細な使用に関しては以下のドキュメントを参照してください。