書き込み駆動パイプラインの準備: top down vs. bottom up (3)

書き込み駆動型のパイプラインを定義するために,次のようなインターフェイスを作りました.

public interface IAccumulable<TSource, TResult>
{
    IAccumulator<TSource, TResult> GetAccumulator();
}
public interface IAccumulator<TSource, TResult> : IDisposable
{
    bool Process(TSource item);
    TResult Result { get; }
}

モジュールへの書き込みは Process メソッドで行うこととします.Process が false を返すのはもうこれ以上書き込めないときということにして,そのときは書き込み側の責任でパイプラインを停止させましょう.停止要請の流儀は色々あって,例えば PythonPowerShell は停止要請に例外を使っています.
最上流まで停止要請が遡ると,利用者は,最上流モジュールの Dispose メソッドを呼び出します.それぞれのモジュールは,Dispose が呼び出されると,自身の責務として所有する下流モジュールの Dispose を呼び出すことにしておけば,これでパイプライン全体に停止を通知できるでしょう.
また,今回は Reduce 処理の抽象化ということで,Result というプロパティを用意することにしました.セマンティクスとして,Result プロパティは Dispose 後にのみ呼び出し可能とした方が綺麗かもしれませんが,まあこのあたりのデザインは各自色々考えてみて下さい.実装手段は色々あります.

Accumulable.Count

さて,まずは Count を書いてみましょう.

public static partial class Accumulable
{
    public static IAccumulable<TSource, int> Count<TSource>()
    {
        return new CountImpl<TSource>();
    }
    private sealed class CountImpl<TSource> : IAccumulable<TSource, int>
    {
        public IAccumulator<TSource, int> GetAccumulator()
        {
            return new Accumulator();
        }
        private sealed class Accumulator : IAccumulator<TSource, int>
        {
            private int _count = 0;
            public bool Process(TSource item)
            {
                _count = checked(_count + 1);
                return true;
            }
            public int Result
            {
                get
                {
                    return _count;
                }
            }
            public void Dispose()
            {
            }
        }
    }
}

実際に使ってみます.Enumerable.Range(0, 100) を Count に喰わせてみましょう.

static class Program
{
    static void Main(string[] args)
    {
        var seq = Enumerable.Range(0, 100);
        var accumulable = Accumulable.Count<int>();
        using (var accumulator = accumulable.GetAccumulator())
        {
            foreach (var item in seq)
            {
                if (!accumulator.Process(item)) break;
            }
            Console.WriteLine("Count = {0}", accumulator.Result);
        }
    }
}

結果.

Count = 100

OK ですな.
この構造は何度も出てくるので,Extension Method を作っておきましょう.

public static partial class Accumulable
{
    public static TResult Accumulate<TSource, TResult>(this IEnumerable<TSource> source, IAccumulable<TSource, TResult> accumulable)
    {
        using (var accumulator = accumulable.GetAccumulator())
        {
            foreach (var item in source)
            {
                if (!accumulator.Process(item)) break;
            }
            return accumulator.Result;
        }
    }
}

これで,より短く書くことができます.

static class Program
{
    static void Main(string[] args)
    {
        var accumulable = Accumulable.Count<int>();
        var count = Enumerable.Range(0, 100)
                              .Accumulate(accumulable);
        Console.WriteLine("Count = {0}", count);
    }
}

Accumulable.Sum

Sum はちょっと面倒ですね.
型ごとに足し算のセマンティクスが異なるので,とりあえず Int32 のみ定義してみましょう.

public static partial class Accumulable
{
    public static IAccumulable<TSource, TSource> Sum<TSource>()
    {
        if (typeof(TSource) == typeof(Int32))
        {
            return new SumImpl_Int32() as IAccumulable<TSource, TSource>;
        }
        else
        {
            throw new NotImplementedException();
        }
    }

    private sealed class SumImpl_Int32 : IAccumulable<Int32, Int32>
    {
        public IAccumulator<Int32, Int32> GetAccumulator()
        {
            return new Accumulator();
        }
        private sealed class Accumulator : IAccumulator<Int32, Int32>
        {
            private Int64 _sum = 0;
            public bool Process(Int32 item)
            {
                _sum = checked(_sum + item);
                return true;
            }
            public Int32 Result
            {
                get
                {
                    return checked((Int32)_sum);
                }
            }
            public void Dispose()
            {
            }
        }
    }
}

使ってみます.

static class Program
{
    static void Main(string[] args)
    {
        var accumulable = Accumulable.Sum<int>();
        var sum = Enumerable.Range(0, 100)
                            .Accumulate(accumulable);
        Console.WriteLine("Sum = {0}", sum);
    }
}

結果.

Sum = 4950

これも問題なさそうです.

Accumulable.Where

書き込み駆動な Where も作ってみます.

public static partial class Accumulable
{
    public static IAccumulable<TSource, TResult> Where<TSource, TResult>(Func<TSource, bool> predicate, IAccumulable<TSource, TResult> accumulable)
    {
        return new WhereImpl<TSource, TResult>(predicate, accumulable);
    }
    private sealed class WhereImpl<TSource, TResult> : IAccumulable<TSource, TResult>
    {
        private readonly Func<TSource, bool> _predicate;
        private readonly IAccumulable<TSource, TResult> _accumulatable;
        internal WhereImpl(Func<TSource, bool> predicate, IAccumulable<TSource, TResult> accumulable)
        {
            this._predicate = predicate;
            this._accumulatable = accumulable;
        }
        public IAccumulator<TSource, TResult> GetAccumulator()
        {
            return new Accumulator(_predicate, _accumulatable);
        }
        private sealed class Accumulator : IAccumulator<TSource, TResult>
        {
            private readonly Func<TSource, bool> _predicate;
            private readonly IAccumulable<TSource, TResult> _accumulatable;
            private IAccumulator<TSource, TResult> __accumulator = null;
            private IAccumulator<TSource, TResult> _accumulator
            {
                get
                {
                    if (__accumulator == null)
                    {
                        __accumulator = _accumulatable.GetAccumulator();
                    }
                    return __accumulator;
                }
            }

            internal Accumulator(Func<TSource, bool> predicate, IAccumulable<TSource, TResult> accumulable)
            {
                this._predicate = predicate;
                this._accumulatable = accumulable;
            }
            public bool Process(TSource item)
            {
                if (!_predicate(item)) return true;
                return _accumulator.Process(item);
            }
            public TResult Result
            {
                get
                {
                    return _accumulator.Result;
                }
            }
            public void Dispose()
            {
                if (__accumulator != null)
                    __accumulator.Dispose();
            }
        }
    }    
}

Where は,作成時にフィルタ条件と書き込み先を受け取ります.Result は書き込み先から受け取ってそれをオウム返しです.LINQ to Object 版と比較してみましょう.

static class Program
{
    static void Main(string[] args)
    {
        var count1 = Enumerable.Range(0, 100)
                               .Accumulate(
                                Accumulable.Where(x => x % 2 == 0,
                                Accumulable.Count<int>()));

        var count2 = Enumerable.Range(0, 100)
                               .Where(x => x % 2 == 0)
                               .Count();

        Console.WriteLine("Count1 = {0}", count1);
        Console.WriteLine("Count2 = {0}", count2);
    }
}

結果.

Count1 = 50
Count2 = 50

OK ですね.

Accumulable.Select

Select も作りましょう.基本は Where と同じですね.ソースコードは略.
実験.

static class Program
{
    static void Main(string[] args)
    {
        var sum1 = Enumerable.Range(0, 100)
                             .Accumulate(
                              Accumulable.Where(x => x % 2 == 0,
                              Accumulable.Select((int x) => x * (x-1),
                              Accumulable.Sum<int>())));

        var sum2 = Enumerable.Range(0, 100)
                             .Where(x => x % 2 == 0)
                             .Select(x => x * (x-1))
                             .Sum();

        Console.WriteLine("Sum1 = {0}", sum1);
        Console.WriteLine("Sum2 = {0}", sum2);
    }
}

結果.

Sum1 = 159250
Sum2 = 159250

OK です.

ここまでまとめ

これでだいたい準備は整いました.IEnumerable<T> 連鎖で書いた式と,IAccumulable<TSource, TResult> 連鎖で書いた式を見比べてみて下さい.これらは一定のルールで相互変換が可能なように見えます.
次回は実際にその相互変換自体をプログラミングする方法について見ていきましょう.

おまけ1 : それ Aggregate で……

Enumrable.Sum や Enumerable.Count, Enumerable.Min, Enumerable.Max などは,全てEnumerable.Aggregate で書き換えることができます.同じことを考えれば, Accumulator の実装も共通化できるでしょう.

おまけ2 : さらなる最適化を目指す人へ

今回はわかりやすさを優先して IAccumulable<TSource, TResult> 連鎖を使った定義を行いましたが,えムナウさんが気にされていた速度の問題を追及したいという人もいるでしょう.

static class Program
{
    static void Main(string[] args)
    {
        var sum1 = Enumerable.Range(0, 100)
                             .Accumulate(
                              Accumulable.Where(x => x % 2 == 0,
                              Accumulable.Select((int x) => x * (x-1),
                              Accumulable.Sum<int>())));

        var sum2 = Enumerable.Range(0, 100)
                             .Where(x => x % 2 == 0)
                             .Select(x => x * (x-1))
                             .Sum();

        int sum3 = 0;
        foreach (var x in Enumerable.Range(0, 100))
        {
            if (x % 2 == 0)
            {
                var y = x * (x - 1);
                sum3 = sum3 + y;
            }
        }

        Console.WriteLine("Sum1 = {0}", sum1);
        Console.WriteLine("Sum2 = {0}", sum2);
        Console.WriteLine("Sum3 = {0}", sum3);
    }
}

恐らくこの中では sum3 の計算が最も高速となるでしょう.ここでちょっと考えてみます.sum1 や sum2 の式木を与えられたとき,人手で sum3 のコードに書き直すことはできるでしょうか? ……恐らくできるでしょう.
では,このときの書き換えはいくつかのルールで定式化できないものでしょうか? ……これもちょっと考えていけばできそうです.
さて,その書き換え後のソースコードと,同じ動作をするメソッドを動的に作れないでしょうか? ……これは Lightweight Code Gen (LCG) を利用すれば恐らく可能でしょう.
つまり「だいたいにおいて,人手でできる最適化なら,それは自動化することができる」わけです.ただし,式木を使うのであれば,出発点は式である必要があります.分かりやすい「式」で書くことと,それが実行効率の良いプログラムに展開されることは,多くの場面で両立できると私は信じています.
「速度が必要なときはちょっと分かりにくくてもこう書こうよ」も必要だとは思いますが,「その最終形は,もっと分かりやすい式から自動で作れないかな?」を考えてみるのも面白いんじゃないでしょうかね?

おまけ3 : それ F# で……

言うまでもないことですが,C# のための式木変換エンジンを C# で書かなければならないというルールはありません.