Commits

Anonymous committed 8e5745f Merge

Junked bug22232

Comments (0)

Files changed (26)

 src/command_lexer.erl
 src/command_parser.erl
 *.dump
+dist
+tmp
+test_ebin
+
+syntax: regexp
+^build/
+^dist/
+716a66dbc612fce6dcc5f7616b4e13c87c2df88c rabbitmq_v1_7_0
-SHELL=/bin/bash
+PACKAGE=rabbitmq-bql
+DEPS=rabbitmq-server rabbitmq-erlang-client erlang-rfc4627
+GENERATED_SOURCES=command_lexer command_parser
+EXTRA_PACKAGE_DIRS=scripts
+TEST_APPS=amqp_client rabbitmq_bql
+TEST_COMMANDS=command_parser_test:test() bql_test:test() amq_interface_test:test()
+START_RABBIT_IN_TESTS=true
 
-SOURCE_DIR=src
-EBIN_DIR=ebin
-INCLUDE_DIR=include
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
-TARGETS=ebin/leex.beam src/command_lexer.erl src/command_parser.erl ebin/command_lexer.beam ebin/command_parser.beam $(BEAM_TARGETS)
+include ../include.mk
 
-ERLAMQP_BIN=$(CURDIR)/../rabbitmq-erlang-client
-RABBIT_BIN=$(CURDIR)/../rabbitmq-server
-
-INCLUDE_OPTS=-I $(INCLUDE_DIR) -I $(ERLAMQP_BIN)/include -I $(RABBIT_BIN)/include
-
-ERLC_OPTS=$(INCLUDE_OPTS) -o $(EBIN_DIR) -Wall -v +debug_info
-
-ERLC=erlc
-ERL=erl
-
-ERL_PATH_OPTS=-pa $(EBIN_DIR) -pa $(ERLAMQP_BIN)/ebin -pa $(RABBIT_BIN)/ebin -pa $(IBROWSE_BIN)/ebin
 LEXER_NAME=command_lexer
 PARSER_NAME=command_parser
 
-all: $(TARGETS)
+src/command_lexer.erl: ebin/leex.beam src/command_lexer.xrl
+	$(ERL) -I -pa ebin -noshell -eval 'ok = leex:file("$(SOURCE_DIR)/$(LEXER_NAME).xrl",[{outdir,"$(SOURCE_DIR)"}]), halt().'
 
-src/command_lexer.erl: src/command_lexer.xrl
-	$(ERL) -I -pa ebin -noshell -eval 'leex:file("$(SOURCE_DIR)/$(LEXER_NAME).xrl",[{outdir,"$(SOURCE_DIR)"}]), halt().'
-
-src/command_parser.erl: src/command_parser.yrl
-	$(ERL) -I -pa ebin -noshell -eval 'yecc:file("$(SOURCE_DIR)/$(PARSER_NAME)"), halt().'
-
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl
-	$(ERLC) $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-
-test: all
-	$(ERL) -I -pa ebin -noshell -eval 'command_parser_test:test(), halt().'
-
-clean:
-	rm -f $(EBIN_DIR)/*.beam
-	rm -f erl_crash.dump
-	rm -f src/command_lexer.erl
-	rm -f src/command_parser.erl
+src/command_parser.erl: ebin/leex.beam src/command_parser.yrl
+	$(ERL) -I -pa ebin -noshell -eval '{ok, _} = yecc:file("$(SOURCE_DIR)/$(PARSER_NAME)"), halt().'
-- Queue based access mechanism
+- Assume that the broker is running in the same VM, i.e. get rid of
+  rpc_call()
+- Drain queues to the server side
 - Accept keywords as terms in predicates
-- Use keyword allowing the vhost to be switched
+- Allow layered commands so that a thick client can submit a term list as well
+as an unparsed string (so that this works for thin clients as well) 
+  -- use content-type to demarcate what type it is, defaulting to unparsed
+     so that thin clients don't need to set this
+- Drop Erlang RPC as transport, just use AMQP to get commands in to BQL
+- Test case for draining queues
+- Add at least rudimentary support for security
+
+Refactoring:
 
 Nice to haves:
 - drop connection force where user='guest' and host='host' and port='port';
 - Runlevel changes - ie, drop broker into a maintenance mode
 - Creating checkpoints (mnesia, and queues)
 - Activate checkpoint
+
+Bugs

ebin/mod_bql.app

-{application, mod_bql,
- [{description, "mod_bql"},
-  {vsn, "0.01"},
-  {modules, [
-  ]},
-  {registered, []},
-  {mod, {bql_server, []}},
-  {env, []},
-  {applications, [kernel, stdlib, crypto]}]}.

ebin/rabbitmq_bql.app

+{application, rabbitmq_bql,
+ [{description, "RabbitMQ Broker Query Language"},
+  {vsn, "0.01"},
+  {modules, [
+    rabbitmq_bql,
+    rabbitmq_bql_sup,
+    bql_server,
+    bql_applicator
+  ]},
+  {registered, []},
+  {mod, {rabbitmq_bql, []}},
+  {env, []},
+  {applications, [kernel, stdlib, rabbit, amqp_client]}]}.
 ##   License for the specific language governing rights and limitations
 ##   under the License.
 ##
-##   The Original Code is RabbitMQ.
+##   The Original Code is RabbitMQ BQL Plugin.
 ##
-##   The Initial Developers of the Original Code are LShift Ltd,
-##   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##   The Initial Developers of the Original Code are LShift Ltd.
 ##
-##   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-##   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-##   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-##   Technologies LLC, and Rabbit Technologies Ltd.
-##
-##   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-##   Ltd. Portions created by Cohesive Financial Technologies LLC are
-##   Copyright (C) 2007-2009 Cohesive Financial Technologies
-##   LLC. Portions created by Rabbit Technologies Ltd are Copyright
-##   (C) 2007-2009 Rabbit Technologies Ltd.
+##   Copyright (C) 2009 LShift Ltd.
 ##
 ##   All Rights Reserved.
 ##
 ##
 
 CURDIR=`dirname $0`
-ERLAMQP_BIN=$CURDIR/../../rabbitmq-erlang-client
-RABBIT_BIN=$CURDIR/../../rabbitmq-server
+RLWRAP=`which rlwrap`
 
-exec erl \
+exec $RLWRAP erl \
     -pa "`dirname $0`/../ebin" \
-    -pa "$ERLAMQP_BIN/ebin" \
-    -pa "$RABBIT_BIN/ebin" \
     -noshell \
     -hidden \
-    ${RABBITMQ_CTL_ERL_ARGS} \
-    -sname amqpbdl$$ \
+    -sname amqpbql$$ \
     -s bql_client \
-    -extra "$@"
+    $@
+@echo off
+REM   The contents of this file are subject to the Mozilla Public License
+REM   Version 1.1 (the "License"); you may not use this file except in
+REM   compliance with the License. You may obtain a copy of the License at
+REM   http://www.mozilla.org/MPL/
+REM
+REM   Software distributed under the License is distributed on an "AS IS"
+REM   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM   License for the specific language governing rights and limitations
+REM   under the License.
+REM
+REM   The Original Code is RabbitMQ BQL Plugin.
+REM
+REM   The Initial Developers of the Original Code are LShift Ltd.
+REM
+REM   Copyright (C) 2009 LShift Ltd.
+REM
+REM   All Rights Reserved.
+REM
+REM   Contributor(s): ______________________________________.
+REM
+
+if "%ERLANG_HOME%"=="" (
+    set ERLANG_HOME=%~dp0%..\..\..
+)
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+    echo.
+    echo ******************************
+    echo ERLANG_HOME not set correctly. 
+    echo ******************************
+    echo.
+    echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+    echo RabbitMQ server distribution in the Erlang lib folder.
+    echo.
+    exit /B
+)
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noshell -hidden -sname amqpbdl -s bql_client -extra %*
+
+#!/bin/sh
+##   The contents of this file are subject to the Mozilla Public License
+##   Version 1.1 (the "License"); you may not use this file except in
+##   compliance with the License. You may obtain a copy of the License at
+##   http://www.mozilla.org/MPL/
+##
+##   Software distributed under the License is distributed on an "AS IS"
+##   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+##   License for the specific language governing rights and limitations
+##   under the License.
+##
+##   The Original Code is RabbitMQ BQL Plugin.
+##
+##   The Initial Developers of the Original Code are LShift Ltd.
+##
+##   Copyright (C) 2009 LShift Ltd.
+##
+##   All Rights Reserved.
+##
+##   Contributor(s): ______________________________________.
+##
+
+exec erl \
+    -pa "`dirname $0`/../ebin" \
+    -noshell \
+    -hidden \
+    -sname amqpbql$$ \
+    -s bql_dump \
+    -extra "$@"

scripts/bql_dump.bat

+@echo off
+REM   The contents of this file are subject to the Mozilla Public License
+REM   Version 1.1 (the "License"); you may not use this file except in
+REM   compliance with the License. You may obtain a copy of the License at
+REM   http://www.mozilla.org/MPL/
+REM
+REM   Software distributed under the License is distributed on an "AS IS"
+REM   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM   License for the specific language governing rights and limitations
+REM   under the License.
+REM
+REM   The Original Code is RabbitMQ BQL Plugin.
+REM
+REM   The Initial Developers of the Original Code are LShift Ltd.
+REM
+REM   Copyright (C) 2009 LShift Ltd.
+REM
+REM   All Rights Reserved.
+REM
+REM   Contributor(s): ______________________________________.
+REM
+
+if "%ERLANG_HOME%"=="" (
+    set ERLANG_HOME=%~dp0%..\..\..
+)
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+    echo.
+    echo ******************************
+    echo ERLANG_HOME not set correctly. 
+    echo ******************************
+    echo.
+    echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+    echo RabbitMQ server distribution in the Erlang lib folder.
+    echo.
+    exit /B
+)
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noshell -hidden -sname amqpbql -s bql_dump -extra %*
+

src/bql_amqp_rpc_server.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_amqp_rpc_server).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-record(state, { channel }).
+
+-define(ExchangeName, <<"bql.query">>).
+-define(QueueName, <<"bql.query">>).
+
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+init([]) ->
+    Connection = amqp_connection:start_direct(#amqp_params{}),
+    Ch = amqp_connection:open_channel(Connection),
+    link(Ch),
+
+    _X = amqp_channel:call(Ch, #'exchange.declare'{exchange = ?ExchangeName, durable = true}),
+    #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = ?QueueName, durable = true}),
+    _ConsumerTag = amqp_channel:call(Ch, #'basic.consume'{queue = ?QueueName}),
+    #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = ?ExchangeName, 
+                                                             queue = ?QueueName, 
+                                                             routing_key = <<>>}),
+
+    {ok, #state { channel = Ch } }.
+
+handle_call(_,_,State) -> {reply,unhandled_call,State}.
+handle_cast(_,State) -> {reply,unhandled_cast,State}.
+
+handle_info(#'basic.consume_ok'{}, State) ->
+    {noreply, State};
+handle_info({#'basic.deliver' { 'delivery_tag' = DeliveryTag },
+             #amqp_msg{props = Props, payload = Payload }},
+            State = #state { channel = Ch }) ->
+    #'P_basic'{correlation_id = CorrelationId, reply_to = Q} = Props,
+    try
+      ResponseObj = case rfc4627:decode(Payload) of
+        {ok, RequestObj, _Rest} ->
+          case rfc4627:get_field(RequestObj, "query") of 
+            {ok, Query} ->
+              case bql_server:send_command(<<"guest">>, <<"guest">>, <<"/">>, <<"text/bql">>, binary_to_list(Query)) of
+                {ok, Result} ->
+                  {obj, [{"success", true}, {"messages", format_result(Result)}]};
+                {error, Reason} ->
+                  {obj, [{"success", false}, {"message", list_to_binary(Reason)}]}
+              end;
+            _ ->
+              {obj, [{"success", false}, {"message", <<"Invalid request - no query attribute">>}]}
+          end;
+        {error, _Reason} ->
+          {obj, [{"success", false}, {"message", <<"Invalid JSON in Query">>}]}
+      end,
+
+      Properties = #'P_basic'{correlation_id = CorrelationId},
+      amqp_channel:call(Ch, #'basic.publish'{exchange = <<>>, routing_key = Q}, 
+                        #amqp_msg{payload=rfc4627:encode(ResponseObj), props = Properties})
+    catch
+      Tag:Error -> io:fwrite("Caught error: ~p,~p,~p~n", [Tag, Error,
+                              erlang:get_stacktrace()])
+    end,
+    ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+    {noreply, State};
+handle_info(shutdown, State) ->
+    {stop, channel_shutdown, State};
+handle_info(_Info, State) ->
+    {reply, unhandled_info, State}.
+
+terminate(_Reason, #state { channel = Ch }) ->
+    case is_process_alive(Ch) of
+        true -> amqp_channel:close(Ch);
+        false -> ok
+    end,
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+format_result(Result) ->
+    [format_result_entry(E) || E <- Result].
+
+format_result_entry(ok) ->
+    <<"ok">>;
+format_result_entry({Headers, Rows}) ->
+    [{obj, [{atom_to_list(Header), list_to_binary(bql_utils:convert_to_string(Cell))} || 
+                {Header, Cell} <- lists:zip(Headers, Row)]} || Row <- Rows];
+format_result_entry(Msg) when is_list(Msg) ->
+    list_to_binary(Msg).

src/bql_applicator.erl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 -module(bql_applicator).
 
--export([apply_commands/1]).
+-export([apply_commands/3]).
 
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
 
 -define(RPC_TIMEOUT, 30000).
 
--record(state, {ch, node}).
+-record(state, {node, user, vhost}).
 
-apply_commands(Commands) ->
-  % Create an AMQP channel so we can actually perform operations
-  Connection = lib_amqp:start_connection("localhost"),
-  ControlCh = amqp_connection:open_channel(Connection),
+apply_commands(Commands, User, VHost) ->
+    % Create a connection to the Rabbit node
+    Node = rabbit_misc:makenode("rabbit"),
 
-  % Create a connection to the Rabbit node too
-  Node = rabbit_misc:localnode(rabbit),
-
-  {ok, [catch apply_command(#state {ch = ControlCh, node = Node}, Command) || Command <- Commands]}.
-
+    {ok, [catch apply_command(Command, #state {node = Node, user = User, vhost = VHost}) 
+            || Command <- Commands]}.
+            
 % Queue Management
-apply_command(#state {ch = ControlCh}, {create_queue, Name, Durable}) ->
-  debug("create_queue(~p) (Durable: ~p)~n", [Name, Durable]),
-
-  lib_amqp:declare_queue(ControlCh, #'queue.declare'{queue = list_to_binary(Name), durable = Durable}),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_queue, Name}) ->
-  debug("delete_queue(~p)~n", [Name]),
-
-  lib_amqp:delete_queue(ControlCh, list_to_binary(Name)),
-  ok;
-apply_command(#state {ch = ControlCh}, {purge_queue, Name}) ->
-  amqp_channel:call(ControlCh, #'queue.purge'{queue = list_to_binary(Name)}),
-  ok;
+apply_command({create_queue, Name, Durable, Args}, #state {user = Username, vhost = VHost}) ->
+    QueueName = rabbit_misc:r(VHost, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, configure),
+    rabbit_amqqueue:declare(QueueName, Durable, false, Args),
+    ok;
+apply_command({drop_queue, Name}, #state {user = Username, vhost = VHost}) ->
+    QueueName = rabbit_misc:r(VHost, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, configure),
+    case rabbit_amqqueue:with(
+           QueueName,
+           fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end) of
+        {ok, _Purged} ->
+            ok;
+        {error, not_found} ->
+            {error, io_lib:format("Queue ~s not found", [Name])}
+    end;
+apply_command({purge_queue, Name}, #state {user = Username, vhost = VHost}) ->
+    QueueName = rabbit_misc:r(VHost, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, read),
+    rabbit_amqqueue:with_or_die(QueueName,
+                                fun (Q) -> rabbit_amqqueue:purge(Q) end);
 
 % Exchange Management
-apply_command(#state {ch = ControlCh}, {create_exchange, Name, Type, Durable}) ->
-  debug("create_exchange(~p) (Durable: ~p)~n", [Name, Durable]),
-
-  amqp_channel:call(ControlCh, #'exchange.declare'{exchange = list_to_binary(Name),
-                                                   type = list_to_binary(atom_to_list(Type)),
-                                                   durable = Durable}),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_exchange, Name}) ->
-  debug("delete_exchange(~p)~n", [Name]),
-
-  lib_amqp:delete_exchange(ControlCh, list_to_binary(Name)),
-  ok;
+apply_command({create_exchange, Name, Type, Durable, Args}, #state {user = Username, vhost = VHost}) ->
+    CheckedType = rabbit_exchange:check_type(list_to_binary(atom_to_list(Type))),
+    ExchangeName = rabbit_misc:r(VHost, exchange, list_to_binary(Name)),
+    ensure_resource_access(Username, ExchangeName, configure),
+    X = case rabbit_exchange:lookup(ExchangeName) of
+           {ok, FoundX} -> FoundX;
+           {error, not_found} ->
+               case rabbit_misc:r_arg(VHost, exchange, Args,
+                                      <<"alternate-exchange">>) of
+                   undefined -> ok;
+                   AName     -> ensure_resource_access(Username, ExchangeName, read),
+                                ensure_resource_access(Username, AName, write),
+                                ok
+               end,
+               rabbit_exchange:declare(ExchangeName, CheckedType,
+                                       Durable, false, Args)
+       end,
+    ok = rabbit_exchange:assert_type(X, CheckedType),
+    ok;
+apply_command({drop_exchange, Name}, #state {user = Username, vhost = VHost}) ->
+    ExchangeName = rabbit_misc:r(VHost, exchange, list_to_binary(Name)),
+    ensure_resource_access(Username, ExchangeName, configure),
+    case rabbit_exchange:delete(ExchangeName, false) of
+        {error, not_found} ->
+            io_lib:format("Unknown exchange ~s", [Name]);
+        ok ->
+            ok
+    end;
 
 % User Management
-apply_command(#state {node = Node}, {create_user, Name, Password}) ->
-  debug("create_user(~p)~n", [Name]),
-
-  rpc_call(Node, rabbit_access_control, add_user, [list_to_binary(Name), list_to_binary(Password)]),
-  ok;
-apply_command(#state {node = Node}, {drop_user, Name}) ->
-  debug("delete_user(~p)~n", [Name]),
-
-  rpc_call(Node, rabbit_access_control, delete_user, [list_to_binary(Name)]),
-  ok;
+apply_command({create_user, Name, Password}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, add_user, [list_to_binary(Name), list_to_binary(Password)]),
+    ok;
+apply_command({drop_user, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, delete_user, [list_to_binary(Name)]),
+    ok;
 
 % VHost Management
-apply_command(#state {node = Node}, {create_vhost, Name}) ->
-  debug("create_vhost(~p)~n", [Name]),
-
-  rpc_call(Node, rabbit_access_control, add_vhost, [list_to_binary(Name)]),
-  ok;
-apply_command(#state {node = Node}, {drop_vhost, Name}) ->
-  debug("delete_vhost(~p)~n", [Name]),
-
-  rpc_call(Node, rabbit_access_control, delete_vhost, [list_to_binary(Name)]),
-  ok;
+apply_command({create_vhost, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, add_vhost, [list_to_binary(Name)]),
+    ok;
+apply_command({drop_vhost, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, delete_vhost, [list_to_binary(Name)]),
+    ok;
 
 % Binding Management
-apply_command(#state {ch = ControlCh}, {create_binding, {X, Q, RoutingKey}}) ->
-  debug("create_binding(~p to ~p) (with routing key ~p)~n", [X, Q, RoutingKey]),
-
-  lib_amqp:bind_queue(ControlCh, list_to_binary(X), list_to_binary(Q), list_to_binary(RoutingKey)),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_binding, {X, Q, RoutingKey}}) ->
-  debug("unbind_queue(~p to ~p) with ~p~n", [X, Q, RoutingKey]),
-
-  lib_amqp:unbind_queue(ControlCh, list_to_binary(X), list_to_binary(Q), list_to_binary(RoutingKey)),
-  ok;
+apply_command({create_binding, {X, Q, RoutingKey}, Args}, #state {user = Username, vhost = VHost}) ->
+    binding_action(fun rabbit_exchange:add_binding/4, 
+                   list_to_binary(X), list_to_binary(Q),
+                   list_to_binary(RoutingKey), Args, Username, VHost);
+apply_command({drop_binding, {X, Q, RoutingKey}}, #state {user = Username, vhost = VHost}) ->
+    binding_action(fun rabbit_exchange:delete_binding/4, 
+                   list_to_binary(X), list_to_binary(Q),
+                   list_to_binary(RoutingKey), <<"">>, Username, VHost);
 
 % Privilege Management
-apply_command(#state {node = Node}, {grant, Privilege, Regex, User}) ->
-  PrivilegeList = expand_privilege_list(Privilege),
-  apply_privilege_list(Node, list_to_binary(User), PrivilegeList, list_to_binary(Regex));
-apply_command(#state {node = Node}, {revoke, Privilege, User}) ->
-  PrivilegeList = expand_privilege_list(Privilege),
-  apply_privilege_list(Node, list_to_binary(User), PrivilegeList, <<"">>);
+apply_command({grant, Privilege, Regex, User}, #state {node = Node, vhost = VHost}) ->
+    PrivilegeList = expand_privilege_list(Privilege),
+    apply_privilege_list(Node, list_to_binary(User), VHost, PrivilegeList, list_to_binary(Regex));
+apply_command({revoke, Privilege, User}, #state {node = Node, vhost = VHost}) ->
+    PrivilegeList = expand_privilege_list(Privilege),
+    apply_privilege_list(Node, list_to_binary(User), VHost, PrivilegeList, <<"">>);
   
 % Queries
-apply_command(#state {node = Node}, {select, "exchanges", Fields, Modifiers}) ->
-  FieldList = case Fields of
-    all -> [name, type, durable, auto_delete, arguments];
-    _   -> Fields
-  end,
-  debug("select(~p) from exchanges~n", [FieldList]),
-  Exchanges = rpc_call(Node, rabbit_exchange, info_all, [<<"/">>, FieldList]),
-  interpret_response(FieldList, Exchanges, Modifiers);
+apply_command({select, "exchanges", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+    AllFieldList = [name, type, durable, auto_delete, arguments],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Exchanges = rpc_call(Node, rabbit_exchange, info_all, [VHost]),
+    interpret_response(AllFieldList, FieldList, Exchanges, Modifiers);
 
-apply_command(#state {node = Node}, {select, "queues", Fields, Modifiers}) ->
-  FieldList = case Fields of
-    all -> [name, durable, auto_delete, arguments, pid, messages_ready,
-            messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
-            consumers, transactions, memory];
-    _   -> Fields
-  end,
-  debug("select(~p) from queues~n", [FieldList]),
-  Queues = rpc_call(Node, rabbit_amqqueue, info_all, [<<"/">>, FieldList]),
-  interpret_response(FieldList, Queues, Modifiers);
+apply_command({select, "queues", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+    AllFieldList = [name, durable, auto_delete, arguments, pid, messages_ready,
+                    messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
+                    consumers, transactions, memory],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Queues = rpc_call(Node, rabbit_amqqueue, info_all, [VHost]),
+    interpret_response(AllFieldList, FieldList, Queues, Modifiers);
 
-apply_command(#state {node = Node}, {select, "bindings", Fields, Modifiers}) ->
-  AllFieldList = [exchange_name, queue_name, routing_key, args],
-  FieldList = case Fields of
-    all -> AllFieldList;
-    _   -> Fields
-  end,
-  debug("select(~p) from bindings~n", [FieldList]),
-  Bindings = rpc_call(Node, rabbit_exchange, list_bindings, [<<"/">>]),
-  ZippedBindings = [lists:zip(AllFieldList, tuple_to_list(X)) || X <- Bindings],
-  FilteredBindings = filter_rows(FieldList, ZippedBindings),
-  interpret_response(FieldList, FilteredBindings, Modifiers);
+apply_command({select, "bindings", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+    AllFieldList = [exchange_name, queue_name, routing_key, args],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Bindings = rpc_call(Node, rabbit_exchange, list_bindings, [VHost]),
+    interpret_response(AllFieldList, FieldList, Bindings, Modifiers);
 
-apply_command(#state {node = Node}, {select, "users", Fields, Modifiers}) ->
-  FieldList = case Fields of
-    all -> [name];
-    _   -> Fields
-  end,
-  debug("select(~p) from users~n", [FieldList]),
-  Users = case FieldList of
-    [name] ->
-        Response = rpc_call(Node, rabbit_access_control, list_users, []),
-        [[{name, binary_to_list(User)}] || User <- Response];
-    _      ->
-        % Simulate the bad argument response
-        {bad_argument, lists:last([F || F <- FieldList, not(F =:= name)])}
-  end,
-  interpret_response(FieldList, Users, Modifiers);
+apply_command({select, "users", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Response = rpc_call(Node, rabbit_access_control, list_users, []),
+    Users = [[binary_to_list(User)] || User <- Response],
+    interpret_response(AllFieldList, FieldList, Users, Modifiers);
 
-apply_command(#state {node = Node}, {select, "vhosts", Fields, Modifiers}) ->
-  AllFieldList = [name],
-  FieldList = case Fields of
-    all -> AllFieldList;
-    _   -> Fields
-  end,
-  debug("select(~p) from vhosts~n", [FieldList]),
-  validate_fields(AllFieldList, FieldList),
-  Users = case FieldList of
-    [name] ->
-        Response = rpc_call(Node, rabbit_access_control, list_vhosts, []),
-        [[{name, binary_to_list(User)}] || User <- Response];
-    _      ->
-        % Simulate the bad argument response
-        {bad_argument, lists:last([F || F <- FieldList, not(F =:= name)])}
-  end,
-  interpret_response(FieldList, Users, Modifiers);
+apply_command({select, "vhosts", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Response = rpc_call(Node, rabbit_access_control, list_vhosts, []),
+    VHosts = [[{name, binary_to_list(User)}] || User <- Response],
+    interpret_response(AllFieldList, FieldList, VHosts, Modifiers);
 
-apply_command(#state {node = Node}, {select, "permissions", Fields, Modifiers}) ->
-  AllFieldList = [username,configure_perm,write_perm,read_perm],
-  FieldList = case Fields of
-    all -> AllFieldList;
-    _   -> Fields
-  end,
-  debug("select(~p) from permissions~n", [FieldList]),
-  validate_fields(AllFieldList, FieldList),
-  Bindings = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
-  ZippedBindings = [lists:zip(AllFieldList, tuple_to_list(X)) || X <- Bindings],
-  FilteredBindings = filter_rows(FieldList, ZippedBindings),
-  interpret_response(FieldList, FilteredBindings, Modifiers);
+apply_command({select, "permissions", Fields, Modifiers}, #state {node = Node, vhost = VHost}) ->
+    AllFieldList = [username,configure_perm,write_perm,read_perm],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [VHost]),
+    interpret_response(AllFieldList, FieldList, Permissions, Modifiers);
 
-apply_command(#state {}, {select, EntityName, _, _}) ->
-  lists:flatten("Unknown entity " ++ EntityName ++ " specified to query");
+apply_command({select, "connections", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt,
+                    send_pend, state, channels, user, vhost, timeout, frame_max],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Connections = rpc_call(Node, rabbit_networking, connection_info_all, []),
+    interpret_response(AllFieldList, FieldList, Connections, Modifiers);
+
+% Sending Messages
+apply_command({post_message, X, RoutingKey, Msg}, #state { user = Username, vhost = VHost }) ->
+    ExchangeName = rabbit_misc:r(VHost, exchange, list_to_binary(X)),
+    ensure_resource_access(Username, ExchangeName, write),
+    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+    Content = rabbit_basic:build_content(#'P_basic'{}, list_to_binary(Msg)),
+    Message = #basic_message{exchange_name  = ExchangeName,
+                             routing_key    = list_to_binary(RoutingKey),
+                             content        = Content,
+                             persistent_key = none},
+    {RoutingRes, _DeliveredQPids} =
+                rabbit_exchange:publish(
+                  Exchange,
+                  rabbit_basic:delivery(true, false, none, Message)),
+    case RoutingRes of
+        routed ->
+            ok;
+        unroutable ->
+            "Message was unroutable";
+        not_delivered ->
+            "Message was not able to be delivered"
+    end;
+    
+% Retreving Messages
+apply_command({retrieve_message, QName}, State = #state{}) ->
+    with_queue(fun(Q) -> poll(Q) end, QName, State);
+
+%% This drains messages to a file on the server
+apply_command({drain_queue, QName}, State = #state{}) ->
+    Fun = 
+        fun(Q) ->
+            case disk_log:open([{name, QName}, {type, halt}]) of
+                {ok, Log} -> drain_loop(Q, Log);
+                {repaired, Log, _, _} -> drain_loop(Q, Log);
+                {error, Reason} ->
+                    error_logger:error_msg("Could not open disk log for"
+                                           " queue (~p): ~p~n", [Q, Reason]),
+                    not_ok
+            end,
+            ok
+        end,
+  with_queue(Fun, QName, State);
+
+apply_command({select, EntityName, _, _}, #state {}) ->
+    lists:flatten("Unknown entity " ++ EntityName ++ " specified to query");
 
 % Catch-all  
-apply_command(_State, Unknown) ->
-  debug("Unknown command: ~p~n", [Unknown]).
+apply_command(Unknown, _State) ->
+    debug("Unknown command: ~p~n", [Unknown]).
+
+drain_loop(Q, Log) ->
+    case poll(Q) of
+        empty -> ok;
+        Payload ->
+            case disk_log:blog(Log, Payload) of
+                ok -> drain_loop(Q, Log);
+                _ -> ok
+            end
+    end,
+    disk_log:close(Log),
+    ok.
+
+with_queue(Fun, Queue, #state{node = Node, user = Username, vhost = VHost}) ->
+    QueueName = rabbit_misc:r(VHost, queue, list_to_binary(Queue)),
+    ensure_resource_access(Username, QueueName, read),
+    
+    case rpc_call(Node, rabbit_amqqueue, lookup, [QueueName]) of
+        {error, not_found} -> 
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(QueueName)]));
+        {ok, Q} -> 
+            Fun(Q)
+    end.
+
+poll(Q) ->
+    case rabbit_amqqueue:basic_get(Q, self(), true) of
+        {ok, _MsgCount,
+         {_QName, _QPid, _MsgId, _Redelivered,
+          #basic_message{content = Content}}} ->
+            {_Props, Payload} = rabbit_basic:from_content(Content),
+            Payload;
+        empty ->
+            empty
+    end.
 
 % Debug Control
 debug(_Format, _Params) ->
-  ok.
-  % io:format(Format, Params).
+    ok.
+    %% io:format(Format, Params).
 
 % RPC Commands
 rpc_call(Node, Mod, Fun, Args) ->
     rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
 
 % Formatting commands
-interpret_response(FieldList, Response, {Constraints, Ordering}) ->
-  case Response of
-    {bad_argument, Field} -> lists:flatten(io_lib:format("Invalid field \"~p\" requested", [Field]));
-    _                     ->
-        FormattedResponse = [[format_response(Cell) || Cell <- Detail] || Detail <- Response],
-        ConstrainedResponse = apply_constraints(FieldList, FormattedResponse, Constraints),
-        OrderedResponse = apply_ordering(FieldList, ConstrainedResponse, Ordering),
-        {FieldList, OrderedResponse}
-  end.
+interpret_response(_, _, {bad_argument, Field}, _) ->
+    lists:flatten(io_lib:format("Invalid field \"~p\" requested", [Field]));
+interpret_response(_, _, [], _) ->
+    [];
 
-format_response({_Name, {resource, _VHost, _Type, Value}}) ->
-  binary_to_list(Value);
+interpret_response(AvailFieldList, RequestedFieldList, [RHead|_] = Response, Modifiers) when is_tuple(RHead)->
+    interpret_response(AvailFieldList, RequestedFieldList, [tuple_to_list(X) || X <- Response], Modifiers);
+
+interpret_response(AvailFieldList, RequestedFieldList, Response, {Constraints, Ordering}) ->
+    FormattedResponse = [[format_response(Cell) || Cell <- Detail] || Detail <- Response],
+    ConstrainedResponse = apply_constraints(AvailFieldList, FormattedResponse, Constraints),
+    OrderedResponse = apply_ordering(AvailFieldList, ConstrainedResponse, Ordering),
+    FilteredResponse = filter_cols(AvailFieldList, RequestedFieldList, OrderedResponse),
+    {RequestedFieldList, FilteredResponse}.
+
 format_response({_Name, Value}) ->
-  Value;
+    format_response(Value);
+format_response({resource, _VHost, _Type, Value}) ->
+    binary_to_list(Value);
 format_response(Value) ->
-  lists:flatten(io_lib:format("Unparseable: ~p", [Value])).
+    Value.
 
 % Constraints
 apply_constraints(_FieldList, Rows, none) ->
-  Rows;
+    Rows;
 
 apply_constraints(FieldList, Rows, {and_sym, Left, Right}) ->
-  LeftRows = apply_constraints(FieldList, Rows, Left),
-  RightRows = apply_constraints(FieldList, Rows, Right),
-  sets:to_list(sets:intersection(sets:from_list(LeftRows), sets:from_list(RightRows)));
+    LeftRows = apply_constraints(FieldList, Rows, Left),
+    RightRows = apply_constraints(FieldList, Rows, Right),
+    sets:to_list(sets:intersection(sets:from_list(LeftRows), sets:from_list(RightRows)));
 apply_constraints(FieldList, Rows, {or_sym, Left, Right}) ->
-  LeftRows = apply_constraints(FieldList, Rows, Left),
-  RightRows = apply_constraints(FieldList, Rows, Right),
-  sets:to_list(sets:union(sets:from_list(LeftRows), sets:from_list(RightRows)));
+    LeftRows = apply_constraints(FieldList, Rows, Left),
+    RightRows = apply_constraints(FieldList, Rows, Right),
+    sets:to_list(sets:union(sets:from_list(LeftRows), sets:from_list(RightRows)));
 
 apply_constraints(FieldList, Rows, {Constraint, Field, Value}) ->
-  FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
-  {value, {Field, FieldPosition}} =  lists:keysearch(Field, 1, FieldPositions),
-  [Row || Row <- Rows, constraint_accepts(Constraint, lists:nth(FieldPosition, Row), Value)].
+    FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
+    case lists:keysearch(Field, 1, FieldPositions) of
+        {value, {Field, FieldPosition}} -> 
+            [Row || Row <- Rows, constraint_accepts(Constraint, lists:nth(FieldPosition, Row), Value)];
+        false                           -> 
+            throw(lists:flatten(io_lib:format("Invalid field ~s specified in constraint", [Field])))
+  end.
 
 constraint_accepts(eq, Value, Expected) ->
-  bql_utils:convert_to_string(Value) =:= Expected;
+    bql_utils:convert_to_string(Value) =:= Expected;
 constraint_accepts(neq, Value, Expected) ->
-  not(bql_utils:convert_to_string(Value) =:= Expected);
+    not(bql_utils:convert_to_string(Value) =:= Expected);
 constraint_accepts(lt, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value < IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value < IntExpected;
 constraint_accepts(lteq, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value =< IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value =< IntExpected;
 constraint_accepts(gt, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value > IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value > IntExpected;
 constraint_accepts(gteq, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value >= IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value >= IntExpected;
 constraint_accepts(like, Value, Expected) ->
-  % Build the REs
-  {ok, PeriodReplaceRe} = re:compile("\\."),
-  {ok, PercentReplaceRe} = re:compile("%"),
+    % Build the REs
+    {ok, PeriodReplaceRe} = re:compile("\\."),
+    {ok, PercentReplaceRe} = re:compile("%"),
 
-  % Update the pattern
-  ProtectedPeriods = re:replace(Expected, PeriodReplaceRe, "\\\\.", [global, {return, list}]),
-  PercentagesAsWildcards = re:replace(ProtectedPeriods, PercentReplaceRe, ".*", [global, {return, list}]),
+    % Update the pattern
+    ProtectedPeriods = re:replace(Expected, PeriodReplaceRe, "\\\\.", [global, {return, list}]),
+    PercentagesAsWildcards = re:replace(ProtectedPeriods, PercentReplaceRe, ".*", [global, {return, list}]),
   
-  % Compile the user pattern
-  {ok, LikePattern} = re:compile("^" ++ PercentagesAsWildcards ++ "$"),
+    % Compile the user pattern
+    {ok, LikePattern} = re:compile("^" ++ PercentagesAsWildcards ++ "$"),
 
-  % Test if the pattern matches
-  case re:run(Value, LikePattern) of
-    {match, _} -> true;
-    nomatch -> false
-  end.
+    % Test if the pattern matches
+    case re:run(Value, LikePattern) of
+        {match, _} -> true;
+        nomatch -> false
+    end.
 
 apply_ordering(_FieldList, Rows, none) ->
-  Rows;
-apply_ordering(FieldList, Rows, {order_by, Field, Direction}) ->
-  FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
-  {value, {Field, FieldPosition}} =  lists:keysearch(Field, 1, FieldPositions),
-  lists:sort(fun(Row1, Row2) -> order_items(FieldPosition, Row1, Row2, Direction) end, Rows).
+    Rows;
+apply_ordering(FieldList, Rows, {order_by, Clauses}) ->
+    FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
+    OrderingFieldPositions = [{name_to_position(Name, FieldPositions), Direction} || {Name, Direction} <- Clauses],
+    lists:sort(fun(Row1, Row2) -> order_items(Row1, Row2, OrderingFieldPositions) end, Rows).
 
-order_items(FieldPosition, Row1, Row2, Direction) ->
-  Row1Val = lists:nth(FieldPosition, Row1),
-  Row2Val = lists:nth(FieldPosition, Row2),
-  case Direction of
-    descending -> Row1Val > Row2Val;
-    ascending  -> Row1Val < Row2Val
-  end.
+name_to_position(Field, FieldPositions) ->
+    case lists:keysearch(Field, 1, FieldPositions) of
+        {value, {Field, FieldPosition}} ->
+            FieldPosition;
+        false ->
+            throw(lists:flatten(io_lib:format("Invalid field ~s specified in ordering clause", [Field])))
+    end.
 
-filter_rows(RequiredFields, Rows) ->
-  Extract = fun(Field, Row) ->
-     {value, Cell} = lists:keysearch(Field, 1, Row),
-     Cell
-  end,
+order_items(_, _, []) ->
+    true;
+order_items(Row1, Row2, [{FieldPosition, Direction} | RestOrdering]) ->
+    Row1Val = lists:nth(FieldPosition, Row1),
+    Row2Val = lists:nth(FieldPosition, Row2),
+    case Row1Val == Row2Val of
+        true -> order_items(Row1, Row2, RestOrdering);
+        false ->
+            case Direction of
+                descending -> Row1Val > Row2Val;
+                ascending  -> Row1Val < Row2Val
+            end
+    end.
+
+filter_cols(AllFields, RequiredFields, Rows) ->
+    FieldPositions = lists:zip(AllFields, lists:seq(1, length(AllFields))),
+    Extract = 
+        fun(Field, Row) ->
+            {value, {_, Position}} = 
+                lists:keysearch(Field, 1, FieldPositions),
+            lists:nth(Position, Row)
+        end,
   [[Extract(Field, Row) || Field <- RequiredFields] || Row <- Rows].
 
 validate_fields(Available, Requested) ->
-  	AvailableSet = sets:from_list(Available),
-	RequestedSet = sets:from_list(Requested),
-	Invalid = sets:subtract(RequestedSet, AvailableSet),
-	case sets:size(Invalid) of
-		0 -> ok;
-		1 -> throw(lists:flatten(io_lib:format("The field ~p is invalid", sets:to_list(Invalid))));
-		_ -> throw(lists:flatten(io_lib:format("The fields ~p are invalid", [sets:to_list(Invalid)])))
+	case Requested of
+		all   -> Available;
+		_     ->
+  			AvailableSet = sets:from_list(Available),
+			RequestedSet = sets:from_list(Requested),
+			Invalid = sets:subtract(RequestedSet, AvailableSet),
+			case sets:size(Invalid) of
+				0 -> Requested;
+				1 -> throw(lists:flatten(io_lib:format("The field ~p is invalid", sets:to_list(Invalid))));
+				_ -> throw(lists:flatten(io_lib:format("The fields ~p are invalid", [sets:to_list(Invalid)])))
+			end
 	end.
 
 % Privilege Helpers
 expand_privilege_list(all) ->
-  [configure, read, write];
+    [configure, read, write];
 expand_privilege_list(X) ->
-  [X].
+    [X].
 
-apply_privilege_list(Node, User, PrivilegeList, Regex) ->
-  % Retrieve the old privilege structure
-  Current = retrieve_privileges(Node, User),
+apply_privilege_list(Node, User, VHost, PrivilegeList, Regex) ->
+    %% Retrieve the old privilege structure
+    Current = retrieve_privileges(Node, User, VHost),
 
-  % Update each privilege detailed in the privilege spec
-  NewPrivs = [case X of
+    %% Update each privilege detailed in the privilege spec
+    NewPrivs = [case X of
                 {PrivKey, CurVal} ->
                     case lists:member(PrivKey, PrivilegeList) of
                         true -> Regex;
                         false -> CurVal
                     end
               end || X <- Current],
-  [NewConfigure,NewWrite,NewRead] = NewPrivs,
+    [NewConfigure,NewWrite,NewRead] = NewPrivs,
 
-  % Set the permissions
-  rpc_call(Node, rabbit_access_control, set_permissions, [User, <<"/">>, NewConfigure, NewWrite, NewRead]),
-  ok.
+    % Set the permissions
+    rpc_call(Node, rabbit_access_control, set_permissions, [User, VHost, NewConfigure, NewWrite, NewRead]),
+    ok.
 
-retrieve_privileges(Node, User) ->
-  Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
-  UserPermissions = [[{configure, ConfigureRE}, {write, WriteRE}, {read, ReadRE}]
-    || {PermUser, ConfigureRE, WriteRE, ReadRE} <- Permissions, User =:= PermUser],
-  case length(UserPermissions) of
-    0 -> [{configure, <<"">>}, {write, <<"">>}, {read, <<"">>}];
-    _ -> lists:nth(1, UserPermissions)
-  end.
+retrieve_privileges(Node, User, VHost) ->
+    Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [VHost]),
+    UserPermissions = [[{configure, ConfigureRE}, {write, WriteRE}, {read, ReadRE}]
+        || {PermUser, ConfigureRE, WriteRE, ReadRE} <- Permissions, User =:= PermUser],
+    case length(UserPermissions) of
+        0 -> [{configure, <<"">>}, {write, <<"">>}, {read, <<"">>}];
+        _ -> lists:nth(1, UserPermissions)
+    end.
+
+ensure_resource_access(Username, Resource, Perm) ->
+    rabbit_access_control:check_resource_access(Username, Resource, Perm).
+    
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, Username, VHost) ->
+    QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
+    ensure_resource_access(Username, QueueName, write),
+    ExchangeName = rabbit_misc:r(VHost, exchange, ExchangeNameBin),
+    ensure_resource_access(Username, ExchangeName, read),
+    case Fun(ExchangeName, QueueName, RoutingKey, Arguments) of
+        {error, exchange_not_found} ->
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(ExchangeName)]));
+        {error, queue_not_found} ->
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(QueueName)]));
+        {error, exchange_and_queue_not_found} ->
+            lists:flatten(io_lib:format("Neither ~s nor ~s exist",
+                                        [rabbit_misc:rs(ExchangeName), 
+                                         rabbit_misc:rs(QueueName)]));
+        {error, binding_not_found} ->
+            ok;
+        {error, durability_settings_incompatible} ->
+            lists:flatten(io_lib:format("Durability settings of ~s incompatible with ~s",
+                                        [rabbit_misc:rs(QueueName), 
+                                         rabbit_misc:rs(ExchangeName)]));
+        ok -> ok
+    end.

src/bql_client.erl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 -module(bql_client).
 
 
 -export([start/0, stop/0]).
 
+% Record defining the context in which BQL commands are executed
+-record(client_ctx, {username, password, vhost}).
+
 start() ->
-    FullCommand = init:get_plain_arguments(),
-    case FullCommand of
-      [] ->
-         execute_shell(),
+    Username = list_to_binary(argument_or_default(username, "guest")),
+    Password = list_to_binary(argument_or_default(password, "guest")),
+    VHost = list_to_binary(argument_or_default(vhost, "/")),
+    ClientContext = #client_ctx{username = Username, password = Password, vhost = VHost},
+
+    case init:get_argument(execute) of
+      error ->
+         execute_shell(ClientContext),
          halt();
-      [BQL] ->
-         case apply_bql_file(BQL) of
+      {ok, BQL} ->
+         case apply_bql_file(ClientContext, BQL) of
            ok    -> halt();
            error -> halt(1)
          end;
       _ ->
-         io:fwrite("Too many arguments supplied. Provide a BDL file that should be applied.~n"),
+         io:fwrite("Too many arguments supplied. Provide a BQL file that should be applied.~n"),
          halt()
     end.
 
 stop() ->
     ok.
 
-execute_shell() ->
-  case run_command() of
-    exit -> ok;
-    _    -> execute_shell()
+argument_or_default(Flag, Default) ->
+  case init:get_argument(Flag) of
+    {ok, [[Val]]} -> Val;
+    _ -> Default
   end.
 
-run_command() ->
-  Line = io:get_line("BQL> "),
-  case Line of
-    eof      -> exit;
-    "exit\n" -> exit;
-    _        -> execute_block(Line), ok
-  end.
+execute_shell(ClientContext) ->
+    case run_command(ClientContext) of
+        exit -> ok;
+        _    -> execute_shell(ClientContext)
+    end.
+
+run_command(ClientContext) ->
+    Line = io:get_line("BQL> "),
+    case Line of
+        eof      -> exit;
+        "exit\n" -> exit;
+        _        -> execute_block(ClientContext, Line), ok
+    end.
       
 
-apply_bql_file(BQL) ->
-  case filelib:is_file(BQL) of
-    false ->
-      io:fwrite("Provided BQL file does not exist!~n"),
-      error;
-    true ->
-      {ok, Contents} = file:read_file(BQL),
-      execute_block(binary_to_list(Contents))
-  end.
+apply_bql_file(ClientContext, BQL) ->
+    case filelib:is_file(BQL) of
+        false ->
+            io:fwrite("Provided BQL file does not exist!~n"),
+            error;
+        true ->
+            {ok, Contents} = file:read_file(BQL),
+            execute_block(ClientContext, binary_to_list(Contents))
+    end.
 
-execute_block(Contents) ->
-  case rpc:call(rabbit_misc:localnode(rabbit), bql_server, send_command, [<<"guest">>, <<"guest">>, Contents]) of	
-%  case bql_server:send_command(<<"guest">>, <<"guest">>, Contents) of
-    {ok, Result}    -> format_result(Result);
-    {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
-  end.
+execute_block(#client_ctx { username = User, password = Password, vhost = VHost }, Contents) ->
+    case rpc:call(bql_utils:makenode("rabbit"), bql_server, send_command, 
+                    [User, Password, VHost, <<"text/bql">>, Contents]) of	
+        {ok, Result}    -> format_result(Result);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
 
 format_result(Result) ->
-  [format_result_block(Item) || Item <- Result],
-  ok.
+    [format_result_block(Item) || Item <- Result],
+    ok.
 
 format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
-  % Convert the content of all the rows to strings
-  StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
+    %% Convert the content of all the rows to strings
+    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
 
-  % Work through the items and headers, and find the longest item
-  CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
-  Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
+    %% Work through the items and headers, and find the longest item
+    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
+    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
 
-  % Output the header then inside dividers
-  Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
-  io:fwrite(Divider),
-  output_row([atom_to_list(H) || H <- Headers], Widths),
-  io:fwrite(Divider),
+    %% Output the header then inside dividers
+    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
+    io:fwrite(Divider),
+    output_row([atom_to_list(H) || H <- Headers], Widths),
+    io:fwrite(Divider),
 
-  [output_row(Row, Widths) || Row <- StringifiedRows],
-  io:fwrite("~n"),
-  ok;
+    [output_row(Row, Widths) || Row <- StringifiedRows],
+    io:fwrite("~n"),
+    ok;
 format_result_block(Result) ->
-  io:format("~p~n", [Result]),
-  ok.
+    io:format("~p~n", [Result]),
+    ok.
 
 measure_column(Header, Position, Items) ->
-  lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
+    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
 output_row(Items, Widths) ->
-  WidthItems = lists:zip(Items, Widths),
-  [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
-  io:fwrite("|~n").
+    WidthItems = lists:zip(Items, Widths),
+    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
+    io:fwrite("|~n").
 
 widen(Item, Width) ->
-  Extra = Width - length(Item),
-  case Extra of
-    0 -> Item;
-    _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
-  end.
+    Extra = Width - length(Item),
+    case Extra of
+        0 -> Item;
+        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
+    end.
+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(bql_dump).
+
+% Client application for dumping the entire state of the Broker to BQL
+
+-export([start/0, stop/0]).
+
+-define(ReservedExchanges, ["amq.match", "amq.headers", "amq.topic", "amq.direct", "amq.fanout", 
+                            "", "amq.rabbitmq.log", "bql.query"]).
+-define(ReservedQueues, ["bql.query"]).
+
+start() ->
+    Exchanges = execute_block("select * from exchanges order by name;",
+                              fun(Ex) ->
+                                {value, {_, ExchangeType}} = lists:keysearch(type, 1, Ex),
+                                Durable = durable_str(Ex),
+                                {value, {_, Name}} = lists:keysearch(name, 1, Ex),
+                               
+                                case lists:member(Name, ?ReservedExchanges) of
+                                  true -> "";
+                                  false -> io_lib:format("create ~s~p exchange '~s';", [Durable, ExchangeType, Name])
+                                end
+                              end),
+    Queues = execute_block("select * from queues order by name;",
+                           fun(Q) ->
+                             Durable = durable_str(Q),
+                             {value, {_, Name}} = lists:keysearch(name, 1, Q),
+                             
+                             case lists:member(Name, ?ReservedQueues) of
+                               true  -> "";
+                               false -> io_lib:format("create ~squeue '~s';", [Durable, Name])
+                             end
+                           end),
+    Bindings = execute_block("select * from bindings order by exchange_name, queue_name, 'routing_key';",
+                             fun(B) ->
+                               {value, {_, X}} = lists:keysearch(exchange_name, 1, B),
+                               {value, {_, Q}} = lists:keysearch(queue_name, 1, B),
+                               {value, {_, RK}} = lists:keysearch(routing_key, 1, B),
+                              
+                               case {X,binary_to_list(RK)} == {"",Q} of
+                                 true ->
+                                   %% Auto-route from the default exchange to a queue. Skip.
+                                   "";
+                                 false -> 
+                                   io_lib:format("create route from '~s' to '~s' when routing_key is '~s';",
+                                                 [X, Q, RK])
+                               end
+                             end),
+
+    io:format("~s~n", [string:join([Exchanges, Queues, Bindings], "\n")]),
+    init:stop().
+
+stop() ->
+    ok.
+
+execute_block(Contents, Formatter) ->
+    case rpc:call(bql_utils:makenode("rabbit"), bql_server, send_command,
+                  [<<"guest">>, <<"guest">>, <<"text/bql">>, Contents]) of	
+        {ok, Result}    -> format(Result, Formatter);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
+
+durable_str(Row) ->
+    case lists:keysearch(durable, 1, Row) of
+        {value, {_, true}} -> "durable ";
+        _                  -> ""
+    end.
+
+
+format([{Headers, Rows}], Formatter) ->
+    Zipped = [lists:zip(Headers, Row) || Row <- Rows],
+    Formatted = [Formatter(Row) || Row <- Zipped],
+    lists:flatten(string:join([F || F <- Formatted, not(length(F) == 0)], "\n")).

src/bql_server.erl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 -module(bql_server).
 
 -behaviour(gen_server).
 
--export([start/0, start/2, stop/0, stop/1, start_link/0, send_command/3]).
+-export([start/0, start/2, stop/0, stop/1, start_link/0, send_command/5]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
 -record(state, {}).
 
 start() ->
-  start_link(),
-  ok.
+    start_link(),
+    ok.
 
 start(normal, []) ->
-  start_link().
+    start_link().
 
 stop() ->
-  ok.
+    ok.
 
 stop(_State) ->
-  stop().
+    stop().
 
 start_link() ->
-  gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-send_command(Username, Password, Command) ->
-  gen_server:call({global, ?MODULE}, {execute, Username, Password, Command}).
+send_command(Username, Password, VHost, ContentType, Command) ->
+    gen_server:call(?MODULE, {execute, Username, Password, VHost, ContentType, Command}).
 
 %---------------------------
 % Gen Server Implementation
 % --------------------------
 
 init([]) ->
-  {ok, #state{}}.
+    {ok, #state{}}.
 
 handle_call(Msg,_From,State = #state{}) ->
-  case Msg of
-    {execute, Username, Password, Command} ->
-      % rabbit_access_control:user_pass_login(Username, Password),
+    case catch handle_message(Msg) of
+        {reply, Content} ->
+            {reply, Content, State};
+        {'EXIT', #amqp_error{name = access_refused, explanation = Expl}} ->
+            {reply, {error, Expl}, State};
+        {'EXIT', Reason} ->
+            {reply, {error, Reason}, State};
+        Response ->
+            {reply, {error, Response}, State}
+    end.
 
-      % rabbit_access_control:check_resource_access(Username, Resource, Perm)
-      % select name,depth from queues where name = 'amq.control'; => [{name, depth}, {"amq.control", 5}]
+handle_cast(_,State) -> 
+    {noreply, State}.
+    
+handle_info(_Info, State) -> 
+    {noreply, State}.
+    
+terminate(_,_) -> 
+    ok.
 
-     case commands:parse(Command) of
+code_change(_OldVsn, State, _Extra) -> 
+    {ok, State}.
+
+%% Message Handling
+handle_message({execute, Username, Password, VHost, ContentType, Command}) ->
+    %% Validate the user credentials
+    rabbit_access_control:user_pass_login(Username, Password),
+    
+    %% Parse the input based on the content type
+    ParsedCommands = case ContentType of
+        <<"text/bql">> ->
+            commands:parse(Command);
+        <<"application/bql-terms">> ->
+            list_to_term(Command)
+    end,
+    
+    % rabbit_access_control:check_resource_access(Username, Resource, Perm)
+    
+    case ParsedCommands of
         {ok, Commands} ->
-          case bql_applicator:apply_commands(Commands) of
-            {ok, Result} ->
-              {reply, {ok, Result}, State};
-            {error, Reason} ->
-              {reply, {error, Reason}, State}
-          end;
+            case bql_applicator:apply_commands(Commands, Username, VHost) of
+                {ok, Result} ->
+                    {reply, {ok, Result}};
+                {error, Reason} ->
+                    {reply, {error, Reason}}
+            end;
         {error, Reason} ->
-            {reply, {error, Reason}, State}
-     end;
-    _ ->
-      {reply, unknown_command, State}
-  end. 
+            {reply, {error, Reason}}
+    end;
+handle_message(_) ->
+    {reply, unknown_command}.
 
-handle_cast(_,State) -> {reply,unhandled_cast,State}.
-handle_info(_Info, State) -> {reply, unhandled_info, State}.
-terminate(_,_) -> ok.
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+%%
+%% Helper Methods
+%%
+
+list_to_term(String) ->
+    {ok, T, _} = erl_scan:string(String++"."),
+    erl_parse:parse_term(T).

src/bql_utils.erl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 -module(bql_utils).
 
--export([convert_to_string/1]).
+-export([convert_to_string/1, makenode/1]).
 
 convert_to_string(Value) when is_list(Value) ->
-  Value;
+    Value;
 convert_to_string(Value) when is_binary(Value) ->
-  binary_to_list(Value);
+    binary_to_list(Value);
 convert_to_string(Value) ->
-  io_lib:write(Value).
+    io_lib:write(Value).
+
+%% Imported from rabbit_misc to remove the dependency on the Rabbit server!
+makenode({Prefix, Suffix}) ->
+    list_to_atom(lists:append([Prefix, "@", Suffix]));
+makenode(NodeStr) ->
+    makenode(nodeparts(NodeStr)).
+
+nodeparts(Node) when is_atom(Node) ->
+    nodeparts(atom_to_list(Node));
+nodeparts(NodeStr) ->
+    case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+        {Prefix, []}     -> {_, Suffix} = nodeparts(node()),
+                            {Prefix, Suffix};
+        {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+    end.

src/command_lexer.xrl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 % Lexes a selector
 
 Definitions.
 
 C     = [0-9A-Za-z\.\-\_]
-W     = [0-9A-Za-z\s\.\-\_%\/*]
+W     = [0-9A-Za-z\s\.\-\_%\/*#@]
 Semi  = [;]
 Wild  = [*]
 Comma = [,]
 direct      :   {token,{exchange_type,TokenLine,list_to_atom(TokenChars)}}.
 headers     :   {token,{exchange_type,TokenLine,list_to_atom(TokenChars)}}.
 fanout      :   {token,{exchange_type,TokenLine,list_to_atom(TokenChars)}}.
+topic       :   {token,{exchange_type,TokenLine,list_to_atom(TokenChars)}}.
 route       :   {token,{route,TokenLine,list_to_atom(TokenChars)}}.
 from        :   {token,{from,TokenLine,list_to_atom(TokenChars)}}.
 to          :   {token,{to,TokenLine,list_to_atom(TokenChars)}}.
 grant       :   {token,{grant,TokenLine,list_to_atom(TokenChars)}}.
 revoke      :   {token,{revoke,TokenLine,list_to_atom(TokenChars)}}.
 purge       :   {token,{purge,TokenLine,list_to_atom(TokenChars)}}.
+post        :   {token,{post,TokenLine,list_to_atom(TokenChars)}}.
+with        :   {token,{with,TokenLine,list_to_atom(TokenChars)}}.
+get         :   {token,{get,TokenLine,list_to_atom(TokenChars)}}.
+drain       :   {token,{drain,TokenLine,list_to_atom(TokenChars)}}.
 {Wild}      :   {token,{wildcard,TokenLine,list_to_atom(TokenChars)}}.
 {Comma}     :   {token,{comma,TokenLine,list_to_atom(TokenChars)}}.
 {Comp}      :   {token,{comparator,TokenLine,list_to_atom(TokenChars)}}.
 like        :   {token,{comparator,TokenLine,list_to_atom(TokenChars)}}.
-'{W}+'      :   {token,{string,TokenLine,strip(TokenChars, TokenLen)}}.
+'{W}*'      :   {token,{string,TokenLine,strip(TokenChars, TokenLen)}}.
 {C}+        :   {token,{string,TokenLine,TokenChars}}.
 {Semi}      :   {token,{semi,TokenLine,list_to_atom(TokenChars)}}.
 {WS}+       :   skip_token.

src/command_parser.yrl

 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 Nonterminals 
-statements statement expression route_desc field_desc field_list modifiers where_clause orderby_clause predicate predicates.
+statements expression route_desc field_desc field_list modifiers where_clause orderby_clause predicate predicates
+orderby_predicates orderby_predicate.
 
 Terminals
 create drop durable queue exchange exchange_type route from to routing_key is when_sym string select wildcard
-comma where comparator union order by asc desc user identified vhost grant revoke on purge semi.
+comma where comparator union order by asc desc user identified vhost grant revoke on purge post with get semi
+drain.
 
 Rootsymbol statements.
 
-statements -> statement            : ['$1'].
-statements -> statement statements : ['$1'] ++ '$2'.
-
-statement -> expression semi       : '$1'.
+statements -> expression                 : ['$1'].
+statements -> expression semi            : ['$1'].
+statements -> expression semi statements : ['$1'] ++ '$3'.
 
 expression -> create vhost string                             : {create_vhost, unwrap('$3')}.
 expression -> drop vhost string                               : {drop_vhost, unwrap('$3')}.
-expression -> create queue string                             : {create_queue, unwrap('$3'), false}.
-expression -> create durable queue string                     : {create_queue, unwrap('$4'), true}.
+expression -> create queue string                             : {create_queue, unwrap('$3'), false, ""}.
+expression -> create durable queue string                     : {create_queue, unwrap('$4'), true, ""}.
 expression -> drop queue string                               : {drop_queue, unwrap('$3')}.
-expression -> create exchange string                          : {create_exchange, unwrap('$3'), direct, false}.
-expression -> create durable exchange string                  : {create_exchange, unwrap('$4'), direct, true}.
-expression -> create exchange_type exchange string            : {create_exchange, unwrap('$4'), unwrap('$2'), false}.
-expression -> create durable exchange_type exchange string    : {create_exchange, unwrap('$5'), unwrap('$3'), true}.
+expression -> create exchange string                          : {create_exchange, unwrap('$3'), direct, false, ""}.
+expression -> create durable exchange string                  : {create_exchange, unwrap('$4'), direct, true, ""}.
+expression -> create exchange_type exchange string            : {create_exchange, unwrap('$4'), unwrap('$2'), false, ""}.
+expression -> create durable exchange_type exchange string    : {create_exchange, unwrap('$5'), unwrap('$3'), true, ""}.
 expression -> drop exchange string                            : {drop_exchange, unwrap('$3')}.
-expression -> create route_desc                               : {create_binding, '$2'}.
+expression -> create route_desc                               : {create_binding, '$2', ""}.
 expression -> drop route_desc                                 : {drop_binding, '$2'}.
 expression -> create user string identified by string         : {create_user, unwrap('$3'), unwrap('$6')}.
 expression -> drop user string                                : {drop_user, unwrap('$3')}.
 expression -> grant string on string to string                : {grant, list_to_atom(unwrap('$2')), unwrap('$4'), unwrap('$6')}.
 expression -> revoke string from string                       : {revoke, list_to_atom(unwrap('$2')), unwrap('$4')}.
 expression -> purge queue string                              : {purge_queue, unwrap('$3')}.
+expression -> post string to string                           : {post_message, unwrap('$4'), "", unwrap('$2')}.
+expression -> post string to string with routing_key string   : {post_message, unwrap('$4'), unwrap('$7'), unwrap('$2')}.
+expression -> get from string                                 : {retrieve_message, unwrap('$3')}.
+expression -> drain string                                    : {drain_queue, unwrap('$2')}.
 
 route_desc -> route from string to string                                 : {unwrap('$3'), unwrap('$5'), ""}.
 route_desc -> route from string to string when_sym routing_key is string  : {unwrap('$3'), unwrap('$5'), unwrap('$9')}.
 
 predicate -> string comparator string                         : {comp_to_atom(unwrap('$2')), list_to_atom(unwrap('$1')), unwrap('$3')}.
 
-orderby_clause -> order by string                             : {order_by, list_to_atom(unwrap('$3')), ascending}.
-orderby_clause -> order by string asc                         : {order_by, list_to_atom(unwrap('$3')), ascending}.
-orderby_clause -> order by string desc                        : {order_by, list_to_atom(unwrap('$3')), descending}.
+orderby_clause -> order by orderby_predicates                    : {order_by, '$3'}.
+
+orderby_predicates -> orderby_predicate                          : ['$1'].
+orderby_predicates -> orderby_predicate comma orderby_predicates : ['$1'] ++ '$3'.
+
+orderby_predicate -> string                                   : {list_to_atom(unwrap('$1')), ascending}.
+orderby_predicate -> string asc                               : {list_to_atom(unwrap('$1')), ascending}.
+orderby_predicate -> string desc                              : {list_to_atom(unwrap('$1')), descending}.
 
 Erlang code.
 

src/command_parser_test.erl

-%%   The contents of this file are subject to the Mozilla Public License
-%%   Version 1.1 (the "License"); you may not use this file except in
-%%   compliance with the License. You may obtain a copy of the License at
-%%   http://www.mozilla.org/MPL/
-%%
-%%   Software distributed under the License is distributed on an "AS IS"
-%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%%   License for the specific language governing rights and limitations
-%%   under the License.
-%%
-%%   The Original Code is the RabbitMQ Erlang Client.
-%%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
-%%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
-%%
-%%   All Rights Reserved.
-%%
-%%   Contributor(s): ___________________________
-%%
--module(command_parser_test).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(debugCommands(C), ?debugFmt("Got commands: ~p~n", [C])). 
-
-create_nondurable_queue_test() ->
-    {ok, Commands} = commands:parse("create queue blah;"),
-    ?assert([{create_queue,"blah",false}] =:= Commands).
-
-create_nondurable_queue_with_space_in_name_test() ->
-    {ok, Commands} = commands:parse("create queue 'bl ah';"),
-    ?assert([{create_queue,"bl ah",false}] =:= Commands).
-
-create_nondurable_queue_with_exotic_name_test() ->
-    {ok, Commands} = commands:parse("create queue b.a-t_b;"),
-    ?assert([{create_queue,"b.a-t_b",false}] =:= Commands).
-
-create_durable_queue_test() ->
-    {ok, Commands} = commands:parse("create durable queue 'blah';"),
-    ?assert([{create_queue,"blah",true}] =:= Commands).
-
-create_multiple_queues_test() ->
-    {ok, Commands} = commands:parse("create durable queue 'blah'; create queue 'blah2';"),
-    ?assert([{create_queue,"blah",true}, {create_queue,"blah2", false}] =:= Commands).
-
-drop_queue_test() ->
-    {ok, Commands} = commands:parse("drop queue 'myqueue';"),
-    ?assert([{drop_queue,"myqueue"}] =:= Commands).
-
-create_default_exchange_test() ->
-    {ok, Commands} = commands:parse("create exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,false}] =:= Commands).
-
-create_direct_exchange_test() ->
-    {ok, Commands} = commands:parse("create direct exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,false}] =:= Commands).
-
-create_headers_exchange_test() ->
-    {ok, Commands} = commands:parse("create headers exchange 'myex';"),
-    ?assert([{create_exchange,"myex",headers,false}] =:= Commands).
-
-create_fanout_exchange_test() ->
-    {ok, Commands} = commands:parse("create fanout exchange 'myex';"),
-    ?assert([{create_exchange,"myex",fanout,false}] =:= Commands).
-
-create_durable_default_exchange_test() ->
-    {ok, Commands} = commands:parse("create durable exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,true}] =:= Commands).
-
-create_durable_direct_exchange_test() ->
-    {ok, Commands} = commands:parse("create durable direct exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,true}] =:= Commands).
-
-create_durable_headers_exchange_test() ->
-    {ok, Commands} = commands:parse("create durable headers exchange 'myex';"),
-    ?assert([{create_exchange,"myex",headers,true}] =:= Commands).
-
-create_durable_fanout_exchange_test() ->
-    {ok, Commands} = commands:parse("create durable fanout exchange 'myex';"),
-    ?assert([{create_exchange,"myex",fanout,true}] =:= Commands).
-
-drop_exchange_test() ->
-    {ok, Commands} = commands:parse("drop exchange 'myex';"),
-    ?assert([{drop_exchange,"myex"}] =:= Commands).
-
-create_binding_with_no_routing_key_test() ->
-    {ok, Commands} = commands:parse("create route from 'myex' to 'myqueue';"),
-    ?assert([{create_binding,{"myex","myqueue",""}}] =:= Commands).
-
-create_binding_with_routing_key_test() ->
-    {ok, Commands} = commands:parse("create route from 'myex' to 'myqueue' when routing_key is 'Hello';"),
-    ?assert([{create_binding,{"myex","myqueue","Hello"}}] =:= Commands).
-
-drop_binding_with_no_routing_key_test() ->
-    {ok, Commands} = commands:parse("drop route from 'myex' to 'myqueue';"),
-    ?assert([{drop_binding,{"myex","myqueue",""}}] =:= Commands).
-
-drop_binding_with_routing_key_test() ->
-    {ok, Commands} = commands:parse("drop route from 'myex' to 'myqueue' when routing_key is 'Hello';"),
-    ?assert([{drop_binding,{"myex","myqueue","Hello"}}] =:= Commands).
-
-%create_queue_with_bad_character_test() ->
-%    {error, Reason} = commands:parse("create queue 'queue';\n\n\ncreate queue 'queue%';"),
-%    ?assert("Illegal token \"'queue%\" on line 4" =:= Reason).
-
-create_queue_with_missing_end_quote_test() ->
-    {error, Reason} = commands:parse("create queue 'queue';\n\n\ncreate queue 'queue;"),
-    ?assert("Illegal token \"'queue;\" on line 4" =:= Reason).
-
-create_exchange_of_invalid_type_test() ->
-    {error, Reason} = commands:parse("create queue 'queue';\n\n\ncreate interesting exchange 'queue';"),
-    ?assert("syntax error before: \"interesting\" on line 4" =:= Reason).
-
-select_all_exchange_details_test() ->
-    {ok, Commands} = commands:parse("select * from exchanges;"),
-    ?assert([{select, "exchanges", all, {none, none}}] =:= Commands).
-
-select_exchange_names_test() ->
-    {ok, Commands} = commands:parse("select name from exchanges;"),
-    ?assert([{select, "exchanges", [name], {none, none}}] =:= Commands).
-
-select_exchange_names_and_types_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges;"),
-    ?assert([{select, "exchanges", [name, type], {none, none}}] =:= Commands).
-
-select_exchange_name_types_with_filter_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges where name='amq.topic';"),
-    ?assert([{select, "exchanges", [name, type], {{eq, name, "amq.topic"}, none}}] =:= Commands).
-
-select_exchange_name_types_with_anded_filter_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges where name='amq.topic' and 'durable'!=true;"),
-    ?assert([{select, "exchanges", [name, type], {{and_sym, {eq, name, "amq.topic"}, {neq, durable, "true"}}, none}}] =:= Commands).
-
-select_exchange_name_types_with_ored_filter_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges where name='amq.topic' or 'durable'!=true;"),
-    ?assert([{select, "exchanges", [name, type], {{or_sym, {eq, name, "amq.topic"}, {neq, durable, "true"}}, none}}] =:= Commands).
-
-select_exchange_name_order_by_name_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges order by name;"),
-    ?assert([{select, "exchanges", [name, type], {none, {order_by, name, ascending}}}] =:= Commands).
-
-select_exchange_name_order_by_name_explicit_descending_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges order by name desc;"),
-    ?assert([{select, "exchanges", [name, type], {none, {order_by, name, descending}}}] =:= Commands).
-
-select_exchange_name_order_by_name_explicit_ascending_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges order by name asc;"),
-    ?assert([{select, "exchanges", [name, type], {none, {order_by, name, ascending}}}] =:= Commands).
-
-select_exchange_name_order_by_name_with_constraints_test() ->
-    {ok, Commands} = commands:parse("select name,type from exchanges where name='amq.topic' or 'durable'!=true order by name;"),
-    ?assert([{select, "exchanges", [name, type], {{or_sym, {eq, name, "amq.topic"}, {neq, durable, "true"}}, {order_by, name, ascending}}}] =:= Commands).
-
-create_user_test() ->
-    {ok, Commands} = commands:parse("create user user1 identified by mypassword;"),
-    ?assert([{create_user, "user1", "mypassword"}] =:= Commands).
-
-drop_user_test() ->
-    {ok, Commands} = commands:parse("drop user user1;"),
-    ?assert([{drop_user, "user1"}] =:= Commands).
-
-select_binding_with_routing_key_like_test() ->
-    {ok, Commands} = commands:parse("select * from bindings where 'routing_key' like 'amq.%';"),
-    ?assertEqual([{select, "bindings", all, {{like, routing_key, "amq.%"}, none}}], Commands).
-
-create_vhost_test() ->
-    {ok, Commands} = commands:parse("create vhost '/myhost';"),
-    ?assert([{create_vhost, "/myhost"}] =:= Commands).
-
-drop_vhost_test() ->
-    {ok, Commands} = commands:parse("drop vhost '/myhost';"),
-    ?assert([{drop_vhost, "/myhost"}] =:= Commands).
-
-grant_permission_test() ->
-    {ok, Commands} = commands:parse("grant all on '.*' to 'user';"),
-    ?assertEqual([{grant, all, ".*", "user"}], Commands).
-
-revoke_permission_test() ->
-    {ok, Commands} = commands:parse("revoke configure from 'user';"),
-    ?assert([{revoke, configure, "user"}] =:= Commands).
-
-purge_queue_test() ->
-    {ok, Commands} = commands:parse("purge queue myqueue;"),
-    ?assert([{purge_queue, "myqueue"}] =:= Commands).
 %%   License for the specific language governing rights and limitations
 %%   under the License.
 %%
-%%   The Original Code is the RabbitMQ Erlang Client.
+%%   The Original Code is RabbitMQ BQL Plugin.
 %%
-%%   The Initial Developers of the Original Code are LShift Ltd.,
-%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%   The Initial Developers of the Original Code are LShift Ltd.
 %%
-%%   Portions created by LShift Ltd., Cohesive Financial
-%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
-%%   2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%%   Technologies Ltd.;
+%%   Copyright (C) 2009 LShift Ltd.
 %%
 %%   All Rights Reserved.
 %%
-%%   Contributor(s): ___________________________
+%%   Contributor(s): ______________________________________.
 %%
 -module(commands).
 

src/rabbitmq_bql.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(rabbitmq_bql).
+
+-export([start/0, stop/0, start/2, stop/1]).
+
+start() -> 
+    rabbitmq_bql_sup:start_link(), ok.
+
+stop() -> 
+    ok.
+
+start(normal, []) ->
+    rabbitmq_bql_sup:start_link().
+
+stop(_State) ->
+    ok.

src/rabbitmq_bql_sup.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%
+%%   Software distributed under the License is distributed on an "AS IS"
+%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%   License for the specific language governing rights and limitations
+%%   under the License.
+%%
+%%   The Original Code is RabbitMQ BQL Plugin.
+%%
+%%   The Initial Developers of the Original Code are LShift Ltd.
+%%
+%%   Copyright (C) 2009 LShift Ltd.
+%%
+%%   All Rights Reserved.
+%%
+%%   Contributor(s): ______________________________________.
+%%
+-module(rabbitmq_bql_sup).
+-behaviour(supervisor).
+
+-export([start_link/0, init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).
+
+init([]) ->
+    {ok, {{one_for_one, 3, 10},
+          [{bql_server,
+            {bql_server, start_link, []},
+            permanent,
+            10000,
+            worker,
+            [bql_server]},
+           {bql_amqp_rpc_server,
+            {bql_amqp_rpc_server, start_link, []},
+            permanent,
+            10000,
+            worker,
+            [bql_amqp_rpc_server]}
+          ]}}.

test/amq_interface_test.erl

+%%   The contents of this file are subject to the Mozilla Public License
+%%   Version 1.1 (the "License"); you may not use this file except in
+%%   compliance with the License. You may obtain a copy of the License at
+%%   http://www.mozilla.org/MPL/
+%%