R3の非同期メソッド版のObservable.Createでストリームを直感的に書く方法です。
Unity 2022.3.28f1
R3 1.2.9
はじめに
R3はC#でReactive Extensions(Rx)を使うためのライブラリで、UniRxの後継という位置付けです。
UniRxではObservable.Createで自由にストリームを作成することができました。
R3でも引き続きObservable.Createは使えますが、非同期メソッドからストリームを作成できるオーバーロードが登場しました。
本記事ではこれの使い方についてまとめます。
使い方
それでは早速使い方を示します。
以下の例では「1秒ごとに現在時刻を通知するObservable」を作っていますが、こういった処理がasync/awaitを使って直感的にかけます。
using System; using Cysharp.Threading.Tasks; using R3; using UnityEngine; public sealed class Example : MonoBehaviour { private void Start() { CreateExampleObservable() .Subscribe(x => { Debug.Log($"OnNext: {x}"); }, result => { Debug.Log($"OnCompleted: {result.IsSuccess}"); }) .AddTo(this); } private static Observable<DateTime> CreateExampleObservable() { // Observable.Createの非同期版を使う return Observable.Create<DateTime>(async (obs, token) => { // 1秒ごとに現在時刻を通知、10回で終わり for (var i = 0; i < 10; i++) { await UniTask.Delay(1000, cancellationToken: token); // Observer.OnNextで通知 obs.OnNext(DateTime.UtcNow); } // Observer.OnCompletedで完了を通知 obs.OnCompleted(); }); } }
説明はコメントに書いた通りです。
エラーハンドリング
さて前節のコードではエラーが発生してもそれを知る手段がありません。
そのためエラーハンドリングを追加します。
using System; using Cysharp.Threading.Tasks; using R3; using UnityEngine; public sealed class Example : MonoBehaviour { private void Start() { CreateExampleObservable() .Subscribe(x => { Debug.Log($"OnNext: {x}"); }, result => { Debug.Log($"OnCompleted: {result.IsSuccess}"); }) .AddTo(this); } private static Observable<DateTime> CreateExampleObservable() { return Observable.Create<DateTime>(async (obs, token) => { try { for (var i = 0; i < 10; i++) { await UniTask.Delay(1000, cancellationToken: token); obs.OnNext(DateTime.UtcNow); } } catch (Exception e) { // エラーとして終了する(エラーの扱いは概念レベルでUniRxと違うので注意) obs.OnCompleted(e); } obs.OnCompleted(); }); } }
R3ではUniTaskと違ってOnCompleteで成功状態を通知できるため、Observer.OnCompletedに例外を渡すことでエラーを伝えます。
ちなみに本記事では扱いませんがOnErrorResume を使うと、OnErrorにエラーを通知しつつ、Complete状態にしない(処理は続ける)ということもできます。
エラーが発生しても処理が止めない場合はこっちを使います。