Anonymous avatar Anonymous committed 07bb099 Merge

merge

Comments (0)

Files changed (6)

src/riak_claim.erl

 %%      {yes, Integer} or 'no' where Integer is the number of
 %%       additional partitions wanted by this node.
 %%      A choose_claim function should return a riak_ring with
-%%      one more partition claimed by this node than in the input ring.
+%%      more partitions claimed by this node than in the input ring.
+
+% The usual intention for partition ownership assumes relative
+% heterogeneity of capacity and connectivity.  Accordingly, the
+% standard claim functions attempt to maximize "spread" -- expected
+% distance between partitions claimed by each given node.  This
+% is in order to produce the expectation that for any reasonably
+% short span of consecutive partitions, there will be a minimal
+% number of partitions owned by the same node.
+
+% The exact amount that is considered tolerable is determined by the
+% application env variable "target_n_val".  The functions in
+% riak_claim will ensure that all sequences up to target_n_val long
+% contain no repeats if at all possible.  The effect of this is that
+% when the number of nodes in the system is smaller than target_n_val,
+% a potentially large number of partitions must be moved in order to
+% safely add a new node.  After the cluster has grown beyond that size,
+% a minimal number of partitions (1/NumNodes) will generally be moved.
+
+% If the number of nodes does not divide evenly into the number of
+% partitions, it may not be possible to perfectly achieve the maximum
+% spread constraint.  In that case, Riak will minimize the cases where
+% the constraint is violated and they will all exist near the origin
+% point of the ring.
+
+% A good way to decide on the setting of target_n_val for your
+% application is to set it to the largest value you expect to use for
+% any bucket's n_val.  The default is 3.
 
 -module(riak_claim).
 -export([default_wants_claim/1, default_choose_claim/1,
-         never_wants_claim/1]).
+         never_wants_claim/1, random_choose_claim/1]).
+-export([default_choose_claim/2,
+         claim_rebalance_n/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 
 %% @spec default_choose_claim(riak_ring()) -> riak_ring()
 %% @doc Choose a partition at random.
 default_choose_claim(Ring) ->
+    default_choose_claim(Ring, node()).
+
+default_choose_claim(Ring, Node) ->
+    TargetN = riak:get_app_env(target_n_val, 3),
+    case meets_target_n(Ring, TargetN) of
+        {true, TailViolations} ->
+            %% if target N is met, then it doesn't matter where
+            %% we claim vnodes, as long as we don't violate the
+            %% target N with any of our additions
+            %% (== claim partitions at least N steps apart)
+            claim_with_n_met(Ring, TailViolations, Node);
+        false ->
+            %% we don't meet target N yet, rebalance
+            claim_rebalance_n(Ring, Node)
+    end.
+
+meets_target_n(Ring, TargetN) ->
+    Owners = lists:keysort(1, riak_ring:all_owners(Ring)),
+    meets_target_n(Owners, TargetN, 0, [], []).
+meets_target_n([{Part,Node}|Rest], TargetN, Index, First, Last) ->
+    case lists:keytake(Node, 1, Last) of
+        {value, {Node, LastIndex, _}, NewLast} ->
+            if Index-LastIndex >= TargetN ->
+                    %% node repeat respects TargetN
+                    meets_target_n(Rest, TargetN, Index+1, First,
+                                   [{Node, Index, Part}|NewLast]);
+               true ->
+                    %% violation of TargetN
+                    false
+            end;
+        false ->
+            %% haven't seen this node yet
+            meets_target_n(Rest, TargetN, Index+1,
+                           [{Node, Index}|First], [{Node, Index, Part}|Last])
+    end;
+meets_target_n([], TargetN, Index, First, Last) ->
+    %% start through end guarantees TargetN
+    %% compute violations at wrap around, but don't fail
+    %% because of them: handle during reclaim
+    Violations = 
+        lists:filter(fun({Node, L, _}) ->
+                             {Node, F} = proplists:lookup(Node, First),
+                             (Index-L)+F < TargetN
+                     end,
+                     Last),
+    {true, [ Part || {_, _, Part} <- Violations ]}.
+
+claim_with_n_met(Ring, TailViolations, Node) ->
+    CurrentOwners = lists:keysort(1, riak_ring:all_owners(Ring)),
+    Nodes = lists:usort([Node|riak_ring:all_members(Ring)]),
+    case lists:sort([ I || {I, N} <- CurrentOwners, N == Node ]) of
+        [] ->
+            %% node hasn't claimed anything yet - just claim stuff
+            Spacing = length(Nodes),
+            [{First,_}|OwnList] =
+                case TailViolations of
+                    [] ->
+                        %% no wrap-around problems - choose whatever
+                        lists:nthtail(Spacing-1, CurrentOwners);
+                    [TV|_] ->
+                        %% attempt to cure a wrap-around problem
+                        lists:dropwhile(
+                             fun({I, _}) -> I /= TV end,
+                             lists:reverse(CurrentOwners))
+                end,
+            {_, NewRing} = lists:foldl(
+                             fun({I, _}, {0, Acc}) ->
+                                     {Spacing, riak_ring:transfer_node(I, Node, Acc)};
+                                (_, {S, Acc}) ->
+                                     {S-1, Acc}
+                             end,
+                             {Spacing, riak_ring:transfer_node(First, Node, Ring)},
+                             OwnList),
+            NewRing;
+        Mine ->
+            %% node already has claims - respect them
+            %% pick biggest hole & sit in the middle
+            %% rebalance will cure any mistake on the next pass
+            claim_hole(Ring, Mine, CurrentOwners, Node)
+    end.
+
+claim_hole(Ring, Mine, Owners, Node) ->
+    Choices = case find_biggest_hole(Mine) of
+                  {I0, I1} when I0 < I1 ->
+                      %% start-middle of the ring
+                      lists:takewhile(
+                        fun({I, _}) -> I /= I1 end,
+                        tl(lists:dropwhile(
+                             fun({I, _}) -> I /= I0 end,
+                             Owners)));
+                  {I0, I1} when I0 > I1 ->
+                      %% wrap-around end-start of the ring
+                      tl(lists:dropwhile(
+                           fun({I, _}) -> I /= I0 end, Owners))
+                          ++lists:takewhile(
+                              fun({I, _}) -> I /= I1 end, Owners);
+                  {I0, I0} ->
+                      %% node only has one claim
+                      {Start, End} = 
+                          lists:splitwith(
+                            fun({I, _}) -> I /= I0 end,
+                            Owners),
+                      tl(End)++Start
+              end,
+    Half = length(Choices) div 2,
+    {I, _} = lists:nth(Half, Choices),
+    riak_ring:transfer_node(I, Node, Ring).
+
+find_biggest_hole(Mine) ->
+    lists:foldl(fun({I0, I1}, none) ->
+                        {I0, I1};
+                   ({I0, I1}, {C0, C1}) when I0 < I1->
+                        %% start-middle of the ring
+                        if I1-I0 > C1-C0 ->
+                                {I0, I1};
+                           true ->
+                                {C0, C1}
+                        end;
+                   ({I0, I1}, {C0, C1}) ->
+                        %% wrap-around end-start of the ring
+                        Span = I0+trunc(math:pow(2, 160))-1-I1,
+                        if Span > C1-C0 ->
+                                {I0, I1};
+                           true ->
+                                {C0, C1}
+                        end
+                end,
+                none,
+                lists:zip(Mine, tl(Mine)++[hd(Mine)])).
+
+claim_rebalance_n(Ring, Node) ->
+    %% diagonal stripes guarantee most disperse data
+    Nodes = lists:usort([Node|riak_ring:all_members(Ring)]),
+    Partitions = lists:sort([ I || {I, _} <- riak_ring:all_owners(Ring) ]),
+    Zipped = lists:zip(Partitions,
+                       lists:sublist(
+                         lists:flatten(
+                           lists:duplicate(
+                             1+(length(Partitions) div length(Nodes)),
+                             Nodes)),
+                         1, length(Partitions))),
+    lists:foldl(fun({P, N}, Acc) ->
+                        riak_ring:transfer_node(P, N, Acc)
+                end,
+                Ring,
+                Zipped).
+
+random_choose_claim(Ring) ->
     riak_ring:transfer_node(riak_ring:random_other_index(Ring),
-                              node(), Ring).
+                            node(), Ring).
 
 %% @spec never_wants_claim(riak_ring()) -> no
 %% @doc For use by nodes that should not claim any partitions.

src/riak_connect.erl

     case NeedsIndexes of
         no -> 
             Ring;
-        {yes, NumToClaim} ->
-            riak_eventer:notify(riak_ring, want_claim, NumToClaim),
-            claim_indexes(Ring, NumToClaim)
+        {yes, _NumToClaim} ->
+            {CMod, CFun} = riak:get_app_env(choose_claim_fun),
+            NewRing = CMod:CFun(Ring),
+            claim_until_balanced(NewRing)
     end.
 
-claim_indexes(Ring, 0) -> Ring;
-claim_indexes(Ring, NumToClaim) ->
-    {CMod, CFun} = riak:get_app_env(choose_claim_fun),
-    NewRing = CMod:CFun(Ring),
-    claim_indexes(NewRing, NumToClaim - 1).
-
 ensure_vnodes_started(Ring) ->
     VNodes2Start = case length(riak_ring:all_members(Ring)) of
        1 -> riak_ring:my_indices(Ring);
     Indices = [I || {I,Owner} <- AllOwners, Owner =:= ExitingNode],
     riak_eventer:notify(riak_connect, remove_from_cluster,
                         {ExitingNode, length(Indices)}),
-                        
-    
+
     % Transfer indexes to other nodes...
-    Others = lists:delete(ExitingNode, riak_ring:all_members(Ring)),
-    F = fun(Index, Acc) ->
-        RandomNode = lists:nth(crypto:rand_uniform(1,length(Others)+1),Others),
-        riak_ring:transfer_node(Index, RandomNode ,Acc)    
-    end,
-    ExitRing = lists:foldl(F, Ring, Indices),
+    ExitRing = 
+        case attempt_simple_transfer(Ring, AllOwners, ExitingNode) of
+            {ok, NR} -> NR;
+            target_n_fail ->
+                %% re-diagonalize
+                %% first hand off all claims to *any* one else,
+                %% just so rebalance doesn't include exiting node
+                Other = hd(lists:delete(ExitingNode, riak_ring:all_members(Ring))),
+                TempRing = lists:foldl(fun({I,N}, R) when N == ExitingNode ->
+                                               riak_ring:transfer_node(
+                                                 I, Other, R);
+                                          (_, R) -> R
+                                       end,
+                                       Ring,
+                                       AllOwners),
+                riak_claim:claim_rebalance_n(TempRing, Other)
+        end,
     riak_ring_manager:set_my_ring(ExitRing),
-    
+
     % Send the ring to all other rings...
     [send_ring(X) || X <- riak_ring:all_members(Ring)],
     
-    %% Is this line right?
+    %% This line is right!
     [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
-        P <- AllIndices].    
+        P <- AllIndices].    
+
+attempt_simple_transfer(Ring, Owners, ExitingNode) ->
+    attempt_simple_transfer(Ring, Owners,
+                            riak:get_app_env(target_n_val, 3),
+                            ExitingNode, 0, []).
+attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
+    %% handoff
+    case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
+        [] ->
+            target_n_fail;
+        Candidates ->
+            %% these nodes don't violate target_n in the reverse direction
+            StepsToNext = fun(Node) ->
+                                  length(lists:takewhile(
+                                           fun({_, Owner}) -> Node /= Owner end,
+                                           Rest))
+                          end,
+            case lists:filter(fun(N) -> 
+                                 Next = StepsToNext(N),
+                                 (Next >= TargetN) orelse (Next == length(Rest))
+                              end,
+                              Candidates) of
+                [] ->
+                    target_n_fail;
+                Qualifiers ->
+                    %% these nodes don't violate target_n forward
+                    Chosen = lists:nth(crypto:rand_uniform(
+                                         1, length(Qualifiers)+1),
+                                       Qualifiers),
+                    %% choose one, and do the rest of the ring
+                    attempt_simple_transfer(
+                      riak_ring:transfer_node(P, Chosen, Ring),
+                      Rest, TargetN, Exit, Idx+1,
+                      lists:keyreplace(Chosen, 1, Last, {Chosen, Idx}))
+            end
+    end;
+attempt_simple_transfer(Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
+    %% just keep track of seeing this node
+    attempt_simple_transfer(Ring, Rest, TargetN, Exit, Idx+1,
+                            lists:keyreplace(N, 1, Last, {N, Idx}));
+attempt_simple_transfer(Ring, [], _, _, _, _) ->
+    {ok, Ring}.

src/riak_get_fsm.erl

     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     N = proplists:get_value(n_val,BucketProps),
     AllowMult = proplists:get_value(allow_mult,BucketProps),
-    Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
+    Preflist = riak_ring:preflist(DocIdx, Ring),
     {Targets, Fallbacks} = lists:split(N, Preflist),
     {Sent1, Pangs1} = riak_util:try_cast(vnode_get, Msg, nodes(), Targets),
     Sent = case length(Sent1) =:= N of   % Sent is [{Index,TargetNode,SentNode}]

src/riak_map_executor.erl

                         {link, LB, LT, LAcc} -> {map, LinkFun, {LB, LT}, LAcc}
                     end,
             N = proplists:get_value(n_val,BucketProps),
-            Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
+            Preflist = riak_ring:preflist(DocIdx, Ring),
             {Targets, _} = lists:split(N, Preflist),
             VNodes = try_vnode(QTerm, {Bucket,Key}, KeyData, Targets),
             {ok,wait,

src/riak_put_fsm.erl

     DocIdx = riak_util:chash_key({Bucket, Key}),
     Msg = {self(), {Bucket,Key}, RObj, ReqId, RealStartTime},
     N = proplists:get_value(n_val,BucketProps),
-    Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
+    Preflist = riak_ring:preflist(DocIdx, Ring),
     {Targets, Fallbacks} = lists:split(N, Preflist),
     {Sent1, Pangs1} = riak_util:try_cast(vnode_put, Msg, nodes(), Targets),
     Sent = case length(Sent1) =:= N of   % Sent is [{Index,TargetNode,SentNode}]

src/riak_ring.erl

 -module(riak_ring).
 -include_lib("eunit/include/eunit.hrl").
 
--export([fresh/0,fresh/1,fresh/2,preflist/2,filtered_preflist/3,
+-export([fresh/0,fresh/1,fresh/2,preflist/2,
 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
          transfer_node/3,reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
 %      {partition,node} pairs that could be responsible for that object.
 % @spec preflist(Key :: binary(), State :: chstate()) ->
 %                                 [{Index :: integer(), Node :: term()}]
-preflist(Key, State) ->
-    chash:successors(Key, State#chstate.chring).
+preflist(Key, State) -> chash:successors(Key, State#chstate.chring).
 
-filtered_preflist(Key, State, N) ->
-    Preflist = preflist(Key, State),
-    Try1 = filtered_preflist1(Preflist, [], [], []),
-    case length(Try1) >= N of
-        true -> Try1;
-        false -> Preflist
-    end.
-filtered_preflist1([], _Seen, Passed, Acc) ->
-    Acc ++ Passed;
-filtered_preflist1([{I,Node}|Preflist], Seen, Passed, Acc) ->
-    case lists:member(Node, Seen) of
-        true -> filtered_preflist1(Preflist,Seen,[{I,Node}|Passed],Acc);
-        false -> filtered_preflist1(Preflist,[Node|Seen],Passed,[{I,Node}|Acc])
-    end.
-
-% @doc Transfer ownership of partition at Idx to Node.
 % @spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
 %           chstate()
 transfer_node(Idx, Node, MyState) ->
     ?assertEqual(x,index_owner(Ring1,0)),
     ?assertEqual(lists:sort([x,node()]),lists:sort(diff_nodes(Ring0,Ring1))).
 
-preflist_test() ->
-    IB = 274031556999544297163190906134303066185487351808,
-    IC = 1004782375664995756265033322492444576013453623296,
-    R = transfer_node(IB,b,transfer_node(IC,c,fresh(16,a))),
-    {FirstFour,_} = lists:split(4,[N || {_I,N} <- 
-                                     filtered_preflist(chash:key_of(0),R,4)]),
-    ?assertEqual([a,a,b,c],lists:sort(FirstFour)).
-
 reconcile_test() ->
     Ring0 = fresh(2,node()),
     Ring1 = transfer_node(0,x,Ring0),
                             merge_meta(Ring2#chstate.meta,
                                        Ring1#chstate.meta)})).
 
-full_preflist_test() ->
-    Ring0 = fresh(4,a),
-    I = 365375409332725729550921208179070754913983135744,
-    Ring = transfer_node(I,b,Ring0),
-    ?assertEqual(filtered_preflist(chash:key_of(zzzzzzzzz),Ring,2),
-                 filtered_preflist(chash:key_of(zzzzzzzzz),Ring,3)).
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.