ConcurrentReduce: top down vs. bottom up (4)
最近私が C# 3.0 で新しいライブラリを書くときは,まず記法 (シンタックス) を考え,それが「それらしい」かどうかから吟味するのですが,今回もあらかじめどんなシンタックスになるか少し見ておきましょう.
今回目指すシンタックスは次のようなものです.
var result = q.ConcurrentReduce ( seq => new { Count = seq.Where(x => x % 2 == 0).Count(), Sum = seq.Select(x => x * x).Sum(), Min = seq.Min(), Max = seq.Max(), } );
記法から考えることについては善し悪しがあると思いますが,シンタックスがあまりにも LINQ の世界から離れてしまうのは望むところではありません.上記のような ConcurrentReduce を実装するにあたり,その実装を簡単にするために独特のシンタックスを強いるよりは,多少実装が複雑になっても,書き手は LINQ の世界で自然なシンタックスのままでいられることを優先したいと思います.
ConcurrentReduce のメソッドシグネチャは次のようになります.
public static TResult ConcurrentReduce<TSource, TResult>( this IEnumerable<TSource> source, Expression<Func<IEnumerable<TSource>, TResult>> expr) { ...... }
ここで,例で示したコードがどのような式木に quote されるか見ておきましょう.この手の開発に id:NyaRuRu:20071201:p1 で紹介した『Expression Tree Visualizer』は必須です.作者である波村さんに感謝 *1.
だいぶ複雑に見えますな.せっかくなので Mathematica の TreeForm にも絵を描かせてみます.
XSum[ x_List ] := Apply[ Plus, x]; XWhere[ x_List, pred_Symbol] := Select[ x, pred ]; XWhere[ x_List, pred_Function] := Select[ x, pred ]; XSelect[ x_List, pred_Symbol] := Map[ x, pred ]; XSelect[ x_List, pred_Function] := Map[ x, pred ]; XCount[ x_List] := Length[ x]; XMin[x_List] := Min[x]; XMax[x_List] := Max[x]; TreeForm[ Function[{seq}, {XCount[XWhere[seq, Function[{x}, Mod[x, 2] == 0]]], XSum[XSelect[seq, Function[{x}, x*x]]], XMin[seq], XMax[seq] }]]
まあ微妙にニュアンスが違いますが概ねこんなイメージで.
ConcurrentReduce を作る
さて,んでは実際に ConcurrentReduce を作ってみましょう.
今回はデモ用途ということでだいぶ実装を省略しています.完全版を作るのはそれなりに大変なので,我こそはという方は是非挑戦してみて下さい.
public static TResult ConcurrentReduce<TResult>( this IEnumerable<int> source, Expression<Func<IEnumerable<int>, TResult>> expr) { var seq = expr.Parameters[0]; var newexpr = expr.Body as NewExpression; var accumulators = newexpr.Arguments .OfType<MethodCallExpression>() .Select(subexpr => ConvertLeafToRoot(subexpr)) .Select(accumulable => accumulable.GetAccumulator()) .ToArray(); // sequencial scan! foreach (var item in source) { foreach (var accumulator in accumulators) { accumulator.Process(item); } } var results = accumulators.Select(accumulator => accumulator.Result) .Cast<object>() .ToArray(); foreach (var accumulator in accumulators) { accumulator.Dispose(); } return (TResult) newexpr.Constructor.Invoke(results); }
ConvertLeafToRoot と ConvertNodeToNode の実装については最後にまとめて行っています.今回は受け取る式木の形を決めうちしていて,いきなり NewExpression にキャストしていますが,実際はコンストラクタや関数呼び出しなど色々なケースを想定する必要があるでしょう.
使ってみます.
var q = Enumerable.Range(0, 100) .Hook(new EnumeratorHook<int>() { OnBeforeBegin = () => Console.WriteLine("Beginning!"), OnAfterDispose = _ => Console.WriteLine("Disposed!"), }); var result = q.ConcurrentReduce( seq => new { Sum = seq.Sum(), Count = seq.Count(), Min = seq.Min(), Max = seq.Max(), X = seq.Where(x => x % 2 == 0) .Select(x => x * (x - 1)) .Sum(), }); Console.WriteLine("Result = {0}", result);
結果.
Beginning! Disposed! Result = { Sum = 4950, Count = 100, Min = 0, Max = 99, X = 159250 }
今回のコードでは,"sequencial scan!" とコメントに書いた部分の foreach が,読み取り駆動型パイプラインと書き込み駆動型パイプラインの両方のエンジンを回す役目を果たしています.
速度は?
ちなみにこの ConcurrentReduce ですが,速度はというと遅いです.本気で速度が欲しければ,以前書いたように LCG でループ展開が吉でしょう.
というか今回実装がここまで手抜きなのは,事前に速度が出ないことを知っていたからだったり.プロトタイプが完成して早々に (1月29日ぐらい?) 「速度でねー」となって,IAccumulable 連鎖で完成度を高める意欲を失っちゃったと.LCG 版も 3 割ぐらい書いたのですが,いつか完成して日の目を見るかは不明.
とまあ変なオチですが,以上 C# のパイプライン書き換えについてのお話でした.
ConvertLeafToRoot と ConvertNodeToNode の実装
private static IAccumulable<int, int> ConvertLeafToRoot(MethodCallExpression call) { var accumulable = default(IAccumulable<int, int>); switch (call.Method.Name) { case "Sum": accumulable = Accumulable.Sum<int>(); break; case "Count": accumulable = Accumulable.Count<int>(); break; case "Min": accumulable = Accumulable.Min<int>(); break; case "Max": accumulable = Accumulable.Max<int>(); break; default: throw new NotImplementedException(); } var nextCall = call.Arguments[0] as MethodCallExpression; if (nextCall != null) { accumulable = ConvertNodeToNode(nextCall, accumulable); } return accumulable; } private static IAccumulable<int, int> ConvertNodeToNode( MethodCallExpression call, IAccumulable<int, int> child) { var accumulable = default(IAccumulable<int, int>); switch (call.Method.Name) { case "Select": { var func = call.Arguments[1] as Expression<Func<int, int>>; accumulable = Accumulable.Select(func.Compile(), child); break; } case "Where": { var func = call.Arguments[1] as Expression<Func<int, bool>>; accumulable = Accumulable.Where(func.Compile(), child); break; } default: throw new NotImplementedException(); } var nextCall = call.Arguments[0] as MethodCallExpression; if (nextCall != null) { accumulable = ConvertNodeToNode(nextCall, accumulable); } return accumulable; }
こちらもかなり手抜き実装です.特に Select は int → int しか受け付けていないので注意.
*1:デブサミ 2008 で来日ですよ!