Commits

Anonymous committed 9af730b

Make eventers use simple Erlang messaging (remove gen_events) [#27]

  • Participants
  • Parent commits 8cb7982

Comments (0)

Files changed (3)

File src/riak_event_guard.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
-%% @doc A wrapper server for connecting riak_eventer handlers to the gen_event.
-
--module(riak_event_guard).
-
--export([start_link/0]).
--export([add_handler/3]).
--export([init/1, handle_info/2]).
--export([handle_call/3,handle_cast/2,code_change/3,terminate/2]).
-
--behavior(gen_server).
-
-%% @spec start_link() -> {ok, pid()}
-%% @doc The usual gen_server start_link mechanism.
-start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-% @private
-init([]) ->
-    case gen_event:start_link({local,riak_event}) of
-        {ok, Pid} -> {ok, Pid};
-        {error,{already_started,Pid}} -> {ok, Pid};
-        X -> {stop, {error_in_init, X}}
-    end.
-
-%% @spec add_handler(HandlerMod :: atom(), Arg :: term(), 
-%%                   MatchSpec :: string()) ->
-%%       ok | {error, Error :: term()}
-%% @doc Attach a new HandlerMod to riak events, started with Arg.
-add_handler(HandlerMod, Arg, MatchSpec) ->
-    gen_server:call(?MODULE, {add_handler, HandlerMod, Arg, MatchSpec}).
-
-%% @private
-handle_call({add_handler, HandlerMod, Arg, MatchSpec},_From,State) -> 
-    {ok, MyRing} = riak_ring_manager:get_my_ring(),
-    Eventers0 = case riak_ring:get_meta(eventers, MyRing) of
-        undefined -> [];
-        {ok, X} -> sets:to_list(X)
-    end,
-    
-    Eventers = sets:add_element({node(),MatchSpec}, sets:from_list(
-                 [{N,MS} || {N,MS} <- Eventers0,
-                       net_adm:ping(N) =:= pong,
-                       N /= node()])),
-    NewRing = riak_ring:update_meta(eventers, Eventers, MyRing),
-    riak_ring_manager:set_my_ring(NewRing),
-    riak_ring_manager:write_ringfile(),
-    Reply = gen_event:swap_sup_handler(riak_event,
-                                       {{HandlerMod,{node(),now()}}, swap}, 
-                                       {{HandlerMod,{node(),now()}}, Arg}),
-    riak_ring_gossiper:gossip_to(
-      riak_ring:index_owner(NewRing,riak_ring:random_other_index(NewRing))),
-    {reply, Reply, State}.
-
-%% @private
-handle_info({gen_event_EXIT, HandlerMod, Reason},State) ->
-    %% gen_event manager sends this message if a handler was added using
-    %% gen_event:add_sup_handler/3 or gen_event:swap_sup_handler/3 functions
-    riak_eventer:notify(riak_event_guard, gen_event_exit,{HandlerMod, Reason}),
-    {noreply,State}.
-
-%% @private
-handle_cast(_,State) -> {noreply,State}.
-%% @private
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-%% @private
-terminate(_Reason,_State)  -> ok.

File src/riak_eventer.erl

 %% specific language governing permissions and limitations
 %% under the License.    
 
+
+%% TODO 
+%% - Start a loop to remove dead handlers.
+%% - riak_client interface to add handler.
+%% - riak_client interface to remove handler.
+%% - String based remove handler.
+
 -module(riak_eventer).
 -behaviour(gen_server2).
 -export([start_link/0,start_link/1,stop/0]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 
--export([notify/1,notify/3,eventer_config/1,do_eventer/1]).
+-export([notify/1, notify/3]).
+-export ([add_handler/2, add_handler/3, add_handler/4]).
+-export ([remove_handler/1, remove_handler/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
+-record (handler, {
+    id,         % The id of this handler. Made from a combination 
+                % of pid, matchhead, and matchguard, allowing for
+                % multiple handlers from the same pid.
+    desc,       % Human readable description
+    pid,        % Pid of the remote process
+    matchhead,  % MatchHead applied against {Module, EventName, Node, EventDetail}
+    matchguard  % MatchGuard, defaults to []
+}).
+
 %% @private
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 start_link(test) -> % when started this way, run a mock server (nop)
 
 notify(Module, EventName, EventDetail) ->
     notify({Module, EventName, node(), EventDetail}).
+    
+%% @spec add_handler(Pid :: pid(), 
+%%                   Desc :: string(), 
+%%                   MatchHead :: match_head(), 
+%%                   MatchSpec :: match_spec()) ->
+%%       ok | {error, Error :: term()}
+%% @doc Attach a new handler pid to Riak events. See http://erlang.org/doc/apps/erts/match_spec.html
+%% for more information about match head and match guard.
+add_handler(Pid, Desc) -> 
+    add_handler(Pid, Desc, {'_', '_', '_', '_'}).
+add_handler(Pid, Desc, MatchHead) -> 
+    add_handler(Pid, Desc, MatchHead, []).
+add_handler(Pid, Desc, MatchHead, MatchSpec) ->
+    gen_server:call(?MODULE, {add_handler, Pid, Desc, MatchHead, MatchSpec}).
+
+%% remove_handler/N - 
+%% Remove a previously added handler, if it still exists.
+%% Handlers are automatically removed for dead processes
+%% every (gossip_interval) seconds, or upon adding
+%% or deleting.
+remove_handler(Pid, MatchHead, MatchSpec) ->
+    HandlerID = get_handler_id(Pid, MatchHead, MatchSpec),
+    remove_handler(HandlerID).
+
+remove_handler(HandlerID) ->
+    gen_server:call(?MODULE, {remove_handler, HandlerID}).
 
 %% @private (only used for test instances)
 stop() -> gen_server2:cast(?MODULE, stop).
 
 %% @private
+handle_call({add_handler, Pid, Desc, MatchHead, MatchSpec},_From,State) -> 
+    % Get the ring...
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    
+    % Add the handler...
+    Handler = make_handler(Pid, Desc, MatchHead, MatchSpec),
+    Ring1 = add_handler_to_ring(Handler, Ring),
+    
+    % Set and save the new ring...
+    riak_ring_manager:set_my_ring(Ring1),
+    riak_ring_manager:write_ringfile(),
+    
+    % Gossip the new ring...
+    RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
+    riak_ring_gossiper:gossip_to(RandomNode),
+    {reply, ok, State};
+    
+handle_call({remove_handler, HandlerID},_From,State) -> 
+    % Get the ring...
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    
+    % Add the handler...
+    Ring1 = remove_handler_from_ring(HandlerID, Ring),
+    
+    % Set and save the new ring...
+    riak_ring_manager:set_my_ring(Ring1),
+    riak_ring_manager:write_ringfile(),
+    
+    % Gossip the new ring...
+    RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
+    riak_ring_gossiper:gossip_to(RandomNode),
+    {reply, ok, State};
+    
+handle_call(_, _From, State) -> {reply, no_call_support, State}.
+
+%% @private
 handle_cast(stop, State) -> {stop,normal,State};
 
 handle_cast({event, _Event}, test) -> {noreply,test};
+
 handle_cast({event, Event}, State) ->
-    {ok, Ring} = riak_ring_manager:get_my_ring(),    %%%% TEST EVENTS!
-    Eventers = match_eventers(get_eventers(Ring), Event, []),
-    [gen_event:notify({riak_event,Node},Event) || Node <- Eventers],
-    {noreply, State}.
-  
-
-eventer_config([Cluster, CookieStr]) ->
-    RipConf = [{no_config, true}, {cluster_name, Cluster},
-       {riak_cookie, list_to_atom(CookieStr)}, {ring_state_dir, "<nostore>"},
-       {ring_creation_size, 12}, {gossip_interval,1000000},
-       {wants_claim_fun, {riak_claim, never_wants_claim}},
-       {riak_web_ip, "undefined"},
-       {doorbell_port, 7000 + random:uniform(1000)},
-       {storage_backend, undefined}],
-    application:stop(sasl),
-    application:unload(sasl),
-    ok = application:load({application,sasl,[{errlog_type,error}]}),
-    ok = application:start(sasl),
-    [application:set_env(riak,K,V) || {K,V} <- RipConf].
-
-parse_matchspec(MatchSpec) when is_list(MatchSpec) ->
-    [NM,MM,TM] = string:tokens(MatchSpec, ":"),
-    {list_to_atom(NM),list_to_atom(MM),list_to_atom(TM)}.
-
-do_eventer([IP, PortStr, HandlerName, HandlerArg]) ->
-    do_eventer([IP, PortStr, HandlerName, HandlerArg, "_:_:_"]);
-do_eventer([IP, PortStr, HandlerName, HandlerArg, MatchSpec]) ->
-    MS = parse_matchspec(MatchSpec),
-    riak_startup:join_cluster([IP, PortStr]),
-    timer:sleep(random:uniform(1000)), % let some gossip happen
-    riak_event_guard:add_handler(list_to_atom(HandlerName),HandlerArg, MS),
-    ok.
-
-get_eventers(Ring) ->
-    case riak_ring:get_meta(eventers, Ring) of
-        undefined -> [];
-        {ok, X} -> sets:to_list(X)
-    end.        
-
-match_eventers([], _, Acc) ->
-    Acc;
-match_eventers([{Eventer,MS}|Rest], Event, Acc) ->
-    case match_event(MS,Event) of
-        true ->
-            match_eventers(Rest, Event, [Eventer|Acc]);
-        false ->
-            match_eventers(Rest, Event, Acc)
-    end.
-
-% Match an event to the event filter.
-% The idea is that the filter values (in the first tuple)
-% must either be a wildcard ('_'), or must match 
-% the value within the event tuple.
-% Return true if the filter matches the event.
-match_event({Node1, Module1, Type1}, {Module2, Type2, Node2, _}) when
-    (Node1 == '_' orelse Node1 == Node2) andalso
-    (Module1 == '_' orelse Module1 == Module2) andalso
-    (Type1 == '_' orelse Type1 == Type2) -> true;
-match_event(_, _) -> false.
+    % Get the handlers...
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    Handlers = get_handlers(Ring),
+    MatchingHandlers = get_matching_handlers(Event, Handlers),
     
+    % Send the message to all handlers...
+    [begin
+        Pid = X#handler.pid,
+        Pid ! {event, Event}
+    end || X <- MatchingHandlers],
+    {noreply, State};
+    
+handle_cast(remove_dead_handlers, State) ->
+    % Get the handlers...
+    % Filter out any dead pids...
+    % Update the ring, and gossip any changes...
+    throw(not_yet_implemented),
+    {noreply, State};
+    
+handle_cast(_, State) -> {noreply, State}.
 
 handle_info(_Info, State) -> {noreply, State}.
 
 %% @private
 code_change(_OldVsn, State, _Extra) ->  {ok, State}.
 
-%% @private
-handle_call(_, _From, State) -> {reply, no_call_support, State}.
+%% make_handler/4 -
+%% Create an handler record from the supplied params.
+make_handler(Pid, Desc, MatchHead, MatchGuard) ->
+    ID = get_handler_id(Pid, MatchHead, MatchGuard),
+    #handler {
+        id = ID,
+        pid = Pid,
+        desc = Desc,
+        matchhead = MatchHead,
+        matchguard = MatchGuard
+    }.
+    
+%% add_handler_to_ring/5 -
+%% Given an handler and a ring, add the handler to
+%% the ring.
+add_handler_to_ring(Handler, Ring) ->
+    Handlers = get_handlers(Ring),
+    Handlers1 = lists:keystore(Handler#handler.id, 2, Handlers, Handler),
+    _Ring1 = set_handlers(Handlers1, Ring).
 
-match_eventer_test() ->
-    Event1 = {some_mod, some_type, some_node, {some_detail}},
-    Event2 = {some_mod, some_type, other_node, {some_detail}},
-    Event3 = {some_mod, other_type, other_node, {some_detail}},
-    Event4 = {other_mod, other_type, other_node, {some_detail}},
-    MS1 = {'_', '_', '_'},
-    MS2 = {some_node, '_', '_'},
-    MS3 = {some_node, some_mod, '_'},
-    MS4 = {some_node, some_mod, some_type},
-    ?assertEqual(match_event(MS1, Event1), true),
-    ?assertEqual(match_event(MS2, Event1), true),
-    ?assertEqual(match_event(MS2, Event2), false),
-    ?assertEqual(match_event(MS3, Event1), true),
-    ?assertEqual(match_event(MS3, Event3), false),
-    ?assertEqual(match_event(MS4, Event1), true),
-    ?assertEqual(match_event(MS4, Event4), false).
+%% remove_handler_from_ring/4 -
+%% Given part of an handler definition and a Ring, remove
+%% the matching handler from the ring.
+remove_handler_from_ring(Pid, MatchHead, MatchGuard, Ring) -> 
+    HandlerID = get_handler_id(Pid, MatchHead, MatchGuard),
+    remove_handler_from_ring(HandlerID, Ring).
+
+%% remove_handler_from_ring/2 -
+%% Given an HandlerID and a Ring, remove
+%% the matching handler from the ring.
+remove_handler_from_ring(HandlerID, Ring) -> 
+    % Remove the handler from the ring...
+    Handlers = get_handlers(Ring),
+    Handlers1 = lists:keydelete(HandlerID, 2, Handlers),
+    _Ring1 = set_handlers(Handlers1, Ring).
+  
+%% get_matching_handlers/2 -
+%% Given an event and a list of #handlers, look 
+%% through the handlers for all handlers that 
+%% should receive the event based on their matchspec.
+get_matching_handlers(Event, Handlers) ->
+    F = fun(H = #handler { matchhead=MatchHead, matchguard=MatchGuard }, Matches) ->
+        % NOTE: Compiled match_specs cannot be transfered across nodes,
+        % so we have to recompile each time. Don't worry, it's fast.
+        MS = ets:match_spec_compile([{MatchHead, MatchGuard, ['$$']}]),
+        case ets:match_spec_run([Event], MS) of
+            [_] -> [H|Matches];
+            _ -> Matches
+        end
+    end,
+    lists:foldl(F, [], Handlers).
+    
+%% Return the handlers in a ring...        
+get_handlers(Ring) ->
+    case riak_ring:get_meta(handlers, Ring) of
+        undefined -> [];
+        {ok, X} -> X
+    end.
+    
+%% Update a ring with a new set of handlers...
+set_handlers(Handlers, Ring) ->
+    riak_ring:update_meta(handlers, Handlers, Ring).
+    
+get_handler_id(Pid, MatchHead, MatchGuard) ->
+    erlang:md5(term_to_binary({Pid, MatchHead, MatchGuard})).
+    
+%% TESTS %%%
+    
+add_handler_to_ring_test() ->
+    application:set_env(riak, ring_creation_size, 16),
+    
+    % The bare ring...
+    Ring = riak_ring:fresh(),
+    [] = get_handlers(Ring),
+    
+    % Add an handler...
+    Handler1 = make_handler(self(), "Test description", {'_', '_', '_', '_'}, []),
+    Ring1 = add_handler_to_ring(Handler1, Ring),
+    [Handler1] = get_handlers(Ring1),
+    
+    % Add another handler...
+    Handler2 = make_handler(self(), "Test description 1", {riak_vnode, '_', '_', '_'}, []),
+    Ring2 = add_handler_to_ring(Handler2, Ring1),
+    ?assert(lists:sort([Handler1, Handler2]) == lists:sort(get_handlers(Ring2))),
+    
+    % Remove Handler2, only Handler1 should be left...
+    Ring3 = remove_handler_from_ring(Handler2#handler.pid, Handler2#handler.matchhead, Handler2#handler.matchguard, Ring2),
+    [Handler1] = get_handlers(Ring3),
+    
+    % Remove Handler1, no handlers should be left...
+    Ring4 = remove_handler_from_ring(Handler1#handler.id, Ring3),
+    [] = get_handlers(Ring4),
+    ok.
+
+get_matching_handlers_test() ->
+    Handlers = [
+        make_handler(self(), "All 1", '_', []),
+        make_handler(self(), "All 2", {'_', '_', '_', '_'}, []),
+        make_handler(self(), "Only riak_vnode 1", {riak_vnode, '_', '_', '_'}, []),
+        make_handler(self(), "Only riak_vnode 2", {'$1', '_', '_', '_'}, [{'==', '$1', riak_vnode}]),
+        make_handler(self(), "Only riak_vnode delete", {riak_vnode, delete, '_', '_'}, []),
+        make_handler(self(), "Only riak_vnode put, get, or delete", {'$1', '$2', '_', '_'}, [
+            {'andalso', {'==', '$1', riak_vnode}, {'orelse', {'==', '$2', get}, {'==', '$2', put}, {'==', '$2', delete}}}
+        ])
+    ],
+    ?assert(length(get_matching_handlers({test, ignored, ignored, ignored}, Handlers)) == 2),    
+    ?assert(length(get_matching_handlers({riak_vnode, ignored, ignored, ignored}, Handlers)) == 4),    
+    ?assert(length(get_matching_handlers({riak_vnode, delete, ignored, ignored}, Handlers)) == 6).

File src/riak_sup.erl

     VMaster = {riak_vnode_master,
                {riak_vnode_master, start_link, []},
                permanent, 5000, worker, [riak_vnode_master]},
-    EventGuard = {riak_event_guard,
-                 {riak_event_guard, start_link, []},
-                 permanent, 5000, worker, dynamic},
     LocalLogger = {riak_local_logger,
                    {riak_local_logger, start_link, []},
                    permanent, 5000, worker, [riak_local_logger]},
     Processes0 = 
     case riak:get_app_env(riak_web_ip) of
         "undefined" ->
-            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger];
+            [RingMgr,RingGossip,Connect,LocalLogger];
         undefined ->
-            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger];
+            [RingMgr,RingGossip,Connect,LocalLogger];
         _ ->
-            [RingMgr,RingGossip,Connect,EventGuard,LocalLogger,
+            [RingMgr,RingGossip,Connect,LocalLogger,
              RiakWeb]
     end,
     Processes1 =