【Unity】【UniRx】ストリームの合成方法まとめ

ストリームを合成するオペレータのまとめです。

Merge

Mergeは合成元のストリームから値が流れてきたらすべてそのまま流します。
全ての合成元ストリームがCompletedとなったら合成後ストリームもCompletedとなります。
また合成元ストリームは合成後ストリームを購読した時点で購読されます。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

var subject     = Observable
    .Merge(subject1, subject2)
    .Subscribe(x => Debug.Log("next: " + x + "."), () => Debug.Log("completed."));


subject1.OnNext(0); // next: 0.
subject2.OnNext(1); // next: 1.
subject1.OnCompleted();
subject2.OnCompleted(); // completed.

Concat

Concatは合成元のストリームを合成した順に流します。
最初の合成元ストリームがCompletedとなったら次の合成元ストリームを購読します。
最後の合成元ストリームがCompletedとなった時点で合成後ストリームもCompletedとなります。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

var subject     = Observable
    .Concat(subject1, subject2)
    .Subscribe(x => Debug.Log("next: " + x + "."), () => Debug.Log("completed."));


subject1.OnNext(1); // next: 1.
subject2.OnNext(2); // 流れない
subject1.OnCompleted(); // この時点でsubject2が購読される
subject1.OnNext(1); // 流れない
subject2.OnNext(2); // next: 2.
subject2.OnCompleted(); // completed.

Zip

Zipは合成元のストリームのうち全てに一回以上値が流れた時点で合成後のストリームに値を流します。
合成前の各ストリームが流した値をリストにして流します。
合成元のストリームのうち、2回以上値が流れているものがあったら先に流れた値を採用します。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

Observable
    .Zip(subject1, subject2)
    .Subscribe(x => 
    {
        foreach (var item in x) {
            Debug.Log("next." + item);
        }
    }
    , () => Debug.Log("completed."));

    
subject1.OnNext(1); // 流れない
subject2.OnNext(2); // next: 1.  >  next: 2.
subject2.OnNext(3); // 流れない
subject2.OnNext(4); // 流れない
subject1.OnNext(5); // next: 5.  >  next: 3.
subject1.OnCompleted();
subject2.OnCompleted(); // completed.

ZipLastest

Zipとほぼ同じです。
が、合成元のストリームのうち、2回以上値が流れているものがあったら後に流れた値を採用します。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

Observable
    .ZipLatest(subject1, subject2)
    .Subscribe(x => 
    {
        foreach (var item in x) {
            Debug.Log("next." + item);
        }
    }
    , () => Debug.Log("completed."));

    
subject1.OnNext(1); // 流れない
subject2.OnNext(2); // next: 1.  >  next: 2.
subject2.OnNext(3); // 流れない
subject2.OnNext(4); // 流れない
subject1.OnNext(5); // next: 5.  >  next: 4.
subject1.OnCompleted();
subject2.OnCompleted(); // completed.

CombineLatest

合成前のストリームのいずれかに値が流れてきたら、
合成前の全てのストリームの最新の値をリストにして合成後のストリームに流します。

一回も流れていないストリームがあるうちは合成後のストリームに値は流れません。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

Observable
    .CombineLatest(subject1, subject2)
    .Subscribe(x => 
    {
        foreach (var item in x) {
            Debug.Log("next." + item);
        }
    }
    , () => Debug.Log("completed."));

    
subject1.OnNext(1); // 流れない
subject2.OnNext(2); // next: 1.  >  next: 2.
subject2.OnNext(3); // next: 1.  >  next: 3.
subject1.OnCompleted();
subject2.OnCompleted(); // completed.

WithLatestFrom

合成前の1個目のストリームに値が流れてきたらその値と2個目のストリームの最新値を合成して合成後のストリームに流します。
2個目のストリームが最新値を持たないうちは合成後のストリームに値は流れません。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

Observable
    .WithLatestFrom(subject1, subject2, (a, b) => a + b)
    .Subscribe(x => Debug.Log("next." + x), () => Debug.Log("completed."));

subject1.OnNext(1); // 流れない
subject2.OnNext(2); // 流れない
subject2.OnNext(3); // 流れない
subject1.OnNext(4); // next: 7 (3 + 4)
subject1.OnCompleted(); // completed.
subject2.OnCompleted();

Amb

値が早く流れてきたほうのストリームを生かし、遅かったほうを殺します。

var subject1   = new Subject<int>();
var subject2    = new Subject<int>();

Observable
    .Amb(subject1, subject2)
    .Subscribe(x => Debug.Log("next." + x), () => Debug.Log("completed."));
    
subject2.OnNext(2); // next. 2
subject1.OnNext(1); // 流れない
subject2.OnNext(3); // next. 3
subject1.OnNext(4); // 流れない
subject2.OnCompleted(); // completed.
subject1.OnCompleted();

Select + Switch

合成前のストリームのいずれかに値が流れてきたら、合成前の全てのストリームの最新の値を好きなように合成して合成後のストリームに流したい場合があります。
このような場合にはSwitch + Selectを使います。

light11.hatenadiary.com