;; clj-estuary / src / estuary / core.clj

(ns estuary.core
 "Easy dataflow graph construction using standard clojure agents.
Graphs can be de/serialized as JSON."
 (:require [clj-json.core :as json])
 (:import (java.util UUID)))

(defn dispatch
  "Dispatches a value to the list of node targets. Returns the value.
Each target is a map holding the receiving node under :agent and the key of
that node's input terminal under :port. The receiver's handler is found by
resolving the symbol name stored under :func in the agent's state, and is
queued on the agent with `send` as (handler state port value).
A nil or empty target list is a no-op.
Every node handler must call this function to ensure value propagation."
  [targets value]
  ;; doseq over nil/empty does nothing, so no explicit emptiness guard is needed.
  (doseq [{node :agent port :port} targets]
    (let [handler (resolve (symbol (:func @node)))]
      (send node handler port value)))
  ;; doseq yields nil; hand the value back explicitly as documented.
  value)

(defn get-ports
  "Returns a lazy sequence of the values in the node map `state` whose key name
starts with `prefix`. The prefix is compared against (name key), i.e. without
the leading colon of a keyword, and defaults to \"in\"."
  ([state] (get-ports state "in"))
  ([state prefix]
   (for [entry state
         :when (.startsWith (name (key entry)) prefix)]
     (val entry))))

(defn set-and-accumulate
  "Handler function for an accumulator node with multiple :in terminals (:in0, :in1 etc.)
First sets an input (ID) to the given value and then sets :value to the sum
of all existing input keys before propagating it to all targets.
Returns the updated node state, which becomes the agent's new state."
  [state id value]
  (let [tmp (assoc state id value)
        newstate (assoc tmp :value (reduce + (get-ports tmp)))]
    (dispatch (:targets newstate) (:value newstate))
    ;; Handlers run as agent actions: whatever they return replaces the
    ;; agent's state, so the state map (not dispatch's result) must be returned.
    newstate))

(defn set-value
  "Handler function for (mainly) single value nodes.
Sets the value for the given key ID (default :value) and propagates it to all targets.
Returns the updated node state, which becomes the agent's new state."
  ([state val] (set-value state :value val))
  ([state id val]
   (let [newstate (assoc state id val)]
     (dispatch (:targets newstate) val)
     ;; Handlers run as agent actions: whatever they return replaces the
     ;; agent's state, so the state map (not dispatch's result) must be returned.
     newstate)))

(defn get-node
  "Looks up the node registered under `uuid` in the :nodes map of the graph
atom. Returns nil when no such node exists."
  [graph uuid]
  (get-in @graph [:nodes uuid]))

(defn conn!
  "Connects a source node to a target node by adding a target entry
{:agent ... :id ... :port ...} to the source's :targets, then blocks until the
source agent has processed the change.
([src target port]) takes the node agents themselves; ([g sid tid port]) takes
a graph plus the UUIDs of both nodes. `port` is the key of the target's input
terminal."
  ([src target port]
   (send src
         (fn [state]
           ;; The target's :id is read when the action runs.
           (let [link {:agent target :id (:id @target) :port port}]
             (update-in state [:targets] conj link))))
   (await src))
  ([g sid tid port]
   (let [source (get-node g sid)
         sink (get-node g tid)
         link {:agent sink :id tid :port port}]
     (send source (fn [state] (update-in state [:targets] conj link)))
     (await source))))

(defn- drop-target
  "Returns `state` with every :targets entry matching both `tid` and `port` removed."
  [state tid port]
  (assoc state :targets
         (remove #(and (= (:id %) tid) (= (:port %) port))
                 (:targets state))))

(defn disconn!
  "Removes the connection between two nodes, either as defined by their node UUIDs and port name or via variables.
([src target port]) takes the node agents; ([g sid tid port]) takes a graph
plus the UUIDs of both nodes. Blocks until the source agent has processed the
change."
  ([src target port]
   (let [tid (:id @target)]
     ;; Filter inside the action so it sees the agent's current state rather
     ;; than a snapshot that queued actions may already have superseded.
     (send src drop-target tid port)
     (await src)))
  ([g sid tid port]
   (let [src (get-node g sid)]
     (send src drop-target tid port)
     (await src))))

(defmacro defgraph
  "Creates a new atom based map container for nodes and assigns it to the given variable.
The atom initially holds {:nodes {}}."
  [id]
  `(def ~id (atom {:nodes {}})))

(defmacro defnode
  "Creates a new agent based node with the given initial state and update handler function.
The function needs to be supplied as fully qualified symbol (use ` syntax quote).
The state gets an empty :targets list unless it already has one, the handler's
name stored as a string under :func, and a fresh random UUID string under :id.
The agent is registered in the graph's :nodes map under that UUID and bound to
the var `id`."
  [graph id func state]
  `(def ~id
     (let [;; Bind the state expression once so it is not evaluated repeatedly.
           state# ~state
           uuid# (str (java.util.UUID/randomUUID))
           node# (agent (assoc (if (nil? (:targets state#))
                                 (assoc state# :targets ())
                                 state#)
                               :func (str ~func)
                               :id uuid#))]
       (swap! ~graph assoc-in [:nodes uuid#] node#)
       node#)))

(defmacro defaccu
  "Similar to defnode. Creates a node with multiple input terminals (all of them set to 0).
The initial state maps the keys :in0 ... :in(numinputs - 1) to 0; everything
else (targets, handler name, UUID, graph registration) is handled by defnode."
  [g id func numinputs]
  `(defnode ~g ~id ~func
     (zipmap (map (fn [i#] (keyword (str "in" i#))) (range ~numinputs))
             (repeat 0))))

(defmacro aliasnode
  "Defines the var `id` as the result of looking up `uuid` in `graph` via
get-node. Useful for working with deserialized graphs."
  [graph id uuid]
  (let [lookup `(get-node ~graph ~uuid)]
    `(def ~id ~lookup)))

(defn serialize-graph
  "Serializes the given graph to a JSON string.
Takes the current state of every node in the graph's :nodes map, strips the
:agent reference from each of its :targets entries, and passes the resulting
sequence of state maps to json/generate-string."
  [g]
  (let [nodes (map (fn [node]
                     (let [state @node]
                       (assoc state :targets
                              (map #(dissoc % :agent) (:targets state)))))
                   (vals (:nodes @g)))]
    (json/generate-string nodes)))

(defn- prepare-targets
  "Reconnects all agents in the given raw deserialized graph. Used by deserialize-graph.
For every node, each :targets entry gets its :agent set to the node registered
under the entry's :id and its :port converted to a keyword. Blocks until all
node agents have applied the update."
  [graph]
  (let [nodes (:nodes @graph)]
    (doseq [node (vals nodes)]
      ;; Rewrite the targets inside the action so it works on the agent's
      ;; current state instead of a snapshot taken beforehand.
      (send-off node
                (fn [state]
                  (assoc state :targets
                         (map (fn [t]
                                (assoc t
                                       :agent (get nodes (:id t))
                                       :port (keyword (:port t))))
                              (:targets state))))))
    ;; Wait for the updates so callers never see targets without an :agent.
    (apply await (vals nodes))))

(defn deserialize-graph
  "Populates `graph` from the given JSON string: every parsed node state is
wrapped in a new agent and registered in the graph's :nodes map under the
state's :id, after which prepare-targets re-links the connections.
   Graph should be empty and needs to have been created with defgraph previously."
  [graph json]
  (let [states (json/parse-string json true)]
    (doseq [state states
            :let [node (agent state)]]
      (swap! graph assoc-in [:nodes (:id state)] node)))
  (prepare-targets graph))