【Unity】【UniRx】ForeachAsyncオペレータの用途と使い方

UniRxのForeachAsyncオペレータの用途と使い方です。

ForeachAsync?

いま、OnNext()が複数回呼ばれるストリームを考えます。

Observable
    .Interval(System.TimeSpan.FromSeconds(3))
    .Take(3)
    .Subscribe(x => Debug.Log("OnNext"));

これについて、以下のような要件を考えます。

  • Do()のようにOnNext()が来た時に何か処理を挟みたい
  • ただし最終的に作るストリームはOnNext()とOnCompleted()が1回だけ同時に呼ばれるようにしたい

まずForeachAsync()を使わない場合、以下のような実装をすればこれを達成できます。

Observable
    .Interval(System.TimeSpan.FromSeconds(3))
    .Take(3)
    .Do(x => Debug.Log("Do"))     // OnNextのときの処理する
    .Last()                                 // OnCompleted時にだけ値を流す
    .AsUnitObservable()                     // 流れる値をUnit.Deafultにする
    .Subscribe(x => Debug.Log("OnNext"));

これを、ForeachAsync()を使えば簡潔に書けます。

Observable
    .Interval(System.TimeSpan.FromSeconds(3))
    .Take(3)
    .ForEachAsync(x => Debug.Log("Do"))
    .Subscribe(x => Debug.Log("OnNext"));

つまりForeachAsyncとは、値が流れてきたときに処理を行いつつ、
OnCompleted()と同時に一回だけOnNext()が呼ばれるようなUnitObservableに変換するオペレータだといえます。

実用例(WhenAllとの比較)

例えばリソースをまとめてロードするストリームを作るときはWhenAll()とかを使うと思います。

var loadStreams        = new List<IObservable<Texture2D>>();
loadStreams
    .WhenAll()
    .Subscribe(x => Debug.Log("All textures have been loaded."));

ただしこれでは個々のリソースがロードされたときに処理ができません。
そういったときには代わりにMerge() + ForEachAsyncを使います。

var loadStreams        = new List<IObservable<Texture2D>>();
loadStreams
    .Merge()
    .ForEachAsync(x => Debug.Log("Loaded : " + x.name))
    .Subscribe(x => Debug.Log("All textures have been loaded."));