;; channel/tests/run.scm

(load-relative "../channel")
(load-relative "../channel.import")
(import channel)
(use srfi-18 test data-structures miscmacros)

;; Open the outermost (unnamed) test group; it is closed by the
;; matching (test-end) at the bottom of this file.
(test-begin)

;; Run `(gc #t)` five times back to back.  The "gc" test groups call
;; this after dropping their last reference to a derived channel.
;; The return value is not meaningful; callers use it for effect only.
(define (gc!!)
  (do ((round 0 (+ round 1)))
      ((= round 5))
    (gc #t)))

;; Build a recording receiver backed by a private queue.
;;   (r msg) -- append MSG to the recording
;;   (r)     -- return everything recorded so far, oldest first, as a list
(define make-receiver
  (lambda ()
    (let ((received (make-queue)))
      (case-lambda
       ((message) (queue-add! received message))
       (() (queue->list received))))))

;; `channel-receive` with explicit receiver procedures.
;; Assertions use the test egg's (test EXPECTED EXPR) order so that the
;; default test name and the expected/actual labels in a failure report
;; refer to the expression under test rather than to the literal.
(test-group "channel-receive"
  (define c (make-channel))
  (channel-enqueue c 1)
  (define results (make-receiver))
  ;; A message is already queued: the call yields a true value and the
  ;; receiver has seen the message.
  (test-assert (channel-receive c results))
  (test '(1) (results))
  ;; Nothing is queued now, so the call yields #f ...
  (test-assert (not (channel-receive c results)))
  ;; ... yet the receiver still gets the next message enqueued.
  (channel-enqueue c 2)
  (test '(1 2) (results))

  ;; Fresh receiver under its own name, so `results` is not defined a
  ;; second time within the same body.  Passing one receiver twice per
  ;; call makes it see each received message twice.
  (define doubled (make-receiver))
  (channel-enqueue c 1 2)
  (channel-receive c doubled doubled)
  (channel-receive c doubled doubled)
  (test '(1 1 2 2) (doubled)))

;; Mixes persistent receivers (`on-channel-receive`) with a one-shot
;; `channel-receive` and checks which messages each of them observes,
;; including after `channel-remove-receiver`.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "channel receivers"
  (define c (make-channel 1 2 3))
  (define results-1 (make-receiver))
  (define results-2 (make-receiver))
  (define results-3 (make-receiver))
  (on-channel-receive c results-1)
  ;; NOTE(review): this line used to be commented "using the same
  ;; receiver multiple times will register it only once", but the
  ;; expectation below has results-2 seeing the message 4 twice and
  ;; nothing afterwards -- confirm which behaviour is intended.
  (channel-receive c results-2 results-2)
  (channel-enqueue c 4)
  (on-channel-receive c results-3)
  (channel-enqueue c 5)
  (test '(1 2 3 4 5) (results-1))
  (test '(4 4) (results-2))
  (test '(5) (results-3))
  ;; Once removed, results-1 must not see further messages, while the
  ;; remaining persistent receiver keeps collecting.
  (channel-remove-receiver c results-1)
  (channel-enqueue c 6)
  (test '(1 2 3 4 5) (results-1))
  (test '(5 6) (results-3)))

;; Registers receivers on one channel from two threads (the thread
;; running this file and one spawned below), stepping the two threads
;; through a shared mutex and condition variable, and finally checks
;; which messages each receiver collected.  Progress is also printed so
;; the interleaving can be inspected in the test output.
(test-group "parallel channel receivers"

  ;; One result queue per receiver.  The -t receiver is registered from
  ;; the spawned thread, the -p one from the thread running this file;
  ;; receive-1 and receive-2 are the `channel-receive` calls made in the
  ;; spawned thread.
  (define results-receiver-t (make-queue))
  (define results-receiver-p (make-queue))
  (define results-receive-1  (make-queue))
  (define results-receive-2  (make-queue))

  ;; Print "> " followed by the pretty-printed result of
  ;; `channel-messages` for C.
  (define (print-channel-status c)
    (display "> ") (pp (channel-messages c)))

  ;; Log the messages X, enqueue them on C, then print the channel status.
  (define (enqueue c . x)
    (print ">> enqueuing " x)
    (apply channel-enqueue c x)
    (print-channel-status c))

  ;; Make a receiver that logs each message and appends it to
  ;; RESULTS-QUEUE.  The name of the thread that *creates* the receiver
  ;; is captured up front, so each log line shows both the creating
  ;; thread and the thread the receiver is actually invoked on.
  (define (test-receiver name results-queue)
    (let ((source-thread (thread-name (current-thread))))
      (print "<< " source-thread " " name)
      (lambda (x)
        (print "< " source-thread "\t"
               (thread-name (current-thread)) "\t"
               name "\t" x)
        (queue-add! results-queue x))))

  (define channel (make-channel 'foo))
  (print-channel-status channel)

  (define m (make-mutex))
  (define cv (make-condition-variable))

  ;; (sync body ...): lock M, run BODY, signal CV, then call
  ;; (mutex-unlock! m cv).  Per SRFI-18, passing a condition variable to
  ;; `mutex-unlock!` releases the mutex and blocks the calling thread on
  ;; that condition variable, so every `sync` step hands control to the
  ;; other thread and waits to be signalled back.
  (define-syntax sync
    (syntax-rules ()
      ((_ body ...)
       (begin
         (mutex-lock! m)
         body ...
         (condition-variable-signal! cv)
         (mutex-unlock! m cv)))))

  ;; Spawn the second thread.  Its last step, enqueuing 'frob, is not
  ;; wrapped in `sync`.
  (sync
   (define thread
     (thread-start!
      (lambda ()
        (sync (channel-receive channel (test-receiver 'receive-1 results-receive-1)))
        (sync (on-channel-receive channel (test-receiver 'receiver-t results-receiver-t)))
        (sync (channel-receive channel
                                (test-receiver 'receive-2 results-receive-2)
                                (test-receiver 'receive-2 results-receive-2)))
        (enqueue channel 'frob)))))

  (sync (enqueue channel 'bar))

  (sync (on-channel-receive
         channel
         (test-receiver 'receiver-p results-receiver-p)))

  (enqueue channel 'baz 'qux 'quux)
  ;; Signal once more before joining -- presumably to release the
  ;; spawned thread from its last `sync` wait so it can enqueue 'frob
  ;; and terminate; verify against the intended interleaving.
  (condition-variable-signal! cv)
  (thread-join! thread)

  (test '(foo) (queue->list results-receive-1))
  (test '(bar baz qux quux frob) (queue->list results-receiver-t))
  (test '(baz baz) (queue->list results-receive-2))
  (test '(baz qux quux frob) (queue->list results-receiver-p)))


;; Walks one channel through its life cycle and checks the three state
;; predicates (`channel-empty?`, `channel-closed?`, `channel-drained?`)
;; after every step.
(test-group "closing, emptying and draining channels"
  ;; Fresh channel: empty, open, not drained.
  (define c (make-channel))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))
  (test-assert (channel-empty? c))

  ;; Enqueuing on an open channel yields a true value and makes the
  ;; channel non-empty.
  (test-assert (channel-enqueue c 1))
  (test-assert (not (channel-empty? c)))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))

  ;; Receiving the only message empties the channel; an empty channel
  ;; that is still open does not count as drained.
  (channel-receive c void)
  (test-assert (channel-empty? c))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))

  ;; After closing, further enqueues yield #f while the message queued
  ;; before the close is still there.
  (test-assert (channel-enqueue c 1))
  (close-channel c)
  (test-assert (channel-closed? c))
  (test-assert (not (channel-enqueue c 1)))
  (test-assert (not (channel-empty? c)))

  ;; Closed and empty together is what counts as drained.
  (channel-receive c void)
  (test-assert (channel-closed? c))
  (test-assert (channel-empty? c))
  (test-assert (channel-drained? c)))

;; `channel-receive` called without a receiver procedure: the call
;; itself evaluates to the received message.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "synchronous channel-receive"
  (define c (make-channel 1))
  (test 1 (channel-receive c))
  ;; The channel is empty now, so the consumer thread can only finish
  ;; with 'foo once the producer thread has enqueued it.
  (define consumer
    (thread-start! (lambda () (channel-receive c))))
  (define producer
    (thread-start! (lambda () (channel-enqueue c 'foo))))
  (test 'foo (thread-join! consumer))

  (test-group "with timeout"
    ;; A numeric argument is a timeout (presumably in seconds -- TODO
    ;; confirm).  When it expires the call evaluates to #f, or to the
    ;; result of the thunk passed after it.
    (define c (make-channel 'foo))
    (test 'foo (channel-receive c 3))
    (test-assert (not (channel-receive c 0.2)))
    (test 'none (channel-receive c 0.2 (lambda () 'none)))))

;; `fork-channel`: the fork sees the source's messages (both the ones
;; already queued and later ones), messages enqueued on the fork stay
;; on the fork, and closing the source closes the fork.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "fork-channel"
  (define r1 (make-receiver))
  (define c1 (make-channel 1 2))
  (define r2 (make-receiver))
  (define c2 (fork-channel c1))
  (on-channel-receive c1 r1)
  (on-channel-receive c2 r2)
  (test '(1 2) (r1))
  (test '(1 2) (r2))
  ;; 3 goes to the source and must reach both; 4 goes to the fork and
  ;; must reach only the fork's receiver.
  (channel-enqueue c1 3)
  (channel-enqueue c2 4)
  (test '(1 2 3) (r1))
  (test '(1 2 3 4) (r2))

  (test-assert (not (channel-closed? c2)))
  (close-channel c1)
  (test-assert (channel-closed? c2))

  (test-group "gc"
    ;; After the only reference to the fork is dropped and collected,
    ;; its entry is still counted by `channel-forks` until the next
    ;; enqueue on the source, which removes it.  Presumably forks are
    ;; held weakly and pruned on enqueue -- confirm against the
    ;; implementation.
    (define c (make-channel))
    (define f (fork-channel c))
    (test 1 (length (channel-forks c)))
    (set! f #f)
    (gc!!)
    (test 1 (length (channel-forks c)))
    (channel-enqueue c 'x)
    (test 0 (length (channel-forks c)))))

;; Close and drain callbacks: when they fire, and what happens when
;; they are registered on a channel that is already closed and drained.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "on-channel-close, on-channel-drain"
  (define close (make-receiver))
  (define drain (make-receiver))
  (define c (make-channel 1))
  (on-channel-close c (lambda () (close 'close)))
  (on-channel-drain c (lambda () (drain 'drain)))
  ;; Nothing fires at registration time on an open channel.
  (test '() (close))
  (test '() (drain))
  ;; Closing fires the close callback only; one message is still queued,
  ;; so the channel is not drained yet.
  (close-channel c)
  (test '(close) (close))
  (test '() (drain))
  ;; Consuming the remaining message drains the channel.
  (on-channel-receive c void)
  (test '(close) (close))
  (test '(drain) (drain))
  ;; Callbacks registered after the fact are invoked immediately.
  (on-channel-close c (lambda () (close 'close)))
  (on-channel-drain c (lambda () (drain 'drain)))
  (test '(close close) (close))
  (test '(drain drain) (drain)))

;; Receivers that raise an error must neither abort the test run nor
;; keep the well-behaved receivers from getting their messages.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "errors"
  (define c (make-channel 1))
  (define r1 (make-receiver))
  (define r2 (make-receiver))
  ;; Persistent receiver that always fails.
  (on-channel-receive c (lambda (x) (error "boom")))
  (channel-receive c r1)
  (on-channel-receive c r1)
  (channel-enqueue c 2)
  ;; One-shot receiver that fails, followed by a healthy one-shot receiver.
  (channel-receive c (lambda (x) (error "kaboom")))
  (channel-receive c r2)
  (channel-enqueue c 3)
  (test '(1 2 3) (r1))
  (test '(3) (r2)))

;; `on-channel-error`: the handler is called with the condition raised
;; by a failing receiver and with the message that receiver was given.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "on-channel-error"
  (define channel (make-channel))
  (define err #f)
  (define msg #f)
  ;; Receiver that always fails; the location symbol passed to `error`
  ;; is what the assertion below looks for in the captured condition.
  (on-channel-receive
   channel
   (lambda _
     (error 'in-the-error-handler "explosion")))
  ;; Record what the handler is called with.
  (on-channel-error
   channel (lambda (e m)
             (set! err e)
             (set! msg m)))
  (channel-enqueue channel 'foo)
  (test-assert (condition? err))
  (test 'in-the-error-handler (get-condition-property err 'exn 'location))
  (test 'foo msg))

;; `siphon-channel`: messages are moved out of a source channel and
;; handed, together with the target channel, to a procedure.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "siphon-channel"
  (define c (make-channel 1 2 3))
  (define sc (make-channel))
  ;; The procedure receives the target channel and the message; here it
  ;; forwards the message incremented by one.
  (siphon-channel c sc (lambda (sc m)
                         (channel-enqueue sc (+ 1 m))))
  ;; Already queued messages are siphoned right away and leave the source.
  (test '(2 3 4) (channel-messages sc))
  (test '() (channel-messages c))
  (channel-enqueue c 4)
  (test '(2 3 4 5) (channel-messages sc))
  (test '() (channel-messages c))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-closed? sc)))

  (test-group "gc"
    ;; Once the only reference to the siphon's result is dropped and
    ;; collected, new messages on the source no longer reach the siphon
    ;; procedure.
    (define c (make-channel 1 2))
    (define result (make-receiver))
    (define rc (siphon-channel c (make-channel) (lambda (_ m) (result m))))
    (test '(1 2) (result))
    (set! rc #f)
    (gc!!)
    (channel-enqueue c 3)
    (test '(1 2) (result)))

  (test-group "close propagation"
    ;; Tree: c1 feeds c1.1 and c1.2; c1.2 feeds c1.2.1.
    ;; Closing c1.2.1 closes c1.2 but leaves c1 open while c1.1 is still
    ;; open; closing c1.1 then closes c1 too.  Presumably a source closes
    ;; once all channels siphoning from it are closed -- confirm against
    ;; the implementation.
    (define c1     (make-channel))
    (define c1.1   (siphon-channel c1))
    (define c1.2   (siphon-channel c1))
    (define c1.2.1 (siphon-channel c1.2))
    (close-channel c1.2.1)
    (test-assert (channel-closed? c1.2.1))
    (test-assert (channel-closed? c1.2))
    (test-assert (not (channel-closed? c1.1)))
    (test-assert (not (channel-closed? c1)))
    (close-channel c1.1)
    (test-assert (channel-closed? c1.1))
    (test-assert (channel-closed? c1))))

;; `fold-channel`: every source message is combined with the running
;; accumulator (here by `cons`, starting from '()), and each
;; intermediate accumulator shows up as a message on the result channel.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "fold-channel"
  (define c (make-channel 1 2))
  (define cf (fold-channel c cons '()))
  (test '((1) (2 1)) (channel-messages cf))
  (channel-enqueue c 3)
  (test '((1) (2 1) (3 2 1)) (channel-messages cf))
  ;; The source's messages have been consumed by the fold.
  (test '() (channel-messages c)))

;; `map-channel`: the result channel carries the procedure (here `add1`)
;; applied to each source message, including messages enqueued later.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "map-channel"
  (define c (make-channel 1 2))
  (define m (map-channel c add1))
  (test '(2 3) (channel-messages m))
  (channel-enqueue c 3)
  (test '(2 3 4) (channel-messages m))
  ;; The source's messages have been consumed by the mapping.
  (test '() (channel-messages c)))

;; `filter-channel`: only source messages satisfying the predicate
;; (here `even?`) appear on the result channel.
;; Assertions use the test egg's (test EXPECTED EXPR) order.
(test-group "filter-channel"
  (define c (make-channel 1 2 3 4))
  (define m (filter-channel c even?))
  (test '(2 4) (channel-messages m))
  (channel-enqueue c 5 6)
  (test '(2 4 6) (channel-messages m))
  ;; Rejected messages do not stay behind on the source either.
  (test '() (channel-messages c)))

;; `channel-receive/delay` returns a promise for the next message
;; instead of the message itself.
(test-group "channel-receive/delay"
  (define c (make-channel))
  ;; Two promises are taken before anything is enqueued.
  (define p1 (channel-receive/delay c))
  (define p2 (channel-receive/delay c))
  (test-assert (promise? p1))
  (test-assert (promise? p2))
  ;; A single enqueued message satisfies both outstanding promises.
  (channel-enqueue c 'foo)
  (test 'foo (force p1))
  (test 'foo (force p2))
  ;; With a numeric argument (presumably a timeout in seconds -- TODO
  ;; confirm) and no message arriving, the promise forces to #f.
  (define p3 (channel-receive/delay c 0.5))
  (test-assert (not (force p3))))

;; `siphon-input-port` yields two values: a stepping procedure and a
;; channel.  The stepping procedure is called until it returns #f, after
;; which the channel must hold every datum `read` found in the port.
(test-group "siphon-input-port"
  (define port (open-input-string "this is a test"))
  (define-values (next channel)
    (siphon-input-port port read))

  ;; Keep stepping for as long as `next` returns a true value.
  (do () ((not (next))))

  (test '(this is a test) (channel-messages channel)))

;; `flush-channel-to-output-port`: messages on the channel are written
;; to the port with the given writer (here `write`), both the ones
;; already queued and the ones enqueued afterwards.
(test-group "flush-channel-to-output-port"
  (define port (open-output-string))
  (define channel (make-channel 'foo 'bar))
  (flush-channel-to-output-port channel port write)
  ;; `write` adds no separator between the symbols, hence "foobar".
  (test "foobar" (get-output-string port))
  (channel-enqueue channel 'baz)
  (test "foobarbaz" (get-output-string port))

  ;; Open question, kept disabled: what should happen when the port is
  ;; closed while the channel is still being flushed to it?
  ;; what's sensible in this case? detaching the receiver?
  ;; (test-group "closing the port"
  ;;   (close-output-port port)
  ;;   (channel-enqueue channel 'boom))
  )

;; Close the group opened by (test-begin) near the top of the file.
(test-end)

;; Terminate the process; per the test egg documentation the exit
;; status reflects whether any test failed.
(test-exit)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.