【Unity】【R3】非同期メソッド版のObservable.Createでストリームを直感的に書く

R3の非同期メソッド版のObservable.Createでストリームを直感的に書く方法です。

Unity 2022.3.28f1
R3 1.2.9

はじめに

R3はC#でReactive Extensions(Rx)を使うためのライブラリで、UniRxの後継という位置付けです。

github.com

UniRxではObservable.Createで自由にストリームを作成することができました。
R3でも引き続きObservable.Createは使えますが、非同期メソッドからストリームを作成できるオーバーロードが登場しました。

本記事ではこれの使い方についてまとめます。

使い方

それでは早速使い方を示します。
以下の例では「1秒ごとに現在時刻を通知するObservable」を作っていますが、こういった処理がasync/awaitを使って直感的にかけます。

using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;

public sealed class Example : MonoBehaviour
{
    private void Start()
    {
        CreateExampleObservable()
            .Subscribe(x => { Debug.Log($"OnNext: {x}"); },
                       result => { Debug.Log($"OnCompleted: {result.IsSuccess}"); })
            .AddTo(this);
    }

    private static Observable<DateTime> CreateExampleObservable()
    {
        // Observable.Createの非同期版を使う
        return Observable.Create<DateTime>(async (obs, token) =>
        {
            // 1秒ごとに現在時刻を通知、10回で終わり
            for (var i = 0; i < 10; i++)
            {
                await UniTask.Delay(1000, cancellationToken: token);
                // Observer.OnNextで通知
                obs.OnNext(DateTime.UtcNow);
            }

            // Observer.OnCompletedで完了を通知
            obs.OnCompleted();
        });
    }
}

説明はコメントに書いた通りです。

エラーハンドリング

さて前節のコードではエラーが発生してもそれを知る手段がありません。
そのためエラーハンドリングを追加します。

using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;

public sealed class Example : MonoBehaviour
{
    private void Start()
    {
        CreateExampleObservable()
            .Subscribe(x => { Debug.Log($"OnNext: {x}"); },
                       result => { Debug.Log($"OnCompleted: {result.IsSuccess}"); })
            .AddTo(this);
    }

    private static Observable<DateTime> CreateExampleObservable()
    {
        return Observable.Create<DateTime>(async (obs, token) =>
        {
            try
            {
                for (var i = 0; i < 10; i++)
                {
                    await UniTask.Delay(1000, cancellationToken: token);
                    obs.OnNext(DateTime.UtcNow);
                }
            }
            catch (Exception e)
            {
                // エラーとして終了する(エラーの扱いは概念レベルでUniRxと違うので注意)
                obs.OnCompleted(e);
            }

            obs.OnCompleted();
        });
    }
}

R3ではUniTaskと違ってOnCompleteで成功状態を通知できるため、Observer.OnCompletedに例外を渡すことでエラーを伝えます。

ちなみに本記事では扱いませんがOnErrorResume を使うと、OnErrorにエラーを通知しつつ、Complete状態にしない(処理は続ける)ということもできます。
エラーが発生しても処理が止めない場合はこっちを使います。

参考

github.com