Commits

Anonymous committed 91beee7 Merge

merge

Comments (0)

Files changed (4)

src/riak_event_guard.erl

 -module(riak_event_guard).
 
 -export([start_link/0]).
--export([add_handler/2]).
+-export([add_handler/3]).
 -export([init/1, handle_info/2]).
 -export([handle_call/3,handle_cast/2,code_change/3,terminate/2]).
 
         X -> {stop, {error_in_init, X}}
     end.
 
-%% @spec add_handler(HandlerMod :: atom(), Arg :: term()) ->
+%% @spec add_handler(HandlerMod :: atom(), Arg :: term(), 
+%%                   MatchSpec :: string()) ->
 %%       ok | {error, Error :: term()}
 %% @doc Attach a new HandlerMod to riak events, started with Arg.
-add_handler(HandlerMod, Arg) ->
-    gen_server:call(?MODULE, {add_handler, HandlerMod, Arg}).
+add_handler(HandlerMod, Arg, MatchSpec) ->
+    gen_server:call(?MODULE, {add_handler, HandlerMod, Arg, MatchSpec}).
 
 %% @private
-handle_call({add_handler, HandlerMod, Arg},_From,State) -> 
+handle_call({add_handler, HandlerMod, Arg, MatchSpec},_From,State) -> 
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     Eventers0 = case riak_ring:get_meta(eventers, MyRing) of
         undefined -> [];
         {ok, X} -> sets:to_list(X)
     end,
-    Eventers = sets:add_element(node(), sets:from_list(
-                 [X || X <- Eventers0, net_adm:ping(X) =:= pong])),
+    
+    Eventers = sets:add_element({node(),MatchSpec}, sets:from_list(
+                 [{N,MS} || {N,MS} <- Eventers0,
+                       net_adm:ping(N) =:= pong,
+                       N /= node()])),
     NewRing = riak_ring:update_meta(eventers, Eventers, MyRing),
     riak_ring_manager:set_my_ring(NewRing),
     riak_ring_manager:write_ringfile(),
     Reply = gen_event:swap_sup_handler(riak_event,
-                                       {{HandlerMod,{node(),now()}}, swap},
+                                       {{HandlerMod,{node(),now()}}, swap}, 
                                        {{HandlerMod,{node(),now()}}, Arg}),
     riak_ring_gossiper:gossip_to(
       riak_ring:index_owner(NewRing,riak_ring:random_other_index(NewRing))),

src/riak_eventer.erl

 
 -export([notify/1,notify/3,eventer_config/1,do_eventer/1]).
 
+-include_lib("eunit/include/eunit.hrl").
+
 %% @private
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 start_link(test) -> % when started this way, run a mock server (nop)
 handle_cast({event, _Event}, test) -> {noreply,test};
 handle_cast({event, Event}, State) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),    %%%% TEST EVENTS!
-    Eventers = get_eventers(Ring),
+    Eventers = match_eventers(get_eventers(Ring), Event, []),
     [gen_event:notify({riak_event,Node},Event) || Node <- Eventers],
     {noreply, State}.
-    
+  
+  
 eventer_config([Cluster, CookieStr]) ->
     RipConf = [{no_config, true}, {cluster_name, Cluster},
        {riak_cookie, list_to_atom(CookieStr)}, {ring_state_dir, "<nostore>"},
     ok = application:start(sasl),
     [application:set_env(riak,K,V) || {K,V} <- RipConf].
 
+parse_matchspec(MatchSpec) when is_list(MatchSpec) ->
+    [NM,MM,TM] = string:tokens(MatchSpec, ":"),
+    {list_to_atom(NM),list_to_atom(MM),list_to_atom(TM)}.
+
 do_eventer([IP, PortStr, HandlerName, HandlerArg]) ->
+    do_eventer([IP, PortStr, HandlerName, HandlerArg, "_:_:_"]);
+do_eventer([IP, PortStr, HandlerName, HandlerArg, MatchSpec]) ->
+    MS = parse_matchspec(MatchSpec),
     riak_startup:join_cluster([IP, PortStr]),
     timer:sleep(random:uniform(1000)), % let some gossip happen
-    riak_event_guard:add_handler(list_to_atom(HandlerName), HandlerArg),
+    riak_event_guard:add_handler(list_to_atom(HandlerName),HandlerArg, MS),
     ok.
 
 get_eventers(Ring) ->
         {ok, X} -> sets:to_list(X)
     end.        
 
+match_eventers([], _, Acc) ->
+    Acc;
+match_eventers([{Eventer,MS}|Rest], Event, Acc) ->
+    case match_event(MS,Event) of
+        true ->
+            match_eventers(Rest, Event, [Eventer|Acc]);
+        false ->
+            match_eventers(Rest, Event, Acc)
+    end.
+
+match_event({'_','_','_'}, _) -> true;
+match_event({'_',Module,Type},{M,T,_N,_}) -> match_event({Module,Type},{M,T});
+match_event({N,Module,Type}, {M,T,N,_}) -> match_event({Module,Type},{M,T});
+match_event({_Node,_Module,_Type}, {_M,_T,_N,_}) -> false;
+match_event({'_', Type}, {_M,T}) -> match_event({Type}, {T});
+match_event({M,Type}, {M,T}) -> match_event({Type}, {T});
+match_event({_Module, _Type}, {_M, _T}) -> false;
+match_event({'_'}, {_}) -> true;
+match_event({T}, {T}) -> true;
+match_event(_, _) -> false.
+
 handle_info(_Info, State) -> {noreply, State}.
 
 %% @private
 
 %% @private
 handle_call(_, _From, State) -> {reply, no_call_support, State}.
+
+match_eventer_test() ->
+    Event1 = {some_mod, some_type, some_node, {some_detail}},
+    Event2 = {some_mod, some_type, other_node, {some_detail}},
+    Event3 = {some_mod, other_type, other_node, {some_detail}},
+    Event4 = {other_mod, other_type, other_node, {some_detail}},
+    MS1 = {'_', '_', '_'},
+    MS2 = {some_node, '_', '_'},
+    MS3 = {some_node, some_mod, '_'},
+    MS4 = {some_node, some_mod, some_type},
+    ?assertEqual(match_event(MS1, Event1), true),
+    ?assertEqual(match_event(MS2, Event1), true),
+    ?assertEqual(match_event(MS2, Event2), false),
+    ?assertEqual(match_event(MS3, Event1), true),
+    ?assertEqual(match_event(MS3, Event3), false),
+    ?assertEqual(match_event(MS4, Event1), true),
+    ?assertEqual(match_event(MS4, Event4), false).

src/riak_util.erl

     true = is_x_deleted(O1).
 
 clientid_uniqueness_test() ->
-    ClientIds = [mkclientid('somenode@somehost') || I <- lists:seq(0, 10000)],
+    ClientIds = [mkclientid('somenode@somehost') || _I <- lists:seq(0, 10000)],
     length(ClientIds) =:= length(sets:to_list(sets:from_list(ClientIds))).
 #!/usr/bin/env bash
-# ./start-eventer.sh <clustername> <cookie> <ip> <port> <nodename> <eventmodname> <eventmodarg>
+# ./start-eventer.sh <clustername> <cookie> <ip> <port> <nodename> <eventmodname> <eventmodarg> <matchspec>
 # This will:
 #  Join riak cluster <clustername> using erlcookie <cookie>
 #  via the node listening at <ip>:<port>
     echo Usage: 1>&2
     echo "    `basename $0` <clustername> <cookie> <ip in ring> " 1>&2
     echo "                  <doorbell port> <nodename> <event module name>" 1>&2
-    echo "                  <event module args>" 1>&2
+    echo "                  <event module args> <matchspec>" 1>&2
     exit 1
 fi
 . riak-env.sh
-erl -noshell -pa deps/*/ebin -pa ebin -name $5 -run riak_eventer eventer_config $1 $2 -run riak start -run riak_eventer do_eventer $3 $4 $6 $7
+erl -noshell -pa deps/*/ebin -pa ebin -name $5 -run riak_eventer eventer_config $1 $2 -run riak start -run riak_eventer do_eventer $3 $4 $6 $7 $8
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.