Commits

Moritz Heidkamp  committed b4f51e1

Properly unlock mutex in channel-receive

  • Participants
  • Parent commits a35e8e5

Comments (0)

Files changed (1)

                     (channel-receivers channel))))))
 
 (define (channel-receive channel . args)
-  (let* ((mutex   (channel-mutex channel))
-         (queue   (channel-queue channel))
-         (otrec   (channel-onetime-receivers channel))
-         (cvar    (channel-cvar channel))
-         (timeout (and (pair? args)
-                       (let ((t (car args)))
-                         (and (number? t) t))))
-         (default (and timeout 
-                       (pair? (cdr args))
-                       (cadr args)))
-         (sync?   (or timeout (null? args) (not (procedure? (car args))))))
-
-    (when (have-lock? mutex)
-      (error "can't receive from already locked channel"))
-
-    (mutex-lock! mutex)
-
-    (if sync?
-        (if (queue-empty? queue)
-            (let ((message #f))
-              (let ((receiver (lambda (x)
-                                (set! message x))))
-                (set-insert! otrec receiver)
-                (if (mutex-unlock! mutex cvar timeout)
-                    message
-                    (with-locked-channel channel
-                      (lambda ()
-                        (or message
-                            (begin
-                              (set-remove! otrec receiver)
-                              (and default (default)))))))))
-            (let ((message (queue-remove! queue)))
-              (mutex-unlock! mutex)
-              message))
-        (if (queue-empty? queue)
-            (begin
-              (for-each
-               (lambda (receiver)
-                 (set-insert! otrec (lambda (m) (receiver m))))
-               args)
-              (mutex-unlock! mutex)
-              #f)
-            (let ((message (queue-first queue)))
-              (when (safe-apply-some (channel-on-error-handlers channel)
-                                     args
-                                     message)
-                (queue-remove! queue))
-              (mutex-unlock! mutex)
-              #t)))))
+  (dynamic-wind
+      (lambda ()
+        (let ((mutex (channel-mutex channel)))
+          (when (have-lock? mutex)
+            (error "can't receive from already locked channel"))
+          (mutex-lock! mutex)))
+      (lambda ()
+        (let* ((queue   (channel-queue channel))
+               (otrec   (channel-onetime-receivers channel))
+               (cvar    (channel-cvar channel))
+               (timeout (and (pair? args)
+                             (let ((t (car args)))
+                               (and (number? t) t))))
+               (default (and timeout 
+                             (pair? (cdr args))
+                             (cadr args)))
+               (sync?   (or timeout (null? args) (not (procedure? (car args))))))
+          (if sync?
+              (if (queue-empty? queue)
+                  (let ((message #f))
+                    (let ((receiver (lambda (x)
+                                      (set! message x))))
+                      (set-insert! otrec receiver)
+                      (if (mutex-unlock! (channel-mutex channel) cvar timeout)
+                          message
+                          (with-locked-channel channel
+                            (lambda ()
+                              (or message
+                                  (begin
+                                    (set-remove! otrec receiver)
+                                    (and default (default)))))))))
+                  (queue-remove! queue))
+              (if (queue-empty? queue)
+                  (begin
+                    (for-each
+                     (lambda (receiver)
+                       (set-insert! otrec (lambda (m) (receiver m))))
+                     args)
+                    #f)
+                  (let ((message (queue-first queue)))
+                    (when (safe-apply-some (channel-on-error-handlers channel)
+                                           args
+                                           message)
+                      (queue-remove! queue))
+                    #t)))))
+      (lambda ()
+        (let ((mutex (channel-mutex channel)))
+          (when (have-lock? mutex)
+            (mutex-unlock! mutex))))))
 
 (define (channel-receive/delay channel #!optional timeout default)
   (let ((result (thread-start!
              (close-channel source-channel))))))
     (set-finalizer!
      destination-channel
-     (lambda (x)
+     (lambda (_)
        (channel-remove-receiver source-channel receiver)))))
 
 (define (fold-channel channel proc seed)