1. Karsten Schmidt
  2. clj-estuary

Commits

Karsten Schmidt  committed 762292e

refactoring defaccu to support customizable accumulator functions, adding sum-accu as default accu implementation, adding trigger-emitters, adding more doc strings, renaming get-ports -> ports, get-node -> node-for-id, simplifying defaccu construction

  • Participants
  • Parent commits 685baae
  • Branches default

Comments (0)

Files changed (1)

File src/estuary/core.clj

View file
               port (:port t)]
           (send a func port value))))))
 
-(defn get-ports
-  "Filters the given node map and returns a collection of values for keys with the prefix (default \":in\")."
+(defn trigger
+  "Triggers the propagation of the node's current value to all of its targets."
+  [n]
+  (send n (resolve (symbol (:func @n)))))
+
+(defn ports
+  "Filters the given node map and returns a collection of values for keys matching the
+given regexp (default \"in\d+\")."
   ([state]
-    (get-ports state "in"))
-  ([state prefix]
-    (map (fn [v] (val v)) (filter #(.startsWith (name (key %)) prefix) state))))
+    (ports state #"in\d+"))
+  ([state re]
+    (map (fn [v] (val v)) (filter #(re-matches re (name (key %))) state))))
+
+(defn sum-accu
+  "Default accumulator function used by set-and-accumulate.
+Sums all input ports of a node."
+  [state]
+  (reduce + (ports state)))
 
 (defn set-and-accumulate
   "Handler function for an accumulator node with multiple :in terminals (:in0, :in1 etc.)
-First sets an input (ID) to the given value and then sets :value to the sum
-of all existing input keys before propagating it to all targets."
-  [state id value]
+First sets an input (ID) to the given value and then sets :value to the result of the
+configured accumulator function applied to all existing input keys.
+Result value is then propagated to all targets."
+  ([state]
+    (dispatch (:targets state) (:value state))
+    state)
+  ([state id value]
   (let [tmp (assoc state id value)
-        newstate (assoc tmp :value (reduce + (get-ports tmp)))]
-    (dispatch (:targets newstate) (:value newstate))
-    newstate))
+        newvalue ((resolve (symbol (:accu-func state))) tmp)
+        newstate (assoc tmp :value newvalue)]
+    (dispatch (:targets newstate) newvalue)
+    newstate)))
 
 (defn set-value
   "Handler function for (mainly) single value nodes.
 Sets the value for the given key ID and propagates it to all targets."
+  ([state]
+    (dispatch (:targets state) (:value state))
+    state)
   ([state val] (set-value state :value val))
   ([state id val]
     (let [newstate (assoc state id val)]
       (dispatch (:targets newstate) val)
       newstate)))
 
-(defmacro get-node
+(defmacro node-for-id
   "Returns the graph node with the given UUID."
   [graph uuid]
   `(get (:nodes (deref ~graph)) ~uuid))
         with-targets (nodes-with-targets graph)]
     (reduce #(disj %1 %2) all with-targets)))
 
+(defn trigger-emitters
+  [graph]
+  (map #(trigger (node-for-id graph %)) (emitters graph)))
+
 (defn conn!
   "Creates a connection from the given source node to a target node. The nodes need to be either supplied
 as variables or UUIDs. The port parameter is the key of the target's input terminal."
       (fn [state]
         (assoc state :targets (conj (:targets state) {:agent target :id (:id @target) :port port})))) (await src))
   ([g sid tid port]
-    (let [src (get-node g sid) t (get-node g tid)]
+    (let [src (node-for-id g sid) t (node-for-id g tid)]
       (send src
         (fn [state]
           (assoc state :targets (conj (:targets state) {:agent t :id tid :port port})))) (await src))))
     (let [tid (:id @target) targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
       (send-off src assoc :targets targets) (await src)))
   ([g sid tid port]
-    (let [src (get-node g sid)
+    (let [src (node-for-id g sid)
           targets (filter #(not (and (= (:id %) tid) (= (:port %) port))) (:targets @src))]
       (send src assoc :targets targets) (await src))))
 
        a#))))
 
 (defmacro defaccu
-  "Similar to defnode. Creates a node with multiple input terminals (all of them set to 0)."
-  [g id func numinputs]
+  "Similar to defnode. Creates a node with multiple input terminals
+(:in0, :in1... all of them set to 0). Intended to be used with update function
+set-and-accumulate. If no accumulator function is
+specified sum-accu is used by default."
+  ([g id func numinputs] `(defaccu ~g ~id ~func `sum-accu ~numinputs))
+  ([g id func accu-func numinputs]
   `(defnode ~g ~id ~func
-     (let [state#
-           ((fn gen#
-              ([num#] (gen# {} (dec num#)))
-              ([a# num#] (if (>= num# 0)
-                           (recur (assoc a# (keyword (str "in" num#)) 0) (dec num#))
-                           a#)))
-             ~numinputs)]
-       state#)))
+     (assoc (reduce #(assoc %1 (keyword (str "in" %2)) 0) {} (range ~numinputs))
+            :accu-func (str ~accu-func)))))
 
 (defmacro aliasnode
   "Assigns the graph node with the given UUID to a new variable. Useful for working with deserialized graphs."
   [graph id uuid]
-  `(def ~id (get-node ~graph ~uuid)))
+  `(def ~id (node-for-id ~graph ~uuid)))
 
 (defn serialize-graph
   "Serializes the given graph to a JSON string."
     (json/generate-string nodes)))
 
 (defn- prepare-targets
-  "Reconnects all agents in the given raw deserialized graph. Used by deserialize-graph."
+  "Reconnects all agents in the given raw deserialized graph and calls any init handlers defined for them. Used by deserialize-graph."
   [graph]
   (doseq [node (:nodes @graph)]
     (let [targets (map (fn[t] (assoc t :agent (get (:nodes @graph) (:id t)) :port (keyword (:port t)))) (:targets @(val node)))
    Graph should be empty and needs to have been created with defgraph previously."
   [graph json]
   (doseq [node (json/parse-string json true)]
-    (let [a (agent node)]
-      (swap! graph
-        (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
+    (let [a (agent node)] (swap! graph (fn[g n] (assoc g :nodes (assoc (:nodes g) (:id @n) n))) a)))
   (prepare-targets graph))
 
 (defn start-ticker