【C#】【Unity】マルチスレッドによる並列処理を排他制御する方法まとめ

C#における並列処理の排他制御についてまとめました。
特にUnityに依存する内容ではないですが、Unityで動作確認を行っています。

Unity2019.4.4

排他制御の必要性

複数スレッドで並列処理を行う場合には共有の変数などの取り扱いに注意をする必要があります。
例えば以下のように一つの変数を100回インクリメントするTaskを100個作成したとします。

using System.Collections.Generic;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    public async void Start()
    {
        var result = 0;
        
        // resultを100回インクリメントするタスクを作るメソッド
        Task CreateTask()
        {
            return Task.Run(() =>
            {
                for (var m = 0; m < 100; m++) {
                    result++; // 同時に処理しちゃいけない部分(クリティカルセクション)
                }
            });
        }
        
        // ↑のタスクを100個作って終了を待つ
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            tasks.Add(CreateTask());
        }
        await Task.WhenAll(tasks);
        
        // 出力される結果は7555, 9558, 8613, 9015と実行するごとにばらつく(期待する結果は10000)
        Debug.Log(result);
    }
}

このとき期待される結果は100 * 100 = 10000ですが、実際には7555, 9558, 8613, 9015と実行するごとに結果がばらついてしまいます。
これはあるスレッドが演算を行っている途中に他スレッドからの演算が行われてしまうためです。

このインクリメント部分のように複数のスレッドから同時に処理してはいけない部分をクリティカルセクションといいます。
以下ではクリティカルセクション排他制御する方法についてまとめます。

基本的な排他処理:lockObject

まずはlockObjectによる排他制御です。
これは以下のコメントのように、object型のインスタンスを作成してlockステートメントクリティカルセクションを囲うだけです。

using System.Collections.Generic;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    public async void Start()
    {
        var result = 0;
        
        // objectをインスタンス化
        var lockObject = new object();
        
        Task CreateTask()
        {
            return Task.Run(() =>
            {
                for (var m = 0; m < 100; m++) {
                    // lock(lockObject){}でクリティカルセクションを囲む
                    lock (lockObject)
                    {
                        result++;
                    }
                }
            });
        }
        
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            tasks.Add(CreateTask());
        }
        await Task.WhenAll(tasks);
        
        Debug.Log(result);
    }
}

これを実行すると正常に10000という結果が得られます。

単純な排他処理:Interlocked

上記のようなインクリメントやデクリメント、加算などのような単純な処理であればInterlockedも使えます。
以下のようにInterlockedに定義されているクラスを使用するだけで簡単に排他制御ができます。

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    public async void Start()
    {
        var result = 0;
        
        Task CreateTask()
        {
            return Task.Run(() =>
            {
                for (var m = 0; m < 100; m++) {
                    // Interlockedのメソッドを使って処理するとその部分は排他制御される
                    Interlocked.Increment(ref result);
                }
            });
        }
        
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            tasks.Add(CreateTask());
        }
        await Task.WhenAll(tasks);
        
        Debug.Log(result);
    }
}

並列数に上限を設ける : SemaphoreSlim

セマフォとは並列して実行できる数のことを指します。
SemaphoreSlimクラスを以下のように使うと例えば並列可能なTaskの上限数を3に設定したりできます。

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    [SerializeField] private int _threadCount;
    [SerializeField] private int _completeThreadCount;

    public async void Start()
    {
        // 上限を3とするセマフォを作る
        var semaphore = new SemaphoreSlim(3);
        
        Task CreateTask()
        {
            return Task.Run(() =>
            {
                try
                {
                    // セマフォの数が上限値だったら待つ
                    semaphore.Wait();
                    Interlocked.Increment(ref _threadCount);
                    Thread.Sleep(300);
                    Interlocked.Decrement(ref _threadCount);
                    Interlocked.Increment(ref _completeThreadCount);
                }
                finally
                {
                    // セマフォを一つ解放する
                    semaphore.Release();
                }
            });
        }
        
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            tasks.Add(CreateTask());
        }
        await Task.WhenAll(tasks);
        
        Debug.Log("Complete");
    }
}

これを再生してInspectorを見ると常に三つを上限としてタスクが動いていることが確認できます。

f:id:halya_11:20201013165017p:plain
Inspector

Semaphore

SemaphreSlimの他にSemaphoreというクラスも存在します。
使い方としてはSemaphoreSlimと同じような感じで使えます。

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;

public class Example : MonoBehaviour
{
    [SerializeField] private int _threadCount;
    [SerializeField] private int _completeThreadCount;

    public async void Start()
    {
        // 上限を3とするセマフォを作る
        var semaphore = new Semaphore(3, 3);
        
        Task CreateTask()
        {
            return Task.Run(() =>
            {
                try
                {
                    // セマフォの数が上限値だったら待つ
                    semaphore.WaitOne();
                    Interlocked.Increment(ref _threadCount);
                    Thread.Sleep(300);
                    Interlocked.Decrement(ref _threadCount);
                    Interlocked.Increment(ref _completeThreadCount);
                }
                finally
                {
                    // セマフォを一つ解放する
                    semaphore.Release();
                }
            });
        }
        
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            tasks.Add(CreateTask());
        }
        await Task.WhenAll(tasks);
        
        Debug.Log("Complete");
    }
}

ただしこちらは名前をつけて他プロセスからも参照できるのでマルチプロセスのアプリケーションで使用されるようです。

docs.microsoft.com

EventWaitHandle

今回のような排他制御では特に使う必要もないとは思いますが、EventWaitHandleを使う方法もあります。

light11.hatenadiary.com

関連

light11.hatenadiary.com

参考

docs.microsoft.com