Commits

Anonymous committed 20f505d

Implementing re-reduce toggle

Comments (0)

Files changed (1)

src/luke_phase.erl

                 modstate,
                 converge=false,
                 accumulate=false,
+                rereduce=false,
                 lead_partner,
                 partners,
                 next_phases,
         {ok, ModState} ->
             Accumulate = lists:member(accumulate, Behaviors),
             Converge = lists:member(converge, Behaviors),
+            Rereduce = proplists:get_value(rereduce, Behaviors, false),
             {ok, executing, #state{id=Id, mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
-                                   flow=Flow, accumulate=Accumulate, converge=Converge, flow_timeout=Timeout}};
+                                   flow=Flow, rereduce=Rereduce, accumulate=Accumulate, converge=Converge,
+                                   flow_timeout=Timeout}};
         {stop, Reason} ->
             {stop, Reason}
     end.
 handle_callback(_Type, BadValue, _State) ->
   throw({error, {bad_return, BadValue}}).
 
-%% Route output to lead when converging
-%% Accumulation is ignored for non-leads of converging phases
-%% since all accumulation is performed in the lead process
-route_output(Output, #state{converge=true, lead_partner=Lead}=State) when is_pid(Lead) ->
-    propagate_inputs([Lead], Output),
-    State;
-
-%% Send output to flow for accumulation and propagate as inputs
-%% to the next phase. Accumulation is only true for the lead
-%% process of a converging phase
+route_output(Output, #state{id=Id, converge=true, rereduce=Rereduce, accumulate=Accumulate,
+                            flow=Flow, lead_partner=Lead, next_phases=Next}=State) when is_pid(Lead) ->
+    case Rereduce of
+        false ->
+            case Accumulate of
+                true ->
+                    luke_phases:send_flow_results(Flow, Id, Output);
+                false -> ok
+            end,
+            RotatedNext = propagate_inputs(Next, Output),
+            State#state{next_phases=RotatedNext};
+        true ->
+            luke_phases:send_inputs(Lead, Output),
+            State
+    end;
 route_output(Output, #state{id=Id, converge=true, accumulate=Accumulate, lead_partner=undefined,
                             flow=Flow, next_phases=Next}=State) ->
     if