Commits

Anonymous committed 104464c

Allow bunnyc:publish/3 to take a message() and add bunnyc:publish/4
which takes a property list of options, allowing for the mandatory
option to be set.

Comments (0)

Files changed (1)

 -include_lib("gen_bunny.hrl").
 
 -export([start_link/4, stop/1]).
--export([publish/3, get/2, ack/2]).
+-export([publish/3, publish/4, get/2, ack/2]).
 
 -export([init/1,
          handle_call/3,
          terminate/2,
          code_change/3]).
 
--record(state, {connection, channel, exchange, queue}).
+-record(state, {connection, channel, exchange, queue, key}).
 
 %%
 %% API
 %%
-publish(Name, Key, Payload) ->
-    gen_server:call(Name, {publish, Key, Payload}).
+publish(Name, Key, Message) ->
+    gen_server:call(Name, {publish, Key, Message, []}).
+publish(Name, Key, Message, Opts) ->
+    gen_server:call(Name, {publish, Key, Message, Opts}).
 
 
 get(Name, NoAck) ->
                 queue=Queue}}.
 
 
-handle_call({publish, Key, Payload}, _From,
+handle_call({publish, Key, Message, Opts}, _From,
             State = #state{channel=Channel, exchange=Exchange})
-  when is_binary(Key), is_binary(Payload) ->
-    Resp = internal_publish(Channel, Exchange, Key, Payload),
+  when is_binary(Key), is_binary(Message) orelse ?is_message(Message),
+       is_list(Opts) ->
+    Resp = internal_publish(Channel, Exchange, Key, Message, Opts),
     {reply, Resp, State};
 
 handle_call({get, NoAck}, _From,
 %%
 %% Internal
 %%
+internal_publish(Channel, Exchange, Key, Message, Opts)
+  when ?is_message(Message) ->
+    Mandatory = proplists:get_value(mandatory, Opts, false),
 
-internal_publish(Channel, Exchange, Key, Payload) ->
-    lib_amqp:publish(Channel, bunny_util:get_name(Exchange), Key, Payload),
-    ok.
+    BasicPublish = #'basic.publish'{
+      exchange = bunny_util:get_name(Exchange),
+      routing_key = Key,
+      mandatory = Mandatory},
+
+    amqp_channel:call(Channel, BasicPublish, Message),
+    ok;
+internal_publish(Channel, Exchange, Key, Message, Opts)
+  when is_binary(Message) ->
+    internal_publish(Channel, Exchange, Key,
+                     bunny_util:new_message(Message), Opts).
 
 
 internal_get(Channel, Queue, NoAck) ->
 
 normal_setup() ->
     {ok, _} = mock:mock(lib_amqp),
+    {ok, _} = mock:mock(amqp_channel),
     {ok, _} = bunnyc:start_link(
                 bunnyc_test, direct, <<"bunnyc.test">>,
                 connect_and_declare_expects(<<"bunnyc.test">>)),
 
 normal_stop(_) ->
     bunnyc:stop(bunnyc_test),
+    mock:verify_and_stop(amqp_channel),
     mock:verify_and_stop(lib_amqp),
     ok.
 
     {setup, fun normal_setup/0, fun normal_stop/1,
      ?_test(
         [begin
-             mock:expects(lib_amqp, publish,
-                          fun({dummy_channel, <<"bunnyc.test">>,
-                               <<"bunnyc.test">>, <<"HELLO GOODBYE">>}) ->
-                                  true
-                          end,
-                          ok),
+             mock:expects(
+               amqp_channel, call,
+               fun({dummy_channel, #'basic.publish'{
+                      exchange = <<"bunnyc.test">>,
+                      routing_key = <<"bunnyc.test">>},
+                    Message}) when ?is_message(Message) ->
+                       bunny_util:get_payload(Message) =:= <<"HELLO GOODBYE">>
+               end,
+               ok),
 
              ?assertEqual(ok, bunnyc:publish(
                                 bunnyc_test,
          end])}.
 
 
+publish_message_test_() ->
+    {setup, fun normal_setup/0, fun normal_stop/1,
+     ?_test(
+        [begin
+             ExpectedMessage = bunny_util:set_delivery_mode(
+                                 bunny_util:new_message(<<"HELLO">>),
+                                 2),
+
+             mock:expects(
+               amqp_channel, call,
+               fun({dummy_channel, #'basic.publish'{exchange=Exchange,
+                                                    routing_key=Key},
+                    Message}) when ?is_message(Message) ->
+                       Exchange =:= <<"bunnyc.test">>
+                           andalso Key =:= <<"bunnyc.test">>
+                           andalso ExpectedMessage =:= Message
+               end,
+               ok),
+             ?assertEqual(ok, bunnyc:publish(
+                                bunnyc_test,
+                                <<"bunnyc.test">>,
+                                ExpectedMessage))
+         end])}.
+
+
+publish_mandatory_test_() ->
+    {setup, fun normal_setup/0, fun normal_stop/1,
+     ?_test(
+        [begin
+             mock:expects(
+               amqp_channel, call,
+               fun({dummy_channel, #'basic.publish'{
+                      exchange = <<"bunnyc.test">>,
+                      routing_key = <<"bunnyc.test">>,
+                      mandatory = true},
+                    Message}) when ?is_message(Message) ->
+                       bunny_util:get_payload(Message) =:= <<"HELLO GOODBYE">>
+               end,
+               ok),
+
+             ?assertEqual(ok, bunnyc:publish(
+                                bunnyc_test,
+                                <<"bunnyc.test">>,
+                                <<"HELLO GOODBYE">>, [{mandatory, true}]))
+         end])}.
+
+
 get_test_() ->
     {setup, fun normal_setup/0, fun normal_stop/1,
      ?_test(