Anonymous avatar Anonymous committed c16aea7

Combine riak_connect and riak_ring_gossiper, converting riak_ring_gossiper to a gen_server in the process.

Comments (0)

Files changed (10)

 start([ConfigPath]) ->
     application:set_env(riak, configpath, ConfigPath),
     start().
+    
 %% @spec start() -> ok
 %% @doc Start the riak server.
 start() ->
 %% @doc The application:stop callback for riak.
 stop(_State) -> ok.
 
+
 %% @spec read_config() -> ok
 %% @doc Read the riak erlenv configuration file and set environment variables.
 read_config() -> read_config(riak:get_app_env(configpath)).

src/riak_bucket.erl

     riak_ring_manager:set_my_ring(R1),
     riak_ring_manager:write_ringfile(),
     riak_eventer:notify(riak_bucket, set_bucket, {Name,BucketProps++PrunedOld}),
-    riak_ring_gossiper:gossip_to(riak_ring:random_node(R1)),
+    RandomNode = riak_ring:random_node(R1),
+    riak_connect:send_ring(RandomNode),
     ok.
     
 %% @spec get_bucket(riak_object:bucket()) ->

src/riak_client.erl

 %% @doc Cause all partitions owned by ExitingNode to be taken over
 %%      by other nodes.
 remove_from_cluster(ExitingNode) ->
-    rpc:call(Node,riak_ring_gossiper,remove_from_cluster,[ExitingNode]).
+    rpc:call(Node, riak_connect, remove_from_cluster,[ExitingNode]).
 
 %% @spec send_event(EventName::atom(), EventDetail::term()) -> ok
 %% @doc  Send a client-generated event to the Riak eventer.

src/riak_connect.erl

 %% specific language governing permissions and limitations
 %% under the License.    
 
-%% @doc This tiny server provides a gossip bridge between riak nodes.
+%% @doc
+%% riak_connect takes care of the mechanics of shuttling a
+%% from one node to another upon request by other
+%% Riak processes.
+%%
+%% Additionally, it occasionally checks to make sure the current
+%% node has its fair share of partitions, and also sends
+%% a copy of the ring to some other random node, ensuring
+%% that all nodes eventually synchronize on the same understanding
+%% of the Riak cluster. This interval is configurable, but defaults
+%% to once per minute.
 
 -module(riak_connect).
 
 -behaviour(gen_server).
--export([start_link/0]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-	 terminate/2, code_change/3]).
--export([cast/2, stop/0]).
--record(state, {me}).
+-export([start_link/0, stop/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export ([send_ring/1, send_ring/2, remove_from_cluster/1]).
 
 -include_lib("eunit/include/eunit.hrl").
 
-%% @private
-start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+%% send_ring/1 -
+%% Send the current node's ring to some other node.
+send_ring(ToNode) -> send_ring(node(), ToNode).
+
+%% send_ring/2 -
+%% Send the ring from one node to another node. 
+%% Does nothing if the two nodes are the same.
+send_ring(Node, Node) -> ok;
+send_ring(FromNode, ToNode) ->
+    gen_server:cast({?SERVER, FromNode}, {send_ring_to, ToNode}).
+    
 
 %% @private
-init([]) -> {ok, #state{me=node()}}.
+start_link() -> 
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+    
+%% @private
+init(_State) -> 
+    schedule_next_gossip(),
+    {ok, false}.
+    
+schedule_next_gossip() ->
+    MaxInterval = riak:get_app_env(gossip_interval),
+    Interval = random:uniform(MaxInterval),
+    timer:apply_after(Interval, gen_server, cast, [?SERVER, gossip_ring]). 
 
-stop() -> gen_server:cast(?MODULE, stop).
+stop() -> gen_server:cast(?SERVER, stop).
 
 %% @private
-cast(RemoteNode, Msg) -> gen_server:cast({riak_connect, RemoteNode}, Msg).
+handle_cast({send_ring_to, Node}, RingChanged) ->
+    riak_eventer:notify(riak_connect, send_ring_to, Node),
+    {ok, MyRing} = riak_ring_manager:get_my_ring(),    
+    gen_server:cast({?SERVER, Node}, {reconcile_ring, MyRing}),
+    {noreply, RingChanged};
+    
+handle_cast({reconcile_ring, OtherRing}, RingChanged) ->
+    % Compare the two rings, see if there is anything that
+    % must be done to make them equal...
+    {ok, MyRing} = riak_ring_manager:get_my_ring(),
+    case riak_ring:reconcile(OtherRing, MyRing) of
+        {no_change, _} -> 
+            {noreply, RingChanged};
+            
+        {new_ring, ReconciledRing} ->
+            BalancedRing = claim_until_balanced(ReconciledRing),
+            riak_ring_manager:set_my_ring(BalancedRing),
+            RandomNode = riak_ring:random_node(BalancedRing),
+            send_ring(self(), RandomNode),
+            riak_eventer:notify(riak_connect, changed_ring, gossip_changed),
+            {noreply, true}
+    end;
+    
+handle_cast(gossip_ring, RingChanged) ->
+    % First, schedule the next round of gossip...
+    schedule_next_gossip(),
+    riak_eventer:notify(riak_connect, interval, interval),
+    
+    % Make sure all vnodes are started...
+    {ok, MyRing} = riak_ring_manager:get_my_ring(),
+    ensure_vnodes_started(MyRing),
+    
+    % If the ring has changed since our last write,
+    % then rewrite the ring...
+    case RingChanged of
+        true ->
+            riak_ring_manager:prune_ringfiles(),
+            riak_ring_manager:write_ringfile();
+        false -> 
+            ignore
+    end,
+      
+    % Finally, gossip the ring to some random other node...
+    RandomNode = riak_ring:index_owner(MyRing, riak_ring:random_other_index(MyRing)),
+    send_ring(node(), RandomNode),
+    {noreply, false};                         
 
-%% @private
-handle_cast({gossip_ring, Ring}, State) ->
-    riak_ring_gossiper ! {gossip_ring, Ring},
-    {noreply, State};
-handle_cast({set_ring, Ring}, State) ->
-    riak_ring_gossiper ! {set_ring, Ring},
-    {noreply, State};
-handle_cast({get_ring, RemoteNode}, State) ->
-    riak_ring_gossiper ! {get_ring, RemoteNode},
-    {noreply, State};
-handle_cast(stop, State) -> {stop, normal, State}.
+handle_cast(_, State) -> 
+    {noreply, State}.
 
 %% @private
 handle_info(_Info, State) -> {noreply, State}.
 
 %% @private
+handle_call(_, _From, State) -> {reply, ok, State}.
+
+%% @private
 terminate(_Reason, _State) -> ok.
 
 %% @private
 code_change(_OldVsn, State, _Extra) ->  {ok, State}.
 
-%% @private
-handle_call(_, _From, State) -> {noreply, State}.
+claim_until_balanced(Ring) ->
+    {WMod, WFun} = riak:get_app_env(wants_claim_fun),
+    NeedsIndexes = apply(WMod, WFun, [Ring]),
+    case NeedsIndexes of
+        no -> 
+            Ring;
+        {yes, NumToClaim} ->
+            riak_eventer:notify(riak_ring, want_claim, NumToClaim),
+            claim_indexes(Ring, NumToClaim)
+    end.
 
-connect_test() ->
-    {ok, _Pid}  = riak_connect:start_link(),
-    F = fun(Pid) ->
-                register(riak_ring_gossiper, self()),
-                Loop = fun([], _) -> 
-                               Pid ! ok;
-                          (WaitingFor, Loop) ->
-                               receive
-                                   {gossip_ring, _} ->
-                                       Loop(WaitingFor -- [gossip_ring], Loop);
-                                   {set_ring, _} ->
-                                       Loop(WaitingFor -- [set_ring], Loop);
-                                   {get_ring, _} ->
-                                       Loop(WaitingFor -- [get_ring], Loop)
-                               end
-                       end,
-                Loop([gossip_ring, set_ring, get_ring], Loop)
-        end,
-    Self = self(),
-    spawn(fun() -> F(Self) end),
-    gen_server:cast(?MODULE, {gossip_ring, test}),
-    gen_server:cast(?MODULE, {set_ring, test}),
-    gen_server:cast(?MODULE, {get_ring, test}),
-    R = receive
-            ok -> ok
-        end,
-    riak_connect:stop(),
-    ?assertEqual(R, ok).
+claim_indexes(Ring, 0) -> Ring;
+claim_indexes(Ring, NumToClaim) ->
+    {CMod, CFun} = riak:get_app_env(choose_claim_fun),
+    NewRing = CMod:CFun(Ring),
+    claim_indexes(NewRing, NumToClaim - 1).
+
+ensure_vnodes_started(Ring) ->
+    VNodes2Start = case length(riak_ring:all_members(Ring)) of
+       1 -> riak_ring:my_indices(Ring);
+       _ -> [riak_ring:random_other_index(Ring)| riak_ring:my_indices(Ring)]
+    end,
+    [begin
+        gen_server:cast({riak_vnode_master, node()}, {start_vnode, I}) 
+    end|| I <- VNodes2Start].
+
+remove_from_cluster(ExitingNode) ->
+    % Set the remote node to stop claiming.
+    % Ignore return of rpc as this should succeed even if node is offline
+    rpc:call(ExitingNode, application, set_env, [riak, wants_claim_fun, {riak_claim, never_wants_claim}]),
+    
+    % Get a list of indices owned by the ExitingNode...
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    AllOwners = riak_ring:all_owners(Ring),
+    AllIndices = [I || {I,_Owner} <- AllOwners],
+    Indices = [I || {I,Owner} <- AllOwners, Owner =:= ExitingNode],
+    riak_eventer:notify(riak_connect, remove_from_cluster,
+                        {ExitingNode, length(Indices)}),
+                        
+    
+    % Transfer indexes to other nodes...
+    Others = lists:delete(ExitingNode, riak_ring:all_members(Ring)),
+    F = fun(Index, Acc) ->
+        RandomNode = lists:nth(crypto:rand_uniform(1,length(Others)+1),Others),
+        riak_ring:transfer_node(Index, RandomNode ,Acc)    
+    end,
+    ExitRing = lists:foldl(F, Ring, Indices),
+    riak_ring_manager:set_my_ring(ExitRing),
+    
+    % Send the ring to all other rings...
+    [send_ring(X) || X <- riak_ring:all_members(Ring)],
+    
+    %% Is this line right?
+    [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
+        P <- AllIndices].    

src/riak_event_logger.erl

 %% specific language governing permissions and limitations
 %% under the License.    
 
+%% @doc 
+%% riak_event_logger is an example of how to connect to a 
+%% running Riak cluster to receive events.
+
 -module(riak_event_logger).
--behavior(gen_event).
+-behavior(gen_server).
 
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2]).
--export([code_change/3]).
+-define (RECONNECT_INTERVAL, 200).
+-define (SERVER, ?MODULE).
+-record (state, {pid, hostname, port, cookie, verbosity, fd}).
+-export ([start/4, start_link/4]).
+-export ([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 %% @private
-init({Arg,_}) ->
-    case Arg of
-        "stdout" -> {ok, "stdout"};
+start(Hostname, Port, Cookie, Filename) ->
+    gen_server:start({local, ?SERVER}, ?MODULE, [Hostname, Port, Cookie, Filename], []).
+
+start_link(Hostname, Port, Cookie, Filename) -> 
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [Hostname, Port, Cookie, Filename], []).
+
+%% @private
+init([Hostname, Port, Cookie, Filename]) -> 
+    % If this gen_server dies and is supervised, then it will
+    % be restarted under a new pid. If this happens, then we will
+    % lose our connection to the Riak cluster. So, send a keepalive
+    % every few seconds to reconnect.
+    timer:apply_interval(?RECONNECT_INTERVAL, gen_server, call, [?SERVER, connect]),
+    
+    % Open the file, get the file descriptor....
+    {ok, FD} = case Filename of
+        "stdout" -> 
+            {ok, stdout};
         _ ->
             {ok, CWD} = file:get_cwd(),
-            LogFN = filename:join([CWD,Arg]),
+            LogFN = filename:join([CWD,Filename]),
             ok = filelib:ensure_dir(LogFN),
             io:format("Writing event log to ~p~n",[LogFN]),
             file:open(LogFN, [raw, append, delayed_write])
-    end.
+    end,
+    
+    State = #state {
+        hostname = Hostname,
+        port = Port,
+        cookie = Cookie,
+        fd = FD
+    },
+    {ok, State}.
+    
+%% @private
+%% Check if we need to reconnect to the Riak cluster.
+handle_call(connect, _From, State) ->
+    PidHasChanged = State#state.pid /= self(),
+    case PidHasChanged of
+        true ->  register_for_events(State);
+        false -> ignore
+    end,
+    {reply, ok, State#state { pid=self() }};
+    
+handle_call(_, _, State) -> {ok, State}.
+
+handle_cast(_, State) -> {noreply, State}.
 
 %% @private
-handle_event(Event, "stdout") ->
-    io:format("~s",[fmtnow()]),
-    io:format(": ~p~n",[Event]),
-    {ok, "stdout"};
-handle_event(Event, FD) ->
-    file:write(FD, [fmtnow()]),
-    file:write(FD, io_lib:format(": ~p~n",[Event])),
-    {ok, FD}.
-
-%% @private
-handle_call(_, State) -> {ok, no_call_support, State}.
-
-%% @private
-handle_info(_, State) -> {ok, State}.
+%% Got an incoming event. Write it to a file or to the console.
+handle_info({event, Event}, State) ->
+    case State#state.fd of
+        stdout ->
+            io:format("~s",[fmtnow()]),
+            io:format(": ~p~n",[Event]);
+        FD ->
+            file:write(FD, [fmtnow()]),
+            file:write(FD, io_lib:format(": ~p~n",[Event]))
+    end,
+    {noreply, State};
+    
+handle_info(_, State) -> {noreply, State}.
 
 %% @private
 terminate(swap, State)  -> {?MODULE, State};
 %% @private
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
+
+register_for_events(State) ->
+    io:format("1~n"),
+    {ok, C} = riak:client_connect(State#state.hostname, State#state.port, State#state.cookie),
+    io:format("2~n"),
+    Desc = io_lib:format("~s (~s)", [?SERVER, node()]),
+    io:format("3~n"),
+    C:add_event_handler(self(), Desc),
+    io:format("4~n").
+
+
+%%% DATE FUNCTIONS %%%
+
 month(1) ->  "Jan";
 month(2) ->  "Feb";
 month(3) ->  "Mar";

src/riak_eventer.erl

     
     % Gossip the new ring...
     RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
-    riak_ring_gossiper:gossip_to(RandomNode),
+    riak_connect:send_ring(RandomNode),
     {reply, ok, State};
     
 handle_call({remove_handler, HandlerID},_From,State) -> 
     
     % Gossip the new ring...
     RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
-    riak_ring_gossiper:gossip_to(RandomNode),
+    riak_connect:send_ring(RandomNode),
     {reply, ok, State};
     
     
     
             % Gossip the new ring...
             RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
-            riak_ring_gossiper:gossip_to(RandomNode);
+            riak_connect:send_ring(RandomNode);
         false -> ignore
     end,
     {noreply, State};

src/riak_ring_gossiper.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
--module(riak_ring_gossiper).
-
--export([start_link/0, init/0]).
--export([gossip_to/1, get_ring_from/1, remove_from_cluster/1]).
-
-start_link() -> {ok, spawn_link(node(), ?MODULE, init, [])}.
-
-%% @private
-init() ->
-    register(riak_ring_gossiper, self()),
-    loop(write).
-
-loop(Write) ->
-    MaxInterval = case Write of
-        write -> 600;
-        no_write -> riak:get_app_env(gossip_interval)
-    end,
-    Interval = random:uniform(MaxInterval),
-    receive
-        {gossip_ring, ExternRing} ->
-            {ok, MyRing} = riak_ring_manager:get_my_ring(),
-            case riak_ring:reconcile(ExternRing, MyRing) of
-                {no_change, _} ->
-                    loop(Write);
-                {new_ring, NewRing} ->
-                    {ok, MyNewRing} = maybe_claim(NewRing),
-                    riak_ring_manager:set_my_ring(MyNewRing),
-                    Me = node(),
-                    case riak_ring:random_node(MyNewRing) of
-                        Me -> nop;
-                        RandNode -> gossip_to(RandNode)
-                    end,
-                    riak_eventer:notify(riak_ring_gossiper, changed_ring, 
-                                        gossip_changed),
-                    loop(write)
-            end;
-        {get_ring, RemoteNode} ->
-            gossip_to(RemoteNode),
-            loop(Write)
-    after Interval ->
-            riak_eventer:notify(riak_ring_gossiper, interval, interval),
-            {ok, MyRing} = riak_ring_manager:get_my_ring(),
-            VNodes2Start = case length(riak_ring:all_members(MyRing)) of
-               1 -> riak_ring:my_indices(MyRing);
-               _ -> [riak_ring:random_other_index(MyRing)|
-                     riak_ring:my_indices(MyRing)]
-            end,
-            [gen_server:cast({riak_vnode_master, node()},
-                   {start_vnode, I}) || I <- VNodes2Start],                             
-            case Write of
-                no_write -> nop;
-                write ->
-                    riak_ring_manager:prune_ringfiles(),
-                    riak_ring_manager:write_ringfile()
-            end,
-            riak_ring_gossiper:gossip_to(
-              riak_ring:index_owner(MyRing,
-                                    riak_ring:random_other_index(MyRing))),
-            loop(no_write)                         
-    end.
-
-gossip_to(RemoteNode) ->
-    case lists:member(riak_ring_gossiper, registered()) of
-        false -> nop; % only gossip if we can also receive
-        true ->
-            riak_eventer:notify(riak_ring_gossiper, send, RemoteNode),
-            {ok, MyRing} = riak_ring_manager:get_my_ring(),
-            riak_connect:cast(RemoteNode, {gossip_ring, MyRing})
-    end.
-
-gossip_ring_to(RemoteNode,Ring) ->
-    riak_eventer:notify(riak_ring_gossiper, send, RemoteNode),
-    riak_connect:cast(RemoteNode, {gossip_ring, Ring}).
-
-get_ring_from(RemoteNode) ->
-    riak_eventer:notify(riak_ring_gossiper, get_remote_ring, RemoteNode),
-    riak_connect:cast(RemoteNode, {get_ring, node()}).
-
-maybe_claim(Ring) ->
-    {WMod, WFun} = riak:get_app_env(wants_claim_fun),
-    case apply(WMod, WFun, [Ring]) of
-        no -> {ok, Ring};
-        {yes, Wanted} ->
-            riak_eventer:notify(riak_ring_gossiper, want_claim, Wanted),
-            do_claim(Ring, Wanted)
-    end.
-
-do_claim(Ring,Wanted) ->
-    case Wanted of
-        0 -> {ok, Ring};
-        _ ->
-            {CMod, CFun} = riak:get_app_env(choose_claim_fun),
-            do_claim(apply(CMod, CFun, [Ring]), Wanted-1)
-    end.
-
-remove_from_cluster(ExitingNode) ->
-    rpc:call(ExitingNode, application, set_env, [riak, wants_claim_fun,
-                                        {riak_claim, never_wants_claim}]),
-    % ignore return of rpc as this should succeed even if node is offline
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
-    AllOwners = riak_ring:all_owners(Ring),
-    AllIndices = [I || {I,_Owner} <- AllOwners],
-    Indices = [I || {I,Owner} <- AllOwners, 
-                       Owner =:= ExitingNode],
-    riak_eventer:notify(riak_ring_gossiper, remove_from_cluster,
-                        {ExitingNode, length(Indices)}),
-    Others = lists:delete(ExitingNode, riak_ring:all_members(Ring)),
-    ExitRing = lists:foldl(
-      fun(I,R) ->
-          riak_ring:transfer_node(I,
-            lists:nth(crypto:rand_uniform(1,length(Others)+1),Others),R) end, 
-      Ring, Indices),
-    riak_ring_manager:set_my_ring(ExitRing),
-    [gossip_ring_to(X,ExitRing) || X <- riak_ring:all_members(Ring)],
-    [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
-        P <- AllIndices].
-        
-
-

src/riak_startup.erl

 
 % functions called by the shell scripts used to start riak
 
+% How do we know when a node is successfully running in a cluster.
+%
+%
+% start(ConfigFile) ->
+%     % try_join()
+%     % Read the config file.
+%     % Read riak_clustername, riak_seednodes, riak_cookie, ring_location
+%     % Set riak_cookie on local node.
+%     % Try to get the ring from each seed node.
+%     % If it worked then we're done. Read cluster_name and gossip_interval from other node.
+%     
+%     % try_restart()
+%     % Try to read the local ring file.
+%      
+%     % try_fresh()
+%     % Read ring_creation_size and gossip_interval
+%     % Create a new ring
+% 
+% try_join(_, []) -> could_not_connect;
+% try_join(ClusterName, [SeedNode|SeedNodes]) ->
+%     
+%     riak_ring_gossiper:get_ring_from(hd(nodes()));
+    
+    
+    
+
 join_cluster([IP, PortStr]) ->
     case check_deps() of
         ok ->
             Port = list_to_integer(PortStr),
             riak_doorbell:ring(IP, Port),
             timer:sleep(1000),
-            riak_ring_gossiper:get_ring_from(hd(nodes()));
+            riak_connect:send_ring(hd(nodes()), node());
         X ->
             X
     end.
     RingMgr = {riak_ring_manager,
              {riak_ring_manager, start_link, []},
              permanent, 5000, worker, [riak_ring_manager]},
-    RingGossip = {riak_ring_gossiper,
-             {riak_ring_gossiper, start_link, []},
-             permanent, 5000, worker, [riak_ring_gossiper]},
     Connect = {riak_connect,
              {riak_connect, start_link, []},
              permanent, 5000, worker, [riak_connect]},
     Processes0 = 
     case riak:get_app_env(riak_web_ip) of
         "undefined" ->
-            [RingMgr,RingGossip,Connect,LocalLogger];
+            [RingMgr,Connect,LocalLogger];
         undefined ->
-            [RingMgr,RingGossip,Connect,LocalLogger];
+            [RingMgr,Connect,LocalLogger];
         _ ->
-            [RingMgr,RingGossip,Connect,LocalLogger,
+            [RingMgr,Connect,LocalLogger,
              RiakWeb]
     end,
     Processes1 = 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.