Commits

Moritz Heidkamp committed 59f27bb

de-bangify API

Comments (0)

Files changed (2)

 (module channel
 
 (make-channel
- channel-enqueue! channel-receive!
+ ;; derivators
+ fork-channel fold-channel map-channel filter-channel
+ make-receiving-channel
+
+ ;; operations
+ channel-enqueue channel-receive channel-remove-receiver close-channel
+
+ ;; callbacks
  on-channel-receive on-channel-close on-channel-drain
- channel-messages channel-empty?
- channel-receiver-remove!
- channel-close! channel-closed? channel-drained?
- channel-forks make-receiving-channel
- channel-fork! channel-fold! channel-map! channel-filter!)
+
+ ;; accessors
+ channel-messages channel-forks
+
+ ;; predicates
+ channel-empty? channel-closed? channel-drained?)
 
 (import chicken scheme)
 (use data-structures srfi-18 srfi-1
 (define (channel-messages channel)
   (queue->list (channel-queue channel)))
 
-(define (channel-flush! channel)
+(define (flush-channel channel)
   (let ((queue     (channel-queue channel))
         (receivers (channel-receivers channel))
         (onetime-receivers (channel-onetime-receivers channel)))
         (when (channel-drained? channel)
           (run-callback-queue! (channel-on-drain-handlers channel)))))))
 
-(define (channel-enqueue! channel message . messages)
+(define (channel-enqueue channel message . messages)
   (and (not (channel-closed? channel))
        (let ((queue (channel-queue channel))
              (messages (cons message messages)))
                    (filter (lambda (fork)
                              (and-let* ((fork (locative->object fork)))
                                (unless (null? messages)
-                                 (apply channel-enqueue! fork messages))))
+                                 (apply channel-enqueue fork messages))))
                            (channel-forks channel)))
-             (channel-flush! channel)))
+             (flush-channel channel)))
          #t)))
 
 (define (on-channel-receive channel receiver)
   (with-locked-channel channel
     (lambda ()
       (push! receiver (channel-receivers channel))
-      (channel-flush! channel))))
+      (flush-channel channel))))
 
-(define (channel-receiver-remove! channel receiver)
+(define (channel-remove-receiver channel receiver)
   (with-locked-channel channel
     (lambda ()
       (set! (channel-receivers channel)
             (remove (lambda (r) (eq? r receiver))
                     (channel-receivers channel))))))
 
-(define (channel-receive! channel . procs)
+(define (channel-receive channel . procs)
   (let ((mutex   (channel-mutex channel))
         (queue   (channel-queue channel))
         (rqueue  (channel-onetime-receivers channel))
                 (mutex-unlock! mutex)
                 #t))))))
 
-(define (channel-close! channel)
+(define (close-channel channel)
   (set! (channel-closed? channel) #t)
   (run-callback-queue! (channel-on-close-handlers channel)))
 
   (and (channel-closed? channel)
        (channel-empty? channel)))
 
-(define (channel-fork! channel)
+(define (fork-channel channel)
   (with-locked-channel channel
     (lambda ()
       (let* ((fork  (apply make-channel (channel-messages channel)))
          channel
          (lambda ()
            (and-let* ((fork (locative->object fork*)))
-             (channel-close! fork))))
+             (close-channel fork))))
         fork))))
 
 (define (on-channel-close channel thunk)
     (on-channel-close
      receiving-channel
      (lambda ()
-       (channel-close! source-channel)))
+       (close-channel source-channel)))
     (set-finalizer!
      receiving-channel
      (lambda (x)
-       (channel-receiver-remove! source-channel receiver)))))
+       (channel-remove-receiver source-channel receiver)))))
 
-(define (channel-fold! channel proc seed)
+(define (fold-channel channel proc seed)
   (let ((accumulator (make-mutex)))
     (mutex-specific-set! accumulator seed)
     (make-receiving-channel
        (mutex-lock! accumulator)
        (let ((acc (proc message (mutex-specific accumulator))))
          (mutex-specific-set! accumulator acc)
-         (channel-enqueue! folding-channel acc)
+         (channel-enqueue folding-channel acc)
          (mutex-unlock! accumulator))))))
 
-(define (channel-map! channel proc)
+(define (map-channel channel proc)
   (make-receiving-channel
    channel
    (lambda (mapping-channel message)
-     (channel-enqueue! mapping-channel (proc message)))))
+     (channel-enqueue mapping-channel (proc message)))))
 
-(define (channel-filter! channel pred?)
+(define (filter-channel channel pred?)
   (make-receiving-channel
    channel
    (lambda (filtering-channel message)
      (when (pred? message)
-       (channel-enqueue! filtering-channel message)))))
+       (channel-enqueue filtering-channel message)))))
 
 )
 
 (test-group "channel-receive!"
   (define c (make-channel))
-  (channel-enqueue! c 1)
+  (channel-enqueue c 1)
   (define results (make-receiver))
-  (test-assert (channel-receive! c results))
+  (test-assert (channel-receive c results))
   (test (results) '(1))
-  (test-assert (not (channel-receive! c results)))
-  (channel-enqueue! c 2)
+  (test-assert (not (channel-receive c results)))
+  (channel-enqueue c 2)
   (test (results) '(1 2))
 
   (define results (make-receiver))
-  (channel-enqueue! c 1 2)
-  (channel-receive! c results results)
-  (channel-receive! c results results)
+  (channel-enqueue c 1 2)
+  (channel-receive c results results)
+  (channel-receive c results results)
   (test (results) '(1 1 2 2)))
 
 (test-group "channel receivers"
   (define results-2 (make-receiver))
   (define results-3 (make-receiver))
   (on-channel-receive c results-1)
-  (channel-receive! c results-2 results-2)
-  (channel-enqueue! c 4)
+  (channel-receive c results-2 results-2)
+  (channel-enqueue c 4)
   (on-channel-receive c results-3)
-  (channel-enqueue! c 5)
+  (channel-enqueue c 5)
   (test (results-1) '(1 2 3 4 5))
   (test (results-2) '(4 4))
   (test (results-3) '(5))
-  (channel-receiver-remove! c results-1)
-  (channel-enqueue! c 6)
+  (channel-remove-receiver c results-1)
+  (channel-enqueue c 6)
   (test (results-1) '(1 2 3 4 5))
   (test (results-3) '(5 6)))
 
 
   (define (enqueue c . x)
     (print ">> enqueuing " x)
-    (apply channel-enqueue! c x)
+    (apply channel-enqueue c x)
     (print-channel-status c))
 
   (define (test-receiver name results-queue)
    (define thread
      (thread-start!
       (lambda ()
-        (sync (channel-receive! channel (test-receiver 'receive-1 results-receive-1)))
+        (sync (channel-receive channel (test-receiver 'receive-1 results-receive-1)))
         (sync (on-channel-receive channel (test-receiver 'receiver-t results-receiver-t)))
-        (sync (channel-receive! channel
+        (sync (channel-receive channel
                                 (test-receiver 'receive-2 results-receive-2)
                                 (test-receiver 'receive-2 results-receive-2)))
         (enqueue channel 'frob)))))
   (test-assert (not (channel-drained? c)))
   (test-assert (channel-empty? c))
 
-  (test-assert (channel-enqueue! c 1))
+  (test-assert (channel-enqueue c 1))
   (test-assert (not (channel-empty? c)))
   (test-assert (not (channel-closed? c)))
   (test-assert (not (channel-drained? c)))
 
-  (channel-receive! c void)
+  (channel-receive c void)
   (test-assert (channel-empty? c))
   (test-assert (not (channel-closed? c)))
   (test-assert (not (channel-drained? c)))
 
-  (test-assert (channel-enqueue! c 1))
-  (channel-close! c)
+  (test-assert (channel-enqueue c 1))
+  (close-channel c)
   (test-assert (channel-closed? c))
-  (test-assert (not (channel-enqueue! c 1)))
+  (test-assert (not (channel-enqueue c 1)))
   (test-assert (not (channel-empty? c)))
 
-  (channel-receive! c void)
+  (channel-receive c void)
   (test-assert (channel-closed? c))
   (test-assert (channel-empty? c))
   (test-assert (channel-drained? c)))
 
-(test-group "synchronous channel-receive!"
+(test-group "synchronous channel-receive"
   (define c (make-channel 1))
-  (test 1 (channel-receive! c))
+  (test 1 (channel-receive c))
   (define consumer
-    (thread-start! (lambda () (channel-receive! c))))
+    (thread-start! (lambda () (channel-receive c))))
   (define producer
-    (thread-start! (lambda () (channel-enqueue! c 'foo))))
+    (thread-start! (lambda () (channel-enqueue c 'foo))))
   (test (thread-join! consumer) 'foo))
 
-(test-group "channel-fork!"
+(test-group "fork-channel"
   (define r1 (make-receiver))
   (define c1 (make-channel 1 2))
   (define r2 (make-receiver))
-  (define c2 (channel-fork! c1))
+  (define c2 (fork-channel c1))
   (on-channel-receive c1 r1)
   (on-channel-receive c2 r2)
   (test (r1) '(1 2))
   (test (r2) '(1 2))
-  (channel-enqueue! c1 3)
-  (channel-enqueue! c2 4)
+  (channel-enqueue c1 3)
+  (channel-enqueue c2 4)
   (test (r1) '(1 2 3))
   (test (r2) '(1 2 3 4))
 
   (test-assert (not (channel-closed? c2)))
-  (channel-close! c1)
+  (close-channel c1)
   (test-assert (channel-closed? c2))
 
   (test-group "gc"
     (define c (make-channel))
-    (define f (channel-fork! c))
+    (define f (fork-channel c))
     (test 1 (length (channel-forks c)))
     (set! f #f)
     (gc!!)
     (test 1 (length (channel-forks c)))
-    (channel-enqueue! c 'x)
+    (channel-enqueue c 'x)
     (test 0 (length (channel-forks c)))))
 
 (test-group "on-channel-close, on-channel-drain"
   (on-channel-drain c (lambda () (drain 'drain)))
   (test (close) '())
   (test (drain) '())
-  (channel-close! c)
+  (close-channel c)
   (test (close) '(close))
   (test (drain) '())
   (on-channel-receive c void)
    (on-channel-receive c (lambda (x) (error "boom"))))
   (define r1 (make-receiver))  
   (define r2 (make-receiver))  
-  (channel-receive! c r1)
-  (channel-receive! c (lambda (x) (error "kaboom")))
-  (channel-receive! c r2)
-  (handle-exceptions exn exn (channel-enqueue! c 2))
+  (channel-receive c r1)
+  (channel-receive c (lambda (x) (error "kaboom")))
+  (channel-receive c r2)
+  (handle-exceptions exn exn (channel-enqueue c 2))
   (test (r1) '(2))
   (test (r2) '())
-  (handle-exceptions exn exn (channel-enqueue! c 3))
+  (handle-exceptions exn exn (channel-enqueue c 3))
   (test (r2) '(3)))
 
 (test-group "make-receiving-channel"
   (define c (make-channel 1 2 3))
   (define sc (make-receiving-channel c (lambda (sc m)
-                                         (channel-enqueue! sc (+ 1 m)))))
+                                         (channel-enqueue sc (+ 1 m)))))
   (test (channel-messages sc) '(2 3 4))
   (test (channel-messages c) '())
-  (channel-enqueue! c 4)
+  (channel-enqueue c 4)
   (test (channel-messages sc) '(2 3 4 5))
   (test (channel-messages c) '())
   (test-assert (not (channel-closed? c)))
   (test-assert (not (channel-closed? sc)))
-  (channel-close! sc)
+  (close-channel sc)
   (test-assert (channel-closed? c))
   (test-assert (channel-closed? sc))
 
     (test '(1 2) (result))
     (set! rc #f)
     (gc!!)
-    (channel-enqueue! c 3)
+    (channel-enqueue c 3)
     (test '(1 2) (result))))
 
-(test-group "channel-fold!"
+(test-group "fold-channel"
   (define c (make-channel 1 2))
-  (define cf (channel-fold! c cons '()))
+  (define cf (fold-channel c cons '()))
   (test (channel-messages cf) '((1) (2 1)))
-  (channel-enqueue! c 3)
+  (channel-enqueue c 3)
   (test (channel-messages cf) '((1) (2 1) (3 2 1)))
   (test (channel-messages c) '()))
 
 (test-group "channel-map!"
   (define c (make-channel 1 2))
-  (define m (channel-map! c add1))
+  (define m (map-channel c add1))
   (test (channel-messages m) '(2 3))
-  (channel-enqueue! c 3)
+  (channel-enqueue c 3)
   (test (channel-messages m) '(2 3 4))
   (test (channel-messages c) '()))
 
-(test-group "channel-filter!"
+(test-group "filter-channel"
   (define c (make-channel 1 2 3 4))
-  (define m (channel-filter! c even?))
+  (define m (filter-channel c even?))
   (test (channel-messages m) '(2 4))
-  (channel-enqueue! c 5 6)
+  (channel-enqueue c 5 6)
   (test (channel-messages m) '(2 4 6))
   (test (channel-messages c) '()))