Moritz Heidkamp avatar Moritz Heidkamp committed 90317e4

don't lose messages when no handler has successfully handled them

Comments (0)

Files changed (2)

 (define %make-set make-set)
 
 (define (make-set)
-  (%make-set (make-hash-table)))
+  (%make-set (make-hash-table eq?)))
 
 (define (set-contains? set object)
   (hash-table-exists? (set-table set) object))
 (define (set-members set)
   (hash-table-keys (set-table set)))
 
-(define (set-for-each set proc)
-  (hash-table-for-each (set-table set) (lambda (k v) (proc k))))
+(define (set-fold set proc seed)
+  (hash-table-fold
+   (set-table set)
+   (lambda (k v s) (proc k seed))
+   seed))
 
 (define (set-empty? set)
   (zero? (hash-table-size (set-table set))))
      (queue-add! queue item))
    items))
 
-(define (call-each arg procs)
-  (for-each (lambda (proc) (proc arg)) procs))
+(define (safe-call proc . args)
+  (handle-exceptions _ #f (begin (apply proc args) #t)))
+
+(define (safe-call-some arg procs)
+  (fold (lambda (proc handled?)
+          (or (safe-call proc arg) handled?))
+        #f
+        procs))
 
 (define (run-callback-queue! queue . args)
-  (let loop ()
+  (let loop ((handled? #f))
     (unless (queue-empty? queue)
-      (apply (queue-remove! queue) args)
-      (loop))))
-
-(define (run-callbacks! callbacks . args)
-  (set-for-each
-   callbacks
-   (lambda (callback)
-     (apply callback args)))
-  (set-clear! callbacks))
+      (loop (or (apply safe-call (queue-remove! queue) args) handled?)))))
+
+(define (run-callbacks! callbacks message)
+  (define handled? 
+    (set-fold
+     callbacks
+     (lambda (callback handled?)
+       (or (safe-call callback message) handled?))
+     #f))
+  (set-clear! callbacks)
+  handled?)
 
 (define (make-channel . messages)
   (%make-channel #f '() '()
     (with-locked-channel channel
       (lambda ()
         (let loop ()
-          (when (and (not (queue-empty? queue)) (channel-has-receivers? channel))
-            (let ((message (queue-remove! queue)))
+          (when (and (not (queue-empty? queue))
+                     (channel-has-receivers? channel))
+            (let ((message (queue-first queue))
+                  (handled? #f))
               (unless (set-empty? onetime-receivers)
-                (run-callbacks! onetime-receivers message)
-                (condition-variable-broadcast! (channel-cvar channel)))
-              (call-each message receivers))
-            (loop)))
+                (when (run-callbacks! onetime-receivers message)
+                  (set! handled? #t)
+                  (condition-variable-broadcast! (channel-cvar channel))))
+              (when (or (safe-call-some message receivers) handled?)
+                (queue-remove! queue)
+                (loop)))))
         (when (channel-drained? channel)
           (run-callback-queue! (channel-on-drain-handlers channel)))))))
 
 
     (if sync?
         (if (queue-empty? queue)
-            (let* ((message #f)
-                   (receiver (lambda (x) (set! message x))))
-              (set-insert! otrec receiver)
-              (if (mutex-unlock! mutex cvar timeout)
-                  message
-                  (with-locked-channel channel
-                    (lambda ()
-                      (or message
-                          (begin
-                            (set-remove! otrec receiver)
-                            (and default (default))))))))
+            (let ((message #f))
+              (let ((receiver (lambda (x)
+                                (set! message x))))
+                (set-insert! otrec receiver)
+                (if (mutex-unlock! mutex cvar timeout)
+                    message
+                    (with-locked-channel channel
+                      (lambda ()
+                        (or message
+                            (begin
+                              (set-remove! otrec receiver)
+                              (and default (default)))))))))
             (let ((message (queue-remove! queue)))
               (mutex-unlock! mutex)
               message))
               (mutex-unlock! mutex)
               #f)
             (let ((message (queue-remove! queue)))
-              (call-each message args)
+              (unless (safe-call-some message args)
+                (queue-push-back! queue message))
               (mutex-unlock! mutex)
               #t)))))
 
   (define results-2 (make-receiver))
   (define results-3 (make-receiver))
   (on-channel-receive c results-1)
+  ;; using the same receiver multiple times will register it only once
   (channel-receive c results-2 results-2)
   (channel-enqueue c 4)
   (on-channel-receive c results-3)
   (channel-enqueue c 5)
   (test (results-1) '(1 2 3 4 5))
-  (test (results-2) '(4 4))
+  (test (results-2) '(4))
   (test (results-3) '(5))
   (channel-remove-receiver c results-1)
   (channel-enqueue c 6)
   (test (close) '(close close))
   (test (drain) '(drain drain)))
 
-;; sane?
 (test-group "errors"
   (define c (make-channel 1))
-  (test-error
-   (on-channel-receive c (lambda (x) (error "boom"))))
-  (define r1 (make-receiver))  
-  (define r2 (make-receiver))  
+  (define r1 (make-receiver))
+  (define r2 (make-receiver))
+  (on-channel-receive c (lambda (x) (error "boom")))
   (channel-receive c r1)
+  (on-channel-receive c r1)
+  (channel-enqueue c 2)
   (channel-receive c (lambda (x) (error "kaboom")))
   (channel-receive c r2)
-  (handle-exceptions exn exn (channel-enqueue c 2))
-  (test (r1) '(2))
-  (test (r2) '())
-  (handle-exceptions exn exn (channel-enqueue c 3))
+  (channel-enqueue c 3)
+  (test (r1) '(1 2 3))
   (test (r2) '(3)))
 
 (test-group "siphon-channel"
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.