1. Moritz Heidkamp
  2. channel

Commits

Moritz Heidkamp  committed e474310

implement `make-receiving-channel` as well as `channel-fold!`, `channel-map!` and `channel-filter!` on top of it.

  • Participants
  • Parent commits fd11390
  • Branches master

Comments (0)

Files changed (2)

File channel.scm

View file
  on-channel-receive on-channel-close on-channel-drain
  channel-messages channel-empty?
  channel-close! channel-closed? channel-drained?
- channel-fork!)
+ make-receiving-channel
+ channel-fork! channel-fold! channel-map! channel-filter!)
 
 (import chicken scheme)
 (use data-structures miscmacros srfi-18)
       (thunk)
       (queue-add! (channel-on-drain-handlers channel) thunk)))
 
+(define (make-receiving-channel source-channel on-receive)
+  (let ((destination-channel (make-channel)))
+    (on-channel-receive
+     source-channel
+     (lambda (message)
+       (on-receive destination-channel message)))
+    (on-channel-close
+     destination-channel
+     (lambda ()
+       (channel-close! source-channel)))
+    destination-channel))
+
+(define (channel-fold! channel proc seed)
+  (let ((accumulator (make-mutex)))
+    (mutex-specific-set! accumulator seed)
+    (make-receiving-channel
+     channel
+     (lambda (folding-channel message)
+       (mutex-lock! accumulator)
+       (let ((acc (proc message (mutex-specific accumulator))))
+         (mutex-specific-set! accumulator acc)
+         (channel-enqueue! folding-channel acc)
+         (mutex-unlock! accumulator))))))
+
+(define (channel-map! channel proc)
+  (make-receiving-channel
+   channel
+   (lambda (mapping-channel message)
+     (channel-enqueue! mapping-channel (proc message)))))
+
+(define (channel-filter! channel pred?)
+  (make-receiving-channel
+   channel
+   (lambda (filtering-channel message)
+     (when (pred? message)
+       (channel-enqueue! filtering-channel message)))))
+
 )

File tests/run.scm

View file
   (handle-exceptions exn exn (channel-enqueue! c 3))
   (test (r2) '(3)))
 
+(test-group "make-receiving-channel"
+  (define c (make-channel 1 2 3))
+  (define sc (make-receiving-channel c (lambda (sc m)
+                                      (channel-enqueue! sc (+ 1 m)))))
+  (test (channel-messages sc) '(2 3 4))
+  (test (channel-messages c) '())
+  (channel-enqueue! c 4)
+  (test (channel-messages sc) '(2 3 4 5))
+  (test (channel-messages c) '())
+  (test-assert (not (channel-closed? c)))
+  (test-assert (not (channel-closed? sc)))
+  (channel-close! sc)
+  (test-assert (channel-closed? c))
+  (test-assert (channel-closed? sc)))
+
+(test-group "channel-fold!"
+  (define c (make-channel 1 2))
+  (define cf (channel-fold! c cons '()))
+  (test (channel-messages cf) '((1) (2 1)))
+  (channel-enqueue! c 3)
+  (test (channel-messages cf) '((1) (2 1) (3 2 1)))
+  (test (channel-messages c) '()))
+
+(test-group "channel-map!"
+  (define c (make-channel 1 2))
+  (define m (channel-map! c add1))
+  (test (channel-messages m) '(2 3))
+  (channel-enqueue! c 3)
+  (test (channel-messages m) '(2 3 4))
+  (test (channel-messages c) '()))
+
+(test-group "channel-filter!"
+  (define c (make-channel 1 2 3 4))
+  (define m (channel-filter! c even?))
+  (test (channel-messages m) '(2 4))
+  (channel-enqueue! c 5 6)
+  (test (channel-messages m) '(2 4 6))
+  (test (channel-messages c) '()))
+
 (test-end)
 
 (test-exit)