Commits

Tony Garnock-Jones committed 01da05e

Support (and some basic tests) for BLPOP and BRPOP

Comments (0)

Files changed (3)

 
 rpop(Client, Key) -> erldis_client:scall(Client, <<"rpop ">>, [Key]).
 
+blpop(Client, Keys) -> erldis_client:bcall(Client, <<"blpop ">>, Keys, infinity).
+blpop(Client, Keys, Timeout) -> erldis_client:bcall(Client, <<"blpop ">>, Keys, Timeout).
+
+brpop(Client, Keys) -> erldis_client:bcall(Client, <<"brpop ">>, Keys, infinity).
+brpop(Client, Keys, Timeout) -> erldis_client:bcall(Client, <<"brpop ">>, Keys, Timeout).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %% Commands operating on sets %%
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/erldis_client.erl

 
 -include("erldis.hrl").
 
--export([scall/2, scall/3, call/2, call/3, stop/1, transact/1, transact/2, select/2, info/1,  sr_scall/2, sr_scall/3]).
+-export([scall/2, scall/3, scall/4, call/2, call/3, call/4, bcall/4, sr_scall/2, sr_scall/3]).
+-export([stop/1, transact/1, transact/2, select/2, info/1]).
 -export([connect/0, connect/1, connect/2, connect/3, connect/4]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 		 terminate/2, code_change/3]).
 -export([bin/1, format/2, format/1, sformat/1]).
 -define(EOL, "\r\n").
 
+-define(default_timeout, 5000). %% same as in gen.erl in stdlib
+
 %%%%%%%%%%%%%
 %% helpers %%
 %%%%%%%%%%%%%
 	end.
 
 % This is the simple send with a single row of commands
-scall(Client, Cmd) -> scall(Client, Cmd, <<>>).
+scall(Client, Cmd) -> scall(Client, Cmd, <<>>, ?default_timeout).
 
-scall(Client, Cmd, Args) ->
+scall(Client, Cmd, Args) -> scall(Client, Cmd, Args, ?default_timeout).
+
+scall(Client, Cmd, Args, Timeout) ->
 	Args2 = sformat(Args),
-	send(Client, <<Cmd/binary, Args2/binary>>).
+	send(Client, <<Cmd/binary, Args2/binary>>, Timeout).
 
 % This is the complete send with multiple rows
-call(Client, Cmd) -> call(Client, Cmd, []).
+call(Client, Cmd) -> call(Client, Cmd, [], ?default_timeout).
 
-call(Client, Cmd, Args) ->
+call(Client, Cmd, Args) -> call(Client, Cmd, Args, ?default_timeout).
+
+call(Client, Cmd, Args, Timeout) ->
 	Args2 = format(Args),
-	send(Client, <<Cmd/binary, Args2/binary>>).
+	send(Client, <<Cmd/binary, Args2/binary>>, Timeout).
 
-send(Client, Cmd) ->
+% Blocking call with server-side timeout added as final command arg
+bcall(Client, Cmd, Args, Timeout) ->
+    scall(Client, Cmd, Args ++ [server_timeout(Timeout)], erlang_timeout(Timeout)).
+
+% Erlang uses milliseconds, with symbol "infinity" for "wait forever";
+% redis uses seconds, with 0 for "wait forever".
+server_timeout(infinity) -> 0;
+server_timeout(V) when is_number(V) -> V / 1000.
+
+% Kludge on a few milliseconds to the timeout we gave the server, to
+% give the network and client a chance to catch up.
+erlang_timeout(infinity) -> infinity;
+erlang_timeout(V) when is_number(V) -> V + ?default_timeout.
+
+send(Client, Cmd, Timeout) ->
 	Piped = gen_server2:call(Client, is_pipelined),
 	
 	if
 		Piped ->
 			gen_server2:cast(Client, {send, Cmd});
 		true ->
-			case gen_server2:call(Client, {send, Cmd}) of
+			case gen_server2:call(Client, {send, Cmd}, Timeout) of
 				{error, Reason} -> throw({error, Reason});
 				Retval -> Retval
 			end
 		Socket -> gen_tcp:close(Socket)
 	end.
 
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.

test/erldis_list_tests.erl

 	%?assertEqual([], erldis_list:sublist(<<"foo">>, Client, 3)).
 	% TODO: test negative sublist start index
 
+blocking_queue_test() ->
+    Client = setup(),
+
+    erldis:rpush(Client, <<"a">>, <<"value">>),
+    ?assertEqual([<<"a">>, <<"value">>], erldis:blpop(Client, [<<"a">>, <<"b">>])),
+    erldis:rpush(Client, <<"b">>, <<"value">>),
+    ?assertEqual([<<"b">>, <<"value">>], erldis:blpop(Client, [<<"a">>, <<"b">>])),
+    erldis:rpush(Client, <<"a">>, <<"first">>),
+    erldis:rpush(Client, <<"a">>, <<"second">>),
+    ?assertEqual([<<"a">>, <<"first">>], erldis:blpop(Client, [<<"a">>, <<"b">>])),
+    ?assertEqual([<<"a">>, <<"second">>], erldis:blpop(Client, [<<"a">>, <<"b">>])),
+
+    spawn_link(fun blocking_queue_sender/0),
+    ?assertEqual([<<"a">>, <<1>>], erldis:blpop(Client, [<<"a">>, <<"b">>], 1000)),
+    ?assertEqual([<<"b">>, <<1>>], erldis:blpop(Client, [<<"a">>, <<"b">>], 1000)),
+    ?assertEqual([<<"a">>, <<2>>], erldis:blpop(Client, [<<"a">>, <<"b">>], 1000)),
+    ?assertEqual([], erldis:blpop(Client, [<<"a">>, <<"b">>], 1000)),
+    ?assertEqual([<<"a">>, <<3>>], erldis:blpop(Client, [<<"a">>, <<"b">>], 1000)),
+
+    erldis_client:stop(Client).
+
+blocking_queue_sender() ->
+    Client = setup(),
+    erldis:rpush(Client, <<"a">>, <<1>>),
+    timer:sleep(100),
+    erldis:rpush(Client, <<"b">>, <<1>>),
+    timer:sleep(100),
+    erldis:rpush(Client, <<"a">>, <<2>>),
+    timer:sleep(3000),
+    erldis:rpush(Client, <<"a">>, <<3>>),
+    erldis_client:stop(Client).
+
 foreach_test() ->
 	Client = setup(),
 	?assertEqual(0, erldis_list:len(<<"foo">>, Client)),
 	application:load(erldis),
 	{ok, Client} = erldis_client:connect(),
 	?assertEqual(erldis_client:call(Client, <<"flushdb ">>), [ok]),
-	Client.
+	Client.