【Common Lisp】Go言語の goroutine っぽいものを作ってみたかった話
Lisp Advent Calendar 2019 23日目の記事です。
近頃仕事で書いている Go 言語の勉強も兼ねて「Go言語による並行処理」という本を買ったので Lisp を書いていくぞという内容です。goroutine っぽい cloutine なるものを Common Lisp 上で作ってみようという試みです。
できたもの(と制限)
- 隠蔽された複数の実スレッドにいわゆる Green Thread(cloutine と命名)を投げ込んで並行処理を実現する
- 非同期なチャネル
というようにできたのは一部分だけです。また、できたものについても下記の制限があります。
- 色々実用に耐えない
- SBCL 上ではさらに実用的に耐えない
- チャネルについてはかろうじて単独で動作するだけで select のような高度で実用的な機能はできていない
技術要素
次のようなものを利用して作成しました。
- マルチスレッド: bordeaux-threads
- 限定継続: cl-cont
- 簡単な使い方について「cl-cont: 限定継続」の項で簡単に解説しているので、興味のある方はそこだけ見てみると良いかもしれません
- 非同期処理: Blackbird: いわゆる Promise を実現するライブラリ
参考
- green-thread
- シングルスレッド上での Green Thread を実現するライブラリです
- 利用はしていませんが実装の参考にしています。
bordeaux-threads
以外の技術要素は同じです
動かしてみる
現状 cloutine
は quicklisp に登録していないので ql:quickload
できるような位置にソースを持ってくることが必要です。Roswell を利用している場合は下記が簡単です。
$ ros install eshamster/cloutine
まずは次の Go のコードに相当する動作を試してみます。
package main import ( "fmt" "time" ) func main() { ch := make(chan int) // go で goroutine を1つ立てる go func() { // チャネルがクローズされるまで値を取り出して出力し続ける for { val, ok := <-ch if !ok { break } fmt.Printf("%d\n", val) } }() go func() { // 0~4までの値をチャネルに投入した後、クローズする defer close(ch) for i := 0; i < 5; i++ { ch <- i } }() // 本来 wait group でやるところをサボり time.Sleep(1) }
次のようになります。コメントに既につらみが見えています。
CL-USER> (ql:quickload :cloutine :silent t) (:CLOUTINE) CL-USER> (use-package :cloutine) T ;; goroutine ぽいもの = cloutine を実行する実スレッドを2つ立ち上げる初期化処理 CL-USER> (init-cloutine 2) #<CLOUTINE/REAL-THREADS::REAL-THREADS #x3020011BEDDD> CL-USER> (let ((chan (make-channel))) ;; clt: cloutine を作成するマクロ (clt (loop ;; チャネルに値が入ってくるのを待つ (let ((x (<-chan chan))) ;; 多値で「値, open-p」みたいに返してクローズ判定したいが、 ;; cl-cont との組み合わせがうまくいかないので苦しいクローズ判定... (when (channel-closed-value-p x) (return)) (print x)))) (clt (dotimes (i 5) ;; 値をチャネルに入れる (chan<- chan i)) ;; defer 相当のことをするために unwind-protect を利用するべきだが ;; cl-cont が対応していないので普通に close... (close-channel chan))) NIL 0 1 2 3 4
念のため、init-cloutine
の引数に指定する実スレッドの数は同時に立てられる cloutine の数を制限するものではないです。次の通り、3つの実スレッドに対して5 (+1) 個の cloutine を同時に立てることができています*2。
CL-USER> (ql:quickload :cloutine :silent t) (:CLOUTINE) CL-USER> (use-package :cloutine) T ;; bt = bordeaux-threads のニックネーム CL-USER> (defparameter *lock* (bt:make-lock)) *LOCK* CL-USER> (init-cloutine 3) #<CLOUTINE/REAL-THREADS::REAL-THREADS #x3020011BECCD> CL-USER> (let ((chan (make-channel))) (dotimes (i 5) (let ((i i)) (clt (loop ;; スレッドが切り代わり易いように適当に sleep を入れて動作を乱す (sleep 0.001) (let ((x (<-chan chan))) (when (channel-closed-value-p x) (return)) (bt:with-lock-held (*lock*) ;; *real-thread-index* は実スレッドの番号 (format t "thread: ~D, cloutine: ~D, value: ~D~%" *real-thread-index* i x))))))) (clt (dotimes (i 15) (chan<- chan i) ;; 上に同じく適当に sleep を入れる (when (= (mod i 5) 0) (sleep 0.001))) (close-channel chan))) NIL thread: 2, cloutine: 1, value: 0 thread: 0, cloutine: 0, value: 1 thread: 2, cloutine: 1, value: 5 thread: 0, cloutine: 2, value: 2 thread: 2, cloutine: 1, value: 7 thread: 0, cloutine: 2, value: 8 thread: 1, cloutine: 3, value: 3 thread: 2, cloutine: 1, value: 9 thread: 0, cloutine: 2, value: 10 thread: 2, cloutine: 1, value: 11 thread: 1, cloutine: 3, value: 12 thread: 0, cloutine: 2, value: 13 thread: 2, cloutine: 1, value: 14 thread: 1, cloutine: 4, value: 4 thread: 0, cloutine: 0, value: 6
今回参考にした「Go言語による並行処理」第6章「ゴルーチンとGoランタイム」P.206 に次のような説明用のコードがあるので、同等のものを動かしてみます。
fib = func(n int) <-chan int { result := make(chan int) go func() { defer close(result) if n <= 2 { result <- 1 return } result <- <-fib(n-1) + <-fib(n-2) }() return result } fmt.Printf("fib(4) = %d", <-fib(4))
次のようになります。どうにか動作はするようです。
CL-USER> (ql:quickload :cloutine :silent t) (:CLOUTINE) CL-USER> (use-package :cloutine) T CL-USER> (defun fib (n) (let ((result (make-channel))) (clt (if (<= n 2) (chan<- result 1) (chan<- result (+ (<-chan (fib (- n 1))) (<-chan (fib (- n 2)))))) ;; 前述の通り unwind-protect を使えないので普通に close... (close-channel result)) result)) ;; warnings は見なかったことにする ;Compiler warnings : ; In an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside an anonymous lambda form inside FIB: Undefined function FIB ; In an anonymous lambda form inside an anonymous lambda form inside FIB: Undefined function FIB FIB CL-USER> (init-cloutine 2) #<CLOUTINE/REAL-THREADS::REAL-THREADS #x302000F9D86D> ;; channel を利用するには with-call/cc で囲う必要がある(clt マクロも内部で囲っている) CL-USER> (cont:with-call/cc (print (<-chan (fib 4)))) #<PROMISE name: "attach: PROMISE" finished: NIL errored: NIL forward: NIL #x302000F9DA9D> 3
実装の基本的なアイディア
cloutine の生成と実行
goroutine っぽいもの = cloutine の裏で動作する実スレッドはそれぞれ下記のような処理を行います。この辺りは「Go言語による並行処理」第6章「ゴルーチンとGoランタイム」を参考にしています。
- 各スレッドはキューを一つずつ持つ
- 全てのキューが空のとき全てのスレッドは待ち状態になる
- いずれかのキューに関数が入っているときスレッドの待ちが解除され次の動作をする
- 自身のキューを調べ、関数が入っていればそれを実行する
- 1で取得できない場合、他のキューを順に調べて関数を見つけ次第それを実行する
したがって、cloutine を生成する clt
マクロの基本的な内容は、渡された処理を lambda
で包んで関数化し、上記のキューに詰め込むだけです。基本的には各スレッドは自身のキューに関数を詰めて自身で実行するのですが、「他のキューを順に調べて関数を見つけ次第それを実行する」の動作があるため、空いているスレッドは人のキュー内の関数を盗んで実行することができます。
非同期なチャネル
上記の通り、cloutine の基本的な動作だけであればそれほど面倒なところはない *3 のですが、非同期なチャネルの実装が割と厄介です。「技術要素」の項で述べた限定継続ライブラリ cl-cont と、Promise ライブラリ Blackbird はいずれもここで出てきます。
分かり易く説明できる気がしないのですが、非同期な待ちは次のように実現します(※待ちが発生しない限りはほぼ単なるスレッドセーフなキュー)。以下は取り出し待ちの例ですが、チャネルのサイズを制限した場合の投入待ちの処理もおおむね同じになります。
- チャネル操作以降の処理を「継続」= 特定の形式の関数として取り出す(cl-cont)
- Promise を作成し、解決するための関数をチャネルに保管する(Blackbird)
- 2 で作成した Promise に、解決時の処理として 1 の継続を実行する処理を登録する
- チャネルに値を投入する際に 2 で保管した関数があれば呼び出して Promise を解決する
- ここで、3で登録した継続が実行される
実装
queue: ただのキュー
https://github.com/eshamster/cloutine/blob/master/queue.lisp
特別なところはないただのキューなのでリンクだけ貼ります。head からの取り出しと tail への追加しかできない双方向リストとして実装しています。
こんな感じで利用します。
CL-USER> (use-package :cloutine/queue) T CL-USER> (defparameter *q* (make-queue)) *Q* CL-USER> (queue *q* 0) 0 CL-USER> (queue *q* 1) 1 CL-USER> (dequeue *q*) 0 CL-USER> (dequeue *q*) 1 CL-USER> (dequeue *q*) NIL
この部分は12日目の記事「【Common Lisp】REPL 上で手軽にスレッドの動作を試すためのライブラリを作った」の repl-threads と全く同じです。というより、そもそも repl-threads は cloutine を書いていて思いついたものです。ついでに言うと、6日目の記事「cl-base + rove + Travis CI でテストする」は cloutine のテストで困ったことからできたものです。
multi-queue
https://github.com/eshamster/cloutine/blob/master/multi-queue.lisp
multi-queue
は次のような特徴を持つ複数のキューのまとまりです。
- 複数のキューを配列として持つ
- 全てのキューが空のとき dequeue 操作は待たされる
- いずれかのキューにデータがあるとき dequeue は次のように動作する
パッケージ定義を見ると、上記に必要な部品であるキュー・ロック・セマフォを import していることが分かります。
(defpackage cloutine/multi-queue (:use :cl) (:export :multi-queue :init-multi-queue :queue-into :dequeue-from) (:import-from :cloutine/queue :init-queue :queue :dequeue) (:import-from :bordeaux-threads :make-lock :make-semaphore :with-lock-held :wait-on-semaphore :signal-semaphore)) (in-package :cloutine/multi-queue)
クラス定義は次のようになります。複数のキューと各キューに対応する複数のロック、そして1つのセマフォを持ちます。
(defclass multi-queue () ((queues :initarg :queues :accessor mq-queues) (locks :initarg :locks :accessor mq-locks) (semaphore :initform (make-semaphore) :accessor mq-semaphore))) (defun init-multi-queue (n) (let ((queues (make-array n)) (locks (make-array n))) (dotimes (i n) (setf (aref queues i) (init-queue) (aref locks i) (make-lock))) (make-instance 'multi-queue :queues queues :locks locks)))
queue-into
では指定したインデックスのキューへ値を入れ、セマフォをインクリメントします。
(defmethod queue-into ((mq multi-queue) queue-index value) (assert (< queue-index (queue-count mq))) (with-lock-held ((aref (mq-locks mq) queue-index)) (queue (aref (mq-queues mq) queue-index) value)) (signal-semaphore (mq-semaphore mq)))
dequeue-from
では前述の、指定されたインデックスのキューを優先した dequeue 操作を行います。ただし、queue 操作がない限りはセマフォがインクリメントされることはないので、全てのキューが空であれば入口で待たされます。
(defmethod dequeue-from ((mq multi-queue) prior-queue-index) (assert (< prior-queue-index (queue-count mq))) ;; 全てのキューが空なら待つ (wait-on-semaphore (mq-semaphore mq)) (macrolet ((try-dequeue (index) `(with-lock-held ((aref (mq-locks mq) ,index)) (let ((value (dequeue (aref (mq-queues mq) ,index)))) (when value (return-from dequeue-from value)))))) ;; まずは指定されたインデックスのキューを調べる (try-dequeue prior-queue-index) ;; なければ別のキューを順番に見て値を取り出す (dotimes (i (queue-count mq)) (unless (= i prior-queue-index) (try-dequeue i))) ;; 上記で取れないケースはあり得ないのでエラー (error "No value is there.")))
real-threads
https://github.com/eshamster/cloutine/blob/master/real-threads.lisp
real-threads
はその名の通り実スレッドです。 multi-queue
を利用して各スレッドは次のように動作します。いずれの動作も multi-queue
の各動作に対応しています。
- 各スレッドは
multi-queue
内に対応するキューを1つずつ持つ - 全てのキューが空のとき全てのスレッドは待ち状態になる
- いずれかのキューに関数が入っているときスレッドの待ちが解除され次の動作をする
- 自身のキューを調べ、関数が入っていればそれを実行する
- 1で取得できない場合、他のキューを順に調べて関数を見つけ次第それを実行する
上記を実現するため、先程の multi-queue
の他、スレッド作成・破棄の関数を import します。セマフォ関連も import していますが、これは初期化時のみの利用で、本質的な処理には絡んでいません*4。
(defpackage cloutine/real-threads (:use :cl) (:export :start-real-threads :destroy-real-threads :queue-promise) (:import-from :cloutine/multi-queue :multi-queue :init-multi-queue :queue-into :dequeue-from) (:import-from :bordeaux-threads :make-thread :destroy-thread :make-semaphore :wait-on-semaphore :signal-semaphore)) (in-package :cloutine/real-threads)
クラスの定義は次の通りです。
(defclass real-thread () ((thread :initarg :instance :accessor thread-instance) (index :initarg :index :accessor thread-index))) (defclass real-threads () ((threads :initarg :rt-array :accessor threads-array) (multi-queue :initarg :mq :accessor threads-mq) (destroied-p :initform nil :accessor threads-destroied-p)))
初期化は引数で指定されたスレッド数に応じて、multi-queue
を初期化し、後述の process-thread
関数を実行する実スレッドを作成します。一応初期化が完了するまではセマフォで process-thread
が走らないようにしています。
(defun start-real-threads (n) (let* ((mq (init-multi-queue n)) (rt-arr (make-array n)) (rts (make-instance 'real-threads :mq mq)) (sem-to-wait-start (make-semaphore))) (dotimes (i n) (let ((rt (make-instance 'real-thread :index i))) (setf (thread-instance rt) (make-thread (lambda () (wait-on-semaphore sem-to-wait-start) (process-thread rt rts)))) (setf (aref rt-arr i) rt))) (setf (threads-array rts) rt-arr) (signal-semaphore sem-to-wait-start :count n) rts)) ;; destroy-real-threads は略
次がその process-thread
で、multi-queue
から関数を取り出せたらそれを実行する、を繰り返すだけです。また、スペシャル変数 *real-thread-index*
に自身のスレッド番号を束縛します。
(defvar *real-thread-index* nil) (defmethod process-thread ((rt real-thread) (rts real-threads)) (loop (let ((index (thread-index rt))) ;; dequeue-from は全てのキューが空であるうちは待たされる (let ((process (dequeue-from (threads-mq rts) index))) (assert (functionp process)) (let ((*real-thread-index* index)) (funcall process))))))
スレッドの外もしくは中からこのキューに関数を投げ込むメソッドが queue-process
です。スレッド内からの場合は自身のインデックスのキューに投げます。そうでない場合は取りあえず 0 番目のキューに投げています(が乱数で選択した方が良い気もします)。
(defmethod queue-process ((rts real-threads) (process function)) (when (threads-destroied-p rts) (error "The thread has been destroied.")) (queue-into (threads-mq rts) (if *real-thread-index* *real-thread-index* 0) process))
cloutine
https://github.com/eshamster/cloutine/blob/master/cloutine.lisp
本題の cloutine の実装ですが、処理の実体は real-threads が担っているので、ほぼそのラッパー程度の役割です。次のように real-threads の他に、後のチャネルの実装の仕込みとして cl-cont のシンボルをいくつか import します。
(defpackage :cloutine/cloutine (:use :cl) (:export :init-cloutine :destroy-cloutine :cloutine :clt) (:import-from :cloutine/real-threads :start-real-threads :destroy-real-threads :queue-process) (:import-from :cl-cont :with-call/cc :without-call/cc)) (in-package :cloutine/cloutine)
初期化関数は次の通りで、グローバルに real-threads を1つ作成します。
(defvar *real-threads* nil) (defun init-cloutine (n) (setf *real-threads* (start-real-threads n))) ;; ※destroy-cloutine は省略
cloutine を作成する cloutine
マクロとそのエイリアスの clt
マクロは次のようになります。ほぼ body
部を lambda
で包んで real-threads の queue-process
に渡すだけです。with-call/cc
, without-call/cc
については次のチャネルの項を参照してください。
(defmacro cloutine (&body body) `(queue-process *real-threads* (without-call/cc (lambda () (with-call/cc ,@body))))) (defmacro clt (&body body) `(cloutine ,@body))
channel
チャネルの実装の前に、限定継続ライブラリ cl-cont と Promise ライブラリ Blackbird について簡単に見ていきます。
cl-cont: 限定継続
限定継続についてざっくり感覚的な理解を得るには κeen さんの Common Lispで限定継続と遊ぶ などを見ながら cl-cont をいじってみるのが良いのかなと思います。自身も継続を使ってみるのは始めてでざっくりとした理解しかありませんが...。
ここでは継続とは何かという話は置いて、どの様な動きをするのかいくつかの例で見てみます。まずは単純に継続 = 「以降の処理」を取り出す様子を見てみます。
;; ※ややこしいので以降 print の出力のみ記載し、REPLの出力は省略します CL-USER> (defparameter *cont* nil) CL-USER> (cont:with-call/cc (print :start) ;; let/cc 以降の処理 = 継続が k に束縛される (cont:let/cc k (print :let-start) (setf *cont* k) (print :let-end)) ;; ↓はまだ処理されない (print :end)) :START :LET-START :LET-END ;; 継続は関数として表されるので funcall できる CL-USER> (funcall *cont*) :END ;; もちろん何度でも呼べる CL-USER> (funcall *cont*) :END
この with-call/cc
で defun
を囲ったものが defun/cc
で、次のように使えます。始めて試してみたときは、うわ本当に関数外の処理(継続)を引き込んで好きに扱える!と中々シビれました。
CL-USER> (defparameter *cont* nil) CL-USER> (cont:defun/cc test-cc () (cont:let/cc k (setf *cont* k))) CL-USER> (cont:with-call/cc (print :start) (test-cc) (print :end)) :START CL-USER> (funcall *cont*) :END
さて、限定継続として取り出される関数は0個または1個の引数を取ります。ここまでは0引数の例でしたが、チャネルからの値の取り出し待ちをする継続は値を待っているので、そこに引数として渡してあげる必要があります。次のようにチャネルにまだ値がなかった場合の値取り出しの動作を模してみます。
CL-USER> (defparameter *cont* nil) CL-USER> (cont:defun/cc like-waiting-chan () (cont:let/cc k (setf *cont* k))) CL-USER> (cont:with-call/cc (print :start) ;; まだチャネルに値が入っていないので残りの処理はどこか(*cont*)に格納しておいて ;; いったん処理は完了させる...という体 (print (like-waiting-chan)) (print :end)) :START ;; どこからかチャネルに値 100 が供給された...という体 CL-USER> (funcall *cont* 100) 100 :END
最後に実装時の細かい部分になりますが、 without-call/cc
に触れます。ここまでに見たように、単純に with-call/cc
を利用すると、 let/cc
以降の処理が全て束縛されてしまいます。実際にはもう少し範囲を絞りたいのですが、そのときの区切りとして利用するのが without-call/cc
です。次のように使います。
CL-USER> (cont:with-call/cc (print :outer-start) (cont:without-call/cc (cont:with-call/cc (print :start) (cont:let/cc k (setf *cont* k)) ;; ここはまだ待って欲しい (print :end))) ;; ここはすぐ処理されて欲しい (print :outer-end)) :OUTER-START :START :OUTER-END CL-USER> (funcall *cont*) :END
Blackbird: Promise
Blackbird はいわゆる Promise(Future とも)を実現するライブラリで、非同期処理を実現するための部品として利用できます。次は resolve 済みの Promise を生成するだけの余り意味のない例です。
CL-USER> (ql:quickload :blackbird :silent t) (:BLACKBIRD) ;; bb = blackbird CL-USER> (let ((promise (bb:with-promise (resolve reject) (resolve 100)))) ;; promise が resolve されない限り attach された関数は実行されない ;; 今回は resolve 済みなのですぐ実行される (bb:attach promise (lambda (x) (format t "~&resolved: ~D~%" x)))) resolved: 100 ;; ↓式自体は promise クラスを返している #<PROMISE name: "attach: PROMISE" finished: T errored: NIL forward: NIL #x302000E6E2FD>
実際に非同期処理の文脈で利用するには大きく2つの方法があります。
- cl-async のような非同期処理と組み合わせる
- cl-async は libuv ベースの非同期処理ライブラリです
- Blackbird 冒頭の例で紹介されている方法です
- Promise 生成時に resolve 処理を外に保管しておいて後から resolve してもらう
今回とるのは後者の方法です。ここで、with-promise
の resolve
(と reject
)は macrolet
として定義されているため、そのまま外に渡すことはできません。こうした用途のためにはキーワード引数の resolve-fn
(reject-fn
)が用意されているので、そちらを外に渡します。
CL-USER> (let* (resolver (promise (bb:with-promise (resolve reject :resolve-fn resolve-fn) ;; resolve-fn を外に渡す (setf resolver resolve-fn)))) ;; promise が resolve されていないのですぐには実行されない ;; 名前の通り、resolve 時にやって欲しい処理を promise に atttach するだけ (bb:attach promise (lambda (x) (format t "~&resolved: ~D~%" x))) (print :after-attach) ;; ここで promise が resolve されるので上記の lambda も実行れる (funcall resolver 999) (print :after-resolve)) :AFTER-ATTACH resolved: 999 :AFTER-RESOLVE :AFTER-RESOLVE ; ← ※REPL の出力
これと cl-cont と組み合わせることでチャネルの待ちを実現できそうです。
- チャネルに値が入っていないときは、
resolve-fn
を保管して Promise を生成し、残りの処理 = 継続を attach しておく - チャネルに値を入れる際に保管された
resolve-fn
に値を渡して attach された継続を実行する
実装
https://github.com/eshamster/cloutine/blob/master/sync/channel.lisp
問題の実装を見ていきます。パッケージ定義は以下の通りで、ここまでに説明した cl-cont, Blackbird を import しています。また、値を入れる容器として queue を、その排他制御のためにロック関連のシンボルを import しています。
(defpackage cloutine/sync/channel (:use :cl) (:export :make-channel :close-channel :<-chan :chan<- :ch-closed-p) (:import-from :cloutine/cloutine :clt) (:import-from :cloutine/queue :init-queue :queue :dequeue :queue-length) (:import-from :blackbird :attach :with-promise) (:import-from :bordeaux-threads :make-lock :acquire-lock :release-lock :with-lock-held) (:import-from :cl-cont :defun/cc :let/cc)) (in-package :cloutine/sync/channel)
クラス定義は次の通りです。3つのキューを持っているところが特徴的です。
queue
: チャネルに投入されるデータを管理するキュー- 以降の説明で単に「キュー」と言った場合はこのキューのこと
queue-resolver-queue
: 投入待ち(キューが一杯のとき)を表す Promise の resolver (resolve-fn) を管理するキューmax-length
が nil の場合は利用されない
deqeueue-resolver-queue
: 受け取り待ち(キューが空のとき)を表す Promise の resolver (resolve-fn) を管理するキュー
(defclass channel () ((queue :initform (init-queue) :reader ch-queue) (queue-resolver-queue :initform (init-queue) :reader ch-queue-resolvers) (deqeueue-resolver-queue :initform (init-queue) :reader ch-dequeue-resolvers) (lock :initform (make-lock "Channel lock") :accessor ch-lock) (max-length :initarg :max-resource :reader ch-max-resource) ; param (closed-p :initform nil :accessor ch-closed-p))) (defun make-channel (&optional max-resource) (make-instance 'channel :max-resource max-resource))
チャネルから値を取り出す関数 <-chan
の実装は次のようになります。解説はソース中にコメントしていますが、色々とつらみが見え隠れします。
(defmacro with-release-lock ((lock) &body body) `(unwind-protect (progn ,@body) (release-lock ,lock))) (defun/cc <-chan (ch) (let ((lock (ch-lock ch)) (q (ch-queue ch))) ;; let/cc が unwind-protect に囲われてしまうとうまく動作しないので、 ;; (内部に unwind-protect を含む)with-lock-held マクロは利用できない (acquire-lock lock) (cond ;; キューに値が入っているケース ((> (queue-length q) 0) (with-release-lock (lock) ;; 投入待ちがあれば resolve しておく ;; (一瞬 max-length を越えるがロック内で解消するのでまあ良いのでは...) (when (> (queue-length (ch-queue-resolvers ch)) 0) (funcall (dequeue (ch-queue-resolvers ch)) t)) (dequeue q))) ;; ここから下はキューが空のケース ;; チャネルがクローズ済みなら待たずに終わる ((ch-closed-p ch) (with-release-lock (lock) (make-instance 'channel-closed-value))) ;; まだチャネルが開いていれば待つ (t (let ((promise (with-promise (resolve reject :resolve-fn resolver) ;; 取り出し待ち解決のための resolver を溜めておく (queue (ch-dequeue-resolvers ch) resolver)))) (release-lock lock) (let/cc k ;; 値取り出し後の処理を Promise に attach しておく (attach promise (lambda (val closed-p) ;; closed-p をもらっているが、継続に多値をうまく渡す手段がなく ;; 使えないままになっている... (declare (ignore closed-p)) ;; 値を渡して新しい cloutine 上で継続を実行する (clt (funcall k val))))))))))
チャネルへ値を投入する関数 <-chan
の実装は次のようになります。こちらも色々つらい。
(defun/cc chan<- (ch value) (let ((lock (ch-lock ch)) (q (ch-queue ch)) (max-length (ch-max-resource ch))) ;; 上に同じく with-lock-held を使わずに頑張る (acquire-lock lock) (cond ((ch-closed-p ch) ; チャネルがクローズ済み (with-release-lock (lock) ;; 一応 error を返しているが、継続内ではうまく拾えない... (error "Error: Insert a value into a closed channel"))) ;; 取り出し待ちがある場合 ((> (queue-length (ch-dequeue-resolvers ch)) 0) (let (resolver) (with-release-lock (lock) (setf resolver (dequeue (ch-dequeue-resolvers ch)))) ;; キューを介さずに直接値を渡す (funcall resolver value t))) ;; キューに空きがある = 投入可能な場合 ((or (null max-length) (< (queue-length q) max-length)) (with-release-lock (lock) (queue q value))) ;; キューに空きがない場合 (t (let ((promise (with-promise (resolve reject :resolve-fn resolver) ;; 投入待ち解決のための resolver を溜めておく (queue (ch-queue-resolvers ch) resolver)))) (release-lock lock) (let/cc k ;; 値投入とその後の処理を Promise に attach しておく (attach promise (lambda (open-p) (if open-p (progn ;; キューに値を入れる ;; (ロックは resolver を呼び出した側でかけている) (queue q value) ;; 新しい cloutine 上で継続を実行する (clt (funcall k))) (error "Error: Channel is closed when waiting to insert a value"))))))))))
最後に close-channel
の実装も示します。残っている resolver を呼び出して全ての待ちを解決済みにします。
(defmethod close-channel ((ch channel)) "Close channel and broadcast signal to all waiting readers and writers." (let ((lock (ch-lock ch))) (with-lock-held (lock) (setf (ch-closed-p ch) t) (dotimes (i (queue-length (ch-queue-resolvers ch))) (funcall (dequeue (ch-queue-resolvers ch)) nil)) (dotimes (i (queue-length (ch-dequeue-resolvers ch))) (funcall (dequeue (ch-dequeue-resolvers ch)) nil nil)))))
*1:CCL上ではロックが recursive-lock として実装されているようで、少々雑にロック・アンロックをしても通ってしまうので、SBCL で検査しているといった理由もあります(なお、逆に SBCL には recursive-lock の実装がなさそう)
*2:同じ cloutine が必ず同じスレッドで実行されているのは意図通りではないので何かバグがあるかも...
*3:実際の goroutine のランタイムはもっと賢いことをしているはずですが
*4:抜き出した範囲では利用していないので抜いていますが、ロック関連も import しています。ただし、それらもデバッグ機能作成用であって、同じく本質的な処理には絡んでいません