Commits

Anonymous committed a04a252 Merge

Merge.

Comments (0)

Files changed (5)

src/riak_client.erl

 mapred(Inputs,Query,Timeout)
   when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
     Me = self(),
-    spawn(Node, riak_mapreduce_fsm, start, [Inputs,Query,Timeout,Me]),
-    receive
-        {error, Err} -> {error, Err};
-        {ok, Res} -> {ok, Res}
-    after Timeout ->
-            {error, timeout}
-    end.
+    ReqId = mk_reqid(),
+    spawn(Node, riak_mapreduce_fsm, start, [ReqId,Inputs,Query,Timeout,Me]),
+    wait_for_reqid(ReqId, Timeout).
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
 get(Bucket, Key, R, Timeout) when is_binary(Bucket), is_binary(Key),
                                   is_integer(R), is_integer(Timeout) ->
     Me = self(),
-    spawn(Node, riak_get_fsm, start, [Bucket,Key,R,Timeout,Me]),
-    receive
-        {error, Err} -> {error, Err};
-        {ok, RObj} -> {ok, RObj}
-    after Timeout ->
-            {error, timeout}
-    end.
+    ReqId = mk_reqid(),
+    spawn(Node, riak_get_fsm, start, [ReqId,Bucket,Key,R,Timeout,Me]),
+    wait_for_reqid(ReqId, Timeout).
 
 %% @spec put(RObj :: riak_object:riak_object(), W :: integer()) ->
 %%        ok |
 put(RObj, W, DW, Timeout) ->
     R0 = riak_object:increment_vclock(RObj, ClientId),
     Me = self(),
-    spawn(Node, riak_put_fsm, start, [R0,W,DW,Timeout,Me]),
-    receive
-        ok -> ok;
-        {error, Err} -> {error, Err}
-    after Timeout ->
-            {error, timeout}
-    end.
+    ReqId = mk_reqid(),
+    spawn(Node, riak_put_fsm, start, [ReqId,R0,W,DW,Timeout,Me]),
+    wait_for_reqid(ReqId, Timeout).
 
 %% @spec delete(riak_object:bucket(), riak_object:key(), RW :: integer()) ->
 %%        ok |
 %%      nodes have responded with a value or error, or TimeoutMillisecs passes.
 delete(Bucket,Key,RW,Timeout) ->
     Me = self(),
-    spawn(Node, riak_delete, delete, [Bucket,Key,RW,Timeout,Me]),
-    receive
-        ok -> ok;
-        {error, Err} -> {error, Err}    
-    after Timeout ->
-            {error, timeout}
-    end.
+    ReqId = mk_reqid(),
+    spawn(Node, riak_delete, delete, [ReqId,Bucket,Key,RW,Timeout,Me]),
+    wait_for_reqid(ReqId, Timeout).
 
 %% @spec list_keys(riak_object:bucket()) ->
 %%       {ok, [Key :: riak_object:key()]} |
 %%      out of date if called immediately after a put or delete.
 list_keys(Bucket, Timeout) -> 
     Me = self(),
-    spawn(Node, riak_keys_fsm, start, [Bucket,Timeout,Me]),
-    receive
-        {ok, Reply} -> {ok, Reply};
-        {error, Err} -> {error, Err}
-    after Timeout ->
-            {error, timeout}
-    end.
+    ReqId = mk_reqid(),
+    spawn(Node, riak_keys_fsm, start, [ReqId,Bucket,Timeout,Me]),
+    wait_for_reqid(ReqId, Timeout).
 
 %% @spec filter_keys(riak_object:bucket(), Fun :: function()) ->
 %%       {ok, [Key :: riak_object:key()]} |
 %% every (gossip_interval) seconds, or upon adding
 %% or deleting.
 remove_event_handler(Pid, MatchHead, MatchGuard) ->
-    rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]). 
+    rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]). 
+
+%% @private
+mk_reqid() -> erlang:phash2(erlang:now()). % only has to be unique per-pid
+
+%% @private
+wait_for_reqid(ReqId, Timeout) ->
+    receive
+        {ReqId, {error, Err}} -> {error, Err};
+        {ReqId, ok} -> ok;
+        {ReqId, {ok, Res}} -> {ok, Res}
+    after Timeout ->
+            {error, timeout}
+    end.

src/riak_get_fsm.erl

 -module(riak_get_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_r/2,waiting_read_repair/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Bucket,Key,R,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Bucket,Key,R,Timeout,From], []).
+start(ReqId,Bucket,Key,R,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,Bucket,Key,R,Timeout,From], []).
 
 %% @private
-init([Bucket,Key,R,Timeout,Client]) ->
+init([ReqId,Bucket,Key,R,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client,r=R, timeout=Timeout,
-                       bkey={Bucket,Key}, ring=Ring},
+                req_id=ReqId, bkey={Bucket,Key}, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{timeout=Timeout,
+initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
     DocIdx = chash:key_of({Bucket, Key}),
-    ReqID = erlang:phash2({random:uniform(), self(), DocIdx, RealStartTime}),
     riak_eventer:notify(riak_get_fsm, get_fsm_start,
-                        {ReqID, RealStartTime, Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, ReqID},
+                        {ReqId, RealStartTime, Bucket, Key}),
+    Msg = {self(), {Bucket,Key}, ReqId},
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     N = proplists:get_value(n_val,BucketProps),
     AllowMult = proplists:get_value(allow_mult,BucketProps),
         false -> Sent1 ++ riak_util:fallback(vnode_get,Msg,Pangs1,Fallbacks)
     end,
     riak_eventer:notify(riak_get_fsm, get_fsm_sent,
-                                {ReqID, [{T,S} || {_I,T,S} <- Sent]}),
+                                {ReqId, [{T,S} || {_I,T,S} <- Sent]}),
     StateData = StateData0#state{n=N,allowmult=AllowMult,repair_sent=[],
                        preflist=Preflist,final_obj=undefined,
-                       req_id=ReqID,replied_r=[],replied_fail=[],
+                       replied_r=[],replied_fail=[],
                        replied_notfound=[],starttime=riak_util:moment(),
                        waiting_for=Sent,endtime=Timeout+riak_util:moment()},
     {next_state,waiting_vnode_r,StateData,Timeout}.
 
-waiting_vnode_r({r, {ok, RObj}, Idx, ReqID},
+waiting_vnode_r({r, {ok, RObj}, Idx, ReqId},
                   StateData=#state{r=R,allowmult=AllowMult,
-                                   req_id=ReqID,client=Client,
+                                   req_id=ReqId,client=Client,
                                    replied_r=Replied0, endtime=End}) ->
     Replied = [{RObj,Idx}|Replied0],
     case length(Replied) >= R of
         true ->
-            Final = respond(Client,Replied,AllowMult),
+            Final = respond(Client,Replied,AllowMult,ReqId),
             case Final of
                 {error, notfound} ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, notfound});
+                                        {ReqId, notfound});
                 {ok, _} ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, ok})
+                                        {ReqId, ok})
             end,
             NewStateData = StateData#state{replied_r=Replied,final_obj=Final},
             {next_state,waiting_read_repair,
             NewStateData = StateData#state{replied_r=Replied},
             {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_r({r, {error, notfound}, Idx, ReqID},
+waiting_vnode_r({r, {error, notfound}, Idx, ReqId},
                   StateData=#state{r=R,replied_fail=Fails,
-                                   req_id=ReqID,client=Client,n=N,
+                                   req_id=ReqId,client=Client,n=N,
                                    replied_notfound=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
             {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                {ReqID, notfound}),
-            Client ! {error,notfound},
+                                {ReqId, notfound}),
+            Client ! {ReqId, {error,notfound}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_r({r, {error, Err}, Idx, ReqID},
+waiting_vnode_r({r, {error, Err}, Idx, ReqId},
                   StateData=#state{r=R,client=Client,n=N,
-                                   replied_fail=Replied0,req_id=ReqID,
+                                   replied_fail=Replied0,req_id=ReqId,
                                    replied_notfound=NotFound,endtime=End}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
                 0 ->
                     FullErr = [E || {E,_I} <- Replied],
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, {error,FullErr}}),
-                    Client ! {error,FullErr},
+                                        {ReqId, {error,FullErr}}),
+                    Client ! {ReqId, {error,FullErr}},
                     {stop,normal,NewStateData};
                 _ ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, notfound}),
-                    Client ! {error,notfound},
+                                        {ReqId, notfound}),
+                    Client ! {ReqId, {error,notfound}},
                     {stop,normal,NewStateData}
             end
     end;
-waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                        {ReqID, timeout}),
-    Client ! {error,timeout},
+                        {ReqId, timeout}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
-waiting_read_repair({r, {ok, RObj}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_r=Replied0,
+waiting_read_repair({r, {ok, RObj}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_r=Replied0,
                                    endtime=End}) ->
     Replied = [{RObj,Idx}|Replied0],
     NewStateData = StateData#state{replied_r=Replied},
     {next_state,waiting_read_repair,NewStateData,End-riak_util:moment()};
-waiting_read_repair({r, {error, notfound}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_notfound=Replied0,
+waiting_read_repair({r, {error, notfound}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_notfound=Replied0,
                                    endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
     {next_state,waiting_read_repair,NewStateData,End-riak_util:moment()};
-waiting_read_repair({r, {error, Err}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_fail=Replied0,
+waiting_read_repair({r, {error, Err}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_fail=Replied0,
                                    endtime=End}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
 waiting_read_repair(timeout, StateData=#state{final_obj=Final,
                                               replied_r=RepliedR,
                                               bkey=BKey,
-                                              req_id=ReqID,
+                                              req_id=ReqId,
                                               replied_notfound=NotFound,
                                               ring=Ring}) ->
     case Final of
         {error, notfound} ->
             maybe_finalize_delete(StateData);
         {ok,_} ->
-            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqID);
+            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId);
         _ -> nop
     end,
     {stop,normal,StateData}.
 
 maybe_finalize_delete(_StateData=#state{replied_notfound=NotFound,n=N,
                                         replied_r=RepliedR,
-                                        waiting_for=Sent,req_id=ReqID,
+                                        waiting_for=Sent,req_id=ReqId,
                                         bkey=BKey}) ->
     spawn(fun() ->
     IdealNodes = [{I,Node} || {I,Node,Node} <- Sent],
                         true -> % and every response was X-Deleted, go!
                             riak_eventer:notify(riak_get_fsm,
                                                 delete_finalize_start,
-                                                {ReqID, BKey}),
+                                                {ReqId, BKey}),
                             [gen_server2:call({riak_vnode_master, Node},
                                              {vnode_del, {Idx,Node},
-                                              {BKey,ReqID}}) ||
+                                              {BKey,ReqId}}) ||
                                 {Idx,Node} <- IdealNodes];
                         _ -> nop
                     end;
     end
     end).
 
-maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqID) ->
+maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId) ->
     Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound,
     Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0],
     {ok, FinalRObj} = Final,
-    Msg = {self(), BKey, FinalRObj, ReqID},
+    Msg = {self(), BKey, FinalRObj, ReqId},
     case Targets of
         [] -> nop;
         _ ->
             riak_eventer:notify(riak_get_fsm, read_repair,
-                                {ReqID, Targets}),
+                                {ReqId, Targets}),
             [gen_server:cast({riak_vnode_master, Node},
                              {vnode_put, {Idx,Node}, Msg}) ||
                 {Idx,Node} <- Targets]
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_get_fsm, get_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
-respond(Client,VResponses,AllowMult) ->
+respond(Client,VResponses,AllowMult,ReqId) ->
     Reply = merge_robjs([R || {R,_I} <- VResponses],AllowMult),
-    Client ! Reply,
+    Client ! {ReqId, Reply},
     Reply.
 
 merge_robjs(RObjs0,AllowMult) ->

src/riak_keys_fsm.erl

 -module(riak_keys_fsm).
 -behaviour(gen_fsm).
 
--export([start/3]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_kl/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Bucket,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Bucket,Timeout,From], []).
+start(ReqId,Bucket,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,Bucket,Timeout,From], []).
 
 %% @private
-init([Bucket,Timeout,Client]) ->
+init([ReqId,Bucket,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client, timeout=Timeout,
-                       bucket=Bucket, ring=Ring},
+                       req_id=ReqId, bucket=Bucket, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{timeout=Timeout,
+initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bucket=Bucket, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
-    ReqID = erlang:phash2({random:uniform(), self(), Bucket, RealStartTime}),
     riak_eventer:notify(riak_keys_fsm, keys_fsm_start,
-                        {ReqID, RealStartTime, Bucket}),
-    Msg = {self(), Bucket, ReqID},
+                        {ReqId, RealStartTime, Bucket}),
+    Msg = {self(), Bucket, ReqId},
     NodeList = riak_ring:all_owners(Ring),
     Asked = lists:foldl(
               fun({Index, Node}, Acc) ->
                           pong ->
                               gen_server:cast(
                                 {riak_vnode_master, Node},
-                                {vnode_list_bucket,{Index,ReqID},Msg}),
+                                {vnode_list_bucket,{Index,ReqId},Msg}),
                               [Index|Acc]
                       end
               end,
               [],
               NodeList),
     StateData = StateData0#state{waiting=Asked, keys=[],
-                       endtime=Timeout+riak_util:moment(),
-                       req_id=ReqID},
+                       endtime=Timeout+riak_util:moment()},
     {next_state, waiting_kl, StateData, Timeout}.
 
-waiting_kl({kl, Keys, Idx, ReqID},
+waiting_kl({kl, Keys, Idx, ReqId},
            StateData=#state{keys=Acc,waiting=Waiting,endtime=End}) ->
     NewAcc = [sets:from_list(Keys)|Acc],
     case lists:delete(Idx, Waiting) of
         [] ->
-            riak_eventer:notify(riak_keys_fsm, finish, {ReqID, normal}),
-            respond(StateData#state.client,NewAcc),
+            riak_eventer:notify(riak_keys_fsm, finish, {ReqId, normal}),
+            respond(StateData#state.client,NewAcc,ReqId),
             {stop, normal, StateData};
         StillWaiting ->
             {next_state, waiting_kl,
                              waiting=StillWaiting},
              End-riak_util:moment()}
     end;
-waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqID}) ->
-    riak_eventer:notify(riak_keys_fsm, finish, {ReqID, timeout}),
-    respond(Client, Acc),
+waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqId}) ->
+    riak_eventer:notify(riak_keys_fsm, finish, {ReqId, timeout}),
+    respond(Client, Acc, ReqId),
     {stop, normal, StateData}.
 
 %% @private
-respond(Client, KeyLists) ->
+respond(Client, KeyLists, ReqId) ->
     Reply = sets:to_list(sets:union(KeyLists)),
-    Client ! {ok, Reply},
+    Client ! {ReqId, {ok, Reply}},
     Reply.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_keys_fsm, key_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private

src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/4]).
+-export([start/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 
 -record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
 
-start(Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [Inputs,Query,Timeout,Client], []).
+start(ReqId,Inputs,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [ReqId,Inputs,Query,Timeout,Client], []).
 %% @private
-init([Inputs,Query,Timeout,Client]) ->
+init([ReqId,Inputs,Query,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    RealStartTime = riak_util:moment(),
-    ReqID = erlang:phash2({random:uniform(), self(), RealStartTime}),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
-              {ReqID, length(Inputs), length(Query)}),
+              {ReqId, length(Inputs), length(Query)}),
     case check_query_syntax(Query) of
         ok ->
             FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
             gen_fsm:send_event(hd(FSMs), {input, Inputs}),
             gen_fsm:send_event(hd(FSMs), done),
-            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqID,
+            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqId,
                                starttime=riak_util:moment(),
                                endtime=Timeout+riak_util:moment(),
                                ring=Ring},
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                 {error, {bad_qterm, QTerm}}),
-            Client ! {error, {bad_qterm, QTerm}},
+            Client ! {ReqId, {error, {bad_qterm, QTerm}}},
             {stop,normal}
     end.
 
     {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
 
-wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqID,
+wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqId,
                                   endtime=End,fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
     FSMs = lists:delete(FSM,FSMs0),
     case FSMs of
         [] -> 
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
-                                {ok, ReqID, length(Acc)}),
-            Client ! {ok, Acc},
+                                {ok, ReqId, length(Acc)}),
+            Client ! {ReqId, {ok, Acc}},
             {stop,normal,StateData};
         _ ->
             {next_state, wait, StateData#state{fsms=FSMs},
              End-riak_util:moment()}
     end;
-wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqID,
+wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
     [gen_fsm:send_event(FSM, die) || FSM <- FSMs],
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqID}),
-    Client ! {error, ErrMsg},
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
+    Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};
 wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
     AccData = case Data of
         {list, X} -> X ++ Acc
     end,
     {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
-wait(timeout, StateData=#state{reqid=ReqID,client=Client}) ->
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqID}),
-    Client ! {error, timeout},
+wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
+    Client ! {ReqId, {error, timeout}},
     {stop,normal,StateData}.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{reqid=ReqID}) ->
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_end, {ReqID, Reason}),
+terminate(Reason, _StateName, _State=#state{reqid=ReqId}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_end, {ReqId, Reason}),
     Reason.
 
 %% @private

src/riak_put_fsm.erl

 -include_lib("eunit/include/eunit.hrl").
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_w/2,waiting_vnode_dw/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(RObj,W,DW,Timeout,From) ->
-    gen_fsm:start(?MODULE, [RObj,W,DW,Timeout,From], []).
+start(ReqId,RObj,W,DW,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,RObj,W,DW,Timeout,From], []).
 
 %% @private
-init([RObj0,W,DW,Timeout,Client]) ->
+init([ReqId,RObj0,W,DW,Timeout,Client]) ->
     {ok,Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{robj=RObj0, client=Client, w=W, dw=DW,
-                       timeout=Timeout, ring=Ring},
+                       req_id=ReqId, timeout=Timeout, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{robj=RObj0,
+initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
     Bucket = riak_object:bucket(RObj0),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     RObj = prune_vclock(update_metadata(RObj0),BucketProps),
-    ReqID = erlang:phash2({random:uniform(), self(), RObj, RealStartTime}),
     Key = riak_object:key(RObj),
     riak_eventer:notify(riak_put_fsm, put_fsm_start,
-                        {ReqID, RealStartTime, Bucket, Key}),
+                        {ReqId, RealStartTime, Bucket, Key}),
     DocIdx = chash:key_of({Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, RObj, ReqID},
+    Msg = {self(), {Bucket,Key}, RObj, ReqId},
     N = proplists:get_value(n_val,BucketProps),
     Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
     {Targets, Fallbacks} = lists:split(N, Preflist),
         false -> Sent1 ++ riak_util:fallback(vnode_put,Msg,Pangs1,Fallbacks)
     end,
     riak_eventer:notify(riak_put_fsm, put_fsm_sent,
-                                {ReqID, [{T,S} || {_I,T,S} <- Sent]}),
+                                {ReqId, [{T,S} || {_I,T,S} <- Sent]}),
     StateData = StateData0#state{
                   robj=RObj, n=N, preflist=Preflist, bkey={Bucket,Key},
-                  waiting_for=Sent, req_id=ReqID, starttime=riak_util:moment(),
+                  waiting_for=Sent, starttime=riak_util:moment(),
                   replied_w=[], replied_dw=[], replied_fail=[],
                   endtime=Timeout+riak_util:moment()},
     {next_state,waiting_vnode_w,StateData,Timeout}.
 
-waiting_vnode_w({w, Idx, ReqID},
-                  StateData=#state{w=W,dw=DW,req_id=ReqID,client=Client,
+waiting_vnode_w({w, Idx, ReqId},
+                  StateData=#state{w=W,dw=DW,req_id=ReqId,client=Client,
                                    replied_w=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= W of
         true ->
             case DW of
                 0 ->
-                    Client ! ok,
+                    Client ! {ReqId, ok},
                     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                        {ReqID, ok}),
+                                        {ReqId, ok}),
                     {stop,normal,StateData};
                 _ ->
                     NewStateData = StateData#state{replied_w=Replied},
             NewStateData = StateData#state{replied_w=Replied},
             {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_w({dw, Idx, _ReqID},
+waiting_vnode_w({dw, Idx, _ReqId},
                   StateData=#state{replied_dw=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_dw=Replied},
     {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
-waiting_vnode_w({fail, Idx, ReqID},
+waiting_vnode_w({fail, Idx, ReqId},
                   StateData=#state{n=N,w=W,client=Client,
                                    replied_fail=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
             {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, {error,too_many_fails,Replied}}),
-            Client ! {error,too_many_fails},
+                                {ReqId, {error,too_many_fails,Replied}}),
+            Client ! {ReqId, {error,too_many_fails}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                        {ReqID, {error,timeout}}),
-    Client ! {error,timeout},
+                        {ReqId, {error,timeout}}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
-waiting_vnode_dw({w, _Idx, ReqID},
-          StateData=#state{req_id=ReqID, endtime=End}) ->
+waiting_vnode_dw({w, _Idx, ReqId},
+          StateData=#state{req_id=ReqId, endtime=End}) ->
     {next_state,waiting_vnode_dw,StateData,End-riak_util:moment()};
-waiting_vnode_dw({dw, Idx, ReqID},
+waiting_vnode_dw({dw, Idx, ReqId},
                   StateData=#state{dw=DW, client=Client,
                                    replied_dw=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, ok}),
-            Client ! ok,
+                                {ReqId, ok}),
+            Client ! {ReqId, ok},
             {stop,normal,StateData};
         false ->
             NewStateData = StateData#state{replied_dw=Replied},
             {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_dw({fail, Idx, ReqID},
+waiting_vnode_dw({fail, Idx, ReqId},
                   StateData=#state{n=N,dw=DW,client=Client,
                                    replied_fail=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
             {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, {error,too_many_fails,Replied}}),
-            Client ! {error,too_many_fails},
+                                {ReqId, {error,too_many_fails,Replied}}),
+            Client ! {ReqId, {error,too_many_fails}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                        {ReqID, {error,timeout}}),
-    Client ! {error,timeout},
+                        {ReqId, {error,timeout}}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.