clj-estuary / src / estuary / core.clj

(ns estuary.core
 "Easy dataflow graph construction using standard clojure agents.
Graphs can be de/serialized as JSON."
 (:require [clj-json.core :as json])
 (:import (java.util UUID)))

(defn dispatch
  "Dispatches a value to the list of node targets. Returns the value.
Every node handler must call this function to ensure value propagation."
  [targets value]
  (if-not (nil? (seq targets))
    (doseq [t targets]
      (if-not (nil? (:func @(:agent t)))
        (let [a (:agent t)
              func (resolve (symbol (:func @a)))
              port (:port t)]
          (send a func port value))))))

(defn trigger
  "If only the node is specified, triggers the propagation of the node's current
value to all of its targets.
If key and value are given, triggers the node's update handler."
  (send n (resolve (symbol (:func @n)))))
  ([n k v]
    (send n (resolve (symbol (:func @n))) k v)))

(defn ports
  "Filters the given node map and returns a collection of values for keys matching the
given regexp (default \"in\\d+\")."
    (ports state #"in\d+"))
  ([state re]
    (map (fn [v] (val v)) (filter #(re-matches re (name (key %))) state))))

(defn filter-targets
  ([targets key]
    (filter (fn[t] (get t key)) targets))
  ([targets key v]
    (filter (fn[t] (= (get t key) v)) targets)))

(defn demux-value
  "Update handler for accumulator nodes with multiple inputs.
First sets the specified input to the given value and then propagates the value
to all matching targets with their :demux port matching the current input id."
  [state id v]
  (let [newstate (assoc state id v)]
    (dispatch (filter-targets (:targets newstate) :demux id) v)

(defn sum-accu
  "Default accumulator function used by set-and-accumulate.
Sums all input ports of a node."
  (reduce + (ports state)))

(defn set-and-accumulate
  "Handler function for an accumulator node with multiple :in terminals (:in0, :in1 etc.)
First sets an input (ID) to the given value and then sets :value to the result of the
configured accumulator function applied to all existing input keys.
Result value is then propagated to all targets."
    (dispatch (:targets state) (:value state))
  ([state id value]
  (let [tmp (assoc state id value)
        newvalue ((resolve (symbol (:accu-func state))) tmp)
        newstate (assoc tmp :value newvalue)]
    (dispatch (:targets newstate) newvalue)

(defn set-value
  "Handler function for (mainly) single value nodes.
Sets the value for the given key ID and propagates it to all targets."
    (dispatch (:targets state) (:value state))
  ([state val] (set-value state :value val))
  ([state id val]
    (let [newstate (assoc state id val)]
      (dispatch (:targets newstate) val)

(defmacro node-for-id
  "Returns the graph node with the given UUID."
  [graph uuid]
  `(get (:nodes (deref ~graph)) ~uuid))

(defn all-node-ids
  "Returns a set of all node IDs in the graph."
  (reduce #(conj %1 %2) #{} (keys (:nodes @graph))))

(defn target-node-ids
  "Returns a set of all node IDs which are targets of other nodes in the graph."
    (fn[targets n]
      (let [ids (map #(:id %) (:targets @(val n)))]
        (if-not (nil? (seq ids))
          (reduce #(conj %1 %2) targets ids)
    #{} (:nodes @graph)))

(defn nodes-with-targets
  "Returns a set of all node IDs which have defined targets in the graph."
  (map #(key %)
         #(> (count (:targets @(val %))) 0)
         (:nodes @graph))))

(defn emitters
  "Returns a set of all node IDs which have defined targets, but are
not targets themselves in the graph, the opposite of the set returned
by (leaves)."
  (let [all (all-node-ids graph)
        targets (target-node-ids graph)]
    (reduce #(disj %1 %2) all targets)))

(defn leaves
  "Returns a set of all node IDs which have no targets, but are
targets themselves in the graph, the opposite of the set returned
by (emitters)."
  (let [all (all-node-ids graph)
        with-targets (nodes-with-targets graph)]
    (reduce #(disj %1 %2) all with-targets)))

(defn trigger-emitters
  (map #(trigger (node-for-id graph %)) (emitters graph)))

(defn node-target-tree
  "Constructs a nested vector of all direct & indirect target IDs
  for the given node. The graph MUST be acyclic.

  For node A in the following graph this structure is produced:

   / \\
  B   C      -> [B [D [E]] [C [E]] F]
  |    \\
  (let [tlist (:targets @n)]
    (if (= (count tlist) 0)
        (fn[branch t]
          (let [a (:agent t)
                b (node-target-tree a)]
            (if (nil? b)
              (conj branch (:id @a))
              (conj branch (:id @a) b))))
          [] tlist))))

(defn conn!
  "Creates a connection from the given source node to a target node.
The nodes need to be either supplied as node references or UUIDs.
The port parameter is the key of the target's input terminal.
Custom routing data can be added via an optional map."
  ([src target port] (conn! src target port nil))
  ([src target port custom]
    (send-off src
      (fn [state]
        (assoc state
               :targets (conj (:targets state)
                              (merge {:agent target :id (:id @target) :port port} custom)))))
    (await src)
    (spit "foo.txt" @src))
  ([g sid tid port custom]
      (conn! (node-for-id g sid) (node-for-id g tid) port custom)))

(defn disconn!
  "Removes the connection between two nodes, either as defined by their
node UUIDs and port name or via node references."
  ([src target port]
    (let [tid (:id @target)
          targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
      (send-off src assoc :targets targets) (await src)))
  ([g sid tid port]
    (let [src (node-for-id g sid)
          targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
      (send src assoc :targets targets)
      (await src))))

(defmacro defgraph
  "Creates a new atom based map container for nodes and assigns it to the given variable.
If also supplied with JSON string, the graph is initialized using (deserialize-graph)."
  `(def ~id (atom {:nodes {}})))
  ([id json]
    `(do (defgraph ~id) (deserialize-graph ~id ~json))))

(defmacro defnode
  "Creates a new agent based node with the given initial state and update handler function.
The function needs to be supplied as fully qualified symbol (use ` syntax quote)."
  ([graph id state] `(defnode ~graph ~id nil nil ~state))
  ([graph id func state] `(defnode ~graph ~id nil ~func ~state))
  ([graph id init func state]
  `(def ~id
     (let [state# (if (nil? ~init)
                    ((resolve (symbol ~init)) ~state))
           a# (agent (assoc
                       (if (nil? (:targets state#))
                         (assoc state# :targets ())
                       :init (if (nil? ~init) nil (str ~init))
                       :func (if (nil? ~func) nil (str ~func))
                       :id (.toString (UUID/randomUUID))))]
       (swap! ~graph
              (fn[g# n#] (assoc g# :nodes (assoc (:nodes g#) (:id @n#) n#)))

(defmacro defaccu
  "Similar to defnode. Creates a node with multiple input terminals
(:in0, :in1... all of them set to 0). Intended to be used with update function
set-and-accumulate. If no accumulator function is
specified sum-accu is used by default."
  ([g id func numinputs] `(defaccu ~g ~id ~func `sum-accu ~numinputs))
  ([g id func accu-func numinputs]
  `(defnode ~g ~id ~func
     (assoc (reduce #(assoc %1 (keyword (str "in" %2)) 0) {} (range ~numinputs))
            :accu-func (str ~accu-func)))))

(defmacro aliasnode
  "Assigns the graph node with the given UUID to a new variable. Useful for working with deserialized graphs."
  [graph id uuid]
  `(def ~id (node-for-id ~graph ~uuid)))

(defn serialize-graph
  "Serializes the given graph to a JSON string."
  (let [nodes
        (map (fn[n]
               (let [nv @(val n)]
                 (assoc nv :targets (map (fn[t] (dissoc t :agent)) (:targets nv))))) (:nodes @g))]
    (json/generate-string nodes)))

(defn- prepare-targets
  "Reconnects all agents in the given raw deserialized graph and calls any init handlers defined for them. Used by deserialize-graph."
  (doseq [node (:nodes @graph)]
    (let [targets (map (fn[t] (assoc t :agent (get (:nodes @graph) (:id t)) :port (keyword (:port t)))) (:targets @(val node)))
          id (key node)
          state (assoc @(val node) :targets targets)]
      (send-off (val node)
                  (if (:init state)
                    ((resolve (symbol (:init state))) state)

(defn deserialize-graph
  "Initializes a graph from the given JSON string. Re-creates all necessary agents and connections.
   Graph should be empty and needs to have been created with defgraph previously."
  [graph json]
  (doseq [node (json/parse-string json true)]
    (let [a (agent node)] (swap! graph (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
  (prepare-targets graph))

(defn start-ticker
  ([ticker] (start-ticker ticker 0))
  ([ticker frame] (start-ticker ticker frame 60))
  ([ticker frame fps]
  (letfn [(tick [state]
                (if (:running state)
                  (let [newframe (inc (:frame state))
                        newstate (assoc state :frame newframe)]
                    (Thread/sleep (:delay state))
                    (dispatch (:targets state) newframe)
                    (send ticker tick)
         (send ticker
                 (send ticker tick)
                 (assoc state
                        :frame frame
                        :delay (/ 1000 fps)
                        :running true))))))

(defn stop-ticker
  (send ticker (fn[state] (assoc state :running false))))