Anonymous avatar Anonymous committed 803d66b

ping server (with getlasterror) to detect connection problems
don't use pool state in emongo module

Comments (0)

Files changed (3)

include/emongo.hrl

--record(pool, {id, host, port, database, size, conn_pid=queue:new(), req_id=1}).
 -record(header, {message_length, request_id, response_to, op_code}).
 -record(response, {header, response_flag, cursor_id, offset, limit, documents}).
 -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).
 		 insert/3, update/4, update/5, delete/2, delete/3,
 		 ensure_index/3, count/2, dec2hex/1, hex2dec/1]).
 
--export([update_sync/5]).
+-export([update_sync/5, delete_sync/3]).
 
 -include("emongo.hrl").
 
 
 %% this function has been deprecated
 find(PoolId, Collection, Query) when is_record(Query, emo_query) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
-	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
+	emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT).
 
 %% @spec find(PoolId, Collection, Selector, Options) -> Result
 %%		 PoolId = atom()
 %%		 response_options = return {response, header, response_flag, cursor_id, offset, limit, documents}
 %%		 Result = documents() | response()
 find(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
 	Query = create_query(Options, Selector),
-	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
+	Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
 	Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
 	% TODO: generalize this for all send_recv calls
-	try emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout) of
+	try emongo_server:send_recv(Pid, ReqId, Packet, Timeout) of
 		Resp ->
 			case lists:member(response_options, Options) of
 				true -> Resp;
 	get_more(PoolId, Collection, CursorID, 0, Timeout).
 
 get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID),
-	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:get_more(Database, Collection, ReqId, NumToReturn, CursorID),
+	emongo_server:send_recv(Pid, ReqId, Packet, Timeout).
 
 kill_cursors(PoolId, CursorID) when is_integer(CursorID) ->
 	kill_cursors(PoolId, [CursorID]);
 
 kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs),
-	emongo_server:send(Pid, Pool#pool.req_id, Packet).
+	{Pid, _Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:kill_cursors(ReqId, CursorIDs),
+	emongo_server:send(Pid, ReqId, Packet).
 
 %%------------------------------------------------------------------------------
 %% insert
 	insert(PoolId, Collection, [Document]);
 
 insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents),
-	emongo_server:send(Pid, Pool#pool.req_id, Packet).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:insert(Database, Collection, ReqId, Documents),
+	emongo_server:send(Pid, ReqId, Packet).
 
 %%------------------------------------------------------------------------------
 %% update
 	update(PoolId, Collection, Selector, Document, false).
 
 update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, Selector, Document),
-	emongo_server:send(Pid, Pool#pool.req_id, Packet).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
+	emongo_server:send(Pid, ReqId, Packet).
 
 update_sync(PoolId, Collection, Selector, Document, Upsert) ->
-    {Pid, #pool{database=Database, req_id=ReqId}} = get_pid_pool(PoolId),
-    Packet1 = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
-    Packet2 = emongo_packet:get_last_error(Database, ReqId),
-    Resp = emongo_server:send_recv(Pid, ReqId, [Packet1, Packet2], ?TIMEOUT),
-    case lists:keysearch(<<"updatedExisting">>, 1, lists:nth(1, Resp#response.documents)) of
-        false ->
-            undefined;
-        {value, {<<"updatedExisting">>, true}} ->
-            ok;
-        {value, {<<"updatedExisting">>, false}} ->
-            {error, not_updated}
-    end.
+    {Pid, Database, ReqId} = get_pid_pool(PoolId),
+    Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
+    do_sync(Packet, Database, ReqId, Pid).
+
+do_sync(Packet, Database, ReqId, Pid) ->
+    PacketGetLastError = emongo_packet:get_last_error(Database, ReqId),
+    emongo_server:send_recv(Pid, ReqId, [Packet, PacketGetLastError], ?TIMEOUT).
+
 
 %%------------------------------------------------------------------------------
 %% delete
 	delete(PoolId, Collection, []).
 
 delete(PoolId, Collection, Selector) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, transform_selector(Selector)),
-	emongo_server:send(Pid, Pool#pool.req_id, Packet).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)),
+	emongo_server:send(Pid, ReqId, Packet).
+
+delete_sync(PoolId, Collection, Selector) ->
+    {Pid, Database, ReqId} = get_pid_pool(PoolId),
+    Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)),
+    do_sync(Packet, Database, ReqId, Pid).
+
 
 %%------------------------------------------------------------------------------
 %% ensure index
 %%------------------------------------------------------------------------------
 ensure_index(PoolId, Collection, Keys) when ?IS_DOCUMENT(Keys)->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Packet = emongo_packet:ensure_index(Pool#pool.database, Collection, Pool#pool.req_id, Keys),
-	emongo_server:send(Pid, Pool#pool.req_id, Packet).
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Packet = emongo_packet:ensure_index(Database, Collection, ReqId, Keys),
+	emongo_server:send(Pid, ReqId, Packet).
 
 count(PoolId, Collection) ->
-	{Pid, Pool} = get_pid_pool(PoolId),
-	Query = #emo_query{q=[{<<"count">>, Collection}, {<<"ns">>, Pool#pool.database}], limit=1},
-	Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query),
-	case emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of
+	{Pid, Database, ReqId} = get_pid_pool(PoolId),
+	Query = #emo_query{q=[{<<"count">>, Collection}, {<<"ns">>, Database}], limit=1},
+	Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query),
+	case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of
 		#response{documents=[[{<<"n">>,Count}|_]]} ->
 			round(Count);
 		_ ->

src/emongo_pool.erl

          terminate/2, code_change/3]).
 
 -include("emongo.hrl").
--define(RECONNECT_TIME, 10000).
+
+-define(POLL_TIME, 10000).
+
+-record(pool, {id,
+               host,
+               port,
+               database,
+               size,
+               active=false,
+               conn_pid=queue:new(),
+               req_id=1}).
 
 %%%%%%%%%%%%%%%%
 %% public api %%
 init([PoolId, Host, Port, Database, Size]) ->
     process_flag(trap_exit, true),
     
-    Pool = #pool{id = PoolId,
+    Pool0 = #pool{id = PoolId,
                  host = Host,
                  port = Port,
                  database = unicode:characters_to_binary(Database),
                  size = Size
                 },
-    NewPool = do_open_connections(Pool),
-    {ok, NewPool}.
+    
+    {noreply, Pool} = handle_info(poll, Pool0),
+    {ok, Pool}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call(pid, _From, #pool{conn_pid=Pids, req_id=ReqId}=State) ->
-    case queue:out(Pids) of
-        {{value, Pid}, Q2} ->
-            NewState = State#pool{conn_pid=queue:in(Pid, Q2), req_id=(ReqId + 1)},
-            {reply, {Pid, State}, NewState};
-        {empty, _} ->
-            {reply, undefined, State}
-    end;
+handle_call(pid, _From, #pool{active=true}=State) ->
+    {Reply, NewState} = get_pid(State),
+    {reply, Reply, NewState};
 handle_call(_Request, _From, State) ->
     {reply, undefined, State}.
 
             {noreply, State}
     end;
 
-handle_info(reconnect, State) ->
+handle_info(poll, State) ->
+    erlang:send_after(?POLL_TIME, self(), poll),
     NewState = do_open_connections(State),
     {noreply, NewState};
 
 %%% Internal functions
 %%--------------------------------------------------------------------
 
+get_pid(#pool{database=Database, conn_pid=Pids, req_id=ReqId}=State) ->
+    case queue:out(Pids) of
+        {{value, Pid}, Q2} ->
+            NewState = State#pool{conn_pid=queue:in(Pid, Q2), req_id=(ReqId + 1)},
+            {{Pid, Database, ReqId}, NewState};
+        {empty, _} ->
+            {undefined, State}
+    end.
+
 do_open_connections(#pool{conn_pid=Pids, size=Size}=Pool) -> 
     case queue:len(Pids) < Size of
         true ->
             case emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port) of
                 {error, _Reason} ->
-                    erlang:send_after(?RECONNECT_TIME, self(), reconnect),
-                    Pool;
+                    Pool#pool{active=false};
                 {ok, Pid} ->
                     do_open_connections(Pool#pool{conn_pid = queue:in(Pid, Pids)})
             end;
         false ->
-            Pool
+            do_poll(Pool)
     end.
+
+do_poll(Pool) ->
+    case get_pid(Pool) of
+        {{Pid, Database, ReqId}, NewPool} ->
+            PacketLast = emongo_packet:get_last_error(Database, ReqId),
+            case catch emongo_server:send_recv(Pid, ReqId, PacketLast, ?TIMEOUT) of
+                {'EXIT', _} ->
+                    NewPool#pool{active=false};
+                _ ->
+                    NewPool#pool{active=true}
+            end;
+        _ ->
+            Pool#pool{active=false}
+    end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.