読者です 読者をやめる 読者になる 読者になる

パイプライン色々: top down vs. bottom up (2)

.NET

id:NyaRuRu:20080127:p1 続き.
私自身,系統だってこの筋の学問を学んだわけではないので,自分の考え方を整理しながら考えてみます.



プログラミングの世界では,決まった動作をする小さなモジュールをパイプライン状につなげていって,何か役に立つ大きなものを作ることがしばしばあります.しかし,一口にパイプラインと言っても色々な実装があることに気付きます.代表的なパイプラインとして,まずは UNIX Pipe について見てみましょう.

UNIX-pile-like-pipeline

UNIX Pipe 流のパイプラインは,BlockingQueue や BlockingStream で連結されたマルチスレッドプログラムとみなすことができます.

この構造は,OSのスレッド/プロセススケジューリングを活用することで,各モジュールが必要なときだけ動作するという特徴があります.具体的には,データを受け取ったり転送したりする処理を OS がインターセプトして,「実行を継続すべきときまでブロックする」ことでこれを実現しています.
OSがスレッドを一時中断したり再開したりするということで,その即物さがかえって分かりやすいかもしれません.この仕組みなら C 言語で書かれたプログラム使えますしね.言語によってはそういった動作そのものを言語仕様に内包しているものもありますが,OSの助けを借りれば既存の言語でも色々な応用ができるというわけです.
このパイプラインでは,どのモジュールがどの順序で動くかがスレッドスケジューラに依存していて,事前に予言することができません.ただし,全体としてはデータを上流から下流に流すという動作をするはずです.
さて,あるモジュールの都合でパイプライン全体を停止したいときはどうすればよいでしょうか? これは,パイプの一端を閉じると,もう一端でそれを検知できるようにしておけば良さそうです.各モジュールのお約束として,パイプの一端が閉じられたことを検知すると,自分が所有するパイプを全て閉じながら自分自身も終了するとしておきましょう.そうすれば,パイプラインのどのモジュールから始めても,全てのモジュールを停止することができるはずです.
パイプラインの停止条件を共有することで,「停止しないモジュール」と「停止するモジュール」を組み合わせるというタイプの計算が可能になります.以前『C# 3.0 と while(true) と Iterator - NyaRuRuの日記』にて,複雑な停止条件を持つループが,停止条件をモジュールに分離することで見やすくなることを示しましたが,このアイディアの元ネタはむしろ UNIX-pipe 流のパイプラインにあります.

MS-DOS-like-pipeline

昔私が PC-9821 にて MS-DOS を使っていた頃,なぜに MS-DOS のパイプがそこまで馬鹿にされているのかよく分かりませんでしたが,今ならそれが少し分かる気がします.
MS-DOS のパイプは,シェルレベルで一旦テンポラリファイルに出力することで実現されています.シングルタスク OS だった MS-DOS では,これはある意味で仕方のないことでしょう.

つまり,最初のモジュールの出力を一度全てテンポラリデータとして受け取って,最初のモジュールを終了させ,改めてデータを次のモジュールに流し込むというわけです.

メリット
シングルスレッドでも実装可能
デメリット
パイプライン停止条件を共有できなくなったため,非決定性モジュールを使えなくなった.テンポラリデータを格納するためのリソースが必要.

シングルスレッドでも利用可能で,停止条件を共有できるソフトウェアパイプライン

さて,今度は OS の助けを借りずに,シングルスレッドでも実装可能で,かつ停止条件を共有できるソフトウェアパイプラインについて考えてみましょう.
CLR はファイバーをサポートしていませんし,関数呼び出しに何か特殊な技能があるわけでもありません.せいぜい例外を使ってコールスタックを一気に遡る程度の能力です.従って,あるモジュールから隣接するモジュールに,「今動作して」という合図を送るのは,そのモジュールのメソッドを呼び出すことで実装するのが自然でしょう*1..NET でよく出てくる「イベント」という奴です.
ここで,向きを選ぶ必要が出てきます.つまり,パイプラインのモジュールの,どちら側がどちら側に合図を送ることにするかという問題です.今回はパイプラインの入り口側を上流,出口側を下流と呼ぶことにして,上流側のモジュールがイベント送信側となるものを便宜上「書き込み駆動型」,その逆を「読み出し駆動型」と定義します.
「書き込み駆動型」も「読み出し駆動型」もどちらか一方があればよいというものではなくて,実際に .NET プログラミングでは暗黙のうちに使い分けられています.
「書き込み駆動型」パイプラインの代表例が,書き込みを意図した System.IO.Stream クラスの連鎖です.一方「読み出し駆動型」パイプラインの例には,読み出しを意図した System.IO.Stream クラスの連鎖を挙げることもできますが,今回は LINQ で多用されている IEnumerator<T> 連鎖を取り上げましょう.

System.IO.Stream 連鎖への書き込み処理

これは .NET プログラミングをしばらく行っている人は割と肌身で感じているパイプラインではないでしょうか.パイプラインの上流側に起こしたアクションが,下流側に連鎖していきます*2
一般的に,System.IO.Stream 連鎖では,作成の段階から「上流 (書き込み側) が外側に」現れます.

FileStream fs = ......;

var sw = new StreamWriter(
            new BufferedStream(
                new GZipStream(
                    fs,
                    CompressionMode.Compress
                )
            )
         );

「書き込み駆動型」パイプラインの歯車を回すエンジンは,最上段にあると考えると分かりやすいでしょう.

System.IO.Stream 連鎖への書き込みのコールスタックを考えると,上流モジュールのメソッドが Caller になり,下流モジュールのメソッドが Callee になります.
停止条件の共有ですが,下流から上流に停止を伝えるには「書き込みメソッドの戻り値」または「例外送出」が使えますね.逆に上流から下流への停止通知には,Stream.Close や Stream.Dispose が利用できるでしょう.一般的には一度パイプライン最上流まで停止通知が遡りながら,例外処理を一通りすませた後,今度は finally ブロックで Close または Dispose が最下流まで順に呼び出されるという停止シークエンスをとります.
「書き込み駆動型」の特徴として,複数の出力を持たせても歯車構造やパイプライン停止条件の共有が維持できるというものがあります.
逆に複数の入力を許してしまうとどうなるでしょうか? まず,ZipWith メソッドのような合流処理を行うには,両方の入力が揃うまで一方の入力をキューイングしておく必要があります.
さらに,Stream クラスのように,下流のストリームのみを保持する単方向リスト構造では,上流から下流へメソッド呼び出しが行われたタイミングでしか上向きの情報伝達チャネルが開きません.複数入力を許しつつ,パイプライン停止タイミングを常に上流にフォワードするためには,上向きのコールバックを別途登録する必要があります.

IEnumerator<T> 連鎖からの読み出し処理

LINQ が利用している IEnumerator<T> 連鎖は,「読み出し駆動型」のパイプラインです.「読み出し駆動型」パイプラインの歯車を回すエンジンは,最下段にあると考えると分かりやすいでしょう.

IEnumerator<T> 連鎖のコールスタックでは下流モジュールのメソッドが Caller になり,上流モジュールのメソッドが Callee になります.
一般的に,IEnumerator<T> 連鎖では,作成の段階から「下流 (読み出し側) が外側に」現れます.

IEnumerable<int> source = ......;

var q = Enumerable.Take(
            Enumerable.Select(
                Enumerable.Where(
                    source,
                    x => x % 2 == 0),
                x => x * x),
            5);

停止条件の共有にて,上流から下流に停止を伝えるには「MoveNext メソッドの戻り値」が利用され.逆に下流から上流への停止通知には,IDisposable.Dispose が用いられます.一般的には一度パイプライン最下流まで停止通知が行き渡り,その後今度は finally ブロックで Dispose が最上流まで遡るという停止シークエンスをとります.
「読み出し駆動型」の特徴として,複数の入力を持たせてもパイプライン停止条件の共有が維持できるというものがあります.
そして,「読み出し駆動型」では複数の出力が苦手ということになります.

reduce 処理と「読み出し駆動型」パイプラインの複数出力

さて,えムナウさんが問題にされていた foreach の回数の問題は,IEnumerator<T> 連鎖が複数の出力を苦手とする,ということと関連しているように見えます.

static int[] cal3(int[] arr)
{
    return new int[] { 
        arr.Sum(),
        arr.Count(),
        arr.Max(),
        arr.Min()
    };
}

この処理には,スカラ出力が 4 つ必要となります.「読み出し駆動型」では読み出しループが処理の基点となりますから,入力ソースは共通でも,この構造のままでは単一の IEnumerator<T> 連鎖になってくれないのです.
対処法のひとつは,既に nsharp 先生がコメント欄で指摘済みです*3.4 つの値をペアにして,実際の出力を 1 つにしてしまえば,1 本のパイプによる「読み出し駆動」で対処できるというわけです.
確かに,このような場面で,LINQ to Object は確かに多少窮屈な部分があるかもしれません.私と言えば,今のところ同じ入力に複数の Reduce 処理を適用したいという場面に何度も出会ったことはありませんが,今後無いとも言い切れません.そこでもう少し一般的にこの問題に対処する方法を考えてみましょう.
出力値が Reduce された値のペアである場合,読み出し駆動ではなく書き込み駆動のパイプラインの方が相性が良さそうです.具体的には Expression Tree を経由して,「読み出し駆動型」の複数 Reduce パイプラインを,単一の「書き込み駆動型」パイプラインにプログラム変換を行ってみます.(続く)

*1:DirectShow なんかは 10 年以上前からやたら小難しくこの問題に対処してますな.アレはアレで芸術なんですが.

*2:もちろん先ほどの『ストリームのパイプライン処理 - MSDN Magazine 2008年2月号』のようにマルチスレッドで実現することもできます

*3:http://blogs.wankuma.com/mnow/archive/2008/01/24/119056.aspx#119331