Commits

Karsten Schmidt committed 065df60

adding optional init handler for nodes, overloading defnode macro to also support nodes without named update handler, adding start/stop ticker functions for repeatedly triggering node updates (frame counter style), switching project to clojure 1.3, updating project.clj & pom

  • Participants
  • Parent commits 69fc899

Comments (0)

Files changed (3)

   <modelVersion>4.0.0</modelVersion>
   <groupId>estuary</groupId>
   <artifactId>estuary</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
+  <version>0.3.0-SNAPSHOT</version>
   <name>estuary</name>
   <description>Easy construction of dataflow graphs using standard Clojure agents.</description>
   <build>
   </repositories>
   <dependencies>
     <dependency>
-      <groupId>org.clojars.weavejester</groupId>
-      <artifactId>autodoc</artifactId>
-      <version>0.9.0</version>
+      <groupId>lein-marginalia</groupId>
+      <artifactId>lein-marginalia</artifactId>
+      <version>0.6.1</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.clojure</groupId>
       <artifactId>clojure</artifactId>
-      <version>1.2.1</version>
+      <version>1.3.0</version>
     </dependency>
     <dependency>
       <groupId>clj-json</groupId>
       <artifactId>clj-json</artifactId>
-      <version>0.3.2</version>
+      <version>0.4.3</version>
     </dependency>
   </dependencies>
 </project>
-(defproject estuary "0.2.0-SNAPSHOT"
+(defproject estuary "0.3.0-SNAPSHOT"
   :description "Easy construction of dataflow graphs using standard Clojure agents."
-  :dependencies [[org.clojure/clojure "1.2.1"]
-                 [clj-json "0.3.2"]]
-  :dev-dependencies [[org.clojars.weavejester/autodoc "0.9.0"]]
-  :autodoc {:name "Estuary" :page-title "Estuary docs"}
+  :dependencies [[org.clojure/clojure "1.3.0"]
+                 [clj-json "0.4.3"]]
+  :dev-dependencies [[lein-marginalia "0.6.1"]]
   :main estuary.core)

File src/estuary/core.clj

   [targets value]
   (if-not (nil? (seq targets))
     (doseq [t targets]
-      (let [a (:agent t)
-            func (resolve (symbol (:func @a)))
-            port (:port t)]
-        (send a func port value)))))
+      (if-not (nil? (:func @(:agent t)))
+        (let [a (:agent t)
+              func (resolve (symbol (:func @a)))
+              port (:port t)]
+          (send a func port value))))))
 
 (defn get-ports
   "Filters the given node map and returns a collection of values for keys with the prefix (default \":in\")."
-  ([state] (get-ports state "in"))
+  ([state]
+    (get-ports state "in"))
   ([state prefix]
     (map (fn [v] (val v)) (filter #(.startsWith (name (key %)) prefix) state))))
 
-(defn filter-targets
-  ([targets key]
-    (filter (fn[t] (get t key)) targets))
-  ([targets key v]
-    (filter (fn[t] (= (get t key) v)) targets)))
-
 (defn set-and-accumulate
   "Handler function for an accumulator node with multiple :in terminals (:in0, :in1 etc.)
 First sets an input (ID) to the given value and then sets :value to the sum
       (dispatch (:targets newstate) val)
       newstate)))
 
-(defn demux-value
-  "Handler function for accumulator nodes with multiple inputs.
-First sets the specified input to the given value and then propagates the value
-to all matching targets with their :demux port matching the current input id."
-  [state id v]
-  (let [newstate (assoc state id v)]
-    (dispatch (filter-targets (:targets newstate) :demux id) v)
-    newstate))
-
-(defn get-node
+(defmacro get-node
   "Returns the graph node with the given UUID."
   [graph uuid]
-  (get (:nodes @graph) uuid))
+  `(get (:nodes (deref ~graph)) ~uuid))
 
 (defn conn!
-  "Creates a connection from the given source node to a target node. The port parameter is the key of the target's input terminal.
-Custom routing data can be added via an optional map."
-  ([src target port & custom]
+  "Creates a connection from the given source node to a target node. The nodes need to be either supplied
+as variables or UUIDs. The port parameter is the key of the target's input terminal."
+  ([src target port]
     (send src
       (fn [state]
-        (assoc state :targets
-          (conj (:targets state) (merge {:agent target :id (:id @target) :port port} (first custom))))))
-    (await src)))
+        (assoc state :targets (conj (:targets state) {:agent target :id (:id @target) :port port})))) (await src))
+  ([g sid tid port]
+    (let [src (get-node g sid) t (get-node g tid)]
+      (send src
+        (fn [state]
+          (assoc state :targets (conj (:targets state) {:agent t :id tid :port port})))) (await src))))
 
 (defn disconn!
-  "Removes the connection between two nodes, either as defined via variables or by their node UUIDs and port name."
+  "Removes the connection between two nodes, either as defined by their node UUIDs and port name or via variables."
   ([src target port]
-    (let [tid (:id @target)
-          targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
-      (send src assoc :targets targets) (await src)))
+    (let [tid (:id @target) targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
+      (send-off src assoc :targets targets) (await src)))
   ([g sid tid port]
     (let [src (get-node g sid)
           targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
 (defmacro defnode
   "Creates a new agent based node with the given initial state and update handler function.
 The function needs to be supplied as fully qualified symbol (use ` syntax quote)."
-  [graph id func state]
+  ([graph id state] `(defnode ~graph ~id nil nil ~state))
+  ([graph id func state] `(defnode ~graph ~id nil ~func ~state))
+  ([graph id init func state]
   `(def ~id
-     (let [a# (agent (assoc (if (nil? (:targets ~state)) (assoc ~state :targets ()) ~state)
-                       :func (str ~func) :id (.toString (UUID/randomUUID))))]
-       (swap! ~graph (fn[g# n#] (assoc g# :nodes (assoc (:nodes g#) (:id @n#) n#))) a#) a#)))
+     (let [state# (if (nil? ~init)
+                    ~state
+                    ((resolve (symbol ~init)) ~state))
+           a# (agent (assoc
+                       (if (nil? (:targets state#))
+                         (assoc state# :targets ())
+                         state#)
+                       :init (if (nil? ~init) nil (str ~init))
+                       :func (if (nil? ~func) nil (str ~func))
+                       :id (.toString (UUID/randomUUID))))]
+       (swap! ~graph
+              (fn[g# n#] (assoc g# :nodes (assoc (:nodes g#) (:id @n#) n#)))
+              a#)
+       a#))))
 
 (defmacro defaccu
   "Similar to defnode. Creates a node with multiple input terminals (all of them set to 0)."
     (let [targets (map (fn[t] (assoc t :agent (get (:nodes @graph) (:id t)) :port (keyword (:port t)))) (:targets @(val node)))
           id (key node)
           state (assoc @(val node) :targets targets)]
-      (send-off (val node) (fn[old] state)))))
+      (send-off (val node)
+                (fn[old]
+                  (if (:init state)
+                    ((resolve (symbol (:init state))) state)
+                    state))))))
 
 (defn deserialize-graph
   "Initializes a graph from the given JSON string. Re-creates all necessary agents and connections.
-Graph should be empty and needs to have been created with defgraph previously."
+   Graph should be empty and needs to have been created with defgraph previously."
   [graph json]
   (doseq [node (json/parse-string json true)]
-    (let [a (agent node)]
-      (swap! graph (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
+    (let [a (agent node)] (swap! graph (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
   (prepare-targets graph))
+
+(defn start-ticker
+  ([ticker] (start-ticker ticker 0))
+  ([ticker frame] (start-ticker ticker frame 60))
+  ([ticker frame fps]
+  (letfn [(tick [state]
+                (if (:running state)
+                  (let [newframe (inc (:frame state))
+                        newstate (assoc state :frame newframe)]
+                    (Thread/sleep (:delay state))
+                    (dispatch (:targets state) newframe)
+                    (send ticker tick)
+                    newstate)
+                  state))]
+         (send ticker
+               (fn[state]
+                 (send ticker tick)
+                 (assoc state
+                        :frame frame
+                        :delay (/ 1000 fps)
+                        :running true))))))
+
+(defn stop-ticker
+  [ticker]
+  (send ticker (fn[state] (assoc state :running false))))