Commits

Anonymous committed ed14f01

basic rabbitmq publisher as a riak post-commit hook

Comments (0)

Files changed (7)

apps/rabbit_publisher/ebin/rabbit_publisher.app

+% -*- mode: erlang -*-
+{application, 
+ rabbit_publisher,
+ [{description,  "rabbit_publisher"},
+  {id,           "rabbit_publisher"},
+  {vsn,          "0.1"},
+  {modules,      ['rabbit_publisher', 
+                  'rabbit_publisher_app', 
+                  'rabbit_publisher_server',
+                  'rabbit_publisher_sup'
+                 ]},
+  {applications, [kernel, 
+                  stdlib, 
+                  sasl, 
+                  crypto,
+                  riak_core,
+                  riak_kv]},
+  {registered,   [rabbit_publisher_sup]},
+  {mod,          {rabbit_publisher_app, []}},
+  {env,          [
+                  {rabbit_ip, "127.0.0.1"},
+                  {rabbit_user, "guest"},
+                  {rabbit_pass, "guest"},
+                  {rabbit_queue, <<"riak.publish">>}
+                 ]}
+]}.

apps/rabbit_publisher/src/rabbit_publisher.erl

+-module(rabbit_publisher).
+-author('Andy Gross <andy@basho.com>').
+-export([start/0, stop/0]).
+-export([install_hook/0, postcommit/1]).
+
+-define(RABBIT_HOOK, {struct, 
+                      [{<<"mod">>, <<"rabbit_publisher">>},
+                       {<<"fun">>, <<"postcommit">>}]}).
+
+start() ->
+    riak_core_util:start_app_deps(rabbit_publisher),
+    application:start(rabbit_publisher).
+
+%% @spec stop() -> ok
+stop() -> 
+    application:stop(rabbit_publisher).
+
+postcommit(Object) ->
+    Msg = list_to_binary(mochijson2:encode(riak_object:get_value(Object))),
+    rabbit_publisher_server:send(Msg).
+
+install_hook() ->
+    {ok, DefaultBucketProps} = application:get_env(riak_core, 
+                                                   default_bucket_props),
+    application:set_env(riak_core, default_bucket_props, 
+                        proplists:delete(postcommit, DefaultBucketProps)),
+    riak_core_bucket:append_bucket_defaults([{postcommit, [?RABBIT_HOOK]}]),
+    ok.
+    

apps/rabbit_publisher/src/rabbit_publisher_app.erl

+-module(rabbit_publisher_app).
+-author('Andy Gross <andy@basho.com>').
+-behaviour(application).
+-export([start/2,stop/1]).
+
+%% @spec start(Type :: term(), StartArgs :: term()) ->
+%%          {ok,Pid} | ignore | {error,Error}
+%% @doc The application:start callback for rabbit_publisher.
+%%      Arguments are ignored as all configuration is done via the erlenv file.
+start(_Type, _StartArgs) ->
+    riak_core_util:start_app_deps(rabbit_publisher),
+    rabbit_publisher:install_hook(),
+    rabbit_publisher_sup:start_link().
+
+%% @spec stop(State :: term()) -> ok
+%% @doc The application:stop callback for rabbit_publisher.
+stop(_State) -> ok.
+
+
+    
+

apps/rabbit_publisher/src/rabbit_publisher_server.erl

+-module(rabbit_publisher_server).
+-behaviour(gen_server).
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+-export([send/1]).
+-record(state, {bunnyc_pid, qname}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+    {ok, IP} = application:get_env(rabbit_publisher, rabbit_ip),
+    {ok, QName} = application:get_env(rabbit_publisher, rabbit_queue),
+    {ok, Pid} = bunnyc:start_link(bunnyc_test, {network, IP},
+                                  QName, []),
+    {ok, #state{bunnyc_pid=Pid, qname=QName}}.
+
+send(Message) ->
+    gen_server:call(?MODULE, {send, Message}).
+
+handle_call({send, Message}, _From, State=#state{bunnyc_pid=Pid, 
+                                                 qname=QName}) ->
+    bunnyc:publish(Pid, QName, Message),
+    {reply, ok, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+
+

apps/rabbit_publisher/src/rabbit_publisher_sup.erl

+-module(rabbit_publisher_sup).
+-author('Andy Gross <andy@basho.com>').
+-behaviour(supervisor).
+
+%% External exports
+-export([start_link/0]).
+%% supervisor callbacks
+-export([init/1]).
+
+%% @spec start_link() -> ServerRet
+%% @doc API for starting the supervisor.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+%% @spec init([]) -> SupervisorTree
+%% @doc supervisor callback.
+init([]) ->
+    Processes = [
+                 {rabbit_publisher_server,
+                  {rabbit_publisher_server, start_link, []},
+                  permanent, 5000, worker, [rabbit_publisher_server]}
+                ],
+    {ok, {{one_for_one, 9, 10}, Processes}}.
-{sub_dirs, ["rel"]}.
+{sub_dirs, ["rel", "apps/rabbit_publisher"]}.
 
 {require_otp_vsn, "R13B04|R14"}.
 
 {cover_enabled, true}.
 
-{erl_opts, [debug_info, fail_on_warning]}.
+{erl_opts, [debug_info]}.
 
 {deps, [
        {luwak, "1.*", {hg, "http://bitbucket.org/basho/luwak", "93"}},
-       {riak_kv, "0.13.0rc1", {hg, "http://bitbucket.org/basho/riak_kv", "riak_kv-0.13.0rc1"}}
+       {riak_kv, "0.13.0rc1", {hg, "http://bitbucket.org/basho/riak_kv", "riak_kv-0.13.0rc1"}},
+       {gen_bunny, ".*", {hg, "http://bitbucket.org/dreid/gen_bunny", "tip"}}
        ]}.

rel/reltool.config

 %% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
 %% ex: ts=4 sw=4 et
 {sys, [
-       {lib_dirs, ["../deps"]},
+       {lib_dirs, ["../deps", "../apps"]},
        {rel, "riak", "0.13.0rc1",
         [
          kernel,
          riak_core,
          riak_kv,
          skerl,
-         luwak
+         luwak,
+         gen_bunny,
+         rabbit_publisher
         ]},
        {rel, "start_clean", "",
         [
        {app, riak_kv, [{incl_cond, include}]},
        {app, skerl, [{incl_cond, include}]},
        {app, luwak, [{incl_cond, include}]},
-       {app, sasl, [{incl_cond, include}]}
+       {app, sasl, [{incl_cond, include}]},
+       {app, gen_bunny, [{incl_cond, include}]},
+       {app, rabbit_publisher, [{incl_cond, include}]}
       ]}.