Commits

Anonymous committed cc5edf1 Merge

Merged bug22237 into default

Comments (0)

Files changed (15)

 PACKAGE=rabbitmq-bql
 DEPS=rabbitmq-server rabbitmq-erlang-client erlang-rfc4627
+CLIENT_DEPS=rabbit_common amqp_client
 GENERATED_SOURCES=command_lexer command_parser
-EXTRA_PACKAGE_DIRS=scripts
+SCRIPTS_DIR=scripts
+EXTRA_PACKAGE_DIRS=$(SCRIPTS_DIR)
 TEST_APPS=amqp_client rabbitmq_bql
-TEST_COMMANDS=command_parser_test:test() bql_test:test() amq_interface_test:test()
+TEST_COMMANDS=command_parser_test:test() bql_test:test() amq_interface_test:test() bql_client_test:test()
 START_RABBIT_IN_TESTS=true
+CLIENT_PACKAGE=rabbitmq-bql-client.zip
+EXTRA_PACKAGE_ARTIFACTS=$(CLIENT_PACKAGE)
 
 include ../include.mk
 
 
 src/command_parser.erl: ebin/leex.beam src/command_parser.yrl
 	$(ERL) -I -pa ebin -noshell -eval '{ok, _} = yecc:file("$(SOURCE_DIR)/$(PARSER_NAME)"), halt().'
+
+CLIENT_PACKAGE_DIR=build/client
+$(DIST_DIR)/$(CLIENT_PACKAGE): $(TARGETS) $(wildcard $(SCRIPTS_DIR))
+	rm -rf $(CLIENT_PACKAGE_DIR)
+	mkdir -p $(DIST_DIR)
+	mkdir -p $(CLIENT_PACKAGE_DIR)/ebin
+	cp $(EBIN_DIR)/* $(foreach DEP_NAME, $(CLIENT_DEPS), $(PRIV_DEPS_DIR)/$(DEP_NAME)/ebin/*) $(CLIENT_PACKAGE_DIR)/ebin
+	cp $(SCRIPTS_DIR)/* $(CLIENT_PACKAGE_DIR)
+	
+	(cd $(CLIENT_PACKAGE_DIR); zip -r ../../$@ *)
+	
+run_client: $(DIST_DIR)/$(CLIENT_PACKAGE)
+	(cd $(CLIENT_PACKAGE_DIR); ./bql $(CLIENT_ARGS))
+	
+run_dump: $(DIST_DIR)/$(CLIENT_PACKAGE)
+	(cd $(CLIENT_PACKAGE_DIR); ./bql_dump)
 as an unparsed string (so that this works for thin clients as well) 
   -- use content-type to demarcate what type it is, defaulting to unparsed
      so that thin clients don't need to set this
-- Drop Erlang RPC as transport, just use AMQP to get commands in to BQL
 - Test case for draining queues
-- Add at least rudimentary support for security
 
 Refactoring:
 
 RLWRAP=`which rlwrap`
 
 exec $RLWRAP erl \
-    -pa "`dirname $0`/../ebin" \
+    -pa "`dirname $0`/ebin" \
+    -kernel error_logger silent \
     -noshell \
     -hidden \
     -sname amqpbql$$ \
-    -s bql_client \
+    -s bql_shell \
     $@
     exit /B
 )
 
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noshell -hidden -sname amqpbdl -s bql_client -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0ebin" -kernel error_logger silent -noshell -hidden -sname amqpbql -s bql_shell -extra %*
 
 ##
 
 exec erl \
-    -pa "`dirname $0`/../ebin" \
+    -pa "`dirname $0`/ebin" \
+    -kernel error_logger silent \
     -noshell \
     -hidden \
     -sname amqpbql$$ \
     -s bql_dump \
-    -extra "$@"
+    $@

scripts/bql_dump.bat

     exit /B
 )
 
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noshell -hidden -sname amqpbql -s bql_dump -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0ebin" -kernel error_logger silent -noshell -hidden -sname amqpbql -s bql_dump %*
 

src/bql_amqp_rpc_client.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is the RabbitMQ Erlang Client.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.,
+%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%%   Portions created by LShift Ltd., Cohesive Financial
+%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%%   Technologies Ltd.;
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+%%
+
+%% @doc This module allows the simple execution of an asynchronous RPC over 
+%% AMQP. It frees a client programmer of the necessary having to AMQP
+%% plumbing. Note that the this module does not handle any data encoding,
+%% so it is up to the caller to marshall and unmarshall message payloads 
+%% accordingly.
+-module(bql_amqp_rpc_client).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start/2, stop/1]).
+-export([call/4]).
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+         handle_cast/2, handle_info/2]).
+
+-record(rpc_c_state, {channel,
+                      reply_queue,
+                      exchange,
+                      routing_key,
+                      continuations = dict:new(),
+                      correlation_id = 0}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%%      Connection = pid()
+%%      Queue = binary()
+%%      RpcClient = pid()
+%% @doc Starts a new RPC client instance that sends requests to a
+%% specified queue. This function returns the pid of the RPC client process
+%% that can be used to invoke RPCs and stop the client.
+start(Connection, Queue) ->
+    {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
+    Pid.
+
+%% @spec (RpcClient) -> ok
+%% where
+%%      RpcClient = pid()
+%% @doc Stops an exisiting RPC client.
+stop(Pid) ->
+    gen_server:call(Pid, stop, infinity).
+
+%% @spec (RpcClient, Payload) -> ok
+%% where
+%%      RpcClient = pid()
+%%      Payload = binary()
+%% @doc Invokes an RPC. Note the caller of this function is responsible for
+%% encoding the request and decoding the response.
+call(RpcClient, ContentType, Payload, Timeout) ->
+    gen_server:call(RpcClient, {call, ContentType, Payload}, Timeout).
+
+%%--------------------------------------------------------------------------
+%% Plumbing
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue for this client to listen on
+setup_reply_queue(State = #rpc_c_state{channel = Channel}) ->
+    #'queue.declare_ok'{queue = Q} =
+        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
+    State#rpc_c_state{reply_queue = Q}.
+
+%% Registers this RPC client instance as a consumer to handle rpc responses
+setup_consumer(#rpc_c_state{channel = Channel,
+                            reply_queue = Q}) ->
+    amqp_channel:call(Channel, #'basic.consume'{queue = Q, no_ack = true}).
+
+%% Publishes to the broker, stores the From address against
+%% the correlation id and increments the correlationid for
+%% the next request
+publish(ContentType, Payload, From,
+        State = #rpc_c_state{channel = Channel,
+                             reply_queue = Q,
+                             exchange = X,
+                             routing_key = RoutingKey,
+                             correlation_id = CorrelationId,
+                             continuations = Continuations}) ->
+    Props = #'P_basic'{correlation_id = <<CorrelationId:64>>,
+                       content_type = ContentType,
+                       reply_to = Q},
+    Publish = #'basic.publish'{exchange = X,
+                               routing_key = RoutingKey,
+                               mandatory = true},
+    amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
+                                                  payload = Payload}),
+    State#rpc_c_state{correlation_id = CorrelationId + 1,
+                      continuations =
+                          dict:store(CorrelationId, From, Continuations)}.
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue and consumer within an existing channel
+%% @private
+init([Connection, RoutingKey]) ->
+    Channel = amqp_connection:open_channel(Connection),
+    InitialState = #rpc_c_state{channel = Channel,
+                                exchange = <<>>,
+                                routing_key = RoutingKey},
+    State = setup_reply_queue(InitialState),
+    setup_consumer(State),
+    {ok, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #rpc_c_state{channel = Channel}) ->
+    amqp_channel:close(Channel),
+    ok.
+
+%% Handle the application initiated stop by just stopping this gen server
+%% @private
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State};
+
+%% @private
+handle_call({call, ContentType, Payload}, From, State) ->
+    NewState = publish(ContentType, Payload, From, State),
+    {noreply, NewState}.
+
+%% @private
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+    {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+    {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{},
+             #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>},
+                       payload = Payload}},
+            State = #rpc_c_state{continuations = Conts}) ->
+    From = dict:fetch(Id, Conts),
+    gen_server:reply(From, Payload),
+    {noreply, State#rpc_c_state{continuations = dict:erase(Id, Conts) }}.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+    State.

src/bql_amqp_rpc_server.erl

     Ch = amqp_connection:open_channel(Connection),
     link(Ch),
 
-    _X = amqp_channel:call(Ch, #'exchange.declare'{exchange = ?ExchangeName, durable = true}),
     #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = ?QueueName, durable = true}),
     _ConsumerTag = amqp_channel:call(Ch, #'basic.consume'{queue = ?QueueName}),
-    #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = ?ExchangeName, 
-                                                             queue = ?QueueName, 
-                                                             routing_key = <<>>}),
-
     {ok, #state { channel = Ch } }.
 
 handle_call(_,_,State) -> {reply,unhandled_call,State}.
 handle_info({#'basic.deliver' { 'delivery_tag' = DeliveryTag },
              #amqp_msg{props = Props, payload = Payload }},
             State = #state { channel = Ch }) ->
-    #'P_basic'{correlation_id = CorrelationId, reply_to = Q} = Props,
+    #'P_basic'{correlation_id = CorrelationId, reply_to = Q, content_type = ContentType} = Props,
     try
-      ResponseObj = case rfc4627:decode(Payload) of
-        {ok, RequestObj, _Rest} ->
-          case rfc4627:get_field(RequestObj, "query") of 
-            {ok, Query} ->
-              case bql_server:send_command(<<"guest">>, <<"guest">>, <<"/">>, <<"text/bql">>, binary_to_list(Query)) of
-                {ok, Result} ->
-                  {obj, [{"success", true}, {"messages", format_result(Result)}]};
-                {error, Reason} ->
-                  {obj, [{"success", false}, {"message", list_to_binary(Reason)}]}
-              end;
-            _ ->
-              {obj, [{"success", false}, {"message", <<"Invalid request - no query attribute">>}]}
-          end;
-        {error, _Reason} ->
-          {obj, [{"success", false}, {"message", <<"Invalid JSON in Query">>}]}
+      Result = case decode_request(ContentType, Payload) of
+        {ok, User, Password, VHost, Query} ->
+          bql_server:send_command(User, Password, VHost, <<"text/bql">>, Query);
+        {error, Reason} -> {error, Reason}
       end,
-
+      Response = encode_result(ContentType, Result),
+      
       Properties = #'P_basic'{correlation_id = CorrelationId},
       amqp_channel:call(Ch, #'basic.publish'{exchange = <<>>, routing_key = Q}, 
-                        #amqp_msg{payload=rfc4627:encode(ResponseObj), props = Properties})
+                        #amqp_msg{payload = Response, props = Properties})
     catch
       Tag:Error -> io:fwrite("Caught error: ~p,~p,~p~n", [Tag, Error,
                               erlang:get_stacktrace()])
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+decode_request(<<"application/json">>, Payload) ->
+	case rfc4627:decode(Payload) of
+    {ok, RequestObj, _Rest} ->
+      case rfc4627:get_field(RequestObj, "query") of 
+        {ok, Query} ->
+			    {ok, 
+			      get_field_or_default(RequestObj, "user", <<"guest">>), 
+			      get_field_or_default(RequestObj, "password", <<"guest">>), 
+			      get_field_or_default(RequestObj, "vhost", <<"/">>), 
+			      binary_to_list(Query)};
+        _ ->
+          {error, <<"Invalid request - no query attribute">>}
+      end;
+    {error, _Reason} ->
+      {error, <<"Invalid JSON in Query">>}
+  end;
+decode_request(<<"application/bert">>, Payload) ->
+  Request = binary_to_term(Payload),
+  {ok, 
+    proplists:get_value(user, Request, <<"guest">>),
+    proplists:get_value(password, Request, <<"guest">>),
+    proplists:get_value(vhost, Request, <<"/">>),
+    proplists:get_value(query_text, Request, "")}.
+  
+encode_result(<<"application/json">>, Result) ->
+  ResponseObj = case Result of
+    {ok, ResultEls} ->
+      {obj, [{"success", true}, {"messages", format_result(ResultEls)}]};
+    {error, Reason} ->
+      {obj, [{"success", false}, {"message", list_to_binary(Reason)}]}
+  end,
+  
+  list_to_binary(rfc4627:encode(ResponseObj));
+encode_result(<<"application/bert">>, Result) ->
+  term_to_binary(Result).
+
+get_field_or_default(Obj, Name, Default) ->
+  case rfc4627:get_field(Obj, Name) of 
+    {ok, Val} -> Val;
+    _ -> Default
+  end.
+
 format_result(Result) ->
     [format_result_entry(E) || E <- Result].
 

src/bql_applicator.erl

 -include_lib("rabbit_common/include/rabbit_framing.hrl").
 
 -define(RPC_TIMEOUT, 30000).
+-define(MASTER_VHOST, <<"/">>).
 
 -record(state, {node, user, vhost}).
 
 
     {ok, [catch apply_command(Command, #state {node = Node, user = User, vhost = VHost}) 
             || Command <- Commands]}.
-            
+                                          
 % Queue Management
 apply_command({create_queue, Name, Durable, Args}, #state {user = Username, vhost = VHost}) ->
     QueueName = rabbit_misc:r(VHost, queue, list_to_binary(Name)),
     end;
 
 % User Management
-apply_command({create_user, Name, Password}, #state {node = Node}) ->
+apply_command({create_user, Name, Password}, #state {user = Username, node = Node}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     rpc_call(Node, rabbit_access_control, add_user, [list_to_binary(Name), list_to_binary(Password)]),
     ok;
-apply_command({drop_user, Name}, #state {node = Node}) ->
+apply_command({drop_user, Name}, #state {user = Username, node = Node}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     rpc_call(Node, rabbit_access_control, delete_user, [list_to_binary(Name)]),
     ok;
 
 % VHost Management
-apply_command({create_vhost, Name}, #state {node = Node}) ->
+apply_command({create_vhost, Name}, #state {user = Username, node = Node}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     rpc_call(Node, rabbit_access_control, add_vhost, [list_to_binary(Name)]),
     ok;
-apply_command({drop_vhost, Name}, #state {node = Node}) ->
+apply_command({drop_vhost, Name}, #state {user = Username, node = Node}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     rpc_call(Node, rabbit_access_control, delete_vhost, [list_to_binary(Name)]),
     ok;
 
                    list_to_binary(RoutingKey), <<"">>, Username, VHost);
 
 % Privilege Management
-apply_command({grant, Privilege, Regex, User}, #state {node = Node, vhost = VHost}) ->
+apply_command({grant, Privilege, Regex, User}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     PrivilegeList = expand_privilege_list(Privilege),
-    apply_privilege_list(Node, list_to_binary(User), VHost, PrivilegeList, list_to_binary(Regex));
-apply_command({revoke, Privilege, User}, #state {node = Node, vhost = VHost}) ->
+    apply_privilege_list(list_to_binary(User), VHost, PrivilegeList, list_to_binary(Regex));
+apply_command({revoke, Privilege, User}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, configure),
     PrivilegeList = expand_privilege_list(Privilege),
-    apply_privilege_list(Node, list_to_binary(User), VHost, PrivilegeList, <<"">>);
+    apply_privilege_list(list_to_binary(User), VHost, PrivilegeList, <<"">>);
   
 % Queries
-apply_command({select, "exchanges", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+apply_command({select, "exchanges", Fields, Modifiers}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, VHost, read),
     AllFieldList = [name, type, durable, auto_delete, arguments],
     FieldList = validate_fields(AllFieldList, Fields),
     Exchanges = rpc_call(Node, rabbit_exchange, info_all, [VHost]),
     interpret_response(AllFieldList, FieldList, Exchanges, Modifiers);
 
-apply_command({select, "queues", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+apply_command({select, "queues", Fields, Modifiers}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, VHost, read),
     AllFieldList = [name, durable, auto_delete, arguments, pid, messages_ready,
                     messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
                     consumers, transactions, memory],
     Queues = rpc_call(Node, rabbit_amqqueue, info_all, [VHost]),
     interpret_response(AllFieldList, FieldList, Queues, Modifiers);
 
-apply_command({select, "bindings", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+apply_command({select, "bindings", Fields, Modifiers}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, VHost, read),
     AllFieldList = [exchange_name, queue_name, routing_key, args],
     FieldList = validate_fields(AllFieldList, Fields),
     Bindings = rpc_call(Node, rabbit_exchange, list_bindings, [VHost]),
     interpret_response(AllFieldList, FieldList, Bindings, Modifiers);
 
-apply_command({select, "users", Fields, Modifiers}, #state {node = Node}) ->
+apply_command({select, "users", Fields, Modifiers}, #state {node = Node, user = Username}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, read),
     AllFieldList = [name],
     FieldList = validate_fields(AllFieldList, Fields),
     Response = rpc_call(Node, rabbit_access_control, list_users, []),
     Users = [[binary_to_list(User)] || User <- Response],
     interpret_response(AllFieldList, FieldList, Users, Modifiers);
 
-apply_command({select, "vhosts", Fields, Modifiers}, #state {node = Node}) ->
+apply_command({select, "vhosts", Fields, Modifiers}, #state {node = Node, user = Username}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, read),
     AllFieldList = [name],
     FieldList = validate_fields(AllFieldList, Fields),
     Response = rpc_call(Node, rabbit_access_control, list_vhosts, []),
     VHosts = [[{name, binary_to_list(User)}] || User <- Response],
     interpret_response(AllFieldList, FieldList, VHosts, Modifiers);
 
-apply_command({select, "permissions", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+apply_command({select, "permissions", Fields, Modifiers}, #state {node = Node, user = Username, vhost = VHost}) ->
+    ensure_wildcard_access(Username, VHost, read),
     AllFieldList = [username,configure_perm,write_perm,read_perm],
     FieldList = validate_fields(AllFieldList, Fields),
     Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [VHost]),
     interpret_response(AllFieldList, FieldList, Permissions, Modifiers);
 
-apply_command({select, "connections", Fields, Modifiers}, #state {node = Node}) ->
+apply_command({select, "connections", Fields, Modifiers}, #state {node = Node, user = Username}) ->
+    ensure_wildcard_access(Username, ?MASTER_VHOST, read),
     AllFieldList = [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt,
                     send_pend, state, channels, user, vhost, timeout, frame_max],
     FieldList = validate_fields(AllFieldList, Fields),
 expand_privilege_list(X) ->
     [X].
 
-apply_privilege_list(Node, User, VHost, PrivilegeList, Regex) ->
+apply_privilege_list(User, VHost, PrivilegeList, Regex) ->
     %% Retrieve the old privilege structure
-    Current = retrieve_privileges(Node, User, VHost),
+    Current = retrieve_privileges(User, VHost),
 
     %% Update each privilege detailed in the privilege spec
     NewPrivs = [case X of
     [NewConfigure,NewWrite,NewRead] = NewPrivs,
 
     % Set the permissions
-    rpc_call(Node, rabbit_access_control, set_permissions, [User, VHost, NewConfigure, NewWrite, NewRead]),
+    rabbit_access_control:set_permissions(User, VHost, NewConfigure, NewWrite, NewRead),
     ok.
 
-retrieve_privileges(Node, User, VHost) ->
-    Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [VHost]),
+retrieve_privileges(User, VHost) ->
+    Permissions = rabbit_access_control:list_vhost_permissions(VHost),
     UserPermissions = [[{configure, ConfigureRE}, {write, WriteRE}, {read, ReadRE}]
         || {PermUser, ConfigureRE, WriteRE, ReadRE} <- Permissions, User =:= PermUser],
     case length(UserPermissions) of
 ensure_resource_access(Username, Resource, Perm) ->
     rabbit_access_control:check_resource_access(Username, Resource, Perm).
     
+ensure_wildcard_access(Username, VHost, Perm) ->
+    VHostPerms = retrieve_privileges(Username, VHost),
+    Priv = proplists:get_value(Perm, VHostPerms, <<"">>),
+    case Priv == <<".*">> of
+        true -> ok;
+        false -> 
+            rabbit_misc:protocol_error(access_refused, 
+                "wildcard access to ~p on vhost ~s refused for user '~s'",
+                [Perm, VHost, Username])
+    end.
+    
 binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, Username, VHost) ->
     QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
     ensure_resource_access(Username, QueueName, write),

src/bql_client.erl

 %%
 -module(bql_client).
 
-% Client application for executing BDL commands.
+-export([connect/0, connect/5, close/1, execute/2]).
 
--export([start/0, stop/0]).
+-include_lib("amqp_client/include/amqp_client.hrl").
 
 % Record defining the context in which BQL commands are executed
--record(client_ctx, {username, password, vhost}).
+-record(client_ctx, {username, password, vhost, connection, rpc_client}).
 
-start() ->
-    Username = list_to_binary(argument_or_default(username, "guest")),
-    Password = list_to_binary(argument_or_default(password, "guest")),
-    VHost = list_to_binary(argument_or_default(vhost, "/")),
-    ClientContext = #client_ctx{username = Username, password = Password, vhost = VHost},
+%% Creates a connection to the Rabbit server that can subsequently be used
+%% to issue BQL requests. Uses init arguments to determine connection
+%% parameters.
+connect() ->
+  Username = list_to_binary(bql_utils:argument_or_default(username, "guest")),
+  Password = list_to_binary(bql_utils:argument_or_default(password, "guest")),
+  VHost = list_to_binary(bql_utils:argument_or_default(vhost, "/")),
+  Host = bql_utils:argument_or_default(host, "localhost"),
+  Port = bql_utils:argument_or_default(port, ?PROTOCOL_PORT),
+  connect(Host, Port, Username, Password, VHost).
 
-    case init:get_argument(execute) of
-      error ->
-         execute_shell(ClientContext),
-         halt();
-      {ok, BQL} ->
-         case apply_bql_file(ClientContext, BQL) of
-           ok    -> halt();
-           error -> halt(1)
-         end;
-      _ ->
-         io:fwrite("Too many arguments supplied. Provide a BQL file that should be applied.~n"),
-         halt()
-    end.
-
-stop() ->
-    ok.
-
-argument_or_default(Flag, Default) ->
-  case init:get_argument(Flag) of
-    {ok, [[Val]]} -> Val;
-    _ -> Default
-  end.
-
-execute_shell(ClientContext) ->
-    case run_command(ClientContext) of
-        exit -> ok;
-        _    -> execute_shell(ClientContext)
-    end.
-
-run_command(ClientContext) ->
-    Line = io:get_line("BQL> "),
-    case Line of
-        eof      -> exit;
-        "exit\n" -> exit;
-        _        -> execute_block(ClientContext, Line), ok
-    end.
-      
-
-apply_bql_file(ClientContext, BQL) ->
-    case filelib:is_file(BQL) of
-        false ->
-            io:fwrite("Provided BQL file does not exist!~n"),
-            error;
-        true ->
-            {ok, Contents} = file:read_file(BQL),
-            execute_block(ClientContext, binary_to_list(Contents))
-    end.
-
-execute_block(#client_ctx { username = User, password = Password, vhost = VHost }, Contents) ->
-    case rpc:call(bql_utils:makenode("rabbit"), bql_server, send_command, 
-                    [User, Password, VHost, <<"text/bql">>, Contents]) of	
-        {ok, Result}    -> format_result(Result);
-        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
-    end.
-
-format_result(Result) ->
-    [format_result_block(Item) || Item <- Result],
-    ok.
-
-format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
-    %% Convert the content of all the rows to strings
-    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
-
-    %% Work through the items and headers, and find the longest item
-    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
-    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
-
-    %% Output the header then inside dividers
-    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
-    io:fwrite(Divider),
-    output_row([atom_to_list(H) || H <- Headers], Widths),
-    io:fwrite(Divider),
-
-    [output_row(Row, Widths) || Row <- StringifiedRows],
-    io:fwrite("~n"),
-    ok;
-format_result_block(Result) ->
-    io:format("~p~n", [Result]),
-    ok.
-
-measure_column(Header, Position, Items) ->
-    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
-output_row(Items, Widths) ->
-    WidthItems = lists:zip(Items, Widths),
-    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
-    io:fwrite("|~n").
-
-widen(Item, Width) ->
-    Extra = Width - length(Item),
-    case Extra of
-        0 -> Item;
-        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
-    end.
+%% Creates a connection to the Rabbit server that can subsequently be used
+%% to issue BQL requests.
+connect(Host, Port, Username, Password, VHost) ->
+	% Open a conenction and wire a RPC client to it
+	Connection = amqp_connection:start_network(#amqp_params{
+        username     = Username,
+        password     = Password,
+        virtual_host = <<"/">>,  %% bql.query is in the default vhost
+        host         = Host,
+        port         = Port}),
+    Client = bql_amqp_rpc_client:start(Connection, <<"bql.query">>),
+    #client_ctx{username = Username, password = Password, vhost = VHost,
+	            connection = Connection, rpc_client = Client}.
+				
+%% Disconnects the BQL client and frees up any resources associated with it
+close(#client_ctx { connection = Connection, rpc_client = Client }) ->
+  bql_amqp_rpc_client:stop(Client),
+  amqp_connection:close(Connection).
+				
+%% Executes the given BQL request on the connected server
+execute(#client_ctx { username = User, password = Password, vhost = VHost, rpc_client = Client }, Contents) ->
+  Request = [{user, User}, {password, Password}, {vhost, VHost}, {query_text, Contents}],
+	Res = bql_amqp_rpc_client:call(Client, <<"application/bert">>, term_to_binary(Request), 500),
+	binary_to_term(Res).
 -define(ReservedQueues, ["bql.query"]).
 
 start() ->
-    Exchanges = execute_block("select * from exchanges order by name;",
+    Client = bql_client:connect(),
+  
+    Exchanges = execute_block(Client, "select * from exchanges order by name;",
                               fun(Ex) ->
                                 {value, {_, ExchangeType}} = lists:keysearch(type, 1, Ex),
                                 Durable = durable_str(Ex),
                                   false -> io_lib:format("create ~s~p exchange '~s';", [Durable, ExchangeType, Name])
                                 end
                               end),
-    Queues = execute_block("select * from queues order by name;",
+    Queues = execute_block(Client, "select * from queues order by name;",
                            fun(Q) ->
                              Durable = durable_str(Q),
                              {value, {_, Name}} = lists:keysearch(name, 1, Q),
                                false -> io_lib:format("create ~squeue '~s';", [Durable, Name])
                              end
                            end),
-    Bindings = execute_block("select * from bindings order by exchange_name, queue_name, 'routing_key';",
+    Bindings = execute_block(Client, "select * from bindings order by exchange_name, queue_name, 'routing_key';",
                              fun(B) ->
                                {value, {_, X}} = lists:keysearch(exchange_name, 1, B),
                                {value, {_, Q}} = lists:keysearch(queue_name, 1, B),
                              end),
 
     io:format("~s~n", [string:join([Exchanges, Queues, Bindings], "\n")]),
+    bql_client:close(Client),
+    
     init:stop().
 
 stop() ->
     ok.
 
-execute_block(Contents, Formatter) ->
-    case rpc:call(bql_utils:makenode("rabbit"), bql_server, send_command,
-                  [<<"guest">>, <<"guest">>, <<"text/bql">>, Contents]) of	
+execute_block(Client, Contents, Formatter) ->
+    case bql_client:execute(Client, Contents) of	
         {ok, Result}    -> format(Result, Formatter);
         {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
     end.

src/bql_shell.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_shell).
+
+% Client application for executing BQL commands.
+
+-export([start/0, stop/0]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+% Record defining the context in which BQL commands are executed
+-record(client_ctx, {username, password, vhost}).
+
+start() ->
+    Client = bql_client:connect(),
+
+    case init:get_argument(execute) of
+      error ->
+         execute_shell(Client),
+         halt();
+      {ok, BQL} ->
+         case apply_bql_file(Client, BQL) of
+           ok    -> halt();
+           error -> halt(1)
+         end;
+      _ ->
+         io:fwrite("Too many arguments supplied. Provide a BQL file that should be applied.~n"),
+         halt()
+    end.
+
+stop() ->
+    ok.
+
+execute_shell(Client) ->
+    case run_command(Client) of
+        exit -> ok;
+        _    -> execute_shell(Client)
+    end.
+
+run_command(Client) ->
+    Line = io:get_line("BQL> "),
+    case Line of
+        eof      -> exit;
+        "exit\n" -> exit;
+        _        -> execute_block(Client, Line), ok
+    end.
+      
+
+apply_bql_file(Client, BQL) ->
+    case filelib:is_file(BQL) of
+        false ->
+            io:fwrite("Provided BQL file does not exist!~n"),
+            error;
+        true ->
+            {ok, Contents} = file:read_file(BQL),
+            execute_block(Client, binary_to_list(Contents))
+    end.
+
+execute_block(Client, Contents) ->
+    case bql_client:execute(Client, Contents) of	
+        {ok, Result}    -> format_result(Result);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
+
+format_result(Result) ->
+    [format_result_block(Item) || Item <- Result],
+    ok.
+
+format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
+    %% Convert the content of all the rows to strings
+    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
+
+    %% Work through the items and headers, and find the longest item
+    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
+    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
+
+    %% Output the header then inside dividers
+    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
+    io:fwrite(Divider),
+    output_row([atom_to_list(H) || H <- Headers], Widths),
+    io:fwrite(Divider),
+
+    [output_row(Row, Widths) || Row <- StringifiedRows],
+    io:fwrite("~n"),
+    ok;
+format_result_block(Result) ->
+    io:format("~p~n", [Result]),
+    ok.
+
+measure_column(Header, Position, Items) ->
+    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
+output_row(Items, Widths) ->
+    WidthItems = lists:zip(Items, Widths),
+    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
+    io:fwrite("|~n").
+
+widen(Item, Width) ->
+    Extra = Width - length(Item),
+    case Extra of
+        0 -> Item;
+        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
+    end.

src/bql_utils.erl

 %%
 -module(bql_utils).
 
--export([convert_to_string/1, makenode/1]).
+-export([convert_to_string/1, makenode/1, argument_or_default/2]).
 
 convert_to_string(Value) when is_list(Value) ->
     Value;
                             {Prefix, Suffix};
         {Prefix, Suffix} -> {Prefix, tl(Suffix)}
     end.
+    
+argument_or_default(Flag, Default) ->
+    case init:get_argument(Flag) of
+        {ok, [[Val]]} -> Val;
+        _ -> Default
+    end.

test/bql_amqp_rpc_client.erl

-%%   The contents of this file are subject to the Mozilla Public License
-%%   Version 1.1 (the "License"); you may not use this file except in
-%%   compliance with the License. You may obtain a copy of the License at
-%%   http://www.mozilla.org/MPL/
-%%
-%%   Software distributed under the License is distributed on an "AS IS"
-%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%%   License for the specific language governing rights and limitations
-%%   under the License.
-%%
-%%   The Original Code is the RabbitMQ Erlang Client.
-%%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
-%%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
-%%
-%%   All Rights Reserved.
-%%
-%%   Contributor(s): Ben Hood <0x6e6562@gmail.com>.
-%%
-
-%% @doc This module allows the simple execution of an asynchronous RPC over 
-%% AMQP. It frees a client programmer of the necessary having to AMQP
-%% plumbing. Note that the this module does not handle any data encoding,
-%% so it is up to the caller to marshall and unmarshall message payloads 
-%% accordingly.
--module(bql_amqp_rpc_client).
-
--include_lib("amqp_client/include/amqp_client.hrl").
-
--behaviour(gen_server).
-
--export([start/2, stop/1]).
--export([call/4]).
--export([init/1, terminate/2, code_change/3, handle_call/3,
-         handle_cast/2, handle_info/2]).
-
--record(rpc_c_state, {channel,
-                      reply_queue,
-                      exchange,
-                      routing_key,
-                      continuations = dict:new(),
-                      correlation_id = 0}).
-
-%%--------------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------------
-
-%% @spec (Connection, Queue) -> RpcClient
-%% where
-%%      Connection = pid()
-%%      Queue = binary()
-%%      RpcClient = pid()
-%% @doc Starts a new RPC client instance that sends requests to a
-%% specified queue. This function returns the pid of the RPC client process
-%% that can be used to invoke RPCs and stop the client.
-start(Connection, Queue) ->
-    {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
-    Pid.
-
-%% @spec (RpcClient) -> ok
-%% where
-%%      RpcClient = pid()
-%% @doc Stops an exisiting RPC client.
-stop(Pid) ->
-    gen_server:call(Pid, stop, infinity).
-
-%% @spec (RpcClient, Payload) -> ok
-%% where
-%%      RpcClient = pid()
-%%      Payload = binary()
-%% @doc Invokes an RPC. Note the caller of this function is responsible for
-%% encoding the request and decoding the response.
-call(RpcClient, ContentType, Payload, Timeout) ->
-    gen_server:call(RpcClient, {call, ContentType, Payload}, Timeout).
-
-%%--------------------------------------------------------------------------
-%% Plumbing
-%%--------------------------------------------------------------------------
-
-%% Sets up a reply queue for this client to listen on
-setup_reply_queue(State = #rpc_c_state{channel = Channel}) ->
-    #'queue.declare_ok'{queue = Q} =
-        amqp_channel:call(Channel, #'queue.declare'{}),
-    State#rpc_c_state{reply_queue = Q}.
-
-%% Registers this RPC client instance as a consumer to handle rpc responses
-setup_consumer(#rpc_c_state{channel = Channel,
-                            reply_queue = Q}) ->
-    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
-
-%% Publishes to the broker, stores the From address against
-%% the correlation id and increments the correlationid for
-%% the next request
-publish(ContentType, Payload, From,
-        State = #rpc_c_state{channel = Channel,
-                             reply_queue = Q,
-                             exchange = X,
-                             routing_key = RoutingKey,
-                             correlation_id = CorrelationId,
-                             continuations = Continuations}) ->
-    Props = #'P_basic'{correlation_id = <<CorrelationId:64>>,
-                       content_type = ContentType,
-                       reply_to = Q},
-    Publish = #'basic.publish'{exchange = X,
-                               routing_key = RoutingKey,
-                               mandatory = true},
-    amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
-                                                  payload = Payload}),
-    State#rpc_c_state{correlation_id = CorrelationId + 1,
-                      continuations =
-                          dict:store(CorrelationId, From, Continuations)}.
-
-%%--------------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------------
-
-%% Sets up a reply queue and consumer within an existing channel
-%% @private
-init([Connection, RoutingKey]) ->
-    Channel = amqp_connection:open_channel(Connection),
-    InitialState = #rpc_c_state{channel = Channel,
-                                exchange = <<>>,
-                                routing_key = RoutingKey},
-    State = setup_reply_queue(InitialState),
-    setup_consumer(State),
-    {ok, State}.
-
-%% Closes the channel this gen_server instance started
-%% @private
-terminate(_Reason, #rpc_c_state{channel = Channel}) ->
-    amqp_channel:close(Channel),
-    ok.
-
-%% Handle the application initiated stop by just stopping this gen server
-%% @private
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State};
-
-%% @private
-handle_call({call, ContentType, Payload}, From, State) ->
-    NewState = publish(ContentType, Payload, From, State),
-    {noreply, NewState}.
-
-%% @private
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-%% @private
-handle_info(#'basic.consume_ok'{}, State) ->
-    {noreply, State};
-
-%% @private
-handle_info(#'basic.cancel_ok'{}, State) ->
-    {stop, normal, State};
-
-%% @private
-handle_info({#'basic.deliver'{},
-             #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>},
-                       payload = Payload}},
-            State = #rpc_c_state{continuations = Conts}) ->
-    From = dict:fetch(Id, Conts),
-    gen_server:reply(From, Payload),
-    {noreply, State#rpc_c_state{continuations = dict:erase(Id, Conts) }}.
-
-%% @private
-code_change(_OldVsn, State, _Extra) ->
-    State.

test/bql_client_test.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_client_test).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+submit_create_command_test() ->
+    Response = send_request("create exchange myexchange;"),
+    ?assertEqual({ok, [ok]}, Response).
+
+submit_query_test() ->
+    Response = send_request("select * from vhosts where name='/';"),
+    ?assertEqual({ok, [{[name], [["/"]]}]}, Response).
+
+submit_badly_formatted_query_test() ->
+    Response = send_request("create invalidexchange myexchange;"),
+    ?assertEqual({error, "syntax error before: \"invalidexchange\" on line 1"}, Response).
+
+submit_query_against_non_existant_object_test() ->
+    Response = send_request("select * from something;"),
+    ?assertEqual({ok, ["Unknown entity something specified to query"]}, Response).
+
+send_request(Content) ->
+    Client = bql_client:connect("localhost", ?PROTOCOL_PORT, <<"guest">>, <<"guest">>, <<"/">>),
+    Res = bql_client:execute(Client, Content),
+    bql_client:close(Client),
+    Res.