【Common Lisp】REPL 上で手軽にスレッドの動作を試すためのライブラリを作った
Lisp Advent Calendar 2019 12日目の記事です。Commmon Lisp の REPL (Read Eval Print Loop) 上でお手軽にスレッドの動作を試すための小さなライブラリ repl-thread
を作ってみた話です。
前置き
REPL駆動開発 *1 とも言われるように、Common Lisp 開発においては組み込み関数や自分で書いた関数の動作の確認に REPL を使い倒すことと思います。これの良いところは、きっちりとしたテストや動作確認用のプログラムを書くことなく、思いつくままにパパッと動作を試せるところにあります。
しかし、スレッドの動作を試そうと思った瞬間そう気軽にいかなくなります。スレッドが一度走り始めてしまうと介入できないので、例えばロックを試すために少なくとも2つの関数を用意して、この辺で止まっているときの動作を見たいのでこっちに sleep を入れあっちに sleep を入れ...という具合に計画性が求められるようになります。さらに、走り切った後は一気に流れた print ログを追いかけながら、これがこの順番で出てるから思った通りの順序で動いているはず...というように検証も面倒です。
そうした面倒臭さを多少とも避けて、気軽にあちらのスレッドでこれ動かして、続けてこちらのスレッドでこれを動かして...ということが REPL 上でできるちょっとしたライブラリ repl-threads を作ってみました。
動かしてみる
現状 repl-threads
は quicklisp に登録していないので ql:quickload
できるような位置にソースを持ってくることが必要です。Roswell を利用している場合は下記が簡単です。
$ ros install eshamster/repl-threads
REPL 上で quickload できたら準備完了です。
CL-USER> (ql:quickload :repl-threads)
まずは所望の数のスレッドを立ち上げます(rts
は repl-thraeds
のニックネームです)。
CL-USER> (defparameter *rts* (rts:make-repl-threads 2)) *RTS*
スレッド内で指定の動作をさせるには、with-thread
マクロに上記で作った repl-threads
とスレッド番号(0からの連番)を指定して処理を書くだけです。
CL-USER> (rts:with-thread (*rts* 0) (print :test)) T :TEST
これだけでは本当にスレッド内で動いているのか分からないので適当にロックをかけてみます。なお、bt
は各種 Commmon Lisp 処理系のスレッド操作を抽象化している bordeaux-threads パッケージのニックネームです。repl-threads
が依存しているので改めて quickload する必要はないです。
CL-USER> (defparameter *lock* (bt:make-lock)) *LOCK* CL-USER> (rts:with-thread (*rts* 0) (bt:acquire-lock *lock*) ; まだ誰もロックを取っていないので (print :thread0)) ; ← の print はすぐ実行される T :THREAD0 CL-USER> (rts:with-thread (*rts* 1) (bt:acquire-lock *lock*) ; thread 0 にロックを取られているので (print :thread1) ; ← の print は待たされる (bt:release-lock *lock*)) T CL-USER> (rts:with-thread (*rts* 0) (bt:release-lock *lock*)) ; ← ロックを手放すと... T :THREAD1 ; ← 待たされていた thread 1 の print が実行される
また、スペシャル変数 *thread-index*
にスレッド番号を格納しています。
CL-USER> (defun test () (print rts:*thread-index*)) TEST CL-USER> (rts:with-thread (*rts* 1) (test)) T 1
掃除は一関数で済みます。スレッドを破棄するだけで各種リソースの面倒まで見てくれるものではないですが...。
CL-USER> (rts:destroy-repl-methods *rts*) NIL
中身
概要
基本的なアイディアは単純です。
- 各スレッドはキューを持ち、関数が投げ込まれるのを待ち受ける
- 投げ込まれた関数を順次実行する
REPL から各スレッドのキューに関数を投げ込むことで、任意の処理を後付けで実行できます。
ということで、次のような順序で作っていきます。
queue
: ただのキュー(スレッドアンセーフ)wait-queue
: スレッドセーフなキューrepl-thread
: 2 を利用して前述の動作をするスレッドrepl-threads
: 3 のスレッドを束ねるクラス
queue: ただのキュー
https://github.com/eshamster/repl-threads/blob/master/queue.lisp
特別なところはないただのキューなのでリンクだけ貼ります。head からの取り出しと tail への追加しかできない双方向リストとして実装しています。
こんな感じで利用します。
CL-USER> (use-package :repl-threads/queue) T CL-USER> (defparameter *q* (make-queue)) *Q* CL-USER> (queue *q* 0) 0 CL-USER> (queue *q* 1) 1 CL-USER> (dequeue *q*) 0 CL-USER> (dequeue *q*) 1 CL-USER> (dequeue *q*) NIL
wait-queue: スレッドセーフなキュー
https://github.com/eshamster/repl-threads/blob/master/wait-queue.lisp
先ほどのキューを使って次のようなキューを作ります。
- スレッドセーフである
- dequeue 時にキューが空なら queue されるまで待機する
まずは パッケージの定義ですが、先ほど作ったキューの他 bordeaux-threads
から Lock, Condition Variable 関連の関数・マクロをインポートします。
(defpackage repl-threads/wait-queue (:use :cl) (:export :wait-queue :make-wait-queue :queue :dequeue) (:import-from :repl-threads/queue :make-queue :queue :dequeue :queue-length) (:import-from :bordeaux-threads :make-lock :with-lock-held :make-condition-variable :condition-notify :condition-wait)) (in-package :repl-threads/wait-queue)
クラス定義はこんな感じです。
(defclass wait-queue () ((queue :initform (make-queue) :reader wq-queue) (cond-var :initform (make-condition-variable :name "WAIT QUEUE COND") :reader wq-cond-var) (wait-count :initform 0 :accessor wq-wait-count) (lock :initform (make-lock "WAIT QUEUE LOCK") :reader wq-lock))) (defun make-wait-queue () (make-instance 'wait-queue))
dequeue
メソッドでは、キューに何か入っていればロックを取って値を取り出し、ロックを解放するだけです(when
内を通らないルート)。キューが空の場合は Condition variable を利用してキューに何か入るのを待ちます。 condition-wait
は次のような動作をします。
- 渡されたロックを解放する
- 渡された Condition variable にシグナルが来るまで待つ
- シグナルが来たらロックを取得して次に進む
(defmethod dequeue ((wq wait-queue)) (let ((lock (wq-lock wq)) (q (wq-queue wq))) (with-lock-held (lock) (when (= (queue-length q) 0) (incf (wq-wait-count wq)) ;; wait until some value is queued (condition-wait (wq-cond-var wq) lock)) (assert (> (queue-length q) 0)) (dequeue q))))
次に queue
メソッドですが、基本的にはロックを取ってキューに値を詰め、ロックを解放するだけです。ただし、待ち状態の dequeue
メソッドが存在する場合は、condition-notify
でシグナルを出して待ちを解除します*2。
(defmethod queue ((wq wait-queue) value) (let ((q (wq-queue wq))) (with-lock-held ((wq-lock wq)) (queue q value) (when (> (wq-wait-count wq) 0) (decf (wq-wait-count wq)) (condition-notify (wq-cond-var wq))))))
repl-thread: 投げ込まれた関数を実行するスレッド
https://github.com/eshamster/repl-threads/blob/master/repl-thread.lisp
パッケージの定義は次の通りです。ロック周りは wait-queue
にお任せしているので、bordeaux-threads からはスレッドの生成・破棄関数だけを import します。
(defpackage repl-threads/repl-thread (:use :cl) (:export :make-repl-thread :queue-process :destroy-repl-thread) (:import-from :repl-threads/wait-queue :wait-queue :make-wait-queue :queue :dequeue) (:import-from :bordeaux-threads :make-thread :destroy-thread)) (in-package :repl-threads/repl-thread)
クラス定義と生成、破棄関数は次の通りです。
(defclass repl-thread () ((thread :initarg :thread :reader rt-thread) (process-queue :initarg :process-queue :reader rt-process-queue))) (defun make-repl-thread () (let ((q (make-instance 'wait-queue))) (make-instance 'repl-thread :thread (make-thread (make-repl-thread-process q)) :process-queue q))) (defmethod destroy-repl-thread ((rt repl-thread)) (destroy-thread (rt-thread rt)))
次に、スレッド内で動作する関数を生成する make-repl-thread-process
を見ます。といっても、dequeue
で待ち、関数が来たら funcall
で実行、を繰り返す関数を作るだけです。
(defmethod make-repl-thread-process ((process-queue wait-queue)) (lambda () (loop (funcall (dequeue process-queue)))))
そして、外部からスレッドに関数を供給するのが queue-process
関数です。受け取った関数をキューに詰め込むだけです。一応受け取ったものが関数かどうか程度のチェックはしていますが、引数ありの関数が funcall
されるとスレッドが死んでしまうので、引数のチェックぐらいは追加しても良いかもしれません。
(defmethod queue-process ((rt repl-thread) process) (assert (functionp process)) (queue (rt-process-queue rt) process) t)
repl-threads: repl-thread を束ねる
最後に repl-thread
を束ねた repl-threads
を作ります。パッケージの定義は次の通りです。repl-thread
のラッパーなので、そこへだけ依存しています。
(defpackage repl-threads/repl-threads (:use :cl) (:export :make-repl-threads :queue-process-to :with-thread :destroy-repl-threads :*thread-index*) (:import-from :repl-threads/repl-thread :make-repl-thread :queue-process :destroy-repl-thread)) (in-package :repl-threads/repl-threads)
クラスとしては repl-thread
の配列を持っているだけです。生成と破棄も素直に指定個数の repl-thread
の生成と破棄をするだけです。
(defclass repl-threads () ((threads :initarg :threads :reader rts-threads))) (defun make-repl-threads (n) (let ((threads (loop :for i :from 0 :below n :collect (make-repl-thread)))) (make-instance 'repl-threads :threads (make-array n :initial-contents threads)))) (defmethod destroy-repl-threads ((rts repl-threads)) (let ((threads (rts-threads rts))) (dotimes (i (length threads)) (destroy-repl-thread (aref threads i)))))
queue-process-to
は、インデックスで指定された repl-thread
に対して queue-process
を呼びます。
(defmethod queue-process-to ((rts repl-threads) thread-index process) (let ((threads (rts-threads rts))) (assert (< thread-index (length threads))) (queue-process (aref threads thread-index) process)))
これをラップしているのが with-thread
マクロです。単純なラッパーとして動作するのに加えて、*thread-index*
にスレッド番号を束縛する役割もあります。
(defvar *thread-index* -1) (defmacro with-thread ((rts thread-index) &body body) (let ((g-thread-index (gensym "THREAD-INDEX"))) `(let ((,g-thread-index ,thread-index)) (queue-process-to ,rts ,g-thread-index (lambda () (let ((*thread-index* ,g-thread-index)) (declare (ignorable *thread-index*)) ,@body))))))
おまけ: 色々試してみる
せっかく作ったので色々試してみます。
Recursive Lock
Recursive Lock は通常のロックと異なり、同一スレッド内であれば何度でも取得できるロックになります(解放はロックと同じ回数だけ必要)。なお、これは SBCL には実装されていないようです。下記は CCL 1.11.5 での動作例になります。
CL-USER> (defparameter *rts* (rts:make-repl-threads 2)) *RTS* CL-USER> (defparameter *rec-lock* (bt:make-recursive-lock)) *REC-LOCK* CL-USER> (rts:with-thread (*rts* 0) (bt:acquire-recursive-lock *rec-lock*) (print :thread0-1)) T :THREAD0-1 CL-USER> (rts:with-thread (*rts* 0) (bt:acquire-recursive-lock *rec-lock*) ; 同じスレッド内なのでもう一度取れる (print :thread0-2)) T :THREAD0-2 CL-USER> (rts:with-thread (*rts* 1) (bt:acquire-recursive-lock *rec-lock*) ; 別スレッドなので待たされる (print :thread1-1)) T CL-USER> (rts:with-thread (*rts* 0) (bt:release-recursive-lock *rec-lock*)) T CL-USER> (rts:with-thread (*rts* 0) (bt:release-recursive-lock *rec-lock*)) T :THREAD1-1 ; 2回解放するとスレッド1が進めるようになる
Semaphore
セマフォ(Semaphore)は資源数に限りがある対象に触る場合に、同時に N 個のスレッドしか触れないようにする、といった操作を実現するために利用するものです。
CL-USER> (defparameter *rts* (rts:make-repl-threads 3)) *RTS* ;; 資源数2のセマフォをつくる CL-USER> (defparameter *sem* (bt:make-semaphore :count 2)) *SEM* CL-USER> (rts:with-thread (*rts* 0) (bt:wait-on-semaphore *sem*) (print :thread0)) T :THREAD0 CL-USER> (rts:with-thread (*rts* 1) (bt:wait-on-semaphore *sem*) (print :thread1)) T :THREAD1 ;; 既に thread 0, 1 が資源を占有しているので待たされる CL-USER> (rts:with-thread (*rts* 2) (bt:wait-on-semaphore *sem*) (print :thread2)) T CL-USER> (rts:with-thread (*rts* 0) (bt:signal-semaphore *sem*)) T :THREAD2 ; thread 0 が資源を手放したので thread 2 が動いた
Condition Variable
Condition Variable は wait-queue
の実装でも利用したように、「何らかの条件が成立するまで待つ」(wait-queue
であれば「キューに何か入るまで待つ」)用途で利用できます。ただし、こうした用途に適しているというだけで、Condition Variable 自体が何かの条件処理をしてくれる訳ではありません。
再掲すると、待ち側である condition-wait
は下記のような動作をします。待ちを解除する condition-notify
は単にシグナルを送るだけです。
- 渡されたロックを解放する
- 渡された Condition variable にシグナルが来るまで待つ
- シグナルが来たらロックを取得して次に進む
試してみます。
CL-USER> (defparameter *rts* (rts:make-repl-threads 2)) *RTS* CL-USER> (defparameter *lock* (bt:make-lock)) *LOCK* CL-USER> (defparameter *cond-var* (bt:make-condition-variable)) *COND-VAR* CL-USER> (rts:with-thread (*rts* 0) (bt:acquire-lock *lock*) (print :thread0-before) ;; ここでロックが解除される (bt:condition-wait *cond-var* *lock*) (print :thread0-after) (bt:release-lock *lock*)) T :THREAD0-BEFORE CL-USER> (rts:with-thread (*rts* 1) ;; ロックを取れる (bt:acquire-lock *lock*) (bt:condition-notify *cond-var*) (print :thread1)) T :THREAD1 ;; ※condition-wait はロックを再取得できないのでまだ待たされる CL-USER> (rts:with-thread (*rts* 1) (bt:release-lock *lock*)) T :THREAD0-AFTER ; シグナルを受け取り、ロックも解除されたので進めた
wait queue
せっかくなので部品として作った wait-queue
も試してみます。
CL-USER> (defparameter *rts* (rts:make-repl-threads 2)) *RTS* CL-USER> (use-package :repl-threads/wait-queue) T CL-USER> (defparameter *wq* (make-wait-queue)) *WQ* CL-USER> (rts:with-thread (*rts* 0) (queue *wq* 0) (queue *wq* 1)) T CL-USER> (rts:with-thread (*rts* 1) (print (dequeue *wq*)) (print (dequeue *wq*)) ;; ↓2つしか入っていなかったので待たされる (print (dequeue *wq*))) T 0 1 CL-USER> (rts:with-thread (*rts* 0) (queue *wq* 2)) T 2 ; ← 待たされたものが動いた