Commits

Anonymous committed 9e3d7fd Merge

merge from bitbucket

Comments (0)

Files changed (6)

 82100e18113bf79f5c790e0f4858b0f33854fced riak-0.7.1
 82100e18113bf79f5c790e0f4858b0f33854fced riak-0.7.1
 b6515e496da1cc5cb2203c345b6bb0ef407785b4 riak-0.7.1
+b6515e496da1cc5cb2203c345b6bb0ef407785b4 riak-0.7.1
+e1357f00129f7f99090698f190302df042e125b4 riak-0.7.1

apps/protobuffs/rebar.config

-{erl_opts, [debug_info]}.
+{erl_opts, [debug_info, fail_on_warning]}.

apps/protobuffs/src/protobuffs_compile.erl

 expand_encode_function(Msgs, Line, Clause) ->
     {function,Line,encode,2,[filter_encode_clause(Msg, Clause) || Msg <- Msgs]}.
     
-filter_encode_clause({MsgName, Fields}, {clause,L,_Args,Guards,_Content}) ->
+filter_encode_clause({MsgName, _Fields}, {clause,L,_Args,Guards,_Content}) ->
     ToBin = {call,L,{atom,L,iolist_to_binary},[{call,L,
                                                 {atom,L,iolist},
                                                 [{atom,L,atomize(MsgName)},{var,L,'Record'}]}]},

apps/riak/ebin/riak.app

          {wants_claim_fun, {riak_claim, default_wants_claim}},
          {choose_claim_fun, {riak_claim, default_choose_claim}},
 
+         %% Number of VNodes allowed to do handoff concurrently.
+         {handoff_concurrency, 4},
+
          %% Secondary code paths
          {add_paths, []}
         ]}

apps/riak/src/riak_handoff_sender.erl

 -module(riak_handoff_sender).
 -export([start_link/3]).
 -include("riakserver_pb.hrl").
+-define(ACK_COUNT, 1000).
 
 start_link(TargetNode, Partition, BKeyList) ->
-    case global:set_lock({handoff_token, {node(), Partition}}, [node()], 0) of
-        true ->
+    TokenCount = riak:get_app_env(handoff_concurrency, 4),
+    case get_handoff_lock(Partition, TokenCount) of
+        {ok, HandoffToken} ->
             Self = self(),
-            {ok, spawn_link(fun()->start_fold(TargetNode, Partition, BKeyList, Self) end)};
-        false ->
+            Pid = spawn_link(fun()->start_fold(TargetNode, Partition, BKeyList, Self) end),
+            {ok, Pid, HandoffToken};
+        {error, max_concurrency} ->
             {error, locked}
     end.
-            
+
+get_handoff_lock(_Partition, 0) ->
+    {error, max_concurrency};
+get_handoff_lock(Partition, Count) ->
+    case global:set_lock({{handoff_token, Count}, {node(), Partition}}, [node()], 0) of
+        true ->
+            {ok, {handoff_token, Count}};
+        false ->
+            get_handoff_lock(Partition, Count-1)
+    end.
 
 start_fold(TargetNode, Partition, BKeyList, ParentPid) ->
     [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
 folder({B,K}, V, AccIn) ->
     visit_item({B,K}, V, AccIn).
 
-visit_item({B,K}, V, {Socket, ParentPid, 100}) ->
+visit_item({B,K}, V, {Socket, ParentPid, ?ACK_COUNT}) ->
     M = <<2:8,"sync">>,
     ok = gen_tcp:send(Socket, M),
     inet:setopts(Socket, [{active, false}]),

apps/riak/src/riak_vnode.erl

 -define(TIMEOUT, 60000).
 -define(LOCK_RETRY_TIMEOUT, 10000).
 
--record(state, {idx,mapcache,mod,modstate,handoff_q}).
+-record(state, {idx,mapcache,mod,modstate,handoff_q,handoff_token}).
 
 start_link(Idx) -> gen_fsm:start_link(?MODULE, [Idx], []).
 
         true ->
             delete_and_exit(StateData);
         false ->
-            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
-                {ok, _Pid} -> {[], ?TIMEOUT};
-                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            {HQ,TO,HT} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid, HandoffToken} -> {[], ?TIMEOUT, HandoffToken};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT, undefined}
             end,
-            {next_state,active,StateData#state{handoff_q=HQ},TO}
+            {next_state,active,StateData#state{handoff_q=HQ,handoff_token=HT},TO}
     end.
 
 %% @private
 active({map, ClientPid, QTerm, BKey, KeyData}, StateData) ->
     NewState = do_map(ClientPid,QTerm,BKey,KeyData,StateData,self()),
     {next_state,active,NewState,?TIMEOUT};
-active(handoff_complete, StateData=#state{idx=Idx}) ->
-    global:del_lock({handoff_token, {node(), Idx}}),
+active(handoff_complete, StateData=#state{idx=Idx,handoff_token=HT}) ->
+    global:del_lock({HT, {node(), Idx}}),
     hometest(StateData);
 active({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
        StateData=#state{idx=Idx,mapcache=Cache,handoff_q=HQ0}) ->