Commits

Anonymous committed e3beaf7

gossip simplification

  • Participants
  • Parent commits 4e2d390

Comments (0)

Files changed (3)

File src/chash.erl

 	 successors/2,successors/3,
 	 predecessors/2,predecessors/3,
 	 contains_name/2,key_of/1,
-	 merge_rings/3]).
+	 merge_rings/2]).
 
 -define(RINGTOP, trunc(math:pow(2,160)-1)).  % SHA-1 space
 
     {_NumPartitions, Nodes} = CHash,
     Nodes.
 
-% @doc Return the best possible merge of two rings based on a vclock.
+% @doc Return a randomized merge of two rings based on a vclock.
 %      If multiple nodes are actively claiming nodes in the same
-%      time period, some churn can occur.
-% @spec merge_rings(VClock :: vclock:vclock(),
-%                   CHashA :: chash(), CHashB :: chash()) -> chash()
-merge_rings(VClock,CHashA,CHashB) ->
+%      time period, churn will occur.  Be prepared to live with it.
+% @spec merge_rings(CHashA :: chash(), CHashB :: chash()) -> chash()
+merge_rings(CHashA,CHashB) ->
     {NumPartitions, NodesA} = CHashA,
     {NumPartitions, NodesB} = CHashB,
-    {NumPartitions, [{I,recentnode(VClock,A,B)} || 
+    {NumPartitions, [{I,randomnode(A,B)} || 
 		   {{I,A},{I,B}} <- lists:zip(NodesA,NodesB)]}.
 
-% @spec recentnode(VClock :: vclock:vclock(), NodeA :: node(),
-%                   NodeB :: node()) -> node()
-recentnode(VClock,NodeA,NodeB) ->
-    % note: this can result in node "theft" -- this is okay!
-    % the idea is to gossip often, leading to convergence.
-    CtrA = vclock:get_counter(NodeA,VClock),
-    CtrB = vclock:get_counter(NodeB,VClock),
-    if
-	CtrA > CtrB ->
-	    NodeA;
-	true ->
-	    NodeB
-    end.
+% @spec randomnode(NodeA :: node(), NodeB :: node()) -> node()
+randomnode(NodeA,NodeA) -> NodeA;
+randomnode(NodeA,NodeB) -> lists:nth(crypto:rand_uniform(1,3),[NodeA,NodeB]).
 
 % @doc Return the number of partitions in the ring.
 % @spec size(CHash :: chash()) -> integer()

File src/riak_ring.erl

 -module(riak_ring).
 -include_lib("eunit/include/eunit.hrl").
 
--export([fresh/0,fresh/1,fresh/2,fresh_from_extern/1,fresh_from_extern/2,
+-export([fresh/0,fresh/1,fresh/2,preflist/2,filtered_preflist/3,
 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
-	 preflist/2,filtered_preflist/3,
          transfer_node/3,reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
          get_meta/2, update_meta/3]).	 
                  meta}). % dict of cluster-wide other data
                          % (primarily bucket N-value, etc)
 
-sequence_test() ->
-    test_ensure(crypto),
-    I1 = 365375409332725729550921208179070754913983135744,
-    I2 = 730750818665451459101842416358141509827966271488,
-    A = fresh(4,a),
-    B1 = fresh_from_extern(A,b),
-    B2 = transfer_node(I1, b, B1),
-    {new_ring, A1} = reconcile(B1,A),
-    C1 = fresh_from_extern(A,c),
-    C2 = transfer_node(I1, c, C1),
-    {new_ring, A2} = reconcile(C2,A1),
-    {new_ring, A3} = reconcile(B2,A2),
-    C3 = transfer_node(I2,c,C2),
-    {new_ring, C4} = reconcile(A3,C3),
-    {new_ring, A4} = reconcile(C4,A3),
-    {new_ring, B3} = reconcile(A4,B2),
-    (A4#hstate.ring == B3#hstate.ring) == (
-      B3#hstate.ring == C4#hstate.ring).
-
-% @spec test_ensure(App :: atom()) -> ok
-test_ensure(App) ->
-    case [X || {X,_,_} <- application:which_applications(), X == App] of
-	[] ->
-	    application:start(App);
-	_ ->
-	    ok
-    end.
+% need to re-create this test now that we no longer have fresh_from_extern
+%% sequence_test() ->
+%%     test_ensure(crypto),
+%%     I1 = 365375409332725729550921208179070754913983135744,
+%%     I2 = 730750818665451459101842416358141509827966271488,
+%%     A = fresh(4,a),
+%%     B1 = fresh_from_extern(A,b),
+%%     B2 = transfer_node(I1, b, B1),
+%%     {new_ring, A1} = reconcile(B1,A),
+%%     C1 = fresh_from_extern(A,c),
+%%     C2 = transfer_node(I1, c, C1),
+%%     {new_ring, A2} = reconcile(C2,A1),
+%%     {new_ring, A3} = reconcile(B2,A2),
+%%     C3 = transfer_node(I2,c,C2),
+%%     {new_ring, C4} = reconcile(A3,C3),
+%%     {new_ring, A4} = reconcile(C4,A3),
+%%     {new_ring, B3} = reconcile(A4,B2),
+%%     (A4#hstate.ring == B3#hstate.ring) == (
+%%       B3#hstate.ring == C4#hstate.ring).
+%% % @spec test_ensure(App :: atom()) -> ok
+%% test_ensure(App) ->
+%%     case [X || {X,_,_} <- application:which_applications(), X == App] of
+%% 	[] ->
+%% 	    application:start(App);
+%% 	_ ->
+%% 	    ok
+%%     end.
 
 % @doc This is used only when this node is creating a brand new cluster.
 % @spec fresh() -> hstate()
 % @spec fresh(RingSize :: integer(), NodeName :: term()) -> hstate()
 fresh(RingSize, NodeName) ->
     #hstate{nodename=NodeName,
-	    vclock=vclock:increment(NodeName,vclock:fresh()),
+	    vclock=vclock:fresh(),
 	    ring=chash:fresh(RingSize, NodeName),
             meta=dict:new()}.
 
-% @doc This is used when a node is joining an existing cluster, either
-%      as a new member or upon restarting.
-% @spec fresh_from_extern(ExtState :: hstate()) -> hstate()
-fresh_from_extern(ExtState) ->
-    fresh_from_extern(ExtState,node()).
-% @doc Equivalent to fresh_from_extern/1 but allows specification of the
-%      local node name.  Called by fresh_from_extern/1, and otherwise
-%      only intended for testing purposes.
-% @spec fresh_from_extern(ExtState :: hstate(), NodeName :: term()) -> hstate()
-fresh_from_extern(ExtState,NodeName) ->
-    #hstate{nodename=NodeName,
-	    vclock=ExtState#hstate.vclock,
-	    ring=ExtState#hstate.ring,
-            meta=ExtState#hstate.meta}.
-
 % @doc Return all partition indices owned by the node executing this function.
 % @spec my_indices(State :: hstate()) -> [integer()]
 my_indices(State) ->
     case ancestors([ExternState, MyState]) of
         [OlderState] ->
           case vclock:equal(OlderState#hstate.vclock,MyState#hstate.vclock) of
-              true -> 
-                  {new_ring, #hstate{nodename=MyState#hstate.nodename,
-                                     vclock=ExternState#hstate.vclock,
-                                     ring=ExternState#hstate.ring,
-                                     meta=ExternState#hstate.meta}};
-              false ->
-                  {no_change, MyState}
+              true -> {new_ring, #hstate{nodename=MyState#hstate.nodename,
+                                         vclock=ExternState#hstate.vclock,
+                                         ring=ExternState#hstate.ring,
+                                         meta=ExternState#hstate.meta}};
+              false -> {no_change, MyState}
           end;
 	[] -> 
             case equal_rings(ExternState,MyState) of
     VClock = vclock:increment(MyNodeName,
 				 vclock:merge([StateA#hstate.vclock,
 					       StateB#hstate.vclock])),
-    Ring = chash:merge_rings(VClock,StateA#hstate.ring,StateB#hstate.ring),
+    Ring = chash:merge_rings(StateA#hstate.ring,StateB#hstate.ring),
     Meta = merge_meta(StateA#hstate.meta, StateB#hstate.meta),
     #hstate{nodename=MyNodeName,
 	    vclock=VClock,

File src/riak_ring_gossiper.erl

                     riak_ring_manager:set_my_ring(NewRing),
                     {ok, MyNewRing} = maybe_claim(),
                     riak_ring_manager:set_my_ring(MyNewRing),
-                    Changed = riak_ring:diff_nodes(MyNewRing, NewRing),
-                    [gossip_to(X) ||
-                        X <- [riak_ring:random_node(MyNewRing)|Changed],
-                        X =/= node()],
+                    Me = node(),
+                    case riak_ring:random_node(MyNewRing) of
+                        Me -> nop;
+                        RandNode -> gossip_to(RandNode)
+                    end,
                     riak_eventer:notify(riak_ring_gossiper, changed_ring, 
-                                        {gossip_changed, length(Changed)}),
+                                        gossip_changed),
                     loop(write)
             end;
-        {set_ring, ExternRing} ->
-            riak_eventer:notify(riak_ring_gossiper,
-                                fresh_ring, fresh_ring),
-            riak_ring_manager:set_my_ring(
-              riak_ring:fresh_from_extern(ExternRing, node())),
-            {ok, MyNewRing} = maybe_claim(),
-            riak_ring_manager:set_my_ring(MyNewRing),
-            Changed = riak_ring:diff_nodes(MyNewRing, ExternRing),
-            [gossip_to(X) ||
-                X <- [riak_ring:random_node(MyNewRing)|Changed],
-                X =/= node()],
-            riak_eventer:notify(riak_ring_gossiper, changed_ring, 
-                                {set_changed, length(Changed)}),
-            loop(write);
         {get_ring, RemoteNode} ->
-            set_remote_ring(RemoteNode),
+            gossip_to(RemoteNode),
             loop(Write)
     after Interval ->
             riak_eventer:notify(riak_ring_gossiper, interval, interval),
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     riak_connect:cast(RemoteNode, {gossip_ring, MyRing}).
 
-set_remote_ring(RemoteNode) ->
-    riak_eventer:notify(riak_ring_gossiper, set_remote_ring, RemoteNode),
-    {ok, MyRing} = riak_ring_manager:get_my_ring(),
-    riak_connect:cast(RemoteNode, {set_ring, MyRing}).
-
 get_ring_from(RemoteNode) ->
     riak_eventer:notify(riak_ring_gossiper, get_remote_ring, RemoteNode),
     riak_connect:cast(RemoteNode, {get_ring, node()}).