Commits

Moritz Heidkamp committed 0ca8ff0

use semaphore for more robust channel locking

  • Participants
  • Parent commits cde1640

Comments (0)

Files changed (1)

             (queue->list (channel-queue c)))
   (display ">" out))
 
+;; semaphore implementation stolen from the 9p egg
+
+(define (have-lock? mutex)
+  (eq? (mutex-state mutex) (current-thread)))
+
+(define (semaphore-lock! mutex)
+  (if (have-lock? mutex)
+      (mutex-specific-set! mutex (add1 (mutex-specific mutex)))
+      (begin
+	(mutex-lock! mutex)
+	(mutex-specific-set! mutex 0))))
+
+(define (semaphore-unlock! mutex)
+  (cond
+   ((not (have-lock? mutex))
+    (error "The current thread does not own the mutex!"))
+   ((> (mutex-specific mutex) 0)
+    (mutex-specific-set! mutex (sub1 (mutex-specific mutex))))
+   (else (mutex-unlock! mutex))))
+
+(define (with-semaphore mutex thunk)
+  (dynamic-wind
+      (lambda () (semaphore-lock! mutex))
+      thunk
+      (lambda () (semaphore-unlock! mutex))))
+
+(define (with-locked-channel channel thunk)
+  (with-semaphore (channel-mutex channel) thunk))
+
 (define %make-channel make-channel)
 
 (define (queue-add-list! queue items)
   (queue->list (channel-queue channel)))
 
 (define (channel-flush! channel)
-  (let ((mutex     (channel-mutex channel))
-        (queue     (channel-queue channel))
+  (let ((queue     (channel-queue channel))
         (receivers (channel-receivers channel))
         (onetime-receivers (channel-onetime-receivers channel)))
-    (dynamic-wind
-        (lambda ()
-          (mutex-lock! mutex))
-        (lambda ()
-          (let loop ()
-            (unless (or (queue-empty? queue)
-                        (and (null? receivers) (queue-empty? onetime-receivers)))
-              (let ((message (queue-remove! queue)))
-                (unless (queue-empty? onetime-receivers)
-                  (run-callback-queue! onetime-receivers message)
-                  (condition-variable-broadcast! (channel-cvar channel)))
-                (call-each message receivers))
-              (loop)))
-          (when (channel-drained? channel)
-            (run-callback-queue! (channel-on-drain-handlers channel))))
-        (lambda ()
-          (mutex-unlock! mutex)))))
+    (with-locked-channel channel
+      (lambda ()
+        (let loop ()
+          (unless (or (queue-empty? queue)
+                      (and (null? receivers) (queue-empty? onetime-receivers)))
+            (let ((message (queue-remove! queue)))
+              (unless (queue-empty? onetime-receivers)
+                (run-callback-queue! onetime-receivers message)
+                (condition-variable-broadcast! (channel-cvar channel)))
+              (call-each message receivers))
+            (loop)))
+        (when (channel-drained? channel)
+          (run-callback-queue! (channel-on-drain-handlers channel)))))))
 
 (define (channel-enqueue! channel message . messages)
   (and (not (channel-closed? channel))
        (let ((queue (channel-queue channel))
-             (mutex (channel-mutex channel))
              (messages (cons message messages)))
-         (mutex-lock! mutex)
-         (queue-add-list! queue messages)
-         (set! (channel-forks channel)
-               (filter (lambda (fork)
-                         (and-let* ((fork (locative->object fork)))
-                           (unless (null? messages)
-                             (apply channel-enqueue! fork messages))))
-                       (channel-forks channel)))
-         (mutex-unlock! mutex)
-         (channel-flush! channel)
+         (with-locked-channel channel
+           (lambda ()
+             (queue-add-list! queue messages)
+             (set! (channel-forks channel)
+                   (filter (lambda (fork)
+                             (and-let* ((fork (locative->object fork)))
+                               (unless (null? messages)
+                                 (apply channel-enqueue! fork messages))))
+                           (channel-forks channel)))
+             (channel-flush! channel)))
          #t)))
 
 (define (on-channel-receive channel receiver)
-  (let ((mutex (channel-mutex channel)))
-    (mutex-lock! mutex)
-    (push! receiver (channel-receivers channel))
-    (mutex-unlock! mutex)
-    (channel-flush! channel)))
+  (with-locked-channel channel
+    (lambda ()
+      (push! receiver (channel-receivers channel))
+      (channel-flush! channel))))
 
 (define (channel-receiver-remove! channel receiver)
-  (let ((mutex (channel-mutex channel)))
-    (mutex-lock! mutex)
-    (set! (channel-receivers channel)
-          (remove (lambda (r) (eq? r receiver))
-                  (channel-receivers channel)))
-    (mutex-unlock! mutex)))
+  (with-locked-channel channel
+    (lambda ()
+      (set! (channel-receivers channel)
+            (remove (lambda (r) (eq? r receiver))
+                    (channel-receivers channel))))))
 
 (define (channel-receive! channel . procs)
   (let ((mutex   (channel-mutex channel))
         (rqueue  (channel-onetime-receivers channel))
         (cvar    (channel-cvar channel))
         (sync?   (null? procs)))
+    (when (have-lock? mutex)
+      (error "can't receive from already locked channel"))
     (mutex-lock! mutex)
     (if (queue-empty? queue)
         (if sync?
        (channel-empty? channel)))
 
 (define (channel-fork! channel)
-  (let ((mutex (channel-mutex channel)))
-    (mutex-lock! mutex)
-    (let* ((fork  (apply make-channel (channel-messages channel)))
-           (fork* (make-weak-locative fork)))
-      (push! fork* (channel-forks channel))
-      (mutex-unlock! mutex)
-      (on-channel-close
-       channel
-       (lambda ()
-         (and-let* ((fork (locative->object fork*)))
-           (channel-close! fork))))
-      fork)))
+  (with-locked-channel channel
+    (lambda ()
+      (let* ((fork  (apply make-channel (channel-messages channel)))
+             (fork* (make-weak-locative fork)))
+        (push! fork* (channel-forks channel))
+        (on-channel-close
+         channel
+         (lambda ()
+           (and-let* ((fork (locative->object fork*)))
+             (channel-close! fork))))
+        fork))))
 
 (define (on-channel-close channel thunk)
   (if (channel-closed? channel)