【Common Lisp】REPL 上で手軽にスレッドの動作を試すためのライブラリを作った

Lisp Advent Calendar 2019 12日目の記事です。Commmon Lisp の REPL (Read Eval Print Loop) 上でお手軽にスレッドの動作を試すための小さなライブラリ repl-thread を作ってみた話です。

github.com



前置き

REPL駆動開発 *1 とも言われるように、Common Lisp 開発においては組み込み関数や自分で書いた関数の動作の確認に REPL を使い倒すことと思います。これの良いところは、きっちりとしたテストや動作確認用のプログラムを書くことなく、思いつくままにパパッと動作を試せるところにあります。

しかし、スレッドの動作を試そうと思った瞬間そう気軽にいかなくなります。スレッドが一度走り始めてしまうと介入できないので、例えばロックを試すために少なくとも2つの関数を用意して、この辺で止まっているときの動作を見たいのでこっちに sleep を入れあっちに sleep を入れ...という具合に計画性が求められるようになります。さらに、走り切った後は一気に流れた print ログを追いかけながら、これがこの順番で出てるから思った通りの順序で動いているはず...というように検証も面倒です。

そうした面倒臭さを多少とも避けて、気軽にあちらのスレッドでこれ動かして、続けてこちらのスレッドでこれを動かして...ということが REPL 上でできるちょっとしたライブラリ repl-threads を作ってみました。

動かしてみる

現状 repl-threads は quicklisp に登録していないので ql:quickload できるような位置にソースを持ってくることが必要です。Roswell を利用している場合は下記が簡単です。

$ ros install eshamster/repl-threads

REPL 上で quickload できたら準備完了です。

CL-USER> (ql:quickload :repl-threads)

まずは所望の数のスレッドを立ち上げます(rtsrepl-thraeds のニックネームです)。

CL-USER> (defparameter *rts* (rts:make-repl-threads 2))
*RTS*

スレッド内で指定の動作をさせるには、with-thread マクロに上記で作った repl-threads とスレッド番号(0からの連番)を指定して処理を書くだけです。

CL-USER> (rts:with-thread (*rts* 0)
           (print :test))
T
:TEST

これだけでは本当にスレッド内で動いているのか分からないので適当にロックをかけてみます。なお、bt は各種 Commmon Lisp 処理系のスレッド操作を抽象化している bordeaux-threads パッケージのニックネームです。repl-threads が依存しているので改めて quickload する必要はないです。

CL-USER> (defparameter *lock* (bt:make-lock))
*LOCK*
CL-USER> (rts:with-thread (*rts* 0)
           (bt:acquire-lock *lock*) ; まだ誰もロックを取っていないので
           (print :thread0))        ; ← の print はすぐ実行される
T
:THREAD0
CL-USER> (rts:with-thread (*rts* 1)
           (bt:acquire-lock *lock*) ; thread 0 にロックを取られているので
           (print :thread1)         ; ← の print は待たされる
           (bt:release-lock *lock*))
T
CL-USER> (rts:with-thread (*rts* 0)
           (bt:release-lock *lock*)) ; ← ロックを手放すと...
T
:THREAD1 ; ← 待たされていた thread 1 の print が実行される

また、スペシャル変数 *thread-index* にスレッド番号を格納しています。

CL-USER> (defun test ()
           (print rts:*thread-index*))
TEST
CL-USER> (rts:with-thread (*rts* 1)
           (test))
T
1

掃除は一関数で済みます。スレッドを破棄するだけで各種リソースの面倒まで見てくれるものではないですが...。

CL-USER> (rts:destroy-repl-methods *rts*)
NIL

中身

概要

基本的なアイディアは単純です。

  • 各スレッドはキューを持ち、関数が投げ込まれるのを待ち受ける
  • 投げ込まれた関数を順次実行する

REPL から各スレッドのキューに関数を投げ込むことで、任意の処理を後付けで実行できます。

ということで、次のような順序で作っていきます。

  1. queue: ただのキュー(スレッドアンセーフ)
  2. wait-queue: スレッドセーフなキュー
  3. repl-thread: 2 を利用して前述の動作をするスレッド
  4. repl-threads: 3 のスレッドを束ねるクラス

queue: ただのキュー

https://github.com/eshamster/repl-threads/blob/master/queue.lisp

特別なところはないただのキューなのでリンクだけ貼ります。head からの取り出しと tail への追加しかできない双方向リストとして実装しています。

こんな感じで利用します。

CL-USER> (use-package :repl-threads/queue)
T
CL-USER> (defparameter *q* (make-queue))
*Q*
CL-USER> (queue *q* 0)
0
CL-USER> (queue *q* 1)
1
CL-USER> (dequeue *q*)
0
CL-USER> (dequeue *q*)
1
CL-USER> (dequeue *q*)
NIL

wait-queue: スレッドセーフなキュー

https://github.com/eshamster/repl-threads/blob/master/wait-queue.lisp

先ほどのキューを使って次のようなキューを作ります。

  • スレッドセーフである
  • dequeue 時にキューが空なら queue されるまで待機する

まずは パッケージの定義ですが、先ほど作ったキューの他 bordeaux-threads から Lock, Condition Variable 関連の関数・マクロをインポートします。

(defpackage repl-threads/wait-queue
  (:use :cl)
  (:export :wait-queue
           :make-wait-queue
           :queue
           :dequeue)
  (:import-from :repl-threads/queue
                :make-queue
                :queue
                :dequeue
                :queue-length)
  (:import-from :bordeaux-threads
                :make-lock
                :with-lock-held
                :make-condition-variable
                :condition-notify
                :condition-wait))
(in-package :repl-threads/wait-queue)

クラス定義はこんな感じです。

(defclass wait-queue ()
  ((queue :initform (make-queue) :reader wq-queue)
   (cond-var :initform (make-condition-variable :name "WAIT QUEUE COND") :reader wq-cond-var)
   (wait-count :initform 0 :accessor wq-wait-count)
   (lock :initform (make-lock "WAIT QUEUE LOCK") :reader wq-lock)))

(defun make-wait-queue ()
  (make-instance 'wait-queue))

dequeue メソッドでは、キューに何か入っていればロックを取って値を取り出し、ロックを解放するだけです(when 内を通らないルート)。キューが空の場合は Condition variable を利用してキューに何か入るのを待ちます。 condition-wait は次のような動作をします。

  1. 渡されたロックを解放する
  2. 渡された Condition variable にシグナルが来るまで待つ
  3. シグナルが来たらロックを取得して次に進む
(defmethod dequeue ((wq wait-queue))
  (let ((lock (wq-lock wq))
        (q (wq-queue wq)))
    (with-lock-held (lock)
      (when (= (queue-length q) 0)
        (incf (wq-wait-count wq))
        ;; wait until some value is queued
        (condition-wait (wq-cond-var wq) lock))
      (assert (> (queue-length q) 0))
      (dequeue q))))

次に queue メソッドですが、基本的にはロックを取ってキューに値を詰め、ロックを解放するだけです。ただし、待ち状態の dequeue メソッドが存在する場合は、condition-notify でシグナルを出して待ちを解除します*2

(defmethod queue ((wq wait-queue) value)
  (let ((q (wq-queue wq)))
    (with-lock-held ((wq-lock wq))
      (queue q value)
      (when (> (wq-wait-count wq) 0)
        (decf (wq-wait-count wq))
        (condition-notify (wq-cond-var wq))))))

repl-thread: 投げ込まれた関数を実行するスレッド

https://github.com/eshamster/repl-threads/blob/master/repl-thread.lisp

パッケージの定義は次の通りです。ロック周りは wait-queue にお任せしているので、bordeaux-threads からはスレッドの生成・破棄関数だけを import します。

(defpackage repl-threads/repl-thread
  (:use :cl)
  (:export :make-repl-thread
           :queue-process
           :destroy-repl-thread)
  (:import-from :repl-threads/wait-queue
                :wait-queue
                :make-wait-queue
                :queue
                :dequeue)
  (:import-from :bordeaux-threads
                :make-thread
                :destroy-thread))
(in-package :repl-threads/repl-thread)

クラス定義と生成、破棄関数は次の通りです。

(defclass repl-thread ()
  ((thread :initarg :thread :reader rt-thread)
   (process-queue :initarg :process-queue  :reader rt-process-queue)))

(defun make-repl-thread ()
  (let ((q (make-instance 'wait-queue)))
    (make-instance 'repl-thread
                   :thread (make-thread (make-repl-thread-process q))
                   :process-queue q)))

(defmethod destroy-repl-thread ((rt repl-thread))
  (destroy-thread (rt-thread rt)))

次に、スレッド内で動作する関数を生成する make-repl-thread-process を見ます。といっても、dequeue で待ち、関数が来たら funcall で実行、を繰り返す関数を作るだけです。

(defmethod make-repl-thread-process ((process-queue wait-queue))
  (lambda ()
    (loop (funcall (dequeue process-queue)))))

そして、外部からスレッドに関数を供給するのが queue-process 関数です。受け取った関数をキューに詰め込むだけです。一応受け取ったものが関数かどうか程度のチェックはしていますが、引数ありの関数が funcall されるとスレッドが死んでしまうので、引数のチェックぐらいは追加しても良いかもしれません。

(defmethod queue-process ((rt repl-thread) process)
  (assert (functionp process))
  (queue (rt-process-queue rt) process)
  t)

repl-threads: repl-thread を束ねる

最後に repl-thread を束ねた repl-threads を作ります。パッケージの定義は次の通りです。repl-thread のラッパーなので、そこへだけ依存しています。

(defpackage repl-threads/repl-threads
  (:use :cl)
  (:export :make-repl-threads
           :queue-process-to
           :with-thread
           :destroy-repl-threads
           :*thread-index*)
  (:import-from :repl-threads/repl-thread
                :make-repl-thread
                :queue-process
                :destroy-repl-thread))
(in-package :repl-threads/repl-threads)

クラスとしては repl-thread の配列を持っているだけです。生成と破棄も素直に指定個数の repl-thread の生成と破棄をするだけです。

(defclass repl-threads ()
  ((threads :initarg :threads :reader rts-threads)))

(defun make-repl-threads (n)
  (let ((threads (loop :for i :from 0 :below n :collect (make-repl-thread))))
    (make-instance
     'repl-threads
     :threads (make-array n :initial-contents threads))))

(defmethod destroy-repl-threads ((rts repl-threads))
  (let ((threads (rts-threads rts)))
    (dotimes (i (length threads))
      (destroy-repl-thread (aref threads i)))))

queue-process-to は、インデックスで指定された repl-thread に対して queue-process を呼びます。

(defmethod queue-process-to ((rts repl-threads) thread-index process)
  (let ((threads (rts-threads rts)))
    (assert (< thread-index (length threads)))
    (queue-process (aref threads thread-index) process)))

これをラップしているのが with-thread マクロです。単純なラッパーとして動作するのに加えて、*thread-index* にスレッド番号を束縛する役割もあります。

(defvar *thread-index* -1)

(defmacro with-thread ((rts thread-index) &body body)
  (let ((g-thread-index (gensym "THREAD-INDEX")))
    `(let ((,g-thread-index ,thread-index))
       (queue-process-to ,rts ,g-thread-index
                         (lambda ()
                           (let ((*thread-index* ,g-thread-index))
                             (declare (ignorable *thread-index*))
                             ,@body))))))

おまけ: 色々試してみる

せっかく作ったので色々試してみます。

Recursive Lock

Recursive Lock は通常のロックと異なり、同一スレッド内であれば何度でも取得できるロックになります(解放はロックと同じ回数だけ必要)。なお、これは SBCL には実装されていないようです。下記は CCL 1.11.5 での動作例になります。

CL-USER> (defparameter *rts* (rts:make-repl-threads 2))
*RTS*
CL-USER> (defparameter *rec-lock* (bt:make-recursive-lock))
*REC-LOCK*
CL-USER> (rts:with-thread (*rts* 0)
           (bt:acquire-recursive-lock *rec-lock*)
           (print :thread0-1))
T
:THREAD0-1
CL-USER> (rts:with-thread (*rts* 0)
           (bt:acquire-recursive-lock *rec-lock*) ; 同じスレッド内なのでもう一度取れる
           (print :thread0-2))
T
:THREAD0-2
CL-USER> (rts:with-thread (*rts* 1)
           (bt:acquire-recursive-lock *rec-lock*) ; 別スレッドなので待たされる
           (print :thread1-1))
T
CL-USER> (rts:with-thread (*rts* 0)
           (bt:release-recursive-lock *rec-lock*))
T
CL-USER> (rts:with-thread (*rts* 0)
           (bt:release-recursive-lock *rec-lock*))
T
:THREAD1-1 ; 2回解放するとスレッド1が進めるようになる

Semaphore

セマフォ(Semaphore)は資源数に限りがある対象に触る場合に、同時に N 個のスレッドしか触れないようにする、といった操作を実現するために利用するものです。

CL-USER> (defparameter *rts* (rts:make-repl-threads 3))
*RTS*
;; 資源数2のセマフォをつくる
CL-USER> (defparameter *sem* (bt:make-semaphore :count 2))
*SEM*
CL-USER> (rts:with-thread (*rts* 0)
           (bt:wait-on-semaphore *sem*)
           (print :thread0))
T
:THREAD0
CL-USER> (rts:with-thread (*rts* 1)
           (bt:wait-on-semaphore *sem*)
           (print :thread1))

T
:THREAD1
;; 既に thread 0, 1 が資源を占有しているので待たされる
CL-USER> (rts:with-thread (*rts* 2)
           (bt:wait-on-semaphore *sem*)
           (print :thread2))
T
CL-USER> (rts:with-thread (*rts* 0)
           (bt:signal-semaphore *sem*))
T
:THREAD2 ; thread 0 が資源を手放したので thread 2 が動いた

Condition Variable

Condition Variable は wait-queue の実装でも利用したように、「何らかの条件が成立するまで待つ」(wait-queue であれば「キューに何か入るまで待つ」)用途で利用できます。ただし、こうした用途に適しているというだけで、Condition Variable 自体が何かの条件処理をしてくれる訳ではありません。

再掲すると、待ち側である condition-wait は下記のような動作をします。待ちを解除する condition-notify は単にシグナルを送るだけです。

  1. 渡されたロックを解放する
  2. 渡された Condition variable にシグナルが来るまで待つ
  3. シグナルが来たらロックを取得して次に進む

試してみます。

CL-USER> (defparameter *rts* (rts:make-repl-threads 2))
*RTS*
CL-USER> (defparameter *lock* (bt:make-lock))
*LOCK*
CL-USER> (defparameter *cond-var* (bt:make-condition-variable))
*COND-VAR*
CL-USER> (rts:with-thread (*rts* 0)
           (bt:acquire-lock *lock*)
           (print :thread0-before)
           ;; ここでロックが解除される
           (bt:condition-wait *cond-var* *lock*)
           (print :thread0-after)
           (bt:release-lock *lock*))
T
:THREAD0-BEFORE
CL-USER> (rts:with-thread (*rts* 1)
           ;; ロックを取れる
           (bt:acquire-lock *lock*)
           (bt:condition-notify *cond-var*)
           (print :thread1))
T
:THREAD1
;; ※condition-wait はロックを再取得できないのでまだ待たされる
CL-USER> (rts:with-thread (*rts* 1)
           (bt:release-lock *lock*))
T
:THREAD0-AFTER ; シグナルを受け取り、ロックも解除されたので進めた

wait queue

せっかくなので部品として作った wait-queue も試してみます。

CL-USER> (defparameter *rts* (rts:make-repl-threads 2))
*RTS*
CL-USER> (use-package :repl-threads/wait-queue)
T
CL-USER> (defparameter *wq* (make-wait-queue))
*WQ*
CL-USER> (rts:with-thread (*rts* 0)
           (queue *wq* 0)
           (queue *wq* 1))
T
CL-USER> (rts:with-thread (*rts* 1)
           (print (dequeue *wq*))
           (print (dequeue *wq*))
           ;; ↓2つしか入っていなかったので待たされる
           (print (dequeue *wq*)))
T
0
1
CL-USER> (rts:with-thread (*rts* 0)
           (queue *wq* 2))
T
2 ; ← 待たされたものが動いた

*1:検索すると Clojure 記事が多く出てきますが、用語自体は新しめなのでしょうか?

*2:bordeaux-thread のドキュメントによると、condition-notify は実装によって1つの待ちだけにシグナルを送るものと、全ての待ちにシグナルを送るものがあるとのことです。とはいえ基本的には前者とのことなのでその想定で実装しています