Commits

Jacob Perkins committed a17dea0

erldis_client cleanup

Comments (0)

Files changed (1)

src/erldis_client.erl

-%% @doc This is a    very similar to erldis_client, but it does
+%% @doc This is a	 very similar to erldis_client, but it does
 %% synchronous calls instead of async pipelining. Does so by keeping a queue
 %% of From pids in State.calls, then calls gen_server2:reply when it receives
 %% handle_info({tcp, ...). Therefore, it must get commands on handle_call
 %%%%%%%%%%%%%
 %% helpers %%
 %%%%%%%%%%%%%
-    
+
 str(X) when is_list(X) ->
 	list_to_binary(X);
 str(X) when is_atom(X) ->
 str(X) when is_float(X) ->
 	list_to_binary(float_to_list(X));
 str(X) ->
-  term_to_binary(X).
-  
+	term_to_binary(X).
+
 format([], Result) ->
-  Result;
+	Result;
 format([Line|Rest], <<>>) ->
-  JoinedLine = binary_join(Line, <<" ">>),
-  format(Rest, JoinedLine);
-  
+	JoinedLine = binary_join(Line, <<" ">>),
+	format(Rest, JoinedLine);
+
 format([Line|Rest], Result) ->
-  Sep = <<?EOL>>,
+	Sep = <<?EOL>>,
 	JoinedLine = binary_join(Line, <<" ">>),
-	format(Rest, <<Result/binary,Sep/binary,JoinedLine/binary>>).
+	format(Rest, <<Result/binary, Sep/binary, JoinedLine/binary>>).
 
 format(Lines) ->
 	format(Lines, <<>>).
 
 sformat(<<>>)->
-  <<>>;
+	<<>>;
 sformat(Line) ->
-	format([Line],<<>>).
+	format([Line], <<>>).
 
-binary_join([], _)-> <<>>;
+binary_join([], _)->
+	<<>>;
 binary_join(Array, Sep)->
-  Sz = size(Sep),
-  R = lists:foldl(fun(Elem, Acc)->
-    E2 = str(Elem),
-	  <<Acc/binary,Sep/binary,E2/binary>>
-	end, <<>>, Array),
-	<<_:Sz/bytes,Result/binary>> = R,
+	F = fun(Elem, Acc) ->
+			E2 = str(Elem),
+			<<Acc/binary, Sep/binary, E2/binary>>
+		end,
+	
+	Sz = size(Sep),
+	<<_:Sz/bytes, Result/binary>> = lists:foldl(F, <<>>, Array),
 	Result.
 
 trim2({ok, S}) ->
-  Read = size(S)-2,
-  <<R:Read/bytes,_R/binary>> = S,
-  R;
+	Read = size(S)-2,
+	<<R:Read/bytes, _/binary>> = S,
+	R;
 trim2(S) ->
 	trim2({ok, S}).
 
 	end.
 
 ensure_started(#redis{socket=undefined, host=Host, port=Port, timeout=Timeout, db=DB}=State)->
-	Opts = [binary, {active, false}, {packet, line}, {nodelay, true}, {send_timeout, Timeout}],
+	Opts = [binary, {active, false}, {packet, line}, {nodelay, true},
+			{send_timeout, Timeout}],
+	
 	case gen_tcp:connect(Host, Port, Opts, Timeout) of
 		{ok, Socket} ->
-			%error_logger:info_report([{?MODULE, reconnected}, State]),
 			End = <<?EOL>>,
-			case DB of 
-			  <<"0">> ->
-			    ok;
-			  _Id ->
-			    gen_tcp:send(Socket, <<"select ",DB,End>>),
-			    {ok, <<"+OK",_R/binary>>} = gen_tcp:recv(Socket, 10)
+			
+			if
+				DB == <<"0">> ->
+					ok;
+				true ->
+					gen_tcp:send(Socket, <<"select ", DB/binary, End/binary>>),
+					{ok, <<"+OK", _R/binary>>} = gen_tcp:recv(Socket, 10)
 			end,
+			
 			inet:setopts(Socket, [{active, once}]),
 			State#redis{socket=Socket};
 		{error, Why} ->
 
 sr_scall(Client, Cmd) -> sr_scall(Client, Cmd, []).
 
-sr_scall(Client, Cmd, Args) -> 
-  case scall(Client, Cmd, Args) of
-    [R] -> R;
-    ok -> ok
-  end.
+sr_scall(Client, Cmd, Args) ->
+	case scall(Client, Cmd, Args) of
+		[R] -> R;
+		ok -> ok
+	end.
 
 % This is the simple send with a single row of commands
-scall(Client, Cmd) -> scall(Client, Cmd, <<>> ).
+scall(Client, Cmd) -> scall(Client, Cmd, <<>>).
 
 scall(Client, Cmd, Args) ->
-  Args2 = sformat(Args),
-	M = case gen_server2:call(Client, is_pipelined) of
-	  true -> cast;
-	  _ -> call
-	end,
-	case apply(gen_server2,M,[Client,{send, <<Cmd/binary,Args2/binary>>}]) of
-		{error, Reason} -> throw({error, Reason});
-		Retval -> Retval
-	end.
-	
+	Args2 = sformat(Args),
+	send(Client, <<Cmd/binary, Args2/binary>>).
+
 % This is the complete send with multiple rows
 call(Client, Cmd) -> call(Client, Cmd, []).
 
 call(Client, Cmd, Args) ->
-  Args2 = format(Args),
-	SCmd = <<Cmd/binary, Args2/binary>>,
-	M = case gen_server2:call(Client, is_pipelined) of
-	  true -> cast;
-	  _ -> call
-	end,
-	case apply(gen_server2,M,[Client, {send, SCmd}]) of
-		{error, Reason} -> throw({error, Reason});
-		Retval -> Retval
+	Args2 = format(Args),
+	send(Client, <<Cmd/binary, Args2/binary>>).
+
+send(Client, Cmd) ->
+	Piped = gen_server2:call(Client, is_pipelined),
+	
+	if
+		Piped ->
+			gen_server2:cast(Client, {send, Cmd});
+		true ->
+			case gen_server2:call(Client, {send, Cmd}) of
+				{error, Reason} -> throw({error, Reason});
+				Retval -> Retval
+			end
 	end.
 
 % stop is synchronous so can be sure that client is shutdown
 	end.
 
 select(Client, DB) ->
-  DBB = list_to_binary(integer_to_list(DB)),
-	[ok] = scall(Client,<<"select ",DBB/binary>>),
+	DBB = list_to_binary(integer_to_list(DB)),
+	[ok] = scall(Client, <<"select ", DBB/binary>>),
 	Client.
 
 info(Client) ->
 				{Key, Val} -> {Key, Val}
 			end
 		end,
-	
+
 	[S] = scall(Client, info),
 	elists:mapfilter(F, string:tokens(binary_to_list(S), "\r\n")).
 
 
 handle_call(is_pipelined, _From, #redis{pipeline=P}=State)->
   {reply, P, State};
-
 handle_call(get_all_results, From, #redis{pipeline=true, calls=Calls} = State) ->
-    %error_logger:info_report([{state, State}, {calls, queue:len(Calls)}]),
-    case queue:len(Calls) of
-        0 ->
-            % answers came earlier than we could start listening...
-            % Very unlikely but totally possible.
-            {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=Calls}};
-        _ ->
-            % We are here earlier than results came, so just make
-            % ourselves wait until stuff is ready.
-            {noreply, State#redis{reply_caller=fun(V) -> gen_server2:reply(From, V) end}}
-    end;
-
-
-  
+	case queue:len(Calls) of
+		0 ->
+			% answers came earlier than we could start listening...
+			% Very unlikely but totally possible.
+			{reply, lists:reverse(State#redis.results), State#redis{results=[], calls=Calls}};
+		_ ->
+			% We are here earlier than results came, so just make
+			% ourselves wait until stuff is ready.
+			{noreply, State#redis{reply_caller=fun(V) -> gen_server2:reply(From, V) end}}
+	end;
 handle_call({send, Cmd}, From, State1) ->
 	% NOTE: redis ignores sent commands it doesn't understand, which means
 	% we don't get a reply, which means callers will timeout
 	End = <<?EOL>>,
 	State = ensure_started(State1),
+	
 	case gen_tcp:send(State#redis.socket, [Cmd|End]) of
 		ok ->
-			%error_logger:info_report([{send, Cmd}, {from, From}]),
 			Queue = queue:in(From, State#redis.calls),
+			
 			case Cmd of
-			  <<"select ", DB/binary>> ->
-			    {noreply, State#redis{calls=Queue, remaining=1, db=DB}};
-			  _ ->
-			    {noreply, State#redis{calls=Queue, remaining=1}}
+				<<"select ", DB/binary>> ->
+					{noreply, State#redis{calls=Queue, remaining=1, db=DB}};
+				_ ->
+					{noreply, State#redis{calls=Queue, remaining=1}}
 			end;
 		{error, Reason} ->
 			error_logger:error_report([{send, Cmd}, {error, Reason}]),
 handle_call(_, _, State) ->
 	{reply, undefined, State}.
 
+%%%%%%%%%%%%%%%%%
+%% handle_cast %%
+%%%%%%%%%%%%%%%%%
+
 handle_cast({pipelining, Bool}, State) ->
-  {noreply, State#redis{pipeline=Bool}};
+	{noreply, State#redis{pipeline=Bool}};
 handle_cast(disconnect, State) ->
 	{stop, shutdown, State};
+handle_cast({send, Cmd}, #redis{remaining=Remaining, calls=Calls} = State1) ->
+	End = <<?EOL>>,
+	State = ensure_started(State1),
+	Queue = queue:in(async, Calls),
+	gen_tcp:send(State#redis.socket, [Cmd|End]),
 	
-handle_cast({send, Cmd},#redis{remaining=Remaining, 
-                                      calls=Calls} = State1) ->
-  End = <<?EOL>>,
-  State = ensure_started(State1),
-  Queue = queue:in(async, Calls),
-  gen_tcp:send(State#redis.socket, [Cmd|End]),
-  case Remaining of
-      0 ->
-          {noreply, State#redis{remaining=1, calls=Queue}};
-      _ ->
-          {noreply,State#redis{calls=Queue}}
-  end;
+	case Remaining of
+		0 ->
+			{noreply, State#redis{remaining=1, calls=Queue}};
+		_ ->
+			{noreply,State#redis{calls=Queue}}
+	end;
 handle_cast(_, State) ->
 	{noreply, State}.
 
 
 recv_value(Socket, NBytes) ->
 	inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+	
 	case gen_tcp:recv(Socket, NBytes+2) of
 		{ok, Packet} ->
 			inet:setopts(Socket, [{packet, line}]), % go back to line mode
-			%error_logger:info_report([{packet, Packet}]),
 			trim2({ok, Packet});
 		{error, Reason} ->
 			error_logger:error_report([{recv, NBytes}, {error, Reason}]),
 			throw({error, Reason})
 	end.
 
-send_reply(#redis{pipeline=true,calls=Calls,results=Results, reply_caller=ReplyCaller}=State)->
-  Result = case lists:reverse(State#redis.buffer) of
-    [V] when is_atom(V) -> V;
-    R -> R
-  end,
-  {{value, _From}, Queue} = queue:out(Calls),
-  case queue:len(Queue) of
-    0 ->
-      %error_logger:info_report([sendreply, {state, State}, {queue,queue:len(Queue) }]),
-      FullResults = [Result|Results],
-      NewState = case ReplyCaller of
-          undefined ->
-              State#redis{results=FullResults};
-          _ ->
-              ReplyCaller(lists:reverse(FullResults)),
-              State#redis{results=[]}
-      end,
-      NewState#redis{remaining=0, pstate=empty,
-                     reply_caller=undefined, buffer=[],
-                     calls=Queue};
-    _ ->
-      %error_logger:info_report([sendreply, {state, State}, {queue,queue:len(Queue) }]),
-      State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Queue}
-  end;
+send_reply(#redis{pipeline=true, calls=Calls, results=Results, reply_caller=ReplyCaller}=State)->
+	Result = case lists:reverse(State#redis.buffer) of
+		[V] when is_atom(V) -> V;
+		R -> R
+	end,
+	
+	{{value, _From}, Queue} = queue:out(Calls),
+	
+	case queue:len(Queue) of
+		0 ->
+			FullResults = [Result|Results],
+			
+			NewState = case ReplyCaller of
+				undefined ->
+					State#redis{results=FullResults};
+				_ ->
+					ReplyCaller(lists:reverse(FullResults)),
+					State#redis{results=[]}
+			end,
+			
+			NewState#redis{remaining=0, pstate=empty,
+						 reply_caller=undefined, buffer=[],
+						 calls=Queue};
+		_ ->
+			State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Queue}
+	end;
 
 send_reply(State) ->
 	{{value, From}, Queue} = queue:out(State#redis.calls),
 	Reply = lists:reverse(State#redis.buffer),
-	%error_logger:info_report([sendreply_no_pipeline, {state, State}, {queue,queue:len(Queue) }]),
 	gen_server2:reply(From, Reply),
 	State#redis{calls=Queue, buffer=[], pstate=empty}.
 
 
 parse_state(State, Socket, Data) ->
 	Parse = erldis_proto:parse(State#redis.pstate, trim2(Data)),
-	%error_logger:info_report([{parse, Parse}, {state , State}]),
+	
 	case {State#redis.remaining-1, Parse} of
 		{0, error} ->
 			% next line is the error string
 		{_, {read, nil}} ->
 			% reply with nil
 			send_reply(State#redis{buffer=[nil]});
-	  {_, {read, 0}} ->
+		{_, {read, 0}} ->
 			% reply with nil
 			send_reply(State#redis{buffer=[]});
 		{0, {read, NBytes}} ->
 		{0, Value} ->
 			% reply with Value
 			Buffer = [Value | State#redis.buffer],
-		  send_reply(State#redis{buffer=Buffer});
-		{N, Value} -> 
-		  Buffer = [Value | State#redis.buffer],
-		  %error_logger:info_report([{buffer, Buffer},{parsed, {N, Value}}, {state , State}]),
-		  State#redis{remaining=N, buffer=Buffer, pstate=read}
+			send_reply(State#redis{buffer=Buffer});
+		{N, Value} ->
+			Buffer = [Value | State#redis.buffer],
+			State#redis{remaining=N, buffer=Buffer, pstate=read}
 	end.
 
 handle_info({tcp, Socket, Data}, State) ->
 	case (catch parse_state(State, Socket, Data)) of
 		{error, Reason} ->
-			%error_logger:error_report([{parse_state, Data}, {error, Reason}]),
 			{stop, Reason, State};
 		NewState ->
 			inet:setopts(Socket, [{active, once}]),
 	% NOTE: if supervised with brutal_kill, may not be able to reply
 	R = fun(From) -> gen_server2:reply(From, {error, closed}) end,
 	lists:foreach(R, queue:to_list(State#redis.calls)),
+	
 	case State#redis.socket of
 		undefined -> ok;
 		Socket -> gen_tcp:close(Socket)