【C#】マルチスレッドで安全に使えるCollectionクラス - BlockingCollection

マルチスレッドでも安全に使えるCollectionクラスであるBlockingCollectionについてまとめます。

Unity2020.1.10(※C#の記事ですがUnityで動作確認しています)

マルチスレッドでも安全に?

マルチスレッドなプログラムを書く際には、複数のスレッドで共有する変数の取り扱いに注意する必要があります。
具体的には以下の記事のようにクリティカルセクション排他制御を掛け、複数スレッドから同時に触れないようにします。

light11.hatenadiary.com

これはコレクションクラスでも同様で、同時にAddしたりAdd中にGetしたりするのを防ぐ仕組みが必要です。
もちろん上の記事のように自前で実装しても問題ないのですが、C#ではこれを簡単に実現するためのBlockingCollectionというクラスが用意されています。

BlockingCollectionの使い方

BlockingCollectionは以下のようにして使います。

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    private void Start()
    {
        const int capacity = 10; 
        
        // 第二引数に要素数を指定してコレクションを作成する(要素数はかならず事前に決める必要がある)
        // 第一引数のConcurrentBagについては次節で説明します
        var collection = new BlockingCollection<int>(new ConcurrentBag<int>(), capacity);
        
        // 非同期でデータを追加
        Parallel.For(0, capacity, _ =>
        {
            // 処理したスレッドのIDを格納する
            // この時他のスレッドがコレクションをロックしていたり要素数がいっぱいだったらスレッドを止めて待機する
            // 待機時間が第二引数のミリ秒数を超えたらfalseを返す(-1で時間無制限)
            var threadId = Thread.CurrentThread.ManagedThreadId;
            var success = collection.TryAdd(threadId, -1);
        });
        
        // データが追加し終わったことをマークする
        collection.CompleteAdding();

        while (true)
        {
            // コレクションから値を取得して削除
            // この時他のスレッドがコレクションをロックしていたらスレッドを止めて待機する
            // 待機時間が第一引数のミリ秒数を超えたらfalseを返す(-1で時間無制限)
            // 要素数がゼロかつCompleteAdding()が呼ばれてたらfalseを返す
            if (collection.TryTake(out var item, -1))
            {
                Debug.Log(item);
            }
            else
            {
                break;
            }
        }
        
        // 使い終わったらDispose
        collection.Dispose();
    }
}

これを実行するとデータの追加が様々なスレッドから行われていることが確認できます。
上記では追加のみマルチスレッドで処理していますが、もちろんTryTake()もマルチスレッドで使えます。

なお追加と取得を同時に行うようなケースではCompleteAdding()を呼ばなくても動きますが、
TryTakeする際にまだ追加されるのかどうか判定できないので、待機時間を-1にすると要素数が0でもスレッドを止めて待ち続ける挙動になります。

またすべての要素を取得しつつ削除するにはGetConsumingEnumerable()が使えます。

// コレクションから値を削除しつつ全ての値を取得
foreach (var item in collection.GetConsumingEnumerable())
{
    Debug.Log(item);
}

BlockingCollectionでQueueやStackを作る

前節で説明したBlockingCollectionのAPIからわかるように、
BlockingCollectionはある要素を取得したらその要素を削除するようなコレクションを想定して作られています。
またインデックス指定はできず、TryTake()により「次の」要素が取得できるだけです。

要素の取得順については、前節のようにコンストラクタ引数にConcurrentBagを渡すと不定となります。
これに対して以下のようにConcurrentQueueConcurrentStackを渡すとそのBlockingCollectonはそれぞれQueueやStackとして振舞います。

// BlockingCollectionはQueueとして振舞う
var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), capacity);

// BlockingCollectionはStackとして振舞う
var stack = new BlockingCollection<int>(new ConcurrentStack<int>(), capacity);

関連

light11.hatenadiary.com