Commits

Dmitry Belyaev  committed 0a87654

use queue for connections in pool
rename emongo_server_sup to emongo_pool

  • Participants
  • Parent commits d2094c1

Comments (0)

Files changed (5)

File include/emongo.hrl

--record(pool, {id, host, port, database, size=1, conn_pid=1, req_id=1}).
+-record(pool, {id, host, port, database, size, conn_pid=queue:new(), req_id=1}).
 -record(header, {message_length, request_id, response_to, op_code}).
 -record(response, {header, response_flag, cursor_id, offset, limit, documents}).
 -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).

File src/emongo.erl

 -export([start_link/0, init/1, handle_call/3, handle_cast/2,
 		 handle_info/2, terminate/2, code_change/3]).
 
--export([pools/0, oid/0, add_pool/5, find/2, find/3, find/4,
+-export([pools/0, oid/0, add_pool/5, del_pool/1, find/2, find/3, find/4,
 		 find_all/2, find_all/3, find_all/4, get_more/4,
 		 get_more/5, find_one/3, find_one/4, kill_cursors/2,
 		 insert/3, update/4, update/5, delete/2, delete/3,
 
 -include("emongo.hrl").
 
--record(state, {pools, oid_index, hashed_hostn}).
+-record(state, {oid_index, hashed_hostn}).
 
 %%====================================================================
 %% Types
 	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 pools() ->
-	gen_server:call(?MODULE, pools, infinity).
+    emongo_sup:pools().
 
 oid() ->
 	gen_server:call(?MODULE, oid, infinity).
 
 add_pool(PoolId, Host, Port, Database, Size) ->
-	gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity).
+    emongo_sup:start_pool(PoolId, Host, Port, Database, Size).
+
+del_pool(PoolId) ->
+    emongo_sup:stop_pool(PoolId).
 
 %%------------------------------------------------------------------------------
 %% find
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
 init(_) ->
-	process_flag(trap_exit, true),
-	%Pools = initialize_pools(),
 	{ok, HN} = inet:gethostname(),
 	<<HashedHN:3/binary,_/binary>> = erlang:md5(HN),
-	{ok, #state{pools=[], oid_index=1, hashed_hostn=HashedHN}}.
+	{ok, #state{oid_index=1, hashed_hostn=HashedHN}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call(pools, _From, State) ->
-	{reply, State#state.pools, State};
-
 handle_call(oid, _From, State) ->
 	{Total_Wallclock_Time, _} = erlang:statistics(wall_clock),
 	Front = Total_Wallclock_Time rem 16#ffffffff,
 	Index = State#state.oid_index rem 16#ffffff,
 	{reply, <<Front:32, (State#state.hashed_hostn)/binary, PID/binary, Index:24>>, State#state{oid_index = State#state.oid_index + 1}};
 
-handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, #state{pools=Pools}=State) ->
-	case proplists:is_defined(PoolId, Pools) of
-		true ->
-			{reply, {error, pool_already_exists}, State};
-		false ->
-			Pool = #pool{
-				id=PoolId,
-				host=Host,
-				port=Port,
-				database=Database,
-				size=Size
-			},
-
-			{ok, PoolPid} = emongo_sup:start_pool(PoolId, Host, Port),
-			emongo_server_sup:start_children(PoolPid, Size, undefined),
-			{reply, ok, State#state{pools=[{PoolId, Pool}|Pools]}}
-	end;
-
-handle_call({pid, PoolId}, _From, #state{pools=Pools}=State) ->
-	case lists:keytake(PoolId, 1, Pools) of
-		false ->
-			{reply, {undefined, undefined}, State};
-		{value, {PoolId, Pool}, Others} ->
-			N = Pool#pool.conn_pid,
-			Pid = emongo_server_sup:nth_child_pid(Pool#pool.id, N),
-			
-			if
-				N < Pool#pool.size -> N1 = N + 1;
-				true -> N1 = 1
-			end,
-			
-			Pool1 = Pool#pool{conn_pid=N1, req_id=(Pool#pool.req_id)+1},
-			Pools1 = [{PoolId, Pool1} | Others],
-			{reply, {Pid, Pool}, State#state{pools=Pools1}}
-	end;
-
 handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
 get_pid_pool(PoolId) ->
-    {PidOrErr, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
-    {ok, Pid} = PidOrErr,
-    {Pid, Pool}.
+    PoolPid = emongo_sup:pool_pid(PoolId),
+    gen_server:call(PoolPid, pid, infinity).
 
 dec2hex(Dec) ->
 	dec2hex(<<>>, Dec).

File src/emongo_pool.erl

+%%%-------------------------------------------------------------------
+%%% Description : emongo pool supervisor
+%%%-------------------------------------------------------------------
+-module(emongo_pool).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/5]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-include("emongo.hrl").
+-define(RECONNECT_TIME, 10000).
+
+%%%%%%%%%%%%%%%%
+%% public api %%
+%%%%%%%%%%%%%%%%
+
+start_link(PoolId, Host, Port, Database, Size) ->
+    gen_server:start_link(?MODULE, [PoolId, Host, Port, Database, Size], []).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% gen_server callbacks %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([PoolId, Host, Port, Database, Size]) ->
+    process_flag(trap_exit, true),
+    
+    Pool = #pool{id = PoolId,
+                 host = Host,
+                 port = Port,
+                 database = Database,
+                 size = Size
+                },
+    NewPool = do_open_connections(Pool),
+    {ok, NewPool}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(state, _From, State) ->
+    {reply, State, State};
+handle_call(pid, _From, #pool{conn_pid=Pids, req_id=ReqId}=State) ->
+    case queue:out(Pids) of
+        {{value, Pid}, Q2} ->
+            NewState = State#pool{conn_pid=queue:in(Pid, Q2), req_id=(ReqId + 1)},
+            {reply, {Pid, State}, NewState};
+        {empty, _} ->
+            {reply, undefined, State}
+    end;
+handle_call(_Request, _From, State) ->
+    {reply, undefined, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #pool{conn_pid=Pids}=State) ->
+    Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pids),
+    
+    NewState = do_open_connections(State#pool{conn_pid = Pids1}),
+    {noreply, NewState};
+
+handle_info(reconnect, State) ->
+    NewState = do_open_connections(State),
+    {noreply, NewState};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+do_open_connections(#pool{conn_pid=Pids, size=Size}=Pool) -> 
+    case queue:len(Pids) < Size of
+        true ->
+            case emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port) of
+                {error, _Reason} ->
+                    erlang:send_after(?RECONNECT_TIME, self(), reconnect),
+                    Pool;
+                {ok, Pid} ->
+                    do_open_connections(Pool#pool{conn_pid = queue:in(Pid, Pids)})
+            end;
+        false ->
+            Pool
+    end.

File src/emongo_server_sup.erl

-%% simple_one_for_one supervisor for emongo_server instances
-%% there should be one emongo_server_sup instance for each pool, that then
-%% supervises emongo_server instances based on the size of pool
--module(emongo_server_sup).
-
--behaviour(supervisor).
-
--export([start_link/3, child_count/1, start_children/3, nth_child_pid/2, init/1]).
-
-%%%%%%%%%%%%%%%%
-%% public api %%
-%%%%%%%%%%%%%%%%
-
-start_link(PoolId, Host, Port) ->
-	supervisor:start_link(?MODULE, [PoolId, Host, Port]).
-
-child_count(PoolId) ->
-    length(supervisor:which_children(pool_pid(PoolId))).
-
-start_children(_PoolPid, 0, LastPid) ->
-    {ok, LastPid};
-start_children(PoolPid, Count, _PrevPid) ->
-    case supervisor:start_child(PoolPid, []) of
-        {ok, Pid} ->
-            start_children(PoolPid, Count - 1, Pid);
-        Error ->
-            Error
-    end.
-
-nth_child_pid(PoolId, N) ->
-    PoolPid = pool_pid(PoolId),
-    Children = supervisor:which_children(PoolPid),
-    Missing = N - length(Children),
-    
-    if
-        Missing > 0 ->
-            start_children(PoolPid, Missing, undefined);
-        true ->
-            {undefined, Pid, worker, [emongo_server]} = lists:nth(N, Children),
-            {ok, Pid}
-    end.
-
-pool_pid(PoolId) ->
-    [PoolPid] = [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(emongo_sup),
-                        PoolId =:= Id],
-    PoolPid.
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% supervisor callbacks %%
-%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-init([PoolId, Host, Port]) ->
-	{ok, {{simple_one_for_one, 1000, 10}, [
-		{emongo_server, {emongo_server, start_link, [PoolId, Host, Port]},
-		 permanent, brutal_kill, worker, [emongo_server]}
-	]}}.

File src/emongo_sup.erl

 
 -behaviour(supervisor).
 
--export([start_link/0, start_pool/3, init/1]).
+-export([start_link/0, start_pool/5, stop_pool/1, pools/0, pool_pid/1]).
+
+%% supervisor exports
+-export([init/1]).
 
 %%%%%%%%%%%%%%%%
 %% public api %%
 
 start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-start_pool(PoolId, Host, Port) ->
-	% emongo_server_sup instances are added dynamically, one for each pool
-	supervisor:start_child(?MODULE, {PoolId,
-		{emongo_server_sup, start_link, [PoolId, Host, Port]},
-		permanent, infinity, supervisor, [emongo_server_sup]
+start_pool(PoolId, Host, Port, Database, Size) ->
+    supervisor:start_child(?MODULE, {PoolId,
+		{emongo_pool, start_link, [PoolId, Host, Port, Database, Size]},
+		permanent, 10000, worker, [emongo_pool]
 	}).
 
+stop_pool(PoolId) ->
+    supervisor:terminate_child(?MODULE, PoolId),
+    supervisor:delete_child(?MODULE, PoolId).
+
+pools() ->
+    [{PoolId, Pid} || {PoolId, Pid, _, [emongo_pool]} <- supervisor:which_children(?MODULE)].
+
+pool_pid(PoolId) ->
+    case [Pid || {Id, Pid, _, [emongo_pool]} <- supervisor:which_children(?MODULE), Id =:= PoolId] of
+        [Pid] ->
+            Pid;
+        _ ->
+            undefined
+    end.
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 %% supervisor callbacks %%
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 	{ok, {{one_for_one, 10, 10}, [
 		{emongo, {emongo, start_link, []},
 		 permanent, 5000, worker, [emongo]}
-	]}}.
+	]}}.