Commits

Anonymous committed 1d614a7

Handle decoding of message properties and if NoAck is specified send handle_message a tagged_message {DeliveryTag::integer(), Message::message()} (also add ?is_tagged_message)

Comments (0)

Files changed (5)

include/gen_bunny.hrl

 -define(is_exchange(X), element(1, X) =:= 'exchange.declare').
 -define(is_binding(X), element(1, X) =:= binding).
 -define(is_message(X), element(1, X) =:= content).
+-define(is_tagged_message(X),
+        (is_integer(element(1, X)) andalso
+         ?is_message(element(2, X)))).
 
 %%
 %% -types() - EDoc really needs to learn to read these.

src/bunny_util.erl

 %% @doc Construct a new message with a binary Payload.
 -spec(new_message(payload()) -> message()).
 new_message(Payload) when is_binary(Payload) ->
-    #content{
-             class_id=60,
-             properties= #'P_basic'{},
+    #content{class_id=60,
+             properties= amqp_util:basic_properties(),
+             properties_bin=none,
              payload_fragments_rev=[Payload]}.
 
 %% @spec get_payload(Message::message()) -> payload()

src/example_gb.erl

 
 handle_cast(_Msg, State) -> {noreply, State}.
 
-handle_info(_Info, State) -> {noreply, State}.
+handle_info(Info, State) ->
+    io:format("Unknown message: ~p~n", [Info]),
+    {noreply, State}.
 
 terminate(Reason, _State) ->
     io:format("~p terminating with reason ~p~n", [?MODULE, Reason]),

src/gen_bunny.erl

                 connection,
                 queue,
                 declare_info,
-                consumer_tag}).
+                consumer_tag,
+                no_ack}).
 
 behaviour_info(callbacks) ->
     [{init, 1},
                                 channel=ChannelPid,
                                 connection=ConnectionPid,
                                 declare_info=DeclareInfo,
-                                queue=QueueName}};
+                                queue=QueueName,
+                                no_ack=NoAck}};
                 {_ErrClass, {error, Reason}} ->
                     Module:terminate(Reason, ModState),
                     {stop, Reason}
             {stop, Reason, State#state{modstate=NewModState}}
     end.
 
-handle_info({#'basic.deliver'{},
-             Message},
-            State=#state{mod=Module, modstate=ModState})
-  when ?is_message(Message) ->
+handle_info({Envelope=#'basic.deliver'{},
+             Message0},
+            State=#state{no_ack=NoAck, mod=Module, modstate=ModState})
+  when ?is_message(Message0) ->
+    Message1 = rabbit_binary_parser:ensure_content_decoded(Message0),
+
+    Message = case NoAck of
+                  true ->
+                      Message1;
+                  false ->
+                      {Envelope#'basic.deliver'.delivery_tag, Message1}
+              end,
+
     case Module:handle_message(Message, ModState) of
         {noreply, NewModState} ->
             {noreply, State#state{modstate=NewModState}};
 handle_info(#'basic.consume_ok'{consumer_tag=CTag}, State=#state{}) ->
     {noreply, State#state{consumer_tag=CTag}};
 handle_info(Info, State=#state{mod=Module, modstate=ModState}) ->
+    io:format("Unknown info message: ~p~n", [Info]),
     case Module:handle_info(Info, ModState) of
         {noreply, NewModState} ->
             {noreply, State#state{modstate=NewModState}};
          end])}.
 
 
-test_gb_setup() ->
+test_gb_setup_1(NoAck) ->
     {ok, _} = mock:mock(lib_amqp),
 
     ConnectionPid = c:pid(0,0,0),
     ChannelPid = c:pid(0,0,1),
 
     mock:expects(lib_amqp, subscribe,
-                 fun({Channel, <<"bunny.test">>, _Pid, true})
-                    when Channel =:= ChannelPid ->
+                 fun({Channel, <<"bunny.test">>, _Pid, NA})
+                    when Channel =:= ChannelPid,
+                         NA =:= NoAck ->
                          true
                  end,
                  ok),
                  end,
 
     {ok, Pid} = test_gb:start_link([{connect_fun, ConnectFun},
-                                    {declare_fun, DeclareFun}]),
+                                    {declare_fun, DeclareFun},
+                                    {no_ack, NoAck}]),
 
     Pid ! #'basic.consume_ok'{consumer_tag = <<"bunny.consumer">>},
 
     Pid.
 
 
+test_gb_setup() ->
+    test_gb_setup_1(true).
+
+
+test_gb_noack_setup() ->
+    test_gb_setup_1(false).
+
+
 test_gb_stop(Pid) ->
     gen_bunny:stop(Pid),
     timer:sleep(100), %% I hate this.
      end}.
 
 
+test_gb_handle_message_decode_properties_test_() ->
+    {setup, fun test_gb_setup/0, fun test_gb_stop/1,
+     fun(Pid) ->
+             ?_test(
+                [begin
+                     ExpectedMessage = {
+                       content, 60, amqp_util:basic_properties(),
+                       <<152,0,24,97,112,112,108,105,99,97,116,105,111,110,
+                        47,111,99,116,101,116,45,115,116,114,101,97,109,1,0>>,
+                       [<<"zomgasdfasdf">>]},
+                     RawMessage = {
+                       content, 60, none,
+                       <<152,0,24,97,112,112,108,105,99,97,116,105,111,110,
+                        47,111,99,116,101,116,45,115,116,114,101,97,109,1,0>>,
+                       [<<"zomgasdfasdf">>]},
+                     Pid ! {#'basic.deliver'{}, RawMessage},
+                     ?assertEqual([ExpectedMessage], test_gb:get_messages(Pid))
+                 end])
+     end}.
+
+
+test_gb_handle_message_noack_test_() ->
+    {setup, fun test_gb_noack_setup/0, fun test_gb_stop/1,
+     fun(Pid) ->
+             ?_test(
+                [begin
+                     ExpectedMessage = {1, {
+                       content, 60, amqp_util:basic_properties(),
+                       <<152,0,24,97,112,112,108,105,99,97,116,105,111,110,
+                        47,111,99,116,101,116,45,115,116,114,101,97,109,1,0>>,
+                       [<<"zomgasdfasdf">>]}},
+                     RawMessage = {
+                       content, 60, none,
+                       <<152,0,24,97,112,112,108,105,99,97,116,105,111,110,
+                        47,111,99,116,101,116,45,115,116,114,101,97,109,1,0>>,
+                       [<<"zomgasdfasdf">>]},
+                     Pid ! {#'basic.deliver'{delivery_tag=1}, RawMessage},
+                     ?assertEqual([ExpectedMessage], test_gb:get_messages(Pid))
+                 end])
+     end}.
+
+
 test_gb_call_passthrough_test_() ->
     {setup, fun test_gb_setup/0, fun test_gb_stop/1,
      fun(Pid) ->
     gen_bunny:call(Pid, get_infos).
 
 handle_message(Message, State=#state{messages=Messages})
-  when ?is_message(Message) ->
+  when ?is_message(Message) orelse ?is_tagged_message(Message) ->
     NewMessages = [Message|Messages],
     {noreply, State#state{messages=NewMessages}}.