内製アプリで100ファイルくらいダウンロードするのに3つ並列でダウンロードとかしたかったもののあんまりいい方法が無かったので備忘録がてら残しておきます。
基本的にはTaskを使って実装することになりますが、そのまま並列化してしまうとすごい数の並列ダウンロードになってしまうので工夫が必要となります。
検証環境
項目 | 詳細 |
---|---|
OS | Windows 10 Pro x64 20H2 |
.Net | .Net Core 3.1 |
Microsoft.Xaml.Behaviors.Wpf | 1.1.31 |
Prism.Core | 8.1.97 |
ReactiveProperty | 7.11.0 |
はじめに
通常ダウンロード処理はGUIだとTaskで行うかと思いますが、個数が決まった並列ダウンロードとなるとちょっと工夫が必要です。
Task自体上限があるので数が少ないならそれに合わせるのも手ですが、Taskの上限はかなり大きい(環境依存だと思いますが当環境では32767スレッドが上限)ので自身が保有するサーバでない限りは攻撃になりかねないのと、多分かなり遅くなるので3つくらいの並列に抑えたいところ。
そこで、WebClientをプールし、空いているWebClientがある時のみそのWebClientを使ってダウンロードが行われる仕組みで実装してみます。
完成目標とサンプル
今回は不特定多数が見るサンプルということで、実際にURLにアクセスするとなると色々とまずいのでダミーWebClientを作成してスレッドを待機させる方法でダウンロード待ちを再現してみました。
で、実際のURLを入れてもダウンロードしないのでいいのですが、わかりづらいので連番の数字にしました。
例えば、こんな感じでURL(数字だけど)を食わせると
1 2 3 4 ... 17 18 19 20
こんな感じで3つずつ並列でダウンロードが行われます。
1つ終われば1つダウンロードが開始し、2つ終われば2つ開始します。
設定次第で5でも10でも並列可能です。
Start 1 Start 2 Start 3 Finished 2 Start 4 Finished 4 Start 5 Finished 1 Finished 3 ... Start 17 Finished 13 Start 18 Finished 18 Start 19 Finished 15 Start 20 Finished 20 Finished 17 Finished 19 Finished All.
今回のサンプルもGithubのサンプルに置いているので全ソースはこちらから
WebClientPool - Github
今後記事は更新せずにサイレント更新するかもしれないので正式版はGithubで。
実装
* 例外処理がうまくできない問題を修正しました。
IWebClientInfoインタフェース
まずは処理で使うインタフェースから。
内部で使うIdと非同期タスクを実行するメソッドを定義します。
総称型使ってるのはテストで使えるようにやってるだけでWebClientクラスだけでいいならTじゃなくてもOK。
public interface IWebClientInfo<out T> { int Id { get; } Task StartTask(Action<T> callback); }
WebClientInfoクラス
続いて先程のインタフェースを継承したクラスです。
WebClientオブジェクトで使用中かどうかの判定が必要なのでIdとIsBusyプロパティを追加してラップします。
あとはコンストラクタでは所有者のプールと、プール上の内部IDとTのオブジェクトを受けてフィールドとプロパティに設定します。
StartTaskメソッドでは渡されたアクションに対して新規Taskを実行して、終了し次第プールに自身を返却します。
public class WebClientInfo<T> : IWebClientInfo<T> where T : IDisposable, new() { #region Fields private readonly WebClientPool<T> _pool; #endregion #region Properties public int Id { get; } public T Client { get; } public bool IsBusy { get; set; } #endregion public WebClientInfo(WebClientPool<T> pool, int id, T client) { _pool = pool; Id = id; Client = client; } public Task StartTask(Action<T> callback) { var task = Task.Factory.StartNew(() => callback(Client)); task.ContinueWith(_ => ReturnToPool()); return task; } public override string ToString() { return $"Id: {Id}, IsBusy: {IsBusy}"; } }
WebClientPoolクラス
最後は先程のWebClientInfoクラスをプールするクラスです。
コンストラクタ
まず、コンストラクタではプールするサイズ分ループして予めWebClientクラス(今回は総称型のT)のインスタンスを生成し、それに対して後処理をした後にWebClientInfoクラスのインスタンスを生成します。
あとはそれをコレクションに保持しておきます。
今回はコレクション自体への変更はありませんが、一応ImmutableListクラスで不変を明示しておきます。
.Net FrameworkならReadOnlyCollectionクラスあたりで代用できます。
private readonly ImmutableList<WebClientInfo<T>> _clients; public WebClientPool(int size, Action<T> postProcessing) { var clients = new List<WebClientInfo<T>>(); foreach (var i in Enumerable.Range(0, size)) { var client = new T(); postProcessing?.Invoke(client); clients.Add(new WebClientInfo<T>(this, i, client)); } _clients = ImmutableList.Create(clients.ToArray()); }
GetWebClientメソッド
続いてGetWebClientメソッドではプールしているWebClientInfoオブジェクトで空いている物をIWebClientInfoインタフェースとして返します。
基本的にはWebClientInfo#IsBusyプロパティがfalseなら返しますが、ここではasync/awaitを用いることで後の処理で扱いやすくしています。
こうすることで取得できるまで呼び出し元コンテキストを一旦止め、取得でき次第再開するということができます。
無限ループのところはWebClientInfoオブジェクトの返却忘れなどを考慮してタイムアウトなど何かしら設けてあげてもいいかもしれない。
public async Task<IWebClientInfo<T>> GetWebClient() { return await Task.Factory.StartNew(() => { while (true) { foreach (var webClientInfo in _clients) { lock (webClientInfo) { if (webClientInfo.IsBusy) continue; webClientInfo.IsBusy = true; return webClientInfo; } } Thread.Sleep(100); } }); }
ReturnWebClientメソッド
使用し終えたIWebClientInfoインタフェースのオブジェクトをPoolに返却します。
返却といってもIWebClientInfo#IsBusyをfalseに変えるだけなんですけどね。
一応今回は自動で返却するようになってるので使用者側は特に呼び出す必要はありませんが、これを忘れると永遠にWebClientInfoオブジェクトが貰えなくなります。
public void ReturnWebClient(IWebClientInfo<T> webClient) { var client = _clients[webClient.Id]; lock (client) { client.IsBusy = false; } }
全文
public class WebClientPool<T> : IDisposable where T : IDisposable, new() { private readonly ImmutableList<WebClientInfo<T>> _clients; public WebClientPool(int size, Action<T> postProcessing) { var clients = new List<WebClientInfo<T>>(); foreach (var i in Enumerable.Range(0, size)) { var client = new T(); postProcessing?.Invoke(client); clients.Add(new WebClientInfo<T>(this, i, client)); } _clients = ImmutableList.Create(clients.ToArray()); } public async Task<IWebClientInfo<T>> GetWebClient() { return await Task.Factory.StartNew(() => { while (true) { foreach (var webClientInfo in _clients) { lock (webClientInfo) { if (webClientInfo.IsBusy) continue; webClientInfo.IsBusy = true; return webClientInfo; } } Thread.Sleep(100); } }); } public void ReturnWebClient(IWebClientInfo<T> webClient) { var client = _clients[webClient.Id]; lock (client) { client.IsBusy = false; } } public void Dispose() { foreach (var client in _clients) { client.Client.Dispose(); } } }
使い方
以上を踏まえて並列ダウンロードを行ってみます。
Disposeの都合で2つのメソッドに分けていますが、公開するDownloadメソッドから。
抜粋しているので意味不明かもしれませんが、サンプルのModelにあるメソッドだと思ってもらえるとわかりやすいかと。
ここではまず、WebClientPoolクラスのインスタンスを生成し、引数には3 (3つ並列ダウンロードする) と内部で生成されるDummyWebClientのインスタンスにイベントを登録します。
実際の場面ではDummyWebClientではなく、WebClientを指定することになると思います。
あとは入力されたURLをコレクションに変換し、ダウンロードを行うDownloadTaskメソッドへ移動します。
ダウンロードが終わればDownloadTaskメソッド以下のコンテキストが実行されるので終わりの処理を書いてあげて終了です。
public async Task Download(string urlText) { using var webClientPool = new WebClientPool<DummyWebClient>(3, client => { client.DownloadStarted.Subscribe(DownloadStarted); client.CompletedChanged.Subscribe(DownloadCompleted); }); var convertedUrlText = urlText.Replace("\r\n", "\n").Replace("\r", "\n"); var urls = from x in convertedUrlText.Split('\n') where !string.IsNullOrEmpty(x) select x; await DownloadTask(urls, webClientPool); LogText += "Finished All."; }
DownloadTaskメソッドでは引数のURLコレクションをforeachで回してTaskを用いて並列ダウンロードを行います。
で、WebClientPool#GetWebClientメソッドでWebClientInfoオブジェクトを引っ張ってくるわけですが、awaitを使うことで空いているWebClientInfoオブジェクトがない場合にコンテキストを一時中断させます。
これにより上限有りの並列ダウンロードが実現できます。
WebClientInfoオブジェクトに空きができたらWebClientPool#GetWebClientからオブジェクトが帰ってきてコンテキストが再開されるのでダウンロードが開始されます。
そしてまたループが回ってawaitでコンテキストが一時中断されるのを繰り返します。
全ての処理が終わるのを待たないといけないので、ダウンロードのタスクをコレクションに保管しておいて、ループの外でTask.WhenAllメソッドで待機します。
private static async Task DownloadTask(IEnumerable<string> urls, WebClientPool<DummyWebClient> webClientPool) { var tasks = new List<Task>(); foreach (var (url, index) in urls.Select((v, i) => (Value: v, Index: i))) { var webClientInfo = await webClientPool.GetWebClient(); var task = Task.Factory.StartNew(() => { var webClientInfo = await webClientPool.GetWebClient(); var task = webClientInfo.StartTask(client => client.DownloadString(url, index)); _ = task.ContinueWith(t => { if (t.Exception != null) Debug.WriteLine(t.Exception.InnerException?.Message); }, TaskContinuationOptions.OnlyOnFaulted); tasks.Add(task); }); tasks.Add(task); } await Task.WhenAll(tasks); }
DummyWebClientクラス
一応出てきたので軽く触れておきますが、ダミーなのでそんな深い意味はないです。
やってるのはイベントの定義と、DownloadStringメソッドくらいで、DownloadStringメソッドではダウンロード時間のばらつきを表現するために引数の数で待機時間が変わるようにしています。
実際にダウンロードしてるわけじゃないので気兼ねなくサンプルコードでダウンロードしてもらって大丈夫です。
なお、例外処理の修正に合わせて5の倍数で例外が発生するようにしました。
public class DummyWebClient : IDisposable { #region Events private readonly Subject<string> _downloadStartedSubject = new(); private readonly Subject<string> _completedSubject = new(); public IObservable<string> DownloadStarted => _downloadStartedSubject; public IObservable<string> CompletedChanged => _completedSubject; #endregion public void DownloadString(string url, int num) { var cnt = new[] { 3000, 500 }; _downloadStartedSubject.OnNext(url); Thread.Sleep(cnt[num % 2]); if (num % 5 == 0) throw new Exception(); _completedSubject.OnNext(url); } public void Dispose() { _downloadStartedSubject?.Dispose(); _completedSubject?.Dispose(); } }