Anonymous avatar Anonymous committed 96ce594

Fix for bug #66

Comments (0)

Files changed (9)

apps/luke/src/luke_flow.erl

 %%      until the flow completes or exceeds the flow_timeout.
 %% @spec collect_output(any(), integer()) -> [any()] | {error, any()}
 collect_output(FlowId, Timeout) ->
-    collect_output(FlowId, Timeout, []).
+    collect_output(FlowId, Timeout, dict:new()).
 
 %% @doc Returns the pids for each phase. Intended for
 %%      testing only
 executing({results, done}, #state{client=Client, flow_id=FlowId}=State) ->
     Client ! {flow_results, FlowId, done},
     {stop, normal, State};
-executing({results, Result}, #state{client=Client, flow_id=FlowId}=State) ->
-    Client ! {flow_results, FlowId, Result},
+executing({results, PhaseId, Result}, #state{client=Client, flow_id=FlowId}=State) ->
+    Client ! {flow_results, PhaseId, FlowId, Result},
     {next_state, executing, State}.
 
 executing(get_phases, _From, #state{fsms=FSMs}=State) ->
 
 %% Internal functions
 start_phases(FlowDesc, Timeout) ->
-    start_phases(lists:reverse(FlowDesc), Timeout, []).
+    start_phases(lists:reverse(FlowDesc), length(FlowDesc) - 1, Timeout, []).
 
-start_phases([], _Timeout, Accum) ->
+start_phases([], _Id, _Timeout, Accum) ->
     {ok, Accum};
-start_phases([{PhaseMod, Behaviors, Args}|T], Timeout, Accum) ->
+start_phases([{PhaseMod, Behaviors, Args}|T], Id, Timeout, Accum) ->
     NextFSM = next_fsm(Accum),
     case proplists:get_value(converge, Behaviors) of
         undefined ->
-            case luke_phase_sup:new_phase(PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
+            case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
                 {ok, Pid} ->
                     erlang:link(Pid),
-                    start_phases(T, Timeout, [Pid|Accum]);
+                    start_phases(T, Id - 1, Timeout, [Pid|Accum]);
                 Error ->
                     Error
             end;
         InstanceCount ->
-            Pids = start_converging_phases(PhaseMod, Behaviors, NextFSM, self(), Timeout, Args, InstanceCount),
-            erlang:monitor(process, hd(Pids)),
-            start_phases(T, Timeout, [Pids|Accum])
+            Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args, InstanceCount),
+            start_phases(T, Id - 1, Timeout, [Pids|Accum])
     end.
 
 collect_output(FlowId, Timeout, Accum) ->
     receive
         {flow_results, FlowId, done} ->
-            {ok, lists:append(lists:reverse(Accum))};
-        {flow_results, FlowId, Results} ->
-            collect_output(FlowId, Timeout, [Results|Accum]);
+            {ok, finalize_results(Accum)};
+        {flow_results, PhaseId, FlowId, Results} ->
+            collect_output(FlowId, Timeout, accumulate_results(PhaseId, Results, Accum));
         {flow_error, FlowId, Error} ->
             Error
     after Timeout ->
                 length(Accum) == 0 ->
                     {error, timeout};
                 true ->
-                    {ok, lists:append(lists:reverse(Accum))}
+                    {ok, finalize_results(Accum)}
             end
     end.
 
          end
  end.
 
-start_converging_phases(PhaseMod, Behaviors0, NextFSM, Flow, Timeout, Args, Count) ->
+start_converging_phases(Id, PhaseMod, Behaviors0, NextFSM, Flow, Timeout, Args, Count) ->
     Behaviors = [normalize_behavior(B) || B <- Behaviors0],
-    Pids = start_converging_phases(PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, []),
+    Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, []),
     [Leader|_] = Pids,
     lists:foreach(fun(P) -> luke_phase:partners(P, Leader, Pids) end, Pids),
     Pids.
 
-start_converging_phases(_PhaseMod, _Behaviors, _NextFSM, _Flow, _Timeout, _Args, 0, Accum) ->
+start_converging_phases(_Id, _PhaseMod, _Behaviors, _NextFSM, _Flow, _Timeout, _Args, 0, Accum) ->
     Accum;
-start_converging_phases(PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, Accum) ->
-    case luke_phase_sup:new_phase(PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args) of
+start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, Accum) ->
+    case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args) of
         {ok, Pid} ->
             erlang:link(Pid),
-            start_converging_phases(PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count - 1, [Pid|Accum]);
+            start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count - 1, [Pid|Accum]);
         Error ->
             throw(Error)
     end.
     converge;
 normalize_behavior(Behavior) ->
     Behavior.
+
+finalize_results(Accum) ->
+    case [lists:append(R) || {_, R} <- lists:sort(dict:to_list(Accum))] of
+        [R] ->
+            R;
+        R ->
+            R
+    end.
+
+accumulate_results(PhaseId, Results, Accum) ->
+    case dict:find(PhaseId, Accum) of
+        error ->
+            dict:store(PhaseId, [Results], Accum);
+        {ok, PhaseAccum} ->
+            dict:store(PhaseId, [Results|PhaseAccum], Accum)
+    end.

apps/luke/src/luke_phase.erl

 -behaviour(gen_fsm).
 
 %% API
--export([start_link/6,
+-export([start_link/7,
          complete/0,
          partners/3]).
 
          terminate/3,
          code_change/4]).
 
--record(state, {mod,
+-record(state, {id,
+                mod,
                 modstate,
                 converge=false,
                 accumulate=false,
 behaviour_info(_) ->
     undefined.
 
-start_link(PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs) ->
-    gen_fsm:start_link(?MODULE, [PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs], []).
+start_link(PhaseMod, Id, Behaviors, NextPhases, Flow, Timeout, PhaseArgs) ->
+    gen_fsm:start_link(?MODULE, [Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs], []).
 
 complete() ->
     gen_fsm:send_event(self(), complete).
 partners(PhasePid, Leader, Partners) ->
     gen_fsm:send_event(PhasePid, {partners, Leader, Partners}).
 
-init([PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
+init([Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
     case PhaseMod:init(PhaseArgs) of
         {ok, ModState} ->
             Accumulate = lists:member(accumulate, Behaviors),
             Converge = lists:member(converge, Behaviors),
-            {ok, executing, #state{mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
+            {ok, executing, #state{id=Id, mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
                                    flow=Flow, accumulate=Accumulate, converge=Converge, flow_timeout=Timeout}};
         {stop, Reason} ->
             {stop, Reason}
 %% Send output to flow for accumulation and propagate as inputs
 %% to the next phase. Accumulation is only true for the lead
 %% process of a converging phase
-route_output(Output, #state{converge=true, accumulate=Accumulate, lead_partner=undefined,
+route_output(Output, #state{id=Id, converge=true, accumulate=Accumulate, lead_partner=undefined,
                             flow=Flow, next_phases=Next}=State) ->
     if
         Accumulate =:= true ->
-            luke_phases:send_flow_results(Flow, Output);
+            luke_phases:send_flow_results(Flow, Id, Output);
         true ->
             ok
     end,
 
 %% Route output to the next phase. Accumulate output
 %% to the flow if accumulation is turned on.
-route_output(Output, #state{converge=false, accumulate=Accumulate, flow=Flow, next_phases=Next} = State) ->
+route_output(Output, #state{id=Id, converge=false, accumulate=Accumulate, flow=Flow, next_phases=Next} = State) ->
     if
         Accumulate =:= true ->
-            luke_phases:send_flow_results(Flow, Output);
+            luke_phases:send_flow_results(Flow, Id, Output);
         true ->
             ok
     end,

apps/luke/src/luke_phase_sup.erl

 
 %% API
 -export([start_link/0,
-         new_phase/6]).
+         new_phase/7]).
 
 %% Supervisor callbacks
 -export([init/1]).
 
-new_phase(PhaseMod, Behavior, NextPhases, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
-                                                                        is_list(PhaseArgs) ->
-    start_child(PhaseMod, [Behavior, NextPhases, Flow, Timeout, PhaseArgs]).
+new_phase(Id, PhaseMod, Behavior, NextPhases, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
+                                                                             is_list(PhaseArgs) ->
+    start_child(PhaseMod, [Id, Behavior, NextPhases, Flow, Timeout, PhaseArgs]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).

apps/luke/src/luke_phases.erl

 -export([send_inputs/2,
          send_inputs_done/1,
          send_flow_complete/1]).
--export([send_flow_results/2]).
+-export([send_flow_results/3]).
 
 %% @doc Sends inputs to a phase process
 %%      If a phase has multiple processes, inputs
 %%      This is sent by phases which are configured
 %%      to accumulate their results
 %% @spec send_flow_results(pid(), any()) -> ok
-send_flow_results(FlowPid, Results) ->
-    gen_fsm:send_event(FlowPid, {results, Results}).
+send_flow_results(FlowPid, Id, Results) ->
+    gen_fsm:send_event(FlowPid, {results, Id, Results}).

apps/luke/tests/results_tests.erl

              ?assertMatch(["hello"], Results),
              test_util:assertDead([Pid|Phases]) end,
      fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?THREE_PHASE_NESTED_FLOW),
+             luke_flow:add_inputs(Pid, ["hello", "goodbye"]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [["hello", "goodbye"], ["hello", "goodbye"], ["hello", "goodbye"]]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+
+     fun() ->
              {FlowId, Pid, Phases} = test_util:start_flow(?TWO_ASYNC_FLOW),
              luke_flow:add_inputs(Pid, "testing"),
              {ok, "testing"} = test_util:verify_results(FlowId, results),
              {FlowId, Pid, Phases} = test_util:start_flow(?MAP_DBL_FLOW),
              luke_flow:add_inputs(Pid, [a,b]),
              luke_flow:add_inputs(Pid, [a,b]),
-             ?assertMatch({ok, [5,5,5,5,5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
+             ?assertMatch({ok, [[5,5,5,5],[5,5,5,5]]}, luke_flow:collect_output(FlowId, 100)),
              test_util:verify_results(FlowId, none),
              luke_flow:finish_inputs(Pid),
              test_util:verify_results(FlowId, done),

apps/luke/tests/test_util.erl

     receive
         {flow_results, FlowId, done} ->
             throw({error, unexpected_done});
-        {flow_results, FlowId, Results} ->
+        {flow_results, _PhaseId, FlowId, Results} ->
             throw({error, unexpected_results, Results})
     after 100 ->
             ok
     receive
         {flow_results, FlowId, done} ->
             throw({error, unexpected_done});
-        {flow_results, FlowId, Results} ->
+        {flow_results, _PhaseId, FlowId, Results} ->
             {ok, Results}
     after 100 ->
             throw({error, no_results})
     receive
         {flow_results, FlowId, done} ->
             ok;
-        {flow_results, FlowId, Results} ->
+        {flow_results, _PhaseId, FlowId, Results} ->
             throw({error, unexpected_results, Results})
     after 100 ->
             throw({error, no_results})

apps/luke/tests/tests.hrl

 -define(TWO_PHASE_FLOW, [{simple_phase, [], []},
                          {simple_phase, [accumulate], []}]).
 
+-define(THREE_PHASE_NESTED_FLOW, [{nested_phase, [accumulate], []},
+                                  {simple_phase, [accumulate], []},
+                                  {simple_phase, [accumulate], []}]).
+
+
 -define(TWO_ASYNC_FLOW, [{async_phase, [], []},
                          {async_phase, [accumulate], []}]).
 

apps/riak/src/mapred_resource.erl

             {format_error(Error), done};
         {flow_error, ReqId, Error} ->
             {format_error({error, Error}), done};
-        {flow_results, ReqId, Res} ->
-            Data = mochijson2:encode(Res),
+        {flow_results, PhaseId, ReqId, Res} ->
+            Data = mochijson2:encode({struct, [{phase, PhaseId}, {data, Res}]}),
             Body = ["\n--", State#state.boundary, "\n",
                     "Content-Type: application/json\n\n",
                     Data],

apps/riak/src/riak_reduce_phase.erl

 
 handle_input(Inputs, #state{reduced=Reduced}=State0, _Timeout) ->
     State = State0#state{reduced=Inputs ++ Reduced, new_input=true},
-    {no_output, State, 50}.
+    {no_output, State, 250}.
 
 handle_input_done(#state{qterm=QTerm, reduced=Reduced0, new_input=NewInput}=State) ->
     case NewInput of
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.