Anonymous avatar Anonymous committed a5c264d

use an erlang:send_after() timeout msg in get/put fsms instead of incrementally calcluating timeout

Comments (0)

Files changed (2)

src/riak_get_fsm.erl

                 repair_sent :: list(), 
                 final_obj :: undefined|riak_object:riak_object(),
                 timeout :: pos_integer(),
-                endtime :: pos_integer(),
+                tref    :: reference(),
                 bkey :: {riak_object:bucket(), riak_object:key()},
                 ring :: riak_ring:riak_ring()
                }).
 %% @private
 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
+    TRef = erlang:send_after(Timeout, self(), timeout),
     RealStartTime = riak_util:moment(),
     DocIdx = riak_util:chash_key({Bucket, Key}),
     riak_eventer:notify(riak_get_fsm, get_fsm_start,
                        preflist=Preflist,final_obj=undefined,
                        replied_r=[],replied_fail=[],
                        replied_notfound=[],starttime=riak_util:moment(),
-                       waiting_for=Sent,endtime=Timeout+riak_util:moment()},
-    {next_state,waiting_vnode_r,StateData,Timeout}.
+                       waiting_for=Sent,tref=TRef},
+    {next_state,waiting_vnode_r,StateData}.
 
 waiting_vnode_r({r, {ok, RObj}, Idx, ReqId},
                   StateData=#state{r=R,allowmult=AllowMult,
                                    req_id=ReqId,client=Client,
-                                   replied_r=Replied0, endtime=End}) ->
+                                   replied_r=Replied0}) ->
     Replied = [{RObj,Idx}|Replied0],
     case length(Replied) >= R of
         true ->
                                         {ReqId, ok})
             end,
             NewStateData = StateData#state{replied_r=Replied,final_obj=Final},
-            finalize(NewStateData, End);
+            finalize(NewStateData);
         false ->
             NewStateData = StateData#state{replied_r=Replied},
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_r,NewStateData}
     end;
 waiting_vnode_r({r, {error, notfound}, Idx, ReqId},
                   StateData=#state{r=R,replied_fail=Fails,
                                    req_id=ReqId,client=Client,n=N,
-                                   replied_notfound=Replied0,endtime=End}) ->
+                                   replied_notfound=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
     case (N - length(Replied) - length(Fails)) >= R of
         true ->
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_r,NewStateData};
         false ->
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                 {ReqId, notfound}),
 waiting_vnode_r({r, {error, Err}, Idx, ReqId},
                   StateData=#state{r=R,client=Client,n=N,
                                    replied_fail=Replied0,req_id=ReqId,
-                                   replied_notfound=NotFound,endtime=End}) ->
+                                   replied_notfound=NotFound}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied) - length(NotFound)) >= R of
         true ->
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_r,NewStateData};
         false ->
             case length(NotFound) of
                 0 ->
     {stop,normal,StateData}.
 
 waiting_read_repair({r, {ok, RObj}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_r=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_r=[{RObj,Idx}|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_r=Replied0}) ->
+    finalize(StateData#state{replied_r=[{RObj,Idx}|Replied0]});
 waiting_read_repair({r, {error, notfound}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_notfound=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_notfound=[Idx|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_notfound=Replied0}) ->
+    finalize(StateData#state{replied_notfound=[Idx|Replied0]});
 waiting_read_repair({r, {error, Err}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_fail=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_fail=Replied0}) ->
+    finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]});
 waiting_read_repair(timeout, StateData) ->
     finalize(StateData).
 
-finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N},
-         End) ->
+finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N}) ->
     case (length(R) + length(F) + length(NF)) >= N of
-        true -> finalize(StateData);
-        false -> {next_state,waiting_read_repair,
-                  StateData,End-riak_util:moment()}
+        true -> really_finalize(StateData);
+        false -> {next_state,waiting_read_repair,StateData}
     end.
-finalize(StateData=#state{final_obj=Final,
+really_finalize(StateData=#state{final_obj=Final,
                           replied_r=RepliedR,
                           bkey=BKey,
                           req_id=ReqId,
     {stop,badmsg,StateData}.
 
 %% @private
+handle_info(timeout, StateName, StateData) ->
+    ?MODULE:StateName(timeout, StateData);
+%% @private
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 

src/riak_put_fsm.erl

                 replied_dw :: list(), 
                 replied_fail :: list(),
                 timeout :: pos_integer(), 
-                endtime :: pos_integer(), 
+                tref    :: reference(),
                 ring :: riak_ring:riak_ring()
                }).
 
     {ok,Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{robj=RObj0, client=Client, w=W, dw=DW,
                        req_id=ReqId, timeout=Timeout, ring=Ring},
+    
     {ok,initialize,StateData,0}.
 
 %% @private
 initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
+    TRef = erlang:send_after(Timeout, self(), timeout),
     RObj = update_metadata(RObj0),
     RealStartTime = riak_util:moment(),
     Bucket = riak_object:bucket(RObj),
                   robj=RObj, n=N, preflist=Preflist, bkey={Bucket,Key},
                   waiting_for=Sent, starttime=riak_util:moment(),
                   replied_w=[], replied_dw=[], replied_fail=[],
-                  endtime=Timeout+riak_util:moment()},
-    {next_state,waiting_vnode_w,StateData,Timeout}.
+                  tref=TRef},
+    {next_state,waiting_vnode_w,StateData}.
 
 waiting_vnode_w({w, Idx, ReqId},
                   StateData=#state{w=W,dw=DW,req_id=ReqId,client=Client, bkey={Bucket, Key},
-                                   replied_w=Replied0, endtime=End}) ->
+                                   replied_w=Replied0}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= W of
         true ->
                     {stop,normal,StateData};
                 _ ->
                     NewStateData = StateData#state{replied_w=Replied},
-                    {next_state,waiting_vnode_dw,NewStateData,
-                     End-riak_util:moment()}
+                    {next_state,waiting_vnode_dw,NewStateData}
+
             end;
         false ->
             NewStateData = StateData#state{replied_w=Replied},
-            {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_w,NewStateData}
     end;
 waiting_vnode_w({dw, Idx, _ReqId},
-                  StateData=#state{replied_dw=Replied0, endtime=End}) ->
+                  StateData=#state{replied_dw=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_dw=Replied},
-    {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
+    {next_state,waiting_vnode_w,NewStateData};
 waiting_vnode_w({fail, Idx, ReqId},
                   StateData=#state{n=N,w=W,client=Client,
-                                   replied_fail=Replied0,endtime=End}) ->
+                                   replied_fail=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied)) >= W of
         true ->
-            {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_w,NewStateData};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqId, {error,too_many_fails,Replied}}),
     {stop,normal,StateData}.
 
 waiting_vnode_dw({w, _Idx, ReqId},
-          StateData=#state{req_id=ReqId, endtime=End}) ->
-    {next_state,waiting_vnode_dw,StateData,End-riak_util:moment()};
+          StateData=#state{req_id=ReqId}) ->
+    {next_state,waiting_vnode_dw,StateData};
 waiting_vnode_dw({dw, Idx, ReqId},
                  StateData=#state{dw=DW, client=Client, bkey={Bucket, Key},
-                                   replied_dw=Replied0, endtime=End}) ->
+                                   replied_dw=Replied0}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
             {stop,normal,StateData};
         false ->
             NewStateData = StateData#state{replied_dw=Replied},
-            {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_dw,NewStateData}
     end;
 waiting_vnode_dw({fail, Idx, ReqId},
                   StateData=#state{n=N,dw=DW,client=Client,
-                                   replied_fail=Replied0,endtime=End}) ->
+                                   replied_fail=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied)) >= DW of
         true ->
-            {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_dw,NewStateData};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqId, {error,too_many_fails,Replied}}),
     {stop,badmsg,StateData}.
 
 %% @private
+
+handle_info(timeout, StateName, StateData) ->
+    ?MODULE:StateName(timeout, StateData);
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.