Source

riak / src / riak_get_fsm.erl

Diff from to

src/riak_get_fsm.erl

 -module(riak_get_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_r/2,waiting_read_repair/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Bucket,Key,R,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Bucket,Key,R,Timeout,From], []).
+start(ReqId,Bucket,Key,R,Timeout,From) ->
+    gen_fsm:start(?MODULE, [ReqId,Bucket,Key,R,Timeout,From], []).
 
 %% @private
-init([Bucket,Key,R,Timeout,Client]) ->
+init([ReqId,Bucket,Key,R,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client,r=R, timeout=Timeout,
-                       bkey={Bucket,Key}, ring=Ring},
+                req_id=ReqId, bkey={Bucket,Key}, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{timeout=Timeout,
+initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
     DocIdx = chash:key_of({Bucket, Key}),
-    ReqID = erlang:phash2({random:uniform(), self(), DocIdx, RealStartTime}),
     riak_eventer:notify(riak_get_fsm, get_fsm_start,
-                        {ReqID, RealStartTime, Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, ReqID},
+                        {ReqId, RealStartTime, Bucket, Key}),
+    Msg = {self(), {Bucket,Key}, ReqId},
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     N = proplists:get_value(n_val,BucketProps),
     AllowMult = proplists:get_value(allow_mult,BucketProps),
         false -> Sent1 ++ riak_util:fallback(vnode_get,Msg,Pangs1,Fallbacks)
     end,
     riak_eventer:notify(riak_get_fsm, get_fsm_sent,
-                                {ReqID, [{T,S} || {_I,T,S} <- Sent]}),
+                                {ReqId, [{T,S} || {_I,T,S} <- Sent]}),
     StateData = StateData0#state{n=N,allowmult=AllowMult,repair_sent=[],
                        preflist=Preflist,final_obj=undefined,
-                       req_id=ReqID,replied_r=[],replied_fail=[],
+                       replied_r=[],replied_fail=[],
                        replied_notfound=[],starttime=riak_util:moment(),
                        waiting_for=Sent,endtime=Timeout+riak_util:moment()},
     {next_state,waiting_vnode_r,StateData,Timeout}.
 
-waiting_vnode_r({r, {ok, RObj}, Idx, ReqID},
+waiting_vnode_r({r, {ok, RObj}, Idx, ReqId},
                   StateData=#state{r=R,allowmult=AllowMult,
-                                   req_id=ReqID,client=Client,
+                                   req_id=ReqId,client=Client,
                                    replied_r=Replied0, endtime=End}) ->
     Replied = [{RObj,Idx}|Replied0],
     case length(Replied) >= R of
         true ->
-            Final = respond(Client,Replied,AllowMult),
+            Final = respond(Client,Replied,AllowMult,ReqId),
             case Final of
                 {error, notfound} ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, notfound});
+                                        {ReqId, notfound});
                 {ok, _} ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, ok})
+                                        {ReqId, ok})
             end,
             NewStateData = StateData#state{replied_r=Replied,final_obj=Final},
             {next_state,waiting_read_repair,
             NewStateData = StateData#state{replied_r=Replied},
             {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()}
     end;
-waiting_vnode_r({r, {error, notfound}, Idx, ReqID},
+waiting_vnode_r({r, {error, notfound}, Idx, ReqId},
                   StateData=#state{r=R,replied_fail=Fails,
-                                   req_id=ReqID,client=Client,n=N,
+                                   req_id=ReqId,client=Client,n=N,
                                    replied_notfound=Replied0,endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
             {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
         false ->
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                {ReqID, notfound}),
-            Client ! {error,notfound},
+                                {ReqId, notfound}),
+            Client ! {ReqId, {error,notfound}},
             {stop,normal,NewStateData}
     end;
-waiting_vnode_r({r, {error, Err}, Idx, ReqID},
+waiting_vnode_r({r, {error, Err}, Idx, ReqId},
                   StateData=#state{r=R,client=Client,n=N,
-                                   replied_fail=Replied0,req_id=ReqID,
+                                   replied_fail=Replied0,req_id=ReqId,
                                    replied_notfound=NotFound,endtime=End}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
                 0 ->
                     FullErr = [E || {E,_I} <- Replied],
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, {error,FullErr}}),
-                    Client ! {error,FullErr},
+                                        {ReqId, {error,FullErr}}),
+                    Client ! {ReqId, {error,FullErr}},
                     {stop,normal,NewStateData};
                 _ ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                                        {ReqID, notfound}),
-                    Client ! {error,notfound},
+                                        {ReqId, notfound}),
+                    Client ! {ReqId, {error,notfound}},
                     {stop,normal,NewStateData}
             end
     end;
-waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
+waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
-                        {ReqID, timeout}),
-    Client ! {error,timeout},
+                        {ReqId, timeout}),
+    Client ! {ReqId, {error,timeout}},
     {stop,normal,StateData}.
 
-waiting_read_repair({r, {ok, RObj}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_r=Replied0,
+waiting_read_repair({r, {ok, RObj}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_r=Replied0,
                                    endtime=End}) ->
     Replied = [{RObj,Idx}|Replied0],
     NewStateData = StateData#state{replied_r=Replied},
     {next_state,waiting_read_repair,NewStateData,End-riak_util:moment()};
-waiting_read_repair({r, {error, notfound}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_notfound=Replied0,
+waiting_read_repair({r, {error, notfound}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_notfound=Replied0,
                                    endtime=End}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
     {next_state,waiting_read_repair,NewStateData,End-riak_util:moment()};
-waiting_read_repair({r, {error, Err}, Idx, ReqID},
-                  StateData=#state{req_id=ReqID,replied_fail=Replied0,
+waiting_read_repair({r, {error, Err}, Idx, ReqId},
+                  StateData=#state{req_id=ReqId,replied_fail=Replied0,
                                    endtime=End}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
 waiting_read_repair(timeout, StateData=#state{final_obj=Final,
                                               replied_r=RepliedR,
                                               bkey=BKey,
-                                              req_id=ReqID,
+                                              req_id=ReqId,
                                               replied_notfound=NotFound,
                                               ring=Ring}) ->
     case Final of
         {error, notfound} ->
             maybe_finalize_delete(StateData);
         {ok,_} ->
-            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqID);
+            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId);
         _ -> nop
     end,
     {stop,normal,StateData}.
 
 maybe_finalize_delete(_StateData=#state{replied_notfound=NotFound,n=N,
                                         replied_r=RepliedR,
-                                        waiting_for=Sent,req_id=ReqID,
+                                        waiting_for=Sent,req_id=ReqId,
                                         bkey=BKey}) ->
     spawn(fun() ->
     IdealNodes = [{I,Node} || {I,Node,Node} <- Sent],
                         true -> % and every response was X-Deleted, go!
                             riak_eventer:notify(riak_get_fsm,
                                                 delete_finalize_start,
-                                                {ReqID, BKey}),
+                                                {ReqId, BKey}),
                             [gen_server2:call({riak_vnode_master, Node},
                                              {vnode_del, {Idx,Node},
-                                              {BKey,ReqID}}) ||
+                                              {BKey,ReqId}}) ||
                                 {Idx,Node} <- IdealNodes];
                         _ -> nop
                     end;
     end
     end).
 
-maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqID) ->
+maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId) ->
     Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound,
     Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0],
     {ok, FinalRObj} = Final,
-    Msg = {self(), BKey, FinalRObj, ReqID},
+    Msg = {self(), BKey, FinalRObj, ReqId},
     case Targets of
         [] -> nop;
         _ ->
             riak_eventer:notify(riak_get_fsm, read_repair,
-                                {ReqID, Targets}),
+                                {ReqId, Targets}),
             [gen_server:cast({riak_vnode_master, Node},
                              {vnode_put, {Idx,Node}, Msg}) ||
                 {Idx,Node} <- Targets]
     {stop,badmsg,StateData}.
 
 %% @private
-terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+terminate(Reason, _StateName, _State=#state{req_id=ReqId}) ->
     riak_eventer:notify(riak_get_fsm, get_fsm_end,
-                        {ReqID, Reason}),
+                        {ReqId, Reason}),
     Reason.
 
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
-respond(Client,VResponses,AllowMult) ->
+respond(Client,VResponses,AllowMult,ReqId) ->
     Reply = merge_robjs([R || {R,_I} <- VResponses],AllowMult),
-    Client ! Reply,
+    Client ! {ReqId, Reply},
     Reply.
 
 merge_robjs(RObjs0,AllowMult) ->
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.