【Common Lisp】Go言語の goroutine っぽいものを作ってみたかった話

Lisp Advent Calendar 2019 23日目の記事です。

近頃仕事で書いている Go 言語の勉強も兼ねて「Go言語による並行処理」という本を買ったので Lisp を書いていくぞという内容です。goroutine っぽい cloutine なるものを Common Lisp 上で作ってみようという試みです。

www.oreilly.co.jp



できたもの(と制限)

github.com

  • 隠蔽された複数の実スレッドにいわゆる Green Thread(cloutine と命名)を投げ込んで並行処理を実現する
  • 非同期なチャネル

というようにできたのは一部分だけです。また、できたものについても下記の制限があります。

  • 色々実用に耐えない
    • 後述のように継続ライブラリ cl-cont を利用していますが、unwind-protect などいくつかのスペシャルフォームに対応していないので実用的に使うのは辛そうです
  • SBCL 上ではさらに実用的に耐えない
    • SBCL のエグい最適化と cl-cont のエグいコード変換がかち合って不安定だったり、コンパイルに異様に時間がかかったりします
    • 一応テストは SBCL, CCL の2処理系で行ってはいるものの...*1
  • チャネルについてはかろうじて単独で動作するだけで select のような高度で実用的な機能はできていない

技術要素

次のようなものを利用して作成しました。

  • マルチスレッド: bordeaux-threads
  • 限定継続: cl-cont
    • 簡単な使い方について「cl-cont: 限定継続」の項で簡単に解説しているので、興味のある方はそこだけ見てみると良いかもしれません
  • 非同期処理: Blackbird: いわゆる Promise を実現するライブラリ

参考

  • green-thread
    • シングルスレッド上での Green Thread を実現するライブラリです
    • 利用はしていませんが実装の参考にしています。bordeaux-threads 以外の技術要素は同じです

動かしてみる

現状 cloutine は quicklisp に登録していないので ql:quickload できるような位置にソースを持ってくることが必要です。Roswell を利用している場合は下記が簡単です。

$ ros install eshamster/cloutine

まずは次の Go のコードに相当する動作を試してみます。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)
    // go で goroutine を1つ立てる
    go func() {
        // チャネルがクローズされるまで値を取り出して出力し続ける
        for {
            val, ok := <-ch
            if !ok {
                break
            }
            fmt.Printf("%d\n", val)
        }
    }()
    go func() {
        // 0~4までの値をチャネルに投入した後、クローズする
        defer close(ch)
        for i := 0; i < 5; i++ {
            ch <- i
        }
    }()
    // 本来 wait group でやるところをサボり
    time.Sleep(1)
}

次のようになります。コメントに既につらみが見えています。

CL-USER> (ql:quickload :cloutine :silent t)
(:CLOUTINE)
CL-USER> (use-package :cloutine)
T
;; goroutine ぽいもの = cloutine を実行する実スレッドを2つ立ち上げる初期化処理
CL-USER> (init-cloutine 2)
#<CLOUTINE/REAL-THREADS::REAL-THREADS #x3020011BEDDD>
CL-USER> (let ((chan (make-channel)))
           ;; clt: cloutine を作成するマクロ
           (clt (loop
                   ;; チャネルに値が入ってくるのを待つ
                   (let ((x (<-chan chan)))
                     ;; 多値で「値, open-p」みたいに返してクローズ判定したいが、
                     ;; cl-cont との組み合わせがうまくいかないので苦しいクローズ判定...
                     (when (channel-closed-value-p x)
                       (return))
                     (print x))))
           (clt (dotimes (i 5)
                  ;; 値をチャネルに入れる
                  (chan<- chan i))
                ;; defer 相当のことをするために unwind-protect を利用するべきだが
                ;; cl-cont が対応していないので普通に close...
                (close-channel chan)))
NIL

0
1
2
3
4

念のため、init-cloutine の引数に指定する実スレッドの数は同時に立てられる cloutine の数を制限するものではないです。次の通り、3つの実スレッドに対して5 (+1) 個の cloutine を同時に立てることができています*2

CL-USER> (ql:quickload :cloutine :silent t)
(:CLOUTINE)
CL-USER> (use-package :cloutine)
T
;; bt = bordeaux-threads のニックネーム
CL-USER> (defparameter *lock* (bt:make-lock))
*LOCK*
CL-USER> (init-cloutine 3)
#<CLOUTINE/REAL-THREADS::REAL-THREADS #x3020011BECCD>
CL-USER> (let ((chan (make-channel)))
           (dotimes (i 5)
             (let ((i i))
               (clt (loop
                       ;; スレッドが切り代わり易いように適当に sleep を入れて動作を乱す
                       (sleep 0.001)
                       (let ((x (<-chan chan)))
                         (when (channel-closed-value-p x)
                           (return))
                         (bt:with-lock-held (*lock*)
                           ;; *real-thread-index* は実スレッドの番号
                           (format t "thread: ~D, cloutine: ~D, value: ~D~%"
                                   *real-thread-index* i x)))))))
           (clt (dotimes (i 15)
                  (chan<- chan i)
                  ;; 上に同じく適当に sleep を入れる
                  (when (= (mod i 5) 0)
                    (sleep 0.001)))
                (close-channel chan)))
NIL
thread: 2, cloutine: 1, value: 0
thread: 0, cloutine: 0, value: 1
thread: 2, cloutine: 1, value: 5
thread: 0, cloutine: 2, value: 2
thread: 2, cloutine: 1, value: 7
thread: 0, cloutine: 2, value: 8
thread: 1, cloutine: 3, value: 3
thread: 2, cloutine: 1, value: 9
thread: 0, cloutine: 2, value: 10
thread: 2, cloutine: 1, value: 11
thread: 1, cloutine: 3, value: 12
thread: 0, cloutine: 2, value: 13
thread: 2, cloutine: 1, value: 14
thread: 1, cloutine: 4, value: 4
thread: 0, cloutine: 0, value: 6

今回参考にした「Go言語による並行処理」第6章「ゴルーチンとGoランタイム」P.206 に次のような説明用のコードがあるので、同等のものを動かしてみます。

   fib = func(n int) <-chan int {
        result := make(chan int)
        go func() {
            defer close(result)
            if n <= 2 {
                result <- 1
                return
            }
            result <- <-fib(n-1) + <-fib(n-2)
        }()
        return result
    }

    fmt.Printf("fib(4) = %d", <-fib(4))

次のようになります。どうにか動作はするようです。

CL-USER> (ql:quickload :cloutine :silent t)
(:CLOUTINE)
CL-USER> (use-package :cloutine)
T
CL-USER> (defun fib (n)
           (let ((result (make-channel)))
             (clt (if (<= n 2)
                      (chan<- result 1)
                      (chan<- result
                              (+ (<-chan (fib (- n 1)))
                                 (<-chan (fib (- n 2))))))
                  ;; 前述の通り unwind-protect を使えないので普通に close...
                  (close-channel result))
             result))
;; warnings は見なかったことにする
;Compiler warnings :
;   In an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside FIB: Undefined function FIB
;   In an anonymous lambda form inside an anonymous lambda form inside FIB: Undefined function FIB
FIB

CL-USER> (init-cloutine 2)
#<CLOUTINE/REAL-THREADS::REAL-THREADS #x302000F9D86D>

;; channel を利用するには with-call/cc で囲う必要がある(clt マクロも内部で囲っている)
CL-USER> (cont:with-call/cc
           (print (<-chan (fib 4))))
#<PROMISE name: "attach: PROMISE" finished: NIL errored: NIL forward: NIL #x302000F9DA9D>

3

実装の基本的なアイディア

cloutine の生成と実行

goroutine っぽいもの = cloutine の裏で動作する実スレッドはそれぞれ下記のような処理を行います。この辺りは「Go言語による並行処理」第6章「ゴルーチンとGoランタイム」を参考にしています。

  • 各スレッドはキューを一つずつ持つ
  • 全てのキューが空のとき全てのスレッドは待ち状態になる
  • いずれかのキューに関数が入っているときスレッドの待ちが解除され次の動作をする
    1. 自身のキューを調べ、関数が入っていればそれを実行する
    2. 1で取得できない場合、他のキューを順に調べて関数を見つけ次第それを実行する

したがって、cloutine を生成する clt マクロの基本的な内容は、渡された処理を lambda で包んで関数化し、上記のキューに詰め込むだけです。基本的には各スレッドは自身のキューに関数を詰めて自身で実行するのですが、「他のキューを順に調べて関数を見つけ次第それを実行する」の動作があるため、空いているスレッドは人のキュー内の関数を盗んで実行することができます。

非同期なチャネル

上記の通り、cloutine の基本的な動作だけであればそれほど面倒なところはない *3 のですが、非同期なチャネルの実装が割と厄介です。「技術要素」の項で述べた限定継続ライブラリ cl-cont と、Promise ライブラリ Blackbird はいずれもここで出てきます。

分かり易く説明できる気がしないのですが、非同期な待ちは次のように実現します(※待ちが発生しない限りはほぼ単なるスレッドセーフなキュー)。以下は取り出し待ちの例ですが、チャネルのサイズを制限した場合の投入待ちの処理もおおむね同じになります。

  1. チャネル操作以降の処理を「継続」= 特定の形式の関数として取り出す(cl-cont)
  2. Promise を作成し、解決するための関数をチャネルに保管する(Blackbird)
  3. 2 で作成した Promise に、解決時の処理として 1 の継続を実行する処理を登録する
  4. チャネルに値を投入する際に 2 で保管した関数があれば呼び出して Promise を解決する
    • ここで、3で登録した継続が実行される

実装

queue: ただのキュー

https://github.com/eshamster/cloutine/blob/master/queue.lisp

特別なところはないただのキューなのでリンクだけ貼ります。head からの取り出しと tail への追加しかできない双方向リストとして実装しています。

こんな感じで利用します。

CL-USER> (use-package :cloutine/queue)
    T
CL-USER> (defparameter *q* (make-queue))
*Q*
CL-USER> (queue *q* 0)
0
CL-USER> (queue *q* 1)
1
CL-USER> (dequeue *q*)
0
CL-USER> (dequeue *q*)
1
CL-USER> (dequeue *q*)
NIL

この部分は12日目の記事「【Common Lisp】REPL 上で手軽にスレッドの動作を試すためのライブラリを作った」の repl-threads と全く同じです。というより、そもそも repl-threads は cloutine を書いていて思いついたものです。ついでに言うと、6日目の記事「cl-base + rove + Travis CI でテストする」は cloutine のテストで困ったことからできたものです。

multi-queue

https://github.com/eshamster/cloutine/blob/master/multi-queue.lisp

multi-queue は次のような特徴を持つ複数のキューのまとまりです。

  • 複数のキューを配列として持つ
  • 全てのキューが空のとき dequeue 操作は待たされる
  • いずれかのキューにデータがあるとき dequeue は次のように動作する
    1. 指定されたインデックスのキューを調べ、データがあればそれを取得する
    2. 1で取得できない場合、他のキューを順に調べデータを見つけ次第そこから取得する
      • ロック・セマフォにより適切に排他制御をかけることで、 2まで来て取得できないケースが出ないようにする

パッケージ定義を見ると、上記に必要な部品であるキュー・ロック・セマフォを import していることが分かります。

(defpackage cloutine/multi-queue
  (:use :cl)
  (:export :multi-queue
           :init-multi-queue
           :queue-into
           :dequeue-from)
  (:import-from :cloutine/queue
                :init-queue
                :queue
                :dequeue)
  (:import-from :bordeaux-threads
                :make-lock
                :make-semaphore
                :with-lock-held
                :wait-on-semaphore
                :signal-semaphore))
(in-package :cloutine/multi-queue)

クラス定義は次のようになります。複数のキューと各キューに対応する複数のロック、そして1つのセマフォを持ちます。

(defclass multi-queue ()
  ((queues :initarg :queues :accessor mq-queues)
   (locks :initarg :locks :accessor mq-locks)
   (semaphore :initform (make-semaphore) :accessor mq-semaphore)))

(defun init-multi-queue (n)
  (let ((queues (make-array n))
        (locks (make-array n)))
    (dotimes (i n)
      (setf (aref queues i) (init-queue)
            (aref locks  i) (make-lock)))
    (make-instance 'multi-queue
                   :queues queues
                   :locks locks)))

queue-into では指定したインデックスのキューへ値を入れ、セマフォをインクリメントします。

(defmethod queue-into ((mq multi-queue) queue-index value)
  (assert (< queue-index (queue-count mq)))
  (with-lock-held ((aref (mq-locks mq) queue-index))
    (queue (aref (mq-queues mq) queue-index)
           value))
  (signal-semaphore (mq-semaphore mq)))

dequeue-from では前述の、指定されたインデックスのキューを優先した dequeue 操作を行います。ただし、queue 操作がない限りはセマフォがインクリメントされることはないので、全てのキューが空であれば入口で待たされます。

(defmethod dequeue-from ((mq multi-queue) prior-queue-index)
  (assert (< prior-queue-index (queue-count mq)))
  ;; 全てのキューが空なら待つ
  (wait-on-semaphore (mq-semaphore mq))
  (macrolet ((try-dequeue (index)
               `(with-lock-held ((aref (mq-locks mq) ,index))
                  (let ((value (dequeue (aref (mq-queues mq) ,index))))
                    (when value
                      (return-from dequeue-from value))))))
    ;; まずは指定されたインデックスのキューを調べる
    (try-dequeue prior-queue-index)
    ;; なければ別のキューを順番に見て値を取り出す
    (dotimes (i (queue-count mq))
      (unless (= i prior-queue-index)
        (try-dequeue i)))
    ;; 上記で取れないケースはあり得ないのでエラー
    (error "No value is there.")))

real-threads

https://github.com/eshamster/cloutine/blob/master/real-threads.lisp

real-threads はその名の通り実スレッドです。 multi-queue を利用して各スレッドは次のように動作します。いずれの動作も multi-queue の各動作に対応しています。

  • 各スレッドは multi-queue 内に対応するキューを1つずつ持つ
  • 全てのキューが空のとき全てのスレッドは待ち状態になる
  • いずれかのキューに関数が入っているときスレッドの待ちが解除され次の動作をする
    1. 自身のキューを調べ、関数が入っていればそれを実行する
    2. 1で取得できない場合、他のキューを順に調べて関数を見つけ次第それを実行する

上記を実現するため、先程の multi-queue の他、スレッド作成・破棄の関数を import します。セマフォ関連も import していますが、これは初期化時のみの利用で、本質的な処理には絡んでいません*4

(defpackage cloutine/real-threads
  (:use :cl)
  (:export :start-real-threads
           :destroy-real-threads
           :queue-promise)
  (:import-from :cloutine/multi-queue
                :multi-queue
                :init-multi-queue
                :queue-into
                :dequeue-from)
  (:import-from :bordeaux-threads
                :make-thread
                :destroy-thread
                :make-semaphore
                :wait-on-semaphore
                :signal-semaphore))
(in-package :cloutine/real-threads)

クラスの定義は次の通りです。

(defclass real-thread ()
  ((thread :initarg :instance :accessor thread-instance)
   (index :initarg :index :accessor thread-index)))

(defclass real-threads ()
  ((threads :initarg :rt-array :accessor threads-array)
   (multi-queue :initarg :mq :accessor threads-mq)
   (destroied-p :initform nil :accessor threads-destroied-p)))

初期化は引数で指定されたスレッド数に応じて、multi-queue を初期化し、後述の process-thread 関数を実行する実スレッドを作成します。一応初期化が完了するまではセマフォprocess-thread が走らないようにしています。

(defun start-real-threads (n)
  (let* ((mq (init-multi-queue n))
         (rt-arr (make-array n))
         (rts (make-instance 'real-threads :mq mq))
         (sem-to-wait-start (make-semaphore)))
    (dotimes (i n)
      (let ((rt (make-instance 'real-thread :index i)))
        (setf (thread-instance rt)
              (make-thread (lambda ()
                             (wait-on-semaphore sem-to-wait-start)
                             (process-thread rt rts))))
        (setf (aref rt-arr i) rt)))
    (setf (threads-array rts) rt-arr)
    (signal-semaphore sem-to-wait-start :count n)
    rts))

;; destroy-real-threads は略

次がその process-thread で、multi-queue から関数を取り出せたらそれを実行する、を繰り返すだけです。また、スペシャル変数 *real-thread-index* に自身のスレッド番号を束縛します。

(defvar *real-thread-index* nil)

(defmethod process-thread ((rt real-thread) (rts real-threads))
  (loop
     (let ((index (thread-index rt)))
       ;; dequeue-from は全てのキューが空であるうちは待たされる
       (let ((process (dequeue-from (threads-mq rts) index)))
         (assert (functionp process))
         (let ((*real-thread-index* index))
           (funcall process))))))

スレッドの外もしくは中からこのキューに関数を投げ込むメソッドが queue-process です。スレッド内からの場合は自身のインデックスのキューに投げます。そうでない場合は取りあえず 0 番目のキューに投げています(が乱数で選択した方が良い気もします)。

(defmethod queue-process ((rts real-threads) (process function))
  (when (threads-destroied-p rts)
    (error "The thread has been destroied."))
  (queue-into (threads-mq rts)
              (if *real-thread-index*
                  *real-thread-index*
                  0)
              process))

cloutine

https://github.com/eshamster/cloutine/blob/master/cloutine.lisp

本題の cloutine の実装ですが、処理の実体は real-threads が担っているので、ほぼそのラッパー程度の役割です。次のように real-threads の他に、後のチャネルの実装の仕込みとして cl-cont のシンボルをいくつか import します。

(defpackage :cloutine/cloutine
  (:use :cl)
  (:export :init-cloutine
           :destroy-cloutine
           :cloutine
           :clt)
  (:import-from :cloutine/real-threads
                :start-real-threads
                :destroy-real-threads
                :queue-process)
  (:import-from :cl-cont
                :with-call/cc
                :without-call/cc))
(in-package :cloutine/cloutine)

初期化関数は次の通りで、グローバルに real-threads を1つ作成します。

(defvar *real-threads* nil)

(defun init-cloutine (n)
  (setf *real-threads* (start-real-threads n)))

;; ※destroy-cloutine は省略

cloutine を作成する cloutine マクロとそのエイリアスclt マクロは次のようになります。ほぼ body 部を lambda で包んで real-threads の queue-process に渡すだけです。with-call/cc, without-call/cc については次のチャネルの項を参照してください。

(defmacro cloutine (&body body)
  `(queue-process *real-threads*
                  (without-call/cc
                    (lambda ()
                      (with-call/cc
                        ,@body)))))

(defmacro clt (&body body)
  `(cloutine ,@body))

channel

チャネルの実装の前に、限定継続ライブラリ cl-cont と Promise ライブラリ Blackbird について簡単に見ていきます。

cl-cont: 限定継続

限定継続についてざっくり感覚的な理解を得るには κeen さんの Common Lispで限定継続と遊ぶ などを見ながら cl-cont をいじってみるのが良いのかなと思います。自身も継続を使ってみるのは始めてでざっくりとした理解しかありませんが...。

ここでは継続とは何かという話は置いて、どの様な動きをするのかいくつかの例で見てみます。まずは単純に継続 = 「以降の処理」を取り出す様子を見てみます。

;; ※ややこしいので以降 print の出力のみ記載し、REPLの出力は省略します
CL-USER> (defparameter *cont* nil)
CL-USER> (cont:with-call/cc
           (print :start)
           ;; let/cc 以降の処理 = 継続が k に束縛される
           (cont:let/cc k
             (print :let-start)
             (setf *cont* k)
             (print :let-end))
           ;; ↓はまだ処理されない
           (print :end))

:START
:LET-START
:LET-END
;; 継続は関数として表されるので funcall できる
CL-USER> (funcall *cont*)

:END
;; もちろん何度でも呼べる
CL-USER> (funcall *cont*)

:END

この with-call/ccdefun を囲ったものが defun/cc で、次のように使えます。始めて試してみたときは、うわ本当に関数外の処理(継続)を引き込んで好きに扱える!と中々シビれました。

CL-USER> (defparameter *cont* nil)
CL-USER> (cont:defun/cc test-cc ()
           (cont:let/cc k
             (setf *cont* k)))
CL-USER> (cont:with-call/cc
           (print :start)
           (test-cc)
           (print :end))

:START
CL-USER> (funcall *cont*)

:END

さて、限定継続として取り出される関数は0個または1個の引数を取ります。ここまでは0引数の例でしたが、チャネルからの値の取り出し待ちをする継続は値を待っているので、そこに引数として渡してあげる必要があります。次のようにチャネルにまだ値がなかった場合の値取り出しの動作を模してみます。

CL-USER> (defparameter *cont* nil)
CL-USER> (cont:defun/cc like-waiting-chan ()
           (cont:let/cc k
             (setf *cont* k)))
CL-USER> (cont:with-call/cc
           (print :start)
           ;; まだチャネルに値が入っていないので残りの処理はどこか(*cont*)に格納しておいて
           ;; いったん処理は完了させる...という体
           (print (like-waiting-chan))
           (print :end))

:START
;; どこからかチャネルに値 100 が供給された...という体
CL-USER> (funcall *cont* 100)

100
:END

最後に実装時の細かい部分になりますが、 without-call/cc に触れます。ここまでに見たように、単純に with-call/cc を利用すると、 let/cc 以降の処理が全て束縛されてしまいます。実際にはもう少し範囲を絞りたいのですが、そのときの区切りとして利用するのが without-call/cc です。次のように使います。

CL-USER> (cont:with-call/cc
           (print :outer-start)
           (cont:without-call/cc
             (cont:with-call/cc
               (print :start)
               (cont:let/cc k
                 (setf *cont* k))
               ;; ここはまだ待って欲しい
               (print :end)))
           ;; ここはすぐ処理されて欲しい
           (print :outer-end))

:OUTER-START
:START
:OUTER-END
CL-USER> (funcall *cont*)

:END

Blackbird: Promise

Blackbird はいわゆる Promise(Future とも)を実現するライブラリで、非同期処理を実現するための部品として利用できます。次は resolve 済みの Promise を生成するだけの余り意味のない例です。

CL-USER> (ql:quickload :blackbird :silent t)
(:BLACKBIRD)
;; bb = blackbird
CL-USER> (let ((promise (bb:with-promise (resolve reject)
                          (resolve 100))))
           ;; promise が resolve されない限り attach された関数は実行されない
           ;; 今回は resolve 済みなのですぐ実行される
           (bb:attach promise
                      (lambda (x)
                        (format t "~&resolved: ~D~%" x))))
resolved: 100
;; ↓式自体は promise クラスを返している
#<PROMISE name: "attach: PROMISE" finished: T errored: NIL forward: NIL #x302000E6E2FD>

実際に非同期処理の文脈で利用するには大きく2つの方法があります。

  • cl-async のような非同期処理と組み合わせる
    • cl-async は libuv ベースの非同期処理ライブラリです
    • Blackbird 冒頭の例で紹介されている方法です
  • Promise 生成時に resolve 処理を外に保管しておいて後から resolve してもらう

今回とるのは後者の方法です。ここで、with-promiseresolve(と reject)は macrolet として定義されているため、そのまま外に渡すことはできません。こうした用途のためにはキーワード引数の resolve-fnreject-fn)が用意されているので、そちらを外に渡します。

CL-USER> (let* (resolver
                (promise (bb:with-promise (resolve reject :resolve-fn resolve-fn)
                           ;; resolve-fn を外に渡す
                           (setf resolver resolve-fn))))
           ;; promise が resolve されていないのですぐには実行されない
           ;; 名前の通り、resolve 時にやって欲しい処理を promise に atttach するだけ
           (bb:attach promise
                      (lambda (x)
                        (format t "~&resolved: ~D~%" x)))
           (print :after-attach)
           ;; ここで promise が resolve されるので上記の lambda も実行れる
           (funcall resolver 999)
           (print :after-resolve))

:AFTER-ATTACH
resolved: 999

:AFTER-RESOLVE
:AFTER-RESOLVE ; ← ※REPL の出力

これと cl-cont と組み合わせることでチャネルの待ちを実現できそうです。

  • チャネルに値が入っていないときは、resolve-fn を保管して Promise を生成し、残りの処理 = 継続を attach しておく
  • チャネルに値を入れる際に保管された resolve-fn に値を渡して attach された継続を実行する

実装

https://github.com/eshamster/cloutine/blob/master/sync/channel.lisp

問題の実装を見ていきます。パッケージ定義は以下の通りで、ここまでに説明した cl-cont, Blackbird を import しています。また、値を入れる容器として queue を、その排他制御のためにロック関連のシンボルを import しています。

(defpackage cloutine/sync/channel
  (:use :cl)
  (:export :make-channel
           :close-channel
           :<-chan
           :chan<-
           :ch-closed-p)
  (:import-from :cloutine/cloutine
                :clt)
  (:import-from :cloutine/queue
                :init-queue
                :queue
                :dequeue
                :queue-length)
  (:import-from :blackbird
                :attach
                :with-promise)
  (:import-from :bordeaux-threads
                :make-lock
                :acquire-lock
                :release-lock
                :with-lock-held)
  (:import-from :cl-cont
                :defun/cc
                :let/cc))
(in-package :cloutine/sync/channel)

クラス定義は次の通りです。3つのキューを持っているところが特徴的です。

  • queue: チャネルに投入されるデータを管理するキュー
    • 以降の説明で単に「キュー」と言った場合はこのキューのこと
  • queue-resolver-queue: 投入待ち(キューが一杯のとき)を表す Promise の resolver (resolve-fn) を管理するキュー
    • max-lengthnil の場合は利用されない
  • deqeueue-resolver-queue: 受け取り待ち(キューが空のとき)を表す Promise の resolver (resolve-fn) を管理するキュー
(defclass channel ()
  ((queue :initform (init-queue) :reader ch-queue)
   (queue-resolver-queue :initform (init-queue) :reader ch-queue-resolvers)
   (deqeueue-resolver-queue :initform (init-queue) :reader ch-dequeue-resolvers)
   (lock :initform (make-lock "Channel lock") :accessor ch-lock)
   (max-length :initarg :max-resource :reader ch-max-resource) ; param
   (closed-p :initform nil :accessor ch-closed-p)))

(defun make-channel (&optional max-resource)
  (make-instance 'channel :max-resource max-resource))

チャネルから値を取り出す関数 <-chan の実装は次のようになります。解説はソース中にコメントしていますが、色々とつらみが見え隠れします。

(defmacro with-release-lock ((lock) &body body)
  `(unwind-protect
        (progn ,@body)
     (release-lock ,lock)))

(defun/cc <-chan (ch)
  (let ((lock (ch-lock ch))
        (q (ch-queue ch)))
    ;; let/cc が unwind-protect に囲われてしまうとうまく動作しないので、
    ;; (内部に unwind-protect を含む)with-lock-held マクロは利用できない
    (acquire-lock lock)
    (cond ;; キューに値が入っているケース
          ((> (queue-length q) 0)
           (with-release-lock (lock)
             ;; 投入待ちがあれば resolve しておく
             ;; (一瞬 max-length を越えるがロック内で解消するのでまあ良いのでは...)
             (when (> (queue-length (ch-queue-resolvers ch)) 0)
               (funcall (dequeue (ch-queue-resolvers ch)) t))
             (dequeue q)))
          ;; ここから下はキューが空のケース
          ;; チャネルがクローズ済みなら待たずに終わる
          ((ch-closed-p ch)
           (with-release-lock (lock)
             (make-instance 'channel-closed-value)))
          ;; まだチャネルが開いていれば待つ
          (t (let ((promise (with-promise (resolve reject :resolve-fn resolver)
                              ;; 取り出し待ち解決のための resolver を溜めておく
                              (queue (ch-dequeue-resolvers ch) resolver))))
               (release-lock lock)
               (let/cc k
                 ;; 値取り出し後の処理を Promise に attach しておく
                 (attach promise
                         (lambda (val closed-p)
                           ;; closed-p をもらっているが、継続に多値をうまく渡す手段がなく
                           ;; 使えないままになっている...
                           (declare (ignore closed-p))
                           ;; 値を渡して新しい cloutine 上で継続を実行する
                           (clt (funcall k val))))))))))

チャネルへ値を投入する関数 <-chan の実装は次のようになります。こちらも色々つらい。

(defun/cc chan<- (ch value)
  (let ((lock (ch-lock ch))
        (q (ch-queue ch))
        (max-length (ch-max-resource ch)))
    ;; 上に同じく with-lock-held を使わずに頑張る
    (acquire-lock lock)
    (cond ((ch-closed-p ch) ; チャネルがクローズ済み
           (with-release-lock (lock)
             ;; 一応 error を返しているが、継続内ではうまく拾えない...
             (error "Error: Insert a value into a closed channel")))
          ;; 取り出し待ちがある場合
          ((> (queue-length (ch-dequeue-resolvers ch)) 0)
           (let (resolver)
             (with-release-lock (lock)
               (setf resolver (dequeue (ch-dequeue-resolvers ch))))
             ;; キューを介さずに直接値を渡す
             (funcall resolver value t)))
          ;; キューに空きがある = 投入可能な場合
          ((or (null max-length)
               (< (queue-length q) max-length))
           (with-release-lock (lock)
             (queue q value)))
          ;; キューに空きがない場合
          (t (let ((promise (with-promise (resolve reject :resolve-fn resolver)
                              ;; 投入待ち解決のための resolver を溜めておく
                              (queue (ch-queue-resolvers ch) resolver))))
               (release-lock lock)
               (let/cc k
                 ;; 値投入とその後の処理を Promise に attach しておく
                 (attach promise
                         (lambda (open-p)
                           (if open-p
                               (progn
                                 ;; キューに値を入れる
                                 ;; (ロックは resolver を呼び出した側でかけている)
                                 (queue q value)
                                 ;; 新しい cloutine 上で継続を実行する
                                 (clt (funcall k)))
                               (error "Error: Channel is closed when waiting to insert a value"))))))))))

最後に close-channel の実装も示します。残っている resolver を呼び出して全ての待ちを解決済みにします。

(defmethod close-channel ((ch channel))
  "Close channel and broadcast signal to all waiting readers and writers."
  (let ((lock (ch-lock ch)))
    (with-lock-held (lock)
      (setf (ch-closed-p ch) t)
      (dotimes (i (queue-length (ch-queue-resolvers ch)))
        (funcall (dequeue (ch-queue-resolvers ch)) nil))
      (dotimes (i (queue-length (ch-dequeue-resolvers ch)))
        (funcall (dequeue (ch-dequeue-resolvers ch)) nil nil)))))

*1:CCL上ではロックが recursive-lock として実装されているようで、少々雑にロック・アンロックをしても通ってしまうので、SBCL で検査しているといった理由もあります(なお、逆に SBCL には recursive-lock の実装がなさそう)

*2:同じ cloutine が必ず同じスレッドで実行されているのは意図通りではないので何かバグがあるかも...

*3:実際の goroutine のランタイムはもっと賢いことをしているはずですが

*4:抜き出した範囲では利用していないので抜いていますが、ロック関連も import しています。ただし、それらもデバッグ機能作成用であって、同じく本質的な処理には絡んでいません