Commits

Karsten Schmidt  committed 76ff5ee

adding node UUIDs and JSON de/serialization, updating docs & tests

  • Participants
  • Parent commits 4089289

Comments (0)

Files changed (4)

   <modelVersion>4.0.0</modelVersion>
   <groupId>estuary</groupId>
   <artifactId>estuary</artifactId>
-  <version>0.1.0-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
   <name>estuary</name>
   <description>Easy construction of dataflow graphs using standard Clojure agents.</description>
   <build>
-(defproject estuary "0.1.0-SNAPSHOT"
+(defproject estuary "0.2.0-SNAPSHOT"
   :description "Easy construction of dataflow graphs using standard Clojure agents."
   :dependencies [[org.clojure/clojure "1.2.1"]
                  [clj-json "0.3.2"]]
   :dev-dependencies [[org.clojars.weavejester/autodoc "0.9.0"]]
-  :autodoc {:name "Estuary" :page-title "Estuary docs"})
+  :autodoc {:name "Estuary" :page-title "Estuary docs"}
+  :main estuary.core)

File src/estuary/core.clj

 (ns estuary.core
-  "Easy dataflow graph construction using standard clojure agents.
-   Graphs can be de/serialized as JSON."
-  (:require [clj-json.core :as json]))
+ "Easy dataflow graph construction using standard clojure agents.
+Graphs can be de/serialized as JSON."
+ (:require [clj-json.core :as json])
+ (:import (java.util UUID)))
 
 (defn dispatch
   "Dispatches a value to the list of node targets. Returns the value.
-   Every node handler must call this function to ensure value propagation."
+Every node handler must call this function to ensure value propagation."
   [targets value]
   (if-not (nil? (seq targets))
     (doseq [t targets]
-      (let [a @(resolve (symbol (:agent t)))
+      (let [a (:agent t)
             func (resolve (symbol (:func @a)))
             port (:port t)]
         (send a func port value)))))
 
 (defn set-and-accumulate
   "Handler function for an accumulator node with multiple :in terminals (:in0, :in1 etc.)
-   First sets an input (ID) to the given value and then sets :value to the sum
-   of all existing input keys before propagating it to all targets."
+First sets an input (ID) to the given value and then sets :value to the sum
+of all existing input keys before propagating it to all targets."
   [state id value]
   (let [tmp (assoc state id value)
         newstate (assoc tmp :value (reduce + (get-inputs tmp)))]
 
 (defn set-value
   "Handler function for (mainly) single value nodes.
-   Sets the value for the given key ID and propagates it to all targets."
+Sets the value for the given key ID and propagates it to all targets."
   ([state val] (set-value state :value val))
   ([state id val]
     (let [newstate (assoc state id val)]
       (dispatch (:targets newstate) val)
       newstate)))
 
-(defn add-target
-  "Adds a target reference to the given node/agent. The agent needs to be supplied
-   as fully qualified symbol (use ` syntax quote). The port parameter is the key of the
-   target node's input terminal."
-  [src target port]
-  (send-off src
-    (fn [state]
-      (assoc state :targets (conj (:targets state) {:agent (str target) :port port})))))
+(defn get-node
+  "Returns the node with the given UUID."
+  [graph uuid]
+  (get (:nodes @graph) uuid))
+
+(defn conn!
+  "Creates a connection from the given source node to a target node. The nodes need to be either supplied
+as variables or UUIDs. The port parameter is the key of the target's input terminal."
+  ([src target port]
+    (send-off src
+      (fn [state]
+        (assoc state :targets (conj (:targets state) {:agent target :id (:id @target) :port port})))))
+  ([g sid tid port]
+    (let [src (get-node g sid) t (get-node g tid)]
+      (send-off src
+        (fn [state]
+          (assoc state :targets (conj (:targets state) {:agent t :id tid :port port})))))))
+
+(defmacro defgraph
+  "Creates a new atom based map container for nodes and assigns it to the given variable."
+  [id]
+  `(def ~id (atom {:nodes {}})))
 
 (defmacro defnode
   "Creates a new agent based node with the given initial state and update handler function.
-   The function needs to be supplied as fully qualified symbol (use ` syntax quote)."
-  [id func state]
-  `(def ~id (agent (assoc
-     (if (nil? (:targets ~state)) (assoc ~state :targets ()) ~state)
-     :func (str ~func) :id (str "node_" (gensym))))))
+The function needs to be supplied as fully qualified symbol (use ` syntax quote)."
+  [graph id func state]
+  `(def ~id
+     (let [a# (agent (assoc (if (nil? (:targets ~state)) (assoc ~state :targets ()) ~state)
+            :func (str ~func) :id (.toString (UUID/randomUUID))))]
+       (swap! ~graph (fn[g# n#] (assoc g# :nodes (assoc (:nodes g#) (:id @n#) n#))) a#) a#)))
 
 (defmacro defaccu
   "Similar to defnode. Creates a node with multiple input terminals."
-  [id func numinputs]
-    `(def ~id (agent (let [state# ((fn gen#
-                  ([num#] (gen# {} (dec num#)))
-                  ([a# num#] (if (>= num# 0)
-                             (recur (assoc a# (keyword (str "in" num#)) 0) (dec num#)) a#))) ~numinputs)]
-      (assoc state# :value 0 :func (str ~func) :id (str "node_" (gensym)))))))
+  [g id func numinputs]
+  `(defnode ~g ~id ~func
+     (let [state#
+           ((fn gen#
+              ([num#] (gen# {} (dec num#)))
+              ([a# num#] (if (>= num# 0)
+                           (recur (assoc a# (keyword (str "in" num#)) 0) (dec num#))
+                           a#)))
+             ~numinputs)]
+       state#)))
 
-(defn serialize-nodes
-  [& nodes]
-  (json/generate-string nodes))
+(defmacro aliasnode
+  "Assigns the graph node with the given UUID to a new variable. Useful for working with deserialized graphs."
+  [graph id uuid]
+  `(def ~id (get (:nodes @~graph) ~uuid)))
+
+(defn serialize-graph
+  "Serializes the given graph to a JSON string."
+  [g]
+  (let [nodes
+        (map (fn[n]
+               (let [nv @(val n)]
+                 (assoc nv :targets (map (fn[t] (dissoc t :agent)) (:targets nv))))) (:nodes @g))]
+    (json/generate-string nodes)))
+
+(defn- prepare-targets
+  "Reconnects all agents in the given graph. Used by deserialize-graph."
+  [graph]
+  (doseq [node (:nodes @graph)]
+    (let [targets (map (fn[t] (assoc t :agent (get (:nodes @graph) (:id t)) :port (keyword (:port t)))) (:targets @(val node)))
+          id (key node)
+          state (assoc @(val node) :targets targets)]
+      (send-off (val node) (fn[old] state)))))
+
+(defn deserialize-graph
+  "Initializes a graph from the given JSON string. Re-creates all necessary agents and connections.
+   Graph should be empty and needs to have been created with defgraph previously."
+  [graph json]
+  (doseq [node (json/parse-string json true)]
+    (let [a (agent node)] (swap! graph (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
+  (prepare-targets graph))

File test/estuary/test/core.clj

  (:use [estuary.core])
  (:use [clojure.test]))
 
-(with-test
-  (defnode a `set-value {:value 0})
-  (defnode b `set-value {:value 0})
-  (defaccu c `set-and-accumulate 2)
-  (add-target a `c :in0)
-  (add-target b `c :in1)
+(deftest basic-accu
+  (defgraph g)
+  (defnode g a `set-value {:value 0})
+  (defnode g b `set-value {:value 0})
+  (defaccu g c `set-and-accumulate 2)
+  (conn! a c :in0)
+  (conn! b c :in1)
   (send-off a set-value 23)
   (send-off b set-value 42)
   (Thread/sleep 100)
   (is (= 65 (:value @c))))
+
+(deftest deserialize
+  (defgraph g2)
+  (deserialize-graph g2 "[{\"value\":84,\"id\":\"afc2f8f2-226d-4cac-a9cb-cbd574f913a3\",\"func\":\"estuary.core/set-and-accumulate\",\"targets\":[],\"in0\":42,\"in1\":42},{\"id\":\"1b1049ec-6fe7-48a0-b8be-91adab3aeb8f\",\"func\":\"estuary.core/set-value\",\"targets\":[{\"id\":\"afc2f8f2-226d-4cac-a9cb-cbd574f913a3\",\"port\":\"in0\"}],\"value\":42},{\"id\":\"fbad23fa-3efd-42ec-8129-20873905f8bc\",\"func\":\"estuary.core/set-value\",\"targets\":[{\"id\":\"afc2f8f2-226d-4cac-a9cb-cbd574f913a3\",\"port\":\"in1\"},{\"id\":\"1b1049ec-6fe7-48a0-b8be-91adab3aeb8f\",\"port\":\"value\"}],\"value\":42}]")
+  (aliasnode g2 a "fbad23fa-3efd-42ec-8129-20873905f8bc")
+  (aliasnode g2 b "1b1049ec-6fe7-48a0-b8be-91adab3aeb8f")
+  (aliasnode g2 c "afc2f8f2-226d-4cac-a9cb-cbd574f913a3")
+  (send-off a set-value 1000)
+  (Thread/sleep 100)
+  (is (= 1000 (:value @b)))
+  (is (= 2000 (:value @c))))