Commits

Jacob Vorreuter  committed 241b372

getting find all api function working

  • Participants
  • Parent commits ce3203f

Comments (0)

Files changed (5)

File include/emongo.hrl

 -record(pool, {id, host, port, database, size=1, conn_pids=[], req_id=1}).
 -record(header, {message_length, request_id, response_to, op_code}).
--record(response, {header, response_flag, cursor_id, starting_from, number_returned, documents}).
--record(emo_query, {opts=[], num_to_skip=0, num_to_return=0, q=[], field_selector=[]}).
+-record(response, {header, response_flag, cursor_id, offset, limit, documents}).
+-record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).
 
 -define(TIMEOUT, 5000).
 

File src/emongo.erl

 -export([start_link/0, init/1, handle_call/3, handle_cast/2, 
 		 handle_info/2, terminate/2, code_change/3]).
 
--export([pools/0, add_pool/5, find/3, find/4, find_one/3, find_one/4, 
-		 insert/3]).
+-export([pools/0, add_pool/5, find/2, find/3, find/4, 
+		 find_one/3, find_one/4, insert/3]).
 
 -include("emongo.hrl").
 
 
 %%use_db(PoolId) -> ok.
 
+find(PoolId, Collection) ->
+	find(PoolId, Collection, [], ?TIMEOUT).
+
 find(PoolId, Collection, Document) ->
 	find(PoolId, Collection, Document, ?TIMEOUT).
 	
 	find(PoolId, Collection, #emo_query{q=Document}, Timeout);
 	
 find(PoolId, Collection, {oid, OID}, Timeout) when is_binary(OID) ->
-	find(PoolId, Collection, #emo_query{q=[{"_id", {oid, OID}}], num_to_return=1}, Timeout);
+	find(PoolId, Collection, #emo_query{q=[{"_id", {oid, OID}}], limit=1}, Timeout);
 	
 find(PoolId, Collection, Query, Timeout) when is_record(Query, emo_query) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	find_one(PoolId, Collection, Document, ?TIMEOUT).
 
 find_one(PoolId, Collection, Document, Timeout) when is_list(Document) ->
-	find(PoolId, Collection, #emo_query{q=Document, num_to_return=1}, Timeout);
+	find(PoolId, Collection, #emo_query{q=Document, limit=1}, Timeout);
 
 find_one(PoolId, Collection, {oid, OID}, Timeout) when is_binary(OID) ->
-	find(PoolId, Collection, #emo_query{q=[{"_id", {oid, OID}}], num_to_return=1}, Timeout).
+	find(PoolId, Collection, #emo_query{q=[{"_id", {oid, OID}}], limit=1}, Timeout).
 
 insert(PoolId, Collection, Document) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
 init(_) ->
+	process_flag(trap_exit, true),
 	Pools = initialize_pools(),
 	{ok, #state{pools=Pools}}.
 
 					database=Database,
 					size=Size
 				},
-				Pool1 = open_connections(Pool),
+				Pool1 = do_open_connections(Pool),
 				{ok, [{PoolId, Pool1}|Pools]}
 		end,
 	{reply, Result, State#state{pools=Pools1}};
 %%                                       {stop, Reason, State}
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
+handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #state{pools=Pools}=State) ->
+	State1 =
+		case get_pool(PoolId, Pools) of
+			undefined ->
+				State;
+			{Pool, Others} ->
+				Pids1 = lists:delete(Pid, Pool#pool.conn_pids),
+				Pool1 = Pool#pool{conn_pids = Pids1},
+				Pool2 = do_open_connections(Pool1),
+				Pools1 = [{PoolId, Pool2}|Others],
+				State#state{pools=Pools1}
+		end,
+	{noreply, State1};
+	
 handle_info(_Info, State) ->
     {noreply, State}.
 
 					port = proplists:get_value(port, Props, 27017), 
 					database = proplists:get_value(database, Props, "test")
 				},
-				{PoolId, open_connections(Pool)}
+				{PoolId, do_open_connections(Pool)}
 			 end || {PoolId, Props} <- Pools]
 	end.
+		
+do_open_connections(#pool{conn_pids=Pids, size=Size}=Pool) when length(Pids) < Size -> 
+	Pid = emongo_conn:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
+	do_open_connections(Pool#pool{conn_pids = [Pid|Pids]});
 	
-open_connections(Pool) ->
-	ConnPids = [begin
-		emongo_conn:start_link(Pool#pool.host, Pool#pool.port)
-	 end || _ <- lists:seq(1, Pool#pool.size)],
-	Pool#pool{conn_pids=ConnPids}.
+do_open_connections(Pool) -> 
+	Pool.
 
 get_conn_pid(#pool{conn_pids=[Pid|_]}) -> Pid.
 

File src/emongo_bson.erl

 -export([encode/1, decode/1]).
 -compile(export_all).
 
-encode(undefined) ->
-	<<>>;
-	
 encode([]) ->
-	<<>>;
+	<<5,0,0,0,0>>;
 	
 encode([{_,_}|_]=List) when is_list(List) ->
 	Bin = iolist_to_binary([encode_key_value(Key, Val) || {Key, Val} <- List]),
 	Key1 = encode_key(Key),
 	<<10, Key1/binary, 0>>;
 	
+%% REGEX
+encode_key_value(Key, {regexp, Regexp, Options}) ->
+	Key1 = encode_key(Key),
+	RegexpBin = unicode:characters_to_binary(Regexp),
+	OptionsBin = unicode:characters_to_binary(Options),
+	<<11, Key1/binary, 0, RegexpBin/binary, 0, OptionsBin/binary, 0>>;
+	
 % INT
 encode_key_value(Key, Val) when is_integer(Val) ->
 	Key1 = encode_key(Key),
 encode_key_value(_, _) ->
 	exit(oh_balls).
 	
+encode_key(Key) when is_binary(Key) ->
+	Key;
+	
 encode_key(Key) when is_atom(Key) ->
 	atom_to_binary(Key, utf8);
 	
 encode_key(Key) when is_integer(Key) ->
 	encode_key(integer_to_list(Key)).
 
-decode(<<Size:32/little-signed, Rest/binary>>) ->
-	Size1 = Size-5,
-	<<Bin:Size1/binary, _/binary>> = Rest,
+decode(Bin) ->
 	decode(Bin, []).
 	
 decode(<<>>, Acc) ->
 	lists:reverse(Acc);
 	
-decode(<<Type:8/little-signed, Tail1/binary>>, Acc) ->
+decode(<<Size:32/little-signed, Rest/binary>>, Acc) ->
+	Size1 = Size-5,
+	<<Bin:Size1/binary, 0:8, Tail/binary>> = Rest,
+	decode(Tail, [decode_document(Bin, [])|Acc]).
+	
+decode_document(<<>>, Acc) ->
+	lists:reverse(Acc);
+	
+decode_document(<<Type:8/little-signed, Tail1/binary>>, Acc) ->
 	{Key, Tail2} = decode_key(Tail1, <<>>),
 	{Val, Tail3} = decode_value(Type, Tail2),
-	decode(Tail3, [{Key, Val}|Acc]).
+	decode_document(Tail3, [{Key, Val}|Acc]).
 
 decode_key(<<0, Tail/binary>>, Acc) ->
 	{Acc, Tail};

File src/emongo_conn.erl

 %% OTHER DEALINGS IN THE SOFTWARE.
 -module(emongo_conn).
 
--export([start_link/2, init/3, send/3, send_recv/4]).
+-export([start_link/3, init/4, send/3, send_recv/4]).
 
 -record(request, {req_id, requestor}).
--record(state, {socket, requests}).
+-record(state, {pool_id, socket, requests}).
 
 -include("emongo.hrl").
 
-start_link(Host, Port) ->
-	proc_lib:start_link(?MODULE, init, [Host, Port, self()]).
+start_link(PoolId, Host, Port) ->
+	proc_lib:start_link(?MODULE, init, [PoolId, Host, Port, self()]).
 	
-init(Host, Port, Parent) ->
+init(PoolId, Host, Port, Parent) ->
 	Socket = open_socket(Host, Port),
 	proc_lib:init_ack(Parent, self()),
-	loop(#state{socket=Socket, requests=[]}).
+	loop(#state{pool_id=PoolId, socket=Socket, requests=[]}).
 	
 send(Pid, ReqID, Packet) ->
 	case gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}) of
 			State1 = State#state{requests=[{ReqID, Request}|State#state.requests]},
 			loop(State1);
 		{tcp, _Sock, Data} ->
+			io:format("recv'd ~p~n", [Data]),
 			Resp = emongo_packet:decode_response(Data),
 			ResponseTo = (Resp#response.header)#header.response_to,
-			case proplists:get_value(ResponseTo, State#state.requests) of
-				undefined ->
-					ok;
-				Request ->
-					gen:reply(Request#request.requestor, Resp)
-			end,
-			loop(State)
+			case find_request(ResponseTo, State#state.requests, []) of
+				{undefined, _} ->
+					exit({unexpected_response, Resp});
+				{Request, Others} ->
+					gen:reply(Request#request.requestor, Resp),
+					loop(State#state{requests=Others})
+			end;
+		{tcp_closed, _Sock} ->
+			exit({State#state.pool_id, tcp_closed});
+		{tcp_error, _Sock, Reason} ->
+			exit({State#state.pool_id, Reason})
 	end.
 	
 open_socket(Host, Port) ->
 			Sock;
 		{error, Reason} ->
 			exit({failed_to_open_socket, Reason})
-	end.
+	end.
+	
+find_request(RequestID, [{RequestID, Request}|Tail], OtherReqs) ->
+	{Request, lists:append(OtherReqs, Tail)};
+
+find_request(RequestID, [Request|Tail], OtherReqs) ->
+	find_request(RequestID, Tail, [Request|OtherReqs]);
+	
+find_request(_RequestID, [], OtherReqs) ->
+	{undefined, OtherReqs}.

File src/emongo_packet.erl

 		true -> emongo_bson:encode(Query#emo_query.field_selector) 
 	end,
 	Message = <<OptsSum:32/little-signed, FullName/binary, 0:8,
-				(Query#emo_query.num_to_skip):32/little-signed, 
-				(Query#emo_query.num_to_return):32/little-signed, 
+				(Query#emo_query.offset):32/little-signed, 
+				(Query#emo_query.limit):32/little-signed, 
 				EncodedDocument/binary, EncodedFieldSelector/binary>>,
 	Length = byte_size(Message),
     <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_QUERY:32/little-signed, Message/binary>>.
 		header = {header, Length, ReqID, RespTo, Op}, 
 		response_flag = RespFlag, 
 		cursor_id = CursorID, 
-		starting_from = StartingFrom, 
-		number_returned = NumRet, 
+		offset = StartingFrom, 
+		limit = NumRet, 
 		documents = if Documents == <<>> -> []; true -> emongo_bson:decode(Documents) end
 	}.