Commits

Anonymous committed d40ea58

add accessors for connection pid, channel pid, and consumer tag

  • Participants
  • Parent commits 4cb5c82

Comments (0)

Files changed (2)

File src/example_gb.erl

 
 -include_lib("gen_bunny.hrl").
 
--record(state, {}).
+-record(state, {messages=[]}).
 
 open_connection(direct) ->
     Connection = lib_amqp:start_connection(),
 
 start_link(Type) ->
     setup(Type),
-    gen_bunny:start_link(?MODULE, 
-                         {direct, "guest", "guest"}, <<"bunny.test">>, []).
+    gen_bunny:start_link(?MODULE, {direct, "guest", "guest"}, <<"bunny.test">>, []).
 
 init([]) -> 
     {ok, #state{}}.
 
-handle_message(Message, State) -> 
-    io:format("~p got message ~p~n", [?MODULE, Message]),
-    {noreply, State}.
+handle_message(Message, State=#state{messages=Messages}) -> 
+    NewMessages = [Message|Messages],
+    {noreply, State#state{messages=NewMessages}}.
 
 terminate(_Reason, _State) -> ok.
 
+
+%%
+%% Tests
+%%
+
+-include_lib("eunit/include/eunit.hrl").
+

File src/gen_bunny.erl

          terminate/2,
          code_change/3]).
 -export([behaviour_info/1]).
--export([stop/1]).
+-export([get_connection/1,
+         get_channel/1,
+         get_consumer_tag/1,
+         stop/1]).
 
--record(state, {mod, modstate, channel, queue, consumer_tag}).
+-record(state, {mod, 
+                modstate, 
+                channel, 
+                connection,
+                queue, 
+                consumer_tag}).
 
 behaviour_info(callbacks) ->
     [{init, 1},
     case Module:init(InitArgs) of
         {ok, ModState} ->
             case connect_and_subscribe(ConnectionInfo, QueueName) of
-                {ok, ChannelPid} ->
+                {ok, ChannelPid, ConnectionPid} ->
                     %% TODO:  monitor channel/connection pids?
                     {ok, #state{mod=Module, 
                                 modstate=ModState,
                                 channel=ChannelPid,
+                                connection=ConnectionPid,
                                 queue=QueueName}};
                 Err ->
                     Module:terminate(Err, ModState),
 stop(Pid) when is_pid(Pid) ->
     gen_server:cast(Pid, stop).
 
-handle_call(_Request, _From, State=#state{}) ->
-    {reply, ok, State}.
+get_connection(Pid) when is_pid(Pid) ->
+    gen_server:call(Pid, get_connection).
 
-handle_cast(stop, State=#state{}) ->
-    %% TODO: unsubscribe/shutdown here
+get_channel(Pid) when is_pid(Pid) ->
+    gen_server:call(Pid, get_channel).
+
+get_consumer_tag(Pid) when is_pid(Pid) ->
+    gen_server:call(Pid, get_consumer_tag).
+
+handle_call(get_connection, _From, State=#state{connection=Connection}) ->
+    {reply, Connection, State};
+handle_call(get_channel, _From, State=#state{channel=Channel}) ->
+    {reply, Channel, State};
+handle_call(get_consumer_tag, _From, State=#state{consumer_tag=CTag}) ->
+    {reply, CTag, State}.
+
+handle_cast(stop, State=#state{channel=Channel, consumer_tag=CTag, connection=Connection}) ->
+    ok = lib_amqp:unsubscribe(Channel, CTag),
+    ok = lib_amqp:teardown(Connection, Channel),
     {stop, normal, State}.
 
 handle_info({#'basic.deliver'{},
     ConnectionPid = amqp_connection:start_direct(Username, Password),
     ChannelPid = amqp_connection:open_channel(ConnectionPid),
     lib_amqp:subscribe(ChannelPid, QueueName, self()),
-    {ok, ChannelPid};
+    {ok, ChannelPid, ConnectionPid};
 connect_and_subscribe({network, Host, Port, Username, Password, VHost}, 
                       QueueName) ->
     
                                                   Port, VHost),
     ChannelPid = amqp_connection:open_channel(ConnectionPid),
     lib_amqp:subscribe(ChannelPid, QueueName, self()),
-    {ok, ChannelPid}.
+    {ok, ChannelPid, ConnectionPid}.