WebSurfer's Home

トップ > Blog 1   |   Login
Filter by APML

タスク並列ライブラリ (TPL) その 2

by WebSurfer 20. July 2021 14:11

先の記事「タスク並列ライブラリ (TPL)」で、TPL に代えて、複数のタスクをスレッドプールで実行するように設定し、並列化については OS 任せにするコードを紹介しました。

TPL を使った場合は "使用可能なすべてのプロセッサを最も効率的に使用するようにコンカレンシーの程度を動的に拡大" するそうですが、それとの違いを調べてみました。(独断&自分流の調べ方なのでハズレがあるかも)

(1) TPL, PLINK を使わない場合

下の画像は、TPL, PLINK は使わないで、Task.Delay(3000).Wait() で 3 秒遅延するコードを含む同期メソッドを、Task.Run メソッド を使って 100 個キューに配置し、終了を await Task.WhenAll(...) で待機した結果です。

並列化は OS 任せ

コードはこの記事の下の方に記載したサンプルコードを見てください。その中で、実行対象の同期メソッドが Work、それを 100 個キューに置いてスレッドプールで実行するのが TaskRunAsync メソッドです。

環境は Windows 10 Pro 64-bit、Core i7-9700K 8 コア 8 論理プロセッサ、Visual Studio 2019、.NET 5.0 のコンソールアプリです。

で、TPL を使った場合との比較ですが、同じ同期メソッド Work を 100 個 Parallel.Invoke, Parallel.For, Parallel.ForEach で実行した結果と比べると、全体の実行時間はどれも 25 秒前後でほとんど違いはなかったです。(なぜか Parallel LINK は後述するように期待外れでした)

唯一気になった違いは、上の画像の n = 2, n = 4, n = 18 のように必要以上の時間スレッドを解放できないケースがあるということです。TPL にはそれは無かったです。全体の実行時間が TPL と比べて 1 ~ 2 秒遅かったのはそのせいかもしれません。問題の種を含んでいるということなのでしょうか。

ほかに興味深かったのは、PC のコア数(画像の minWorker: 8 がそれ)まではスレッドプールから一気にスレッドを取得できるが(n = 0 ~ 7)、さらにスレッドを取得しようとすると少し時間がかかる(n = 8, 9, 10, 11)ということでした。

それは CLR スレッドプールの仕様らしいです。ネットで見つけた記事「ThreadPool Growth: Some Important Details」に書いてありましたが 500ms かかるとのことです。(Microsoft の公式文書は見つけられていませんが結果を見る限り間違いなさそう)

500ms の制限は TPL を使用しても同じらしいです。なので、TPL を使用するしないにかかわらず、スレッドプールのスレッドをバースト的に多数使用する場合は設定を変更するのが良さそうです。(設定方法は上に紹介した記事に書いてあります)

以下に、Parallel.Invoke メソッド、Parallel.For メソッド、Parallel.ForEach メソッド、Parallel LINK での結果の画像を貼っておきます。どのようにしたかは下に記載したサンプルコードを見てください。それぞれ実装が違うようで、結果もそれぞれ異なっています。(結果の違いが判るだけで、具体的に中の動きがどう違った結果そうなるのかは分かりませんが)

(2) Parallel.Invoke メソッド

Parallel.Invoke メソッド

(3) Parallel.For メソッド

Parallel.For メソッド

(4) Parallel.ForEach メソッド

Parallel.ForEach メソッド

(5) Parallel LINK

Parallel LINK の場合、全体の実行時間が 42 秒前後となり、TPL と比べて 7 割弱増えてしまいました。実行中の挙動を見ていると、途中で一旦結果が表示されて止まってしまい、何秒かののち再開されて最後まで実行されるという感じです。

Parallel LINK

TPL とは実装が大きく異なるのでしょうか? 前のスレッドで書いたように await Task.Run(() => ... を使っても UI スレッドがブロックされるのは避けられませんでしたし。それとも自分の使い方が間違っているのでしょうか?

以下に上に書いた検証に使用したサンプルコードを記載しておきます。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleAppWenAllParallelFor
{
    class Program
    {
        static async Task Main(string[] args)
        {
            int minWorker; // ワーカースレッドの最小数(PC のコア数と同じ)
            int minIOC;    // 非同期 I/O スレッドの最小数
            ThreadPool.GetMinThreads(out minWorker, out minIOC);
            Console.WriteLine($"minWorker: {minWorker}, minIOC: {minIOC}");

            int maxWorker; // ワーカー スレッドの最大数
            int maxIOC;    // 非同期 I/O スレッドの最大数
            ThreadPool.GetMaxThreads(out maxWorker, out maxIOC);
            Console.WriteLine($"maxWorker: {maxWorker}, maxIOC: {maxIOC}");

            var prog = new Program();

            await prog.TaskRunAsync();
            //await prog.ParallelInvokeAsync();
            //await prog.ParallelForAsync();
            //await prog.ParralelForEachAsync();
            //await prog.PLinkAsync();
        }

        // 同期メソッド
        public string Work(int number)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            string retunVlaue = $"n = {number}, ThreadID = {id}" +
                                $", start: {start:ss.fff}, ";

            // ここで 3 秒遅延
            Task.Delay(3000).Wait();

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            retunVlaue += $"end: {end:ss.fff}, timespan: {diff:s\\.fff}";
            return retunVlaue;
        }

        // タスク (この記事の例では同期メソッド Work) を 100 個 Task.Run
        // メソッドでキューに配置する。OS がスレッドプールから適宜スレッ
        // ドを取得してキューのタスク実行。await Task.WhenAll ですべての
        // タスクの完了を待機する
        public async Task TaskRunAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"TaskRunAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];
            var taskList = new List<Task>();
            for (int i = 0; i < 100; i++)
            {
                int n = i;
                taskList.Add(Task.Run(() => stringResults[n] = Work(n)));
            }

            await Task.WhenAll(taskList);

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // タスク (この記事の例では同期メソッド Work) を Parallel.Invoke
        // を使って 100 個実行。Parallel.Invoke の機能により可能な限り
        // 並列実行されるはず
        public async Task ParallelInvokeAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelInvokeAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];
            Action[] actions = new Action[100];
            for (int i = 0; i < 100; i++)
            {
                int n = i;
                actions[n] = () => stringResults[n] = Work(n);
            }

            await Task.Run(() => Parallel.Invoke(actions));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // Parallel.For を使って 100 個実行
        public async Task ParallelForAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelForAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];

            await Task.Run(() => Parallel.For(0, 100, 
                                 (n) => stringResults[n] = Work(n)));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // Parallel.ForEach を使って 100 個実行
        public async Task ParralelForEachAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"ParallelForEachAsync 開始: {start:ss.fff}");
            string[] stringResults = new string[100];

            await Task.Run(() => Parallel.ForEach(Enumerable.Range(0, 100),
                                 (n) => stringResults[n] = Work(n)));

            foreach (string result in stringResults)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }

        // PLINK を使って 100 個実行
        public async Task PLinkAsync()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            DateTime start = DateTime.Now;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"PLinkAsync 開始: {start:ss.fff}");

            var results = await Task.Run(() =>
                          Enumerable.Range(0, 100).AsParallel()
                          .Select(n => Work(n)));

            foreach (string result in results)
            {
                Console.WriteLine(result);
            }

            DateTime end = DateTime.Now;
            TimeSpan diff = start - end;
            Console.WriteLine($"Main Thread ID = {id}, " +
                $"終了: {end:ss.fff}, 所要時間: {diff:s\\.fff}");
        }
    }
}

Tags: , , , , ,

.NET Framework

Parallel.For ループのキャンセル

by WebSurfer 17. July 2021 14:53

タスク並列ライブラリ (TPL) の Parallel.For メソッドの複数の並列処理をキャンセルするコードを書いているときにハマって悩んだので、今後そういうことがないよう備忘録を書いておきます。

Parallel.For ループのキャンセル

ちなみにコードはこの記事の下の方に記載したもので、Microsoft のドキュメント「方法: Parallel.For または ForEach ループを取り消す」を参考にキャンセル処置を実装しました。Windows Forms アプリなので UI スレッドをブロックしないようにしている点が違いますが、基本的には同じです。

で、何にハマったかと言うと、Visual Studio から[デバッグ(D)]⇒[デバッグの開始(S)]でアプリを実行すると try - catch で OperationCanceledException を捕捉できないということです

下の画像を見てください。try - catch 構文で try 句内で発生した OperationCanceledException を catch 句があるにもかかわらず捕捉できていません。(実はそこは思い違いだったのですが。詳細後述)

OperationCanceledException

Visual Studio から[デバッグ(D)]⇒[デバッグなしで開始(H)]で実行すれば上の画像のようなことは起こらず、catch 句で OperationCanceledException を捕捉できます。ということは、先の記事「不正なクロススレッドコールの捕捉」の話と同様にデバッグ実行でないと検出できない不正な何かがあると思い込んでいました。

なので、Unhandled OperationCanceledException when thrown from Parallel.ForEach に書いてあるように ThrowIfCancellationRequested メソッドを try - catch で囲ったり、await Task.Run( async () => ... と async を付与してデバッグ実行でも catch できるように対応してみました。

でも、実はそんなことをする必要はなかったです。(汗) 上の画像は、Visual Studio がデバッグ時にユーザーに便宜(?)を図るために、例外が発生した場所で一旦実行を止めて知らせたのだそうです。続行すれば catch 句まで進んで OperationCanceledException を補足できます。

そのことは Microsoft のドキュメント「例外処理(タスク並列ライブラリ)」の「注意」に以下のように書いてありました:

"[マイ コードのみ] が有効になっている場合、Visual Studio では、例外をスローする行で処理が中断され、"ユーザー コードで処理されない例外" に関するエラー メッセージが表示されることがあります。このエラーは問題にはなりません。 F5 キーを押して続行し、以下の例に示す例外処理動作を確認できます。 Visual Studio による処理が最初のエラーで中断しないようにするには、 [ツール] メニューの[オプション]、[デバッグ] の順にクリックし、[全般] で [マイ コードのみを有効にする] チェック ボックスをオフにします"

試してみましたが確かにその通りでした。

なお、すべてのケースで例外が発生した場所で一旦実行を止めて知らせるというわけではなくて、ある条件の時に限るようです。ある条件とは、多分、呼び出し元と別のスレッドで実行されているタスクで例外がスローされたが、その例外を呼び出し元で catch できるか不明な時ではないかと思われます (だから async を付与すると解決した?)。

検証に使った Windows Forms アプリのコードを以下に載せておきます。デバッグ実行してキャンセルをかけると上の画像のように例外が発生した場所で一旦止まります。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace WinFormsTPL
{
    public partial class Form2 : Form
    {
        private int currentProgress = 0;
        private CancellationTokenSource cts;

        public Form2()
        {
            InitializeComponent();

            toolStripStatusLabel1.Text = "";
            toolStripProgressBar1.Value = 0;
        }

        // 進捗をプログレスバーとラベルに表示するコールバック。UIスレッド
        // で呼び出される
        // 【注】Parallel.For は同期メソッドなので、下のコード例のように
        // await Task.Run(() => Parallel.For ...  を使って UI スレッドを
        // ブロックしないようにすること。でないと ShowProgress はキューに
        // 溜るだけで、Parallel.For が終了してから一気に 100% になりプロ
        // グレス表示にならない。
        private void ShowProgress(int percent)
        {
            currentProgress += percent;
            toolStripStatusLabel1.Text = currentProgress + "%完了";
            toolStripProgressBar1.Value = currentProgress;
        }     

        // Parallel.For で複数並列に実行する同期メソッド
        private string  WorkProgress(int number, IProgress<int> progress)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            string retunVlaue = $"n = {number}, ThreadID = {id}" +
                                $", start: {DateTime.Now:ss.fff}, ";

            // ここで 3 秒中断
            Thread.Sleep(3000);

            // 進捗をプログレスバーとラベルに表示
            progress.Report(1);

            retunVlaue += $"end: {DateTime.Now:ss.fff}\r\n";
            return retunVlaue;
        }

        // 画像の [ParallelForProgress] クリックのハンドラ
        // 上の WorkProgress メソッドを Parallel.For で 100 並列実行
        private async void ParallelForProgress_Click(object sender, EventArgs e)
        {
            currentProgress = 0;
            toolStripStatusLabel1.Text = "";
            toolStripProgressBar1.Value = 0;

            int id = Thread.CurrentThread.ManagedThreadId;
            label1.Text = $"UI Thread ID = {id}\r\n";

            // CancellationToken を ParallelOptions 経由で Parallel.For
            // に渡すため、ParallelOptions を初期化
            var option = new ParallelOptions();

            // WorkProgress の戻り値を保持する配列の定義と初期化
            string[] results = new string[100];

            using (cts = new CancellationTokenSource())
            {
                option.CancellationToken = cts.Token;
                var p = new Progress<int>(ShowProgress);

                try
                {
                    // Parallel.For は同期メソッドであることに注意。
                    // UI スレッドをブロックしないよう await Task.Run
                    // を用いてスレッドプールで Parallel.For を実行
                    await Task.Run(() => Parallel.For(0, 100, option,
                        (n) => {
                            results[n] = WorkProgress(n, p);

                            // 以下は無くてもキャンセルされるが、Microsoft
                            // のドキュメントに従って入れておく
                            option.CancellationToken
                                  .ThrowIfCancellationRequested();

                        }), cts.Token);
                }
                catch (OperationCanceledException)
                {
                    toolStripStatusLabel1.Text = "キャンセル";
                }
            }

            foreach (string result in results)
            {
                label1.Text += result;
            }

            // using を抜けて CancellationTokenSource が Dispose されても
            // すぐには null にならないので、再度キャンセルボタンをクリック
            // すると cts.Cancel() で例外がスローされる。その対応
            cts = null;
        }

        // 画像の [Cancel] クリックのハンドラ
        private void Cancel_Click(object sender, EventArgs e)
        {
            if (cts == null) return;

            cts.Cancel();
        }
    }
}

環境は Windows 10 v21H1, Visual Studio Cummunity 2019 v16.10.3 で .NET Framework 4.8 および .NET 5.0 の両方で試しました。

Tags: , ,

.NET Framework

HttpClient のキャンセルは要求の中断に相当? (CORE)

by WebSurfer 13. July 2021 12:31

下の画像のシステムで、クライアントがブラウザを操作して要求を中断した場合、Web API でのサーバーで実行中の処理をキャンセルできるでしょうか? 自分が検証した限りではできるようです。その詳細を以下に書きます。

システム構成

クライアントがブラウザを使って MVC にアクセスすると、MVC のサーバーは HttpClient クラスを使って Web API にアクセスして必要な情報を取得し、ブラウザに応答として返すというシステムです。

クライアントが要求を送信した後待ちきれなくなって、サーバーによる処理が終わって応答が返ってくる前に要求を中断した場合、それ以上サーバーのリソースを消費しないで済むよう、サーバー側の処理を MVC でも Web API でも中断できるかがポイントです。

なお、クライアントによる要求の中断とは、下の画像の赤丸印の中のブラウザの ✕ ボタンをクリックするとか Esc キーを押す、Ajax を使っての要求の場合は XMLHttpRequest.abort() メソッドを実行することを意味します。

要求の中断

先の記事「要求の中断による処理のキャンセル (CORE)」に書きましたように、処理のキャンセルには HttpContext.RequestAborted プロパティで取得できる CancellationToken を利用します。クライアントが要求を中断すると、取得した CancellationToken がキャンセル通知を配信しますので、それをリッスンして処理の中断を行います。

ブラウザ ⇔ MVC の間は、先の記事に書いたように、MVC のアクションメソッドの引数に渡された CancellationToken によるキャンセル通知を利用して MVC のサーバー内での処理を中断できます。

その先の MVC ⇔ Web API の間は MVC のサーバーから HttpClient クラスを利用して Web API にアクセスするというシステムですが、そこがどうできるかをこの記事の下の方に載せた検証用のコードを使って調べてみました。

MVC のアクションメソッドでは次のようにします。HttpClient クラスの SendAsync メソッドや PostAsync メソッドには引数に CancellationToken を取るオーバーロードがあるので、それに HttpContext.RequestAborted プロパティで取得できる CancellationToken を渡します。そうすることで、クライアントによる要求の中断で SendAsync メソッドや PostAsync メソッドの実行をキャンセルできます。

Web API のアクションメソッドの引数にも Web API のサーバー内で HttpContext.RequestAborted プロパティで取得できる CancellationToken を渡します。そのキャンセル通知をリッスンして処理を中断します。

そうした場合、MVC のサーバー内で SendAsync メソッドや PostAsync メソッドの実行がキャンセルされると、Web API のアクションメソッドの引数に渡した CancellationToken はキャンセル通知を配信してくれるかが問題です。

下に載せたコードで検証した結果 Web API の CancellationToken もキャンセル通知を配信してくれることが分かりました。

なので、ブラウザ ⇔ MVC ⇔ Web API という構成でも、適切に CancellationToken を渡してキャンセル通知で処理の中断を行う実装をしておけば、ブラウザで要求を中断しても、MVC でも Web API でもサーバー内の処理を中断できるようです。

参考に検証に使った MVC および Web API のコードを以下に載せておきます。.NET 5.0 の ASP.NET Core アプリで MVC と Web API のプロジェクトは異なります (検証の際のホストが異なりますので、HttpContext.RequestAborted プロパティで取得できる CancellationToken は MVC と Web API で違うものになります)。

MVC(MvcCore5App4)

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using MvcCore5App4.Models;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;

namespace MvcCore5App4.Controllers
{
    public class HomeController : Controller
    {        
        private readonly ILogger<HomeController> _logger;
        private readonly IHttpClientFactory _clientFactory;

        public HomeController(ILogger<HomeController> logger,
                              IHttpClientFactory clientFactory)
        {
            _logger = logger;
            _clientFactory = clientFactory;
        }

        // ・・・中略・・・

        public async Task<IActionResult> Cancel(CancellationToken token)
        {
            HttpClient client = _clientFactory.CreateClient();
            var url = "https://localhost:44398/api/values";

            // GET 要求する場合はこちら
            //var request = new HttpRequestMessage(HttpMethod.Get, url);
            //HttpResponseMessage response = 
            //                await client.SendAsync(request, token);

            // POST 要求する場合はこちら
            HttpResponseMessage response =
                            await client.PostAsync(url, null, token);

            if (response.IsSuccessStatusCode)
            {
                string result = 
                    await response.Content.ReadAsStringAsync(token);
                ViewBag.Result = result;
            }

            return View();
        }
    }
}

Web API (MvcCore5App2)

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
using System;

namespace MvcCore5App2.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        private readonly ILogger<ValuesController> _logger;

        public ValuesController(ILogger<ValuesController> logger)
        {
            _logger = logger;
        }

        [HttpGet]
        public async Task<IActionResult> Get(CancellationToken token)
        {
            _logger.LogInformation($"start: {DateTime.Now:ss.fff}");
            await Task.Delay(5000, token);
            _logger.LogInformation($"end: {DateTime.Now:ss.fff}");
            return Ok("GET 処理完了");
        }

        [HttpPost]
        public async Task<IActionResult> Post(CancellationToken token)
        {
            _logger.LogInformation($"start: {DateTime.Now:ss.fff}");
            await Task.Delay(5000, token);
            _logger.LogInformation($"end: {DateTime.Now:ss.fff}");
            return Ok("POST 処理完了");
        }
    }
}

なお、上記は IIS をリバースプロキシに使ってのインプロセス ホスティング モデルに限った話ですので注意してください。

先の記事「要求の中断による処理のキャンセル (CORE)」に書きましたように、アウトプロセス ホスティング モデルや Linux 系の OS で Nginx とか Apache をリバースプロキシに使う場合は CancellationToken のキャンセル通知を配信できませんので、サーバーでの処理の中断はできません。

それから、データベースサーバーを相手にする場合、Entity Framework で使う ToListAsync とか SaveChangesAsync などではどうなるかですが、そこはまだ調べ切れていません。走り出したらキャンセルは効かないということもあるかもしれません。今後の検討課題にしたいと思います。

Tags: , , ,

CORE

About this blog

2010年5月にこのブログを立ち上げました。その後 ブログ2 を追加し、ここは ASP.NET 関係のトピックス、ブログ2はそれ以外のトピックスに分けました。

Calendar

<<  October 2021  >>
MoTuWeThFrSaSu
27282930123
45678910
11121314151617
18192021222324
25262728293031
1234567

View posts in large calendar