Commits

Jacob Perkins committed 00f791a

mega binary patch from @cstar breaks list API for better performance, bump vsn to 0.1.0

Comments (0)

Files changed (17)

 {application, erldis, [
 	{description, "Erlang Redis application"},
-	{vsn, "0.0.12"},
+	{vsn, "0.1.0"},
 	{registered, [erldis_sup]},
 	{mod, {erldis_app, []}},
 	% TODO: include eunit?
 	{applications, [kernel, stdlib]},
-	{modules, [erldis_client, erldis, erldis_proto, erldis_app, erldis_sup,
-		erldis_sync_client, erldis_sets, erldis_dict, erldis_list]},
+	{modules, [
+		erldis_client, erldis, erldis_proto, erldis_app, erldis_sup,
+		erldis_sets, erldis_dict, erldis_list, gen_server2
+	]},
 	{env, [{host, "localhost"}, {port, 6379}, {timeout, 500}]}
 ]}.

ebin/erldis.appup

-{"0.0.12", [
+{"0.1.0", [
+	{"0.0.12", [{restart_application, erldis}]},
 	{"0.0.11", [
 		{load_module, erldis},
 		{load_module, erldis_proto},
 		{load_module, erldis_sync_client}
 	]},
 	{"0.0.10", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_client}
 	]},
-	{"0.0.9", [{load_module, erldis_sync_client}]},
-	{"0.0.8", [{load_module, erldis_sync_client}]},
-	{"0.0.7", [{load_module, erldis_sync_client}]},
+	{"0.0.9", [{load_module, erldis_client}]},
+	{"0.0.8", [{load_module, erldis_client}]},
+	{"0.0.7", [{load_module, erldis_client}]},
 	{"0.0.6", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_dict},
 		{load_module, erldis_list}
 	]},
 	{"0.0.5", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets},
 		{add_module, erldis_dict},
 		{add_module, erldis_list}
 	]},
 	{"0.0.4", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.3", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.2", [
-		{add_module, erldis_sync_client},
+		{add_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.1", [
 		{add_module, erldis_sets}
 	]}
 ], [
+	{"0.0.12", [{restart_application, erldis}]},
 	{"0.0.11", [
 		{load_module, erldis},
 		{load_module, erldis_proto},
 		{load_module, erldis_sync_client}
 	]},
 	{"0.0.10", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_client}
 	]},
-	{"0.0.9", [{load_module, erldis_sync_client}]},
-	{"0.0.8", [{load_module, erldis_sync_client}]},
-	{"0.0.7", [{load_module, erldis_sync_client}]},
+	{"0.0.9", [{load_module, erldis_client}]},
+	{"0.0.8", [{load_module, erldis_client}]},
+	{"0.0.7", [{load_module, erldis_client}]},
 	{"0.0.6", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_dict},
 		{load_module, erldis_list}
 	]},
 	{"0.0.5", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets},
 		{delete_module, erldis_dict},
 		{delete_module, erldis_list}
 	]},
 	{"0.0.4", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.3", [
-		{load_module, erldis_sync_client},
+		{load_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.2", [
-		{delete_module, erldis_sync_client},
+		{delete_module, erldis_client},
 		{load_module, erldis_sets}
 	]},
 	{"0.0.1", [

include/erldis.hrl

--record(redis, {socket,buffer=[],reply_caller,calls=0,remaining=0,pstate=empty,results=[], host, port, timeout}).
+-record(redis, {socket,buffer=[],reply_caller,pipeline=false,calls=0,remaining=0,pstate=empty,results=[], host, port, timeout, db= <<"0">>}).
 
 %% helpers
 flatten({error, Message}) ->
-    {error, Message};
-flatten(List) when is_list(List)->   
-    lists:flatten(List).
+	{error, Message};
+flatten(List) when is_list(List)->	 
+	lists:flatten(List).
 
 %% exposed API
 connect() ->
-    erldis_sync_client:connect().
+	erldis_client:connect().
 connect(Host) ->
-    erldis_sync_client:connect(Host).
+	erldis_client:connect(Host).
 connect(Host, Port) ->
-    erldis_sync_client:connect(Host, Port).
+	erldis_client:connect(Host, Port).
 connect(Host, Port, Options) ->
-    erldis_sync_client:connect(Host, Port, Options).
+	erldis_client:connect(Host, Port, Options).
 
-%quit(Client) ->
-%    erldis_sync_client:scall(Client, "QUIT"),
-%    erldis_sync_client:disconnect(Client).
+get_all_results(Client) -> gen_server2:call(Client, get_all_results).
+
+set_pipelining(Client, Bool) -> gen_server2:cast(Client, {pipelining, Bool}).
+
+quit(Client) ->
+	erldis_client:scall(Client, <<"QUIT">>),
+	erldis_client:disconnect(Client).
 
 %% Commands operating on string values
-internal_set_like(Client, Command, Key, Value) ->
-    case erldis_sync_client:call(Client, Command, [[Key, length(Value)], [Value]]) of
-      [{error, _}=Error]->Error;
-      [R] when R == ok orelse R==nil orelse R == true orelse R == false -> R;
-      R -> R
-    end.
+internal_set_like(Client, Command, Key, Value) when is_binary(Key), is_binary(Value) ->
+	Size = list_to_binary(integer_to_list(size(Value))),
+	Cmd = <<Command/binary, Key/binary, " ", Size/binary, "\r\n", Value/binary>>,
+	
+	case erldis_client:call(Client, Cmd) of
+		[{error, _}=Error] -> Error;
+		[R] when R == ok; R == nil; R == true; R == false -> R;
+		R -> R
+	end;
+internal_set_like(Client, Command, Key, Value) when is_binary(Value) ->
+	case erldis_client:call(Client, Command, [[Key, size(Value)], [Value]]) of
+		[{error, _}=Error] -> Error;
+		[R] when R == ok; R == nil; R == true; R == false -> R;
+		R -> R
+	end;
+internal_set_like(_, _, _, _) ->
+	{error, badarg}.
 
-%get_all_results(Client) -> erldis_client:get_all_results(Client).
+auth(Client, Password) ->
+	erldis_client:scall(Client, <<"auth ", Password/binary>>).
 
-auth(Client, Password) -> erldis_sync_client:scall(Client, auth, [Password]).
+exec(Client, Fun) ->
+	case erldis_client:sr_scall(Client, <<"multi ">>) of
+		ok ->
+			set_pipelining(Client, true),
+			Fun(Client),
+			get_all_results(Client),
+			set_pipelining(Client, false),
+			erldis_client:scall(Client, <<"exec ">>);
+		_ ->
+			{error, unsupported}
+	end.
 
-set(Client, Key, Value) -> internal_set_like(Client, set, Key, Value).
-get(Client, Key) -> erldis_sync_client:sr_scall(Client, get, [Key]).
-getset(Client, Key, Value) -> internal_set_like(Client, getset, Key, Value).
-mget(Client, Keys) -> erldis_sync_client:scall(Client, mget, Keys).
-setnx(Client, Key, Value) -> internal_set_like(Client, setnx, Key, Value).
-incr(Client, Key) -> erldis_sync_client:sr_scall(Client, incr, [Key]).
-incrby(Client, Key, By) -> erldis_sync_client:sr_scall(Client, incrby, [Key, By]).
-decr(Client, Key) -> erldis_sync_client:sr_scall(Client, decr, [Key]).
-decrby(Client, Key, By) -> erldis_sync_client:sr_scall(Client, decrby, [Key, By]).
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Commands operating on every value %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
+exists(Client, Key) -> erldis_client:sr_scall(Client, <<"exists ", Key/binary>>).
 
+del(Client, Key) -> erldis_client:sr_scall(Client, <<"del ", Key/binary>>).
 
-%% Commands operating on every value
-exists(Client, Key) -> erldis_sync_client:sr_scall(Client, exists, [Key]).
-del(Client, Key) -> erldis_sync_client:sr_scall(Client, del, [Key]).
-type(Client, Key) -> erldis_sync_client:sr_scall(Client, type, [Key]).
-keys(Client, Pattern) -> erldis_sync_client:scall(Client, keys, [Pattern]).
-randomkey(Client, Key) -> erldis_sync_client:sr_scall(Client, randomkey, [Key]).
-rename(Client, OldKey, NewKey) -> erldis_sync_client:sr_scall(Client, rename, [OldKey, NewKey]).
-renamenx(Client, OldKey, NewKey) -> erldis_sync_client:sr_scall(Client, renamenx, [OldKey, NewKey]).
-dbsize(Client) -> erldis_sync_client:sr_scall(Client, dbsize).
-expire(Client, Key, Seconds) -> erldis_sync_client:sr_scall(Client, expire, [Key, Seconds]).
-ttl(Client, Key) -> erldis_sync_client:sr_scall(Client, ttl, [Key]).
+type(Client, Key) -> erldis_client:sr_scall(Client, <<"type ", Key/binary>>).
 
+keys(Client, Pattern) -> erldis_client:scall(Client, <<"keys ", Pattern/binary>>).
 
+randomkey(Client, Key) -> erldis_client:sr_scall(Client, <<"randomkey ", Key/binary>>).
 
-%% Commands operating on lists
-rpush(Client, Key, Value) -> internal_set_like(Client, rpush, Key, Value).
-lpush(Client, Key, Value) -> internal_set_like(Client, lpush, Key, Value).
-llen(Client, Key) -> erldis_sync_client:sr_scall(Client, llen, [Key]).
-lrange(Client, Key, Start, End) -> erldis_sync_client:scall(Client, lrange, [Key, Start, End]).
-ltrim(Client, Key, Start, End) -> erldis_sync_client:scall(Client, ltrim, [Key, Start, End]).
-lindex(Client, Key, Index) -> erldis_sync_client:scall(Client, lindex, [Key, Index]).
+rename(Client, OldKey, NewKey) ->
+	erldis_client:sr_scall(Client, <<"rename ", OldKey/binary, " ", NewKey/binary>>).
+
+renamenx(Client, OldKey, NewKey) ->
+	erldis_client:sr_scall(Client, <<"renamenx ", OldKey/binary, " ", NewKey/binary>>).
+
+dbsize(Client) -> erldis_client:sr_scall(Client, <<"dbsize ">>).
+
+expire(Client, Key, Seconds) ->
+	erldis_client:sr_scall(Client, <<"expire ", Key/binary, " ", Seconds/binary>>).
+
+ttl(Client, Key) -> erldis_client:sr_scall(Client, <<"ttl ", Key/binary>>).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Commands operating on string values %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+set(Client, Key, Value) -> internal_set_like(Client, <<"set ">>, Key, Value).
+
+get(Client, Key) -> erldis_client:sr_scall(Client, <<"get ", Key/binary>>).
+
+getset(Client, Key, Value) -> internal_set_like(Client, <<"getset ">>, Key, Value).
+
+mget(Client, Keys) -> erldis_client:scall(Client, <<"mget ">>, Keys).
+
+setnx(Client, Key, Value) -> internal_set_like(Client, <<"setnx ">>, Key, Value).
+
+incr(Client, Key) -> erldis_client:sr_scall(Client, <<"incr ", Key/binary>>).
+
+incrby(Client, Key, By) -> erldis_client:sr_scall(Client, <<"incrby ">>, [Key, By]).
+
+decr(Client, Key) -> erldis_client:sr_scall(Client, <<"decr ", Key/binary>>).
+
+decrby(Client, Key, By) -> erldis_client:sr_scall(Client, <<"decrby ">>, [Key, By]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Commands operating on lists %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+rpush(Client, Key, Value) -> internal_set_like(Client, <<"rpush ">>, Key, Value).
+
+lpush(Client, Key, Value) -> internal_set_like(Client, <<"lpush ">>, Key, Value).
+
+llen(Client, Key) -> erldis_client:sr_scall(Client, <<"llen ">>, [Key]).
+
+lrange(Client, Key, Start, End) ->
+	erldis_client:scall(Client, <<"lrange ">>, [Key, Start, End]).
+
+ltrim(Client, Key, Start, End) ->
+	erldis_client:scall(Client, <<"ltrim ">>, [Key, Start, End]).
+	
+lindex(Client, Key, Index) ->
+	erldis_client:scall(Client, <<"lindex ">>, [Key, Index]).
+
 lset(Client, Key, Index, Value) ->
-    erldis_client:send(Client, lset, [[Key, Index, length(Value)],
-                               [Value]]).
+	erldis_client:send(Client, <<"lset ">>, [[Key, Index, size(Value)], [Value]]).
+
 lrem(Client, Key, Number, Value) ->
-    erldis_client:send(Client, lrem, [[Key, Number, length(Value)],
-                               [Value]]).
-lpop(Client, Key) -> erldis_sync_client:scall(Client, lpop, [Key]).
-rpop(Client, Key) -> erldis_sync_client:scall(Client, rpop, [Key]).
+	erldis_client:send(Client, <<"lrem ">>, [[Key, Number, size(Value)], [Value]]).
+	
+lpop(Client, Key) -> erldis_client:scall(Client, <<"lpop ">>, [Key]).
 
+rpop(Client, Key) -> erldis_client:scall(Client, <<"rpop ">>, [Key]).
 
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Commands operating on sets %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-%% Commands operating on sets
-sadd(Client, Key, Value) -> internal_set_like(Client, sadd, Key, Value).
-srem(Client, Key, Value) -> internal_set_like(Client, srem, Key, Value).
-smove(Client, SrcKey, DstKey, Member) -> erldis_sync_client:call(Client, smove, [[SrcKey, DstKey, length(Member)],
-                                                                     [Member]]).
-scard(Client, Key) -> erldis_sync_client:scall(Client, scard, [Key]).
-sismember(Client, Key, Value) -> internal_set_like(Client, sismember, Key, Value).
-sintersect(Client, Keys) -> erldis_sync_client:scall(Client, sinter, Keys).
+sadd(Client, Key, Value) -> internal_set_like(Client, <<"sadd ">>, Key, Value).
+
+srem(Client, Key, Value) -> internal_set_like(Client, <<"srem ">>, Key, Value).
+
+smove(Client, SrcKey, DstKey, Member) ->
+	erldis_client:call(Client, <<"smove ">>, [[SrcKey, DstKey, size(Member)], [Member]]).
+
+scard(Client, Key) -> erldis_client:scall(Client, <<"scard ">>, [Key]).
+
+sismember(Client, Key, Value) -> internal_set_like(Client, <<"sismember ">>, Key, Value).
+
+sintersect(Client, Keys) -> erldis_client:scall(Client, <<"sinter ">>, Keys).
+
 sinter(Client, Keys) -> sintersect(Client, Keys).
-sinterstore(Client, DstKey, Keys) -> erldis_sync_client:scall(Client, sinterstore, [DstKey|Keys]).
-sunion(Client, Keys) -> erldis_sync_client:scall(Client, sunion, Keys).
-sunionstore(Client, DstKey, Keys) -> erldis_sync_client:scall(Client, sunionstore, [DstKey|Keys]).
-sdiff(Client, Keys) -> erldis_sync_client:scall(Client, sdiff, Keys).
-sdiffstore(Client, DstKey, Keys) -> erldis_sync_client:scall(Client, sdiffstore, [DstKey|Keys]).
-smembers(Client, Key) -> erldis_sync_client:scall(Client, smembers, [Key]).
 
+sinterstore(Client, DstKey, Keys) ->
+	erldis_client:scall(Client, <<"sinterstore ">>, [DstKey|Keys]).
 
-%% Multiple DB commands
-select(Client, Index) -> erldis_sync_client:scall(Client, select, [Index]).
-move(Client, Key, DBIndex) -> erldis_sync_client:scall(Client, move, [Key, DBIndex]).
-flushdb(Client) -> erldis_sync_client:scall(Client, flushdb).
-flushall(Client) -> erldis_sync_client:scall(Client, flushall).
+sunion(Client, Keys) -> erldis_client:scall(Client, <<"sunion ">>, Keys).
 
+sunionstore(Client, DstKey, Keys) ->
+	erldis_client:scall(Client, <<"sunionstore ">>, [DstKey|Keys]).
 
-%% Commands operating on both lists and sets
-sort(Client, Key) -> erldis_sync_client:scall(Client, sort, [Key]).
-sort(Client, Key, Extra) -> erldis_sync_client:scall(Client, sort, [Key, Extra]).    
+sdiff(Client, Keys) -> erldis_client:scall(Client, <<"sdiff ">>, Keys).
 
+sdiffstore(Client, DstKey, Keys) ->
+	erldis_client:scall(Client, <<"sdiffstore ">>, [DstKey|Keys]).
 
-%% Persistence control commands
-save(Client) -> erldis_sync_client:scall(Client, save).
-bgsave(Client) -> erldis_sync_client:scall(Client, bgsave).
-lastsave(Client) -> erldis_sync_client:scall(Client, lastsave).
-shutdown(Client) -> erldis_client:scall(Client, shutdown).
+smembers(Client, Key) -> erldis_client:scall(Client, <<"smembers ">>, [Key]).
 
+%%%%%%%%%%%%%
+%% Sorting %%
+%%%%%%%%%%%%%
 
-%% Remote server control commands
-info(Client) -> erldis_sync_client:scall(Client, info).
-slaveof(Client, Host, Port) -> erldis_sync_client:scall(Client, slaveof, [Host, Port]).
-slaveof(Client) -> erldis_sync_client:scall(Client, slaveof, ["no one"]).
+sort(Client, Key) -> erldis_client:scall(Client, <<"sort ">>, [Key]).
+
+sort(Client, Key, Extra) -> erldis_client:scall(Client, <<"sort ">>, [Key, Extra]).	
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Multiple DB commands %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+select(Client, Index) -> erldis_client:scall(Client, <<"select ">>, [Index]).
+
+move(Client, Key, DBIndex) ->
+	erldis_client:scall(Client, <<"move ">>, [Key, DBIndex]).
+
+flushdb(Client) -> erldis_client:scall(Client, <<"flushdb ">>).
+
+flushall(Client) -> erldis_client:scall(Client, <<"flushall ">>).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Persistence control commands %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+save(Client) -> erldis_client:scall(Client, <<"save ">>).
+
+bgsave(Client) -> erldis_client:scall(Client, <<"bgsave ">>).
+
+lastsave(Client) -> erldis_client:scall(Client, <<"lastsave ">>).
+
+shutdown(Client) -> erldis_client:scall(Client, <<"shutdown ">>).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Remote server control commands %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+info(Client) -> erldis_client:scall(Client, <<"info ">>).
+
+slaveof(Client, Host, Port) ->
+	erldis_client:scall(Client, <<"slaveof ">>, [Host, Port]).
+
+slaveof(Client) -> erldis_client:scall(Client, <<"slaveof ">>, ["no one"]).

src/erldis_client.erl

+%% @doc This is a    very similar to erldis_client, but it does
+%% synchronous calls instead of async pipelining. Does so by keeping a queue
+%% of From pids in State.calls, then calls gen_server2:reply when it receives
+%% handle_info({tcp, ...). Therefore, it must get commands on handle_call
+%% instead of handle_cast, which requires direct command sending instead of
+%% using the API in erldis.
+%%
+%% @todo Much of the code has been copied from erldis_client and should be
+%% abstracted & shared where possible
+%%
+%% @author Jacob Perkins <japerk@gmail.com>
 -module(erldis_client).
--behavior(gen_server).
 
--export([start/1, start/2, connect/0, connect/1, connect/2, connect/3, disconnect/1]).
--export([str/1, format/1, sformat/1]).
--export([asend/2, send/3, send/2, ssend/3, ssend/2, get_all_results/1]).
--export([init/1, handle_call/3, handle_cast/2,
-         handle_info/2, terminate/2, code_change/3]).
+-behaviour(gen_server2).
 
 -include("erldis.hrl").
 
+-export([scall/2, scall/3, call/2, call/3, stop/1, transact/1, transact/2, select/2, info/1,  sr_scall/2, sr_scall/3]).
+-export([connect/0, connect/1, connect/2, connect/3, connect/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+		 terminate/2, code_change/3]).
+-export([str/1, format/2, format/1, sformat/1]).
 -define(EOL, "\r\n").
 
-
-%% Helpers
+%%%%%%%%%%%%%
+%% helpers %%
+%%%%%%%%%%%%%
+    
 str(X) when is_list(X) ->
-    X;
+	list_to_binary(X);
 str(X) when is_atom(X) ->
-    atom_to_list(X);
+	list_to_binary(atom_to_list(X));
 str(X) when is_binary(X) ->
-    binary_to_list(X);
+	X;
 str(X) when is_integer(X) ->
-    integer_to_list(X);
+	list_to_binary(integer_to_list(X));
 str(X) when is_float(X) ->
-    float_to_list(X).
-
+	list_to_binary(float_to_list(X));
+str(X) ->
+  term_to_binary(X).
+  
 format([], Result) ->
-    string:join(lists:reverse(Result), ?EOL);
+  Result;
+format([Line|Rest], <<>>) ->
+  JoinedLine = binary_join(Line, <<" ">>),
+  format(Rest, JoinedLine);
+  
 format([Line|Rest], Result) ->
-    JoinedLine = string:join([str(X) || X <- Line], " "),
-    format(Rest, [JoinedLine|Result]).
+  Sep = <<?EOL>>,
+	JoinedLine = binary_join(Line, <<" ">>),
+	format(Rest, <<Result/binary,Sep/binary,JoinedLine/binary>>).
 
 format(Lines) ->
-    format(Lines, []).
+	format(Lines, <<>>).
+
+sformat(<<>>)->
+  <<>>;
 sformat(Line) ->
-    format([Line], []).
+	format([Line],<<>>).
 
-app_get_env(AppName, Varname) ->
-    app_get_env(AppName, Varname, undefined).
+binary_join([], _)-> <<>>;
+binary_join(Array, Sep)->
+  Sz = size(Sep),
+  R = lists:foldl(fun(Elem, Acc)->
+    E2 = str(Elem),
+	  <<Acc/binary,Sep/binary,E2/binary>>
+	end, <<>>, Array),
+	<<_:Sz/bytes,Result/binary>> = R,
+	Result.
+
+trim2({ok, S}) ->
+  Read = size(S)-2,
+  <<R:Read/bytes,_R/binary>> = S,
+  R;
+trim2(S) ->
+	trim2({ok, S}).
+
 app_get_env(AppName, Varname, Default) ->
-    case application:get_env(AppName, Varname) of
-        undefined ->
-            {ok, Default};
-        V ->
-            V
-    end.
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+	case application:get_env(AppName, Varname) of
+		undefined ->
+			{ok, Default};
+		V ->
+			V
+	end.
 
+ensure_started(#redis{socket=undefined, host=Host, port=Port, timeout=Timeout, db=DB}=State)->
+	Opts = [binary, {active, false}, {packet, line}, {nodelay, true}, {send_timeout, Timeout}],
+	case gen_tcp:connect(Host, Port, Opts, Timeout) of
+		{ok, Socket} ->
+			%error_logger:info_report([{?MODULE, reconnected}, State]),
+			End = <<?EOL>>,
+			case DB of 
+			  <<"0">> ->
+			    ok;
+			  _Id ->
+			    gen_tcp:send(Socket, <<"select ",DB,End>>),
+			    {ok, <<"+OK",_R/binary>>} = gen_tcp:recv(Socket, 10)
+			end,
+			inet:setopts(Socket, [{active, once}]),
+			State#redis{socket=Socket};
+		{error, Why} ->
+			Report = [{?MODULE, unable_to_connect}, {error, Why}, State],
+			error_logger:warning_report(Report),
+			State
+	end;
+ensure_started(State)->
+	State.
 
-%% Exported API
-start(Host) ->
-    connect(Host).
-start(Host, Port) ->
-    connect(Host, Port).
+%%%%%%%%%%%%%%%%%%
+%% call command %%
+%%%%%%%%%%%%%%%%%%
+
+sr_scall(Client, Cmd) -> sr_scall(Client, Cmd, []).
+
+sr_scall(Client, Cmd, Args) -> 
+  case scall(Client, Cmd, Args) of
+    [R] -> R;
+    ok -> ok
+  end.
+
+% This is the simple send with a single row of commands
+scall(Client, Cmd) -> scall(Client, Cmd, <<>> ).
+
+scall(Client, Cmd, Args) ->
+  Args2 = sformat(Args),
+	M = case gen_server2:call(Client, is_pipelined) of
+	  true -> cast;
+	  _ -> call
+	end,
+	case apply(gen_server2,M,[Client,{send, <<Cmd/binary,Args2/binary>>}]) of
+		{error, Reason} -> throw({error, Reason});
+		Retval -> Retval
+	end.
+	
+% This is the complete send with multiple rows
+call(Client, Cmd) -> call(Client, Cmd, []).
+
+call(Client, Cmd, Args) ->
+  Args2 = format(Args),
+	SCmd = <<Cmd/binary, Args2/binary>>,
+	M = case gen_server2:call(Client, is_pipelined) of
+	  true -> cast;
+	  _ -> call
+	end,
+	case apply(gen_server2,M,[Client, {send, SCmd}]) of
+		{error, Reason} -> throw({error, Reason});
+		Retval -> Retval
+	end.
+
+% stop is synchronous so can be sure that client is shutdown
+stop(Client) -> gen_server2:call(Client, disconnect).
+
+transact(F) ->
+	case connect() of
+		{error, Error} -> {error, Error};
+		{ok, Client} -> transact(Client, F)
+	end.
+
+transact(DB, F) when is_integer(DB) ->
+	case connect(DB) of
+		{error, Error} -> {error, Error};
+		{ok, Client} -> transact(Client, F)
+	end;
+transact(Client, F) when is_pid(Client) ->
+	try F(Client) of
+		Result -> stop(Client), Result
+	catch
+		throw:Result -> stop(Client), throw(Result);
+		error:Result -> stop(Client), {error, Result};
+		exit:Result -> stop(Client), exit(Result)
+	end.
+
+select(Client, DB) ->
+  DBB = list_to_binary(integer_to_list(DB)),
+	[ok] = scall(Client,<<"select ",DBB/binary>>),
+	Client.
+
+info(Client) ->
+	F = fun(Stat) ->
+			case parse_stat(Stat) of
+				undefined -> false;
+				{Key, Val} -> {Key, Val}
+			end
+		end,
+	
+	[S] = scall(Client, info),
+	elists:mapfilter(F, string:tokens(binary_to_list(S), "\r\n")).
+
+parse_stat(<<"redis_version:",Vsn/binary>>) ->
+	{version, Vsn};
+parse_stat(<<"uptime_in_seconds:",Val/binary>>) ->
+	{uptime, list_to_integer(Val)};
+parse_stat(<<"connected_clients:",Val/binary>>) ->
+	{clients, list_to_integer(Val)};
+parse_stat(<<"connected_slaves:",Val/binary>>) ->
+	{slaves, list_to_integer(Val)};
+parse_stat(<<"used_memory:",Val/binary>>) ->
+	{memory, list_to_integer(Val)};
+parse_stat(<<"changes_since_last_save:",Val/binary>>) ->
+	{changes, list_to_integer(Val)};
+parse_stat(<<"last_save_time:",Val/binary>>) ->
+	{last_save, list_to_integer(Val)};
+parse_stat(<<"total_connections_received:",Val/binary>>) ->
+	{connections, list_to_integer(Val)};
+parse_stat(<<"total_commands_processed:",Val/binary>>) ->
+	{commands, list_to_integer(Val)};
+parse_stat(_) ->
+	undefined.
+
+%%%%%%%%%%
+%% init %%
+%%%%%%%%%%
 
 connect() ->
-    {ok, Host} = app_get_env(erldis, host, "localhost"),
-    connect(Host).
-connect(Host) ->
-    {ok, Port} = app_get_env(erldis, port, 6379),
-    connect(Host, Port).
+	{ok, Host} = app_get_env(erldis, host, "localhost"),
+	connect(Host).
+
+connect(Host) when is_list(Host) ->
+	{ok, Port} = app_get_env(erldis, port, 6379),
+	connect(Host, Port);
+connect(DB) when is_integer(DB) ->
+	case connect() of
+		{ok, Client} -> {ok, select(Client, DB)};
+		Other -> Other
+	end.
+
 connect(Host, Port) ->
-    {ok, Timeout} = app_get_env(erldis, timeout, 180000),
-    connect(Host, Port, [{timeout, Timeout}]).
+	{ok, Timeout} = app_get_env(erldis, timeout, 500),
+	connect(Host, Port, [{timeout, Timeout}]).
+
 connect(Host, Port, Options) ->
-    % japerk: added Options list so can specify a timeout
-    gen_server:start_link(?MODULE, [Host, Port], Options).
+	% not using start_link because caller may not want to crash if this
+	% server is shutdown
+	gen_server2:start(?MODULE, [Host, Port], Options).
 
-% This is the simple send with a single row of commands
-ssend(Client, Cmd) -> ssend(Client, Cmd, []).
-ssend(Client, Cmd, Args) ->
-    gen_server:cast(Client, {send, sformat([Cmd|Args])}).
+connect(Host, Port, Options, DB) ->
+	case connect(Host, Port, Options) of
+		{ok, Client} -> {ok, select(Client, DB)};
+		Other -> Other
+	end.
 
-% This is the complete send with multiple rows
-send(Client, Cmd) -> send(Client, Cmd, []).
-send(Client, Cmd, Args) ->
-    gen_server:cast(Client, {send,
-        string:join([str(Cmd), format(Args)], " ")}).
+init([Host, Port]) ->
+	process_flag(trap_exit, true),
+	{ok, Timeout} = app_get_env(erldis, timeout, 500),
+	% presence of send_timeout_close Opt causes {error, badarg}
+	Opts = [binary, {active, once}, {packet, line}, {nodelay, true},
+			{send_timeout, Timeout}],
+	% without timeout, default is infinity
+	case gen_tcp:connect(Host, Port, Opts, Timeout) of
+		{error, Why} ->
+			{stop, {socket_error, Why}};
+		{ok, Socket} ->
+			% calls is a queue instead of a count
+			{ok, #redis{socket=Socket, calls=queue:new(), host=Host, port=Port, timeout=Timeout}}
+	end.
 
-% asynchronous send, we don't care about the result.
-asend(Client, Cmd) ->
-    gen_server:cast(Client, {asend, Cmd}).
-disconnect(Client) ->
-    gen_server:call(Client, disconnect).
+%%%%%%%%%%%%%%%%%
+%% handle_call %%
+%%%%%%%%%%%%%%%%%
 
-get_all_results(Client) ->
-    gen_server:call(Client, get_all_results).
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+handle_call(is_pipelined, _From, #redis{pipeline=P}=State)->
+  {reply, P, State};
 
-
-
-%% gen_server callbacks
-init([Host, Port]) ->
-    process_flag(trap_exit, true),
-    ConnectOptions = [list, {active, once}, {packet, line}, {nodelay, true}],
-    {ok, Timeout} = app_get_env(erldis, timeout, 180000),
-    % japerk: without timeout, default is infinity
-    case gen_tcp:connect(Host, Port, ConnectOptions, Timeout) of
-        {error, econnrefused} ->
-            % japerk: no redis server, return ignore for supervisor
-            ignore;
-        {error, Why} ->
-            % japerk: correct error return is {stop, Reason}
-            {stop, {socket_error, Why}};
-        {ok, Socket} ->
-            {ok, #redis{socket=Socket, host=Host, port=Port, timeout=Timeout, calls=0}}
-    end.
-
-handle_call({send, Cmd}, From, State) ->
-    gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
-    {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
-                          remaining=1}};
-        
-handle_call(disconnect, _From, State) ->
-    {stop, normal, ok, State};
-handle_call(get_all_results, From, State) ->
-    case State#redis.calls of
+handle_call(get_all_results, From, #redis{pipeline=true, calls=Calls} = State) ->
+    %error_logger:info_report([{state, State}, {calls, queue:len(Calls)}]),
+    case queue:len(Calls) of
         0 ->
             % answers came earlier than we could start listening...
             % Very unlikely but totally possible.
-            {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
+            {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=Calls}};
         _ ->
             % We are here earlier than results came, so just make
             % ourselves wait until stuff is ready.
-            {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
+            {noreply, State#redis{reply_caller=fun(V) -> gen_server2:reply(From, V) end}}
     end;
-handle_call(_, _From, State) -> {noreply, State}.
 
 
-handle_cast({asend, Cmd}, State) ->
-    gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
-    {noreply, State};
-handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
-    % how we should do here: if remaining is already != 0 then we'll
-    % let handle_info take care of keeping track how many remaining things
-    % there are. If instead it's 0 we are the first call so let's just
-    % do it.
-    gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
-    case Remaining of
-        0 ->
-            {noreply, State#redis{remaining=1, calls=1}};
-        _ ->
-            {noreply, State#redis{calls=Calls+1}}
-    end;
-handle_cast(_Msg, State) -> {noreply, State}.
+  
+handle_call({send, Cmd}, From, State1) ->
+	% NOTE: redis ignores sent commands it doesn't understand, which means
+	% we don't get a reply, which means callers will timeout
+	End = <<?EOL>>,
+	State = ensure_started(State1),
+	case gen_tcp:send(State#redis.socket, [Cmd|End]) of
+		ok ->
+			%error_logger:info_report([{send, Cmd}, {from, From}]),
+			Queue = queue:in(From, State#redis.calls),
+			case Cmd of
+			  <<"select ", DB/binary>> ->
+			    {noreply, State#redis{calls=Queue, remaining=1, db=DB}};
+			  _ ->
+			    {noreply, State#redis{calls=Queue, remaining=1}}
+			end;
+		{error, Reason} ->
+			error_logger:error_report([{send, Cmd}, {error, Reason}]),
+			{stop, timeout, {error, Reason}, State}
+	end;
+handle_call(disconnect, _, State) ->
+	{stop, shutdown, shutdown, State};
+handle_call(_, _, State) ->
+	{reply, undefined, State}.
 
+handle_cast({pipelining, Bool}, State) ->
+  {noreply, State#redis{pipeline=Bool}};
+handle_cast(disconnect, State) ->
+	{stop, shutdown, State};
+	
+handle_cast({send, Cmd},#redis{remaining=Remaining, 
+                                      calls=Calls} = State1) ->
+  End = <<?EOL>>,
+  State = ensure_started(State1),
+  Queue = queue:in(async, Calls),
+  gen_tcp:send(State#redis.socket, [Cmd|End]),
+  case Remaining of
+      0 ->
+          {noreply, State#redis{remaining=1, calls=Queue}};
+      _ ->
+          {noreply,State#redis{calls=Queue}}
+  end;
+handle_cast(_, State) ->
+	{noreply, State}.
 
-trim2({ok, S}) ->
-    string:substr(S, 1, length(S)-2);
-trim2(S) ->
-    trim2({ok, S}).
+%%%%%%%%%%%%%%%%%
+%% handle_info %%
+%%%%%%%%%%%%%%%%%
 
-% This function helps with pipelining by creating a pubsub system with
-% the caller. The caller could submit multiple requests and not listen
-% until later when all or some of them have been answered, at that
-% point 2 conditions can be true:
-%  1) We still need to process more things in this response chain
-%  2) We are finished.
-%
-% And these 2 are together with the following 2:
-%  1) We called get_all_results before the end of the responses.
-%  2) We called get_all_results after the end of the responses.
-%
-% If there's stuff missing in the chain we just push results, this also
-% happens when there's nothing more to process BUT we haven't requested
-% results yet.
-% In case we have requested results: if requests are not yet ready we
-% just push them, otherwise we finally answer all of them.
-save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
-    case Calls of
-        0 ->
-            % We don't reverse results here because if all the requests
-            % come in and then we submit another one, if we reverse
-            % they will be scrambled in the results field of the record.
-            % instead if we wait just before we reply they will be
-            % in the right order.
-            FullResults = [Result|Results],
-            NewState = case ReplyCaller of
-                undefined ->
-                    State#redis{results=FullResults};
-                _ ->
-                    ReplyCaller(lists:reverse(FullResults)),
-                    State#redis{results=[]}
-            end,
-            NewState#redis{remaining=0, pstate=empty,
-                           reply_caller=undefined, buffer=[],
-                           calls=0};
-        _ ->
-            State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
+recv_value(Socket, NBytes) ->
+	inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+	case gen_tcp:recv(Socket, NBytes+2) of
+		{ok, Packet} ->
+			inet:setopts(Socket, [{packet, line}]), % go back to line mode
+			%error_logger:info_report([{packet, Packet}]),
+			trim2({ok, Packet});
+		{error, Reason} ->
+			error_logger:error_report([{recv, NBytes}, {error, Reason}]),
+			throw({error, Reason})
+	end.
 
-    end.
+send_reply(#redis{pipeline=true,calls=Calls,results=Results, reply_caller=ReplyCaller}=State)->
+  Result = case lists:reverse(State#redis.buffer) of
+    [V] when is_atom(V) -> V;
+    R -> R
+  end,
+  {{value, _From}, Queue} = queue:out(Calls),
+  case queue:len(Queue) of
+    0 ->
+      %error_logger:info_report([sendreply, {state, State}, {queue,queue:len(Queue) }]),
+      FullResults = [Result|Results],
+      NewState = case ReplyCaller of
+          undefined ->
+              State#redis{results=FullResults};
+          _ ->
+              ReplyCaller(lists:reverse(FullResults)),
+              State#redis{results=[]}
+      end,
+      NewState#redis{remaining=0, pstate=empty,
+                     reply_caller=undefined, buffer=[],
+                     calls=Queue};
+    _ ->
+      %error_logger:info_report([sendreply, {state, State}, {queue,queue:len(Queue) }]),
+      State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Queue}
+  end;
 
-handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
-    Trimmed = trim2(Data),
-    NewState = case {State#redis.remaining-1, erldis_proto:parse(State#redis.pstate, Trimmed)} of
-        % This line contained an error code. Next line will hold
-        % The error message that we will parse.
-        {0, error} ->
-            State#redis{remaining=1, pstate=error};
+send_reply(State) ->
+	{{value, From}, Queue} = queue:out(State#redis.calls),
+	Reply = lists:reverse(State#redis.buffer),
+	%error_logger:info_report([sendreply_no_pipeline, {state, State}, {queue,queue:len(Queue) }]),
+	gen_server2:reply(From, Reply),
+	State#redis{calls=Queue, buffer=[], pstate=empty}.
 
-        % The stateful parser just started and tells us the number
-        % of results that we will have to parse for those calls
-        % where more than one result is expected. The next
-        % line will start with the first item to read.
-        {0, {hold, Remaining}} ->
-            case Remaining of
-                nil ->
-                    save_or_reply(nil, State#redis{calls=Calls-1});
-                _ ->
-                    % Reset the remaining value to the number of results that we need to parse.
-                    State#redis{remaining=Remaining, pstate=read}
-            end;
 
-        % We either had only one thing to read or we are at the
-        % end of the stuff that we need to read. either way
-        % just pack up the buffer and send.
-        {0, {read, NBytes}} ->
-            CurrentValue = case NBytes of
-                nil ->
-                    nil;
-                _ ->
-                    inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
-                    CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
-                    inet:setopts(Socket, [{packet, line}]), % go back to line mode
-                    CV
-            end,
-            OldBuffer = State#redis.buffer,
-            case OldBuffer of
-                [] ->
-                    save_or_reply(CurrentValue, State#redis{calls=Calls-1});
-                _ ->
-                    save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
-            end;
+parse_state(State, Socket, Data) ->
+	Parse = erldis_proto:parse(State#redis.pstate, trim2(Data)),
+	%error_logger:info_report([{parse, Parse}, {state , State}]),
+	case {State#redis.remaining-1, Parse} of
+		{0, error} ->
+			% next line is the error string
+			State#redis{remaining=1, pstate=error};
+		{0, {hold, nil}} ->
+			% reply with values in buffer
+			send_reply(State);
+		{0, {hold, Remaining}} ->
+			% begin accumulation of multi bulk reply
+			State#redis{remaining=Remaining, pstate=read};
+		{_, {read, nil}} ->
+			% reply with nil
+			send_reply(State#redis{buffer=[nil]});
+	  {_, {read, 0}} ->
+			% reply with nil
+			send_reply(State#redis{buffer=[]});
+		{0, {read, NBytes}} ->
+			% reply with Value added to buffer
+			Value = recv_value(Socket, NBytes),
+			Buffer = [Value | State#redis.buffer],
+			send_reply(State#redis{buffer=Buffer});
+		{N, {read, NBytes}} ->
+			% accumulate multi bulk reply
+			Value = recv_value(Socket, NBytes),
+			Buffer = [Value | State#redis.buffer],
+			State#redis{remaining=N, buffer=Buffer, pstate=read};
+		{0, Value} ->
+			% reply with Value
+			Buffer = [Value | State#redis.buffer],
+		  send_reply(State#redis{buffer=Buffer});
+		{N, Value} -> 
+		  Buffer = [Value | State#redis.buffer],
+		  %error_logger:info_report([{buffer, Buffer},{parsed, {N, Value}}, {state , State}]),
+		  State#redis{remaining=N, buffer=Buffer, pstate=read}
+	end.
 
-        % The stateful parser tells us to read some bytes
-        {N, {read, NBytes}} ->
-            % annoying repetition... I should reuse this code.
-            CurrentValue = case NBytes of
-                nil ->
-                    nil;
-                _ ->
-                    inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
-                    CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
-                    inet:setopts(Socket, [{packet, line}]), % go back to line mode
-                    CV
-            end,
-            OldBuffer = State#redis.buffer,
-            State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
+handle_info({tcp, Socket, Data}, State) ->
+	case (catch parse_state(State, Socket, Data)) of
+		{error, Reason} ->
+			%error_logger:error_report([{parse_state, Data}, {error, Reason}]),
+			{stop, Reason, State};
+		NewState ->
+			inet:setopts(Socket, [{active, once}]),
+			{noreply, NewState}
+	end;
+handle_info({tcp_closed, Socket}, State=#redis{socket=Socket}) ->
+	error_logger:warning_report([{erldis_client, tcp_closed}, State]),
+	{noreply, State#redis{socket=undefined}};
+handle_info(_Info, State) ->
+	{noreply, State}.
 
-
-        % Simple return values contained in a single line
-        {0, Value} ->
-            save_or_reply(Value, State#redis{calls=Calls-1})
-
-    end,
-    inet:setopts(Socket, [{active, once}]),
-    {noreply, NewState};
-handle_info({tcp_closed, Socket}, State=#redis{socket=Socket}) ->
-	% japerk: shutdown Reason does not generate an error message
-    {stop, shutdown, State};
-handle_info(_Info, State) -> {noreply, State}.
-
+%%%%%%%%%%%%%%%
+%% terminate %%
+%%%%%%%%%%%%%%%
 
 terminate(_Reason, State) ->
-    case State#redis.socket of
-        undefined -> ok;
-        Socket -> gen_tcp:close(Socket)
-    end.
+	% NOTE: if supervised with brutal_kill, may not be able to reply
+	R = fun(From) -> gen_server2:reply(From, {error, closed}) end,
+	lists:foreach(R, queue:to_list(State#redis.calls)),
+	case State#redis.socket of
+		undefined -> ok;
+		Socket -> gen_tcp:close(Socket)
+	end.
 
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-
+code_change(_OldVsn, State, _Extra) -> {ok, State}.

src/erldis_dict.erl

 		 update_counter/2, update_counter/3]).
 
 % NOTE: use erldis_lists instead, fetch & find won't work for lists
-append(Key, Value, Client) -> set_call(Client, rpush, Key, Value).
+append(Key, Value, Client) -> set_call(Client, <<"rpush ">>, Key, Value).
 
 append_list(Key, Values, Client) ->
 	lists:foreach(fun(Value) -> append(Key, Value, Client) end, Values).
 
-erase(Key, Client) -> scall(Client, del, [Key]).
+erase(Key, Client) -> scall(Client, <<"del ">>, [Key]).
 
 fetch(Key, Client) ->
-	case scall(Client, get, [Key]) of
+	case scall(Client, <<"get ">>, [Key]) of
 		[nil] -> undefined;
 		[Value] -> Value
 	end.
 
 % NOTE: this is only useful if keys have a known prefix
-fetch_keys(Pattern, Client) -> scall(Client, keys, [Pattern]).
+fetch_keys(Pattern, Client) -> scall(Client, <<"keys ">>, [Pattern]).
 
 %filter(Pred, Client) -> ok.
 
 
 %from_list(List, Client) -> ok.
 
-is_key(Key, Client) -> hd(scall(Client, exists, [Key])).
+is_key(Key, Client) -> hd(scall(Client, <<"exists ">>, [Key])).
 
 size(Client) ->
-	numeric_value(erldis_sync_client:scall(Client, dbsize)).
+	numeric_value(erldis_sync_client:scall(Client, <<"dbsize ">>)).
 
 store(Key, [], Client) -> erase(Key, Client);
-store(Key, Value, Client) -> set_call(Client, set, Key, Value).
+store(Key, Value, Client) -> set_call(Client, <<"set ">>, Key, Value).
 
 %to_list(Client) -> ok.
 
 
 % NOTE: this returns new count value, not a modified dict
 update_counter(Key, 1, Client) ->
-	numeric_value(scall(Client, incr, [Key]));
+	numeric_value(scall(Client, <<"incr ">>, [Key]));
 update_counter(Key, Incr, Client) ->
-	numeric_value(scall(Client, incrby, [Key, Incr])).
+	numeric_value(scall(Client, <<"incrby ">>, [Key, Incr])).
 
 %%%%%%%%%%%%%
 %% helpers %%
 scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
 
 set_call(Client, Cmd, Key, Val) ->
-	erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
+	erldis_sync_client:call(Client, Cmd, [[Key, erlang:size(Val)], [Val]]).

src/erldis_list.erl

 is_empty(Key, Client) -> len(Key, Client) == 0.
 
 len(Key, Client) ->
-	case scall(Client, llen, [Key]) of
+	case scall(Client, <<"llen ">>, [Key]) of
 		[false] -> 0;
 		[true] -> 1;
 		[Len] -> Len
 	end.
 
 in(Item, Key, Client) ->
-	[ok] = set_call(Client, rpush, Key, Item).
+	[ok] = set_call(Client, <<"rpush ">>, Key, Item).
 
 in_r(Item, Key, Client) ->
-	[ok] = set_call(Client, lpush, Key, Item).
+	[ok] = set_call(Client, <<"lpush ">>, Key, Item).
 
 out(Key, Client) ->
-	case hd(scall(Client, lpop, [Key])) of
+	case hd(scall(Client, <<"lpop ">>, [Key])) of
 		nil -> empty;
 		Item -> {value, Item}
 	end.
 
 out_r(Key, Client) ->
-	case hd(scall(Client, rpop, [Key])) of
+	case hd(scall(Client, <<"rpop ">>, [Key])) of
 		nil -> empty;
 		Item -> {value, Item}
 	end.
 %% array like api %%
 %%%%%%%%%%%%%%%%%%%%
 
-get(I, Key, Client) -> hd(scall(Client, lindex, [Key, I])).
+get(I, Key, Client) -> hd(scall(Client, <<"lindex ">>, [Key, I])).
 
 is_array(Key, Client) -> is_list(Key, Client).
 
-set(I, Value, Key, Client) -> call(Client, lset, Key, I, Value).
+set(I, Value, Key, Client) -> call(Client, <<"lset ">>, Key, I, Value).
 
 size(Key, Client) -> len(Key, Client).
 
 % any
 % append
 
-delete(Elem, Key, Client) -> call(Client, lrem, Key, 1, Elem).
+delete(Elem, Key, Client) -> call(Client, <<"lrem ">>, Key, 1, Elem).
 
 % dropwhile
 
 		Item -> F(Item), foreach(I+1, F, Key, Client)
 	end.
 
-is_list(Key, Client) -> ["list"] == scall(Client, type, [Key]).
+is_list(Key, Client) -> [<<"list">>] == scall(Client, <<"type ">>, [Key]).
 
 % keysort
 
 	end;
 sublist(Key, Client, Start, Len) when Start > 0, Len > 1 ->
 	% erlang lists are 1-indexed
-	scall(Client, lrange, [Key, Start - 1, Start + Len - 2]);
+	scall(Client, <<"lrange ">>, [Key, Start - 1, Start + Len - 2]);
 sublist(Key, Client, Start, Len) when Start < 0, Len > 1 ->
 	% can give a negative start where -1 is the last element
-	scall(Client, lrange, [Key, Start, Start - Len + 1]).
+	scall(Client, <<"lrange ">>, [Key, Start, Start - Len + 1]).
 
 % sort
 % takewhile
 	end.
 
 from_list(L, Key, Client) ->
-	scall(Client, del, [Key]),
+	scall(Client, <<"del ">>, [Key]),
 	lists:foreach(fun(Item) -> in(Item, Key, Client) end, L).
 
 to_list(Key, Client) -> foldr(fun(Item, L) -> [Item | L] end, [], Key, Client).
 %%%%%%%%%%%%%
 
 call(Client, Cmd, Key, N, Val) ->
-	erldis_sync_client:call(Client, Cmd, [[Key, N, length(Val)], [Val]]).
+	erldis_client:call(Client, Cmd, [[Key, N, size(Val)], [Val]]).
 
-scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
+scall(Client, Cmd, Args) -> erldis_client:scall(Client, Cmd, Args).
 
 set_call(Client, Cmd, Key, Val) ->
-	erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
+	erldis_client:call(Client, Cmd, [[Key, size(Val)], [Val]]).

src/erldis_proto.erl

 
 -export([parse/2]).
 
-parse(empty, "+OK") ->
+parse(_, <<"+OK">>) ->
     ok;
-parse(empty, "+PONG") ->
+parse(_, <<":0">>) ->
+    false;
+parse(_, <<":1">>) ->
+    true;
+parse(empty, <<"+QUEUED">>) ->
+	queued;
+parse(empty, <<"+PONG">>) ->
     pong;
-parse(empty, ":0") ->
-    false;
-parse(empty, ":1") ->
-    true;
-parse(empty, "-" ++ Message) ->
+parse(empty, <<"-", Message/binary>>) ->
     {error, Message};
-parse(empty, "$-1") ->
+parse(empty, <<"$-1">>) ->
     {read, nil};
-parse(empty, "*-1") ->
+parse(empty, <<"*-1">>) ->
     {hold, nil};
-parse(empty, "*0") ->
+parse(empty, <<"*0">>) ->
     {read, 0};
-parse(empty, "$" ++ BulkSize) ->
-    {read, list_to_integer(BulkSize)};
-parse(read, "$" ++ BulkSize) ->
-    {read, list_to_integer(BulkSize)};
-parse(empty, "*" ++ MultiBulkSize) ->
-    {hold, list_to_integer(MultiBulkSize)};
+parse(_, <<"$", BulkSize/binary>>) ->
+    {read, list_to_integer(binary_to_list(BulkSize))};
+parse(empty, <<"*", MultiBulkSize/binary>>) ->
+    {hold, list_to_integer(binary_to_list(MultiBulkSize))};
 parse(empty, Message) ->
     convert(Message).
 
-convert(":" ++ Message) ->
-    list_to_integer(Message);
+convert(<<":", Message/binary>>) ->
+    list_to_integer(binary_to_list(Message));
 % in case the message is not OK or PONG it's a
 % real value that we don't know how to convert
-% to an atom, so just pass it as is and remove
-% the +
-convert("+" ++ Message) -> 
+% so just pass it as is and remove the +
+convert(<<"+", Message/binary>>) ->
     Message;
 convert(Message) ->
     Message.
-

src/erldis_sets.erl

-%% @doc sets like interface to redis. Uses erldis_sync_client to ensure
+%% @doc sets like interface to redis. Uses erldis_client to ensure
 %% synchronous results.
 %%
 %% @author Jacob Perkins <japerk@gmail.com>
 %% sets-like api %%
 %%%%%%%%%%%%%%%%%%%
 
-delete(Client) -> erldis_sync_client:stop(Client).
+delete(Client) -> erldis_client:stop(Client).
 
-is_set(Client, Key) -> ["set"] == scall(Client, type, [Key]).
+is_set(Client, Key) -> [<<"set">>] == scall(Client, <<"type ">>, [Key]).
 
 size(Client, Key) ->
-	case scall(Client, scard, [Key]) of
+	case scall(Client, <<"scard ">>, [Key]) of
 		% redis actually returns 0 & 1, but those are interpreted as false & true
 		[false] -> 0;
 		[true] -> 1;
 		[Size] -> Size
 	end.
 
-to_list(Client, Key) -> scall(Client, smembers, [Key]).
+to_list(Client, Key) -> scall(Client, <<"smembers ">>, [Key]).
 
 from_list(Client, Key, List) ->
 	% delete existing set
-	scall(Client, del, [Key]),
+	scall(Client, <<"del ">>, [Key]),
 	lists:foreach(fun(Elem) -> add_element(Elem, Client, Key) end, List),
 	Client.
 
 is_element(Elem, Client, Key) ->
-	case set_call(Client, sismember, Key, Elem) of
+	case set_call(Client, <<"sismember ">>, Key, Elem) of
 		[false] -> false;
 		[true] -> true
 	end.
 
-add_element(Elem, Client, Key) -> set_call(Client, sadd, Key, Elem).
+add_element(Elem, Client, Key) -> set_call(Client, <<"sadd ">>, Key, Elem).
 
-del_element(Elem, Client, Key) -> set_call(Client, srem, Key, Elem).
+del_element(Elem, Client, Key) -> set_call(Client, <<"srem ">>, Key, Elem).
 
-union(Client, Keys) -> scall(Client, sunion, Keys).
+union(Client, Keys) -> scall(Client, <<"sunion ">>, Keys).
 
 intersection(Client, Key1, Key2) -> intersection(Client, [Key1, Key2]).
 
-intersection(Client, Keys) -> scall(Client, sinter, Keys).
+intersection(Client, Keys) -> scall(Client, <<"sinter ">>, Keys).
 
 is_disjoint(Client, Key1, Key2) -> [] == intersection(Client, [Key1, Key2]).
 
 subtract(Client, Key1, Key2) -> subtract(Client, [Key1, Key2]).
 
-subtract(Client, Keys) -> scall(Client, sdiff, Keys).
+subtract(Client, Keys) -> scall(Client, <<"sdiff ">>, Keys).
 
 is_subset(Client, Key1, Key2) -> [] == subtract(Client, [Key2, Key1]).
 
 
 % TODO: handle {error, Reason}. throw exception?
 
-scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
+scall(Client, Cmd, Args) -> erldis_client:scall(Client, Cmd, Args).
 
 set_call(Client, Cmd, Key, Val) ->
-	erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
+	erldis_client:call(Client, Cmd, [[Key, erlang:size(Val)], [Val]]).

src/gen_server2.erl

+%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% All modifications are (C) 2009 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%% 
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%% 
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%% 
+%%     $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs. 
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%%   init(Args)  
+%%%     ==> {ok, State}
+%%%         {ok, State, Timeout}
+%%%         ignore
+%%%         {stop, Reason}
+%%%
+%%%   handle_call(Msg, {From, Tag}, State)
+%%%
+%%%    ==> {reply, Reply, State}
+%%%        {reply, Reply, State, Timeout}
+%%%        {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, Reply, State}  
+%%%              Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%%   handle_cast(Msg, State)
+%%%
+%%%    ==> {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, State} 
+%%%              Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%%   handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%%    ==> {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, State} 
+%%%              Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%%   terminate(Reason, State) Let the user module clean up
+%%%        always called when server terminates
+%%%
+%%%    ==> ok
+%%%
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%%   User module                          Generic
+%%%   -----------                          -------
+%%%     start            ----->             start
+%%%     init             <-----              .
+%%%
+%%%                                         loop
+%%%     handle_call      <-----              .
+%%%                      ----->             reply
+%%%
+%%%     handle_cast      <-----              .
+%%%
+%%%     handle_info      <-----              .
+%%%
+%%%     terminate        <-----              .
+%%%
+%%%                      ----->             reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+	 start_link/3, start_link/4,
+	 call/2, call/3,
+	 cast/2, reply/2,
+	 abcast/2, abcast/3,
+	 multi_call/2, multi_call/3, multi_call/4,
+	 enter_loop/3, enter_loop/4, enter_loop/5]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+	 system_terminate/4,
+	 system_code_change/4,
+	 format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%%  API
+%%%=========================================================================
+
+behaviour_info(callbacks) ->
+    [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+     {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+    undefined.
+
+%%%  -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%%    Name ::= {local, atom()} | {global, atom()}
+%%%    Mod  ::= atom(), callback module implementing the 'real' server
+%%%    Args ::= term(), init arguments (to Mod:init/1)
+%%%    Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%%      Flag ::= trace | log | {logfile, File} | statistics | debug
+%%%          (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%%          {error, {already_started, Pid}} |
+%%%          {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+    gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+    gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+    gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+    gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% ----------------------------------------------------------------- 
+call(Name, Request) ->
+    case catch gen:call(Name, '$gen_call', Request) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, call, [Name, Request]}})
+    end.
+
+call(Name, Request, Timeout) ->
+    case catch gen:call(Name, '$gen_call', Request, Timeout) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+    end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+    catch global:send(Name, cast_msg(Request)),
+    ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> 
+    do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+    do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+    do_cast(Dest, Request).
+
+do_cast(Dest, Request) -> 
+    do_send(Dest, cast_msg(Request)),
+    ok.
+    
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+    catch To ! {Tag, Reply}.
+
+%% ----------------------------------------------------------------- 
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------  
+abcast(Name, Request) when is_atom(Name) ->
+    do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+    do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+    do_send({Name,Node},Msg),
+    do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%% 
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will 
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+  when is_atom(Name) ->
+    do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req) 
+  when is_list(Nodes), is_atom(Name) ->
+    do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+    do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout) 
+  when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+    do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ 
+%%   
+%% Description: Makes an existing process into a gen_server. 
+%%              The calling process will enter the gen_server receive 
+%%              loop and become a gen_server process.
+%%              The process *must* have been started using one of the 
+%%              start functions in proc_lib, see proc_lib(3). 
+%%              The user is responsible for any initialization of the 
+%%              process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+    enter_loop(Mod, Options, State, self(), infinity).
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+    enter_loop(Mod, Options, State, ServerName, infinity);
+
+enter_loop(Mod, Options, State, Timeout) ->
+    enter_loop(Mod, Options, State, self(), Timeout).
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+    Name = get_proc_name(ServerName),
+    Parent = get_parent(),
+    Debug = debug_options(Name, Options),
+    Queue = queue:new(),
+    loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+    init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name, Mod, Args, Options) ->
+    Debug = debug_options(Name, Options),
+    Queue = queue:new(),
+    case catch Mod:init(Args) of
+	{ok, State} ->
+	    proc_lib:init_ack(Starter, {ok, self()}), 	    
+	    loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+	{ok, State, Timeout} ->
+	    proc_lib:init_ack(Starter, {ok, self()}), 	    
+	    loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+	{stop, Reason} ->
+	    proc_lib:init_ack(Starter, {error, Reason}),
+	    exit(Reason);
+	ignore ->
+	    proc_lib:init_ack(Starter, ignore),
+	    exit(normal);
+	{'EXIT', Reason} ->
+	    proc_lib:init_ack(Starter, {error, Reason}),
+	    exit(Reason);
+	Else ->
+	    Error = {bad_return_value, Else},
+	    proc_lib:init_ack(Starter, {error, Error}),
+	    exit(Error)
+    end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+    receive
+        Input -> loop(Parent, Name, State, Mod,
+                      Time, queue:in(Input, Queue), Debug)
+    after 0 ->
+            case queue:out(Queue) of
+                {{value, Msg}, Queue1} ->
+                    process_msg(Parent, Name, State, Mod,
+                                Time, Queue1, Debug, Msg);
+                {empty, Queue1} ->
+                    receive
+                        Input ->
+                            loop(Parent, Name, State, Mod,
+                                 Time, queue:in(Input, Queue1), Debug)
+                    after Time ->
+                            process_msg(Parent, Name, State, Mod,
+                                        Time, Queue1, Debug, timeout)
+                    end
+            end
+    end.
+                    
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+    case Msg of
+	{system, From, Req} ->
+	    sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+				  [Name, State, Mod, Time, Queue]);
+	{'EXIT', Parent, Reason} ->
+	    terminate(Reason, Name, Msg, Mod, State, Debug);
+	_Msg when Debug =:= [] ->
+	    handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+	_Msg ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
+				      Name, {in, Msg}),
+	    handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+    end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+    catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+    Tag = make_ref(),
+    Monitors = send_nodes(Nodes, Name, Tag, Req),
+    rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+    Tag = make_ref(),
+    Caller = self(),
+    Receiver =
+	spawn(
+	  fun() ->
+		  %% Middleman process. Should be unsensitive to regular
+		  %% exit signals. The sychronization is needed in case
+		  %% the receiver would exit before the caller started
+		  %% the monitor.
+		  process_flag(trap_exit, true),
+		  Mref = erlang:monitor(process, Caller),
+		  receive
+		      {Caller,Tag} ->
+			  Monitors = send_nodes(Nodes, Name, Tag, Req),
+			  TimerId = erlang:start_timer(Timeout, self(), ok),
+			  Result = rec_nodes(Tag, Monitors, Name, TimerId),
+			  exit({self(),Tag,Result});
+		      {'DOWN',Mref,_,_,_} ->
+			  %% Caller died before sending us the go-ahead.
+			  %% Give up silently.
+			  exit(normal)
+		  end
+	  end),
+    Mref = erlang:monitor(process, Receiver),
+    Receiver ! {self(),Tag},
+    receive
+	{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+	    Result;
+	{'DOWN',Mref,_,_,Reason} ->
+	    %% The middleman code failed. Or someone did 
+	    %% exit(_, kill) on the middleman process => Reason==killed
+	    exit(Reason)
+    end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+    send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+  when is_atom(Node) ->
+    Monitor = start_monitor(Node, Name),
+    %% Handle non-existing names in rec_nodes.
+    catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+    send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+    %% Skip non-atom Node
+    send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) -> 
+    Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) -> 
+    rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+    receive
+	{'DOWN', R, _, _, _} ->
+	    rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    unmonitor(R), 
+	    rec_nodes(Tag, Tail, Name, Badnodes, 
+		      [{N,Reply}|Replies], Time, TimerId);
+	{timeout, TimerId, _} ->	
+	    unmonitor(R),
+	    %% Collect all replies that already have arrived
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+    %% R6 node
+    receive
+	{nodedown, N} ->
+	    monitor_node(N, false),
+	    rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes(Tag, Tail, Name, Badnodes,
+		      [{N,Reply}|Replies], 2000, TimerId);
+	{timeout, TimerId, _} ->	
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    %% Collect all replies that already have arrived
+	    rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+    after Time ->
+	    case rpc:call(N, erlang, whereis, [Name]) of
+		Pid when is_pid(Pid) -> % It exists try again.
+		    rec_nodes(Tag, [N|Tail], Name, Badnodes,
+			      Replies, infinity, TimerId);
+		_ -> % badnode
+		    receive {nodedown, N} -> ok after 0 -> ok end,
+		    monitor_node(N, false),
+		    rec_nodes(Tag, Tail, Name, [N|Badnodes],
+			      Replies, 2000, TimerId)
+	    end
+    end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+    case catch erlang:cancel_timer(TimerId) of
+	false ->  % It has already sent it's message
+	    receive
+		{timeout, TimerId, _} -> ok
+	    after 0 ->
+		    ok
+	    end;
+	_ -> % Timer was cancelled, or TimerId was 'undefined'
+	    ok
+    end,
+    {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+    receive
+	{'DOWN', R, _, _, _} ->
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+	{{Tag, N}, Reply} -> %% Tag is bound !!!
+	    unmonitor(R),
+	    rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+    after 0 ->
+	    unmonitor(R),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+    %% R6 node
+    receive
+	{nodedown, N} ->
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+    after 0 ->
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+    {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+    if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+	    Ref = make_ref(),
+	    self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+	    {Node, Ref};
+       true ->
+	    case catch erlang:monitor(process, {Name, Node}) of
+		{'EXIT', _} ->
+		    %% Remote node is R6
+		    monitor_node(Node, true),
+		    Node;
+		Ref when is_reference(Ref) ->
+		    {Node, Ref}
+	    end
+    end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+    erlang:demonitor(Ref),
+    receive
+	{'DOWN', Ref, _, _, _} ->
+	    true
+    after 0 ->
+	    true
+    end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+    Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+    Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+           Parent, Name, State, Mod, _Time, Queue) ->
+    case catch Mod:handle_call(Msg, From, State) of
+	{reply, Reply, NState} ->
+	    reply(From, Reply),
+	    loop(Parent, Name, NState, Mod, infinity, Queue, []);
+	{reply, Reply, NState, Time1} ->
+	    reply(From, Reply),
+	    loop(Parent, Name, NState, Mod, Time1, Queue, []);
+	{noreply, NState} ->
+	    loop(Parent, Name, NState, Mod, infinity, Queue, []);
+	{noreply, NState, Time1} ->
+	    loop(Parent, Name, NState, Mod, Time1, Queue, []);
+	{stop, Reason, Reply, NState} ->
+	    {'EXIT', R} = 
+		(catch terminate(Reason, Name, Msg, Mod, NState, [])),
+	    reply(From, Reply),
+	    exit(R);
+	Other -> handle_common_reply(Other,
+                                     Parent, Name, Msg, Mod, State, Queue)
+    end;
+handle_msg(Msg,
+           Parent, Name, State, Mod, _Time, Queue) ->
+    Reply = (catch dispatch(Msg, Mod, State)),
+    handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+           Parent, Name, State, Mod, _Time, Queue, Debug) ->
+    case catch Mod:handle_call(Msg, From, State) of
+	{reply, Reply, NState} ->
+	    Debug1 = reply(Name, From, Reply, NState, Debug),
+	    loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+	{reply, Reply, NState, Time1} ->
+	    Debug1 = reply(Name, From, Reply, NState, Debug),
+	    loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+	{noreply, NState} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+	{noreply, NState, Time1} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+	{stop, Reason, Reply, NState} ->
+	    {'EXIT', R} = 
+		(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+	    reply(Name, From, Reply, NState, Debug),
+	    exit(R);
+	Other ->
+	    handle_common_reply(Other,
+                                Parent, Name, Msg, Mod, State, Queue, Debug)
+    end;
+handle_msg(Msg,
+           Parent, Name, State, Mod, _Time, Queue, Debug) ->
+    Reply = (catch dispatch(Msg, Mod, State)),
+    handle_common_reply(Reply,
+                        Parent, Name, Msg, Mod, State, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+    case Reply of
+	{noreply, NState} ->
+	    loop(Parent, Name, NState, Mod, infinity, Queue, []);
+	{noreply, NState, Time1} ->
+	    loop(Parent, Name, NState, Mod, Time1, Queue, []);
+	{stop, Reason, NState} ->
+	    terminate(Reason, Name, Msg, Mod, NState, []);
+	{'EXIT', What} ->
+	    terminate(What, Name, Msg, Mod, State, []);
+	_ ->
+	    terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+    end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+    case Reply of
+	{noreply, NState} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+	{noreply, NState, Time1} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+	{stop, Reason, NState} ->
+	    terminate(Reason, Name, Msg, Mod, NState, Debug);
+	{'EXIT', What} ->
+	    terminate(What, Name, Msg, Mod, State, Debug);
+	_ ->
+	    terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+    end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+    reply({To, Tag}, Reply),
+    sys:handle_debug(Debug, {?MODULE, print_event}, Name, 
+		     {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
+    loop(Parent, Name, State, Mod, Time, Queue, Debug).
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+    terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+    case catch Mod:code_change(OldVsn, State, Extra) of
+	{ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+	Else -> Else
+    end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages.  Print them as the call-back module sees
+%% them, not as the real erlang messages.  Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+    case Msg of
+	{'$gen_call', {From, _Tag}, Call} ->
+	    io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+		      [Name, Call, From]);
+	{'$gen_cast', Cast} ->
+	    io:format(Dev, "*DBG* ~p got cast ~p~n",
+		      [Name, Cast]);
+	_ ->
+	    io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+    end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+    io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", 
+	      [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+    io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+    io:format(Dev, "*DBG* ~p dbg  ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+    case catch Mod:terminate(Reason, State) of
+	{'EXIT', R} ->
+	    error_info(R, Name, Msg, State, Debug),
+	    exit(R);
+	_ ->
+	    case Reason of
+		normal ->
+		    exit(normal);
+		shutdown ->
+		    exit(shutdown);
+		_ ->
+		    error_info(Reason, Name, Msg, State, Debug),
+		    exit(Reason)
+	    end
+    end.
+
+error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+    %% OTP-5811 Don't send an error report if it's the system process
+    %% application_controller which is terminating - let init take care
+    %% of it instead
+    ok;
+error_info(Reason, Name, Msg, State, Debug) ->
+    Reason1 = 
+	case Reason of
+	    {undef,[{M,F,A}|MFAs]} ->
+		case code:is_loaded(M) of
+		    false ->
+			{'module could not be loaded',[{M,F,A}|MFAs]};
+		    _ ->
+			case erlang:function_exported(M, F, length(A)) of
+			    true ->
+				Reason;
+			    false ->
+				{'function not exported',[{M,F,A}|MFAs]}
+			end
+		end;
+	    _ ->
+		Reason
+	end,    
+    format("** Generic server ~p terminating \n"
+           "** Last message in was ~p~n"
+           "** When Server state == ~p~n"
+           "** Reason for termination == ~n** ~p~n",
+	   [Name, Msg, State, Reason1]),
+    sys:print_log(Debug),
+    ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+    {ok, Value};
+opt(Op, [_|Options]) ->
+    opt(Op, Options);
+opt(_, []) ->
+    false.
+
+debug_options(Name, Opts) ->
+    case opt(debug, Opts) of
+	{ok, Options} -> dbg_options(Name, Options);
+	_ -> dbg_options(Name, [])
+    end.
+
+dbg_options(Name, []) ->
+    Opts = 
+	case init:get_argument(generic_debug) of
+	    error ->
+		[];
+	    _ ->
+		[log, statistics]
+	end,
+    dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+    dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+    case catch sys:debug_options(Opts) of
+	{'EXIT',_} ->
+	    format("~p: ignoring erroneous debug options - ~p~n",
+		   [Name, Opts]),
+	    [];
+	Dbg ->
+	    Dbg
+    end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+    Pid;
+get_proc_name({local, Name}) ->
+    case process_info(self(), registered_name) of
+	{registered_name, Name} ->
+	    Name;
+	{registered_name, _Name} ->
+	    exit(process_not_registered);
+	[] ->
+	    exit(process_not_registered)
+    end;    
+get_proc_name({global, Name}) ->
+    case global:safe_whereis_name(Name) of
+	undefined ->
+	    exit(process_not_registered_globally);
+	Pid when Pid =:= self() ->
+	    Name;
+	_Pid ->
+	    exit(process_not_registered_globally)
+    end.
+
+get_parent() ->
+    case get('$ancestors') of
+	[Parent | _] when is_pid(Parent)->
+            Parent;
+        [Parent | _] when is_atom(Parent)->
+            name_to_pid(Parent);
+	_ ->
+	    exit(process_was_not_started_by_proc_lib)
+    end.
+