Commits

Ben Hood committed 9b27436 Merge

Junked 19560

Comments (0)

Files changed (4)

include/amqp_client.hrl

                         tagged_sub_requests = dict:new(),
                         closing = false,
                         return_handler_pid,
+                        flow_control = false,
+                        flow_handler_pid,
                         consumers = dict:new()}).
 
 -record(rpc_client_state, {broker_config,

src/amqp_channel.erl

 -export([call/2, call/3, cast/2, cast/3]).
 -export([register_direct_peer/2]).
 -export([register_return_handler/2]).
+-export([register_flow_handler/2]).
 
 %% This diagram shows the interaction between the different component processes
 %% in an AMQP client scenario.
 register_return_handler(Channel, ReturnHandler) ->
     gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
 
+%% Registers a handler to deal with flow control
+register_flow_handler(Channel, FlowHandler) ->
+    gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+
 %---------------------------------------------------------------------------
 % Internal plumbing
 %---------------------------------------------------------------------------
     {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
     {stop, normal, NewState};
 
+handle_method(#'channel.flow'{active = Active},
+              State = #channel_state{writer_pid = Writer, do2 = Do2,
+                                     flow_handler_pid = FlowHandler}) ->
+    case FlowHandler of
+        undefined -> ok;
+        _ -> case Active of
+                true  -> FlowHandler ! resume;
+                false -> FlowHandler ! pause
+            end
+    end,
+    Do2(Writer, #'channel.flow_ok'{active = Active}),
+    {noreply, State#channel_state{flow_control = not(Active)}};    
+    
 handle_method(Method, State) ->
     rpc_bottom_half(Method, State).
 
     Do2(Writer, Method),
     {noreply, State};
 
+%% This discards any message submitted to the channel when flow control is
+%% active
+handle_cast({cast, Method, Content}, 
+            State = #channel_state{writer_pid = Writer, do3 = Do3,
+                                   flow_control = true}) ->
+    % Silently discard the message
+    {noreply, State};
+    
 %% Standard implementation of the cast/3 command
-handle_cast({cast, Method, Content}, State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
+handle_cast({cast, Method, Content}, 
+            State = #channel_state{writer_pid = Writer, do3 = Do3}) ->
     Do3(Writer, Method, Content),
     {noreply, State};
 
     NewState = State#channel_state{return_handler_pid = ReturnHandler},
     {noreply, NewState};
 
+%% Registers a handler to process flow control messages
+handle_cast({register_flow_handler, FlowHandler}, State) ->
+    NewState = State#channel_state{flow_handler_pid = FlowHandler},
+    {noreply, NewState};
+
 handle_cast({notify_sent, Peer}, State) ->
     {noreply, State}.
 
     #'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicCancel),
     ok.
 
+declare_queue(Channel) ->
+    declare_queue(Channel, <<>>).
+    
 declare_queue(Channel, Q) ->
     QueueDeclare = #'queue.declare'{queue = Q,
                                     passive = false, durable = false,

src/test_util.erl

 % Reject is not yet implemented in RabbitMQ
 basic_reject_test(Connection) ->
     lib_amqp:close_connection(Connection).
+    
+% This is a test, albeit not a unit test, to see if the client
+% handles the channel.flow command.
+% test_util:start_channel_flow(lib_amqp:start_connection("localhost")).
+start_channel_flow(Connection) ->
+    crypto:start(),
+    X = <<"amq.direct">>,
+    Key = uuid(),
+    Producer = spawn_link(fun() ->
+                            Channel = lib_amqp:start_channel(Connection),
+                            amqp_channel:register_flow_handler(Channel, 
+                                                               self()),                            
+                            cf_producer_loop(Channel, X, Key)
+                          end),
+    Consumer = spawn_link(fun() ->
+                            Channel = lib_amqp:start_channel(Connection),
+                            Q = lib_amqp:declare_queue(Channel),                            
+                            lib_amqp:bind_queue(Channel, X, Q, Key),
+                            Tag = lib_amqp:subscribe(Channel, Q, self()),
+                            cf_consumer_loop(Channel, Tag)
+                          end),
+    {Producer, Consumer}.
+
+cf_consumer_loop(Channel, Tag) ->
+    receive
+        #'basic.consume_ok'{} -> cf_consumer_loop(Channel, Tag);
+        #'basic.cancel_ok'{} -> ok;
+        {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
+             lib_amqp:ack(Channel, DeliveryTag),
+             cf_consumer_loop(Channel, Tag);
+        stop ->
+             lib_amqp:unsubscribe(Channel, Tag),
+             ok
+    end.
+
+cf_producer_loop(Channel, X, Key) ->
+    receive
+        resume ->
+            cf_producer_loop(Channel, X, Key);
+        pause ->
+            receive
+                resume -> cf_producer_loop(Channel, X, Key)
+            end;
+        stop -> ok
+    after 5 ->
+        lib_amqp:publish(Channel, X, Key, crypto:rand_bytes(10000)),
+        cf_producer_loop(Channel, X, Key)
+    end.
 
 setup_publish(Channel) ->
     Publish = #publish{routing_key = <<"a.b.c.d">>,