ストリームを合成するオペレータのまとめです。
Merge
Mergeは合成元のストリームから値が流れてきたらすべてそのまま流します。
全ての合成元ストリームがCompletedとなったら合成後ストリームもCompletedとなります。
また合成元ストリームは合成後ストリームを購読した時点で購読されます。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); var subject = Observable .Merge(subject1, subject2) .Subscribe(x => Debug.Log("next: " + x + "."), () => Debug.Log("completed.")); subject1.OnNext(0); // next: 0. subject2.OnNext(1); // next: 1. subject1.OnCompleted(); subject2.OnCompleted(); // completed.
Concat
Concatは合成元のストリームを合成した順に流します。
最初の合成元ストリームがCompletedとなったら次の合成元ストリームを購読します。
最後の合成元ストリームがCompletedとなった時点で合成後ストリームもCompletedとなります。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); var subject = Observable .Concat(subject1, subject2) .Subscribe(x => Debug.Log("next: " + x + "."), () => Debug.Log("completed.")); subject1.OnNext(1); // next: 1. subject2.OnNext(2); // 流れない subject1.OnCompleted(); // この時点でsubject2が購読される subject1.OnNext(1); // 流れない subject2.OnNext(2); // next: 2. subject2.OnCompleted(); // completed.
Zip
Zipは合成元のストリームのうち全てに一回以上値が流れた時点で合成後のストリームに値を流します。
合成前の各ストリームが流した値をリストにして流します。
合成元のストリームのうち、2回以上値が流れているものがあったら先に流れた値を採用します。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); Observable .Zip(subject1, subject2) .Subscribe(x => { foreach (var item in x) { Debug.Log("next." + item); } } , () => Debug.Log("completed.")); subject1.OnNext(1); // 流れない subject2.OnNext(2); // next: 1. > next: 2. subject2.OnNext(3); // 流れない subject2.OnNext(4); // 流れない subject1.OnNext(5); // next: 5. > next: 3. subject1.OnCompleted(); subject2.OnCompleted(); // completed.
ZipLastest
Zipとほぼ同じです。
が、合成元のストリームのうち、2回以上値が流れているものがあったら後に流れた値を採用します。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); Observable .ZipLatest(subject1, subject2) .Subscribe(x => { foreach (var item in x) { Debug.Log("next." + item); } } , () => Debug.Log("completed.")); subject1.OnNext(1); // 流れない subject2.OnNext(2); // next: 1. > next: 2. subject2.OnNext(3); // 流れない subject2.OnNext(4); // 流れない subject1.OnNext(5); // next: 5. > next: 4. subject1.OnCompleted(); subject2.OnCompleted(); // completed.
CombineLatest
合成前のストリームのいずれかに値が流れてきたら、
合成前の全てのストリームの最新の値をリストにして合成後のストリームに流します。
一回も流れていないストリームがあるうちは合成後のストリームに値は流れません。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); Observable .CombineLatest(subject1, subject2) .Subscribe(x => { foreach (var item in x) { Debug.Log("next." + item); } } , () => Debug.Log("completed.")); subject1.OnNext(1); // 流れない subject2.OnNext(2); // next: 1. > next: 2. subject2.OnNext(3); // next: 1. > next: 3. subject1.OnCompleted(); subject2.OnCompleted(); // completed.
WithLatestFrom
合成前の1個目のストリームに値が流れてきたらその値と2個目のストリームの最新値を合成して合成後のストリームに流します。
2個目のストリームが最新値を持たないうちは合成後のストリームに値は流れません。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); Observable .WithLatestFrom(subject1, subject2, (a, b) => a + b) .Subscribe(x => Debug.Log("next." + x), () => Debug.Log("completed.")); subject1.OnNext(1); // 流れない subject2.OnNext(2); // 流れない subject2.OnNext(3); // 流れない subject1.OnNext(4); // next: 7 (3 + 4) subject1.OnCompleted(); // completed. subject2.OnCompleted();
Amb
値が早く流れてきたほうのストリームを生かし、遅かったほうを殺します。
var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); Observable .Amb(subject1, subject2) .Subscribe(x => Debug.Log("next." + x), () => Debug.Log("completed.")); subject2.OnNext(2); // next. 2 subject1.OnNext(1); // 流れない subject2.OnNext(3); // next. 3 subject1.OnNext(4); // 流れない subject2.OnCompleted(); // completed. subject1.OnCompleted();
Select + Switch
合成前のストリームのいずれかに値が流れてきたら、合成前の全てのストリームの最新の値を好きなように合成して合成後のストリームに流したい場合があります。
このような場合にはSwitch + Selectを使います。