WebSurfer's Home

トップ > Blog 1   |   ログイン
APMLフィルター

タスク並列ライブラリ (TPL) その 2

by WebSurfer 2021年7月20日 14:11

注意: 以下は .NET 5.0 のコンソールアプリの例です。.NET 6.0 ではスレッドプールからのスレッドの取得に改善があったようで結果が大幅に異なります (Parallel Link は改善なし)。.NET 6.0 での結果を下の方に追記しておきます。

先の記事「タスク並列ライブラリ (TPL)」で、TPL に代えて、複数のタスクをスレッドプールで実行するように設定し、並列化については OS 任せにするコードを紹介しました。

TPL を使った場合は "使用可能なすべてのプロセッサを最も効率的に使用するようにコンカレンシーの程度を動的に拡大" するそうですが、それとの違いを調べてみました。(独断&自分流の調べ方なのでハズレがあるかも)

(1) TPL, PLINK を使わない場合

下の画像は、TPL, PLINK は使わないで、Task.Delay(3000).Wait() で 3 秒遅延するコードを含む同期メソッドを、Task.Run メソッド を使って 100 個キューに配置し、終了を await Task.WhenAll(...) で待機した結果です。

並列化は OS 任せ

コードはこの記事の下の方に記載したサンプルコードを見てください。その中で、実行対象の同期メソッドが Work、それを 100 個キューに置いてスレッドプールで実行するのが TaskRunAsync メソッドです。

環境は Windows 10 Pro 64-bit、Core i7-9700K 8 コア 8 論理プロセッサ、Visual Studio 2019、.NET 5.0 のコンソールアプリです。

で、TPL を使った場合との比較ですが、同じ同期メソッド Work を 100 個 Parallel.Invoke, Parallel.For, Parallel.ForEach で実行した結果と比べると、全体の実行時間はどれも 25 秒前後でほとんど違いはなかったです。(なぜか Parallel LINK は後述するように期待外れでした)

唯一気になった違いは、上の画像の n = 2, n = 4, n = 18 のように必要以上の時間スレッドを解放できないケースがあるということです。TPL にはそれは無かったです。全体の実行時間が TPL と比べて 1 ~ 2 秒遅かったのはそのせいかもしれません。問題の種を含んでいるということなのでしょうか。

ほかに興味深かったのは、PC のコア数(画像の minWorker: 8 がそれ)まではスレッドプールから一気にスレッドを取得できるが(n = 0 ~ 7)、さらにスレッドを取得しようとすると少し時間がかかる(n = 8, 9, 10, 11)ということでした。

それは CLR スレッドプールの仕様らしいです。ネットで見つけた記事「ThreadPool Growth: Some Important Details」に書いてありましたが 500ms かかるとのことです。(Microsoft の公式文書は見つけられていませんが結果を見る限り間違いなさそう)

500ms の制限は TPL を使用しても同じらしいです。なので、TPL を使用するしないにかかわらず、スレッドプールのスレッドをバースト的に多数使用する場合は設定を変更するのが良さそうです。(設定方法は上に紹介した記事に書いてあります)

以下に、Parallel.Invoke メソッド、Parallel.For メソッド、Parallel.ForEach メソッド、Parallel LINK での結果の画像を貼っておきます。どのようにしたかは下に記載したサンプルコードを見てください。それぞれ実装が違うようで、結果もそれぞれ異なっています。(結果の違いが判るだけで、具体的に中の動きがどう違った結果そうなるのかは分かりませんが)

(2) Parallel.Invoke メソッド

Parallel.Invoke メソッド

(3) Parallel.For メソッド

Parallel.For メソッド

(4) Parallel.ForEach メソッド

Parallel.ForEach メソッド

(5) Parallel LINK

Parallel LINK の場合、全体の実行時間が 42 秒前後となり、TPL と比べて 7 割弱増えてしまいました。実行中の挙動を見ていると、途中で一旦結果が表示されて止まってしまい、何秒かののち再開されて最後まで実行されるという感じです。

Parallel LINK

TPL とは実装が大きく異なるのでしょうか? 前のスレッドで書いたように await Task.Run(() => ... を使っても UI スレッドがブロックされるのは避けられませんでしたし。それとも自分の使い方が間違っているのでしょうか?

以下に上に書いた検証に使用したサンプルコードを記載しておきます。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleAppWenAllParallelFor
{
    class Program
    {
        static async Task Main(string[] args)
        {
            int minWorker; // ワーカースレッドの最小数(PC のコア数と同じ)
            int minIOC;    // 非同期 I/O スレッドの最小数
            ThreadPool.GetMinThreads(out minWorker, out minIOC);
            Console.WriteLine($"minWorker: {minWorker}, minIOC: {minIOC}");

            int maxWorker; // ワーカー スレッドの最大数
            int maxIOC;    // 非同期 I/O スレッドの最大数
            ThreadPool.GetMaxThreads(out maxWorker, out maxIOC);
            Console.WriteLine($"maxWorker: {maxWorker}, maxIOC: {maxIOC}");

            var prog = new Program();

            await prog.TaskRunAsync();
            //await prog.ParallelInvokeAsync();
            //await prog.ParallelForAsync();
            //await prog.ParralelForEachAsync();
            //await prog.PLinkAsync();
        }

        // 同期メソッド
        public string Work(int number)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            string retunVlaue = $"n = {number}, ThreadID = {id}" +
                                $", start: {start:ss.fff}, ";

            // ここで 3 秒遅延
            Task.Delay(3000).Wait();

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            retunVlaue += $"end: {end:ss.fff}, timespan: {diff:s\\.fff}";
            return retunVlaue;
        }

        // タスク (この記事の例では同期メソッド Work) を 100 個 Task.Run
        // メソッドでキューに配置する。OS がスレッドプールから適宜スレッ
        // ドを取得してキューのタスク実行。await Task.WhenAll ですべての
        // タスクの完了を待機する
        public async Task TaskRunAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"TaskRunAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];
            var taskList = new List<Task>();
            for (int i = 0; i < 100; i++)
            {
                int n = i;
                taskList.Add(Task.Run(() => stringResults[n] = Work(n)));
            }

            await Task.WhenAll(taskList);

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // タスク (この記事の例では同期メソッド Work) を Parallel.Invoke
        // を使って 100 個実行。Parallel.Invoke の機能により可能な限り
        // 並列実行されるはず
        public async Task ParallelInvokeAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelInvokeAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];
            Action[] actions = new Action[100];
            for (int i = 0; i < 100; i++)
            {
                int n = i;
                actions[n] = () => stringResults[n] = Work(n);
            }

            await Task.Run(() => Parallel.Invoke(actions));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // Parallel.For を使って 100 個実行
        public async Task ParallelForAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelForAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];

            await Task.Run(() => Parallel.For(0, 100, 
                                 (n) => stringResults[n] = Work(n)));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // Parallel.ForEach を使って 100 個実行
        public async Task ParralelForEachAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelForEachAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];

            await Task.Run(() => Parallel.ForEach(Enumerable.Range(0, 100),
                                 (n) => stringResults[n] = Work(n)));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // PLINK を使って 100 個実行
        public async Task PLinkAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"PLinkAsync 開始: {start:ss.fff}");

            var results = await Task.Run(() =>
                          Enumerable.Range(0, 100).AsParallel()
                          .Select(n => Work(n)));

            foreach (string result in results)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }
    }
}

2022/8/9 追記

.NET 6.0 で上のコードと同じアプリを作成し「(1) TPL, PLINK を使わない場合」と「(2) Parallel.Invoke メソッド」を実行した結果を以下に載せておきます。

Parallel.For, Parallel.ForEach の結果もほぼ同じです。Parallel Link は、理由不明ですが、改善なしでした。

スレッドプールから一度に取得できるスレッド数とそれを超えて追加でスレッドを取得する際に要する時間が改善されたようで、全体の実行時間が .NET 5.0 では 25 秒前後かかっていたものが .NET 6.0 では 9 秒前後と大幅に短縮されています。

(1) TPL, PLINK を使わない場合 (.NET 6.0)

並列化は OS 任せ (.NET 6.0)

(2) Parallel.Invoke メソッド (.NET 6.0)

Parallel.Invoke メソッド (.NET 6.0)

Tags: , , , , ,

.NET Framework

タスク並列ライブラリ (TPL)

by WebSurfer 2020年12月27日 14:53

タスク並列ライブラリ (TPL) は Windows Forms のような GUI アプリでは使い難い点があるということを今更ながらですが学びましたので、備忘録として書いておきます。

タスク並列ライブラリの検証

最近の PC の CPU はマルチコアが当たり前になっているようですが、PC のマルチコアを有効に使うプログラミングをサポートするため、.NET Framework 4.0 で導入されたタスク並列ライブラリ (TPL・・・Parallel.For とか Parallel.Invoke とか) を利用するという話を巷でよく耳にします。

マルチスレッドの基本的な話ですが、@IT の記事「第1回 マルチスレッドはこんなときに使う (1/2)」の「マルチスレッドの動作原理」セクションを見てください。リンク切れになると困るので画像だけ借用して以下に表示しておきます。

マルチスレッドの比較

② のようになると OS がスレッドを切り替えて処理を行うのでそのオーバーヘッドの分逆に遅くなるということになります。なので、マルチスレッドアプリで処置時間の短縮を図るなら ③ のようにマルチコアを利用できる環境が必要で、さらにマルチコアを有効に利用するプログラミングを行うという話になると思います。

その際に自分がよく聞くのがタスク並列ライブラリ (TPL) や Parallel LINQ (PLINQ) を使うという話です。Microsoft のドキュメント「.NET での並列プログラミング」や、ググるとヒットする記事例えば @IT の記事「ループをParallelクラスで並列処理にするには?」を読むと TPL, PLINQ は並列処理には万能のような気がしてました。

Microsoft のドキュメント「タスク並列ライブラリ (TPL)」には、

"TPL は、使用可能なすべてのプロセッサを最も効率的に使用するように、コンカレンシーの程度を動的に拡大します。The TPL scales the degree of concurrency dynamically to most efficiently use all the processors that are available."

・・・と言う記述がありますし、PC のマルチコアを有効に使うという局面に限れば最強のように思えます。

しかし、Windows Forms のような GUI アプリではそうでもなさそうな感じです。自分が気が付いた限りですが、以下の 2 点が問題だと思いました。(自分が回避策を知らないだけという可能性は否定できませんが )

  1. 並列に実行される複数のメソッドがすべて完了するまで UI がブロックされる。 ⇒ PLINQ 以外は async / await を使って回避できる方法がありました。下の【2021/6/13 追記】を見てください
  2. 並列実行するメソッドには非同期版は使えない。

以下のサンプルコードの ParallelInvoke_Click メソッドは Parallel.Invoke を使って 5 つの同期版メソッド Work を並列に実行するものですが、5 つのメソッドがすべて完了するまで UI がブロックされるので、アプリはフリーズしたようになります。その他の TPL, PLINQ を使った xxxxx_Click メソッドも同様で、完了するまで UI がブロックされます。それを回避する手段は、自分が探した限りですが、なさそうです。

さらに、非同期版メソッド WorkAsync は使えません。詳しくはサンプルコードのコメントに書きましたので見てください。なので、ライブラリなどで非同期版メソッドしか提供されてない場合は何ともならないと思われます。

というわけで、下のサンプルコードの WhenAll1_Click, WhenAll2_Click メソッドのように、Parallel.Invoke など使わないで、await Task.WhenAll(...) で待機するようにし、並列化については OS に任せるのが良さそうと思いました。(実際にサンプルを動かしてみると並列化してくれているような感じはしました。② のようになっている可能性は否定しきれませんが)

非同期版メソッド WorkAsync も使えます。下のサンプルコードの WhenAll2_Click メソッドを見てください。上の画像がその実行結果です。(ThreadID が 1 で UI スレッドと同じなのは、WorkAsync メソッドの中で ManagedThreadId を取得するのが UI スレッドだからです)

using System;
using System.Windows.Forms;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

namespace WinFormsApp1
{
    public partial class TaskParallelLibrary : Form
    {
        public TaskParallelLibrary()
        {
            InitializeComponent();
        }

        // 同期版メソッド
        private string Work(int n)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            string retunVlaue =
                $"n = {n}, ThreadID = {id}, start:{DateTime.Now:ss.fff}, ";

            Thread.Sleep(3000);

            retunVlaue += $"end:{DateTime.Now:ss.fff}";

            return retunVlaue;
        }

        // 非同期版メソッド
        private async Task<string> WorkAsync(int n)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            string retunVlaue =
                $"n = {n}, ThreadID = {id}, start: {DateTime.Now:ss.fff}, ";

            await Task.Delay(3000);

            retunVlaue += $"end: {DateTime.Now:ss.fff}";

            return retunVlaue;
        }

        // 非同期版メソッド 5 つを for ループで逐次実行
        private async void InOrder_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            for (int n = 0; n < 5; n++)
            {
                this.label1.Text += await WorkAsync(n) + "\r\n";
            }

            this.label1.Text += "完了";
        }

        // 同期版メソッド 5 つを Parallel.Invoke で実行
        // Executes each of the provided actions, possibly in parallel.
        // 終了まで UI はブロックされる
        private void ParallelInvoke_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            string[] results = new string[5];

            Parallel.Invoke(
                () => results[0] = Work(0),
                () => results[1] = Work(1),
                () => results[2] = Work(2),
                () => results[3] = Work(3),
                () => results[4] = Work(4));

            foreach (string result in results)
            {
                this.label1.Text += result + "\r\n";
            }

            this.label1.Text += "完了";

            // 非同期版メソッド WorkAsync は使えない

            // Task.Result でデッドロックになる
            //Task<string> result1 = null, result2 = null;
            //Parallel.Invoke(
            //    () => result1 = WorkAsync(0),
            //    () => result2 = WorkAsync(1));
            //this.label1.Text += result1.Result + "\r\n";
            //this.label1.Text += result2.Result + "\r\n";

            // await で待つことなく終わってしまう
            //string result1 = "", result2 = "";
            //Parallel.Invoke(
            //    async () => result1 = await WorkAsync(0),
            //    async () => result2 = await WorkAsync(1));
            //this.label1.Text += result1 + "\r\n";
            //this.label1.Text += result2 + "\r\n";

        }

        // 同期版メソッド 5 つを Parallel.For で実行
        // Executes a for loop in which iterations may run in parallel.
        // 終了まで UI はブロックされる
        private void ParallelFor_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            string[] results = new string[5];

            Parallel.For(0, 5, n => results[n] = Work(n));

            foreach (string result in results)
            {
                this.label1.Text += result + "\r\n";
            }

            this.label1.Text += "完了";
        }

        // 同期版メソッド 5 つを Parallel.ForEach で実行
        // Executes a foreach operation in which iterations may run in parallel.
        // 終了まで UI はブロックされる
        private void ParallelForEach_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            string[] results = new string[5];

            Parallel.ForEach(Enumerable.Range(0, 5), 
                             n => results[n] = Work(n));

            foreach (string result in results)
            {
                this.label1.Text += result + "\r\n";
            }

            this.label1.Text += "完了";
        }

        // 同期版メソッド 5 つを PLINQ で実行
        // PLINQ は、システムのすべてのプロセッサを十分に活用しようとする。
        // そのために、データ ソースをセグメントにパーティション分割し、
        // 複数のプロセッサで個々のワーカー スレッドの各セグメントに対して
        // クエリを並行実行します。
        // 終了まで UI はブロックされる
        private void PLINQ_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            var results = Enumerable.Range(0, 5).AsParallel().
                          Select(n => Work(n));

            foreach (string result in results)
            {
                this.label1.Text += result + "\r\n";
            }

            this.label1.Text += "完了";
        }

        // ***** 以下 TPL, PLINQ に代えて Task.WhenAll を使用 *****

        // 同期版メソッド 5 つを Task.Run で実行、
        // await Task.WhenAll で待機        
        private async void WhenAll1_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            string[] results = new string[5];
            var taskList = new List<Task>();
            for (int n = 0; n < 5; n++)
            {
                int i = n;
                taskList.Add(Task.Run(() => results[i] = Work(i)));
            }

            // WaitAll ではブロックされてしまう。
            // WhenAll でないと await で待機できないので注意
            await Task.WhenAll(taskList.ToArray());

            foreach (string result in results)
            {
                this.label1.Text += result + "\r\n";
            }

            this.label1.Text += "完了";
        }

        // 非同期版メソッド 5 つを実行、
        // await Task.WhenAll で待機
        private async void WhenAll2_Click(object sender, EventArgs e)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            this.label1.Text = $"UI Thread ID = {id}\r\n";

            Task<string>[] results = new Task<string>[5];

            for (int n = 0; n < 5; n++)
            {
                int i = n;
                results[i] = WorkAsync(i);
            }

            // WaitAll ではデッドロックになる
            await Task.WhenAll(results);

            foreach (Task<string> result in results)
            {
                this.label1.Text += result.Result + "\r\n";
            }

            this.label1.Text += "完了";
        }
    }
}

-----【2021/6/13 追記】-----

UI スレッドがブロックされるのを回避する方法ですが、Parallel LINQ (PLINQ) 以外は、.NET Framework 4.5 から導入された async / await を使って TPL の部分を await Task.Run で動かすと UI スレッドはブロックされず、メッセージループでマウスのクリックやキーボードのストロークなどのユーザーイベントが処理されることが分かりました。

具体的には、Parallel.For を使った場合を例に取ると、以下のようにします。

private async void ParallelFor_Click(object sender, EventArgs e)
{
    int id = Thread.CurrentThread.ManagedThreadId;
    this.label1.Text = $"UI Thread ID = {id}\r\n";

    string[] results = new string[5];

    await Task.Run(() => Parallel.For(0, 5, n => results[n] = Work(n)));

    foreach (string result in results)
    {
        this.label1.Text += result + "\r\n";
    }

    this.label1.Text += "完了";
}

ただ、async / await が使えるなら、上のコード例の await Task.WhenAll で待機することもできますので、TPL は使わなくても良いのではないかという気はします。上に書いた "TPL は、使用可能なすべてのプロセッサを最も効率的に使用するように、コンカレンシーの程度を動的に拡大します" という点に意味があるのかもしれませんが。

なお、Parallel LINQ (PLINQ) の方は以下のようにしてみたのですが UI スレッドがブロックされるのは回避できないようで、マウスのクリックなどのユーザーイベントには無反応(フリーズ状態)のままになります。

var results = await Task.Run(() => Enumerable.Range(0, 5)
                                   .AsParallel()
                                   .Select(n => Work(n)));

PLINQ では回避できない理由や対応策は分かりません。今後の検討課題ということで。

Tags: , , , , ,

.NET Framework

About this blog

2010年5月にこのブログを立ち上げました。主に ASP.NET Web アプリ関係の記事です。

Calendar

<<  2024年4月  >>
31123456
78910111213
14151617181920
21222324252627
2829301234
567891011

View posts in large calendar