Commits

Dmitry Belyaev  committed 5fc4740

find_all uses the same pool as first find call
pool deactivation logging

  • Participants
  • Parent commits 7201af1

Comments (0)

Files changed (5)

File include/emongo.hrl

 -record(header, {message_length, request_id, response_to, op_code}).
--record(response, {header, response_flag, cursor_id, offset, limit, documents}).
+-record(response, {
+          header,
+          response_flag,
+          cursor_id,
+          offset,
+          limit,
+          documents,
+          pool_id
+         }).
 -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).
 
 -define(IS_DOCUMENT(Doc), (is_list(Doc) andalso (Doc == [] orelse (is_tuple(hd(Doc)) andalso tuple_size(hd(Doc)) == 2)))).

File src/emongo.erl

 %%====================================================================
 %% pool_id() = atom()
 %% collection() = string()
-%% response() = {response, header, response_flag, cursor_id, offset, limit, documents}
 %% documents() = [document()]
 %% document() = [{term(), term()}]
 
 	find(PoolId, Collection, [], [{timeout, ?TIMEOUT}]).
 
 find(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) ->
-	find(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]);
-
-%% this function has been deprecated
-find(PoolId, Collection, Query) when is_record(Query, emo_query) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
-	emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT).
+	find(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]).
 
 %% @spec find(PoolId, Collection, Selector, Options) -> Result
 %%		 PoolId = atom()
 %%		 Field = string() | binary() | atom() | integer() = specifies a field to return in the result set
 %%		 response_options = return {response, header, response_flag, cursor_id, offset, limit, documents}
 %%		 Result = documents() | response()
-find(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) ->
+find(PoolId, Collection, Selector, Options) ->
 	{Pid, Database, ReqId} = get_pid_pool(PoolId),
 	Query = create_query(Options, Selector),
 	Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
 	end;
 
 find_all(PoolId, Collection, Selector, Options, Resp) when is_record(Resp, response) ->
-	Resp1 = get_more(PoolId, Collection, Resp#response.cursor_id, proplists:get_value(timeout, Options, ?TIMEOUT)),
+	Resp1 = get_more(Resp#response.pool_id, Collection, Resp#response.cursor_id, proplists:get_value(timeout, Options, ?TIMEOUT)),
 	Documents = lists:append(Resp#response.documents, Resp1#response.documents),
 	find_all(PoolId, Collection, Selector, Options, Resp1#response{documents=Documents}).
 

File src/emongo_pool.erl

 
 -include("emongo.hrl").
 
--define(POLL_TIME, 10000).
+-define(POLL_INTERVAL, 10000).
+-define(POLL_TIMEOUT, 500).
 
 -record(pool, {id,
                host,
     {noreply, State#pool{conn_pid = Pids1, active=false}};
 
 handle_info(poll, State) ->
-    erlang:send_after(?POLL_TIME, self(), poll),
+    erlang:send_after(?POLL_INTERVAL, self(), poll),
     NewState = do_open_connections(State),
     {noreply, NewState};
 
     case get_pid(Pool) of
         {{Pid, Database, ReqId}, NewPool} ->
             PacketLast = emongo_packet:get_last_error(Database, ReqId),
-            case catch emongo_server:send_recv(Pid, ReqId, PacketLast, 500) of
-                {'EXIT', _} ->
+            case catch emongo_server:send_recv(Pid, ReqId, PacketLast, ?POLL_TIMEOUT) of
+                {'EXIT', Reason} ->
+                    error_logger:error_msg("Pool ~p deactivated: ~p~n'", [Pool#pool.id, Reason]),
                     NewPool#pool{active=false};
                 _ ->
                     NewPool#pool{active=true}

File src/emongo_router.erl

 %% Description: Handling call messagesp
 %%--------------------------------------------------------------------
 handle_call(pid, From, #state{id=BalId, active=Active, passive=Passive, timer=Timer}=State) ->
-    case Active of
-        [PoolIdx | Active2] ->
-            case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx)) of
-                undefined ->
-                    handle_call(pid, From,
-                                State#state{active=Active2,
-                                            passive=[PoolIdx | Passive],
-                                            timer=set_timer(Timer)
-                                           });
-                Pid ->
-                    {reply, Pid, State}
-            end;
-        [] ->
-            {reply, undefined, State}
-    end;
+    {Pid, NewState} = get_pid(State, emongo_sup:pools()),
+    {reply, Pid, NewState};
 
 handle_call(stop_children, _, #state{id=BalId, active=Active, passive=Passive}=State) ->
     Fun = fun(PoolIdx) ->
 %%% Internal functions
 %%--------------------------------------------------------------------
 
+get_pid(#state{id=BalId, active=Active, passive=Passive, timer=Timer}=State, Pools) ->
+    case Active of
+        [PoolIdx | Active2] ->
+            case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools) of
+                undefined ->
+                    error_logger:info_msg("pool ~p is disabled!~n", [?POOL_ID(BalId, PoolIdx)]),
+                    
+                    get_pid(State#state{active=Active2,
+                                        passive=[PoolIdx | Passive],
+                                        timer=set_timer(Timer)
+                                       }, Pools);
+                Pid ->
+                    {Pid, State}
+            end;
+        [] ->
+            {undefined, State}
+    end.
+
+
 set_timer(undefined) ->
     erlang:send_after(?RECHECK_TIME, self(), recheck);
 set_timer(TimerRef) ->
         undefined ->
             activate(State#state{passive=Passive}, [PoolIdx | Acc]);
         _ ->
+            error_logger:info_msg("pool ~p is enabled!~n", [?POOL_ID(BalId, PoolIdx)]),
             activate(State#state{active=lists:umerge([PoolIdx], Active), passive=Passive}, Acc)
     end.

File src/emongo_server.erl

 				false ->
 					exit({unexpected_response, Resp});
 				{value, {ResponseTo, Request}, Requests} ->
-					gen_server:reply(Request#request.requestor, {ok, Resp}),
+					gen_server:reply(Request#request.requestor,
+                                                         {ok, Resp#response{pool_id=State#state.pool_id}}),
 					{noreply, State#state{requests=Requests, leftover=Tail}}
 			end
 	end;