Commits

Anonymous committed a2db186

shorter timeout on failure to acquire handoff lock

Comments (0)

Files changed (3)

apps/riak/src/riak_dets_backend.erl

                                    {min_no_slots, 8192},
                                    {max_no_slots, 16777216}]) of
         {ok, DetsName} ->
-            io:format("~p~n", [dets:info(DetsName)]),
             ok = dets:sync(DetsName),
             {ok, #state{table=DetsName, path=TablePath}};
         {error, Reason}  ->

apps/riak/src/riak_handoff_sender.erl

 -include("riakserver_pb.hrl").
 
 start_link(TargetNode, Partition, BKeyList) ->
-    Self = self(),
-    spawn_link(fun() -> start_fold(TargetNode, Partition, BKeyList, Self) end).
+    io:format("about to acquire lock~n"),
+    case global:set_lock({handoff_token, {node(), Partition}}, [node()], 0) of
+        true ->
+            Self = self(),
+            {ok, spawn_link(fun() -> start_fold(TargetNode, Partition, BKeyList, Self) end)};
+        false ->
+            io:format("failed to acquire handoff lock token~n"),
+            {error, locked}
+    end.
+            
 
 start_fold(TargetNode, Partition, BKeyList, ParentPid) ->
+
     [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
     {ok, Port} = get_handoff_port(TargetNode),
     {ok, Socket} = gen_tcp:connect(Host, Port, 

apps/riak/src/riak_vnode.erl

 -export([active/2,active/3]).
 
 -define(TIMEOUT, 60000).
+-define(LOCK_RETRY_TIMEOUT, 10000).
 
 -record(state, {idx,mapcache,mod,modstate,handoff_q}).
 
         true ->
             delete_and_exit(StateData);
         false ->
-            riak_handoff_sender:start_link(TargetNode, Idx, all),
-            {next_state,active,StateData#state{handoff_q=[]},?TIMEOUT}
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
     end.
 
 %% @private
         [] ->
             delete_and_exit(StateData);
         _ ->
-            riak_handoff_sender:start_link(TargetNode, Idx, BKeyList),
-            {next_state,active,StateData#state{handoff_q=[]},?TIMEOUT}
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
     end.
 
 %% @private
        StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
     do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
     {next_state,active,StateData,?TIMEOUT};
-active(handoff_complete, StateData) ->
+active(handoff_complete, StateData=#state{idx=Idx}) ->
+    global:del_lock({handoff_token, {node(), Idx}}),
     hometest(StateData);
 active({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
        StateData=#state{idx=Idx,mapcache=Cache,handoff_q=HQ0}) ->
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.