Commits

Anonymous committed d2094c1

Revert changeset 53. Create missing workers on demand.

Comments (0)

Files changed (3)

 
 %% this function has been deprecated
 find(PoolId, Collection, Query) when is_record(Query, emo_query) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
 	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT).
 
 %%		 response_options = return {response, header, response_flag, cursor_id, offset, limit, documents}
 %%		 Result = documents() | response()
 find(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Query = create_query(Options, Selector),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
 	Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
 	get_more(PoolId, Collection, CursorID, 0, Timeout).
 
 get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID),
 	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
 
 	kill_cursors(PoolId, [CursorID]);
 
 kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 	insert(PoolId, Collection, [Document]);
 
 insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 	update(PoolId, Collection, Selector, Document, false).
 
 update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, Selector, Document),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 	delete(PoolId, Collection, []).
 
 delete(PoolId, Collection, Selector) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, transform_selector(Selector)),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 %% ensure index
 %%------------------------------------------------------------------------------
 ensure_index(PoolId, Collection, Keys) when ?IS_DOCUMENT(Keys)->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Packet = emongo_packet:ensure_index(Pool#pool.database, Collection, Pool#pool.req_id, Keys),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
 
 count(PoolId, Collection) ->
-	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	{Pid, Pool} = get_pid_pool(PoolId),
 	Query = #emo_query{q=[{<<"count">>, Collection}, {<<"ns">>, Pool#pool.database}], limit=1},
 	Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query),
 	case emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of
 				size=Size
 			},
 
-			{ok, _SupPid} = emongo_sup:start_pool(PoolId, Host, Port, Size),
+			{ok, PoolPid} = emongo_sup:start_pool(PoolId, Host, Port),
+			emongo_server_sup:start_children(PoolPid, Size, undefined),
 			{reply, ok, State#state{pools=[{PoolId, Pool}|Pools]}}
 	end;
 
 %%                                       {stop, Reason, State}
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
-%handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #state{pools=Pools}=State) ->
-	%io:format("EXIT ~p, {~p, tcp_closed}~n", [Pid, PoolId]),
-	%State1 =
-		%case get_pool(PoolId, Pools) of
-			%undefined ->
-				%State;
-			%{Pool, Others} ->
-				%Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pool#pool.conn_pids),
-				%Pool1 = Pool#pool{conn_pids = Pids1},
-				%Pool2 = do_open_connections(Pool1),
-				%Pools1 = [{PoolId, Pool2}|Others],
-				%State#state{pools=Pools1}
-		%end,
-	%{noreply, State1};
-
 handle_info(_Info, State) ->
     {noreply, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
+get_pid_pool(PoolId) ->
+    {PidOrErr, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+    {ok, Pid} = PidOrErr,
+    {Pid, Pool}.
 
 dec2hex(Dec) ->
 	dec2hex(<<>>, Dec).

src/emongo_server_sup.erl

-%% one_for_one supervisor for emongo_server instances
+%% simple_one_for_one supervisor for emongo_server instances
 %% there should be one emongo_server_sup instance for each pool, that then
 %% supervises emongo_server instances based on the size of pool
 -module(emongo_server_sup).
 
 -behaviour(supervisor).
 
--export([start_link/4, child_count/1, start_child/1, nth_child_pid/2, init/1]).
+-export([start_link/3, child_count/1, start_children/3, nth_child_pid/2, init/1]).
 
 %%%%%%%%%%%%%%%%
 %% public api %%
 %%%%%%%%%%%%%%%%
 
-start_link(PoolId, Host, Port, Size) ->
-    supervisor:start_link(?MODULE, [PoolId, Host, Port, Size]).
+start_link(PoolId, Host, Port) ->
+	supervisor:start_link(?MODULE, [PoolId, Host, Port]).
 
 child_count(PoolId) ->
     length(supervisor:which_children(pool_pid(PoolId))).
 
-start_child(PoolId) ->
-    supervisor:start_child(pool_pid(PoolId), []).
+start_children(_PoolPid, 0, LastPid) ->
+    {ok, LastPid};
+start_children(PoolPid, Count, _PrevPid) ->
+    case supervisor:start_child(PoolPid, []) of
+        {ok, Pid} ->
+            start_children(PoolPid, Count - 1, Pid);
+        Error ->
+            Error
+    end.
 
 nth_child_pid(PoolId, N) ->
-	Children = supervisor:which_children(pool_pid(PoolId)),
-	
-	if
-		N > length(Children) ->
-			throw(badarg);
-		true ->
-			{_, Pid, worker, [emongo_server]} = lists:nth(N, Children),
-			Pid
-	end.
+    PoolPid = pool_pid(PoolId),
+    Children = supervisor:which_children(PoolPid),
+    Missing = N - length(Children),
+    
+    if
+        Missing > 0 ->
+            start_children(PoolPid, Missing, undefined);
+        true ->
+            {undefined, Pid, worker, [emongo_server]} = lists:nth(N, Children),
+            {ok, Pid}
+    end.
 
 pool_pid(PoolId) ->
     [PoolPid] = [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(emongo_sup),
 %% supervisor callbacks %%
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 
-init([PoolId, Host, Port, Size]) ->
-    {ok, {{one_for_one, Size * 10, 10},
-          [
-           {Id,
-            {emongo_server, start_link, [PoolId, Host, Port]},
-            permanent, brutal_kill, worker, [emongo_server]}
-           || Id <- lists:seq(1, Size)
-          ]}}.
+init([PoolId, Host, Port]) ->
+	{ok, {{simple_one_for_one, 1000, 10}, [
+		{emongo_server, {emongo_server, start_link, [PoolId, Host, Port]},
+		 permanent, brutal_kill, worker, [emongo_server]}
+	]}}.

src/emongo_sup.erl

 
 -behaviour(supervisor).
 
--export([start_link/0, start_pool/4, init/1]).
+-export([start_link/0, start_pool/3, init/1]).
 
 %%%%%%%%%%%%%%%%
 %% public api %%
 
 start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-start_pool(PoolId, Host, Port, Size) ->
+start_pool(PoolId, Host, Port) ->
 	% emongo_server_sup instances are added dynamically, one for each pool
 	supervisor:start_child(?MODULE, {PoolId,
-		{emongo_server_sup, start_link, [PoolId, Host, Port, Size]},
+		{emongo_server_sup, start_link, [PoolId, Host, Port]},
 		permanent, infinity, supervisor, [emongo_server_sup]
 	}).
 
 	{ok, {{one_for_one, 10, 10}, [
 		{emongo, {emongo, start_link, []},
 		 permanent, 5000, worker, [emongo]}
-	]}}.
+	]}}.