Commits

Moritz Heidkamp  committed d710992

add on-channel-error callbacks for handling errors raised by receivers

  • Participants
  • Parent commits 84b66f7
  • Tags 0.0.1

Comments (0)

Files changed (2)

  channel-remove-receiver close-channel
 
  ;; callbacks
- on-channel-receive on-channel-close on-channel-drain
+ on-channel-receive on-channel-close 
+ on-channel-drain on-channel-error
 
  ;; accessors
  channel-messages channel-forks
   queue
   on-close-handlers
   on-drain-handlers
+  on-error-handlers
   mutex
   cvar)
 
      (queue-add! queue item))
    items))
 
-(define (safe-apply proc args)
-  (handle-exceptions _ #f (begin (apply proc args) #t)))
-
-(define (safe-apply-some procs . args)
-  (fold (lambda (proc handled?)
-          (or (safe-apply proc args) handled?))
-        #f
-        procs))
+(define (safe-apply on-error proc args)
+  (handle-exceptions exn
+    (begin (apply on-error exn args) #f)
+    (begin (apply proc args) #t)))
+
+(define (safe-apply-some on-error-handlers procs . args)
+  (let ((on-error (if (set-empty? on-error-handlers)
+                      print-exn
+                      (lambda args
+                        (run-callbacks on-error-handlers args)))))
+    (fold (lambda (proc handled?)
+            (or (safe-apply on-error proc args) handled?))
+          #f
+          procs)))
+
+(define (print-exn exn . _)
+  (print-error-message exn (current-error-port)))
+
+(define (run-callbacks callbacks args)
+  (set-fold
+   callbacks
+   (lambda (callback handled?)
+     (or (safe-apply print-exn callback args) handled?))
+   #f))
 
 (define (run-callbacks! callbacks . args)
-  (define handled? 
-    (set-fold
-     callbacks
-     (lambda (callback handled?)
-       (or (safe-apply callback args) handled?))
-     #f))
+  (define handled? (run-callbacks callbacks args))
   (set-clear! callbacks)
   handled?)
 
                  (list->queue messages)
                  (make-set eq?)
                  (make-set eq?)
+                 (make-set eq?)
                  (make-mutex)
                  (make-condition-variable)))
 
                 (when (run-callbacks! onetime-receivers message)
                   (set! handled? #t)
                   (condition-variable-broadcast! (channel-cvar channel))))
-              (when (or (safe-apply-some receivers message) handled?)
+              (when (or (safe-apply-some (channel-on-error-handlers channel)
+                                         receivers
+                                         message)
+                        handled?)
                 (queue-remove! queue)
                 (loop)))))
         (when (channel-drained? channel)
               (mutex-unlock! mutex)
               #f)
             (let ((message (queue-first queue)))
-              (when (safe-apply-some args message)
+              (when (safe-apply-some (channel-on-error-handlers channel)
+                                     args
+                                     message)
                 (queue-remove! queue))
               (mutex-unlock! mutex)
               #t)))))
       (thunk)
       (set-insert! (channel-on-drain-handlers channel) thunk)))
 
+(define (on-channel-error channel proc)
+  (set-insert! (channel-on-error-handlers channel) proc))
+
 (define (siphon-channel source-channel
                         #!optional
                         (destination-channel (make-channel))

File tests/run.scm

   (test (r1) '(1 2 3))
   (test (r2) '(3)))
 
+(test-group "on-channel-error"
+  (define channel (make-channel))
+  (define err #f)
+  (define msg #f)
+  (on-channel-receive
+   channel
+   (lambda _
+     (error 'in-the-error-handler "explosion")))
+  (on-channel-error
+   channel (lambda (e m)
+             (set! err e)
+             (set! msg m)))
+  (channel-enqueue channel 'foo)
+  (test-assert (condition? err))
+  (test (get-condition-property err 'exn 'location) 'in-the-error-handler)
+  (test msg 'foo))
+
 (test-group "siphon-channel"
   (define c (make-channel 1 2 3))
   (define sc (make-channel))