riak / src / riak_mapreduce_fsm.erl

Diff from to

src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/4]).
+-export([start/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 
 -record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
 
-start(Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [Inputs,Query,Timeout,Client], []).
+start(ReqId,Inputs,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [ReqId,Inputs,Query,Timeout,Client], []).
 %% @private
-init([Inputs,Query,Timeout,Client]) ->
+init([ReqId,Inputs,Query,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    RealStartTime = riak_util:moment(),
-    ReqID = erlang:phash2({random:uniform(), self(), RealStartTime}),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
-              {ReqID, length(Inputs), length(Query)}),
+              {ReqId, length(Inputs), length(Query)}),
     case check_query_syntax(Query) of
         ok ->
             FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
             gen_fsm:send_event(hd(FSMs), {input, Inputs}),
             gen_fsm:send_event(hd(FSMs), done),
-            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqID,
+            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqId,
                                starttime=riak_util:moment(),
                                endtime=Timeout+riak_util:moment(),
                                ring=Ring},
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                 {error, {bad_qterm, QTerm}}),
-            Client ! {error, {bad_qterm, QTerm}},
+            Client ! {ReqId, {error, {bad_qterm, QTerm}}},
             {stop,normal}
     end.
 
     {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
 
-wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqID,
+wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqId,
                                   endtime=End,fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
     FSMs = lists:delete(FSM,FSMs0),
     case FSMs of
         [] -> 
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
-                                {ok, ReqID, length(Acc)}),
-            Client ! {ok, Acc},
+                                {ok, ReqId, length(Acc)}),
+            Client ! {ReqId, {ok, Acc}},
             {stop,normal,StateData};
         _ ->
             {next_state, wait, StateData#state{fsms=FSMs},
              End-riak_util:moment()}
     end;
-wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqID,
+wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
     [gen_fsm:send_event(FSM, die) || FSM <- FSMs],
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqID}),
-    Client ! {error, ErrMsg},
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
+    Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};
 wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
     AccData = case Data of
         {list, X} -> X ++ Acc
     end,
     {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
-wait(timeout, StateData=#state{reqid=ReqID,client=Client}) ->
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqID}),
-    Client ! {error, timeout},
+wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
+    Client ! {ReqId, {error, timeout}},
     {stop,normal,StateData}.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{reqid=ReqID}) ->
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_end, {ReqID, Reason}),
+terminate(Reason, _StateName, _State=#state{reqid=ReqId}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_end, {ReqId, Reason}),
     Reason.
 
 %% @private
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.