Anonymous avatar Anonymous committed fe33997

Use monitors isntead of links and try to reuse the connection if the channel dies, if the connection dies we just connect again.

Comments (0)

Files changed (1)

src/gen_bunny.erl

          ack/2,
          stop/1]).
 
--record(state, {mod,
+-record(state, {declare_fun,
+                connect_fun,
+                mod,
                 modstate,
                 channel,
                 connection,
                 queue,
+                connection_info,
                 declare_info,
                 consumer_tag,
-                no_ack}).
+                no_ack,
+                channel_mon,
+                connection_mon}).
 
 behaviour_info(callbacks) ->
     [{init, 1},
                                       fun bunny_util:connect/1),
     {DeclareFun, InitArgs3} = get_opt(declare_fun, InitArgs2,
                                       fun bunny_util:declare/2),
+
     case Module:init(InitArgs3) of
         {ok, ModState} ->
             case connect_declare_subscribe(
                    ConnectFun, DeclareFun,
                    ConnectionInfo, DeclareInfo, NoAck) of
                 {ok, ConnectionPid, ChannelPid, QueueName} ->
-                    true = link(ConnectionPid),
-                    true = link(ChannelPid),
-                    %% TODO:  monitor channel/connection pids?
-                    {ok, #state{mod=Module,
+                    ChannelRef = erlang:monitor(process, ChannelPid),
+                    ConnectionRef = erlang:monitor(process, ConnectionPid),
+                    {ok, #state{connect_fun=ConnectFun,
+                                declare_fun=DeclareFun,
+                                mod=Module,
                                 modstate=ModState,
                                 channel=ChannelPid,
                                 connection=ConnectionPid,
+                                connection_info=ConnectionInfo,
                                 declare_info=DeclareInfo,
                                 queue=QueueName,
-                                no_ack=NoAck}};
+                                no_ack=NoAck,
+                                channel_mon=ChannelRef,
+                                connection_mon=ConnectionRef}};
                 {_ErrClass, {error, Reason}} ->
                     Module:terminate(Reason, ModState),
                     {stop, Reason}
     end;
 handle_info(#'basic.consume_ok'{consumer_tag=CTag}, State=#state{}) ->
     {noreply, State#state{consumer_tag=CTag}};
+handle_info({'DOWN', MonitorRef, process, _Object, _Info},
+            State=#state{channel_mon=ChannelRef,
+                         connection=Connection,
+                         declare_fun=DeclareFun,
+                         declare_info=DeclareInfo,
+                         no_ack=NoAck})
+  when MonitorRef =:= ChannelRef ->
+    true = erlang:demonitor(ChannelRef),
+    Channel = lib_amqp:start_channel(Connection),
+    NewChannelRef = erlang:monitor(process, Channel),
+    {ok, QueueName} =
+        declare_subscribe(
+          Channel, DeclareFun, DeclareInfo, NoAck),
+
+    {noreply, State#state{queue=QueueName,
+                          channel=Channel,
+                          channel_mon=NewChannelRef}};
+handle_info({'DOWN', MonitorRef, process, _Object, _Info},
+            State=#state{channel_mon=ChannelRef,
+                         connection_mon=ConnectionRef,
+                         connect_fun=ConnectFun,
+                         connection_info=ConnectionInfo,
+                         declare_fun=DeclareFun,
+                         declare_info=DeclareInfo,
+                         no_ack=NoAck})
+  when MonitorRef =:= ConnectionRef ->
+    true = erlang:demonitor(ChannelRef),
+    true = erlang:demonitor(ConnectionRef),
+    {ok, NewConnection, NewChannel, QueueName} =
+        connect_declare_subscribe(
+          ConnectFun, DeclareFun, ConnectionInfo, DeclareInfo, NoAck),
+
+    NewConnectionRef = erlang:monitor(process, NewConnection),
+    NewChannelRef = erlang:monitor(process, NewChannel),
+
+    {noreply, State#state{queue=QueueName,
+                          connection=NewConnection,
+                          channel=NewChannel,
+                          channel_mon=NewChannelRef,
+                          connection_mon=NewConnectionRef}};
 handle_info(Info, State=#state{mod=Module, modstate=ModState}) ->
     io:format("Unknown info message: ~p~n", [Info]),
     case Module:handle_info(Info, ModState) of
             Reason;
         {ConnectionPid, ChannelPid} when is_pid(ConnectionPid),
                                          is_pid(ChannelPid) ->
-            case catch DeclareFun(ChannelPid, DeclareInfo) of
-                {'EXIT', {Reason, _Stack}} ->
-                    Reason;
-                {_Exchange, Queue} when ?is_queue(Queue) ->
-                    QueueName = bunny_util:get_name(Queue),
-                    lib_amqp:subscribe(ChannelPid,
-                                       QueueName,
-                                       self(), NoAck),
-                    {ok, ConnectionPid, ChannelPid, QueueName}
+            case declare_subscribe(ChannelPid, DeclareFun,
+                                   DeclareInfo, NoAck) of
+                {ok, QueueName} ->
+                    {ok, ConnectionPid, ChannelPid, QueueName};
+                Reason ->
+                    Reason
             end
     end.
 
+declare_subscribe(ChannelPid, DeclareFun, DeclareInfo, NoAck) ->
+    case catch DeclareFun(ChannelPid, DeclareInfo) of
+        {'EXIT', {Reason, _Stack}} ->
+            Reason;
+        {_Exchange, Queue} when ?is_queue(Queue) ->
+            QueueName = bunny_util:get_name(Queue),
+            lib_amqp:subscribe(ChannelPid,
+                               QueueName,
+                               self(), NoAck),
+            {ok, QueueName}
+    end.
+
 get_opt(Opt, Proplist, Default) ->
     {proplists:get_value(Opt, Proplist, Default),
      proplists:delete(Opt, Proplist)}.
 %%
 %% Tests
 %%
-
 -include_lib("eunit/include/eunit.hrl").
 
 cds_setup() ->
 test_gb_setup_1(NoAck) ->
     {ok, _} = mock:mock(lib_amqp),
 
-    ConnectionPid = spawn_fake_proc(self()),
-    ChannelPid = spawn_fake_proc(self()),
+    ConnectionPid = spawn_fake_proc(),
+    ChannelPid = spawn_fake_proc(),
 
     mock:expects(lib_amqp, subscribe,
                  fun({Channel, <<"bunny.test">>, _Pid, NA})
     test_gb_setup_1(false).
 
 
-test_gb_stop({ConnectionPid, ChannelPid, TestPid}) ->
+test_gb_stop({_ConnectionPid, _ChannelPid, TestPid}) ->
+    ExpectedChannelPid = gen_bunny:get_channel(TestPid),
+    ExpectedConnectionPid = gen_bunny:get_connection(TestPid),
+
     mock:expects(lib_amqp, unsubscribe,
                  fun({Channel, <<"bunny.consumer">>})
-                    when Channel =:= ChannelPid ->
+                    when Channel =:= ExpectedChannelPid ->
                          true
                  end,
                  ok),
 
     mock:expects(lib_amqp, teardown,
                  fun({Connection, Channel})
-                    when Connection =:= ConnectionPid,
-                         Channel =:= ChannelPid ->
+                    when Connection =:= ExpectedConnectionPid,
+                         Channel =:= ExpectedChannelPid ->
                          true
                  end,
                  ok),
     mock:verify_and_stop(lib_amqp),
     ok.
 
-test_gb_stop_nostop({_ConnectionPid, _ChannelPid, _TestPid}) ->
-    timer:sleep(100), %% I hate this.
-    mock:verify_and_stop(lib_amqp),
-    ok.
-
-
 test_gb_start_link_test_() ->
     {setup, fun test_gb_setup/0, fun test_gb_stop/1,
      fun({ConnectionPid, ChannelPid, TestPid}) ->
                  end])
      end}.
 
+test_monitor_setup() ->
+    {ok, _} = mock:mock(lib_amqp),
+
+    ConnectionPid = spawn_fake_proc(),
+    NewConnectionPid = spawn_fake_proc(),
+
+    ChannelPid = spawn_fake_proc(),
+    NewChannelPid = spawn_fake_proc(),
+
+    mock:expects(lib_amqp, subscribe,
+              fun({_Channel, <<"bunny.test">>, _Pid, _NA}) ->
+                      true
+                 end,
+                 ok, 2),
+
+    ConnectFun = fun(direct) ->
+                         case get('_connect_fun_run_before') of
+                             undefined ->
+                                 put('_connect_fun_run_before', true),
+                                 {ConnectionPid, ChannelPid};
+                             true ->
+                                 {NewConnectionPid, NewChannelPid}
+                         end
+                 end,
+
+    DeclareFun = fun(_Channel, <<"bunny.test">>) ->
+                         {bunny_util:new_exchange(<<"bunny.test">>),
+                          bunny_util:new_queue(<<"bunny.test">>)}
+                 end,
+
+    {ok, TestPid} = test_gb:start_link([{connect_fun, ConnectFun},
+                                        {declare_fun, DeclareFun}]),
+
+    TestPid ! #'basic.consume_ok'{consumer_tag = <<"bunny.consumer">>},
+
+    {ConnectionPid, NewConnectionPid, ChannelPid, NewChannelPid, TestPid}.
+
+test_monitor_stop({_ConnectionPid, _NewConnectionPid,
+                   _ChannelPid, _NewChannelPid, TestPid}) ->
+    ExpectedChannelPid = gen_bunny:get_channel(TestPid),
+    ExpectedConnectionPid = gen_bunny:get_connection(TestPid),
+
+    mock:expects(lib_amqp, unsubscribe,
+                 fun({Channel, <<"bunny.consumer">>})
+                    when Channel =:= ExpectedChannelPid ->
+                         true
+                 end,
+                 ok),
+
+    mock:expects(lib_amqp, teardown,
+                 fun({Connection, Channel})
+                    when Connection =:= ExpectedConnectionPid,
+                         Channel =:= ExpectedChannelPid ->
+                         true
+                 end,
+                 ok),
+    gen_bunny:stop(TestPid),
+    timer:sleep(100), %% I hate this.
+    mock:verify_and_stop(lib_amqp),
+    ok.
+
+channel_monitor_test_() ->
+    {setup, fun test_monitor_setup/0, fun test_monitor_stop/1,
+     fun({ConnectionPid, _, ChannelPid, NewChannelPid, TestPid}) ->
+             ?_test(
+                [begin
+                     MonRef = erlang:monitor(process, ChannelPid),
+
+                     mock:expects(
+                       lib_amqp, start_channel,
+                       fun({Connection})
+                          when is_pid(Connection) andalso
+                               Connection =:= ConnectionPid ->
+                               true
+                       end,
+                       fun(_, _) ->
+                               NewChannelPid
+                       end),
+
+                     exit(ChannelPid, die),
+                     ?assertEqual(true, erlang:is_process_alive(TestPid)),
+                     ?assertEqual(false, erlang:is_process_alive(ChannelPid)),
+
+                     receive
+                         {'DOWN', MonRef, process, ChannelPid, die} ->
+                             ok
+                     end,
+
+                     ?assertMatch(NewChannelPid,
+                                  gen_bunny:get_channel(TestPid)),
+                     ?assert(ChannelPid =/= NewChannelPid),
+                     ?assertEqual(true, erlang:is_process_alive(NewChannelPid))
+                 end])
+     end}.
+
+
+connection_monitor_test_() ->
+    {setup, fun test_monitor_setup/0, fun test_monitor_stop/1,
+     fun({ConnectionPid, NewConnectionPid,
+          ChannelPid, NewChannelPid, TestPid}) ->
+             ?_test(
+                [begin
+                     MonRef = erlang:monitor(process, ConnectionPid),
+
+                     exit(ConnectionPid, die),
+                     ?assertEqual(true, erlang:is_process_alive(TestPid)),
+                     ?assertEqual(false,
+                                  erlang:is_process_alive(ConnectionPid)),
+
+                     receive
+                         {'DOWN', MonRef, process, ConnectionPid, die} ->
+                             ok
+                     end,
+
+                     ?assertMatch(NewConnectionPid,
+                                  gen_bunny:get_connection(TestPid)),
+                     ?assert(ConnectionPid =/= NewConnectionPid),
+                     ?assertEqual(true,
+                                  erlang:is_process_alive(NewConnectionPid)),
+
+                     ?assertMatch(NewChannelPid,
+                                  gen_bunny:get_channel(TestPid)),
+                     ?assert(ChannelPid =/= NewChannelPid),
+                     ?assertEqual(true, erlang:is_process_alive(NewChannelPid))
+                 end])
+     end}.
 
 %% These are mostly to placate cover.
 
 code_change_test() ->
     ?assertEqual({ok, #state{}}, gen_bunny:code_change(ign, #state{}, ign)).
 
-channel_link_test_() ->
-    {setup, fun test_gb_setup/0, fun test_gb_stop_nostop/1,
-     fun({_ConnectionPid, ChannelPid, TestPid}) ->
-             process_flag(trap_exit, true),
-             exit(ChannelPid, kill),
-             receive
-                 {'EXIT', TestPid, killed}  ->
-                     ok
-             end,
-             process_flag(trap_exit, false),
-             ?assertEqual(false, erlang:is_process_alive(TestPid))
-     end}.
 
-    
-fake_proc(Pid) ->
+%% Test Utils
+
+fake_proc() ->
     receive
         _ ->
             ok
     after 1000 ->
-            fake_proc(Pid)
+            fake_proc()
     end.
 
-spawn_fake_proc(Pid) ->
-    spawn(fun() -> fake_proc(Pid) end).
-    
-
-           
+spawn_fake_proc() ->
+    spawn(fun() -> fake_proc() end).
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.