Commits

Anonymous committed 0038a87

adding api functions

Comments (0)

Files changed (4)

include/emongo.hrl

 -record(response, {header, response_flag, cursor_id, offset, limit, documents}).
 -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).
 
+-define(IS_DOCUMENT(Doc), (is_list(Doc) andalso (Doc == [] orelse (is_tuple(hd(Doc)) andalso tuple_size(hd(Doc)) == 2)))).
+-define(IS_LIST_OF_DOCUMENTS(Docs), (
+	is_list(Docs) andalso (
+		Docs == [] orelse (
+			is_list(hd(Docs)) andalso (
+				hd(Docs) == [] orelse (
+					is_tuple(hd(hd(Docs))) andalso 
+					tuple_size(hd(hd(Docs))) == 2
+				)
+			)
+		)
+	))).
+
 -define(TIMEOUT, 5000).
 
 -define(OP_REPLY, 1).
 -export([start_link/0, init/1, handle_call/3, handle_cast/2, 
 		 handle_info/2, terminate/2, code_change/3]).
 
--export([pools/0, add_pool/5, find/2, find/3, find/4, 
-		 find_one/3, find_one/4, insert/3]).
+-export([pools/0, add_pool/5, find/2, find/3, find/4,
+		 find_all/2, find_all/3, find_all/4, get_more/4,
+		 get_more/5, find_one/3, find_one/4, kill_cursors/2,
+		 insert/3, update/4, update/5, delete/2, delete/3,
+		 count/2]).
 
 -include("emongo.hrl").
 
 
 %%use_db(PoolId) -> ok.
 
+%%------------------------------------------------------------------------------
+%% find
+%%------------------------------------------------------------------------------
 find(PoolId, Collection) ->
 	find(PoolId, Collection, [], ?TIMEOUT).
 
 find(PoolId, Collection, Document) ->
 	find(PoolId, Collection, Document, ?TIMEOUT).
 	
-find(PoolId, Collection, Document, Timeout) when is_list(Document) ->
+find(PoolId, Collection, Document, Timeout) when ?IS_DOCUMENT(Document) ->
 	find(PoolId, Collection, #emo_query{q=Document}, Timeout);
 	
 find(PoolId, Collection, {oid, OID}, Timeout) when is_binary(OID) ->
 find(PoolId, Collection, Query, Timeout) when is_record(Query, emo_query) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
-	io:format("packet ~p~n", [Packet]),
 	emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
+
+%%------------------------------------------------------------------------------
+%% find_all
+%%------------------------------------------------------------------------------
+find_all(PoolId, Collection) ->
+	find_all(PoolId, Collection, [], ?TIMEOUT).
+
+find_all(PoolId, Collection, Document) ->
+	find_all(PoolId, Collection, Document, ?TIMEOUT).
+
+find_all(PoolId, Collection, Document, Timeout) when ?IS_DOCUMENT(Document) ->
+	find_all(PoolId, Collection, #emo_query{q=Document}, Timeout);
+
+find_all(PoolId, Collection, Query, Timeout) when is_record(Query, emo_query) ->
+	Resp = find(PoolId, Collection, Query, Timeout),
+	find_all(PoolId, Collection, Resp, Timeout);
 	
-find_one(PoolId, Collection, Document) ->
+find_all(_PoolId, _Collection, Resp, _Timeout) when is_record(Resp, response), Resp#response.cursor_id == 0 ->
+	Resp;
+	
+find_all(PoolId, Collection, Resp, Timeout) when is_record(Resp, response) ->
+	Resp1 = get_more(PoolId, Collection, Resp#response.cursor_id, Timeout),
+	Documents = lists:append(Resp#response.documents, Resp1#response.documents),
+	find_all(PoolId, Collection, Resp1#response{documents=Documents}, Timeout).
+
+%%------------------------------------------------------------------------------
+%% find_one
+%%------------------------------------------------------------------------------
+find_one(PoolId, Collection, Document) when ?IS_DOCUMENT(Document) ->
 	find_one(PoolId, Collection, Document, ?TIMEOUT).
 
-find_one(PoolId, Collection, Document, Timeout) when is_list(Document) ->
+find_one(PoolId, Collection, Document, Timeout) when ?IS_DOCUMENT(Document) ->
 	find(PoolId, Collection, #emo_query{q=Document, limit=1}, Timeout);
 
 find_one(PoolId, Collection, {oid, OID}, Timeout) when is_binary(OID) ->
 	find(PoolId, Collection, #emo_query{q=[{"_id", {oid, OID}}], limit=1}, Timeout).
 
-insert(PoolId, Collection, Document) ->
+%%------------------------------------------------------------------------------
+%% get_more
+%%------------------------------------------------------------------------------
+get_more(PoolId, Collection, CursorID, Timeout) ->
+	get_more(PoolId, Collection, CursorID, 0, Timeout).
+	
+get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
-	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Document),
+	Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID),
+	emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
+	
+kill_cursors(PoolId, CursorID) when is_integer(CursorID) ->
+	kill_cursors(PoolId, [CursorID]);
+	
+kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs),
+	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
+	
+%%------------------------------------------------------------------------------
+%% insert
+%%------------------------------------------------------------------------------
+insert(PoolId, Collection, Document) when ?IS_DOCUMENT(Document) ->
+	insert(PoolId, Collection, [Document]);
+	
+insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents),
 	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
 
-%%update
+%%------------------------------------------------------------------------------
+%% update
+%%------------------------------------------------------------------------------
+update(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
+	update(PoolId, Collection, Selector, Document, false).
+	
+update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, Selector, Document),
+	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
 
-%save(PoolId, {obj, Props}) -> ok.
-
-%remove(PoolId, {obj, Props}) -> ok.
+%%------------------------------------------------------------------------------
+%% delete
+%%------------------------------------------------------------------------------
+delete(PoolId, Collection) ->
+	delete(PoolId, Collection, []).
+	
+delete(PoolId, Collection, Selector) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, Selector),
+	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
 
 %%ensure_index
 
-%%count
+count(PoolId, Collection) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Query = #emo_query{q=[{<<"count">>, Collection}, {<<"ns">>, Pool#pool.database}], limit=1},
+	Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query),
+	case emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of
+		#response{documents=[[{<<"n">>,Count}|_]]} ->
+			round(Count);
+		_ ->
+			undefined
+	end.
 
 %drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) ->
 
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
 handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #state{pools=Pools}=State) ->
+	io:format("EXIT ~p, {~p, tcp_closed}~n", [Pid, PoolId]),
 	State1 =
 		case get_pool(PoolId, Pools) of
 			undefined ->
 				State;
 			{Pool, Others} ->
+				io:format("delete ~p from ~p~n", [Pid, Pool#pool.conn_pids]),
 				Pids1 = lists:delete(Pid, Pool#pool.conn_pids),
+				io:format("new pids ~p~n", [Pids1]),
 				Pool1 = Pool#pool{conn_pids = Pids1},
 				Pool2 = do_open_connections(Pool1),
 				Pools1 = [{PoolId, Pool2}|Others],
+				io:format("new pool ~p~n", [Pools1]),
 				State#state{pools=Pools1}
 		end,
 	{noreply, State1};

src/emongo_conn.erl

 init(PoolId, Host, Port, Parent) ->
 	Socket = open_socket(Host, Port),
 	proc_lib:init_ack(Parent, self()),
-	loop(#state{pool_id=PoolId, socket=Socket, requests=[]}).
+	loop(#state{pool_id=PoolId, socket=Socket, requests=[]}, <<>>).
 	
 send(Pid, ReqID, Packet) ->
 	case gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}) of
 	
 send_recv(Pid, ReqID, Packet, Timeout) ->
 	case gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}, Timeout) of
-		{ok, Result} -> Result;
+		{ok, Resp} ->
+			Documents = if 
+				Resp#response.documents == <<>> -> []; 
+				true -> emongo_bson:decode(Resp#response.documents) 
+			end,
+			Resp#response{documents=Documents};
 		{error, Reason} -> exit(Reason)
 	end.
 	
-loop(State) ->
+loop(State, Leftover) ->
 	receive
 		{'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} ->
 			gen_tcp:send(State#state.socket, Packet),
 			gen:reply({From, Mref}, ok),
-			loop(State);
+			loop(State, Leftover);
 		{'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} -> 
 			gen_tcp:send(State#state.socket, Packet),
 			Request = #request{req_id=ReqID, requestor={From, Mref}},
 			State1 = State#state{requests=[{ReqID, Request}|State#state.requests]},
-			loop(State1);
+			loop(State1, Leftover);
 		{tcp, _Sock, Data} ->
 			io:format("recv'd ~p~n", [Data]),
-			Resp = emongo_packet:decode_response(Data),
-			ResponseTo = (Resp#response.header)#header.response_to,
-			case find_request(ResponseTo, State#state.requests, []) of
-				{undefined, _} ->
-					exit({unexpected_response, Resp});
-				{Request, Others} ->
-					gen:reply(Request#request.requestor, Resp),
-					loop(State#state{requests=Others})
+			case emongo_packet:decode_response(<<Leftover/binary, Data/binary>>) of
+				undefined ->
+					loop(State, <<Leftover/binary, Data/binary>>);
+				{Resp, Tail} ->
+					ResponseTo = (Resp#response.header)#header.response_to,
+					case find_request(ResponseTo, State#state.requests, []) of
+						{undefined, _} ->
+							exit({unexpected_response, Resp});
+						{Request, Others} ->
+							gen:reply(Request#request.requestor, Resp),
+							loop(State#state{requests=Others}, Tail)
+					end
 			end;
 		{tcp_closed, _Sock} ->
 			exit({State#state.pool_id, tcp_closed});

src/emongo_packet.erl

 	Length = byte_size(Message),
     <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_UPDATE:32/little-signed, Message/binary>>.
 
-insert(Database, Collection, ReqID, Document) ->
+insert(Database, Collection, ReqID, Documents) ->
 	FullName = unicode:characters_to_binary([Database, ".", Collection]),
-	EncodedDocument = emongo_bson:encode(Document),
+	EncodedDocument = iolist_to_binary([emongo_bson:encode(Document) || Document <- Documents]),
 	Message = <<0:32, FullName/binary, 0, EncodedDocument/binary>>,
 	Length = byte_size(Message),
     <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_INSERT:32/little-signed, Message/binary>>.
     <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_MSG:32/little-signed, Message/binary>>.
 	
 decode_response(<<Length:32/little-signed, ReqID:32/little-signed, RespTo:32/little-signed, Op:32/little-signed, Message/binary>>) ->
-	<<RespFlag:32/little-signed, 
-	  CursorID:64/little-signed, 
-	  StartingFrom:32/little-signed, 
-	  NumRet:32/little-signed, 
-	  Documents/binary>> = Message,
-	#response{
-		header = {header, Length, ReqID, RespTo, Op}, 
-		response_flag = RespFlag, 
-		cursor_id = CursorID, 
-		offset = StartingFrom, 
-		limit = NumRet, 
-		documents = if Documents == <<>> -> []; true -> emongo_bson:decode(Documents) end
-	}.
+	MsgLen = Length - 16,
+	if 
+		byte_size(Message) < MsgLen ->
+			undefined;
+		true ->
+			DocLen = MsgLen - 20,
+			<<RespFlag:32/little-signed, 
+			  CursorID:64/little-signed, 
+			  StartingFrom:32/little-signed, 
+			  NumRet:32/little-signed, 
+			  Documents:DocLen/binary,
+			  Tail/binary>> = Message,
+			Resp = #response{
+				header = {header, Length, ReqID, RespTo, Op}, 
+				response_flag = RespFlag, 
+				cursor_id = CursorID, 
+				offset = StartingFrom, 
+				limit = NumRet, 
+				documents = Documents
+			},
+			{Resp, Tail}
+	end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.