%% riak/src/riak_connect.erl
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
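%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the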
%% specific language governing permissions and limitations
%% under the License.    

%% @doc
%% riak_connect takes care of the mechanics of shuttling a ring
%% from one node to another upon request by other
%% Riak processes.
%% Additionally, it occasionally checks to make sure the current
%% node has its fair share of partitions, and also sends
%% a copy of the ring to some other random node, ensuring
%% that all nodes eventually synchronize on the same understanding
%% of the Riak cluster. This interval is configurable, but defaults
%% to once per minute.


-module(riak_connect).
-behaviour(gen_server).

%% Public API.
-export([start_link/0, stop/0]).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Ring distribution / membership API.
-export([send_ring/1, send_ring/2, remove_from_cluster/1]).

%% The server is registered locally under the module name.
-define(SERVER, ?MODULE).

%% send_ring/1 -
%% Ship the local node's current ring to ToNode; this is send_ring/2
%% with node() as the source.
send_ring(ToNode) ->
    Self = node(),
    send_ring(Self, ToNode).

%% send_ring/2 -
%% Ask the riak_connect server on FromNode to push its ring to ToNode.
%% When FromNode and ToNode are the same node nothing is sent and ok is
%% returned; otherwise the result of the asynchronous cast is returned.
send_ring(FromNode, ToNode) when FromNode =:= ToNode ->
    ok;
send_ring(FromNode, ToNode) ->
    Request = {send_ring_to, ToNode},
    gen_server:cast({?SERVER, FromNode}, Request).

%% @private
%% Start the riak_connect server linked to the caller, registered
%% locally under ?SERVER, with an empty argument list for init/1.
start_link() ->
    Name = {local, ?SERVER},
    gen_server:start_link(Name, ?MODULE, [], []).
%% @private
%% gen_server init callback. The state is the "ring changed since the
%% last gossip round" flag, which starts out false.
%% The first gossip timer is armed here; nothing else in this module
%% seeds the periodic gossip_ring cast, so without this call no gossip
%% round would ever run.
init(_State) ->
    schedule_next_gossip(),
    {ok, false}.
%% Arm a one-shot timer that casts `gossip_ring' to this server after a
%% random delay of 1..gossip_interval milliseconds.
%% NOTE(review): the legacy `random' module uses a fixed default seed per
%% process unless seeded; if nothing seeds it, every node may pick the
%% same sequence of delays — confirm, or seed it / move to `rand'.
schedule_next_gossip() ->
    Delay = random:uniform(riak:get_app_env(gossip_interval)),
    timer:apply_after(Delay, gen_server, cast, [?SERVER, gossip_ring]).

%% Send an asynchronous `stop' request to the locally registered server.
stop() ->
    Request = stop,
    gen_server:cast(?SERVER, Request).

%% @private
%% Asynchronous request handler. The server state is a boolean flag:
%% true once a reconciled ring has been adopted since the last gossip
%% round (so the ring should be rewritten), reset to false by gossip_ring.
handle_cast({send_ring_to, Node}, RingChanged) ->
    % Push our current ring to Node's riak_connect for reconciliation.
    riak_eventer:notify(riak_connect, send_ring_to, Node),
    {ok, MyRing} = riak_ring_manager:get_my_ring(),
    gen_server:cast({?SERVER, Node}, {reconcile_ring, MyRing}),
    {noreply, RingChanged};
handle_cast({reconcile_ring, OtherRing}, RingChanged) ->
    % Compare the two rings, see if there is anything that
    % must be done to make them equal...
    {ok, MyRing} = riak_ring_manager:get_my_ring(),
    case riak_ring:reconcile(OtherRing, MyRing) of
        {no_change, _} ->
            {noreply, RingChanged};
        {new_ring, ReconciledRing} ->
            BalancedRing = claim_until_balanced(ReconciledRing),
            % Adopt the new ring locally. send_ring/2 ships whatever
            % get_my_ring/0 returns, so without this the reconciliation
            % would be discarded and the old ring gossiped onward.
            % NOTE(review): assumes riak_ring_manager exports
            % set_my_ring/1 (counterpart of get_my_ring/0) — confirm.
            riak_ring_manager:set_my_ring(BalancedRing),
            % Pass the change on to another node so it spreads.
            RandomNode = riak_ring:random_node(BalancedRing),
            send_ring(node(), RandomNode),
            riak_eventer:notify(riak_connect, changed_ring, gossip_changed),
            {noreply, true}
    end;
handle_cast(gossip_ring, RingChanged) ->
    % First, schedule the next round of gossip...
    schedule_next_gossip(),
    riak_eventer:notify(riak_connect, interval, interval),
    % Make sure all vnodes are started...
    {ok, MyRing} = riak_ring_manager:get_my_ring(),
    ensure_vnodes_started(MyRing),
    % If the ring has changed since our last write,
    % then rewrite the ring...
    % NOTE(review): assumes riak_ring_manager exports write_ringfile/0 — confirm.
    case RingChanged of
        true -> riak_ring_manager:write_ringfile();
        false -> ok
    end,
    % Finally, gossip the ring to some random other node...
    RandomNode = riak_ring:index_owner(MyRing, riak_ring:random_other_index(MyRing)),
    send_ring(node(), RandomNode),
    {noreply, false};
handle_cast(stop, State) ->
    % Sent by stop/0; previously swallowed by the catch-all below.
    {stop, normal, State};
handle_cast(_, State) ->
    {noreply, State}.

%% @private
%% This server does not consume raw messages; anything that arrives is
%% dropped and the state is kept unchanged.
handle_info(_Msg, State) ->
    {noreply, State}.

%% @private
%% No synchronous API is exposed; every call is answered with `ok' and
%% leaves the state untouched.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% @private
%% Nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.

%% @private
%% Hot code upgrade: the state format never changes, so pass it through.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Ask the configured `wants_claim_fun' ({Module, Function} app env) whether
%% this node should claim partitions in Ring.
%% Returns Ring unchanged on `no'; on {yes, NumToClaim} it emits a
%% want_claim event and returns the ring after NumToClaim rounds of
%% claim_indexes/2.
claim_until_balanced(Ring) ->
    {WMod, WFun} = riak:get_app_env(wants_claim_fun),
    case WMod:WFun(Ring) of
        no ->
            Ring;
        {yes, NumToClaim} ->
            riak_eventer:notify(riak_ring, want_claim, NumToClaim),
            claim_indexes(Ring, NumToClaim)
    end.

%% Apply the configured `choose_claim_fun' ({Module, Function} app env)
%% to Ring NumToClaim times in succession and return the resulting ring.
%% NumToClaim must be a non-negative integer. Anything else now fails with
%% function_clause instead of counting down past 0 forever.
claim_indexes(Ring, 0) ->
    Ring;
claim_indexes(Ring, NumToClaim) when is_integer(NumToClaim), NumToClaim > 0 ->
    % Look the claim function up once rather than on every iteration.
    {CMod, CFun} = riak:get_app_env(choose_claim_fun),
    claim_indexes(Ring, NumToClaim, CMod, CFun).

%% Worker loop for claim_indexes/2 with the claim function already resolved.
claim_indexes(Ring, 0, _CMod, _CFun) ->
    Ring;
claim_indexes(Ring, NumToClaim, CMod, CFun) ->
    claim_indexes(CMod:CFun(Ring), NumToClaim - 1, CMod, CFun).

%% Ask the local riak_vnode_master to start a vnode for every index this
%% node owns in Ring (riak_ring:my_indices/1). When the ring has any member
%% count other than exactly one, riak_ring:random_other_index(Ring) is also
%% started. Returns the list of cast results, one per index.
ensure_vnodes_started(Ring) ->
    VNodes2Start =
        case riak_ring:all_members(Ring) of
            [_OnlyMember] ->
                riak_ring:my_indices(Ring);
            _ ->
                [riak_ring:random_other_index(Ring) | riak_ring:my_indices(Ring)]
        end,
    [gen_server:cast({riak_vnode_master, node()}, {start_vnode, I})
     || I <- VNodes2Start].

%% Remove ExitingNode from the cluster. The steps are:
%%   1. Tell it (via rpc) to stop wanting claims.
%%   2. Reassign each index it owns to a randomly chosen remaining member.
%%   3. Adopt the resulting ring locally and send it to every member.
%%   4. Cast start_vnode for every index to ExitingNode's riak_vnode_master.
%% Crashes if ExitingNode is the only member, because there is nobody to
%% take its indices.
remove_from_cluster(ExitingNode) ->
    % Set the remote node to stop claiming.
    % Ignore return of rpc as this should succeed even if node is offline
    rpc:call(ExitingNode, application, set_env,
             [riak, wants_claim_fun, {riak_claim, never_wants_claim}]),
    % Get a list of indices owned by the ExitingNode...
    {ok, Ring} = riak_ring_manager:get_my_ring(),
    AllOwners = riak_ring:all_owners(Ring),
    AllIndices = [I || {I, _Owner} <- AllOwners],
    Indices = [I || {I, Owner} <- AllOwners, Owner =:= ExitingNode],
    riak_eventer:notify(riak_connect, remove_from_cluster,
                        {ExitingNode, length(Indices)}),
    % Transfer indexes to other nodes...
    Others = lists:delete(ExitingNode, riak_ring:all_members(Ring)),
    NumOthers = length(Others),
    F = fun(Index, Acc) ->
            RandomNode = lists:nth(crypto:rand_uniform(1, NumOthers + 1), Others),
            riak_ring:transfer_node(Index, RandomNode, Acc)
        end,
    ExitRing = lists:foldl(F, Ring, Indices),
    % Adopt the new ring locally. send_ring/1 ships whatever
    % riak_ring_manager:get_my_ring/0 returns, so without this the
    % transfers above would be discarded and the old ring broadcast.
    % NOTE(review): assumes riak_ring_manager exports set_my_ring/1 — confirm.
    riak_ring_manager:set_my_ring(ExitRing),
    % Send the ring to all members (including ExitingNode)...
    [send_ring(X) || X <- riak_ring:all_members(Ring)],
    % NOTE(review): the original author questioned this step ("Is this
    % line right?"): it asks ExitingNode's riak_vnode_master to start a
    % vnode for every index in the ring — confirm intent.
    [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
        P <- AllIndices].