Commits

Ben Hood committed a4c2c91 Merge

Merged default into bug21410

Comments (0)

Files changed (15)

 PARSER_NAME=command_parser
 
 src/command_lexer.erl: ebin/leex.beam src/command_lexer.xrl
-	$(ERL) -I -pa ebin -noshell -eval 'leex:file("$(SOURCE_DIR)/$(LEXER_NAME).xrl",[{outdir,"$(SOURCE_DIR)"}]), halt().'
+	$(ERL) -I -pa ebin -noshell -eval 'ok = leex:file("$(SOURCE_DIR)/$(LEXER_NAME).xrl",[{outdir,"$(SOURCE_DIR)"}]), halt().'
 
 src/command_parser.erl: ebin/leex.beam src/command_parser.yrl
-	$(ERL) -I -pa ebin -noshell -eval 'yecc:file("$(SOURCE_DIR)/$(PARSER_NAME)"), halt().'
+	$(ERL) -I -pa ebin -noshell -eval '{ok, _} = yecc:file("$(SOURCE_DIR)/$(PARSER_NAME)"), halt().'
-- Queue based access mechanism
+- Assume that the broker is running in the same VM, i.e. get rid of
+  rpc_call()
+- Drain queues to the server side
 - Accept keywords as terms in predicates
-- Use keyword allowing the vhost to be switched
+- Allow layered commands so that a thick client can submit a term list as well
+as an unparsed string (so that this works for thin clients as well) 
+  -- use content-type to demarcate what type it is, defaulting to unparsed
+     so that thin clients don't need to set this
+- Drop Erlang RPC as transport, just use AMQP to get commands in to BQL
+- Test case for draining queues
+- Add at least rudimentary support for security
+
+Refactoring:
 
 Nice to haves:
 - drop connection force where user='guest' and host='host' and port='port';
 - Runlevel changes - ie, drop broker into a maintenance mode
 - Creating checkpoints (mnesia, and queues)
 - Activate checkpoint
+
+Bugs

src/bql_amqp_rpc_server.erl

 -export([start_link/0]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
+-include_lib("amqp_client/include/amqp_client.hrl").
 -include_lib("rabbit_common/include/rabbit.hrl").
 -include_lib("rabbit_common/include/rabbit_framing.hrl").
 
 -define(QueueName, <<"bql.query">>).
 
 start_link() ->
-  gen_server:start_link(?MODULE, [], []).
+    gen_server:start_link(?MODULE, [], []).
 
 init([]) ->
     Connection = lib_amqp:start_connection(),
 handle_info(#'basic.consume_ok'{}, State) ->
     {noreply, State};
 handle_info({#'basic.deliver' { 'delivery_tag' = DeliveryTag },
-             {content, ClassId, Props, PropertiesBin, [Payload] }},
+             #amqp_msg{props = Props, payload = Payload }},
             State = #state { channel = Ch }) ->
-    #'P_basic'{correlation_id = CorrelationId,
-               reply_to = Q} = decode_properties(ClassId, Props, PropertiesBin),
+    #'P_basic'{correlation_id = CorrelationId, reply_to = Q} = Props,
     try
       ResponseObj = case rfc4627:decode(Payload) of
         {ok, RequestObj, _Rest} ->
           case rfc4627:get_field(RequestObj, "query") of 
             {ok, Query} ->
-              case bql_server:send_command(<<"guest">>, <<"guest">>, binary_to_list(Query)) of
+              case bql_server:send_command(<<"guest">>, <<"guest">>, <<"text/bql">>, binary_to_list(Query)) of
                 {ok, Result} ->
                   {obj, [{"success", true}, {"messages", format_result(Result)}]};
                 {error, Reason} ->
       Properties = #'P_basic'{correlation_id = CorrelationId},
       lib_amqp:publish(Ch, <<>>, Q, rfc4627:encode(ResponseObj), Properties)
     catch
-      Tag:Error -> io:fwrite("Caught error: ~p,~p~n", [Tag, Error])
+      Tag:Error -> io:fwrite("Caught error: ~p,~p,~p~n", [Tag, Error,
+                              erlang:get_stacktrace()])
     end,
     lib_amqp:ack(Ch, DeliveryTag),
     {noreply, State};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-decode_properties(ClassId, Properties, PropertiesBin) ->
-  case Properties of
-    none -> rabbit_framing:decode_properties(ClassId, PropertiesBin);
-    _    -> Properties
-  end.
-
 format_result(Result) ->
-  [format_result_entry(E) || E <- Result].
+    [format_result_entry(E) || E <- Result].
 
 format_result_entry(ok) ->
-  <<"ok">>;
+    <<"ok">>;
 format_result_entry({Headers, Rows}) ->
-  [{obj, [{atom_to_list(Header), list_to_binary(bql_utils:convert_to_string(Cell))} || {Header, Cell} <- lists:zip(Headers, Row)]} || Row <- Rows];
+    [{obj, [{atom_to_list(Header), list_to_binary(bql_utils:convert_to_string(Cell))} || 
+                {Header, Cell} <- lists:zip(Headers, Row)]} || Row <- Rows];
 format_result_entry(Msg) when is_list(Msg) ->
-  list_to_binary(Msg).
+    list_to_binary(Msg).

src/bql_applicator.erl

 %%
 -module(bql_applicator).
 
--export([apply_commands/1]).
+-export([apply_commands/2]).
 
 -include_lib("rabbit_common/include/rabbit.hrl").
 -include_lib("rabbit_common/include/rabbit_framing.hrl").
 
 -define(RPC_TIMEOUT, 30000).
 
--record(state, {ch, node}).
+-record(state, {node, user}).
 
-apply_commands(Commands) ->
-  % Create an AMQP channel so we can actually perform operations
-  Connection = lib_amqp:start_connection("localhost"),
-  ControlCh = amqp_connection:open_channel(Connection),
+apply_commands(Commands, User) ->
+    % Create a connection to the Rabbit node
+    Node = rabbit_misc:localnode(rabbit),
 
-  % Create a connection to the Rabbit node too
-  Node = rabbit_misc:localnode(rabbit),
-
-  {ok, [catch apply_command(#state {ch = ControlCh, node = Node}, Command) || Command <- Commands]}.
-
+    {ok, [catch apply_command(Command, #state {node = Node, user = User}) 
+            || Command <- Commands]}.
+            
 % Queue Management
-apply_command(#state {ch = ControlCh}, {create_queue, Name, Durable}) ->
-  lib_amqp:declare_queue(ControlCh, #'queue.declare'{queue = list_to_binary(Name), durable = Durable}),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_queue, Name}) ->
-  lib_amqp:delete_queue(ControlCh, list_to_binary(Name)),
-  ok;
-apply_command(#state {ch = ControlCh}, {purge_queue, Name}) ->
-  amqp_channel:call(ControlCh, #'queue.purge'{queue = list_to_binary(Name)}),
-  ok;
+apply_command({create_queue, Name, Durable, Args}, #state {user = Username}) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, configure),
+    rabbit_amqqueue:declare(QueueName, Durable, false, Args),
+    ok;
+apply_command({drop_queue, Name}, #state {user = Username}) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, configure),
+    case rabbit_amqqueue:with(
+           QueueName,
+           fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end) of
+        {ok, _Purged} ->
+            ok;
+        {error, not_found} ->
+            {error, io_lib:format("Queue ~s not found", [Name])}
+    end;
+apply_command({purge_queue, Name}, #state {user = Username}) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Name)),
+    ensure_resource_access(Username, QueueName, read),
+    rabbit_amqqueue:with_or_die(QueueName,
+                                fun (Q) -> rabbit_amqqueue:purge(Q) end);
 
 % Exchange Management
-apply_command(#state {ch = ControlCh}, {create_exchange, Name, Type, Durable}) ->
-  amqp_channel:call(ControlCh, #'exchange.declare'{exchange = list_to_binary(Name),
-                                                   type = list_to_binary(atom_to_list(Type)),
-                                                   durable = Durable}),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_exchange, Name}) ->
-  lib_amqp:delete_exchange(ControlCh, list_to_binary(Name)),
-  ok;
+apply_command({create_exchange, Name, Type, Durable, Args}, #state {user = Username}) ->
+    CheckedType = rabbit_exchange:check_type(list_to_binary(atom_to_list(Type))),
+    ExchangeName = rabbit_misc:r(<<"/">>, exchange, list_to_binary(Name)),
+    ensure_resource_access(Username, ExchangeName, configure),
+    X = case rabbit_exchange:lookup(ExchangeName) of
+           {ok, FoundX} -> FoundX;
+           {error, not_found} ->
+               case rabbit_misc:r_arg(<<"/">>, exchange, Args,
+                                      <<"alternate-exchange">>) of
+                   undefined -> ok;
+                   AName     -> ensure_resource_access(Username, ExchangeName, read),
+                                ensure_resource_access(Username, AName, write),
+                                ok
+               end,
+               rabbit_exchange:declare(ExchangeName, CheckedType,
+                                       Durable, false, Args)
+       end,
+    ok = rabbit_exchange:assert_type(X, CheckedType),
+    ok;
+apply_command({drop_exchange, Name}, #state {user = Username}) ->
+    ExchangeName = rabbit_misc:r(<<"/">>, exchange, list_to_binary(Name)),
+    ensure_resource_access(Username, ExchangeName, configure),
+    case rabbit_exchange:delete(ExchangeName, false) of
+        {error, not_found} ->
+            io_lib:format("Unknown exchange ~s", [Name]);
+        ok ->
+            ok
+    end;
 
 % User Management
-apply_command(#state {node = Node}, {create_user, Name, Password}) ->
-  rpc_call(Node, rabbit_access_control, add_user, [list_to_binary(Name), list_to_binary(Password)]),
-  ok;
-apply_command(#state {node = Node}, {drop_user, Name}) ->
-  rpc_call(Node, rabbit_access_control, delete_user, [list_to_binary(Name)]),
-  ok;
+apply_command({create_user, Name, Password}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, add_user, [list_to_binary(Name), list_to_binary(Password)]),
+    ok;
+apply_command({drop_user, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, delete_user, [list_to_binary(Name)]),
+    ok;
 
 % VHost Management
-apply_command(#state {node = Node}, {create_vhost, Name}) ->
-  rpc_call(Node, rabbit_access_control, add_vhost, [list_to_binary(Name)]),
-  ok;
-apply_command(#state {node = Node}, {drop_vhost, Name}) ->
-  rpc_call(Node, rabbit_access_control, delete_vhost, [list_to_binary(Name)]),
-  ok;
+apply_command({create_vhost, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, add_vhost, [list_to_binary(Name)]),
+    ok;
+apply_command({drop_vhost, Name}, #state {node = Node}) ->
+    rpc_call(Node, rabbit_access_control, delete_vhost, [list_to_binary(Name)]),
+    ok;
 
 % Binding Management
-apply_command(#state {ch = ControlCh}, {create_binding, {X, Q, RoutingKey}}) ->
-  lib_amqp:bind_queue(ControlCh, list_to_binary(X), list_to_binary(Q), list_to_binary(RoutingKey)),
-  ok;
-apply_command(#state {ch = ControlCh}, {drop_binding, {X, Q, RoutingKey}}) ->
-  lib_amqp:unbind_queue(ControlCh, list_to_binary(X), list_to_binary(Q), list_to_binary(RoutingKey)),
-  ok;
+apply_command({create_binding, {X, Q, RoutingKey}, Args}, #state {user = Username}) ->
+    binding_action(fun rabbit_exchange:add_binding/4, 
+                   list_to_binary(X), list_to_binary(Q),
+                   list_to_binary(RoutingKey), Args, Username);
+apply_command({drop_binding, {X, Q, RoutingKey}}, #state {user = Username}) ->
+    binding_action(fun rabbit_exchange:delete_binding/4, 
+                   list_to_binary(X), list_to_binary(Q),
+                   list_to_binary(RoutingKey), <<"">>, Username);
 
 % Privilege Management
-apply_command(#state {node = Node}, {grant, Privilege, Regex, User}) ->
-  PrivilegeList = expand_privilege_list(Privilege),
-  apply_privilege_list(Node, list_to_binary(User), PrivilegeList, list_to_binary(Regex));
-apply_command(#state {node = Node}, {revoke, Privilege, User}) ->
-  PrivilegeList = expand_privilege_list(Privilege),
-  apply_privilege_list(Node, list_to_binary(User), PrivilegeList, <<"">>);
+apply_command({grant, Privilege, Regex, User}, #state {node = Node}) ->
+    PrivilegeList = expand_privilege_list(Privilege),
+    apply_privilege_list(Node, list_to_binary(User), PrivilegeList, list_to_binary(Regex));
+apply_command({revoke, Privilege, User}, #state {node = Node}) ->
+    PrivilegeList = expand_privilege_list(Privilege),
+    apply_privilege_list(Node, list_to_binary(User), PrivilegeList, <<"">>);
   
 % Queries
-apply_command(#state {node = Node}, {select, "exchanges", Fields, Modifiers}) ->
-  AllFieldList = [name, type, durable, auto_delete, arguments],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Exchanges = rpc_call(Node, rabbit_exchange, info_all, [<<"/">>, AllFieldList]),
-  interpret_response(AllFieldList, FieldList, Exchanges, Modifiers);
+apply_command({select, "exchanges", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name, type, durable, auto_delete, arguments],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Exchanges = rpc_call(Node, rabbit_exchange, info_all, [<<"/">>]),
+    interpret_response(AllFieldList, FieldList, Exchanges, Modifiers);
 
-apply_command(#state {node = Node}, {select, "queues", Fields, Modifiers}) ->
-  AllFieldList = [name, durable, auto_delete, arguments, pid, messages_ready,
-                  messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
-                  consumers, transactions, memory],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Queues = rpc_call(Node, rabbit_amqqueue, info_all, [<<"/">>, AllFieldList]),
-  interpret_response(AllFieldList, FieldList, Queues, Modifiers);
+apply_command({select, "queues", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name, durable, auto_delete, arguments, pid, messages_ready,
+                    messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
+                    consumers, transactions, memory],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Queues = rpc_call(Node, rabbit_amqqueue, info_all, [<<"/">>]),
+    interpret_response(AllFieldList, FieldList, Queues, Modifiers);
 
-apply_command(#state {node = Node}, {select, "bindings", Fields, Modifiers}) ->
-  AllFieldList = [exchange_name, queue_name, routing_key, args],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Bindings = rpc_call(Node, rabbit_exchange, list_bindings, [<<"/">>]),
-  interpret_response(AllFieldList, FieldList, Bindings, Modifiers);
+apply_command({select, "bindings", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [exchange_name, queue_name, routing_key, args],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Bindings = rpc_call(Node, rabbit_exchange, list_bindings, [<<"/">>]),
+    interpret_response(AllFieldList, FieldList, Bindings, Modifiers);
 
-apply_command(#state {node = Node}, {select, "users", Fields, Modifiers}) ->
-  AllFieldList = [name],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Response = rpc_call(Node, rabbit_access_control, list_users, []),
-  Users = [[binary_to_list(User)] || User <- Response],
-  interpret_response(AllFieldList, FieldList, Users, Modifiers);
+apply_command({select, "users", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Response = rpc_call(Node, rabbit_access_control, list_users, []),
+    Users = [[binary_to_list(User)] || User <- Response],
+    interpret_response(AllFieldList, FieldList, Users, Modifiers);
 
-apply_command(#state {node = Node}, {select, "vhosts", Fields, Modifiers}) ->
-  AllFieldList = [name],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Response = rpc_call(Node, rabbit_access_control, list_vhosts, []),
-  VHosts = [[{name, binary_to_list(User)}] || User <- Response],
-  interpret_response(AllFieldList, FieldList, VHosts, Modifiers);
+apply_command({select, "vhosts", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [name],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Response = rpc_call(Node, rabbit_access_control, list_vhosts, []),
+    VHosts = [[{name, binary_to_list(User)}] || User <- Response],
+    interpret_response(AllFieldList, FieldList, VHosts, Modifiers);
 
-apply_command(#state {node = Node}, {select, "permissions", Fields, Modifiers}) ->
-  AllFieldList = [username,configure_perm,write_perm,read_perm],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
-  interpret_response(AllFieldList, FieldList, Permissions, Modifiers);
+apply_command({select, "permissions", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [username,configure_perm,write_perm,read_perm],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
+    interpret_response(AllFieldList, FieldList, Permissions, Modifiers);
 
-apply_command(#state {node = Node}, {select, "connections", Fields, Modifiers}) ->
-  AllFieldList = [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt,
-                  send_pend, state, channels, user, vhost, timeout, frame_max],
-  FieldList = validate_fields(AllFieldList, Fields),
-  Connections = rpc_call(Node, rabbit_networking, connection_info_all, [AllFieldList]),
-  interpret_response(AllFieldList, FieldList, Connections, Modifiers);
+apply_command({select, "connections", Fields, Modifiers}, #state {node = Node}) ->
+    AllFieldList = [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt,
+                    send_pend, state, channels, user, vhost, timeout, frame_max],
+    FieldList = validate_fields(AllFieldList, Fields),
+    Connections = rpc_call(Node, rabbit_networking, connection_info_all, []),
+    interpret_response(AllFieldList, FieldList, Connections, Modifiers);
 
 % Sending Messages
-apply_command(#state { ch = Ch }, {post_message, Exchange, RoutingKey, Msg}) ->
-  Properties = #'P_basic'{ delivery_mode = 1 },
-  case lib_amqp:publish(Ch, list_to_binary(Exchange), list_to_binary(RoutingKey), 
-                        list_to_binary(Msg), Properties) of
-    ok  -> ok;
-    Res -> io_lib:format("~p", [Res])
-  end;
+apply_command({post_message, X, RoutingKey, Msg}, #state { user = Username }) ->
+    ExchangeName = rabbit_misc:r(<<"/">>, exchange, list_to_binary(X)),
+    ensure_resource_access(Username, ExchangeName, write),
+    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+    Content = rabbit_basic:build_content(#'P_basic'{}, list_to_binary(Msg)),
+    Message = #basic_message{exchange_name  = ExchangeName,
+                             routing_key    = list_to_binary(RoutingKey),
+                             content        = Content,
+                             persistent_key = none},
+    {RoutingRes, _DeliveredQPids} =
+                rabbit_exchange:publish(
+                  Exchange,
+                  rabbit_basic:delivery(true, false, none, Message)),
+    case RoutingRes of
+        routed ->
+            ok;
+        unroutable ->
+            "Message was unroutable";
+        not_delivered ->
+            "Message was not able to be delivered"
+    end;
+    
+% Retreving Messages
+apply_command({retrieve_message, QName}, State = #state{}) ->
+    with_queue(fun(Q) -> poll(Q) end, QName, State);
 
-% Retreving Messages
-apply_command(#state { ch = Ch }, {retrieve_message, Queue}) ->
-  case lib_amqp:get(Ch, list_to_binary(Queue)) of
-    'basic.get_empty' ->
-      empty;
-    {content, _ClassId, _Props, _PropertiesBin, [Payload]} ->
-      Payload
-  end;
+%% This drains messages to a file on the server
+apply_command({drain_queue, QName}, State = #state{}) ->
+    Fun = 
+        fun(Q) ->
+            case disk_log:open([{name, QName}, {type, halt}]) of
+                {ok, Log} -> drain_loop(Q, Log);
+                {repaired, Log, _, _} -> drain_loop(Q, Log);
+                {error, Reason} ->
+                    error_logger:error_msg("Could not open disk log for"
+                                           " queue (~p): ~p~n", [Q, Reason]),
+                    not_ok
+            end,
+            ok
+        end,
+  with_queue(Fun, QName, State);
 
-apply_command(#state {}, {select, EntityName, _, _}) ->
-  lists:flatten("Unknown entity " ++ EntityName ++ " specified to query");
+apply_command({select, EntityName, _, _}, #state {}) ->
+    lists:flatten("Unknown entity " ++ EntityName ++ " specified to query");
 
 % Catch-all  
-apply_command(_State, Unknown) ->
-  debug("Unknown command: ~p~n", [Unknown]).
+apply_command(Unknown, _State) ->
+    debug("Unknown command: ~p~n", [Unknown]).
+
+drain_loop(Q, Log) ->
+    case poll(Q) of
+        empty -> ok;
+        Payload ->
+            case disk_log:blog(Log, Payload) of
+                ok -> drain_loop(Q, Log);
+                _ -> ok
+            end
+    end,
+    disk_log:close(Log),
+    ok.
+
+with_queue(Fun, Queue, #state{node = Node, user = Username}) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, list_to_binary(Queue)),
+    ensure_resource_access(Username, QueueName, read),
+    
+    case rpc_call(Node, rabbit_amqqueue, lookup, [QueueName]) of
+        {error, not_found} -> 
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(QueueName)]));
+        {ok, Q} -> 
+            Fun(Q)
+    end.
+
+poll(Q) ->
+    case rabbit_amqqueue:basic_get(Q, self(), true) of
+        {ok, _MsgCount,
+         {_QName, _QPid, _MsgId, _Redelivered,
+          #basic_message{content = Content}}} ->
+            {_Props, Payload} = rabbit_basic:from_content(Content),
+            Payload;
+        empty ->
+            empty
+    end.
 
 % Debug Control
 debug(_Format, _Params) ->
-  ok.
-  % io:format(Format, Params).
+    ok.
+    %% io:format(Format, Params).
 
 % RPC Commands
 rpc_call(Node, Mod, Fun, Args) ->
 
 % Formatting commands
 interpret_response(_, _, {bad_argument, Field}, _) ->
-  lists:flatten(io_lib:format("Invalid field \"~p\" requested", [Field]));
+    lists:flatten(io_lib:format("Invalid field \"~p\" requested", [Field]));
 interpret_response(_, _, [], _) ->
-  [];
+    [];
 
 interpret_response(AvailFieldList, RequestedFieldList, [RHead|_] = Response, Modifiers) when is_tuple(RHead)->
-  interpret_response(AvailFieldList, RequestedFieldList, [tuple_to_list(X) || X <- Response], Modifiers);
+    interpret_response(AvailFieldList, RequestedFieldList, [tuple_to_list(X) || X <- Response], Modifiers);
 
 interpret_response(AvailFieldList, RequestedFieldList, Response, {Constraints, Ordering}) ->
-  FormattedResponse = [[format_response(Cell) || Cell <- Detail] || Detail <- Response],
-  ConstrainedResponse = apply_constraints(AvailFieldList, FormattedResponse, Constraints),
-  OrderedResponse = apply_ordering(AvailFieldList, ConstrainedResponse, Ordering),
-  FilteredResponse = filter_cols(AvailFieldList, RequestedFieldList, OrderedResponse),
-  {RequestedFieldList, FilteredResponse}.
+    FormattedResponse = [[format_response(Cell) || Cell <- Detail] || Detail <- Response],
+    ConstrainedResponse = apply_constraints(AvailFieldList, FormattedResponse, Constraints),
+    OrderedResponse = apply_ordering(AvailFieldList, ConstrainedResponse, Ordering),
+    FilteredResponse = filter_cols(AvailFieldList, RequestedFieldList, OrderedResponse),
+    {RequestedFieldList, FilteredResponse}.
 
 format_response({_Name, Value}) ->
-  format_response(Value);
+    format_response(Value);
 format_response({resource, _VHost, _Type, Value}) ->
-  binary_to_list(Value);
+    binary_to_list(Value);
 format_response(Value) ->
-  Value.
+    Value.
 
 % Constraints
 apply_constraints(_FieldList, Rows, none) ->
-  Rows;
+    Rows;
 
 apply_constraints(FieldList, Rows, {and_sym, Left, Right}) ->
-  LeftRows = apply_constraints(FieldList, Rows, Left),
-  RightRows = apply_constraints(FieldList, Rows, Right),
-  sets:to_list(sets:intersection(sets:from_list(LeftRows), sets:from_list(RightRows)));
+    LeftRows = apply_constraints(FieldList, Rows, Left),
+    RightRows = apply_constraints(FieldList, Rows, Right),
+    sets:to_list(sets:intersection(sets:from_list(LeftRows), sets:from_list(RightRows)));
 apply_constraints(FieldList, Rows, {or_sym, Left, Right}) ->
-  LeftRows = apply_constraints(FieldList, Rows, Left),
-  RightRows = apply_constraints(FieldList, Rows, Right),
-  sets:to_list(sets:union(sets:from_list(LeftRows), sets:from_list(RightRows)));
+    LeftRows = apply_constraints(FieldList, Rows, Left),
+    RightRows = apply_constraints(FieldList, Rows, Right),
+    sets:to_list(sets:union(sets:from_list(LeftRows), sets:from_list(RightRows)));
 
 apply_constraints(FieldList, Rows, {Constraint, Field, Value}) ->
-  FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
-  case lists:keysearch(Field, 1, FieldPositions) of
-    {value, {Field, FieldPosition}} -> 
-      [Row || Row <- Rows, constraint_accepts(Constraint, lists:nth(FieldPosition, Row), Value)];
-    false                           -> 
-      throw(lists:flatten(io_lib:format("Invalid field ~s specified in constraint", [Field])))
+    FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
+    case lists:keysearch(Field, 1, FieldPositions) of
+        {value, {Field, FieldPosition}} -> 
+            [Row || Row <- Rows, constraint_accepts(Constraint, lists:nth(FieldPosition, Row), Value)];
+        false                           -> 
+            throw(lists:flatten(io_lib:format("Invalid field ~s specified in constraint", [Field])))
   end.
 
 constraint_accepts(eq, Value, Expected) ->
-  bql_utils:convert_to_string(Value) =:= Expected;
+    bql_utils:convert_to_string(Value) =:= Expected;
 constraint_accepts(neq, Value, Expected) ->
-  not(bql_utils:convert_to_string(Value) =:= Expected);
+    not(bql_utils:convert_to_string(Value) =:= Expected);
 constraint_accepts(lt, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value < IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value < IntExpected;
 constraint_accepts(lteq, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value =< IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value =< IntExpected;
 constraint_accepts(gt, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value > IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value > IntExpected;
 constraint_accepts(gteq, Value, Expected) ->
-  {IntExpected, _Rest} = string:to_integer(Expected),
-  Value >= IntExpected;
+    {IntExpected, _Rest} = string:to_integer(Expected),
+    Value >= IntExpected;
 constraint_accepts(like, Value, Expected) ->
-  % Build the REs
-  {ok, PeriodReplaceRe} = re:compile("\\."),
-  {ok, PercentReplaceRe} = re:compile("%"),
+    % Build the REs
+    {ok, PeriodReplaceRe} = re:compile("\\."),
+    {ok, PercentReplaceRe} = re:compile("%"),
 
-  % Update the pattern
-  ProtectedPeriods = re:replace(Expected, PeriodReplaceRe, "\\\\.", [global, {return, list}]),
-  PercentagesAsWildcards = re:replace(ProtectedPeriods, PercentReplaceRe, ".*", [global, {return, list}]),
+    % Update the pattern
+    ProtectedPeriods = re:replace(Expected, PeriodReplaceRe, "\\\\.", [global, {return, list}]),
+    PercentagesAsWildcards = re:replace(ProtectedPeriods, PercentReplaceRe, ".*", [global, {return, list}]),
   
-  % Compile the user pattern
-  {ok, LikePattern} = re:compile("^" ++ PercentagesAsWildcards ++ "$"),
+    % Compile the user pattern
+    {ok, LikePattern} = re:compile("^" ++ PercentagesAsWildcards ++ "$"),
 
-  % Test if the pattern matches
-  case re:run(Value, LikePattern) of
-    {match, _} -> true;
-    nomatch -> false
-  end.
+    % Test if the pattern matches
+    case re:run(Value, LikePattern) of
+        {match, _} -> true;
+        nomatch -> false
+    end.
 
 apply_ordering(_FieldList, Rows, none) ->
-  Rows;
+    Rows;
 apply_ordering(FieldList, Rows, {order_by, Clauses}) ->
-  FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
-  OrderingFieldPositions = [{name_to_position(Name, FieldPositions), Direction} || {Name, Direction} <- Clauses],
-  lists:sort(fun(Row1, Row2) -> order_items(Row1, Row2, OrderingFieldPositions) end, Rows).
+    FieldPositions = lists:zip(FieldList, lists:seq(1, length(FieldList))),
+    OrderingFieldPositions = [{name_to_position(Name, FieldPositions), Direction} || {Name, Direction} <- Clauses],
+    lists:sort(fun(Row1, Row2) -> order_items(Row1, Row2, OrderingFieldPositions) end, Rows).
 
 name_to_position(Field, FieldPositions) ->
-  case lists:keysearch(Field, 1, FieldPositions) of
-    {value, {Field, FieldPosition}} ->
-      FieldPosition;
-    false ->
-      throw(lists:flatten(io_lib:format("Invalid field ~s specified in ordering clause", [Field])))
-  end.
+    case lists:keysearch(Field, 1, FieldPositions) of
+        {value, {Field, FieldPosition}} ->
+            FieldPosition;
+        false ->
+            throw(lists:flatten(io_lib:format("Invalid field ~s specified in ordering clause", [Field])))
+    end.
 
 order_items(_, _, []) ->
-  true;
+    true;
 order_items(Row1, Row2, [{FieldPosition, Direction} | RestOrdering]) ->
-  Row1Val = lists:nth(FieldPosition, Row1),
-  Row2Val = lists:nth(FieldPosition, Row2),
-  case Row1Val == Row2Val of
-    true -> order_items(Row1, Row2, RestOrdering);
-    false ->
-      case Direction of
-        descending -> Row1Val > Row2Val;
-        ascending  -> Row1Val < Row2Val
-      end
-  end.
+    Row1Val = lists:nth(FieldPosition, Row1),
+    Row2Val = lists:nth(FieldPosition, Row2),
+    case Row1Val == Row2Val of
+        true -> order_items(Row1, Row2, RestOrdering);
+        false ->
+            case Direction of
+                descending -> Row1Val > Row2Val;
+                ascending  -> Row1Val < Row2Val
+            end
+    end.
 
 filter_cols(AllFields, RequiredFields, Rows) ->
-  FieldPositions = lists:zip(AllFields, lists:seq(1, length(AllFields))),
-  Extract = fun(Field, Row) ->
-     {value, {_, Position}} = lists:keysearch(Field, 1, FieldPositions),
-     lists:nth(Position, Row)
-  end,
+    FieldPositions = lists:zip(AllFields, lists:seq(1, length(AllFields))),
+    Extract = 
+        fun(Field, Row) ->
+            {value, {_, Position}} = 
+                lists:keysearch(Field, 1, FieldPositions),
+            lists:nth(Position, Row)
+        end,
   [[Extract(Field, Row) || Field <- RequiredFields] || Row <- Rows].
 
 validate_fields(Available, Requested) ->
 
 % Privilege Helpers
 expand_privilege_list(all) ->
-  [configure, read, write];
+    [configure, read, write];
 expand_privilege_list(X) ->
-  [X].
+    [X].
 
 apply_privilege_list(Node, User, PrivilegeList, Regex) ->
-  % Retrieve the old privilege structure
-  Current = retrieve_privileges(Node, User),
+    %% Retrieve the old privilege structure
+    Current = retrieve_privileges(Node, User),
 
-  % Update each privilege detailed in the privilege spec
-  NewPrivs = [case X of
+    %% Update each privilege detailed in the privilege spec
+    NewPrivs = [case X of
                 {PrivKey, CurVal} ->
                     case lists:member(PrivKey, PrivilegeList) of
                         true -> Regex;
                         false -> CurVal
                     end
               end || X <- Current],
-  [NewConfigure,NewWrite,NewRead] = NewPrivs,
+    [NewConfigure,NewWrite,NewRead] = NewPrivs,
 
-  % Set the permissions
-  rpc_call(Node, rabbit_access_control, set_permissions, [User, <<"/">>, NewConfigure, NewWrite, NewRead]),
-  ok.
+    % Set the permissions
+    rpc_call(Node, rabbit_access_control, set_permissions, [User, <<"/">>, NewConfigure, NewWrite, NewRead]),
+    ok.
 
 retrieve_privileges(Node, User) ->
-  Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
-  UserPermissions = [[{configure, ConfigureRE}, {write, WriteRE}, {read, ReadRE}]
-    || {PermUser, ConfigureRE, WriteRE, ReadRE} <- Permissions, User =:= PermUser],
-  case length(UserPermissions) of
-    0 -> [{configure, <<"">>}, {write, <<"">>}, {read, <<"">>}];
-    _ -> lists:nth(1, UserPermissions)
-  end.
+    Permissions = rpc_call(Node, rabbit_access_control, list_vhost_permissions, [<<"/">>]),
+    UserPermissions = [[{configure, ConfigureRE}, {write, WriteRE}, {read, ReadRE}]
+        || {PermUser, ConfigureRE, WriteRE, ReadRE} <- Permissions, User =:= PermUser],
+    case length(UserPermissions) of
+        0 -> [{configure, <<"">>}, {write, <<"">>}, {read, <<"">>}];
+        _ -> lists:nth(1, UserPermissions)
+    end.
+
+ensure_resource_access(Username, Resource, Perm) ->
+    rabbit_access_control:check_resource_access(Username, Resource, Perm).
+    
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, Username) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, QueueNameBin),
+    ensure_resource_access(Username, QueueName, write),
+    ExchangeName = rabbit_misc:r(<<"/">>, exchange, ExchangeNameBin),
+    ensure_resource_access(Username, ExchangeName, read),
+    case Fun(ExchangeName, QueueName, RoutingKey, Arguments) of
+        {error, exchange_not_found} ->
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(ExchangeName)]));
+        {error, queue_not_found} ->
+            lists:flatten(io_lib:format("~s not found", [rabbit_misc:rs(QueueName)]));
+        {error, exchange_and_queue_not_found} ->
+            lists:flatten(io_lib:format("Neither ~s nor ~s exist",
+                                        [rabbit_misc:rs(ExchangeName), 
+                                         rabbit_misc:rs(QueueName)]));
+        {error, binding_not_found} ->
+            ok;
+        {error, durability_settings_incompatible} ->
+            lists:flatten(io_lib:format("Durability settings of ~s incompatible with ~s",
+                                        [rabbit_misc:rs(QueueName), 
+                                         rabbit_misc:rs(ExchangeName)]));
+        ok -> ok
+    end.

src/bql_client.erl

     ok.
 
 execute_shell() ->
-  case run_command() of
-    exit -> ok;
-    _    -> execute_shell()
-  end.
+    case run_command() of
+        exit -> ok;
+        _    -> execute_shell()
+    end.
 
 run_command() ->
-  Line = io:get_line("BQL> "),
-  case Line of
-    eof      -> exit;
-    "exit\n" -> exit;
-    _        -> execute_block(Line), ok
-  end.
+    Line = io:get_line("BQL> "),
+    case Line of
+        eof      -> exit;
+        "exit\n" -> exit;
+        _        -> execute_block(Line), ok
+    end.
       
 
 apply_bql_file(BQL) ->
-  case filelib:is_file(BQL) of
-    false ->
-      io:fwrite("Provided BQL file does not exist!~n"),
-      error;
-    true ->
-      {ok, Contents} = file:read_file(BQL),
-      execute_block(binary_to_list(Contents))
-  end.
+    case filelib:is_file(BQL) of
+        false ->
+            io:fwrite("Provided BQL file does not exist!~n"),
+            error;
+        true ->
+            {ok, Contents} = file:read_file(BQL),
+            execute_block(binary_to_list(Contents))
+    end.
 
 execute_block(Contents) ->
-  case rpc:call(localnode(rabbit), bql_server, send_command, [<<"guest">>, <<"guest">>, Contents]) of	
-%  case bql_server:send_command(<<"guest">>, <<"guest">>, Contents) of
-    {ok, Result}    -> format_result(Result);
-    {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
-  end.
+    case rpc:call(localnode(rabbit), bql_server, send_command, 
+                    [<<"guest">>, <<"guest">>, <<"text/bql">>, Contents]) of	
+        {ok, Result}    -> format_result(Result);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
 
 format_result(Result) ->
-  [format_result_block(Item) || Item <- Result],
-  ok.
+    [format_result_block(Item) || Item <- Result],
+    ok.
 
 format_result_block({Headers, Rows}) when is_list(Headers), is_list(Rows) ->
-  % Convert the content of all the rows to strings
-  StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
+    %% Convert the content of all the rows to strings
+    StringifiedRows = [[bql_utils:convert_to_string(Cell) || Cell <- Row] || Row <- Rows],
 
-  % Work through the items and headers, and find the longest item
-  CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
-  Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
+    %% Work through the items and headers, and find the longest item
+    CountedHeaders = lists:zip(Headers, lists:seq(1, length(Headers))),
+    Widths = [measure_column(Header, Position, StringifiedRows) || {Header, Position} <- CountedHeaders],
 
-  % Output the header then inside dividers
-  Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
-  io:fwrite(Divider),
-  output_row([atom_to_list(H) || H <- Headers], Widths),
-  io:fwrite(Divider),
+    %% Output the header then inside dividers
+    Divider = ["-" || _ <- lists:seq(1, lists:sum(Widths) + 3*length(Widths) + 1)] ++ "~n",
+    io:fwrite(Divider),
+    output_row([atom_to_list(H) || H <- Headers], Widths),
+    io:fwrite(Divider),
 
-  [output_row(Row, Widths) || Row <- StringifiedRows],
-  io:fwrite("~n"),
-  ok;
+    [output_row(Row, Widths) || Row <- StringifiedRows],
+    io:fwrite("~n"),
+    ok;
 format_result_block(Result) ->
-  io:format("~p~n", [Result]),
-  ok.
+    io:format("~p~n", [Result]),
+    ok.
 
 measure_column(Header, Position, Items) ->
-  lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
+    lists:max([length(X) || X <- [atom_to_list(Header)] ++ [lists:nth(Position, Row) || Row <- Items]]).
 output_row(Items, Widths) ->
-  WidthItems = lists:zip(Items, Widths),
-  [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
-  io:fwrite("|~n").
+    WidthItems = lists:zip(Items, Widths),
+    [io:format("| ~s ", [widen(Item, Width)]) || {Item, Width} <- WidthItems],
+    io:fwrite("|~n").
 
 widen(Item, Width) ->
-  Extra = Width - length(Item),
-  case Extra of
-    0 -> Item;
-    _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
-  end.
+    Extra = Width - length(Item),
+    case Extra of
+        0 -> Item;
+        _ -> Item ++ [" " || _ <- lists:seq(1, Extra)]
+    end.
 
 localnode(Name) ->
     %% Imported from rabbit_misc to remove the dependency on the Rabbit server!
     ok.
 
 execute_block(Contents, Formatter) ->
-  case rpc:call(localnode(rabbit), bql_server, send_command, [<<"guest">>, <<"guest">>, Contents]) of	
-    {ok, Result}    -> format(Result, Formatter);
-    {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
-  end.
+    case rpc:call(localnode(rabbit), bql_server, send_command, [<<"guest">>, <<"guest">>, Contents]) of	
+        {ok, Result}    -> format(Result, Formatter);
+        {error, Reason} -> io:format("BQL execution failed:~n  ~s~n", [Reason])
+    end.
 
 durable_str(Row) ->
-  case lists:keysearch(durable, 1, Row) of
-    {value, {_, true}} -> "durable ";
-    _                  -> ""
-  end.
+    case lists:keysearch(durable, 1, Row) of
+        {value, {_, true}} -> "durable ";
+        _                  -> ""
+    end.
 
 
 format([{Headers, Rows}], Formatter) ->
-  Zipped = [lists:zip(Headers, Row) || Row <- Rows],
-  Formatted = [Formatter(Row) || Row <- Zipped],
-  lists:flatten(string:join([F || F <- Formatted, not(length(F) == 0)], "\n")).
+    Zipped = [lists:zip(Headers, Row) || Row <- Rows],
+    Formatted = [Formatter(Row) || Row <- Zipped],
+    lists:flatten(string:join([F || F <- Formatted, not(length(F) == 0)], "\n")).
 
 localnode(Name) ->
     %% Imported from rabbit_misc to remove the dependency on the Rabbit server!

src/bql_server.erl

 
 -behaviour(gen_server).
 
--export([start/0, start/2, stop/0, stop/1, start_link/0, send_command/3]).
+-export([start/0, start/2, stop/0, stop/1, start_link/0, send_command/4]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 -record(state, {}).
 
 start() ->
-  start_link(),
-  ok.
+    start_link(),
+    ok.
 
 start(normal, []) ->
-  start_link().
+    start_link().
 
 stop() ->
-  ok.
+    ok.
 
 stop(_State) ->
-  stop().
+    stop().
 
 start_link() ->
-  gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-send_command(Username, Password, Command) ->
-  gen_server:call(?MODULE, {execute, Username, Password, Command}).
+send_command(Username, Password, ContentType, Command) ->
+    gen_server:call(?MODULE, {execute, Username, Password, ContentType, Command}).
 
 %---------------------------
 % Gen Server Implementation
 % --------------------------
 
 init([]) ->
-  {ok, #state{}}.
+    {ok, #state{}}.
 
 handle_call(Msg,_From,State = #state{}) ->
-  case Msg of
-    {execute, _Username, _Password, Command} ->
-      % rabbit_access_control:user_pass_login(Username, Password),
+    case catch handle_message(Msg) of
+        {reply, Content} ->
+            {reply, Content, State};
+        {'EXIT', {amqp,access_refused,ErrorMsg,none}} ->
+            {reply, {error, ErrorMsg}, State};
+        {'EXIT', Reason} ->
+            {reply, {error, Reason}, State};
+        Response ->
+            {reply, {error, Response}, State}
+    end.
 
-      % rabbit_access_control:check_resource_access(Username, Resource, Perm)
-      % select name,depth from queues where name = 'amq.control'; => [{name, depth}, {"amq.control", 5}]
+handle_cast(_,State) -> 
+    {noreply, State}.
+    
+handle_info(_Info, State) -> 
+    {noreply, State}.
+    
+terminate(_,_) -> 
+    ok.
 
-     case commands:parse(Command) of
+code_change(_OldVsn, State, _Extra) -> 
+    {ok, State}.
+
+%% Message Handling
+handle_message({execute, Username, Password, ContentType, Command}) ->
+    %% Validate the user credentials
+    rabbit_access_control:user_pass_login(Username, Password),
+    
+    %% Parse the input based on the content type
+    ParsedCommands = case ContentType of
+        <<"text/bql">> ->
+            commands:parse(Command);
+        <<"application/bql-terms">> ->
+            list_to_term(Command)
+    end,
+    
+    % rabbit_access_control:check_resource_access(Username, Resource, Perm)
+    
+    case ParsedCommands of
         {ok, Commands} ->
-          case bql_applicator:apply_commands(Commands) of
-            {ok, Result} ->
-              {reply, {ok, Result}, State};
-            {error, Reason} ->
-              {reply, {error, Reason}, State}
-          end;
+            case bql_applicator:apply_commands(Commands, Username) of
+                {ok, Result} ->
+                    {reply, {ok, Result}};
+                {error, Reason} ->
+                    {reply, {error, Reason}}
+            end;
         {error, Reason} ->
-            {reply, {error, Reason}, State}
-     end;
-    _ ->
-      {reply, unknown_command, State}
-  end. 
+            {reply, {error, Reason}}
+    end;
+handle_message(_) ->
+    {reply, unknown_command}.
 
-handle_cast(_,State) -> {reply,unhandled_cast,State}.
-handle_info(_Info, State) -> {reply, unhandled_info, State}.
-terminate(_,_) -> ok.
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+%%
+%% Helper Methods
+%%
+
+list_to_term(String) ->
+    {ok, T, _} = erl_scan:string(String++"."),
+    erl_parse:parse_term(T).

src/bql_utils.erl

 -export([convert_to_string/1]).
 
 convert_to_string(Value) when is_list(Value) ->
-  Value;
+    Value;
 convert_to_string(Value) when is_binary(Value) ->
-  binary_to_list(Value);
+    binary_to_list(Value);
 convert_to_string(Value) ->
-  io_lib:write(Value).
+    io_lib:write(Value).

src/command_lexer.xrl

 Definitions.
 
 C     = [0-9A-Za-z\.\-\_]
-W     = [0-9A-Za-z\s\.\-\_%\/*]
+W     = [0-9A-Za-z\s\.\-\_%\/*#@]
 Semi  = [;]
 Wild  = [*]
 Comma = [,]
 post        :   {token,{post,TokenLine,list_to_atom(TokenChars)}}.
 with        :   {token,{with,TokenLine,list_to_atom(TokenChars)}}.
 get         :   {token,{get,TokenLine,list_to_atom(TokenChars)}}.
+drain       :   {token,{drain,TokenLine,list_to_atom(TokenChars)}}.
 {Wild}      :   {token,{wildcard,TokenLine,list_to_atom(TokenChars)}}.
 {Comma}     :   {token,{comma,TokenLine,list_to_atom(TokenChars)}}.
 {Comp}      :   {token,{comparator,TokenLine,list_to_atom(TokenChars)}}.

src/command_parser.yrl

 
 Terminals
 create drop durable queue exchange exchange_type route from to routing_key is when_sym string select wildcard
-comma where comparator union order by asc desc user identified vhost grant revoke on purge post with get semi.
+comma where comparator union order by asc desc user identified vhost grant revoke on purge post with get semi
+drain.
 
 Rootsymbol statements.
 
 
 expression -> create vhost string                             : {create_vhost, unwrap('$3')}.
 expression -> drop vhost string                               : {drop_vhost, unwrap('$3')}.
-expression -> create queue string                             : {create_queue, unwrap('$3'), false}.
-expression -> create durable queue string                     : {create_queue, unwrap('$4'), true}.
+expression -> create queue string                             : {create_queue, unwrap('$3'), false, ""}.
+expression -> create durable queue string                     : {create_queue, unwrap('$4'), true, ""}.
 expression -> drop queue string                               : {drop_queue, unwrap('$3')}.
-expression -> create exchange string                          : {create_exchange, unwrap('$3'), direct, false}.
-expression -> create durable exchange string                  : {create_exchange, unwrap('$4'), direct, true}.
-expression -> create exchange_type exchange string            : {create_exchange, unwrap('$4'), unwrap('$2'), false}.
-expression -> create durable exchange_type exchange string    : {create_exchange, unwrap('$5'), unwrap('$3'), true}.
+expression -> create exchange string                          : {create_exchange, unwrap('$3'), direct, false, ""}.
+expression -> create durable exchange string                  : {create_exchange, unwrap('$4'), direct, true, ""}.
+expression -> create exchange_type exchange string            : {create_exchange, unwrap('$4'), unwrap('$2'), false, ""}.
+expression -> create durable exchange_type exchange string    : {create_exchange, unwrap('$5'), unwrap('$3'), true, ""}.
 expression -> drop exchange string                            : {drop_exchange, unwrap('$3')}.
-expression -> create route_desc                               : {create_binding, '$2'}.
+expression -> create route_desc                               : {create_binding, '$2', ""}.
 expression -> drop route_desc                                 : {drop_binding, '$2'}.
 expression -> create user string identified by string         : {create_user, unwrap('$3'), unwrap('$6')}.
 expression -> drop user string                                : {drop_user, unwrap('$3')}.
 expression -> post string to string                           : {post_message, unwrap('$4'), "", unwrap('$2')}.
 expression -> post string to string with routing_key string   : {post_message, unwrap('$4'), unwrap('$7'), unwrap('$2')}.
 expression -> get from string                                 : {retrieve_message, unwrap('$3')}.
+expression -> drain string                                    : {drain_queue, unwrap('$2')}.
 
 route_desc -> route from string to string                                 : {unwrap('$3'), unwrap('$5'), ""}.
 route_desc -> route from string to string when_sym routing_key is string  : {unwrap('$3'), unwrap('$5'), unwrap('$9')}.

src/rabbitmq_bql_sup.erl

 -export([start_link/0, init/1]).
 
 start_link() ->
-  supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).
 
 init([]) ->
     {ok, {{one_for_one, 3, 10},

test/amq_interface_test.erl

 %%
 -module(amq_interface_test).
 
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 submit_create_command_test() ->
-  Response = send_request("create exchange myexchange;"),
-  ?assertEqual("{\"success\":true,\"messages\":[\"ok\"]}", Response).
+    Response = send_request("create exchange myexchange;"),
+    ?assertEqual("{\"success\":true,\"messages\":[\"ok\"]}", Response).
 
 submit_query_test() ->
-  Response = send_request("select * from vhosts where name='/';"),
-  ?assertEqual("{\"success\":true,\"messages\":[[{\"name\":\"/\"}]]}", Response).
+    Response = send_request("select * from vhosts where name='/';"),
+    ?assertEqual("{\"success\":true,\"messages\":[[{\"name\":\"/\"}]]}", Response).
 
 submit_badly_formatted_query_test() ->
-  Response = send_request("create invalidexchange myexchange;"),
-  ?assertEqual("{\"success\":false,\"message\":\"syntax error before: \\\"invalidexchange\\\" on line 1\"}", Response).
+    Response = send_request("create invalidexchange myexchange;"),
+    ?assertEqual("{\"success\":false,\"message\":\"syntax error before: \\\"invalidexchange\\\" on line 1\"}", Response).
 
 submit_query_against_non_existant_object_test() ->
-  Response = send_request("select * from something;"),
-  ?assertEqual("{\"success\":true,\"messages\":[\"Unknown entity something specified to query\"]}", Response).
+    Response = send_request("select * from something;"),
+    ?assertEqual("{\"success\":true,\"messages\":[\"Unknown entity something specified to query\"]}", Response).
 
 send_request(Content) ->
-  Connection = Connection = lib_amqp:start_connection(),
-  Client = bql_amqp_rpc_client:start(Connection, <<>>),
-  Res = bql_amqp_rpc_client:call(Client, <<"bql.query">>, <<"application/json">>,
-                                 list_to_binary("{\"query\":\"" ++ Content ++ "\"}"), 500),
-  lib_amqp:close_connection(Connection),
-  Res.
+    Connection = Connection = lib_amqp:start_connection(),
+    Client = bql_amqp_rpc_client:start(Connection, <<>>),
+    Res = bql_amqp_rpc_client:call(Client, <<"bql.query">>, <<"application/json">>,
+                                   list_to_binary("{\"query\":\"" ++ Content ++ "\"}"), 500),
+    lib_amqp:close_connection(Connection),
+    Res.

test/bql_amqp_rpc_client.erl

 
 -module(bql_amqp_rpc_client).
 
--include_lib("rabbit_framing.hrl").
--include_lib("rabbit.hrl").
--include("amqp_client.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
 
 -behaviour(gen_server).
 
     gen_server:call(Pid, stop, infinity).
 
 call(RpcClientPid, Exchange, ContentType, Payload) ->
-  call(RpcClientPid, Exchange, ContentType, Payload, infinity).
+    call(RpcClientPid, Exchange, ContentType, Payload, infinity).
 call(RpcClientPid, Exchange, ContentType, Payload, Timeout) ->
     gen_server:call(RpcClientPid, {call, Exchange, ContentType, Payload}, Timeout).
 
     {stop, normal, State};
 
 handle_info({#'basic.deliver'{},
-            {content, ClassId, Props, PropertiesBin, [Payload] }},
+            #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>},
+                      payload = Payload }},
             State = #rpc_client_state{continuations = Conts}) ->
-    #'P_basic'{correlation_id = CorrelationId}
-               = decode_properties(ClassId, Props, PropertiesBin),
-    <<Id:64>> = CorrelationId,
     From = dict:fetch(Id, Conts),
-    gen_server:reply(From, Payload),
+    gen_server:reply(From, binary_to_list(Payload)),
     {noreply, State#rpc_client_state{continuations = dict:erase(Id, Conts) }}.
 
 code_change(_OldVsn, State, _Extra) ->

test/bql_test.erl

 -define(debugCommands(C), ?debugFmt("Got commands: ~p~n", [C])).
 
 create_nondurable_queue_test() ->
-  [ok] = execute("create queue mynondurablequeue;"),
-  [{_, Result}] = execute("select * from queues where name=mynondurablequeue and 'durable'=false;"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create queue mynondurablequeue;"),
+    [{_, Result}] = execute("select * from queues where name=mynondurablequeue and 'durable'=false;"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 create_durable_queue_test() ->
-  [ok] = execute("create durable queue mydurablequeue;"),
-  [{_, Result}] = execute("select * from queues where name=mydurablequeue and 'durable'=true;"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create durable queue mydurablequeue;"),
+    [{_, Result}] = execute("select * from queues where name=mydurablequeue and 'durable'=true;"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 drop_queue_test() ->
-  [ok, ok] = execute("create queue myqueuefordropping; drop queue myqueuefordropping;"),
-  [{_, Result}] = execute("select * from queues where name=myqueuefordropping;"),
-  ?assertEqual(0, length(Result)),
-  ok.
+    [ok, ok] = execute("create queue myqueuefordropping; drop queue myqueuefordropping;"),
+    [{_, Result}] = execute("select * from queues where name=myqueuefordropping;"),
+    ?assertEqual(0, length(Result)),
+    ok.
 
 constrain_with_invalid_field_test() ->
-  Response = execute("select * from queues where invalid_field=something;"),
-  ?assertEqual(["Invalid field invalid_field specified in constraint"], Response),
-  ok.
+    Response = execute("select * from queues where invalid_field=something;"),
+    ?assertEqual(["Invalid field invalid_field specified in constraint"], Response),
+    ok.
 
 order_with_invalid_field_test() ->
-  Response = execute("select * from queues order by invalid_field;"),
-  ?assertEqual(["Invalid field invalid_field specified in ordering clause"], Response),
-  ok.
+    Response = execute("select * from queues order by invalid_field;"),
+    ?assertEqual(["Invalid field invalid_field specified in ordering clause"], Response),
+    ok.
 
 order_with_single_field_test() ->
-  [ok,ok,ok] = execute("create queue myqueue3; create queue myqueue2; create queue myqueue1"),
-  [{_, Result}] = execute("select name from queues where name like 'myqueue%' order by name;"),
-  ?assertEqual([["myqueue1"], ["myqueue2"], ["myqueue3"]], Result),
-  ok.
+    [ok,ok,ok] = execute("create queue myqueue3; create queue myqueue2; create queue myqueue1"),
+    [{_, Result}] = execute("select name from queues where name like 'myqueue%' order by name;"),
+    ?assertEqual([["myqueue1"], ["myqueue2"], ["myqueue3"]], Result),
+    ok.
 
 order_with_multiple_field_test() ->
-  [ok,ok,ok] = execute("create topic exchange myx3; create topic exchange myx2; create headers exchange myx1"),
-  [{_, Result}] = execute("select name from exchanges where name like 'myx%' order by type desc, name;"),
-  ?assertEqual([["myx2"], ["myx3"], ["myx1"]], Result),
-  ok.
+    [ok,ok,ok] = execute("create topic exchange myx3; create topic exchange myx2; create headers exchange myx1"),
+    [{_, Result}] = execute("select name from exchanges where name like 'myx%' order by type desc, name;"),
+    ?assertEqual([["myx2"], ["myx3"], ["myx1"]], Result),
+    ok.
 
 create_non_durable_exchange_test() ->
-  [ok] = execute("create exchange mynondurableexchange;"),
-  [{_, Result}] = execute("select * from exchanges where name=mynondurableexchange and 'durable'=false;"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create exchange mynondurableexchange;"),
+    [{_, Result}] = execute("select * from exchanges where name=mynondurableexchange and 'durable'=false;"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 create_durable_exchange_test() ->
-  [ok] = execute("create durable exchange mydurableexchange;"),
-  [{_, Result}] = execute("select * from exchanges where name=mydurableexchange and 'durable'=true;"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create durable exchange mydurableexchange;"),
+    [{_, Result}] = execute("select * from exchanges where name=mydurableexchange and 'durable'=true;"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 drop_exchange_test() ->
-  [ok, ok] = execute("create exchange myexchangefordropping; drop exchange myexchangefordropping;"),
-  [{_, Result}] = execute("select * from exchanges where name=myexchangefordropping;"),
-  ?assertEqual(0, length(Result)),
-  ok.
+    [ok, ok] = execute("create exchange myexchangefordropping; drop exchange myexchangefordropping;"),
+    [{_, Result}] = execute("select * from exchanges where name=myexchangefordropping;"),
+    ?assertEqual(0, length(Result)),
+    ok.
 
 create_vhost_test() ->
-  [ok] = execute("create vhost '/mytestvhost';"),
-  [{_, Result}] = execute("select * from vhosts where name='/mytestvhost';"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create vhost '/mytestvhost';"),
+    [{_, Result}] = execute("select * from vhosts where name='/mytestvhost';"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 drop_vhost_test() ->
-  [ok, ok] = execute("create vhost '/mytestvhostfordropping'; drop vhost '/mytestvhostfordropping';"),
-  [{_, Result}] = execute("select * from vhosts where name='/mytestvhostfordropping';"),
-  ?assertEqual(0, length(Result)),
-  ok.
+    [ok, ok] = execute("create vhost '/mytestvhostfordropping'; drop vhost '/mytestvhostfordropping';"),
+    [{_, Result}] = execute("select * from vhosts where name='/mytestvhostfordropping';"),
+    ?assertEqual(0, length(Result)),
+    ok.
 
 create_user_test() ->
-  [ok] = execute("create user anewuser identified by password;"),
-  [{_, Result}] = execute("select * from users where name=anewuser;"),
-  ?assertEqual(1, length(Result)),
-  ok.
+    [ok] = execute("create user anewuser identified by password;"),
+    [{_, Result}] = execute("select * from users where name=anewuser;"),
+    ?assertEqual(1, length(Result)),
+    ok.
 
 drop_user_test() ->
-  [ok, ok] = execute("create user anotheruserfordropping identified by secret; drop user anotheruserfordropping;"),
-  [{_, Result}] = execute("select * from users where name=anotheruserfordropping;"),
-  ?assertEqual(0, length(Result)),
-  ok.
+    [ok, ok] = execute("create user anotheruserfordropping identified by secret; drop user anotheruserfordropping;"),
+    [{_, Result}] = execute("select * from users where name=anotheruserfordropping;"),
+    ?assertEqual(0, length(Result)),
+    ok.
+    
+create_route_from_bad_exchange_test() ->
+    [ok] = execute("create queue forbadexchange"),
+    ?assertEqual(["exchange 'missing' in vhost '/' not found"], execute("create route from missing to forbadexchange")).
+        
+create_route_to_bad_queue_test() ->
+    [ok] = execute("create exchange forbadqueue"),
+    ?assertEqual(["queue 'missing' in vhost '/' not found"], execute("create route from forbadqueue to missing")).
+    
+create_route_from_nondurable_exchange_to_durable_queue_test() ->
+    [ok] = execute("create exchange nondurableexchange"),
+    [ok] = execute("create durable queue mydurablequeue"),
+    ?assertEqual(["Durability settings of " ++
+                  "queue 'mydurablequeue' in vhost '/' " ++
+                  "incompatible with exchange 'nondurableexchange' in vhost '/'"],
+                 execute("create route from nondurableexchange to mydurablequeue")).
 
 select_permission_with_where_clause_not_in_result_test() ->
-  [{_, Result}] = execute("select username from permissions where username='guest' and read_perm='.*';"),
-  ?assertEqual([[<<"guest">>]], Result),
-  ok.
+    [{_, Result}] = execute("select username from permissions where username='guest' and read_perm='.*';"),
+    ?assertEqual([[<<"guest">>]], Result),
+    ok.
 
 select_queue_with_where_clause_not_in_result_test() ->
-  [ok] = execute("create queue mynondurablequeue;"),
-  [{_, Result}] = execute("select 'durable' from queues where name='mynondurablequeue' and 'durable'=false;"),
-  ?assertEqual([[false]], Result),
-  ok.
+    [ok] = execute("create queue mynondurablequeue;"),
+    [{_, Result}] = execute("select 'durable' from queues where name='mynondurablequeue' and 'durable'=false;"),
+    ?assertEqual([[false]], Result),
+    ok.
 
 select_exchange_with_where_clause_not_in_result_test() ->
-  [ok] = execute("create exchange mynondurableexchange;"),
-  [{_, Result}] = execute("select 'durable' from exchanges where name='mynondurableexchange' and 'durable'=false;"),
-  ?assertEqual([[false]], Result),
-  ok.
+    [ok] = execute("create exchange mynondurableexchange;"),
+    [{_, Result}] = execute("select 'durable' from exchanges where name='mynondurableexchange' and 'durable'=false;"),
+    ?assertEqual([[false]], Result),
+    ok.
 
 select_binding_with_where_clause_not_in_result_test() ->
-  [ok, ok] = execute("create exchange mynondurableexchange; create queue mynondurablequeue;"),
-  [ok] = execute("create route from mynondurableexchange to mynondurablequeue;"),
-  [{_, Result}] = execute("select queue_name from bindings where exchange_name='mynondurableexchange';"),
-  ?assertEqual([["mynondurablequeue"]], Result),
-  ok.
+    [ok, ok] = execute("create exchange mynondurableexchange; create queue mynondurablequeue;"),
+    [ok] = execute("create route from mynondurableexchange to mynondurablequeue;"),
+    [{_, Result}] = execute("select queue_name from bindings where exchange_name='mynondurableexchange';"),
+    ?assertEqual([["mynondurablequeue"]], Result),
+    ok.
 
 select_connections_test() ->
-  [{_, Result}] = execute("select * from connections"),
-  ?assert(length(Result) > 0).
+    execute("select * from connections").
 
 post_message_test() ->
-  [ok] = execute("create exchange mynondurableexchange;"),
-  [ok] = execute("post 'Hello World' to mynondurableexchange;").  
+    [ok] = execute("create exchange mynondurableexchange;"),
+    [ok] = execute("post 'Hello World' to mynondurableexchange;").  
 
 post_message_with_routing_key_test() ->
-  [ok] = execute("create exchange mynondurableexchange;"),
-  [ok] = execute("post 'Hello World' to mynondurableexchange with routing_key rk;").
+    [ok] = execute("create exchange mynondurableexchange;"),
+    ["Message was unroutable"] = execute("post 'Hello World' to mynondurableexchange with routing_key rk;").
+
+purge_queue_test() ->
+    [ok] = execute("create queue mypurgingqueue;"),
+    [{ok, 0}] = execute("purge queue mypurgingqueue;"),
+    [ok] = execute("post 'Some Message' to '' with routing_key 'mypurgingqueue';"),
+    [{ok, 1}] = execute("purge queue mypurgingqueue;").
 
 retrieve_message_test() ->
-  [ok, ok] = execute("create exchange mydeliveryexchange; create queue mydeliveryqueue;"),
-  [ok] = execute("purge queue mydeliveryqueue;"),
-  [ok] = execute("create route from mydeliveryexchange to mydeliveryqueue;"),
-  [ok] = execute("post 'Some Message' to mydeliveryexchange;"),
-  [Response] = execute("get from mydeliveryqueue;"),
-  ?assertEqual(<<"Some Message">>, Response).
+    [ok, ok] = execute("create exchange mydeliveryexchange; create queue mydeliveryqueue;"),
+    [{ok, 0}] = execute("purge queue mydeliveryqueue;"),
+    [ok] = execute("create route from mydeliveryexchange to mydeliveryqueue;"),
+    [ok] = execute("post 'Some Message' to mydeliveryexchange;"),
+    [Response1] = execute("get from mydeliveryqueue;"),
+    ?assertEqual(<<"Some Message">>, Response1),
+    [{ok, 0}] = execute("purge queue mydeliveryqueue;"),
+    [Response2] = execute("get from mydeliveryqueue;"),
+    ?assertEqual(empty, Response2),
+    [Response3] = execute("get from bogusqueue;"),
+    ?assertEqual("queue 'bogusqueue' in vhost '/' not found", Response3).
+    
+select_exchange_with_raw_test() ->
+    Result = execute_raw([{select, "exchanges", [name], {{eq, name, "amq.topic"}, none}}]),
+    ?assertEqual([{[name], [["amq.topic"]]}], Result).
 
 execute(Command) ->
-  {ok, Result} = bql_server:send_command(<<"guest">>, <<"guest">>, Command),
-  Result.
+    {ok, Result} = bql_server:send_command(<<"guest">>, <<"guest">>, <<"text/bql">>, Command),
+    Result.
+    
+execute_raw(Terms) ->
+    Formatted = lists:flatten(io_lib:format("~p", [Terms])),
+    {ok, Result} = bql_server:send_command(<<"guest">>, <<"guest">>, <<"application/bql-terms">>,
+                                           Formatted),
+    Result.

test/command_parser_test.erl

 
 create_queue_with_nosemi_test() ->
     {ok, Commands} = commands:parse("create queue blah"),
-    ?assert([{create_queue,"blah",false}] =:= Commands).
+    ?assert([{create_queue,"blah",false, ""}] =:= Commands).
 
 create_nondurable_queue_test() ->
     {ok, Commands} = commands:parse("create queue blah;"),
-    ?assert([{create_queue,"blah",false}] =:= Commands).
+    ?assert([{create_queue,"blah",false, ""}] =:= Commands).
 
 create_nondurable_queue_with_space_in_name_test() ->
     {ok, Commands} = commands:parse("create queue 'bl ah';"),
-    ?assert([{create_queue,"bl ah",false}] =:= Commands).
+    ?assert([{create_queue,"bl ah",false, ""}] =:= Commands).
 
 create_nondurable_queue_with_exotic_name_test() ->
     {ok, Commands} = commands:parse("create queue b.a-t_b;"),
-    ?assert([{create_queue,"b.a-t_b",false}] =:= Commands).
+    ?assert([{create_queue,"b.a-t_b",false, ""}] =:= Commands).
 
 create_durable_queue_test() ->
     {ok, Commands} = commands:parse("create durable queue 'blah';"),
-    ?assert([{create_queue,"blah",true}] =:= Commands).
+    ?assert([{create_queue,"blah",true, ""}] =:= Commands).
 
 create_multiple_queues_test() ->
     {ok, Commands} = commands:parse("create durable queue 'blah'; create queue 'blah2';"),
-    ?assert([{create_queue,"blah",true}, {create_queue,"blah2", false}] =:= Commands).
+    ?assert([{create_queue,"blah",true, ""}, {create_queue,"blah2", false, ""}] =:= Commands).
 
 drop_queue_test() ->
     {ok, Commands} = commands:parse("drop queue 'myqueue';"),
 
 create_default_exchange_test() ->
     {ok, Commands} = commands:parse("create exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,false}] =:= Commands).
+    ?assert([{create_exchange,"myex",direct,false,""}] =:= Commands).
 
 create_direct_exchange_test() ->
     {ok, Commands} = commands:parse("create direct exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,false}] =:= Commands).
+    ?assert([{create_exchange,"myex",direct,false,""}] =:= Commands).
 
 create_headers_exchange_test() ->
     {ok, Commands} = commands:parse("create headers exchange 'myex';"),
-    ?assert([{create_exchange,"myex",headers,false}] =:= Commands).
+    ?assert([{create_exchange,"myex",headers,false,""}] =:= Commands).
 
 create_fanout_exchange_test() ->
     {ok, Commands} = commands:parse("create fanout exchange 'myex';"),
-    ?assert([{create_exchange,"myex",fanout,false}] =:= Commands).
+    ?assert([{create_exchange,"myex",fanout,false,""}] =:= Commands).
 
 create_topic_exchange_test() ->
     {ok, Commands} = commands:parse("create topic exchange 'myex';"),
-    ?assert([{create_exchange,"myex",topic,false}] =:= Commands).
+    ?assert([{create_exchange,"myex",topic,false,""}] =:= Commands).
 
 create_durable_default_exchange_test() ->
     {ok, Commands} = commands:parse("create durable exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,true}] =:= Commands).
+    ?assert([{create_exchange,"myex",direct,true,""}] =:= Commands).
 
 create_durable_direct_exchange_test() ->
     {ok, Commands} = commands:parse("create durable direct exchange 'myex';"),
-    ?assert([{create_exchange,"myex",direct,true}] =:= Commands).
+    ?assert([{create_exchange,"myex",direct,true,""}] =:= Commands).
 
 create_durable_headers_exchange_test() ->
     {ok, Commands} = commands:parse("create durable headers exchange 'myex';"),
-    ?assert([{create_exchange,"myex",headers,true}] =:= Commands).
+    ?assert([{create_exchange,"myex",headers,true,""}] =:= Commands).
 
 create_durable_fanout_exchange_test() ->
     {ok, Commands} = commands:parse("create durable fanout exchange 'myex';"),
-    ?assert([{create_exchange,"myex",fanout,true}] =:= Commands).
+    ?assert([{create_exchange,"myex",fanout,true,""}] =:= Commands).
 
 drop_exchange_test() ->
     {ok, Commands} = commands:parse("drop exchange 'myex';"),
 
 create_binding_with_no_routing_key_test() ->
     {ok, Commands} = commands:parse("create route from 'myex' to 'myqueue';"),
-    ?assert([{create_binding,{"myex","myqueue",""}}] =:= Commands).
+    ?assert([{create_binding,{"myex","myqueue",""},""}] =:= Commands).
 
 create_binding_with_routing_key_test() ->
     {ok, Commands} = commands:parse("create route from 'myex' to 'myqueue' when routing_key is 'Hello';"),
-    ?assert([{create_binding,{"myex","myqueue","Hello"}}] =:= Commands).
+    ?assert([{create_binding,{"myex","myqueue","Hello"},""}] =:= Commands).
 
 drop_binding_with_no_routing_key_test() ->
     {ok, Commands} = commands:parse("drop route from 'myex' to 'myqueue';"),
 get_message_test() ->
     {ok, Commands} = commands:parse("get from myqueue;"),
     ?assert([{retrieve_message, "myqueue"}] =:= Commands).
+
+drain_queue_test() ->
+    {ok, Commands} = commands:parse("drain myqueue;"),
+    ?assert([{drain_queue, "myqueue"}] =:= Commands).
+