Commits

Moritz Heidkamp committed 778e7b0

use hash-table based set instead of a queue for onetime-receivers to be able to remove them immediately on timeout

Comments (0)

Files changed (1)

  channel-empty? channel-closed? channel-drained?)
 
 (import chicken scheme)
-(use data-structures srfi-18 srfi-1
+(use data-structures srfi-18 srfi-1 srfi-69
      (only srfi-1 filter)
      (only miscmacros push!)
      (only lolevel make-weak-locative locative->object))
 (define (with-locked-channel channel thunk)
   (with-semaphore (channel-mutex channel) thunk))
 
+
+(define-record set table)
+
+(define %make-set make-set)
+
+(define (make-set)
+  (%make-set (make-hash-table)))
+
+(define (set-contains? set object)
+  (hash-table-exists? (set-table set) object))
+
+(define (set-insert! set object)
+  (hash-table-set! (set-table set) object #t))
+
+(define (set-insert-list! set objects)
+  (for-each
+   (lambda (object)
+     (set-insert! set object))
+   objects))
+
+(define (set-remove! set object)
+  (hash-table-delete! (set-table set) object))
+
+(define (set-clear! set)
+  (hash-table-clear! (set-table set)))
+
+(define (set-members set)
+  (hash-table-keys (set-table set)))
+
+(define (set-for-each set proc)
+  (hash-table-for-each (set-table set) (lambda (k v) (proc k))))
+
+(define (set-empty? set)
+  (zero? (hash-table-size (set-table set))))
+
 (define %make-channel make-channel)
 
 (define (queue-add-list! queue items)
       (apply (queue-remove! queue) args)
       (loop))))
 
+(define (run-callbacks! callbacks . args)
+  (set-for-each
+   callbacks
+   (lambda (callback)
+     (apply callback args)))
+  (set-clear! callbacks))
+
 (define (make-channel . messages)
   (%make-channel #f '() '()
-                 (make-queue)
+                 (make-set)
                  (list->queue messages)
                  (make-queue)
                  (make-queue)
 
 (define (channel-has-receivers? channel)
   (not (and (null? (channel-receivers channel))
-            (queue-empty? (channel-onetime-receivers channel)))))
+            (set-empty? (channel-onetime-receivers channel)))))
 
 (define (flush-channel channel)
   (let ((queue     (channel-queue channel))
         (let loop ()
           (when (and (not (queue-empty? queue)) (channel-has-receivers? channel))
             (let ((message (queue-remove! queue)))
-              (unless (queue-empty? onetime-receivers)
-                (run-callback-queue! onetime-receivers message)
+              (unless (set-empty? onetime-receivers)
+                (run-callbacks! onetime-receivers message)
                 (condition-variable-broadcast! (channel-cvar channel)))
               (call-each message receivers))
             (loop)))
 (define (channel-receive channel . args)
   (let* ((mutex   (channel-mutex channel))
          (queue   (channel-queue channel))
-         (rqueue  (channel-onetime-receivers channel))
+         (otrec   (channel-onetime-receivers channel))
          (cvar    (channel-cvar channel))
          (timeout (and (pair? args)
                        (let ((t (car args)))
 
     (if sync?
         (if (queue-empty? queue)
-            (let ((message #f))
-              ;; FIXME: When the timeout is reached this callback is
-              ;; only cleaned up when the next message is
-              ;; enqueued. It's not really a problem but not
-              ;; particularly neat either.
-              (queue-add! rqueue (lambda (x) (set! message x)))
+            (let* ((message #f)
+                   (receiver (lambda (x) (set! message x))))
+              (set-insert! otrec receiver)
               (if (mutex-unlock! mutex cvar timeout)
                   message
-                  (and default (default))))
+                  (with-locked-channel channel
+                    (lambda ()
+                      (or message
+                          (begin
+                            (set-remove! otrec receiver)
+                            (and default (default))))))))
             (let ((message (queue-remove! queue)))
               (mutex-unlock! mutex)
               message))
         (if (queue-empty? queue)
             (begin
-              (queue-add-list! rqueue args)
+              (set-insert-list! otrec args)
               (mutex-unlock! mutex)
               #f)
             (let ((message (queue-remove! queue)))