Anonymous avatar Anonymous committed b8e14ea

new remove_from_cluster that attempts to preserve target_n_val guarantees

Comments (0)

Files changed (2)

src/riak_claim.erl

 -module(riak_claim).
 -export([default_wants_claim/1, default_choose_claim/1,
          never_wants_claim/1, random_choose_claim/1]).
+-export([default_choose_claim/2,
+         claim_rebalance_n/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 
 %% @spec default_choose_claim(riak_ring()) -> riak_ring()
 %% @doc Choose a partition at random.
 default_choose_claim(Ring) ->
+    default_choose_claim(Ring, node()).
+
+default_choose_claim(Ring, Node) ->
     TargetN = riak:get_app_env(target_n_val, 3),
     case meets_target_n(Ring, TargetN) of
         {true, TailViolations} ->
             %% we claim vnodes, as long as we don't violate the
             %% target N with any of our additions
             %% (== claim partitions at least N steps apart)
-            claim_with_n_met(Ring, TailViolations);
+            claim_with_n_met(Ring, TailViolations, Node);
         false ->
             %% we don't meet target N yet, rebalance
-            claim_rebalance_n(Ring)
+            claim_rebalance_n(Ring, Node)
     end.
 
 meets_target_n(Ring, TargetN) ->
                      Last),
     {true, [ Part || {_, _, Part} <- Violations ]}.
 
-claim_with_n_met(Ring, TailViolations) ->
+claim_with_n_met(Ring, TailViolations, Node) ->
     CurrentOwners = lists:keysort(1, riak_ring:all_owners(Ring)),
-    Node = node(),
     Nodes = lists:usort([Node|riak_ring:all_members(Ring)]),
     case lists:sort([ I || {I, N} <- CurrentOwners, N == Node ]) of
         [] ->
             %% node already has claims - respect them
             %% pick biggest hole & sit in the middle
             %% rebalance will cure any mistake on the next pass
-            claim_hole(Ring, Mine, CurrentOwners)
+            claim_hole(Ring, Mine, CurrentOwners, Node)
     end.
 
-claim_hole(Ring, Mine, Owners) ->
+claim_hole(Ring, Mine, Owners, Node) ->
     Choices = case find_biggest_hole(Mine) of
                   {I0, I1} when I0 < I1 ->
                       %% start-middle of the ring
               end,
     Half = length(Choices) div 2,
     {I, _} = lists:nth(Half, Choices),
-    riak_ring:transfer_node(I, node(), Ring).
+    riak_ring:transfer_node(I, Node, Ring).
 
 find_biggest_hole(Mine) ->
     lists:foldl(fun({I0, I1}, none) ->
                 none,
                 lists:zip(Mine, tl(Mine)++[hd(Mine)])).
 
-claim_rebalance_n(Ring) ->
+claim_rebalance_n(Ring, Node) ->
     %% diagonal stripes guarantee most disperse data
-    Nodes = lists:usort([node()|riak_ring:all_members(Ring)]),
+    Nodes = lists:usort([Node|riak_ring:all_members(Ring)]),
     Partitions = lists:sort([ I || {I, _} <- riak_ring:all_owners(Ring) ]),
     Zipped = lists:zip(Partitions,
                        lists:sublist(

src/riak_connect.erl

     Indices = [I || {I,Owner} <- AllOwners, Owner =:= ExitingNode],
     riak_eventer:notify(riak_connect, remove_from_cluster,
                         {ExitingNode, length(Indices)}),
-                        
-    
+
     % Transfer indexes to other nodes...
-    Others = lists:delete(ExitingNode, riak_ring:all_members(Ring)),
-    F = fun(Index, Acc) ->
-        RandomNode = lists:nth(crypto:rand_uniform(1,length(Others)+1),Others),
-        riak_ring:transfer_node(Index, RandomNode ,Acc)    
-    end,
-    ExitRing = lists:foldl(F, Ring, Indices),
+    ExitRing = 
+        case attempt_simple_transfer(Ring, AllOwners, ExitingNode) of
+            {ok, NR} -> NR;
+            target_n_fail ->
+                %% re-diagonalize
+                %% first hand off all claims to *any* one else,
+                %% just so rebalance doesn't include exiting node
+                Other = hd(lists:delete(ExitingNode, riak_ring:all_members(Ring))),
+                TempRing = lists:foldl(fun({I,N}, R) when N == ExitingNode ->
+                                               riak_ring:transfer_node(
+                                                 I, Other, R);
+                                          (_, R) -> R
+                                       end,
+                                       Ring,
+                                       AllOwners),
+                riak_claim:claim_rebalance_n(TempRing, Other)
+        end,
     riak_ring_manager:set_my_ring(ExitRing),
 
     % Send the ring to all other rings...
     [send_ring(X) || X <- riak_ring:all_members(Ring)],
     
-    %% Is this line right?
+    %% This line is right!
     [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
         P <- AllIndices].    
+
+%% @spec attempt_simple_transfer(riak_ring(), [{term(), node()}], node()) ->
+%%           {ok, riak_ring()} | target_n_fail
+%% @doc Try to hand every partition owned by ExitingNode to one of the
+%%      other ring members without placing the same node on two
+%%      partitions that are fewer than target_n_val steps apart.
+%%      Owners is walked in list order (presumably ring order -- confirm
+%%      against the caller).  Returns {ok, NewRing} when every exiting
+%%      partition found a new owner, or target_n_fail as soon as one
+%%      partition has no acceptable candidate.
+attempt_simple_transfer(Ring, Owners, ExitingNode) ->
+    TargetN = riak:get_app_env(target_n_val, 3),
+    %% Seed the "last seen" table with every remaining member, placed
+    %% TargetN steps before the start of the walk.  Starting from an empty
+    %% table (as before) meant no node was ever recorded, so the candidate
+    %% list was always empty and every real transfer failed.
+    Last = [{N, -TargetN} || N <- riak_ring:all_members(Ring),
+                             N /= ExitingNode],
+    attempt_simple_transfer(Ring, Owners, TargetN, ExitingNode, 0, Last).
+
+%% Walk the owner list.  Idx is the position of the current partition and
+%% Last maps each node to the most recent position at which it was seen
+%% (or assigned) so far.
+attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
+    %% handoff: this partition belongs to the exiting node
+    case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
+        [] ->
+            target_n_fail;
+        Candidates ->
+            %% these nodes don't violate target_n in the reverse direction
+            RestLen = length(Rest),
+            StepsToNext = fun(Node) ->
+                                  length(lists:takewhile(
+                                           fun({_, Owner}) -> Node /= Owner end,
+                                           Rest))
+                          end,
+            case lists:filter(fun(N) ->
+                                      Next = StepsToNext(N),
+                                      %% far enough ahead, or never seen
+                                      %% again in the remainder of the list
+                                      (Next >= TargetN) orelse (Next == RestLen)
+                              end,
+                              Candidates) of
+                [] ->
+                    target_n_fail;
+                Qualifiers ->
+                    %% these nodes don't violate target_n in the forward direction
+                    Chosen = lists:nth(crypto:rand_uniform(1, length(Qualifiers)+1),
+                                       Qualifiers),
+                    %% choose one, and do the rest of the ring
+                    attempt_simple_transfer(
+                      riak_ring:transfer_node(P, Chosen, Ring),
+                      Rest, TargetN, Exit, Idx+1,
+                      lists:keystore(Chosen, 1, Last, {Chosen, Idx}))
+            end
+    end;
+attempt_simple_transfer(Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
+    %% just keep track of seeing this node; keystore (unlike keyreplace)
+    %% also records an owner that was not in the seeded member list
+    attempt_simple_transfer(Ring, Rest, TargetN, Exit, Idx+1,
+                            lists:keystore(N, 1, Last, {N, Idx}));
+attempt_simple_transfer(Ring, [], _, _, _, _) ->
+    {ok, Ring}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.