1. Justin Sheehy
  2. riak

Source

riak / src / riak_mapreduce_fsm.erl

Diff from to

File src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]). 
 
--record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
+-record(state, {client,reqid,fsms,starttime,timeout,ring,input_done}).
 
-start(ReqId,Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [ReqId,Inputs,Query,Timeout,Client], []).
+start(ReqId,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [ReqId,Query,Timeout,Client], []).
 %% @private
-init([ReqId,Inputs,Query,Timeout,Client]) ->
+init([ReqId,Query,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
-              {ReqId, length(Inputs), length(Query)}),
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
     case check_query_syntax(Query) of
         ok ->
             FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
-            gen_fsm:send_event(hd(FSMs), {input, Inputs}),
-            gen_fsm:send_event(hd(FSMs), done),
-            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqId,
-                               starttime=riak_util:moment(),
-                               endtime=Timeout+riak_util:moment(),
-                               ring=Ring},
+            StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
+                               starttime=riak_util:moment(),timeout=Timeout,
+                               ring=Ring,input_done=false},
             {ok,wait,StateData,Timeout};
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
     {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
 
-wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqId,
-                                  endtime=End,fsms=FSMs0}) ->
+wait({input,Inputs},
+     StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_got_input,
+                        {ReqId, length(Inputs)}),
+    gen_fsm:send_event(hd(FSMs), {input, Inputs}),
+    {next_state, wait, StateData, Timeout};
+wait(input_done, StateData=#state{reqid=ReqId,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_done_input, {ReqId}),
+    gen_fsm:send_event(hd(FSMs), done),
+    maybe_finish(StateData#state{input_done=true});
+wait({done,FSM}, StateData=#state{fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
     FSMs = lists:delete(FSM,FSMs0),
-    case FSMs of
-        [] -> 
-            riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
-                                {ok, ReqId, length(Acc)}),
-            Client ! {ReqId, {ok, Acc}},
-            {stop,normal,StateData};
-        _ ->
-            {next_state, wait, StateData#state{fsms=FSMs},
-             End-riak_util:moment()}
-    end;
+    maybe_finish(StateData#state{fsms=FSMs});
 wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
     Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};
-wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
-    AccData = case Data of
-        {single, X} -> [X|Acc];
-        {list, X} -> X ++ Acc
+wait({acc,Data}, StateData=#state{reqid=ReqId,client=Client,timeout=Timeout}) ->
+    LData = case Data of
+        {single, X} -> [X];
+        {list, X} -> X
     end,
-    {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
+    Client ! {ReqId, {mr_results, LData}},
+    {next_state, wait, StateData,Timeout};
 wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
     Client ! {ReqId, {error, timeout}},
     {stop,normal,StateData}.
 
 %% @private
+maybe_finish(StateData=#state{input_done=Input_Done,fsms=FSMs,
+                client=Client,reqid=ReqId,timeout=Timeout}) ->
+    case Input_Done of
+        false ->
+            {next_state, wait, StateData, Timeout};
+        true ->
+            case FSMs of
+                [] ->
+                    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
+                                        {ok, ReqId}),
+                    Client ! {ReqId, done},
+                    {stop,normal,StateData};
+                _ ->
+                    {next_state, wait, StateData, Timeout}
+            end
+    end.
+
+%% @private
 handle_event(_Event, _StateName, StateData) ->
     {stop,badmsg,StateData}.