Source

channel / channel.scm

;; based on https://github.com/ztellman/lamina/

(module channel

(make-channel
 ;; derivators
 fork-channel fold-channel map-channel filter-channel
 siphon-channel

 ;; operations
 channel-enqueue channel-receive channel-receive/delay
 channel-remove-receiver close-channel

 ;; callbacks
 on-channel-receive on-channel-close on-channel-drain

 ;; accessors
 channel-messages channel-forks

 ;; predicates
 channel-empty? channel-closed? channel-drained?)

(import chicken scheme)
(use data-structures srfi-18 srfi-1
     (only srfi-1 filter)
     (only miscmacros push!)
     (only lolevel make-weak-locative locative->object))

;; Record describing a single channel.  `define-record' generates the
;; constructor, predicate and accessors; the fields wrapped in
;; `(setter ...)' are additionally modifiable through
;; `(set! (channel-FIELD c) value)'.
(define-record channel
  ;; #f initially; set to #t by `close-channel'.
  (setter closed?)
  ;; List of permanent receiver procedures (see `on-channel-receive').
  (setter receivers)
  ;; List of weak locatives pointing at forks (see `fork-channel').
  (setter forks)
  ;; Queue of callbacks that are removed as they are run; each one is
  ;; handed a single message by `flush-channel'.
  onetime-receivers
  ;; Queue of messages that have not been delivered yet.
  queue
  ;; Queue of thunks run by `close-channel'.
  on-close-handlers
  ;; Queue of thunks run once the channel is closed and empty.
  on-drain-handlers
  ;; SRFI-18 mutex guarding the channel; also used re-entrantly via
  ;; `with-locked-channel'.
  mutex
  ;; Condition variable broadcast after one-time receivers were run,
  ;; waking threads blocked in `channel-receive'.
  cvar)

;; Print a channel as `#<channel MSG ...>', listing the pending
;; messages in queue order, each one preceded by a single space and
;; rendered with `write'.
(define-record-printer (channel c out)
  (display "#<channel" out)
  (let loop ((pending (queue->list (channel-queue c))))
    (unless (null? pending)
      (display " " out)
      (write (car pending) out)
      (loop (cdr pending))))
  (display ">" out))

;; semaphore implementation stolen from the 9p egg

;; Return #t when MUTEX is currently owned by the calling thread,
;; #f otherwise.
(define (have-lock? mutex)
  (let ((owner (mutex-state mutex)))
    (eq? owner (current-thread))))

;; Acquire MUTEX re-entrantly.  The nesting depth is kept in the
;; mutex's specific slot: when the calling thread already owns MUTEX
;; the depth is incremented, otherwise MUTEX is locked and the depth
;; is reset to 0.
(define (semaphore-lock! mutex)
  (cond
   ((have-lock? mutex)
    (let ((depth (mutex-specific mutex)))
      (mutex-specific-set! mutex (add1 depth))))
   (else
    (mutex-lock! mutex)
    (mutex-specific-set! mutex 0))))

;; Release one level of a re-entrant lock taken with
;; `semaphore-lock!'.  Signals an error when the calling thread does
;; not own MUTEX.  A positive nesting depth is decremented; at depth 0
;; the mutex itself is unlocked.
(define (semaphore-unlock! mutex)
  (if (have-lock? mutex)
      (let ((depth (mutex-specific mutex)))
        (if (> depth 0)
            (mutex-specific-set! mutex (sub1 depth))
            (mutex-unlock! mutex)))
      (error "The current thread does not own the mutex!")))

;; Call THUNK with MUTEX held re-entrantly.  The lock is taken on
;; entry to and released on exit from THUNK's dynamic extent, so it is
;; also released on a non-local exit.  Returns whatever THUNK returns.
(define (with-semaphore mutex thunk)
  (let ((acquire (lambda () (semaphore-lock! mutex)))
        (release (lambda () (semaphore-unlock! mutex))))
    (dynamic-wind acquire thunk release)))

;; Call THUNK while holding CHANNEL's mutex (re-entrantly) and return
;; THUNK's result.
(define (with-locked-channel channel thunk)
  (let ((lock (channel-mutex channel)))
    (with-semaphore lock thunk)))

;; Keep the record constructor generated by `define-record' reachable
;; under a private name: `make-channel' is redefined further down in
;; this file as the public constructor, which calls this one.  This
;; definition therefore has to stay above that redefinition.
(define %make-channel make-channel)

;; Append every element of the list ITEMS to QUEUE, preserving their
;; order.
(define (queue-add-list! queue items)
  (let loop ((rest items))
    (unless (null? rest)
      (queue-add! queue (car rest))
      (loop (cdr rest)))))

;; Call each procedure in the list PROCS, in order, with the single
;; argument ARG.  The results are discarded.
(define (call-each arg procs)
  (let loop ((remaining procs))
    (unless (null? remaining)
      ((car remaining) arg)
      (loop (cdr remaining)))))

;; Repeatedly remove the first procedure from QUEUE and apply it to
;; ARGS until QUEUE is empty.  Procedures added to QUEUE while it is
;; being run are executed as well.
(define (run-callback-queue! queue . args)
  (do ()
      ((queue-empty? queue))
    (apply (queue-remove! queue) args)))

;; Public constructor: create an open channel without receivers or
;; forks whose message queue initially holds MESSAGES, in order.
(define (make-channel . messages)
  (let ((closed?           #f)
        (receivers         '())
        (forks             '())
        (onetime-receivers (make-queue))
        (pending           (list->queue messages))
        (close-handlers    (make-queue))
        (drain-handlers    (make-queue))
        (lock              (make-mutex))
        (wakeup            (make-condition-variable)))
    (%make-channel closed? receivers forks
                   onetime-receivers
                   pending
                   close-handlers
                   drain-handlers
                   lock
                   wakeup)))

;; Return the messages currently waiting in CHANNEL as a list, oldest
;; first.
(define (channel-messages channel)
  (let ((pending (channel-queue channel)))
    (queue->list pending)))

;; #t when CHANNEL has at least one permanent receiver or at least one
;; pending one-time receiver, #f otherwise.
(define (channel-has-receivers? channel)
  (or (not (null? (channel-receivers channel)))
      (not (queue-empty? (channel-onetime-receivers channel)))))

;; Deliver queued messages for as long as CHANNEL has both messages
;; and receivers.  Each message is first handed to all pending
;; one-time receivers (after which the channel's condition variable is
;; broadcast) and then to the permanent receivers.  The list of
;; permanent receivers is the one in effect when this procedure was
;; entered.  Finally, if the channel is closed and empty, the drain
;; handlers are run.
(define (flush-channel channel)
  (let ((pending   (channel-queue channel))
        (permanent (channel-receivers channel))
        (one-shots (channel-onetime-receivers channel)))

    ;; Is there a message and somebody to give it to?
    (define (deliverable?)
      (and (not (queue-empty? pending))
           (channel-has-receivers? channel)))

    ;; Hand a single MESSAGE to one-time, then permanent receivers.
    (define (deliver! message)
      (unless (queue-empty? one-shots)
        (run-callback-queue! one-shots message)
        (condition-variable-broadcast! (channel-cvar channel)))
      (call-each message permanent))

    (with-locked-channel channel
      (lambda ()
        (do ()
            ((not (deliverable?)))
          (deliver! (queue-remove! pending)))
        (when (channel-drained? channel)
          (run-callback-queue! (channel-on-drain-handlers channel)))))))

;; Enqueue MESSAGE and any further MESSAGES on CHANNEL.  Returns #f
;; without doing anything when CHANNEL is closed, #t otherwise.  The
;; messages are also enqueued on every fork that is still alive; forks
;; that were garbage collected or refuse the messages (because they
;; are closed) are dropped from the fork list.  Afterwards the channel
;; is flushed to its receivers.
(define (channel-enqueue channel message . messages)
  (cond
   ((channel-closed? channel) #f)
   (else
    (let ((batch (cons message messages)))
      (with-locked-channel channel
        (lambda ()
          (queue-add-list! (channel-queue channel) batch)
          (set! (channel-forks channel)
                (filter (lambda (fork*)
                          ;; Keep the fork only if it still exists
                          ;; and accepted the batch.
                          (let ((live (locative->object fork*)))
                            (and live
                                 (apply channel-enqueue live batch))))
                        (channel-forks channel)))
          (flush-channel channel)))
      #t))))

;; Register RECEIVER as a permanent receiver of CHANNEL and
;; immediately flush the channel, so that messages already queued are
;; delivered.
(define (on-channel-receive channel receiver)
  (with-locked-channel channel
    (lambda ()
      (set! (channel-receivers channel)
            (cons receiver (channel-receivers channel)))
      (flush-channel channel))))

;; Unregister the permanent receiver RECEIVER (compared with `eq?')
;; from CHANNEL.
(define (channel-remove-receiver channel receiver)
  (with-locked-channel channel
    (lambda ()
      (let ((others (filter (lambda (candidate)
                              (not (eq? candidate receiver)))
                            (channel-receivers channel))))
        (set! (channel-receivers channel) others)))))

;; Remove every occurrence of ITEM (compared with `eq?') from QUEUE,
;; keeping the remaining elements in their original order.
(define (%queue-remove-item! queue item)
  (let loop ((kept '()))
    (if (queue-empty? queue)
        (for-each (lambda (x) (queue-add! queue x)) (reverse kept))
        (let ((x (queue-remove! queue)))
          (loop (if (eq? x item) kept (cons x kept)))))))

;; Receive a single message from CHANNEL.
;;
;; Synchronous forms:
;;   (channel-receive channel)
;;   (channel-receive channel timeout [default])
;; If a message is queued it is removed and returned right away.
;; Otherwise the calling thread waits until a message arrives.  When
;; TIMEOUT (a number, handed to SRFI-18 `mutex-unlock!') is given and
;; expires first, the result of calling the thunk DEFAULT is returned,
;; or #f when no DEFAULT was passed.  A first argument that is neither
;; a number nor a procedure also selects the synchronous form without
;; a timeout.
;;
;; Asynchronous form:
;;   (channel-receive channel proc ...)
;; If a message is queued it is removed, every PROC is called with it
;; and #t is returned.  Otherwise the PROCs are registered as one-time
;; receivers and #f is returned.
;;
;; Signals an error when the calling thread already holds CHANNEL's
;; mutex.
(define (channel-receive channel . args)
  (let* ((mutex   (channel-mutex channel))
         (queue   (channel-queue channel))
         (rqueue  (channel-onetime-receivers channel))
         (cvar    (channel-cvar channel))
         (timeout (and (pair? args)
                       (let ((t (car args)))
                         (and (number? t) t))))
         (default (and timeout
                       (pair? (cdr args))
                       (cadr args)))
         (sync?   (or timeout (null? args) (not (procedure? (car args))))))

    (when (have-lock? mutex)
      (error "can't receive from already locked channel"))

    (mutex-lock! mutex)
    ;; Initialise the re-entrancy counter just like `semaphore-lock!'
    ;; does, so that callbacks invoked below may re-enter the channel
    ;; through `with-locked-channel' even on a channel whose mutex has
    ;; never been taken that way before.
    (mutex-specific-set! mutex 0)

    (if sync?
        (if (queue-empty? queue)
            (let* ((message   #f)
                   (received? #f)
                   (callback  (lambda (x)
                                (set! message x)
                                (set! received? #t))))
              (queue-add! rqueue callback)
              (if (mutex-unlock! mutex cvar timeout)
                  message
                  (begin
                    ;; Timed out.  Take the callback out of the
                    ;; one-time receiver queue again; left in place
                    ;; it would silently consume the next message
                    ;; enqueued on the channel.
                    (mutex-lock! mutex)
                    (%queue-remove-item! rqueue callback)
                    (mutex-unlock! mutex)
                    ;; A message may have been delivered between the
                    ;; timeout firing and the lock being re-acquired;
                    ;; do not drop it.
                    (if received?
                        message
                        (and default (default))))))
            (let ((message (queue-remove! queue)))
              (mutex-unlock! mutex)
              message))
        (if (queue-empty? queue)
            (begin
              (queue-add-list! rqueue args)
              (mutex-unlock! mutex)
              #f)
            (let ((message (queue-remove! queue)))
              (call-each message args)
              (mutex-unlock! mutex)
              #t)))))

;; Start a thread performing `(channel-receive channel timeout
;; default)', yield once so that it gets a chance to run, and return a
;; promise which, when forced, joins that thread and yields its
;; result.
(define (channel-receive/delay channel #!optional timeout default)
  (let ((worker (make-thread
                 (lambda ()
                   (channel-receive channel timeout default)))))
    (thread-start! worker)
    (thread-yield!)
    (delay (thread-join! worker))))

;; Mark CHANNEL as closed and run its close handlers.  If the channel
;; holds no messages at that point it is drained as well, so the drain
;; handlers are run too; without this, handlers registered with
;; `on-channel-drain' on an empty channel would not fire when it is
;; closed.  Both handler queues are emptied as they are run, so
;; closing a channel again does not run any handler twice.
(define (close-channel channel)
  (set! (channel-closed? channel) #t)
  (run-callback-queue! (channel-on-close-handlers channel))
  (when (channel-drained? channel)
    (run-callback-queue! (channel-on-drain-handlers channel))))

;; #t when no messages are waiting in CHANNEL.
(define (channel-empty? channel)
  (let ((pending (channel-queue channel)))
    (queue-empty? pending)))

;; A channel is drained once it is closed and holds no more messages.
(define (channel-drained? channel)
  (if (channel-closed? channel)
      (channel-empty? channel)
      #f))

;; Create and return a fork of CHANNEL: a new channel that starts out
;; with a copy of CHANNEL's pending messages.  CHANNEL only remembers
;; the fork through a weak locative.  When CHANNEL is closed the fork
;; is closed as well, provided it still exists.
(define (fork-channel channel)
  (with-locked-channel channel
    (lambda ()
      (let* ((backlog    (channel-messages channel))
             (fork       (apply make-channel backlog))
             (fork*      (make-weak-locative fork))
             (close-fork (lambda ()
                           (let ((live (locative->object fork*)))
                             (and live (close-channel live))))))
        (set! (channel-forks channel)
              (cons fork* (channel-forks channel)))
        (on-channel-close channel close-fork)
        fork))))

;; Arrange for THUNK to be called when CHANNEL is closed.  If CHANNEL
;; is closed already, THUNK is called right away.
(define (on-channel-close channel thunk)
  (cond
   ((channel-closed? channel) (thunk))
   (else
    (queue-add! (channel-on-close-handlers channel) thunk))))

;; Arrange for THUNK to be called when CHANNEL is drained, i.e. closed
;; and empty.  If CHANNEL is drained already, THUNK is called right
;; away.
(define (on-channel-drain channel thunk)
  (cond
   ((channel-drained? channel) (thunk))
   (else
    (queue-add! (channel-on-drain-handlers channel) thunk))))

;; Feed every message received on SOURCE-CHANNEL into
;; DESTINATION-CHANNEL (a fresh channel by default) by calling
;; `(on-receive destination message)'; ON-RECEIVE defaults to
;; `channel-enqueue'.
;;
;; The receiver installed on the source refers to the destination
;; through a weak locative only.  When the destination is closed, the
;; receiver is removed from the source, and the source is closed too
;; if that leaves it without receivers.  A finalizer on the
;; destination removes the receiver as well.
;;
;; The value returned is that of the final `set-finalizer!' call.
(define (siphon-channel source-channel
                        #!optional
                        (destination-channel (make-channel))
                        (on-receive channel-enqueue))
  (let* ((destination* (make-weak-locative destination-channel))
         (receiver     (lambda (message)
                         (let ((destination (locative->object destination*)))
                           (and destination
                                (on-receive destination message)))))
         (detach!      (lambda ()
                         (channel-remove-receiver source-channel receiver))))
    (on-channel-receive source-channel receiver)
    (on-channel-close
     destination-channel
     (lambda ()
       (with-locked-channel source-channel
         (lambda ()
           (detach!)
           (unless (channel-has-receivers? source-channel)
             (close-channel source-channel))))))
    (set-finalizer! destination-channel
                    (lambda (x) (detach!)))))

;; Siphon CHANNEL into a new channel that carries the running fold of
;; its messages: for every MESSAGE, `(proc message accumulator)' is
;; computed, stored as the new accumulator and enqueued on the
;; folding channel.  SEED is the initial accumulator.
;;
;; The accumulator lives in the specific slot of a mutex which also
;; serialises the updates.  The mutex is released through
;; `dynamic-wind', so that a PROC (or an enqueue) that raises does not
;; leave it locked and block every later message.
(define (fold-channel channel proc seed)
  (let ((accumulator (make-mutex)))
    (mutex-specific-set! accumulator seed)
    (siphon-channel
     channel
     (make-channel)
     (lambda (folding-channel message)
       (dynamic-wind
           (lambda () (mutex-lock! accumulator))
           (lambda ()
             (let ((acc (proc message (mutex-specific accumulator))))
               (mutex-specific-set! accumulator acc)
               (channel-enqueue folding-channel acc)))
           (lambda () (mutex-unlock! accumulator)))))))

;; Siphon CHANNEL into a new channel on which `(proc message)' is
;; enqueued for every MESSAGE received from CHANNEL.
(define (map-channel channel proc)
  (let ((forward-mapped
         (lambda (mapping-channel message)
           (let ((mapped (proc message)))
             (channel-enqueue mapping-channel mapped)))))
    (siphon-channel channel (make-channel) forward-mapped)))

;; Siphon CHANNEL into a new channel that only gets those messages for
;; which `(pred? message)' is true.
(define (filter-channel channel pred?)
  (let ((forward-matching
         (lambda (filtering-channel message)
           (if (pred? message)
               (channel-enqueue filtering-channel message)))))
    (siphon-channel channel (make-channel) forward-matching)))

)