Anonymous avatar Anonymous committed 9d20ab4

imported patch removal of riak_api intermediary

Comments (0)

Files changed (13)

 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 %% @private
-handle_call({reload_all, Mod}, _From, State) ->
-    {reply, riak_util:reload_all(Mod), State};
-handle_call({remove_from_cluster,ExitingNode}, _From, State) ->
-    {reply, riak_ring_gossiper:remove_from_cluster(ExitingNode), State};
-handle_call({mapred,Inputs,Query,Timeout}, From, State) ->
-    NewState = ensure_ring(State),
-    riak_mapreduce_fsm:start(NewState#state.ring,Inputs,Query,Timeout,From),
-    {noreply, NewState};
-handle_call({put,RObj,W,DW,Timeout}, From, State) ->
-    NewState = ensure_ring(State),
-    riak_put_fsm:start(NewState#state.ring, RObj,W,DW,Timeout,From),
-    {noreply, NewState};
-handle_call({get,Bucket,Key,R,Timeout}, From, State) ->
-    NewState = ensure_ring(State),
-    riak_get_fsm:start(NewState#state.ring, Bucket,Key,R,Timeout,From),
-    {noreply, NewState};
-handle_call({delete,Bucket,Key,RW,Timeout}, From, State) ->
-    spawn(fun() -> riak_delete:delete(Bucket,Key,RW,Timeout,From) end),
-    {noreply, State};
-handle_call({set_bucket,BucketName,BucketProps}, From, State) ->
-    spawn(fun() ->
-          gen_server2:reply(From,
-                           riak_bucket:set_bucket(BucketName, BucketProps))
-          end),
-    {noreply, State};
-handle_call({get_bucket,BucketName}, From, State) ->
-    spawn(fun() ->
-          gen_server2:reply(From,
-                           riak_bucket:get_bucket(BucketName))
-          end),
-    {noreply, State};
-handle_call({list_keys,Bucket,Timeout}, From, State) ->
-    NewState = ensure_ring(State),
-    riak_keys_fsm:start(NewState#state.ring, Bucket, Timeout, From),
-    {noreply, State}.
+handle_call(_,_From,State) -> {noreply,State}.
 
 %% @private
 init([]) -> {ok, #state{ring=undefined}}.
 
 %% @private
-handle_cast({send_event, ClientId, EventName, EventDetail}, State) ->
-    riak_eventer:notify(client_event, EventName, {ClientId, EventDetail}),
-    {noreply, State};
 handle_cast(_Msg, State) -> {noreply,State}.
 
 %% @private
 %% @private
 code_change(_OldVsn, State, _Extra) ->  {ok, State}.
 
-%% @private
-ensure_ring(State=#state{ring=undefined}) ->
-    riak_ring_manager:subscribe(self()),
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
-    State#state{ring=Ring};
-ensure_ring(State) -> State.
-    

src/riak_client.erl

 %%      See the map/reduce documentation for explanation of behavior.
 mapred(Inputs,Query,Timeout)
   when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
-    gen_server2:call({riak_api,Node}, {mapred,Inputs,Query,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_mapreduce_fsm, start, [Inputs,Query,Timeout,Me]),
+    receive
+        {error, Err} -> {error, Err};
+        {ok, Res} -> {ok, Res}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
 %%       {error, Err :: term()}
 %% @doc Fetch the object at Bucket/Key.  Return a value as soon as R
 %%      nodes have responded with a value or error, or TimeoutMillisecs passes.
-get(Bucket, Key, R, Timeout) when is_atom(Bucket),
-                                  (is_list(Key) orelse is_binary(Key)),
+get(Bucket, Key, R, Timeout) when is_atom(Bucket), is_binary(Key),
                                   is_integer(R), is_integer(Timeout) ->
-    gen_server2:call({riak_api,Node}, {get,Bucket,Key,R,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_get_fsm, start, [Bucket,Key,R,Timeout,Me]),
+    receive
+        {error, Err} -> {error, Err};
+        {ok, RObj} -> {ok, RObj}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec put(RObj :: riak_object:riak_object(), W :: integer()) ->
 %%        ok |
 %%      TimeoutMillisecs passes.
 put(RObj, W, DW, Timeout) ->
     R0 = riak_object:increment_vclock(RObj, ClientId),
-    gen_server2:call({riak_api,Node}, {put,R0,W,DW,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_put_fsm, start, [R0,W,DW,Timeout,Me]),
+    receive
+        ok -> ok
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec delete(riak_object:bucket(), riak_object:key(), RW :: integer()) ->
 %%        ok |
 %% @doc Delete the object at Bucket/Key.  Return a value as soon as RW
 %%      nodes have responded with a value or error, or TimeoutMillisecs passes.
 delete(Bucket,Key,RW,Timeout) ->
-    gen_server2:call({riak_api,Node}, {delete,Bucket,Key,RW,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_delete, delete, [Bucket,Key,RW,Timeout,Me]),
+    receive
+        ok -> ok
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec list_keys(riak_object:bucket()) ->
 %%       {ok, [Key :: riak_object:key()]} |
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
 list_keys(Bucket, Timeout) -> 
-    gen_server2:call({riak_api,Node}, {list_keys,Bucket,Timeout}, Timeout).
+    Me = self(),
+    spawn(Node, riak_keys_fsm, start, [Bucket,Timeout,Me]),
+    receive
+        {ok, Reply} -> {ok, Reply}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
 %% @doc Set the given properties for Bucket.
 %%      to ensure expected per-bucket behavior.
 %% See riak_bucket for expected useful properties.
 set_bucket(BucketName,BucketProps) ->
-    gen_server2:call({riak_api,Node}, {set_bucket,BucketName,BucketProps}).
+    rpc:call(Node,riak_bucket,set_bucket,[BucketName,BucketProps]).
 %% @spec get_bucket(riak_object:bucket()) -> [BucketProp :: {atom(),term()}]
 %% @doc Get all properties for Bucket.
 %% See riak_bucket for expected useful properties.
 get_bucket(BucketName) ->
-    gen_server2:call({riak_api,Node}, {get_bucket,BucketName}).
-
+    rpc:call(Node,riak_bucket,get_bucket,[BucketName]).
 %% @spec reload_all(Module :: atom()) -> term()
 %% @doc Force all Riak nodes to reload Module.
 %%      This is used when loading new modules for map/reduce functionality.
-reload_all(Module) -> gen_server2:call({riak_api,Node}, {reload_all, Module}).
+reload_all(Module) -> rpc:call(Node,riak_util,reload_all,[Module]).
 
 %% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
 %% @doc Cause all partitions owned by ExitingNode to be taken over
 %%      by other nodes.
 remove_from_cluster(ExitingNode) ->
-    gen_server2:call({riak_api,Node}, {remove_from_cluster,ExitingNode}).
+    rpc:call(Node,riak_ring_gossiper,remove_from_cluster,[ExitingNode]).
 
 %% @spec send_event(EventName::atom(), EventDetail::term()) -> ok
 %% @doc  Send a client-generated event to the Riak eventer.
 send_event(EventName, EventDetail) ->
-    gen_server2:cast({riak_api,Node},
-                     {send_event,ClientId,EventName,EventDetail}).
+    rpc:call(Node,riak_eventer,notify,
+             [client_event, EventName, {ClientId, EventDetail}]).

src/riak_delete.erl

 %% specific language governing permissions and limitations
 %% under the License.    
 
-%% @doc Interface used by riak_api for object deletion.
+%% @doc Interface for object deletion.
 
 -module(riak_delete).
 
     RealStartTime = riak_util:moment(),
     ReqID = erlang:phash2({random:uniform(),self(),Bucket,Key,RealStartTime}),
     riak_eventer:notify(riak_delete, delete_start, {ReqID, Bucket, Key}),
-    case gen_server2:call({riak_api, node()},
-                          {get,Bucket,Key,RW,Timeout}) of
+    {ok,C} = riak:local_client(),
+    case C:get(Bucket,Key,RW,Timeout) of
         {ok, OrigObj} ->
             RemainingTime = Timeout - (riak_util:moment() - RealStartTime),
             OrigMD = hd([MD || {MD,_V} <- riak_object:get_contents(OrigObj)]),
             NewObj = riak_object:update_metadata(OrigObj,
                             dict:store(<<"X-Riak-Deleted">>, "true", OrigMD)),
-            {ok, C} = riak:local_client(),
             Reply = C:put(NewObj, RW, RW, RemainingTime),
             case Reply of
                 ok -> 
                 _ -> nop
             end,
             riak_eventer:notify(riak_delete, delete_reply, {ReqID, Reply}),
-            gen_server2:reply(Client, Reply);
+            Client ! Reply;
         {error, notfound} ->
             riak_eventer:notify(riak_delete, delete_reply,
                                 {ReqID, {error, notfound}}),
-            gen_server2:reply(Client, {error, notfound});
+            Client ! {error, notfound};
         X ->
             riak_eventer:notify(riak_delete, delete_reply, {ReqID, X}),
-            gen_server2:reply(Client, X)
+            Client ! X
     end.
 
 reap(Bucket, Key, WaitTime, Timeout, ReqId) ->
     timer:sleep(WaitTime),
-    case gen_server2:call({riak_api, node()}, 
-                          {get, Bucket, Key, 1, Timeout}) of
+    {ok,C} = riak:local_client(),
+    case C:get(Bucket,Key,1,Timeout) of
         {error, notfound} ->
             riak_eventer:notify(riak_delete, finalize_reap, 
                                 {ReqId, Bucket, Key, ok});

src/riak_event_guard.erl

                  [X || X <- Eventers0, net_adm:ping(X) =:= pong])),
     NewRing = riak_ring:update_meta(eventers, Eventers, MyRing),
     riak_ring_manager:set_my_ring(NewRing),
+    io:format("RINGY ~p~n",[NewRing]),
     riak_ring_manager:write_ringfile(),
     Reply = gen_event:swap_sup_handler(riak_event,
                                        {{HandlerMod,{node(),now()}}, swap},
                                        {{HandlerMod,{node(),now()}}, Arg}),
-    riak_ring_gossiper:gossip_to(riak_ring:random_node(NewRing)),
+    riak_ring_gossiper:gossip_to(
+      riak_ring:index_owner(NewRing,riak_ring:random_other_index(NewRing))),
     {reply, Reply, State}.
 
 %% @private

src/riak_eventer.erl

 
 -export([notify/1,notify/3,eventer_config/1,do_eventer/1]).
 
--record(state, {ring, eventers}).
-
 %% @private
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 %% @private
-init([]) -> {ok, #state{ring=undefined, eventers=[]}}.
+init([]) -> {ok, stateless_server}.
 
 notify(Event) ->
     gen_server2:cast(riak_local_logger, {event, Event}),
     notify({Module, EventName, node(), EventDetail}).
 
 %% @private
-handle_cast({event, _Event}, State=#state{eventers=[],ring=undefined}) ->
-    NewState = ensure_ring(State),
-    {noreply, NewState};
-handle_cast({event, _Event}, State=#state{eventers=[]}) ->
-    {noreply, State};
-handle_cast({event, Event}, State=#state{}) ->
-    NewState = ensure_ring(State),
-    Eventers = NewState#state.eventers,
+handle_cast({event, Event}, State) ->
+    {ok, Ring} = riak_ring_manager:get_my_ring(),    %%%% TEST EVENTS!
+    Eventers = get_eventers(Ring),
     [gen_event:notify({riak_event,Node},Event) || Node <- Eventers],
-    {noreply, NewState}.
+    {noreply, State}.
     
 eventer_config([Cluster, CookieStr]) ->
     RipConf = [{no_config, true}, {cluster_name, Cluster},
     riak_event_guard:add_handler(list_to_atom(HandlerName), HandlerArg),
     ok.
 
-%% @private
-ensure_ring(State=#state{ring=undefined}) ->
-    riak_ring_manager:subscribe(self()),
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
-    State#state{ring=Ring, eventers=get_eventers(Ring)};
-ensure_ring(State) -> State.
-
-
 get_eventers(Ring) ->
     case riak_ring:get_meta(eventers, Ring) of
         undefined -> [];
         {ok, X} -> sets:to_list(X)
     end.        
 
-%% @private
-handle_info({set_ring, Ring}, State) -> 
-    {noreply, State#state{ring=Ring, eventers=get_eventers(Ring)}};
 handle_info(_Info, State) -> {noreply, State}.
 
 %% @private

src/riak_get_fsm.erl

 -module(riak_get_fsm).
 -behaviour(gen_fsm).
 
--export([start/6]).
+-export([start/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_r/2,waiting_read_repair/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Ring,Bucket,Key,R,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Ring,Bucket,Key,R,Timeout,From], []).
+start(Bucket,Key,R,Timeout,From) ->
+    gen_fsm:start(?MODULE, [Bucket,Key,R,Timeout,From], []).
 
 %% @private
-init([Ring,Bucket,Key,R,Timeout,Client]) ->
+init([Bucket,Key,R,Timeout,Client]) ->
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client,r=R, timeout=Timeout,
                        bkey={Bucket,Key}, ring=Ring},
     {ok,initialize,StateData,0}.
         false ->
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                 {ReqID, notfound}),
-            gen_server2:reply(Client,{error,notfound}),
+            Client ! {error,notfound},
             {stop,normal,NewStateData}
     end;
 waiting_vnode_r({r, {error, Err}, Idx, ReqID},
                     FullErr = [E || {E,_I} <- Replied],
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                         {ReqID, {error,FullErr}}),
-                    gen_server2:reply(Client,{error,FullErr}),
+                    Client ! {error,FullErr},
                     {stop,normal,NewStateData};
                 _ ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                         {ReqID, notfound}),
-                    gen_server2:reply(Client,{error,notfound}),
+                    Client ! {error,notfound},
                     {stop,normal,NewStateData}
             end
     end;
 waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                         {ReqID, timeout}),
-    gen_server2:reply(Client,{error,timeout}),
+    Client ! {error,timeout},
     {stop,normal,StateData}.
 
 waiting_read_repair({r, {ok, RObj}, Idx, ReqID},
 
 respond(Client,VResponses,AllowMult) ->
     Reply = merge_robjs([R || {R,_I} <- VResponses],AllowMult),
-    gen_server2:reply(Client,Reply),
+    Client ! Reply,
     Reply.
 
 merge_robjs(RObjs0,AllowMult) ->

src/riak_keys_fsm.erl

 -module(riak_keys_fsm).
 -behaviour(gen_fsm).
 
--export([start/4]).
+-export([start/3]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_kl/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Ring,Bucket,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Ring,Bucket,Timeout,From], []).
+start(Bucket,Timeout,From) ->
+    gen_fsm:start(?MODULE, [Bucket,Timeout,From], []).
 
 %% @private
-init([Ring,Bucket,Timeout,Client]) ->
+init([Bucket,Timeout,Client]) ->
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{client=Client, timeout=Timeout,
                        bucket=Bucket, ring=Ring},
     {ok,initialize,StateData,0}.
 %% @private
 respond(Client, KeyLists) ->
     Reply = sets:to_list(sets:union(KeyLists)),
-    gen_server2:reply(Client, {ok, Reply}),
+    Client ! {ok, Reply},
     Reply.
 
 %% @private

src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 
 -record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
 
-start(Ring,Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [Ring,Inputs,Query,Timeout,Client], []).
+start(Inputs,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [Inputs,Query,Timeout,Client], []).
 %% @private
-init([Ring,Inputs,Query,Timeout,Client]) ->
+init([Inputs,Query,Timeout,Client]) ->
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
     RealStartTime = riak_util:moment(),
     ReqID = erlang:phash2({random:uniform(), self(), RealStartTime}),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                 {error, {bad_qterm, QTerm}}),
-            gen_server2:reply(Client,{error, {bad_qterm, QTerm}}),
+            Client ! {error, {bad_qterm, QTerm}},
             {stop,normal}
     end.
 
         [] -> 
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                 {ok, ReqID, length(Acc)}),
-            gen_server2:reply(Client,{ok, Acc}),
+            Client ! {ok, Acc},
             {stop,normal,StateData};
         _ ->
             {next_state, wait, StateData#state{fsms=FSMs},
     FSMs = lists:delete(ErrFSM,FSMs0),
     [gen_fsm:send_event(FSM, die) || FSM <- FSMs],
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqID}),
-    gen_server2:reply(Client,{error, ErrMsg}),
+    Client ! {error, ErrMsg},
     {stop,normal,StateData};
 wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
     AccData = case Data of
     {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
 wait(timeout, StateData=#state{reqid=ReqID,client=Client}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqID}),
-    gen_server2:reply(Client,{error, timeout}),
+    Client ! {error, timeout},
     {stop,normal,StateData}.
 
 %% @private

src/riak_put_fsm.erl

 -module(riak_put_fsm).
 -behaviour(gen_fsm).
 
--export([start/6]).
+-export([start/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_vnode_w/2,waiting_vnode_dw/2]).
                 ring :: riak_ring:riak_ring()
                }).
 
-start(Ring,RObj,W,DW,Timeout,From) ->
-    gen_fsm:start(?MODULE, [Ring, RObj,W,DW,Timeout,From], []).
+start(RObj,W,DW,Timeout,From) ->
+    gen_fsm:start(?MODULE, [RObj,W,DW,Timeout,From], []).
 
 %% @private
-init([Ring,RObj0,W,DW,Timeout,Client]) ->
+init([RObj0,W,DW,Timeout,Client]) ->
+    {ok,Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{robj=RObj0, client=Client, w=W, dw=DW,
                        timeout=Timeout, ring=Ring},
     {ok,initialize,StateData,0}.
 
 %% @private
-initialize(timeout, StateData0=#state{robj=RObj0, client=Client, w=W, dw=DW, 
-                                                timeout=Timeout, ring=Ring}) ->
+initialize(timeout, StateData0=#state{robj=RObj0,
+                                      timeout=Timeout, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
     Bucket = riak_object:bucket(RObj0),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
         true ->
             case DW of
                 0 ->
-                    gen_server2:reply(Client,ok),
+                    Client ! ok,
                     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                         {ReqID, ok}),
                     {stop,normal,StateData};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqID, {error,too_many_fails,Replied}}),
-            gen_server2:reply(Client,{error,too_many_fails}),
+            Client ! {error,too_many_fails},
             {stop,normal,NewStateData}
     end;
 waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                         {ReqID, {error,timeout}}),
-    gen_server2:reply(Client,{error,timeout}),
+    Client ! {error,timeout},
     {stop,normal,StateData}.
 
 waiting_vnode_dw({w, _Idx, ReqID},
         true ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqID, ok}),
-            gen_server2:reply(Client,ok),
+            Client ! ok,
             {stop,normal,StateData};
         false ->
             NewStateData = StateData#state{replied_dw=Replied},
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqID, {error,too_many_fails,Replied}}),
-            gen_server2:reply(Client,{error,too_many_fails}),
+            Client ! {error,too_many_fails},
             {stop,normal,NewStateData}
     end;
 waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqID}) ->
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                         {ReqID, {error,timeout}}),
-    gen_server2:reply(Client,{error,timeout}),
+    Client ! {error,timeout},
     {stop,normal,StateData}.
 
 %% @private

src/riak_ring.erl

 my_indices(State) ->
     [I || {I,Owner} <- ?MODULE:all_owners(State), Owner =:= node()].
 
-% @doc Return a partition indices not owned by the node executing this function.
+% @doc Return a partition index not owned by the node executing this function.
+%      If this node owns all partitions, return any index.
 % @spec random_other_index(State :: hstate()) -> integer()
 random_other_index(State) ->
     L = [I || {I,Owner} <- ?MODULE:all_owners(State), Owner =/= node()],
-    lists:nth(crypto:rand_uniform(1, length(L)+1), L).
+    case L of
+        [] -> hd(my_indices(State));
+        _ -> lists:nth(crypto:rand_uniform(1, length(L)+1), L)
+    end.
 
 % @doc Return the node that owns the given index.
 % @spec index_owner(State :: hstate(), Idx :: integer()) -> Node :: term()

src/riak_ring_gossiper.erl

                 {no_change, _} ->
                     loop(Write);
                 {new_ring, NewRing} ->
-                    riak_ring_manager:set_my_ring(NewRing),
-                    {ok, MyNewRing} = maybe_claim(),
+                    {ok, MyNewRing} = maybe_claim(NewRing),
                     riak_ring_manager:set_my_ring(MyNewRing),
                     Me = node(),
                     case riak_ring:random_node(MyNewRing) of
                 no_write -> nop;
                 write -> riak_ring_manager:write_ringfile()
             end,
-            Me = node(),
-            case riak_ring:random_node(MyRing) of
-                Me -> nop;
-                RandNode -> gossip_to(RandNode)
-            end,
+            riak_ring_gossiper:gossip_to(
+              riak_ring:index_owner(MyRing,
+                                    riak_ring:random_other_index(MyRing))),
             loop(no_write)                         
     end.
 
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     riak_connect:cast(RemoteNode, {gossip_ring, MyRing}).
 
+gossip_ring_to(RemoteNode,Ring) ->
+    riak_eventer:notify(riak_ring_gossiper, send, RemoteNode),
+    riak_connect:cast(RemoteNode, {gossip_ring, Ring}).
+
 get_ring_from(RemoteNode) ->
     riak_eventer:notify(riak_ring_gossiper, get_remote_ring, RemoteNode),
     riak_connect:cast(RemoteNode, {get_ring, node()}).
 
-maybe_claim() ->
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
+maybe_claim(Ring) ->
     {WMod, WFun} = riak:get_app_env(wants_claim_fun),
     case apply(WMod, WFun, [Ring]) of
         no -> {ok, Ring};
           riak_ring:transfer_node(I,
             lists:nth(crypto:rand_uniform(1,length(Others)+1),Others),R) end, 
       Ring, Indices),
-    riak_ring_manager:set_my_ring(ExitRing),    
-    [gossip_to(X) || X <- Others],
+    riak_ring_manager:set_my_ring(ExitRing),
+    [gossip_ring_to(X,ExitRing) || X <- riak_ring:all_members(Ring)],
     [gen_server:cast({riak_vnode_master, ExitingNode}, {start_vnode, P}) ||
         P <- AllIndices].
         

src/riak_ring_manager.erl

 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 -export([get_my_ring/0,set_my_ring/1,write_ringfile/0,prune_ringfiles/0,
-        read_ringfile/1,find_latest_ringfile/0, subscribe/1, unsubscribe/1]).
--record(state, {ring, subscribers}).
+        read_ringfile/1,find_latest_ringfile/0]).
 
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 %% @private
-init([]) -> {ok, #state{ring=riak_ring:fresh(), subscribers=gb_trees:empty()}};
-init([RingFile]) -> {ok, #state{ring=read_ringfile(RingFile)}}.
+init([]) ->
+    Ring = riak_ring:fresh(),
+    ets:new(nodelocal_ring, [protected, named_table]),
+    ets:insert(nodelocal_ring, {ring, Ring}),
+    {ok, stateless_server}.
 
 %% @spec get_my_ring() -> {ok, riak_ring:riak_ring()} | {error, Reason}
-get_my_ring() -> gen_server2:call(?MODULE, get_my_ring).
+get_my_ring() ->
+    case ets:match(nodelocal_ring, {ring, '$1'}) of
+        [[Ring]] -> {ok, Ring};
+        [] -> {error, no_ring}
+    end.
 %% @spec set_my_ring(riak_ring:riak_ring()) -> ok
 set_my_ring(Ring) -> gen_server2:cast(?MODULE, {set_my_ring, Ring}).
 %% @spec write_ringfile() -> ok
 write_ringfile() -> gen_server2:cast(?MODULE, write_ringfile).
-%% @spec subscribe(pid()) -> ok
-subscribe(Pid) when is_pid(Pid) -> gen_server2:cast(?MODULE, {subscribe, Pid}).
-%% @spec unsubscribe(pid()) -> ok
-unsubscribe(Pid) when is_pid(Pid)-> 
-    gen_server2:cast(?MODULE, {unsubscribe, Pid}).
-     
-     
+
 %% @private
-handle_cast({set_my_ring, Ring}, State=#state{subscribers=Subscribers}) -> 
-    spawn(fun() -> notify_subscribers(Subscribers, Ring) end),
-    {noreply,State#state{ring=Ring}};
-
-handle_cast(write_ringfile, State=#state{ring=Ring}) ->
-    spawn(fun() -> do_write_ringfile(Ring) end),
+handle_cast({set_my_ring, Ring}, State) -> 
+    ets:insert(nodelocal_ring, {ring, Ring}),
     {noreply,State};
 
-handle_cast({subscribe, Pid}, State) ->
-    {noreply, add_subscriber(Pid, State)};
+handle_cast(write_ringfile, State) ->
+    {ok, Ring} = get_my_ring(),
+    spawn(fun() -> do_write_ringfile(Ring) end),
+    {noreply,State}.
 
-handle_cast({unsubscribe, Pid}, State) ->
-    {noreply, del_subscriber(Pid, State)}.
+handle_info(_Info, State) -> {noreply, State}.
 
 %% @private
-handle_call(get_my_ring, _From, State=#state{ring=Ring}) ->
-    {reply, {ok,Ring}, State}.
-
-%% @private
-handle_info({'DOWN', MonRef, process, Pid, _I}, State) ->
-    io:format("got DOWN msg for ~p ~p ~p~n", [MonRef, Pid, _I]),
-    {noreply, del_subscriber(Pid, State)};
-
-handle_info(_Info, State) -> {noreply, State}.
+handle_call(_Msg,_From,State) -> {noreply, State}.
 
 %% @private
 terminate(_Reason, _State) -> ok.
 %% @private
 code_change(_OldVsn, State, _Extra) ->  {ok, State}.
 
-add_subscriber(Pid, State=#state{subscribers=Subscribers}) ->
-    case gb_trees:lookup(Pid, Subscribers) of
-        {value, _} -> State;
-        none -> 
-            MonRef = erlang:monitor(process, Pid),
-            State#state{subscribers=gb_trees:enter(Pid, MonRef, Subscribers)}
-    end.
-
-del_subscriber(Pid, State=#state{subscribers=Subscribers}) ->
-    case gb_trees:lookup(Pid, Subscribers) of
-        {value, MonRef} ->
-            erlang:demonitor(MonRef, [flush]),
-            State#state{subscribers=gb_trees:delete(Pid, Subscribers)};
-        none ->
-            State
-    end.
-
-notify_subscribers(Subscribers, Ring) ->
-    [S ! {set_ring, Ring} || S <- gb_trees:keys(Subscribers)].
-
 do_write_ringfile(Ring) ->
     {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
     TS = io_lib:format(".~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
     Connect = {riak_connect,
              {riak_connect, start_link, []},
              permanent, 5000, worker, [riak_connect]},
-    API = {riak_api,
-           {riak_api, start_link, []},
-           permanent, 5000, worker, [riak_api]},
     VMaster = {riak_vnode_master,
                {riak_vnode_master, start_link, []},
                permanent, 5000, worker, [riak_vnode_master]},
     Processes0 = 
     case riak:get_app_env(riak_web_ip) of
         "undefined" ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger];
+            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger];
         undefined ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger];
+            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger];
         _ ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,
+            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger,
              RiakWeb]
     end,
     Processes1 = 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.