Observable.Generate、そして非同期実行

Reactive Extensionsのお勉強の続き。
今日はObservable.Generate メソッドに注目した。

というのも、
「あるメソッドの戻り値が一定の値になるまで、戻り値をそのメソッドの引数にして非同期で実行し続けるにはどうすればいいか?」
という問題にぶち当たっていたから。
非同期と言えばRxでしょう(当社比)から何かしらうまいことできるんじゃないのと思っていたけど勉強始めたばかりなので全然アイデアが浮かばなかった。

で、「非同期だからFromAsyncPattern?」とか数々の勘違いを積み重ねた上で「これ使えるんじゃね?」って思ったのがこのObservable.Generateメソッド。

ざっと紹介すると

using System;
using System.Reactive.Linq;

namespace ConsoleApplication19
{
    class Program
    {
        static void Main(string[] args)
        {
            Observable.Generate(
                1,              //初期値。
                x => x < 10,    //終了判定。falseになれば終了。
                x => x * 2,     //イテレーション。次の値を返す。
                x => x.ToString("# です!")//セレクタ
            ).Subscribe(
                x => Console.WriteLine(x));

            Console.WriteLine("*****終了*****");
            Console.ReadKey();
        }
    }
}

のように、初期値と終了判定とイテレーションセレクタを指定すればプッシュしてくれるもの。
これをさっきの
「あるメソッドの戻り値が一定の値になるまで、戻り値をそのメソッドの引数にして実行し続けるにはどうすればいいか?」
に当てはめれば

  • 初期値    :メソッドを最初に呼ぶときの引数
  • 終了判定   :メソッドの戻り値が一定の値になったかどうか
  • イテレーション:メソッドそのもの
  • セレクタ   :戻り値

・・・うーん、なんかできそうな感じしてきた。

しかし!
一番肝心な非同期はどうすんのよ。。。
惜しい、すごく惜しい・・・。

・・・と思ったんだけど、公式サンプルにもあるとおり第5引数にScheduler.ThreadPoolを指定することで別スレッドで実行してくれるということに気づいた。
ヤッター、というか早く気づいとけ。

■Scheduler.ThreadPoolを指定した例

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApplication18
{
    class Program
    {
        static void Main(string[] args)
        {
            var o = Observable.Generate(
                1,
                i => i < 5,
                i => AsyncFunc(i),
                i => i,
                Scheduler.ThreadPool //← これ!
            );

            o.Subscribe(
                //OnNext
                i => Console.WriteLine("{0} called", i),
                //エラーハンドリング
                e => Console.WriteLine(e.Message),
                //完了時のコールバック
                () => Console.WriteLine("========すべての作業は終了しました。========")
            );

            Console.WriteLine("*************メインスレッドの作業は終了しました***************");
            Console.WriteLine("何かキーを押すと終了します。");
            Console.ReadKey();
        }


        static int AsyncFunc(int value)
        {
            Console.WriteLine("Start....");
            //三秒待つ
            Thread.Sleep(3000);
            Console.WriteLine("{0} executed", value);
            //インクリメントする
            return ++value;
        }
    }
}

■結果

*************メインスレッドの作業は終了しました***************"
何かキーを押すと終了します。
1 called
Start....
1 executed
2 called
Start....
2 executed
3 called
Start....
3 executed
4 called
Start....
4 executed
========すべての作業は終了しました。========

やったー!
非同期実行されてるぜ!!
ちなみに、Scheduler.ThreadPoolを指定して別スレッドで実行することを指示できるのは何もこのメソッドに限ったことでは無く、
Observableの拡張メソッドの多くはこの機能を備えている。

ビバ!Reactive Extensions!!