;; Source
;;
;; channel / channel.scm

;; based on https://github.com/ztellman/lamina/

(module channel

(make-channel
 ;; derivators
 fork-channel fold-channel map-channel filter-channel
 make-receiving-channel

 ;; operations
 channel-enqueue channel-receive channel-remove-receiver close-channel

 ;; callbacks
 on-channel-receive on-channel-close on-channel-drain

 ;; accessors
 channel-messages channel-forks

 ;; predicates
 channel-empty? channel-closed? channel-drained?)

(import chicken scheme)
(use data-structures srfi-18 srfi-1
     (only srfi-1 filter)
     (only miscmacros push!)
     (only lolevel make-weak-locative locative->object))

;; Record holding the complete state of one channel.  Fields wrapped in
;; `(setter ...)' are updated elsewhere in this file through
;; `(set! (channel-FIELD c) value)'.
(define-record channel
  ;; #f on creation; set to #t by `close-channel'.
  (setter closed?)
  ;; List of receiver procedures registered with `on-channel-receive'.
  (setter receivers)
  ;; List of weak locatives pointing at channels made by `fork-channel'.
  (setter forks)
  ;; Queue of callbacks that are removed as soon as they have been
  ;; called with a message (filled by `channel-receive').
  onetime-receivers
  ;; Queue of messages that have not been delivered yet.
  queue
  ;; Queue of thunks run by `close-channel'.
  on-close-handlers
  ;; Queue of thunks run once the channel is closed and empty.
  on-drain-handlers
  ;; Mutex guarding the channel; its "specific" slot is used as the
  ;; nesting counter of the semaphore helpers in this file.
  mutex
  ;; Condition variable broadcast by `flush-channel' after one-time
  ;; receivers have been served; blocking `channel-receive' waits on it.
  cvar)

;; Printed representation of a channel: "#<channel", then every queued
;; message in `write' syntax preceded by a single space, then ">".
(define-record-printer (channel c out)
  (display "#<channel" out)
  (let print-next ((pending (queue->list (channel-queue c))))
    (unless (null? pending)
      (display " " out)
      (write (car pending) out)
      (print-next (cdr pending))))
  (display ">" out))

;; semaphore implementation stolen from the 9p egg

;; #t when MUTEX is currently owned by the calling thread.
(define (have-lock? mutex)
  (let ((owner (mutex-state mutex)))
    (eq? owner (current-thread))))

;; Re-entrant lock.  If the calling thread already owns MUTEX, only the
;; nesting counter kept in the mutex's "specific" slot is incremented;
;; otherwise the mutex is acquired and the counter is reset to 0.
(define (semaphore-lock! mutex)
  (cond
   ((have-lock? mutex)
    (let ((depth (mutex-specific mutex)))
      (mutex-specific-set! mutex (add1 depth))))
   (else
    (mutex-lock! mutex)
    (mutex-specific-set! mutex 0))))

;; Counterpart of `semaphore-lock!'.  Signals an error when the calling
;; thread does not own MUTEX.  While the nesting counter is positive it
;; is merely decremented; at depth 0 the mutex itself is released.
(define (semaphore-unlock! mutex)
  (unless (have-lock? mutex)
    (error "The current thread does not own the mutex!"))
  (let ((depth (mutex-specific mutex)))
    (if (> depth 0)
        (mutex-specific-set! mutex (sub1 depth))
        (mutex-unlock! mutex))))

;; Call THUNK with MUTEX held re-entrantly.  The lock is taken on entry
;; to, and dropped on any exit from, the dynamic extent of THUNK.
(define (with-semaphore mutex thunk)
  (define (acquire) (semaphore-lock! mutex))
  (define (release) (semaphore-unlock! mutex))
  (dynamic-wind acquire thunk release))

;; Call THUNK while holding CHANNEL's mutex (re-entrant, see
;; `with-semaphore').
(define (with-locked-channel channel thunk)
  (let ((lock (channel-mutex channel)))
    (with-semaphore lock thunk)))

;; Keep a handle on the raw constructor generated by `define-record'.
;; This must stay ahead of the public `make-channel' definition further
;; down, which rebinds the name `make-channel' and calls this alias.
(define %make-channel make-channel)

;; Append every element of the list ITEMS to QUEUE, preserving order.
(define (queue-add-list! queue items)
  (let append-next ((rest items))
    (unless (null? rest)
      (queue-add! queue (car rest))
      (append-next (cdr rest)))))

;; Call each procedure in the list PROCS, in order, with the single
;; argument ARG.  Results are discarded.
(define (call-each arg procs)
  (let call-next ((remaining procs))
    (unless (null? remaining)
      ((car remaining) arg)
      (call-next (cdr remaining)))))

;; Drain QUEUE: repeatedly remove the procedure at its head and apply it
;; to ARGS until the queue is empty.  Emptiness is re-checked after every
;; call, so callbacks added while draining are run as well.
(define (run-callback-queue! queue . args)
  (do ()
      ((queue-empty? queue))
    (let ((callback (queue-remove! queue)))
      (apply callback args))))

;; Public constructor.  Returns a fresh, open channel without receivers
;; or forks whose message queue initially holds MESSAGES, in order.
(define (make-channel . messages)
  (let* ((one-shots      (make-queue))
         (pending        (list->queue messages))
         (close-handlers (make-queue))
         (drain-handlers (make-queue))
         (lock           (make-mutex))
         (wakeup         (make-condition-variable)))
    (%make-channel #f            ; closed?
                   '()           ; receivers
                   '()           ; forks
                   one-shots
                   pending
                   close-handlers
                   drain-handlers
                   lock
                   wakeup)))

;; The messages currently waiting in CHANNEL, as a list (oldest first).
(define (channel-messages channel)
  (let ((pending (channel-queue channel)))
    (queue->list pending)))

;; #t when CHANNEL has at least one permanent receiver or at least one
;; pending one-time receiver, #f otherwise.
(define (channel-has-receivers? channel)
  (or (not (null? (channel-receivers channel)))
      (not (queue-empty? (channel-onetime-receivers channel)))))

;; Deliver queued messages for as long as there is both a message and
;; somebody to receive it.  For every message, all pending one-time
;; receivers are run first (followed by a broadcast on the channel's
;; condition variable), then every permanent receiver is called.  The
;; permanent receiver list is the one in place when `flush-channel' was
;; entered.  Finally, if the channel is closed and empty, the on-drain
;; handlers are run.
(define (flush-channel channel)
  (let ((pending   (channel-queue channel))
        (listeners (channel-receivers channel))
        (one-shots (channel-onetime-receivers channel)))

    (define (deliverable?)
      (and (not (queue-empty? pending))
           (channel-has-receivers? channel)))

    (define (deliver-next!)
      (let ((message (queue-remove! pending)))
        (if (not (queue-empty? one-shots))
            (begin
              (run-callback-queue! one-shots message)
              ;; wake up threads blocked in `channel-receive'
              (condition-variable-broadcast! (channel-cvar channel))))
        (call-each message listeners)))

    (with-locked-channel channel
      (lambda ()
        (do ()
            ((not (deliverable?)))
          (deliver-next!))
        (if (channel-drained? channel)
            (run-callback-queue! (channel-on-drain-handlers channel)))))))

;; Enqueue MESSAGE (and any further MESSAGES) on CHANNEL.  Returns #f
;; without doing anything when the channel is closed, #t otherwise.
;; The messages are also forwarded to every fork; forks whose weak
;; locative no longer yields an object, or which refuse the messages
;; (i.e. are closed), are dropped from the fork list.  Afterwards the
;; channel is flushed.
(define (channel-enqueue channel message . messages)
  (if (channel-closed? channel)
      #f
      (let ((batch (cons message messages)))

        ;; Filter predicate: forward BATCH to the fork behind FORK*,
        ;; yielding a true value only if the fork accepted it.
        (define (forward-to-fork fork*)
          (let ((fork (locative->object fork*)))
            (and fork
                 (apply channel-enqueue fork batch))))

        (with-locked-channel channel
          (lambda ()
            (queue-add-list! (channel-queue channel) batch)
            (set! (channel-forks channel)
                  (filter forward-to-fork (channel-forks channel)))
            (flush-channel channel)))
        #t)))

;; Register RECEIVER as a permanent receiver of CHANNEL (prepended to
;; the receiver list) and immediately flush the channel so that already
;; queued messages are delivered.
(define (on-channel-receive channel receiver)
  (define (register-and-flush)
    (set! (channel-receivers channel)
          (cons receiver (channel-receivers channel)))
    (flush-channel channel))
  (with-locked-channel channel register-and-flush))

;; Unregister RECEIVER: every entry of CHANNEL's permanent receiver list
;; that is `eq?' to RECEIVER is removed.
(define (channel-remove-receiver channel receiver)
  (define (unregister)
    (let ((current (channel-receivers channel)))
      (set! (channel-receivers channel)
            (delete receiver current eq?))))
  (with-locked-channel channel unregister))

;; Receive a message from CHANNEL.  The calling convention is selected
;; by ARGS:
;;
;;   (channel-receive channel)
;;     Synchronous: return the next message, blocking until one arrives.
;;
;;   (channel-receive channel TIMEOUT [DEFAULT])
;;     Synchronous with a timeout (first argument is a number).  When no
;;     message arrives in time the result is (DEFAULT) if a DEFAULT
;;     thunk was given, #f otherwise.
;;
;;   (channel-receive channel RECEIVER ...)
;;     Asynchronous: if a message is queued it is removed, every
;;     RECEIVER is called with it and #t is returned.  Otherwise the
;;     receivers are registered as one-time receivers and #f is
;;     returned.
;;
;; Signals an error when the calling thread already holds the channel's
;; mutex.
(define (channel-receive channel . args)
  (let* ((mutex   (channel-mutex channel))
         (queue   (channel-queue channel))
         (rqueue  (channel-onetime-receivers channel))
         (cvar    (channel-cvar channel))
         (timeout (and (pair? args)
                       (let ((t (car args)))
                         (and (number? t) t))))
         (default (and timeout
                       (pair? (cdr args))
                       (cadr args)))
         (sync?   (or timeout (null? args))))

    ;; Take the channel mutex directly (not re-entrantly).  The nesting
    ;; counter is reset exactly like `semaphore-lock!' does, so that
    ;; receivers which re-enter `with-locked-channel' on this thread
    ;; never see an uninitialised counter.
    (define (lock!)
      (mutex-lock! mutex)
      (mutex-specific-set! mutex 0))

    ;; Remove CALLBACK from the one-time receiver queue, keeping all
    ;; other entries in their original order.
    (define (forget-callback! callback)
      (let ((others (remove (lambda (r) (eq? r callback))
                            (queue->list rqueue))))
        (let drain ()
          (unless (queue-empty? rqueue)
            (queue-remove! rqueue)
            (drain)))
        (queue-add-list! rqueue others)))

    (when (have-lock? mutex)
      (error "can't receive from already locked channel"))

    (lock!)

    (if sync?
        (if (queue-empty? queue)
            (let* ((message   #f)
                   (received? #f)
                   (callback  (lambda (x)
                                (set! message x)
                                (set! received? #t))))
              (queue-add! rqueue callback)
              ;; Atomically release the mutex and wait for the broadcast
              ;; sent by `flush-channel'.
              (if (mutex-unlock! mutex cvar timeout)
                  message
                  (begin
                    ;; Timed out.  The callback must not stay queued:
                    ;; it would silently swallow the next message that
                    ;; gets enqueued.  Re-take the lock so the clean-up
                    ;; cannot race with a concurrent delivery.
                    (lock!)
                    (unless received?
                      (forget-callback! callback))
                    (mutex-unlock! mutex)
                    (if received?
                        ;; delivered between the timeout and the re-lock
                        message
                        (and default (default))))))
            (let ((message (queue-remove! queue)))
              (mutex-unlock! mutex)
              message))
        (if (queue-empty? queue)
            (begin
              (queue-add-list! rqueue args)
              (mutex-unlock! mutex)
              #f)
            (let ((message (queue-remove! queue)))
              (call-each message args)
              (mutex-unlock! mutex)
              #t)))))

;; Mark CHANNEL as closed and run its on-close handlers.  If the channel
;; holds no undelivered messages at that point it is drained as well, so
;; the on-drain handlers are run too; otherwise they stay queued until
;; `flush-channel' finds the channel drained.  Handlers are removed from
;; their queues as they run, so closing twice does not run them twice.
(define (close-channel channel)
  (set! (channel-closed? channel) #t)
  (run-callback-queue! (channel-on-close-handlers channel))
  ;; Nothing else flushes a closed, empty channel, so without this the
  ;; drain handlers registered while it was open would never fire.
  (when (channel-drained? channel)
    (run-callback-queue! (channel-on-drain-handlers channel))))

;; #t when CHANNEL has no undelivered messages.
(define (channel-empty? channel)
  (let ((pending (channel-queue channel)))
    (queue-empty? pending)))

;; A channel is drained once it is closed and has no undelivered
;; messages left.
(define (channel-drained? channel)
  (if (channel-closed? channel)
      (channel-empty? channel)
      #f))

;; Create and return a fork of CHANNEL: a new channel that starts with a
;; copy of CHANNEL's currently queued messages and receives everything
;; enqueued on CHANNEL from now on.  CHANNEL only refers to the fork
;; through a weak locative; closing CHANNEL closes the fork if that
;; locative still yields an object.
(define (fork-channel channel)
  (define (build-fork)
    (let* ((child     (apply make-channel (channel-messages channel)))
           (child-ref (make-weak-locative child)))
      (set! (channel-forks channel)
            (cons child-ref (channel-forks channel)))
      (on-channel-close
       channel
       (lambda ()
         (let ((live (locative->object child-ref)))
           (and live (close-channel live)))))
      child))
  (with-locked-channel channel build-fork))

;; Arrange for THUNK to be called when CHANNEL is closed.  If it is
;; closed already, THUNK is called right away.
(define (on-channel-close channel thunk)
  (cond
   ((channel-closed? channel) (thunk))
   (else
    (let ((handlers (channel-on-close-handlers channel)))
      (queue-add! handlers thunk)))))

;; Arrange for THUNK to be called when CHANNEL is drained (closed and
;; empty).  If it is drained already, THUNK is called right away.
(define (on-channel-drain channel thunk)
  (cond
   ((channel-drained? channel) (thunk))
   (else
    (let ((handlers (channel-on-drain-handlers channel)))
      (queue-add! handlers thunk)))))

;; Common building block of the derived channels (fold/map/filter).
;; Creates a new channel and registers a receiver on SOURCE-CHANNEL that
;; calls (ON-RECEIVE new-channel message) for every message delivered by
;; SOURCE-CHANNEL.
;;
;; The value of the final `set-finalizer!' form is the procedure's
;; result; presumably that is the new channel itself -- confirm against
;; the `set-finalizer!' documentation.
(define (make-receiving-channel source-channel on-receive)
  (let* ((receiving-channel (make-channel))
         ;; The receiver reaches the new channel only through a weak
         ;; locative and does nothing once that no longer yields an
         ;; object.
         (receiver (let ((rc* (make-weak-locative receiving-channel)))
                     (lambda (message)
                       (and-let* ((rc (locative->object rc*)))
                         (on-receive rc message))))))
    (on-channel-receive
     source-channel
     receiver)
    ;; Closing the derived channel unsubscribes it from the source; if
    ;; that leaves the source without any receivers, the source is
    ;; closed as well.
    (on-channel-close
     receiving-channel
     (lambda ()
       (with-locked-channel source-channel
         (lambda ()
           (channel-remove-receiver source-channel receiver)
           (unless (channel-has-receivers? source-channel)
             (close-channel source-channel))))))
    ;; Also unsubscribe when the finalizer of the derived channel runs.
    (set-finalizer!
     receiving-channel
     (lambda (x)
       (channel-remove-receiver source-channel receiver)))))

;; Derive a channel from CHANNEL that carries a running fold: for every
;; message the accumulator is replaced by (PROC message accumulator),
;; starting from SEED, and the new accumulator value is enqueued on the
;; derived channel.
(define (fold-channel channel proc seed)
  ;; The accumulator lives in the "specific" slot of a mutex, which
  ;; also serialises updates to it.
  (let ((accumulator (make-mutex)))
    (mutex-specific-set! accumulator seed)
    (make-receiving-channel
     channel
     (lambda (folding-channel message)
       ;; `dynamic-wind' guarantees the accumulator is unlocked even if
       ;; PROC or the enqueue raises; otherwise the mutex would stay
       ;; locked and the next message would block forever.
       (dynamic-wind
           (lambda () (mutex-lock! accumulator))
           (lambda ()
             (let ((acc (proc message (mutex-specific accumulator))))
               (mutex-specific-set! accumulator acc)
               (channel-enqueue folding-channel acc)))
           (lambda () (mutex-unlock! accumulator)))))))

;; Derive a channel from CHANNEL on which every message is replaced by
;; (PROC message).
(define (map-channel channel proc)
  (define (forward-mapped mapping-channel message)
    (let ((mapped (proc message)))
      (channel-enqueue mapping-channel mapped)))
  (make-receiving-channel channel forward-mapped))

;; Derive a channel from CHANNEL that only carries the messages for
;; which (PRED? message) is true.
(define (filter-channel channel pred?)
  (define (forward-matching filtering-channel message)
    (if (pred? message)
        (channel-enqueue filtering-channel message)))
  (make-receiving-channel channel forward-matching))

)