Commits

Jacob Perkins committed b1a4b8e

replace emongo_conn with emongo_server gen_server

Comments (0)

Files changed (5)

.gitignore

-ebin/*.beam
-ebin/*.app
 	{description, "Erlang MongoDB Driver"},
 	{vsn, "0.0.2"},
 	{modules, [
-		emongo, emongo_app, emongo_sup, emongo_bson, emongo_conn, emongo_packet
+		emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet, emongo_server
 	]},
 	{registered, [emongo_sup, emongo]},
 	{mod, {emongo_app, []}},
 find(PoolId, Collection, Query) when is_record(Query, emo_query) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
-	emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT).
+	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT).
 	
 %% @spec find(PoolId, Collection, Selector, Options) -> Result
 %%		 PoolId = atom()
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Query = create_query(Options, Selector),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
-	Resp = emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, proplists:get_value(timeout, Options, ?TIMEOUT)),
+	Resp = emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, proplists:get_value(timeout, Options, ?TIMEOUT)),
 	case lists:member(response_options, Options) of
 		true -> Resp;
 		false -> Resp#response.documents
 get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID),
-	emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
+	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
 	
 kill_cursors(PoolId, CursorID) when is_integer(CursorID) ->
 	kill_cursors(PoolId, [CursorID]);
 kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs),
-	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 	
 %%------------------------------------------------------------------------------
 %% insert
 insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents),
-	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 %%------------------------------------------------------------------------------
 %% update
 update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, Selector, Document),
-	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 %%------------------------------------------------------------------------------
 %% delete
 delete(PoolId, Collection, Selector) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, Selector),
-	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 %%------------------------------------------------------------------------------
 %% ensure index
 ensure_index(PoolId, Collection, Keys) when ?IS_DOCUMENT(Keys)->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:ensure_index(Pool#pool.database, Collection, Pool#pool.req_id, Keys),
-	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 count(PoolId, Collection) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Query = #emo_query{q=[{<<"count">>, Collection}, {<<"ns">>, Pool#pool.database}], limit=1},
 	Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query),
-	case emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of
+	case emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of
 		#response{documents=[[{<<"n">>,Count}|_]]} ->
 			round(Count);
 		_ ->
 do_open_connections(#pool{conn_pids=Pids, size=Size}=Pool) -> 
 	case queue:len(Pids) < Size of
 		true ->
-			Pid = emongo_conn:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
+			%Pid = emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
+			{ok, Pid} = emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
 			do_open_connections(Pool#pool{conn_pids = queue:in(Pid, Pids)});
 		false ->
 			Pool

src/emongo_conn.erl

-%% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
-%% 
-%% Permission is hereby granted, free of charge, to any person
-%% obtaining a copy of this software and associated documentation
-%% files (the "Software"), to deal in the Software without
-%% restriction, including without limitation the rights to use,
-%% copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the
-%% Software is furnished to do so, subject to the following
-%% conditions:
-%% 
-%% The above copyright notice and this permission notice shall be
-%% included in all copies or substantial portions of the Software.
-%% 
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
-%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
-%% OTHER DEALINGS IN THE SOFTWARE.
--module(emongo_conn).
-
--export([start_link/3, init/4, send/3, send_recv/4]).
-
--record(request, {req_id, requestor}).
--record(state, {pool_id, socket, requests}).
-
--include("emongo.hrl").
-
-start_link(PoolId, Host, Port) ->
-	proc_lib:start_link(?MODULE, init, [PoolId, Host, Port, self()]).
-	
-init(PoolId, Host, Port, Parent) ->
-	Socket = open_socket(Host, Port),
-	proc_lib:init_ack(Parent, self()),
-	loop(#state{pool_id=PoolId, socket=Socket, requests=[]}, <<>>).
-	
-send(Pid, ReqID, Packet) ->
-	case gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}) of
-		{ok, Result} -> Result;
-		{error, Reason} -> exit(Reason)
-	end.
-	
-send_recv(Pid, ReqID, Packet, Timeout) ->
-	case gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}, Timeout) of
-		{ok, Resp} ->
-			Documents = emongo_bson:decode(Resp#response.documents),
-			Resp#response{documents=Documents};
-		{error, Reason} -> 
-			exit(Reason)
-	end.
-	
-loop(State, Leftover) ->
-	Socket = State#state.socket,
-	receive
-		{'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} ->
-			gen_tcp:send(Socket, Packet),
-			gen:reply({From, Mref}, ok),
-			loop(State, Leftover);
-		{'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} -> 
-			gen_tcp:send(Socket, Packet),
-			Request = #request{req_id=ReqID, requestor={From, Mref}},
-			State1 = State#state{requests=[{ReqID, Request}|State#state.requests]},
-			loop(State1, Leftover);
-		{tcp, Socket, Data} ->
-			case emongo_packet:decode_response(<<Leftover/binary, Data/binary>>) of
-				undefined ->
-					loop(State, <<Leftover/binary, Data/binary>>);
-				{Resp, Tail} ->
-					ResponseTo = (Resp#response.header)#header.response_to,
-					case find_request(ResponseTo, State#state.requests, []) of
-						{undefined, _} ->
-							exit({unexpected_response, Resp});
-						{Request, Others} ->
-							gen:reply(Request#request.requestor, Resp),
-							loop(State#state{requests=Others}, Tail)
-					end
-			end;
-		{tcp_closed, Socket} ->
-			exit({State#state.pool_id, tcp_closed});
-		{tcp_error, Socket, Reason} ->
-			exit({State#state.pool_id, Reason})
-	end.
-	
-open_socket(Host, Port) ->
-	case gen_tcp:connect(Host, Port, [binary, {active, true}]) of
-		{ok, Sock} ->
-			Sock;
-		{error, Reason} ->
-			exit({failed_to_open_socket, Reason})
-	end.
-	
-find_request(RequestID, [{RequestID, Request}|Tail], OtherReqs) ->
-	{Request, lists:append(OtherReqs, Tail)};
-
-find_request(RequestID, [Request|Tail], OtherReqs) ->
-	find_request(RequestID, Tail, [Request|OtherReqs]);
-	
-find_request(_RequestID, [], OtherReqs) ->
-	{undefined, OtherReqs}.

src/emongo_server.erl

+-module(emongo_server).
+
+-behaviour(gen_server).
+
+-include("emongo.hrl").
+
+-export([start_link/3, send/3, send_recv/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+		 terminate/2, code_change/3]).
+
+-record(request, {req_id, requestor}).
+-record(state, {pool_id, socket, requests, leftover}).
+
+start_link(PoolId, Host, Port) ->
+	gen_server:start_link(?MODULE, [PoolId, Host, Port], []).
+
+send(Pid, ReqID, Packet) ->
+	case gen_server:call(Pid, {send, ReqID, Packet}) of
+		{ok, Result} -> Result;
+		{error, Reason} -> exit(Reason)
+	end.
+	
+send_recv(Pid, ReqID, Packet, Timeout) ->
+	case gen_server:call(Pid, {send_recv, ReqID, Packet}, Timeout) of
+		{ok, Resp} ->
+			Documents = emongo_bson:decode(Resp#response.documents),
+			Resp#response{documents=Documents};
+		{error, Reason} -> 
+			exit(Reason)
+	end.
+	
+open_socket(Host, Port) ->
+	case gen_tcp:connect(Host, Port, [binary, {active, true}, {nodelay, true}]) of
+		{ok, Sock} ->
+			Sock;
+		{error, Reason} ->
+			exit({failed_to_open_socket, Reason})
+	end.
+
+%% gen_server %%
+	
+init([PoolId, Host, Port]) ->
+	Socket = open_socket(Host, Port),
+	{ok, #state{pool_id=PoolId, socket=Socket, requests=[], leftover = <<>>}}.
+
+handle_call({send, _, Packet}, _From, State) ->
+	% TODO: make this a cast
+	gen_tcp:send(State#state.socket, Packet),
+	{reply, ok, State};
+handle_call({send_recv, ReqID, Packet}, From, State) ->
+	gen_tcp:send(State#state.socket, Packet),
+	Request = #request{req_id=ReqID, requestor=From},
+	State1 = State#state{requests=[{ReqID, Request} | State#state.requests]},
+	{noreply, State1}.
+
+handle_cast(_, State) -> {noreply, State}.
+
+handle_info({tcp, _Socket, Data}, State) ->
+	Leftover = <<(State#state.leftover)/binary, Data/binary>>,
+	
+	case emongo_packet:decode_response(Leftover) of
+		undefined ->
+			{noreply, State#state{leftover=Leftover}};
+		{Resp, Tail} ->
+			ResponseTo = (Resp#response.header)#header.response_to,
+			
+			case lists:keytake(ResponseTo, 1, State#state.requests) of
+				false ->
+					exit({unexpected_response, Resp});
+				{value, {ResponseTo, Request}, Requests} ->
+					gen_server:reply(Request#request.requestor, {ok, Resp}),
+					{noreply, State#state{requests=Requests, leftover=Tail}}
+			end
+	end;
+handle_info({tcp_closed, _Socket}, State) ->
+	exit({State#state.pool_id, tcp_closed});
+handle_info({tcp_error, _Socket, Reason}, State) ->
+	exit({State#state.pool_id, Reason}).
+
+terminate(_, State) -> gen_tcp:close(State#state.socket).
+
+code_change(_Old, State, _Extra) -> {ok, State}.