Commits

Moritz Heidkamp  committed 73b033d

allow passing a timeout and an optional default thunk to channel-receive for synchronous call

  • Participants
  • Parent commits 2f499c3

Comments (0)

Files changed (2)

             (remove (lambda (r) (eq? r receiver))
                     (channel-receivers channel))))))
 
-(define (channel-receive channel . procs)
-  (let ((mutex   (channel-mutex channel))
-        (queue   (channel-queue channel))
-        (rqueue  (channel-onetime-receivers channel))
-        (cvar    (channel-cvar channel))
-        (sync?   (null? procs)))
+(define (channel-receive channel . args)
+  (let* ((mutex   (channel-mutex channel))
+         (queue   (channel-queue channel))
+         (rqueue  (channel-onetime-receivers channel))
+         (cvar    (channel-cvar channel))
+         (timeout (and (pair? args)
+                       (let ((t (car args)))
+                         (and (number? t) t))))
+         (default (and timeout (if (pair? (cdr args))
+                                   (cadr args)
+                                   (lambda () #f))))
+         (sync?   (or timeout (null? args))))
+
     (when (have-lock? mutex)
       (error "can't receive from already locked channel"))
+
     (mutex-lock! mutex)
-    (if (queue-empty? queue)
-        (if sync?
+
+    (if sync?
+        (if (queue-empty? queue)
             (let ((message #f))
+              ;; FIXME: When the timeout is reached this callback is
+              ;; only cleaned up when the next message is
+              ;; enqueued. It's not really a problem but not
+              ;; particularly neat either.
               (queue-add! rqueue (lambda (x) (set! message x)))
-              (mutex-unlock! mutex cvar)
-              message)
+              (if (mutex-unlock! mutex cvar timeout)
+                  message
+                  (default)))
+            (let ((message (queue-remove! queue)))
+              (mutex-unlock! mutex)
+              message))
+        (if (queue-empty? queue)
             (begin
-              (queue-add-list! rqueue procs)
+              (queue-add-list! rqueue args)
+              (mutex-unlock! mutex)
+              #f)
+            (let ((message (queue-remove! queue)))
+              (call-each message args)
               (mutex-unlock! mutex)
-              #f))
-        (let ((message (queue-remove! queue)))
-          (if sync?
-              (begin
-                (mutex-unlock! mutex)
-                message)
-              (begin
-                (call-each message procs)
-                (mutex-unlock! mutex)
-                #t))))))
+              #t)))))
 
 (define (close-channel channel)
   (set! (channel-closed? channel) #t)

File tests/run.scm

     (thread-start! (lambda () (channel-receive c))))
   (define producer
     (thread-start! (lambda () (channel-enqueue c 'foo))))
-  (test (thread-join! consumer) 'foo))
+  (test (thread-join! consumer) 'foo)
+
+  (test-group "with timeout"
+    (define c (make-channel 'foo))
+    (test 'foo (channel-receive c 3))
+    (test-assert (not (channel-receive c 0.2)))
+    (test 'none (channel-receive c 0.2 (lambda () 'none)))))
 
 (test-group "fork-channel"
   (define r1 (make-receiver))