Commits

Anonymous committed 71d37e0

Rename bql_client to bql_shell; communicate with BERT via AMQP

Comments (0)

Files changed (8)

 GENERATED_SOURCES=command_lexer command_parser
 EXTRA_PACKAGE_DIRS=scripts
 TEST_APPS=amqp_client rabbitmq_bql
-TEST_COMMANDS=command_parser_test:test() bql_test:test() amq_interface_test:test()
+TEST_COMMANDS=command_parser_test:test() bql_test:test() amq_interface_test:test() bql_client_test:test()
 START_RABBIT_IN_TESTS=true
 
 include ../include.mk
 
 exec $RLWRAP erl \
     -pa "`dirname $0`/../ebin" \
+    -pa "`dirname $0`/../build/deps/amqp_client/ebin" \
+    -pa "`dirname $0`/../build/deps/rabbit_common/ebin" \
+    -pa "`dirname $0`/../build/deps/rfc4627_jsonrpc/ebin" \
     -noshell \
     -hidden \
     -sname amqpbql$$ \
-    -s bql_client \
+    -s bql_shell \
     $@

src/bql_amqp_rpc_client.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is the RabbitMQ Erlang Client.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.,
+%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%%   Portions created by LShift Ltd., Cohesive Financial
+%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%%   Technologies Ltd.;
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+%%
+
+%% @doc This module allows the simple execution of an asynchronous RPC over 
+%% AMQP. It frees a client programmer of the necessary having to AMQP
+%% plumbing. Note that the this module does not handle any data encoding,
+%% so it is up to the caller to marshall and unmarshall message payloads 
+%% accordingly.
+-module(bql_amqp_rpc_client).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start/2, stop/1]).
+-export([call/4]).
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+         handle_cast/2, handle_info/2]).
+
+-record(rpc_c_state, {channel,
+                      reply_queue,
+                      exchange,
+                      routing_key,
+                      continuations = dict:new(),
+                      correlation_id = 0}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%%      Connection = pid()
+%%      Queue = binary()
+%%      RpcClient = pid()
+%% @doc Starts a new RPC client instance that sends requests to a
+%% specified queue. This function returns the pid of the RPC client process
+%% that can be used to invoke RPCs and stop the client.
+start(Connection, Queue) ->
+    {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
+    Pid.
+
+%% @spec (RpcClient) -> ok
+%% where
+%%      RpcClient = pid()
+%% @doc Stops an exisiting RPC client.
+stop(Pid) ->
+    gen_server:call(Pid, stop, infinity).
+
+%% @spec (RpcClient, Payload) -> ok
+%% where
+%%      RpcClient = pid()
+%%      Payload = binary()
+%% @doc Invokes an RPC. Note the caller of this function is responsible for
+%% encoding the request and decoding the response.
+call(RpcClient, ContentType, Payload, Timeout) ->
+    gen_server:call(RpcClient, {call, ContentType, Payload}, Timeout).
+
+%%--------------------------------------------------------------------------
+%% Plumbing
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue for this client to listen on
+setup_reply_queue(State = #rpc_c_state{channel = Channel}) ->
+    #'queue.declare_ok'{queue = Q} =
+        amqp_channel:call(Channel, #'queue.declare'{}),
+    State#rpc_c_state{reply_queue = Q}.
+
+%% Registers this RPC client instance as a consumer to handle rpc responses
+setup_consumer(#rpc_c_state{channel = Channel,
+                            reply_queue = Q}) ->
+    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
+
+%% Publishes to the broker, stores the From address against
+%% the correlation id and increments the correlationid for
+%% the next request
+publish(ContentType, Payload, From,
+        State = #rpc_c_state{channel = Channel,
+                             reply_queue = Q,
+                             exchange = X,
+                             routing_key = RoutingKey,
+                             correlation_id = CorrelationId,
+                             continuations = Continuations}) ->
+    Props = #'P_basic'{correlation_id = <<CorrelationId:64>>,
+                       content_type = ContentType,
+                       reply_to = Q},
+    Publish = #'basic.publish'{exchange = X,
+                               routing_key = RoutingKey,
+                               mandatory = true},
+    amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
+                                                  payload = Payload}),
+    State#rpc_c_state{correlation_id = CorrelationId + 1,
+                      continuations =
+                          dict:store(CorrelationId, From, Continuations)}.
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue and consumer within an existing channel
+%% @private
+init([Connection, RoutingKey]) ->
+    Channel = amqp_connection:open_channel(Connection),
+    InitialState = #rpc_c_state{channel = Channel,
+                                exchange = <<>>,
+                                routing_key = RoutingKey},
+    State = setup_reply_queue(InitialState),
+    setup_consumer(State),
+    {ok, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #rpc_c_state{channel = Channel}) ->
+    amqp_channel:close(Channel),
+    ok.
+
+%% Handle the application initiated stop by just stopping this gen server
+%% @private
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State};
+
+%% @private
+handle_call({call, ContentType, Payload}, From, State) ->
+    NewState = publish(ContentType, Payload, From, State),
+    {noreply, NewState}.
+
+%% @private
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+    {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+    {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{},
+             #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>},
+                       payload = Payload}},
+            State = #rpc_c_state{continuations = Conts}) ->
+    From = dict:fetch(Id, Conts),
+    gen_server:reply(From, Payload),
+    {noreply, State#rpc_c_state{continuations = dict:erase(Id, Conts) }}.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+    State.

src/bql_amqp_rpc_server.erl

 handle_info({#'basic.deliver' { 'delivery_tag' = DeliveryTag },
              #amqp_msg{props = Props, payload = Payload }},
             State = #state { channel = Ch }) ->
-    #'P_basic'{correlation_id = CorrelationId, reply_to = Q} = Props,
+    #'P_basic'{correlation_id = CorrelationId, reply_to = Q, content_type = ContentType} = Props,
     try
-      ResponseObj = case rfc4627:decode(Payload) of
-        {ok, RequestObj, _Rest} ->
-          case rfc4627:get_field(RequestObj, "query") of 
-            {ok, Query} ->
-              case bql_server:send_command(<<"guest">>, <<"guest">>, <<"/">>, <<"text/bql">>, binary_to_list(Query)) of
-                {ok, Result} ->
-                  {obj, [{"success", true}, {"messages", format_result(Result)}]};
-                {error, Reason} ->
-                  {obj, [{"success", false}, {"message", list_to_binary(Reason)}]}
-              end;
-            _ ->
-              {obj, [{"success", false}, {"message", <<"Invalid request - no query attribute">>}]}
-          end;
-        {error, _Reason} ->
-          {obj, [{"success", false}, {"message", <<"Invalid JSON in Query">>}]}
+      Result = case decode_request(ContentType, Payload) of
+        {ok, User, Password, VHost, Query} ->
+          bql_server:send_command(User, Password, VHost, <<"text/bql">>, Query);
+        {error, Reason} -> {error, Reason}
       end,
-
+      Response = encode_result(ContentType, Result),
+      
       Properties = #'P_basic'{correlation_id = CorrelationId},
       amqp_channel:call(Ch, #'basic.publish'{exchange = <<>>, routing_key = Q}, 
-                        #amqp_msg{payload=rfc4627:encode(ResponseObj), props = Properties})
+                        #amqp_msg{payload = Response, props = Properties})
     catch
       Tag:Error -> io:fwrite("Caught error: ~p,~p,~p~n", [Tag, Error,
                               erlang:get_stacktrace()])
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+decode_request(<<"application/json">>, Payload) ->
+	case rfc4627:decode(Payload) of
+    {ok, RequestObj, _Rest} ->
+      case rfc4627:get_field(RequestObj, "query") of 
+        {ok, Query} ->
+			    {ok, 
+			      get_field_or_default(RequestObj, "user", <<"guest">>), 
+			      get_field_or_default(RequestObj, "password", <<"guest">>), 
+			      get_field_or_default(RequestObj, "vhost", <<"/">>), 
+			      binary_to_list(Query)};
+        _ ->
+          {error, <<"Invalid request - no query attribute">>}
+      end;
+    {error, _Reason} ->
+      {error, <<"Invalid JSON in Query">>}
+  end;
+decode_request(<<"application/bert">>, Payload) ->
+  Request = binary_to_term(Payload),
+  {ok, 
+    proplists:get_value(user, Request, <<"guest">>),
+    proplists:get_value(password, Request, <<"guest">>),
+    proplists:get_value(vhost, Request, <<"/">>),
+    proplists:get_value(query_text, Request, "")}.
+  
+encode_result(<<"application/json">>, Result) ->
+  ResponseObj = case Result of
+    {ok, ResultEls} ->
+      {obj, [{"success", true}, {"messages", format_result(ResultEls)}]};
+    {error, Reason} ->
+      {obj, [{"success", false}, {"message", list_to_binary(Reason)}]}
+  end,
+  
+  list_to_binary(rfc4627:encode(ResponseObj));
+encode_result(<<"application/bert">>, Result) ->
+  term_to_binary(Result).
+
+get_field_or_default(Obj, Name, Default) ->
+  case rfc4627:get_field(Obj, Name) of 
+    {ok, Val} -> Val;
+    _ -> Default
+  end.
+
 format_result(Result) ->
     [format_result_entry(E) || E <- Result].
 

src/bql_client.erl

 %%
 -module(bql_client).
 
-% Client application for executing BDL commands.
+-export([connect/5, close/1, execute/2]).
 
--export([start/0, stop/0]).
+-include_lib("amqp_client/include/amqp_client.hrl").
 
 % Record defining the context in which BQL commands are executed
--record(client_ctx, {username, password, vhost}).
+-record(client_ctx, {username, password, vhost, connection, rpc_client}).
 
-start() ->
-    Username = list_to_binary(argument_or_default(username, "guest")),
-    Password = list_to_binary(argument_or_default(password, "guest")),
-    VHost = list_to_binary(argument_or_default(vhost, "/")),
-    ClientContext = #client_ctx{username = Username, password = Password, vhost = VHost},
-
-    case init:get_argument(execute) of
-      error ->
-         execute_shell(ClientContext),
-         halt();
-      {ok, BQL} ->
-         case apply_bql_file(ClientContext, BQL) of
-           ok    -> halt();
-           error -> halt(1)
-         end;
-      _ ->
-         io:fwrite("Too many arguments supplied. Provide a BQL file that should be applied.~n"),
-         halt()
-    end.
-
-stop() ->
-    ok.
-
-argument_or_default(Flag, Default) ->
-  case init:get_argument(Flag) of
-    {ok, [[Val]]} -> Val;
-    _ -> Default
-  end.
-
-execute_shell(ClientContext) ->
-    case run_command(ClientContext) of
-        exit -> ok;
-        _    -> execute_shell(ClientContext)
-    end.
-
-run_command(ClientContext) ->
-    Line = io:get_line("BQL> "),
-    case Line of
-        eof      -> exit;
-        "exit\n" -> exit;
-        _        -> execute_block(ClientContext, Line), ok
-    end.
-      
-
-apply_bql_file(ClientContext, BQL) ->
-    case filelib:is_file(BQL) of
-        false ->
-            io:fwrite("Provided BQL file does not exist!~n"),
-            error;
-        true ->
-            {ok, Contents} = file:read_file(BQL),
-            execute_block(ClientContext, binary_to_list(Contents))
-    end.
-
-execute_block(#client_ctx { username = User, password = Password, vhost = VHost }, Contents) ->
-    case rpc:call(bql_utils:makenode("rabbit"), bql_server, send_command, 
-                    [User, Password, VHost, <<"text/bql">>, Contents]) of	
-        {ok, Result}    -> format_result(Result);
-        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
-    end.
-
-format_result(Result) ->
-    [format_result_block(Item) || Item <- Result],
-    ok.
-
-format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
-    %% Convert the content of all the rows to strings
-    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
-
-    %% Work through the items and headers, and find the longest item
-    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
-    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
-
-    %% Output the header then inside dividers
-    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
-    io:fwrite(Divider),
-    output_row([atom_to_list(H) || H <- Headers], Widths),
-    io:fwrite(Divider),
-
-    [output_row(Row, Widths) || Row <- StringifiedRows],
-    io:fwrite("~n"),
-    ok;
-format_result_block(Result) ->
-    io:format("~p~n", [Result]),
-    ok.
-
-measure_column(Header, Position, Items) ->
-    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
-output_row(Items, Widths) ->
-    WidthItems = lists:zip(Items, Widths),
-    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
-    io:fwrite("|~n").
-
-widen(Item, Width) ->
-    Extra = Width - length(Item),
-    case Extra of
-        0 -> Item;
-        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
-    end.
+%% Creates a connection to the Rabbit server that can subsequently be used
+%% to issue BQL requests.
+connect(Host, Port, Username, Password, VHost) ->
+	% Open a conenction and wire a RPC client to it
+	Connection = amqp_connection:start_network(#amqp_params{
+		  username     = Username,
+	    password     = Password,
+	    virtual_host = <<"/">>,  %% bql.query is in the default vhost
+	    host         = Host,
+	    port         = Port}),
+	Client = bql_amqp_rpc_client:start(Connection, <<"bql.query">>),
+	#client_ctx{username = Username, password = Password, vhost = VHost,
+				connection = Connection, rpc_client = Client}.
+				
+%% Disconnects the BQL client and frees up any resources associated with it
+close(#client_ctx { connection = Connection, rpc_client = Client }) ->
+  bql_amqp_rpc_client:stop(Client),
+  amqp_connection:close(Connection).
+				
+%% Executes the given BQL request on the connected server
+execute(#client_ctx { username = User, password = Password, vhost = VHost, rpc_client = Client }, Contents) ->
+  Request = [{user, User}, {password, Password}, {vhost, VHost}, {query_text, Contents}],
+	Res = bql_amqp_rpc_client:call(Client, <<"application/bert">>, term_to_binary(Request), 500),
+	binary_to_term(Res).

src/bql_shell.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_shell).
+
+% Client application for executing BQL commands.
+
+-export([start/0, stop/0]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+% Record defining the context in which BQL commands are executed
+-record(client_ctx, {username, password, vhost}).
+
+start() ->
+    Username = list_to_binary(argument_or_default(username, "guest")),
+    Password = list_to_binary(argument_or_default(password, "guest")),
+    VHost = list_to_binary(argument_or_default(vhost, "/")),
+	  Host = argument_or_default(host, "localhost"),
+	  Port = argument_or_default(port, ?PROTOCOL_PORT),
+    Client = bql_client:connect(Host, Port, Username, Password, VHost),
+
+    case init:get_argument(execute) of
+      error ->
+         execute_shell(Client),
+         halt();
+      {ok, BQL} ->
+         case apply_bql_file(Client, BQL) of
+           ok    -> halt();
+           error -> halt(1)
+         end;
+      _ ->
+         io:fwrite("Too many arguments supplied. Provide a BQL file that should be applied.~n"),
+         halt()
+    end.
+
+stop() ->
+    ok.
+
+argument_or_default(Flag, Default) ->
+  case init:get_argument(Flag) of
+    {ok, [[Val]]} -> Val;
+    _ -> Default
+  end.
+
+execute_shell(Client) ->
+    case run_command(Client) of
+        exit -> ok;
+        _    -> execute_shell(Client)
+    end.
+
+run_command(Client) ->
+    Line = io:get_line("BQL> "),
+    case Line of
+        eof      -> exit;
+        "exit\n" -> exit;
+        _        -> execute_block(Client, Line), ok
+    end.
+      
+
+apply_bql_file(Client, BQL) ->
+    case filelib:is_file(BQL) of
+        false ->
+            io:fwrite("Provided BQL file does not exist!~n"),
+            error;
+        true ->
+            {ok, Contents} = file:read_file(BQL),
+            execute_block(Client, binary_to_list(Contents))
+    end.
+
+execute_block(Client, Contents) ->
+    case bql_client:execute(Client, Contents) of	
+        {ok, Result}    -> format_result(Result);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
+
+format_result(Result) ->
+    [format_result_block(Item) || Item <- Result],
+    ok.
+
+format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
+    %% Convert the content of all the rows to strings
+    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
+
+    %% Work through the items and headers, and find the longest item
+    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
+    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
+
+    %% Output the header then inside dividers
+    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
+    io:fwrite(Divider),
+    output_row([atom_to_list(H) || H <- Headers], Widths),
+    io:fwrite(Divider),
+
+    [output_row(Row, Widths) || Row <- StringifiedRows],
+    io:fwrite("~n"),
+    ok;
+format_result_block(Result) ->
+    io:format("~p~n", [Result]),
+    ok.
+
+measure_column(Header, Position, Items) ->
+    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
+output_row(Items, Widths) ->
+    WidthItems = lists:zip(Items, Widths),
+    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
+    io:fwrite("|~n").
+
+widen(Item, Width) ->
+    Extra = Width - length(Item),
+    case Extra of
+        0 -> Item;
+        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
+    end.

test/bql_amqp_rpc_client.erl

-%%   The contents of this file are subject to the Mozilla Public License
-%%   Version 1.1 (the "License"); you may not use this file except in
-%%   compliance with the License. You may obtain a copy of the License at
-%%   http://www.mozilla.org/MPL/
-%%
-%%   Software distributed under the License is distributed on an "AS IS"
-%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%%   License for the specific language governing rights and limitations
-%%   under the License.
-%%
-%%   The Original Code is the RabbitMQ Erlang Client.
-%%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
-%%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
-%%
-%%   All Rights Reserved.
-%%
-%%   Contributor(s): Ben Hood <0x6e6562@gmail.com>.
-%%
-
-%% @doc This module allows the simple execution of an asynchronous RPC over 
-%% AMQP. It frees a client programmer of the necessary having to AMQP
-%% plumbing. Note that the this module does not handle any data encoding,
-%% so it is up to the caller to marshall and unmarshall message payloads 
-%% accordingly.
--module(bql_amqp_rpc_client).
-
--include_lib("amqp_client/include/amqp_client.hrl").
-
--behaviour(gen_server).
-
--export([start/2, stop/1]).
--export([call/4]).
--export([init/1, terminate/2, code_change/3, handle_call/3,
-         handle_cast/2, handle_info/2]).
-
--record(rpc_c_state, {channel,
-                      reply_queue,
-                      exchange,
-                      routing_key,
-                      continuations = dict:new(),
-                      correlation_id = 0}).
-
-%%--------------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------------
-
-%% @spec (Connection, Queue) -> RpcClient
-%% where
-%%      Connection = pid()
-%%      Queue = binary()
-%%      RpcClient = pid()
-%% @doc Starts a new RPC client instance that sends requests to a
-%% specified queue. This function returns the pid of the RPC client process
-%% that can be used to invoke RPCs and stop the client.
-start(Connection, Queue) ->
-    {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
-    Pid.
-
-%% @spec (RpcClient) -> ok
-%% where
-%%      RpcClient = pid()
-%% @doc Stops an exisiting RPC client.
-stop(Pid) ->
-    gen_server:call(Pid, stop, infinity).
-
-%% @spec (RpcClient, Payload) -> ok
-%% where
-%%      RpcClient = pid()
-%%      Payload = binary()
-%% @doc Invokes an RPC. Note the caller of this function is responsible for
-%% encoding the request and decoding the response.
-call(RpcClient, ContentType, Payload, Timeout) ->
-    gen_server:call(RpcClient, {call, ContentType, Payload}, Timeout).
-
-%%--------------------------------------------------------------------------
-%% Plumbing
-%%--------------------------------------------------------------------------
-
-%% Sets up a reply queue for this client to listen on
-setup_reply_queue(State = #rpc_c_state{channel = Channel}) ->
-    #'queue.declare_ok'{queue = Q} =
-        amqp_channel:call(Channel, #'queue.declare'{}),
-    State#rpc_c_state{reply_queue = Q}.
-
-%% Registers this RPC client instance as a consumer to handle rpc responses
-setup_consumer(#rpc_c_state{channel = Channel,
-                            reply_queue = Q}) ->
-    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
-
-%% Publishes to the broker, stores the From address against
-%% the correlation id and increments the correlationid for
-%% the next request
-publish(ContentType, Payload, From,
-        State = #rpc_c_state{channel = Channel,
-                             reply_queue = Q,
-                             exchange = X,
-                             routing_key = RoutingKey,
-                             correlation_id = CorrelationId,
-                             continuations = Continuations}) ->
-    Props = #'P_basic'{correlation_id = <<CorrelationId:64>>,
-                       content_type = ContentType,
-                       reply_to = Q},
-    Publish = #'basic.publish'{exchange = X,
-                               routing_key = RoutingKey,
-                               mandatory = true},
-    amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
-                                                  payload = Payload}),
-    State#rpc_c_state{correlation_id = CorrelationId + 1,
-                      continuations =
-                          dict:store(CorrelationId, From, Continuations)}.
-
-%%--------------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------------
-
-%% Sets up a reply queue and consumer within an existing channel
-%% @private
-init([Connection, RoutingKey]) ->
-    Channel = amqp_connection:open_channel(Connection),
-    InitialState = #rpc_c_state{channel = Channel,
-                                exchange = <<>>,
-                                routing_key = RoutingKey},
-    State = setup_reply_queue(InitialState),
-    setup_consumer(State),
-    {ok, State}.
-
-%% Closes the channel this gen_server instance started
-%% @private
-terminate(_Reason, #rpc_c_state{channel = Channel}) ->
-    amqp_channel:close(Channel),
-    ok.
-
-%% Handle the application initiated stop by just stopping this gen server
-%% @private
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State};
-
-%% @private
-handle_call({call, ContentType, Payload}, From, State) ->
-    NewState = publish(ContentType, Payload, From, State),
-    {noreply, NewState}.
-
-%% @private
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-%% @private
-handle_info(#'basic.consume_ok'{}, State) ->
-    {noreply, State};
-
-%% @private
-handle_info(#'basic.cancel_ok'{}, State) ->
-    {stop, normal, State};
-
-%% @private
-handle_info({#'basic.deliver'{},
-             #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>},
-                       payload = Payload}},
-            State = #rpc_c_state{continuations = Conts}) ->
-    From = dict:fetch(Id, Conts),
-    gen_server:reply(From, Payload),
-    {noreply, State#rpc_c_state{continuations = dict:erase(Id, Conts) }}.
-
-%% @private
-code_change(_OldVsn, State, _Extra) ->
-    State.

test/bql_client_test.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_client_test).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+submit_create_command_test() ->
+    Response = send_request("create exchange myexchange;"),
+    ?assertEqual({ok, [ok]}, Response).
+
+submit_query_test() ->
+    Response = send_request("select * from vhosts where name='/';"),
+    ?assertEqual({ok, [{[name], [["/"]]}]}, Response).
+
+submit_badly_formatted_query_test() ->
+    Response = send_request("create invalidexchange myexchange;"),
+    ?assertEqual({error, "syntax error before: \"invalidexchange\" on line 1"}, Response).
+
+submit_query_against_non_existant_object_test() ->
+    Response = send_request("select * from something;"),
+    ?assertEqual({ok, ["Unknown entity something specified to query"]}, Response).
+
+send_request(Content) ->
+    Client = bql_client:connect("localhost", ?PROTOCOL_PORT, <<"guest">>, <<"guest">>, <<"/">>),
+    Res = bql_client:execute(Client, Content),
+    bql_client:close(Client),
+    Res.