RxのSubjectをイチから実装してみたメモです。
ソースコード
早速ですがソースコードです。
public class ExampleSubject<T> : ISubject<T> { private bool _didComplete = false; private System.Exception _exception; private readonly List<IObserver<T>> _observers = new List<IObserver<T>>(); /// <summary> /// 購読する /// </summary> public System.IDisposable Subscribe(IObserver<T> observer) { if (_exception != null) { // エラーになっている場合 observer.OnError(_exception); } else if (_didComplete) { // Completeしている場合 observer.OnCompleted(); } else { if (!_observers.Contains(observer)) { _observers.Add(observer); } } return new Unsubscriber<T>(_observers, observer); } /// <summary> /// 値を通知する /// </summary> public void OnNext(T value) { // すでにCompleteしていたら何もしない if (_didComplete) { return; } foreach (var observer in _observers) { observer.OnNext(value); } } /// <summary> /// ストリームを完了する /// </summary> public void OnCompleted() { // すでにCompleteしてたら何もしない if (_didComplete) { return; } foreach (var observer in _observers) { observer.OnCompleted(); } _didComplete = true; } /// <summary> /// ストリームをエラーにする /// </summary> public void OnError(System.Exception error) { // すでにCompleteしてたら何もしない if (_didComplete) { return; } foreach (var observer in _observers) { observer.OnError(error); } _exception = error; _didComplete = true; } } public class Unsubscriber<T> : System.IDisposable { private readonly List<IObserver<T>> _observers; private readonly IObserver<T> _observer; public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer) { _observers = observers; _observer = observer; } public void Dispose() { // 自身をListから削除する if (_observer != null && _observers.Contains(_observer)) { _observers.Remove(_observer); } } }
説明はコメントに書いた通りです。
使い方
使い方は通常のSubjectと同様です。
// 1, 2, 3と表示 var subject = new ExampleSubject<int>(); subject.Subscribe(x => Debug.Log(x)); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted(); subject.OnNext(4);