channel / channel.scm

;; based on https://github.com/ztellman/lamina/

(module channel

(make-channel
 ;; derivators
 fork-channel fold-channel map-channel filter-channel
 siphon-channel siphon-input-port flush-channel-to-output-port

 ;; operations
 channel-enqueue channel-receive channel-receive/delay
 channel-remove-receiver close-channel

 ;; callbacks
 on-channel-receive on-channel-close 
 on-channel-drain on-channel-error

 ;; accessors
 channel-messages channel-forks

 ;; predicates
 channel-empty? channel-closed? channel-drained?)

(import chicken scheme)
(use data-structures srfi-18 srfi-1
     (only srfi-1 filter)
     (only miscmacros push!)
     (only lolevel make-weak-locative locative->object))

(include "set.scm")

;; Record type holding all state of a channel.
;; `closed?`, `receivers` and `forks` are declared with setters, so the
;; rest of this file updates them with (set! (channel-closed? c) ...) etc.
(define-record channel
  (setter closed?)     ; #f until close-channel sets it to #t
  (setter receivers)   ; list of persistent receiver procedures
  (setter forks)       ; list of weak locatives pointing at forked channels
  onetime-receivers    ; set of receivers that are run once, then cleared
  queue                ; queue of pending messages
  on-close-handlers    ; set of thunks registered through on-channel-close
  on-drain-handlers    ; set of thunks registered through on-channel-drain
  on-error-handlers    ; set of procedures registered through on-channel-error
  mutex                ; guards the channel; locked re-entrantly via semaphore-lock!
  cvar)                ; condition variable a blocking channel-receive waits on

;; Print a channel as #<channel MSG ...>: every queued message follows the
;; tag, separated by a single space and rendered with `write`.
(define-record-printer (channel c out)
  (display "#<channel" out)
  (let loop ((pending (queue->list (channel-queue c))))
    (unless (null? pending)
      (display " " out)
      (write (car pending) out)
      (loop (cdr pending))))
  (display ">" out))

;; semaphore implementation stolen from the 9p egg

;; #t when MUTEX is currently owned by the calling thread, #f otherwise.
(define (have-lock? mutex)
  (let ((owner (mutex-state mutex)))
    (eq? owner (current-thread))))

;; Re-entrant lock.  The nesting depth is kept in the mutex's specific
;; field: a thread that already owns MUTEX only bumps the counter, any
;; other caller takes the mutex and resets the counter to 0.
(define (semaphore-lock! mutex)
  (cond
   ((have-lock? mutex)
    (let ((depth (mutex-specific mutex)))
      (mutex-specific-set! mutex (add1 depth))))
   (else
    (mutex-lock! mutex)
    (mutex-specific-set! mutex 0))))

;; Undo one semaphore-lock!.  While the nesting depth stored in the
;; mutex's specific field is positive it is decremented; at depth 0 the
;; mutex itself is released.  Signals an error when the calling thread
;; does not own MUTEX.
(define (semaphore-unlock! mutex)
  (if (have-lock? mutex)
      (let ((depth (mutex-specific mutex)))
        (if (> depth 0)
            (mutex-specific-set! mutex (sub1 depth))
            (mutex-unlock! mutex)))
      (error "The current thread does not own the mutex!")))

;; Call THUNK with MUTEX held re-entrantly.  dynamic-wind guarantees the
;; matching semaphore-unlock! runs however THUNK is left.
(define (with-semaphore mutex thunk)
  (define (acquire) (semaphore-lock! mutex))
  (define (release) (semaphore-unlock! mutex))
  (dynamic-wind acquire thunk release))

;; Call THUNK while holding CHANNEL's mutex (re-entrantly).
(define (with-locked-channel channel thunk)
  (let ((lock (channel-mutex channel)))
    (with-semaphore lock thunk)))

;; Keep a handle on the constructor generated by define-record: the name
;; `make-channel` is redefined further down as the public constructor,
;; which calls this one.
(define %make-channel make-channel)

;; Append every element of ITEMS to QUEUE, preserving their order.
(define (queue-add-list! queue items)
  (let loop ((rest items))
    (unless (null? rest)
      (queue-add! queue (car rest))
      (loop (cdr rest)))))

;; Apply PROC to ARGS.  Returns #t when PROC returns normally.  If PROC
;; raises, ON-ERROR is applied to the raised object followed by ARGS and
;; #f is returned.  PROC's own return value is discarded.
(define (safe-apply on-error proc args)
  (condition-case
      (begin
        (apply proc args)
        #t)
    (failure ()
      (apply on-error failure args)
      #f)))

;; Apply each procedure in PROCS to ARGS through safe-apply and return #t
;; if at least one of them returned normally, #f otherwise.  Every
;; procedure is run, even after one has succeeded.  Exceptions go to the
;; callbacks in ON-ERROR-HANDLERS, or to print-exn when that set is empty.
(define (safe-apply-some on-error-handlers procs . args)
  (define (report . exn+args)
    (run-callbacks on-error-handlers exn+args))
  (let ((on-error (if (set-empty? on-error-handlers) print-exn report)))
    (let loop ((remaining procs)
               (handled?  #f))
      (if (null? remaining)
          handled?
          (loop (cdr remaining)
                (or (safe-apply on-error (car remaining) args)
                    handled?))))))

;; Default error handler: write EXN's error message to the current error
;; port.  Any further arguments are ignored.
(define (print-exn exn . _)
  (let ((port (current-error-port)))
    (print-error-message exn port)))

;; Apply every procedure in the set CALLBACKS to ARGS.  A callback that
;; raises is reported through print-exn.  Returns #t if at least one
;; callback returned normally, #f otherwise.
(define (run-callbacks callbacks args)
  (define (step callback handled?)
    (let ((ok? (safe-apply print-exn callback args)))
      (or ok? handled?)))
  (set-fold callbacks step #f))

;; Like run-callbacks (taking ARGS as rest arguments), but empties the
;; CALLBACKS set afterwards so each callback fires only once.  Returns
;; the result of run-callbacks.
(define (run-callbacks! callbacks . args)
  (let ((handled? (run-callbacks callbacks args)))
    (set-clear! callbacks)
    handled?))

;; Public constructor: create an open channel with no receivers and no
;; forks, whose queue initially holds MESSAGES in the given order.
(define (make-channel . messages)
  (define (empty-set) (make-set eq?))
  (let ((closed?   #f)
        (receivers '())
        (forks     '())
        (pending   (list->queue messages)))
    (%make-channel closed? receivers forks
                   (empty-set)                  ; onetime-receivers
                   pending
                   (empty-set)                  ; on-close-handlers
                   (empty-set)                  ; on-drain-handlers
                   (empty-set)                  ; on-error-handlers
                   (make-mutex)
                   (make-condition-variable))))

;; The messages currently queued in CHANNEL, as a list in queue order.
(define (channel-messages channel)
  (let ((pending (channel-queue channel)))
    (queue->list pending)))

;; #t when CHANNEL has at least one persistent or one-time receiver.
(define (channel-has-receivers? channel)
  (or (not (null? (channel-receivers channel)))
      (not (set-empty? (channel-onetime-receivers channel)))))

;; Deliver queued messages to CHANNEL's receivers while holding its lock.
;;
;; For each message at the head of the queue, as long as the channel has
;; any receiver:
;;   - the one-time receivers (if any) are run and cleared; when one of
;;     them returned normally, threads waiting on the channel's condition
;;     variable are woken up;
;;   - the persistent receivers are run through safe-apply-some, with the
;;     channel's on-error handlers receiving any exception;
;;   - the message is dequeued and the loop continues only if at least
;;     one receiver of either kind handled it; otherwise the message
;;     stays queued and flushing stops.
;; Finally, if the channel is drained (closed and empty), the on-drain
;; handlers are run and cleared.
(define (flush-channel channel)
  (let ((queue             (channel-queue channel))
        (onetime-receivers (channel-onetime-receivers channel)))
    (with-locked-channel channel
      (lambda ()
        (let loop ()
          (when (and (not (queue-empty? queue))
                     (channel-has-receivers? channel))
            (let ((message  (queue-first queue))
                  (handled? #f))
              (unless (set-empty? onetime-receivers)
                (when (run-callbacks! onetime-receivers message)
                  (set! handled? #t)
                  (condition-variable-broadcast! (channel-cvar channel))))
              ;; Read the receiver list afresh for every message.  The
              ;; loop guard above consults the live list, and a receiver
              ;; removed while an earlier message was being delivered
              ;; must not be handed later messages from a stale snapshot.
              (when (or (safe-apply-some (channel-on-error-handlers channel)
                                         (channel-receivers channel)
                                         message)
                        handled?)
                (queue-remove! queue)
                (loop)))))
        (when (channel-drained? channel)
          (run-callbacks! (channel-on-drain-handlers channel)))))))

;; Append MESSAGE and any further MESSAGES to CHANNEL.
;; Returns #f without doing anything when the channel is closed, #t
;; otherwise.  Under the channel lock the messages are queued, forwarded
;; to every fork, and the queue is flushed to the receivers.  Forks whose
;; weak reference has been cleared, or whose own enqueue returned #f, are
;; dropped from the fork list.
(define (channel-enqueue channel message . messages)
  (if (channel-closed? channel)
      #f
      (let ((queue (channel-queue channel))
            (batch (cons message messages)))
        ;; Filter predicate: true only for forks that are still reachable
        ;; and accepted the batch.
        (define (forward-to-fork? fork*)
          (let ((fork (locative->object fork*)))
            (and fork (apply channel-enqueue fork batch))))
        (with-locked-channel channel
          (lambda ()
            (queue-add-list! queue batch)
            (set! (channel-forks channel)
                  (filter forward-to-fork? (channel-forks channel)))
            (flush-channel channel)))
        #t)))

;; Register RECEIVER as a persistent receiver of CHANNEL (prepended to
;; the receiver list), then flush so already-queued messages reach it.
(define (on-channel-receive channel receiver)
  (define (register-and-flush)
    (set! (channel-receivers channel)
          (cons receiver (channel-receivers channel)))
    (flush-channel channel))
  (with-locked-channel channel register-and-flush))

;; Remove every occurrence of RECEIVER (compared with eq?) from CHANNEL's
;; persistent receiver list, under the channel lock.
(define (channel-remove-receiver channel receiver)
  (with-locked-channel channel
    (lambda ()
      (let ((kept (filter (lambda (r) (not (eq? r receiver)))
                          (channel-receivers channel))))
        (set! (channel-receivers channel) kept)))))

;; Receive one message from CHANNEL.  The mode is chosen from ARGS:
;;
;;   (channel-receive channel)
;;   (channel-receive channel timeout)
;;   (channel-receive channel timeout default)
;;       Blocking mode, used when ARGS is empty or its first element is
;;       not a procedure.  If a message is queued it is dequeued and
;;       returned.  Otherwise the calling thread waits on the channel's
;;       condition variable until a message is delivered.  TIMEOUT is
;;       used only when it is a number and is handed to mutex-unlock!;
;;       any other value means no timeout.  On timeout the result is
;;       (DEFAULT) when a DEFAULT thunk was given, else #f.
;;
;;   (channel-receive channel receiver ...)
;;       Non-blocking mode, used when the first argument is a procedure.
;;       With an empty queue every receiver is registered as a one-time
;;       receiver and #f is returned.  Otherwise all receivers are
;;       applied to the first queued message, which is dequeued if at
;;       least one of them returned normally, and #t is returned.
;;
;; Signals an error if the calling thread already holds the channel lock.
;;
;; NOTE(review): dequeuing here does not run the on-drain handlers even
;; if it empties a closed channel -- confirm whether that is intended.
(define (channel-receive channel . args)
  (dynamic-wind
      (lambda ()
        (let ((mutex (channel-mutex channel)))
          (when (have-lock? mutex)
            (error "can't receive from already locked channel"))
          (mutex-lock! mutex)
          ;; semaphore-lock! keeps its nesting depth in the mutex's
          ;; specific field and increments it when the owner re-enters.
          ;; This lock is taken directly, so the counter must be started
          ;; here; otherwise a receiver that re-enters the channel (e.g.
          ;; via channel-enqueue) on a never-before-locked mutex would
          ;; increment an uninitialised value.
          (mutex-specific-set! mutex 0)))
      (lambda ()
        (let* ((queue   (channel-queue channel))
               (otrec   (channel-onetime-receivers channel))
               (cvar    (channel-cvar channel))
               (timeout (and (pair? args)
                             (let ((t (car args)))
                               (and (number? t) t))))
               (default (and timeout
                             (pair? (cdr args))
                             (cadr args)))
               (sync?   (or timeout (null? args) (not (procedure? (car args))))))
          (if sync?
              (if (queue-empty? queue)
                  ;; `received?` is tracked separately from `message` so
                  ;; that a delivered #f is not mistaken for "nothing
                  ;; arrived" on the timeout path.
                  (let* ((message   #f)
                         (received? #f)
                         (receiver  (lambda (x)
                                      (set! message x)
                                      (set! received? #t))))
                    (set-insert! otrec receiver)
                    ;; Releases the mutex and waits on the condition
                    ;; variable; a false result means the wait timed out.
                    (if (mutex-unlock! (channel-mutex channel) cvar timeout)
                        message
                        (with-locked-channel channel
                          (lambda ()
                            ;; A message may have been delivered between
                            ;; the timeout and re-acquiring the lock.
                            (if received?
                                message
                                (begin
                                  (set-remove! otrec receiver)
                                  (and default (default))))))))
                  (queue-remove! queue))
              (if (queue-empty? queue)
                  (begin
                    (for-each
                     (lambda (receiver)
                       ;; Each receiver is wrapped in a fresh closure
                       ;; before being inserted into the eq?-based set.
                       (set-insert! otrec (lambda (m) (receiver m))))
                     args)
                    #f)
                  (let ((message (queue-first queue)))
                    (when (safe-apply-some (channel-on-error-handlers channel)
                                           args
                                           message)
                      (queue-remove! queue))
                    #t)))))
      (lambda ()
        ;; The blocking wait above already released the mutex, so only
        ;; unlock when this thread still owns it.
        (let ((mutex (channel-mutex channel)))
          (when (have-lock? mutex)
            (mutex-unlock! mutex))))))

;; Start a blocking channel-receive on CHANNEL (with the optional TIMEOUT
;; and DEFAULT) in a new thread, yield once, and return a promise;
;; forcing the promise joins that thread and gives its result.
(define (channel-receive/delay channel #!optional timeout default)
  (define (receive-in-background)
    (channel-receive channel timeout default))
  (let ((worker (make-thread receive-in-background)))
    (thread-start! worker)
    (thread-yield!)
    (delay (thread-join! worker))))

;; Mark CHANNEL as closed, then run and clear its on-close handlers.
;; Returns the result of running the on-close handlers (#t if at least
;; one handler returned normally, #f otherwise).
;;
;; If the channel is already empty, closing it makes it drained.  No
;; later flush is guaranteed to happen (enqueueing on a closed channel is
;; refused), so the on-drain handlers are run and cleared here as well.
(define (close-channel channel)
  (set! (channel-closed? channel) #t)
  (let ((handled? (run-callbacks! (channel-on-close-handlers channel))))
    (when (channel-drained? channel)
      (run-callbacks! (channel-on-drain-handlers channel)))
    handled?))

;; #t when CHANNEL has no queued messages.
(define (channel-empty? channel)
  (let ((pending (channel-queue channel)))
    (queue-empty? pending)))

;; A channel is drained once it is closed and its queue is empty.
(define (channel-drained? channel)
  (cond ((channel-closed? channel) (channel-empty? channel))
        (else #f)))

;; Create and return a fork of CHANNEL: a new channel seeded with the
;; messages currently queued in CHANNEL.  CHANNEL records the fork
;; through a weak locative only, and an on-close handler closes the fork
;; when CHANNEL closes, provided the locative still resolves to it.
(define (fork-channel channel)
  (define (make-fork)
    (let* ((fork  (apply make-channel (channel-messages channel)))
           (fork* (make-weak-locative fork)))
      (set! (channel-forks channel)
            (cons fork* (channel-forks channel)))
      ;; The handler refers to the fork only through fork*.
      (on-channel-close
       channel
       (lambda ()
         (let ((live (locative->object fork*)))
           (and live (close-channel live)))))
      fork))
  (with-locked-channel channel make-fork))

;; Run THUNK when CHANNEL closes: immediately if it is already closed,
;; otherwise by adding it to the on-close handler set.
(define (on-channel-close channel thunk)
  (cond ((channel-closed? channel) (thunk))
        (else (set-insert! (channel-on-close-handlers channel) thunk))))

;; Run THUNK when CHANNEL is drained: immediately if it already is,
;; otherwise by adding it to the on-drain handler set.
(define (on-channel-drain channel thunk)
  (if (not (channel-drained? channel))
      (set-insert! (channel-on-drain-handlers channel) thunk)
      (thunk)))

;; Add PROC to CHANNEL's on-error handler set.  safe-apply-some applies
;; these handlers to the raised object followed by the arguments of the
;; receiver that failed.
(define (on-channel-error channel proc)
  (let ((handlers (channel-on-error-handlers channel)))
    (set-insert! handlers proc)))

;; Feed every message received on SOURCE-CHANNEL into DESTINATION-CHANNEL
;; (a fresh channel by default) by calling
;; (ON-RECEIVE destination message); ON-RECEIVE defaults to
;; channel-enqueue.  Returns whatever set-finalizer! returns for
;; DESTINATION-CHANNEL.
;;
;; The receiver installed on the source reaches the destination through
;; a weak locative only.  When the destination closes, the receiver is
;; removed from the source, and the source is closed too if that leaves
;; it without receivers.  A finalizer on the destination removes the
;; receiver as well.
(define (siphon-channel source-channel
                        #!optional
                        (destination-channel (make-channel))
                        (on-receive channel-enqueue))
  (let* ((destination* (make-weak-locative destination-channel))
         (receiver     (lambda (message)
                         (let ((destination (locative->object destination*)))
                           (and destination
                                (on-receive destination message))))))
    (define (detach-from-source)
      (with-locked-channel source-channel
        (lambda ()
          (channel-remove-receiver source-channel receiver)
          (unless (channel-has-receivers? source-channel)
            (close-channel source-channel)))))
    (on-channel-receive source-channel receiver)
    (on-channel-close destination-channel detach-from-source)
    (set-finalizer!
     destination-channel
     (lambda (_)
       (channel-remove-receiver source-channel receiver)))))

;; Siphon CHANNEL into a new channel carrying a running fold: for each
;; message, (PROC message accumulator) becomes the new accumulator and is
;; enqueued on the folding channel.  The accumulator starts as SEED.
;;
;; The accumulator is kept in the specific field of a private mutex,
;; which also serialises updates.
(define (fold-channel channel proc seed)
  (let ((accumulator (make-mutex)))
    (mutex-specific-set! accumulator seed)
    (siphon-channel
     channel
     (make-channel)
     (lambda (folding-channel message)
       ;; dynamic-wind releases the mutex even when PROC or the enqueue
       ;; raises; without it one failing message would leave the mutex
       ;; locked and every later message would block on it.
       (dynamic-wind
           (lambda () (mutex-lock! accumulator))
           (lambda ()
             (let ((acc (proc message (mutex-specific accumulator))))
               (mutex-specific-set! accumulator acc)
               (channel-enqueue folding-channel acc)))
           (lambda () (mutex-unlock! accumulator)))))))

;; Siphon CHANNEL into a new channel that carries (PROC message) for
;; every message received.
(define (map-channel channel proc)
  (define (enqueue-mapped mapping-channel message)
    (let ((mapped (proc message)))
      (channel-enqueue mapping-channel mapped)))
  (siphon-channel channel (make-channel) enqueue-mapped))

;; Siphon CHANNEL into a new channel that carries only the messages for
;; which PRED? returns true.
(define (filter-channel channel pred?)
  (define (enqueue-if-accepted filtering-channel message)
    (when (pred? message)
      (channel-enqueue filtering-channel message)))
  (siphon-channel channel (make-channel) enqueue-if-accepted))

;; Returns two values: a thunk and CHANNEL (a fresh channel by default).
;; Each call of the thunk reads one message with (READ PORT); at end of
;; file it returns #f, otherwise it enqueues the message on CHANNEL and
;; returns channel-enqueue's result.
(define (siphon-input-port port read #!optional (channel (make-channel)))
  (define (pump!)
    (let ((message (read port)))
      (if (eof-object? message)
          #f
          (channel-enqueue channel message))))
  (values pump! channel))

;; Install a persistent receiver on CHANNEL that emits every message
;; with (WRITE message PORT).
(define (flush-channel-to-output-port channel port write)
  (define (emit message)
    (write message port))
  (on-channel-receive channel emit))

)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.