Commits

Paul Jones  committed 2d4af3d

Eliminated use of Erlang AMQP client in applicator

  • Participants
  • Parent commits 6f0b7fa

Comments (0)

Files changed (3)

   rpc_call()
 - Drain queues to the server side
 - Accept keywords as terms in predicates
-- Remove the use of the Erlang AMQP client in the applicator module
 - Allow layered commands so that a thick client can submit a term list as well
 as an unparsed string (so that this works for thin clients as well) 
   -- use content-type to demarcate what type it is, defaulting to unparsed

File src/bql_applicator.erl

 
 -export([apply_commands/2]).
 
--include("amqp_client.hrl").
 -include("rabbit.hrl").
 -include("rabbit_framing.hrl").
 
 -define(RPC_TIMEOUT, 30000).
 
--record(state, {ch, node, user}).
+-record(state, {node, user}).
 
 apply_commands(Commands, User) ->
-    % Create an AMQP channel so we can actually perform operations
-    Connection = lib_amqp:start_connection("localhost"),
-    ControlCh = amqp_connection:open_channel(Connection),
-
-    % Create a connection to the Rabbit node too
+    % Create a connection to the Rabbit node
     Node = rabbit_misc:localnode(rabbit),
 
-    try
-        {ok, [catch apply_command(Command, #state {ch = ControlCh, node = Node, user = User}) 
-                || Command <- Commands]}
-    after
-        lib_amqp:close_connection(Connection)
-    end.
-
+    {ok, [catch apply_command(Command, #state {node = Node, user = User}) 
+            || Command <- Commands]}.
+            
 % Queue Management
 apply_command({create_queue, Name, Durable, Args}, #state {user = Username}) ->
     QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Name)),
     interpret_response(AllFieldList, FieldList, Connections, Modifiers);
 
 % Sending Messages
-apply_command({post_message, Exchange, RoutingKey, Msg}, #state { ch = Ch }) ->
-    Properties = #'P_basic'{ delivery_mode = 1 },
-    case lib_amqp:publish(Ch, list_to_binary(Exchange), list_to_binary(RoutingKey), 
-                          list_to_binary(Msg), Properties) of
-        ok  -> ok;
-        Res -> io_lib:format("~p", [Res])
+apply_command({post_message, X, RoutingKey, Msg}, #state { user = Username }) ->
+    ExchangeName = rabbit_misc:r(<<"/">>, exchange, list_to_binary(X)),
+    ensure_resource_access(Username, ExchangeName, write),
+    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+    Content = rabbit_basic:build_content(#'P_basic'{}, list_to_binary(Msg)),
+    Message = #basic_message{exchange_name  = ExchangeName,
+                             routing_key    = list_to_binary(RoutingKey),
+                             content        = Content,
+                             persistent_key = none},
+    {RoutingRes, _DeliveredQPids} =
+                rabbit_exchange:publish(
+                  Exchange,
+                  rabbit_basic:delivery(true, false, none, Message)),
+    case RoutingRes of
+        routed ->
+            ok;
+        unroutable ->
+            "Message was unroutable";
+        not_delivered ->
+            "Message was not able to be delivered"
     end;
-
+    
 % Retreving Messages
-apply_command({retrieve_message, Q}, State = #state{ch = Ch}) ->
-    with_queue(State, fun() -> poll(Ch, Q) end, Q);
+apply_command({retrieve_message, QName}, State = #state{}) ->
+    with_queue(fun(Q) -> poll(Q) end, QName, State);
 
 %% This drains messages to a file on the server
-apply_command({drain_queue, Q}, State = #state{ch = Ch}) ->
+apply_command({drain_queue, QName}, State = #state{}) ->
     Fun = 
-        fun() ->
-            case disk_log:open([{name, Q}, {type, halt}]) of
-                {ok, Log} -> drain_loop(Ch, Q, Log);
-                {repaired, Log, _, _} -> drain_loop(Ch, Q, Log);
+        fun(Q) ->
+            case disk_log:open([{name, QName}, {type, halt}]) of
+                {ok, Log} -> drain_loop(Q, Log);
+                {repaired, Log, _, _} -> drain_loop(Q, Log);
                 {error, Reason} ->
                     error_logger:error_msg("Could not open disk log for"
                                            " queue (~p): ~p~n", [Q, Reason]),
             end,
             ok
         end,
-  with_queue(State, Fun, Q);
+  with_queue(Fun, QName, State);
 
 apply_command({select, EntityName, _, _}, #state {}) ->
     lists:flatten("Unknown entity " ++ EntityName ++ " specified to query");
 apply_command(Unknown, _State) ->
     debug("Unknown command: ~p~n", [Unknown]).
 
-drain_loop(Channel, Q, Log) ->
-    case poll(Channel, Q) of
+drain_loop(Q, Log) ->
+    case poll(Q) of
         empty -> ok;
         Payload ->
             case disk_log:blog(Log, Payload) of
-                ok -> drain_loop(Channel, Q, Log);
+                ok -> drain_loop(Q, Log);
                 _ -> ok
             end
     end,
     disk_log:close(Log),
     ok.
 
-with_queue(#state{node = Node}, Fun, Queue) ->
-    case rpc_call(Node, rabbit_amqqueue, lookup,
-                  [{resource, <<"/">>, queue, list_to_binary(Queue)}]) of
-        {error, not_found} -> unknown_queue;
-        {ok,_} -> Fun()
+with_queue(Fun, Queue, #state{node = Node, user = Username}) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Queue)),
+    ensure_resource_access(Username, QueueName, read),
+    
+    case rpc_call(Node, rabbit_amqqueue, lookup, [QueueName]) of
+        {error, not_found} -> 
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(QueueName)]));
+        {ok, Q} -> 
+            Fun(Q)
     end.
 
-poll(Channel, Queue) ->
-    case lib_amqp:get(Channel, list_to_binary(Queue)) of
-        'basic.get_empty'            -> empty;
-        #amqp_msg{payload = Payload} -> Payload
+poll(Q) ->
+    case rabbit_amqqueue:basic_get(Q, self(), true) of
+        {ok, _MsgCount,
+         {_QName, _QPid, _MsgId, _Redelivered,
+          #basic_message{content = Content}}} ->
+            {_Props, Payload} = rabbit_basic:from_content(Content),
+            Payload;
+        empty ->
+            empty
     end.
 
 % Debug Control

File test/bql_test.erl

     ok.
 
 select_connections_test() ->
-    [{_, Result}] = execute("select * from connections"),
-    ?assert(length(Result) > 0).
+    execute("select * from connections").
 
 post_message_test() ->
     [ok] = execute("create exchange mynondurableexchange;"),
 
 post_message_with_routing_key_test() ->
     [ok] = execute("create exchange mynondurableexchange;"),
-    [ok] = execute("post 'Hello World' to mynondurableexchange with routing_key rk;").
+    ["Message was unroutable"] = execute("post 'Hello World' to mynondurableexchange with routing_key rk;").
 
 purge_queue_test() ->
     [ok] = execute("create queue mypurgingqueue;"),
     [Response2] = execute("get from mydeliveryqueue;"),
     ?assertEqual(empty, Response2),
     [Response3] = execute("get from bogusqueue;"),
-    ?assertEqual(unknown_queue, Response3).
+    ?assertEqual("queue 'bogusqueue' in vhost '/' not found", Response3).
     
 select_exchange_with_raw_test() ->
     Result = execute_raw([{select, "exchanges", [name], {{eq, name, "amq.topic"}, none}}]),