Commits

Anonymous committed c1ab76e

Fix a problem calling amqp_connection:start_network, make gen_bunny use bunny_util:connect and bunny_util:declare, and fix up example_gb to rely on the new gen_bunny functionality.

  • Participants
  • Parent commits 049dfc5

Comments (0)

Files changed (3)

src/bunny_util.erl

 
 -define(DEFAULT_USER, "guest").
 -define(DEFAULT_PASS, "guest").
--define(DEFAULT_VHOST, "/").
+-define(DEFAULT_VHOST, <<"/">>).
 
 connect() ->
     connect(direct).
     connect({network, Host, Port, {?DEFAULT_USER, ?DEFAULT_PASS}});
 connect({network, Host, Port, Creds}) ->
     connect({network, Host, Port, Creds, ?DEFAULT_VHOST});
-connect({network, Host, Port, {User, Pass}, VHost}) ->
-    Connection = amqp_connection:start_network(Host, Port, User, Pass, VHost),
+connect({network, Host, Port, {User, Pass}, VHost}) when is_binary(VHost) ->
+    Connection = amqp_connection:start_network(User, Pass, Host, Port, VHost),
     Channel = lib_amqp:start_channel(Connection),
     {Connection, Channel}.
 
     [lib_amqp:bind_queue(Channel, ExchangeName, QueueName, RoutingKey) ||
         RoutingKey <- RoutingKeys],
 
-    ok.
+    {Exchange, Queue}.
 
 
 %%
 
 network_expects(Host, Port, User, Pass, VHost) ->
     mock:expects(amqp_connection, start_network,
-                 fun({H, P0, U, P1, V}) when H =:= Host,
-                                             P0 =:= Port,
-                                             U =:= User,
-                                             P1 =:= Pass,
+                 fun({U, P0, H, P1, V}) when U =:= User,
+                                             P0 =:= Pass,
+                                             H =:= Host,
+                                             P1 =:= Port,
                                              V =:= VHost ->
                          true
                  end,
                              10000,
                              "al",
                              "franken",
-                             "/awesome"),
+                             <<"/awesome">>),
              ?assertEqual({dummy_network_conn, dummy_network_channel},
                           connect({network, "amqp.example.com", 10000,
-                                   {"al", "franken"}, "/awesome"}))
+                                   {"al", "franken"}, <<"/awesome">>}))
          end])}.
 
 %%
                     when E =:= Exchange ->
                          true
                  end,
-                 ok,
+                 {Exchange, Queue},
                  2),
 
     mock:expects(lib_amqp, bind_queue,
              declare_expects(new_exchange(<<"Foo">>),
                              new_queue(<<"Foo">>),
                              [<<"Foo">>]),
-             ?assertEqual(ok, declare(dummy_channel, <<"Foo">>))
+             ?assertEqual({new_exchange(<<"Foo">>),
+                           new_queue(<<"Foo">>)},
+                          declare(dummy_channel, <<"Foo">>))
          end])}.
 
 
              declare_expects(new_exchange(<<"Foo">>),
                              new_queue(<<"Bar">>),
                              [<<"Baz">>]),
-             ?assertEqual(ok, declare(dummy_channel,
-                                      {<<"Foo">>, <<"Bar">>, <<"Baz">>}))
+             ?assertEqual({new_exchange(<<"Foo">>),
+                           new_queue(<<"Bar">>)},
+                          declare(dummy_channel,
+                                  {<<"Foo">>, <<"Bar">>, <<"Baz">>}))
          end])}.
 
 
              declare_expects(new_exchange(<<"Foo">>),
                              new_queue(<<"Bar">>),
                              [<<"Baz">>, <<"Bax">>]),
-             ?assertEqual(ok, declare(dummy_channel,
-                                      {<<"Foo">>, <<"Bar">>,
-                                       [<<"Baz">>, <<"Bax">>]}))
+             ?assertEqual({new_exchange(<<"Foo">>),
+                           new_queue(<<"Bar">>)},
+                          declare(dummy_channel,
+                                  {<<"Foo">>, <<"Bar">>,
+                                   [<<"Baz">>, <<"Bax">>]}))
          end])}.
 
 
              declare_expects(new_exchange(<<"Foo">>),
                              new_queue(<<"Bar">>),
                              [<<"Baz">>]),
-             ?assertEqual(ok, declare(dummy_channel,
-                                      {new_exchange(<<"Foo">>),
-                                       new_queue(<<"Bar">>),
-                                       <<"Baz">>}))
+             ?assertEqual({new_exchange(<<"Foo">>),
+                           new_queue(<<"Bar">>)},
+                          declare(dummy_channel,
+                                  {new_exchange(<<"Foo">>),
+                                   new_queue(<<"Bar">>),
+                                   <<"Baz">>}))
          end])}.
 
 
              declare_expects(new_exchange(<<"Foo">>),
                              new_queue(<<"Bar">>),
                              [<<"Baz">>, <<"Bax">>]),
-             ?assertEqual(ok, declare(dummy_channel,
-                                      {new_exchange(<<"Foo">>),
-                                       new_queue(<<"Bar">>),
-                                       [<<"Baz">>, <<"Bax">>]}))
+             ?assertEqual({new_exchange(<<"Foo">>),
+                           new_queue(<<"Bar">>)},
+                          declare(dummy_channel,
+                                  {new_exchange(<<"Foo">>),
+                                   new_queue(<<"Bar">>),
+                                   [<<"Baz">>, <<"Bax">>]}))
          end])}.

src/example_gb.erl

 -module(example_gb).
 -behavior(gen_bunny).
 
--export([start_link/1,
+-export([start_link/0,
          init/1,
          handle_message/2,
          handle_call/3,
 
 -record(state, {messages=[]}).
 
-open_connection(direct) ->
-    Connection = lib_amqp:start_connection(),
-    Channel = lib_amqp:start_channel(Connection),
-    {Connection, Channel};
-open_connection(network) ->
-    Connection = amqp_connection:start_network("guest", "guest", "127.0.0.1", 
-                                               ?PROTOCOL_PORT, <<"/">>),
-    Channel = lib_amqp:start_channel(Connection),
-    {Connection, Channel}.
-
-setup(Type) ->
-    {_Connection, Channel} = open_connection(Type),
-    %% TODO : need utility stuff for these
-    Exchange = bunny_util:set_durable(
-                 bunny_util:new_exchange(<<"bunny.test">>), true),
-    Queue = bunny_util:set_durable(
-              bunny_util:new_queue(<<"bunny.test">>), true),
-    _Binding = bunny_util:new_binding(<<"bunny.test">>, <<"bunny.test">>,
-                                      <<"bunny.test">>),    
-    amqp_channel:call(Channel, Exchange),
-    amqp_channel:call(Channel, Queue),
-    lib_amqp:bind_queue(Channel, <<"bunny.test">>, <<"bunny.test">>,
-                        <<"bunny.test">>).
-    
-
-start_link(Type) ->
-    setup(Type),
-    gen_bunny:start_link(?MODULE, {direct, "guest", "guest"}, 
+start_link() ->
+    gen_bunny:start_link(?MODULE, direct,
                          <<"bunny.test">>, []).
 
-init([]) -> 
+init([]) ->
     {ok, #state{}}.
 
 get_messages(Pid) ->
     gen_bunny:call(Pid, get_messages).
 
-handle_message(Message, State=#state{messages=Messages}) -> 
+handle_message(Message, State=#state{messages=Messages}) ->
     NewMessages = [Message|Messages],
     {noreply, State#state{messages=NewMessages}}.
 
-handle_call(get_messages, _From, State=#state{messages=Messages}) -> 
+handle_call(get_messages, _From, State=#state{messages=Messages}) ->
     {reply, Messages, State}.
 
 handle_cast(_Msg, State) -> {noreply, State}.
 
 handle_info(_Info, State) -> {noreply, State}.
 
-terminate(Reason, _State) -> 
+terminate(Reason, _State) ->
     io:format("~p terminating with reason ~p~n", [?MODULE, Reason]),
     ok.
-
-
-%%
-%% Tests
-%%
-
--include_lib("eunit/include/eunit.hrl").
-

src/gen_bunny.erl

          get_consumer_tag/1,
          stop/1]).
 
--record(state, {mod, 
-                modstate, 
-                channel, 
+-record(state, {mod,
+                modstate,
+                channel,
                 connection,
-                queue, 
+                queue,
+                declare_info,
                 consumer_tag}).
 
 behaviour_info(callbacks) ->
      {handle_cast, 2},
      {handle_info, 2},
      {terminate, 2}];
-behaviour_info(_) -> 
+behaviour_info(_) ->
     undefined.
 
-start_link(Module, ConnectionInfo, QueueName, InitArgs) 
-  when is_tuple(ConnectionInfo), is_binary(QueueName), is_list(InitArgs)  ->
+start_link(Module, ConnectionInfo, DeclareInfo, InitArgs)
+  when is_atom(ConnectionInfo) orelse is_tuple(ConnectionInfo),
+       is_binary(DeclareInfo) orelse is_tuple(DeclareInfo),
+       is_list(InitArgs) ->
     gen_server:start_link(
-      ?MODULE, 
-      [Module,ConnectionInfo, QueueName, InitArgs], 
+      ?MODULE,
+      [Module, ConnectionInfo, DeclareInfo, InitArgs],
       []).
 
 call(Name, Request) ->
     gen_server:cast(Dest, Request).
 
 
-init([Module, ConnectionInfo, QueueName, InitArgs]) ->
+init([Module, ConnectionInfo, DeclareInfo, InitArgs]) ->
     case Module:init(InitArgs) of
         {ok, ModState} ->
-            case connect_and_subscribe(ConnectionInfo, QueueName) of
-                {ok, ChannelPid, ConnectionPid} ->
+            case connect_declare_subscribe(ConnectionInfo, DeclareInfo) of
+                {ok, ConnectionPid, ChannelPid, QueueName} ->
                     %% TODO:  monitor channel/connection pids?
-                    {ok, #state{mod=Module, 
+                    {ok, #state{mod=Module,
                                 modstate=ModState,
                                 channel=ChannelPid,
                                 connection=ConnectionPid,
+                                declare_info=DeclareInfo,
                                 queue=QueueName}};
                 {_ErrClass, {error, Reason}} ->
                     Module:terminate(Reason, ModState),
         {stop, Reason, NewModState} ->
             {stop, Reason, State#state{modstate=NewModState}}
     end.
-    
+
 
 terminate(Reason, #state{mod=Mod, modstate=ModState}) ->
     io:format("gen_bunny terminating with reason ~p~n", [Reason]),
     {ok, State}.
 
 %% TODO: better error handling here.
-connect_and_subscribe({direct, Username, Password}, QueueName) ->
-    %% TODO: link? 
-    case catch amqp_connection:start_direct(Username, Password) of
+connect_declare_subscribe(ConnectionInfo, DeclareInfo) ->
+    %% TODO: link?
+    case catch bunny_util:connect(ConnectionInfo) of
         {'EXIT', {Reason, _Stack}} ->
             Reason;
-        ConnectionPid when is_pid(ConnectionPid) ->
-            ChannelPid = amqp_connection:open_channel(ConnectionPid),
-            lib_amqp:subscribe(ChannelPid, QueueName, self()),
-            {ok, ChannelPid, ConnectionPid}
-    end;
-connect_and_subscribe({network, Host, Port, Username, Password, VHost}, 
-                      QueueName) ->
-
-    case catch amqp_connection:start_direct(Username, Password, Host, 
-                                            Port, VHost) of
-        {'EXIT', {Reason, _Stack}} ->
-            Reason;
-        ConnectionPid when is_pid(ConnectionPid) ->
-            ChannelPid = amqp_connection:open_channel(ConnectionPid),
-            lib_amqp:subscribe(ChannelPid, QueueName, self()),
-            {ok, ChannelPid, ConnectionPid}
+        {ConnectionPid, ChannelPid} when is_pid(ConnectionPid),
+                                         is_pid(ChannelPid) ->
+            case catch bunny_util:declare(ChannelPid, DeclareInfo) of
+                {'EXIT', {Reason, _Stack}} ->
+                    Reason;
+                {_Exchange, Queue} when ?is_queue(Queue) ->
+                    QueueName = bunny_util:get_name(Queue),
+                    lib_amqp:subscribe(ChannelPid,
+                                       QueueName,
+                                       self()),
+                    {ok, ConnectionPid, ChannelPid, QueueName}
+            end
     end.
-
-
-