Moritz Heidkamp  committed 2ff47aa

rename make-receiving-channel to siphon-channel and slightly change the API

  • Participants
  • Parent commits 4317e84

Comments (0)

Files changed (2)

  ;; derivators
  fork-channel fold-channel map-channel filter-channel
- make-receiving-channel
+ siphon-channel
  ;; operations
  channel-enqueue channel-receive channel-receive/delay
       (queue-add! (channel-on-drain-handlers channel) thunk)))
-(define (make-receiving-channel source-channel on-receive)
-  (let* ((receiving-channel (make-channel))
-         (receiver (let ((rc* (make-weak-locative receiving-channel)))
-                     (lambda (message)
-                       (and-let* ((rc (locative->object rc*)))
-                         (on-receive rc message))))))
+(define (siphon-channel source-channel
+                        #!optional
+                        (destination-channel (make-channel))
+                        (on-receive channel-enqueue))
+  (let ((receiver (let ((rc* (make-weak-locative destination-channel)))
+                    (lambda (message)
+                      (and-let* ((rc (locative->object rc*)))
+                        (on-receive rc message))))))
-     receiving-channel
+     destination-channel
      (lambda ()
        (with-locked-channel source-channel
          (lambda ()
            (unless (channel-has-receivers? source-channel)
              (close-channel source-channel))))))
-     receiving-channel
+     destination-channel
      (lambda (x)
        (channel-remove-receiver source-channel receiver)))))
 (define (fold-channel channel proc seed)
   (let ((accumulator (make-mutex)))
     (mutex-specific-set! accumulator seed)
-    (make-receiving-channel
+    (siphon-channel
+     (make-channel)
      (lambda (folding-channel message)
        (mutex-lock! accumulator)
        (let ((acc (proc message (mutex-specific accumulator))))
          (mutex-unlock! accumulator))))))
 (define (map-channel channel proc)
-  (make-receiving-channel
+  (siphon-channel
+   (make-channel)
    (lambda (mapping-channel message)
      (channel-enqueue mapping-channel (proc message)))))
 (define (filter-channel channel pred?)
-  (make-receiving-channel
+  (siphon-channel
+   (make-channel)
    (lambda (filtering-channel message)
      (when (pred? message)
        (channel-enqueue filtering-channel message)))))

File tests/run.scm

   (handle-exceptions exn exn (channel-enqueue c 3))
   (test (r2) '(3)))
-(test-group "make-receiving-channel"
+(test-group "siphon-channel"
   (define c (make-channel 1 2 3))
-  (define sc (make-receiving-channel c (lambda (sc m)
-                                         (channel-enqueue sc (+ 1 m)))))
+  (define sc (make-channel))
+  (siphon-channel c sc (lambda (sc m)
+                         (channel-enqueue sc (+ 1 m))))
   (test (channel-messages sc) '(2 3 4))
   (test (channel-messages c) '())
   (channel-enqueue c 4)
   (test-group "gc"
     (define c (make-channel 1 2))
     (define result (make-receiver))
-    (define rc (make-receiving-channel c (lambda (_ m) (result m))))
+    (define rc (siphon-channel c (make-channel) (lambda (_ m) (result m))))
     (test '(1 2) (result))
     (set! rc #f)
   (test-group "close propagation"
     (define c1     (make-channel))
-    (define c1.1   (make-receiving-channel c1 void))
-    (define c1.2   (make-receiving-channel c1 void))
-    (define c1.2.1 (make-receiving-channel c1.2 void))
+    (define c1.1   (siphon-channel c1))
+    (define c1.2   (siphon-channel c1))
+    (define c1.2.1 (siphon-channel c1.2))
     (close-channel c1.2.1)
     (test-assert (channel-closed? c1.2.1))
     (test-assert (channel-closed? c1.2))