新しもの好きプログラマの耳より情報ブログ

仕事でもあるプログラミングについて役に立ちそうな情報を発信していこうというブログです。役に立たなそうな情報はfacebookで。

スレッドセーフ等が付いたキューとして使えるSystem.Threading.Channels (1) 最低限の使い方を紹介

概要

大量に発生するイベントを、いったんキューに積んであとで処理したい。しかし、キューと言っても.NETの普通のQueue<T>では最低限の機能しか無く、排他やイベント待ちなど自分で色々考える必要があって辛い。もっと便利に使えるものは無いか?有るぞ!という話です。

最初に結論まとめ

System.Threading.Channelsを使いましょう。Pub/Subメッセージングを実現するライブラリなので、取り出す側が一つだけになるような、いわゆるキューの用途もカバーしています。こんな感じで使えます。

//初期設定
Channel<int> queue = Chennel.CreateUnbounded<int>();

//キューに積む方(スレッド1)
int enqueueData = 1;
await queue.Writer.WriteAsync(enqueueData);

//キューから取り出す方(スレッド2)
await foreach(var dequeueData = queue.Reader.ReadAllAsync()){
    //取り出したdequeueDataの処理
}

System.Threading.Channelsはとても高機能なのですが、そのためWebで探しても情報量が多すぎて使い方がピンと来づらいように思います。なのでこの記事では、キューとして使うという視点で分かりやすくするために、その目的での最低限の使い方だけを説明します。

実用的でスレッドセーフなキューを作るのは意外と厄介

キューを作るだけならそのまんまの名前のQueue<T>を使えば、単に動かすことは簡単です。しかし、実際に使おうとすると、色々考えることがあります。少なくとも入れる側と出す側は別スレッドなので、排他(スレッドセーフ)の考慮は必要でしょう。他にも、出す側はポーリングするよりもイベント待ちにして、キューが空の時は何も動かない方が良い。キューの中身が増えすぎたらそれ以上メモリを食わないようにドロップしたい。等々。半端に独自実装してこうした点を考慮漏れすると、トラブルの元です。

有るものを使おう

そうした点をカバーしてくれるライブラリがあります。System.Threading.Channelsです。名前空間からも察しが付きますが、MS公式のいわゆる.NET Platform Extensionsです。ライセンス的にも信頼性としても使いやすいですね。

Queue<T>と違って次の点まで考慮済みなので、すぐに実用に使うことができます。

  1. 入れる側と出す側の排他(スレッドセーフ)
  2. 出す側のイベント待ち(キューが空の時は動かない)
  3. 最大サイズを制限するか、それともサイズ無限かを設定出来る

これは代表的なところで、他にも用途に合わせて調整出来る機能が色々とあります。

使い方

まずはキューとして使うための最小限の使い方を説明して、その後に基本的なパラメータの設定方法を説明します。続けて色々な使い方を説明しようと思いますが、長くなるのでそれは次回の記事にする予定です。

インスタンス作成

NuGetで、System.Threading.Channelsをインストールします。

まず最初に、こんな感じで型を決めてキューのインスタンスを作成します。これはint型を積む上限サイズがないタイプのキューです。

Channel<int> queue = Chennel.CreateUnbounded<int>();

このインスタンスをフィールドなどに持って、Enqueue側とDequeue側で共有します。とりあえず、これだけでもキューとして使えます。

書き込み(Enqueue)側

次のように、作成したインスタンスが持つ「Writer」を使用します。Writeが、キューにおけるEnqueueに当たります。

await queue.Writer.WriteAsync(1);

読み込み(Dequeue)側

次のように、作成したインスタンスが持つ「Reader」を使用します。Readが、キューにおけるDequeueに当たります。この例だと、Writeした「1」を取り出すことができます。

await foreach(var dequeueData = queue.Reader.ReadAllAsync()){
  //dequeueData == 1
}

作成したインスタンス自体(ここではqueue)はスレッドセーフなので、書き込み側と読み込み側で同じものを使うだけでOKです。改めて排他を考慮する必要はありません。

ただし、そのインスタンスのメンバ「Writer」と「Reader」は、それぞれをマルチスレッドで使うのかシングルスレッドで使うのかを意識する必要があります。この後のマルチスレッドのパラメータ設定の所で説明します。

動作を決めるパラメータでよく使うものをいくつか紹介します。

キューの最大アイテム数を設定

インスタンスを作る時に、キューを無限にする場合はChannel.CreateUnbounded()を、最大アイテム数を決める場合はChannel.CreateBounded()を使います。

CreateBounded()の場合は最大アイテム数の他に、最大値を超えた時にどれをドロップするのかも設定できます。(入れようとしたアイテムを捨てるのか、キュー内の一番古いアイテムを捨てるのか、等)

次のように、CreateBounded()の引数にBoundedChannelOptionsを与えることで設定します。ここでは、1000アイテムを超えると、新しくWriteしたアイテムを捨てるという設定にしています。

Channel.CreateBounded<int>(
    new BoundedChannelOptions(1000)
    {
        FullMode = BoundedChannelFullMode.DropWrite,
    });

他にも、新しくWriteしたアイテムは捨てないが、既存のアイテムの一番古いものを捨てる、などといった設定ができます。

マルチスレッド設定

Writer・Readerそれぞれについて、シングルスレッドで読み書きするのか、マルチスレッドがあり得るのかを設定出来ます。実装上で保証出来る場合は、シングルスレッドの設定にした方がおそらく処理効率は良いのだと思います。

同じくBoundedChannelOptionsで、次のように設定します。読み込み側がシングルスレッドならばSingleReader=trueとし、マルチスレッドならばfalseとします。書き込み側はSingleWriterで、設定内容は同じです。

Channel.CreateBounded<int>(
    new BoundedChannelOptions(1000)
    {
        SingleReader = true,
        SingleWriter = true,
    });

まとめ

高機能ながら、単純にキューとして使うだけなら意外と手軽。そんな便利ライブラリSystem.Threading.Channelsを紹介しました。たいていの用途では、単純なQueue<T>よりも使いやすいと思います。

今回はここまでですが、その他の使い方は別の記事で紹介しようと思います。