Commits

Ben Hood committed ad7f455 Merge

Merged default into 19250

Comments (0)

Files changed (4)

     
 rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
                                               rpc_requests = RequestQueue,
-                                              do2 = Do2}) ->                                              
-    {{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue),
-    gen_server:reply(From, Reply),
-    catch case queue:head(NewRequestQueue) of
-        empty ->
-            ok;
-        {NewFrom,Method} ->
-            Do2(Writer,Method)
+                                              do2 = Do2}) ->
+    NewRequestQueue =
+        case queue:out(RequestQueue) of
+            {empty, {[],[]}}        -> exit(empty_rpc_bottom_half);
+            {{value, {From, _}}, Q} -> gen_server:reply(From, Reply),
+                                       Q
+        end,
+    case queue:is_empty(NewRequestQueue) of
+        true  -> ok;
+        false -> {_NewFrom, Method} = queue:head(NewRequestQueue),
+                 Do2(Writer, Method)
     end,
-    NewState = State#channel_state{rpc_requests = NewRequestQueue},
-    {noreply, NewState}.
+    {noreply, State#channel_state{rpc_requests = NewRequestQueue}}.
 
 subscription_top_half(Method, From, State = #channel_state{writer_pid = Writer, do2 = Do2}) ->
     Do2(Writer,Method),

src/direct_client_test.erl

 queue_unbind_test() ->
     test_util:queue_unbind_test(new_connection()).
 
+command_serialization_test() -> 
+    test_util:command_serialization_test(new_connection()).
+
 %----------------------------------------------------------------------------
 % Negative Tests
 

src/network_client_test.erl

 queue_unbind_test() ->
     test_util:queue_unbind_test(new_connection()).
 
+command_serialization_test() ->
+  test_util:command_serialization_test(new_connection()).
+
 %----------------------------------------------------------------------------
 % Negative Tests
 
     lib_amqp:teardown(Connection, Channel2),
     ok.
 
+% This is designed to exercize the internal queuing mechanism
+% to ensure that commands are properly serialized
+command_serialization_test(Connection) ->
+    Channel = lib_amqp:start_channel(Connection),
+    Parent = self(),
+    [spawn(fun() -> 
+                Q = uuid(),
+                Q1 = lib_amqp:declare_queue(Channel, Q),
+                ?assertMatch(Q, Q1),     
+                Parent ! finished
+           end) || Tag <- lists:seq(1,?Latch)],
+    latch_loop(?Latch),
+    lib_amqp:teardown(Connection, Channel).
+
 queue_unbind_test(Connection) ->
     X = <<"eggs">>, Q = <<"foobar">>, Key = <<"quay">>,
     Payload = <<"foobar">>,