Commits

Anonymous committed 49473d4

Make eventers use simple erlang messaging (remove gen_events) [#27]

  • Participants
  • Parent commits 9af730b

Comments (0)

Files changed (2)

File src/riak_client.erl

 -export([reload_all/1]).
 -export([remove_from_cluster/1]).
 -export([send_event/2]).
+-export ([add_event_handler/2, add_event_handler/3, add_event_handler/4]).
+-export ([remove_event_handler/3]).
 %% @type default_timeout() = 15000
 -define(DEFAULT_TIMEOUT, 15000).
 
 send_event(EventName, EventDetail) ->
     rpc:call(Node,riak_eventer,notify,
              [client_event, EventName, {ClientId, EventDetail}]).
+
+
+%% @spec add_handler(Pid :: pid(), 
+%%                   Desc :: string(), 
+%%                   MatchHead :: match_head(), 
+%%                   MatchSpec :: match_spec()) ->
+%%       ok | {error, Error :: term()}
+%% @doc Attach a new handler pid to Riak events. 
+%% See http://erlang.org/doc/apps/erts/match_spec.html for more 
+%% information about match head and match guard.
+%% Desc is simply a human readable string used by the WebUI.
+add_event_handler(Pid, Desc) -> 
+    add_event_handler(Pid, Desc, {'_', '_', '_', '_'}).
+    
+add_event_handler(Pid, Desc, MatchHead) -> 
+    add_event_handler(Pid, Desc, MatchHead, []).
+    
+add_event_handler(Pid, Desc, MatchHead, MatchGuard) ->
+    rpc:call(Node, riak_eventer, add_handler, [Pid, Desc, MatchHead, MatchGuard]). 
+
+%% remove_handler/N - 
+%% Remove a previously added handler, if it still exists.
+%% Handlers are automatically removed for dead processes
+%% every (gossip_interval) seconds, or upon adding
+%% or deleting.
+remove_event_handler(Pid, MatchHead, MatchGuard) ->
+    rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]). 

File src/riak_eventer.erl

 %% under the License.    
 
 
-%% TODO 
+%% TODO
 %% - Start a loop to remove dead handlers.
-%% - riak_client interface to add handler.
-%% - riak_client interface to remove handler.
-%% - String based remove handler.
 
 -module(riak_eventer).
 -behaviour(gen_server2).
 	 terminate/2, code_change/3]).
 
 -export([notify/1, notify/3]).
--export ([add_handler/2, add_handler/3, add_handler/4]).
+-export ([add_handler/4]).
 -export ([remove_handler/1, remove_handler/3]).
+-export ([remove_dead_handlers/0]).
+
+-define(REMOVE_INTERVAL, 5 * 1000).
 
 -include_lib("eunit/include/eunit.hrl").
 
     gen_server2:start_link({local, ?MODULE}, ?MODULE, [test], []).
 
 %% @private
-init([]) -> {ok, stateless_server};
-init([test]) -> {ok, test}.
+init(Arg) ->
+    % Remove dead handlers every 5 seconds. Only
+    % regossip the ring if something has changed.
+    timer:apply_after(?REMOVE_INTERVAL, gen_server, call, [?MODULE, {remove_dead_handlers, true}]),
+    case Arg of
+        [] -> {ok, stateless_server};
+        [test] -> {ok, test}
+    end.
 
 notify(Event) ->
     gen_server2:cast(riak_local_logger, {event, Event}),
 notify(Module, EventName, EventDetail) ->
     notify({Module, EventName, node(), EventDetail}).
     
-%% @spec add_handler(Pid :: pid(), 
-%%                   Desc :: string(), 
-%%                   MatchHead :: match_head(), 
-%%                   MatchSpec :: match_spec()) ->
-%%       ok | {error, Error :: term()}
-%% @doc Attach a new handler pid to Riak events. See http://erlang.org/doc/apps/erts/match_spec.html
-%% for more information about match head and match guard.
-add_handler(Pid, Desc) -> 
-    add_handler(Pid, Desc, {'_', '_', '_', '_'}).
-add_handler(Pid, Desc, MatchHead) -> 
-    add_handler(Pid, Desc, MatchHead, []).
 add_handler(Pid, Desc, MatchHead, MatchSpec) ->
     gen_server:call(?MODULE, {add_handler, Pid, Desc, MatchHead, MatchSpec}).
 
-%% remove_handler/N - 
-%% Remove a previously added handler, if it still exists.
-%% Handlers are automatically removed for dead processes
-%% every (gossip_interval) seconds, or upon adding
-%% or deleting.
 remove_handler(Pid, MatchHead, MatchSpec) ->
     HandlerID = get_handler_id(Pid, MatchHead, MatchSpec),
     remove_handler(HandlerID).
 
 remove_handler(HandlerID) ->
     gen_server:call(?MODULE, {remove_handler, HandlerID}).
+    
+remove_dead_handlers() ->
+    gen_server:call(?MODULE, {remove_dead_handlers, false}).
 
 %% @private (only used for test instances)
 stop() -> gen_server2:cast(?MODULE, stop).
     riak_ring_gossiper:gossip_to(RandomNode),
     {reply, ok, State};
     
+handle_call({remove_dead_handlers, Loop}, _From, State) ->
+    % If we are looping, then set up a new timer...
+    case Loop of
+        true ->  timer:apply_after(?REMOVE_INTERVAL, gen_server, call, [?MODULE, {remove_dead_handlers, true}]);
+        false -> ignore
+    end,
+
+    % Get the handlers...
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    OldHandlers = get_handlers(Ring),
+    
+    % Filter out any dead handlers...
+    F = fun(Handler, R) ->
+        io:format("Handler: ~p~n", [Handler]),
+        case is_remote_process_alive(Handler#handler.pid) of
+            false -> remove_handler_from_ring(Handler#handler.id, R); 
+            true -> R
+        end
+    end,
+    Ring1 = lists:foldl(F, Ring, OldHandlers),
+    NewHandlers = get_handlers(Ring1),
+    
+    % Write and gossip the ring if it has changed...
+    RingHasChanged = OldHandlers /= NewHandlers,
+    case RingHasChanged of
+        true ->
+            % Set and save the new ring...
+            riak_ring_manager:set_my_ring(Ring1),
+            riak_ring_manager:write_ringfile(),
+    
+            % Gossip the new ring...
+            RandomNode = riak_ring:index_owner(Ring1,riak_ring:random_other_index(Ring1)),
+            riak_ring_gossiper:gossip_to(RandomNode);
+        false -> ignore
+    end,
+    {noreply, State};
+
+    
 handle_call(_, _From, State) -> {reply, no_call_support, State}.
 
 %% @private
     end || X <- MatchingHandlers],
     {noreply, State};
     
-handle_cast(remove_dead_handlers, State) ->
-    % Get the handlers...
-    % Filter out any dead pids...
-    % Update the ring, and gossip any changes...
-    throw(not_yet_implemented),
-    {noreply, State};
-    
 handle_cast(_, State) -> {noreply, State}.
 
 handle_info(_Info, State) -> {noreply, State}.
 get_handler_id(Pid, MatchHead, MatchGuard) ->
     erlang:md5(term_to_binary({Pid, MatchHead, MatchGuard})).
     
+is_remote_process_alive(Pid) ->
+    is_pid(Pid) andalso
+    lists:member(node(Pid), [node()|nodes()]) andalso
+    rpc:call(node(Pid), erlang, is_process_alive, [Pid]).
+    
 %% TESTS %%%
     
 add_handler_to_ring_test() ->
     ],
     ?assert(length(get_matching_handlers({test, ignored, ignored, ignored}, Handlers)) == 2),    
     ?assert(length(get_matching_handlers({riak_vnode, ignored, ignored, ignored}, Handlers)) == 4),    
-    ?assert(length(get_matching_handlers({riak_vnode, delete, ignored, ignored}, Handlers)) == 6).
+    ?assert(length(get_matching_handlers({riak_vnode, delete, ignored, ignored}, Handlers)) == 6).
+    
+