Source

riak / src / riak_put_fsm.erl

Diff from to

File src/riak_put_fsm.erl

 -include_lib("eunit/include/eunit.hrl").
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_w/2,waiting_vnode_dw/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(RObj,W,DW,Timeout,From) ->
-    gen_fsm:start(?MODULE, [RObj,W,DW,Timeout,From], []).
+start(ReqId,RObj,W,DW,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,RObj,W,DW,Timeout,From], []).
 
 %% @private
-init([RObj0,W,DW,Timeout,Client]) ->
+init([ReqId,RObj0,W,DW,Timeout,Client]) ->
     {ok,Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{robj=RObj0, client=Client, w=W, dw=DW,
-                       timeout=Timeout, ring=Ring},
+                       req_id=ReqId, timeout=Timeout, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{robj=RObj0,
+initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
     Bucket = riak_object:bucket(RObj0),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     RObj = prune_vclock(update_metadata(RObj0),BucketProps),
-    ReqID = erlang:phash2({random:uniform(), self(), RObj, RealStartTime}),
     Key = riak_object:key(RObj),
     riak_eventer:notify(riak_put_fsm, put_fsm_start,
-                        {ReqID, RealStartTime, Bucket, Key}),
+                        {ReqId, RealStartTime, Bucket, Key}),
     DocIdx = chash:key_of({Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, RObj, ReqID},
+    Msg = {self(), {Bucket,Key}, RObj, ReqId},
     N = proplists:get_value(n_val,BucketProps),
     Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
     {Targets, Fallbacks} = lists:split(N, Preflist),
         false -> Sent1 ++ riak_util:fallback(vnode_put,Msg,Pangs1,Fallbacks)
     end,
     riak_eventer:notify(riak_put_fsm, put_fsm_sent,
-                                {ReqID, [{T,S} || {_I,T,S} <- Sent]}),
+                                {ReqId, [{T,S} || {_I,T,S} <- Sent]}),
     StateData = StateData0#state{
                   robj=RObj, n=N, preflist=Preflist, bkey={Bucket,Key},
-                  waiting_for=Sent, req_id=ReqID, starttime=riak_util:moment(),
+                  waiting_for=Sent, starttime=riak_util:moment(),
                   replied_w=[], replied_dw=[], replied_fail=[],
                   endtime=Timeout+riak_util:moment()},
     {next_state,waiting_vnode_w,StateData,Timeout}.
 
-waiting_vnode_w({w, Idx, ReqID},
-                  StateData=#state{w=W,dw=DW,req_id=ReqID,client=Client,
+waiting_vnode_w({w, Idx, ReqId},
+                  StateData=#state{w=W,dw=DW,req_id=ReqId,client=Client,
                                    replied_w=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= W of
         true ->
             case DW of
                 0 ->
-                    Client ! ok,
+                    Client ! {ReqId, ok},
                     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                        {ReqID, ok}),
+                                        {ReqId, ok}),
                     {stop,normal,StateData};
                 _ ->
                     NewStateData = StateData#state{replied_w=Replied},
             NewStateData = StateData#state{replied_w=Replied},
             {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_w({dw, Idx, _ReqID},
+waiting_vnode_w({dw, Idx, _ReqId},
                   StateData=#state{replied_dw=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_dw=Replied},
     {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
-waiting_vnode_w({fail, Idx, ReqID},
+waiting_vnode_w({fail, Idx, ReqId},
                   StateData=#state{n=N,w=W,client=Client,
                                    replied_fail=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
             {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, {error,too_many_fails,Replied}}),
-            Client ! {error,too_many_fails},
+                                {ReqId, {error,too_many_fails,Replied}}),
+            Client ! {ReqId, {error,too_many_fails}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                        {ReqID, {error,timeout}}),
-    Client ! {error,timeout},
+                        {ReqId, {error,timeout}}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
-waiting_vnode_dw({w, _Idx, ReqID},
-          StateData=#state{req_id=ReqID, endtime=End}) ->
+waiting_vnode_dw({w, _Idx, ReqId},
+          StateData=#state{req_id=ReqId, endtime=End}) ->
     {next_state,waiting_vnode_dw,StateData,End-riak_util:moment()};
-waiting_vnode_dw({dw, Idx, ReqID},
+waiting_vnode_dw({dw, Idx, ReqId},
                   StateData=#state{dw=DW, client=Client,
                                    replied_dw=Replied0, endtime=End}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, ok}),
-            Client ! ok,
+                                {ReqId, ok}),
+            Client ! {ReqId, ok},
             {stop,normal,StateData};
         false ->
             NewStateData = StateData#state{replied_dw=Replied},
             {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_dw({fail, Idx, ReqID},
+waiting_vnode_dw({fail, Idx, ReqId},
                   StateData=#state{n=N,dw=DW,client=Client,
                                    replied_fail=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
             {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                                {ReqID, {error,too_many_fails,Replied}}),
-            Client ! {error,too_many_fails},
+                                {ReqId, {error,too_many_fails,Replied}}),
+            Client ! {ReqId, {error,too_many_fails}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
-                        {ReqID, {error,timeout}}),
-    Client ! {error,timeout},
+                        {ReqId, {error,timeout}}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
 %% @private
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private