Source

riak / src / riak_keys_fsm.erl

Diff from to

File src/riak_keys_fsm.erl

 -module(riak_keys_fsm).
 -behaviour(gen_fsm).
 
--export([start/3]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_kl/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Bucket,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Bucket,Timeout,From], []).
+start(ReqId,Bucket,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,Bucket,Timeout,From], []).
 
 %% @private
-init([Bucket,Timeout,Client]) ->
+init([ReqId,Bucket,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client, timeout=Timeout,
-                       bucket=Bucket, ring=Ring},
+                       req_id=ReqId, bucket=Bucket, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{timeout=Timeout,
+initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bucket=Bucket, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
-    ReqID = erlang:phash2({random:uniform(), self(), Bucket, RealStartTime}),
     riak_eventer:notify(riak_keys_fsm, keys_fsm_start,
-                        {ReqID, RealStartTime, Bucket}),
-    Msg = {self(), Bucket, ReqID},
+                        {ReqId, RealStartTime, Bucket}),
+    Msg = {self(), Bucket, ReqId},
     NodeList = riak_ring:all_owners(Ring),
     Asked = lists:foldl(
               fun({Index, Node}, Acc) ->
                           pong ->
                               gen_server:cast(
                                 {riak_vnode_master, Node},
-                                {vnode_list_bucket,{Index,ReqID},Msg}),
+                                {vnode_list_bucket,{Index,ReqId},Msg}),
                               [Index|Acc]
                       end
               end,
               [],
               NodeList),
     StateData = StateData0#state{waiting=Asked, keys=[],
-                       endtime=Timeout+riak_util:moment(),
-                       req_id=ReqID},
+                       endtime=Timeout+riak_util:moment()},
     {next_state, waiting_kl, StateData, Timeout}.
 
-waiting_kl({kl, Keys, Idx, ReqID},
+waiting_kl({kl, Keys, Idx, ReqId},
            StateData=#state{keys=Acc,waiting=Waiting,endtime=End}) ->
     NewAcc = [sets:from_list(Keys)|Acc],
     case lists:delete(Idx, Waiting) of
         [] ->
-            riak_eventer:notify(riak_keys_fsm, finish, {ReqID, normal}),
-            respond(StateData#state.client,NewAcc),
+            riak_eventer:notify(riak_keys_fsm, finish, {ReqId, normal}),
+            respond(StateData#state.client,NewAcc,ReqId),
             {stop, normal, StateData};
         StillWaiting ->
             {next_state, waiting_kl,
                              waiting=StillWaiting},
              End-riak_util:moment()}
     end;
-waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqID}) ->
-    riak_eventer:notify(riak_keys_fsm, finish, {ReqID, timeout}),
-    respond(Client, Acc),
+waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqId}) ->
+    riak_eventer:notify(riak_keys_fsm, finish, {ReqId, timeout}),
+    respond(Client, Acc, ReqId),
     {stop, normal, StateData}.
 
 %% @private
-respond(Client, KeyLists) ->
+respond(Client, KeyLists, ReqId) ->
     Reply = sets:to_list(sets:union(KeyLists)),
-    Client ! {ok, Reply},
+    Client ! {ReqId, {ok, Reply}},
     Reply.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_keys_fsm, key_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private