Commits

Jacob Perkins committed 8686a5c

emongo 0.0.3 with supervised pools and connections

Comments (0)

Files changed (8)

 {application, emongo, [
 	{description, "Erlang MongoDB Driver"},
-	{vsn, "0.0.2"},
+	{vsn, "0.0.3"},
 	{modules, [
-		emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet, emongo_server
+		emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet,
+		emongo_server, emongo_server_sup
 	]},
 	{registered, [emongo_sup, emongo]},
 	{mod, {emongo_app, []}},
 	{applications, [kernel, stdlib]},
 	{env, [
 		{pools, [
-			{emongo, [
+			% NOTE: PoolId will be a locally registered name & therefore
+			% cannot conflict with other registered names
+			{emongo_pool, [
 				{size, 1},
 				{host, "localhost"},
 				{port, 27017},

ebin/emongo.appup

-{"0.0.2", [], []}.
+{"0.0.3", [
+	{"0.0.2", [
+		{load_module, emongo},
+		{load_module, emongo_app},
+		{load_module, emongo_server},
+		{load_module, emongo_sup},
+		{add_module, emongo_server_sup},
+		{restart_application, emongo}
+	]}
+], [
+	{"0.0.2", [
+		{load_module, emongo},
+		{load_module, emongo_app},
+		{load_module, emongo_server},
+		{load_module, emongo_sup},
+		{delete_module, emongo_server_sup},
+		{restart_application, emongo}
+	]}
+]}.

include/emongo.hrl

--record(pool, {id, host, port, database, size=1, conn_pids=queue:new(), req_id=1}).
+-record(pool, {id, host, port, database, size=1, conn_pid=1, req_id=1}).
 -record(header, {message_length, request_id, response_to, op_code}).
 -record(response, {header, response_flag, cursor_id, offset, limit, documents}).
 -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}).
 %% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
-%% 
+%%
 %% Permission is hereby granted, free of charge, to any person
 %% obtaining a copy of this software and associated documentation
 %% files (the "Software"), to deal in the Software without
 %% copies of the Software, and to permit persons to whom the
 %% Software is furnished to do so, subject to the following
 %% conditions:
-%% 
+%%
 %% The above copyright notice and this permission notice shall be
 %% included in all copies or substantial portions of the Software.
-%% 
+%%
 %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 %% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 %% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 -module(emongo).
 -behaviour(gen_server).
 
--export([start_link/0, init/1, handle_call/3, handle_cast/2, 
+-export([start_link/0, init/1, handle_call/3, handle_cast/2,
 		 handle_info/2, terminate/2, code_change/3]).
 
 -export([pools/0, oid/0, add_pool/5, find/2, find/3, find/4,
 %%--------------------------------------------------------------------
 start_link() ->
 	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-		
+
 pools() ->
 	gen_server:call(?MODULE, pools, infinity).
-	
+
 oid() ->
 	gen_server:call(?MODULE, oid, infinity).
-	
+
 add_pool(PoolId, Host, Port, Database, Size) ->
 	gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity).
 
 
 find(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) ->
 	find(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]);
-	
+
 %% this function has been deprecated
 find(PoolId, Collection, Query) when is_record(Query, emo_query) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
 	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT).
-	
+
 %% @spec find(PoolId, Collection, Selector, Options) -> Result
 %%		 PoolId = atom()
 %%		 Collection = string()
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Query = create_query(Options, Selector),
 	Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query),
-	Resp = emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, proplists:get_value(timeout, Options, ?TIMEOUT)),
-	case lists:member(response_options, Options) of
-		true -> Resp;
-		false -> Resp#response.documents
+	Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
+	% TODO: generalize this for all send_recv calls
+	try emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout) of
+		Resp ->
+			case lists:member(response_options, Options) of
+				true -> Resp;
+				false -> Resp#response.documents
+			end
+	catch
+		exit:{timeout, Reason} ->
+			% force restart of affected emongo_server and try again with a new
+			% emongo_server pid
+			error_logger:warning_report([{timeout, Pid}]),
+			exit(Pid, Reason),
+			find(PoolId, Collection, Selector, Options)
 	end.
-	
+
 %%------------------------------------------------------------------------------
 %% find_all
 %%------------------------------------------------------------------------------
 find_all(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) ->
 	Resp = find(PoolId, Collection, Selector, [response_options|Options]),
 	find_all(PoolId, Collection, Selector, Options, Resp).
-	
+
 find_all(_PoolId, _Collection, _Selector, Options, Resp) when is_record(Resp, response), Resp#response.cursor_id == 0 ->
 	case lists:member(response_options, Options) of
 		true -> Resp;
 		false -> Resp#response.documents
 	end;
-	
+
 find_all(PoolId, Collection, Selector, Options, Resp) when is_record(Resp, response) ->
 	Resp1 = get_more(PoolId, Collection, Resp#response.cursor_id, proplists:get_value(timeout, Options, ?TIMEOUT)),
 	Documents = lists:append(Resp#response.documents, Resp1#response.documents),
 %%------------------------------------------------------------------------------
 get_more(PoolId, Collection, CursorID, Timeout) ->
 	get_more(PoolId, Collection, CursorID, 0, Timeout).
-	
+
 get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID),
 	emongo_server:send_recv(Pid, Pool#pool.req_id, Packet, Timeout).
-	
+
 kill_cursors(PoolId, CursorID) when is_integer(CursorID) ->
 	kill_cursors(PoolId, [CursorID]);
-	
+
 kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs),
 	emongo_server:send(Pid, Pool#pool.req_id, Packet).
-	
+
 %%------------------------------------------------------------------------------
 %% insert
 %%------------------------------------------------------------------------------
 insert(PoolId, Collection, Document) when ?IS_DOCUMENT(Document) ->
 	insert(PoolId, Collection, [Document]);
-	
+
 insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents),
 %%------------------------------------------------------------------------------
 update(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
 	update(PoolId, Collection, Selector, Document, false).
-	
+
 update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, Selector, Document),
 %%------------------------------------------------------------------------------
 delete(PoolId, Collection) ->
 	delete(PoolId, Collection, []).
-	
+
 delete(PoolId, Collection, Selector) ->
 	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
 	Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, Selector),
 %%--------------------------------------------------------------------
 init(_) ->
 	process_flag(trap_exit, true),
-	Pools = initialize_pools(),
+	%Pools = initialize_pools(),
 	{ok, HN} = inet:gethostname(),
 	<<HashedHN:3/binary,_/binary>> = erlang:md5(HN),
-	{ok, #state{pools=Pools, oid_index=1, hashed_hostn=HashedHN}}.
+	{ok, #state{pools=[], oid_index=1, hashed_hostn=HashedHN}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
 %%--------------------------------------------------------------------
 handle_call(pools, _From, State) ->
 	{reply, State#state.pools, State};
-	
+
 handle_call(oid, _From, State) ->
 	{Total_Wallclock_Time, _} = erlang:statistics(wall_clock),
 	Front = Total_Wallclock_Time rem 16#ffffffff,
 	<<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()),
 	Index = State#state.oid_index rem 16#ffffff,
 	{reply, <<Front:32, (State#state.hashed_hostn)/binary, PID/binary, Index:24>>, State#state{oid_index = State#state.oid_index + 1}};
-	
+
 handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, #state{pools=Pools}=State) ->
-	{Result, Pools1} = 
-		case proplists:is_defined(PoolId, Pools) of
-			true ->
-				{{error, pool_already_exists}, Pools};
-			false ->
-				Pool = #pool{
-					id=PoolId,
-					host=Host,
-					port=Port,
-					database=Database,
-					size=Size
-				},
-				Pool1 = do_open_connections(Pool),
-				{ok, [{PoolId, Pool1}|Pools]}
-		end,
-	{reply, Result, State#state{pools=Pools1}};
-	
+	case proplists:is_defined(PoolId, Pools) of
+		true ->
+			{reply, {error, pool_already_exists}, State};
+		false ->
+			Pool = #pool{
+				id=PoolId,
+				host=Host,
+				port=Port,
+				database=Database,
+				size=Size
+			},
+
+			{ok, _SupPid} = emongo_sup:start_pool(PoolId, Host, Port),
+			Pool1 = do_open_connections(Pool),
+			{reply, ok, State#state{pools=[{PoolId, Pool1}|Pools]}}
+	end;
+
 handle_call({pid, PoolId}, _From, #state{pools=Pools}=State) ->
-	case get_pool(PoolId, Pools) of
-		undefined ->
+	case lists:keytake(PoolId, 1, Pools) of
+		false ->
 			{reply, {undefined, undefined}, State};
-		{Pool, Others} ->			
-			case queue:out(Pool#pool.conn_pids) of
-				{{value, Pid}, Q2} ->
-					Pool1 = Pool#pool{conn_pids = queue:in(Pid, Q2), req_id = ((Pool#pool.req_id)+1)},
-					Pools1 = [{PoolId, Pool1}|Others],
-					{reply, {Pid, Pool}, State#state{pools=Pools1}};
-				{empty, _} ->
-					{reply, {undefined, Pool}, State}
-			end
+		{value, {PoolId, Pool}, Others} ->
+			N = Pool#pool.conn_pid,
+			Pid = emongo_server_sup:nth_child_pid(Pool#pool.id, N),
+			
+			if
+				N < Pool#pool.size -> N1 = N + 1;
+				true -> N1 = 1
+			end,
+			
+			Pool1 = Pool#pool{conn_pid=N1, req_id=(Pool#pool.req_id)+1},
+			Pools1 = [{PoolId, Pool1} | Others],
+			{reply, {Pid, Pool}, State#state{pools=Pools1}}
 	end;
-	
+
 handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
 
 %%--------------------------------------------------------------------
 %%                                       {stop, Reason, State}
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
-handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #state{pools=Pools}=State) ->
-	io:format("EXIT ~p, {~p, tcp_closed}~n", [Pid, PoolId]),
-	State1 =
-		case get_pool(PoolId, Pools) of
-			undefined ->
-				State;
-			{Pool, Others} ->
-				Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pool#pool.conn_pids),
-				Pool1 = Pool#pool{conn_pids = Pids1},
-				Pool2 = do_open_connections(Pool1),
-				Pools1 = [{PoolId, Pool2}|Others],
-				State#state{pools=Pools1}
-		end,
-	{noreply, State1};
-	
+%handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #state{pools=Pools}=State) ->
+	%io:format("EXIT ~p, {~p, tcp_closed}~n", [Pid, PoolId]),
+	%State1 =
+		%case get_pool(PoolId, Pools) of
+			%undefined ->
+				%State;
+			%{Pool, Others} ->
+				%Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pool#pool.conn_pids),
+				%Pool1 = Pool#pool{conn_pids = Pids1},
+				%Pool2 = do_open_connections(Pool1),
+				%Pools1 = [{PoolId, Pool2}|Others],
+				%State#state{pools=Pools1}
+		%end,
+	%{noreply, State1};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-initialize_pools() ->
-	case application:get_env(emongo, pools) of
-		undefined ->
-			[];
-		{ok, Pools} ->
-			F = fun({PoolId, Props}) ->
-					Pool = #pool{
-						id = PoolId, 
-						size = proplists:get_value(size, Props, 1),
-						host = proplists:get_value(host, Props, "localhost"), 
-						port = proplists:get_value(port, Props, 27017), 
-						database = proplists:get_value(database, Props, "test")
-					},
-					{PoolId, do_open_connections(Pool)}
-				end,
-			
-			lists:map(F, Pools)
-	end.
-		
-do_open_connections(#pool{conn_pids=Pids, size=Size}=Pool) -> 
-	case queue:len(Pids) < Size of
-		true ->
-			%Pid = emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
-			{ok, Pid} = emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port),
-			do_open_connections(Pool#pool{conn_pids = queue:in(Pid, Pids)});
-		false ->
-			Pool
-	end.
 
-get_pool(PoolId, Pools) ->
-	get_pool(PoolId, Pools, []).
-	
-get_pool(_, [], _) ->
-	undefined;
-		
-get_pool(PoolId, [{PoolId, Pool}|Tail], Others) ->
-	{Pool, lists:append(Tail, Others)};
-	
-get_pool(PoolId, [Pool|Tail], Others) ->
-	get_pool(PoolId, Tail, [Pool|Others]).
-	
+do_open_connections(#pool{size=Size}=Pool) ->
+	% each connection is an emongo_server supervised by simple_one_for_one
+	% emongo_server_sup supervisor
+	F = fun(_) -> {ok, _} = emongo_server_sup:start_child(Pool#pool.id) end,
+	lists:foreach(F, lists:seq(1, Size)),
+	Pool.
+
 dec2hex(Dec) ->
 	dec2hex(<<>>, Dec).
-	
+
 dec2hex(N, <<I:8,Rem/binary>>) ->
 	dec2hex(<<N/binary, (hex0((I band 16#f0) bsr 4)):8, (hex0((I band 16#0f))):8>>, Rem);
 dec2hex(N,<<>>) ->
 
 hex2dec(Hex) when is_list(Hex) ->
 	hex2dec(list_to_binary(Hex));
-	
+
 hex2dec(Hex) ->
 	hex2dec(<<>>, Hex).
-	
+
 hex2dec(N,<<A:8,B:8,Rem/binary>>) ->
 	hex2dec(<<N/binary, ((dec0(A) bsl 4) + dec0(B)):8>>, Rem);
 hex2dec(N,<<>>) ->
 create_query(Options, Selector) ->
 	Selector1 = transform_selector(Selector),
 	create_query(Options, #emo_query{}, Selector1, []).
-	
+
 create_query([], QueryRec, QueryDoc, []) ->
 	QueryRec#emo_query{q=QueryDoc};
 
-create_query([], QueryRec, [], OptDoc) ->	
+create_query([], QueryRec, [], OptDoc) ->
 	QueryRec#emo_query{q=OptDoc};
-	
+
 create_query([], QueryRec, QueryDoc, OptDoc) ->
 	QueryRec#emo_query{q=(OptDoc ++ [{<<"query">>, QueryDoc}])};
-	
+
 create_query([{limit, Limit}|Options], QueryRec, QueryDoc, OptDoc) ->
 	QueryRec1 = QueryRec#emo_query{limit=Limit},
 	create_query(Options, QueryRec1, QueryDoc, OptDoc);
-	
+
 create_query([{offset, Offset}|Options], QueryRec, QueryDoc, OptDoc) ->
 	QueryRec1 = QueryRec#emo_query{offset=Offset},
 	create_query(Options, QueryRec1, QueryDoc, OptDoc);
 	Orderby1 = [{Key, case Dir of desc -> -1; _ -> 1 end}|| {Key, Dir} <- Orderby],
 	OptDoc1 = [{<<"orderby">>, Orderby1}|OptDoc],
 	create_query(Options, QueryRec, QueryDoc, OptDoc1);
-	
+
 create_query([{fields, Fields}|Options], QueryRec, QueryDoc, OptDoc) ->
 	QueryRec1 = QueryRec#emo_query{field_selector=[{Field, 1} || Field <- Fields]},
 	create_query(Options, QueryRec1, QueryDoc, OptDoc);
-	
+
 create_query([_|Options], QueryRec, QueryDoc, OptDoc) ->
 	create_query(Options, QueryRec, QueryDoc, OptDoc).
-	
+
 transform_selector(Selector) ->
 	transform_selector(Selector, []).
-	
-transform_selector([], Acc) -> 
+
+transform_selector([], Acc) ->
 	lists:reverse(Acc);
-	
+
 transform_selector([{where, Val}|Tail], Acc) when is_list(Val) ->
 	transform_selector(Tail, [{<<"$where">>, Val}|Acc]);
-	
+
 transform_selector([{Key, [{_,_}|_]=Vals}|Tail], Acc) ->
 	Vals1 =
 		[case Operator of
-			O when O == '>'; O == gt -> 
+			O when O == '>'; O == gt ->
 				{<<"$gt">>, Val};
 			O when O == '<'; O == lt ->
 				{<<"$lt">>, Val};
 				{<<"$lte">>, Val};
 			O when O == '=/='; O == '/='; O == ne ->
 				{<<"$ne">>, Val};
-			in when is_list(Val) -> 
+			in when is_list(Val) ->
 				{<<"$in">>, {array, Val}};
-			nin when is_list(Val) -> 
+			nin when is_list(Val) ->
 				{<<"$nin">>, {array, Val}};
-			mod when is_list(Val), length(Val) == 2 -> 
+			mod when is_list(Val), length(Val) == 2 ->
 				{<<"$mod">>, {array, Val}};
 			all when is_list(Val) ->
 				{<<"$all">>, {array, Val}};
 				{<<"$size">>, Val};
 			exists when is_boolean(Val) ->
 				{<<"$exists">>, Val};
-			_ -> 
+			_ ->
 				{Operator, Val}
 		 end || {Operator, Val} <- Vals],
 	transform_selector(Tail, [{Key, Vals1}|Acc]);
-	
+
 transform_selector([Other|Tail], Acc) ->
 	transform_selector(Tail, [Other|Acc]).
-	
+
 dec0($a) ->	10;
 dec0($b) ->	11;
 dec0($c) ->	12;
 hex0(14) -> $e;
 hex0(15) -> $f;
 hex0(I) ->  $0 + I.
-	

src/emongo_app.erl

 
 -behaviour(application).
 
+-include("emongo.hrl").
+
 -export([start/2, stop/1]).
 
-start(_, _) -> emongo_sup:start_link().
+start(_, _) ->
+	{ok, Pid} = emongo_sup:start_link(),
+	% Pools must be initialized after emongo_sup is started instead of in
+	% emongo:init, because emongo_server_sup instances are dynamically added
+	% to the emongo_sup supervisor, which also supervises emongo gen_server.
+	% (otherwise get a deadlock where emongo is waiting on emongo_sup, which
+	% is waiting on emongo)
+	initialize_pools(),
+	{ok, Pid}.
 
-stop(_) -> ok.
+stop(_) -> ok.
+
+initialize_pools() ->
+	F = fun({PoolId, Props}) ->
+			Host = proplists:get_value(host, Props, "localhost"),
+			Port = proplists:get_value(port, Props, 27017),
+			Database = proplists:get_value(database, Props, "test"),
+			Size = proplists:get_value(size, Props, 1),
+			emongo:add_pool(PoolId, Host, Port, Database, Size)
+		end,
+	
+	case application:get_env(emongo, pools) of
+		undefined -> ok;
+		{ok, Pools} -> lists:foreach(F, Pools)
+	end.

src/emongo_server.erl

 		ok -> ok;
 		{error, Reason} -> exit(Reason)
 	end.
-	
+
 send_recv(Pid, ReqID, Packet, Timeout) ->
 	case gen_server:call(Pid, {send_recv, ReqID, Packet}, Timeout) of
 		{ok, Resp} ->
 			Documents = emongo_bson:decode(Resp#response.documents),
 			Resp#response{documents=Documents};
-		{error, Reason} -> 
+		{error, Reason} ->
 			exit(Reason)
 	end.
-	
+
 open_socket(Host, Port) ->
 	case gen_tcp:connect(Host, Port, [binary, {active, true}, {nodelay, true}]) of
 		{ok, Sock} ->
 	end.
 
 %% gen_server %%
-	
+
 init([PoolId, Host, Port]) ->
 	Socket = open_socket(Host, Port),
 	{ok, #state{pool_id=PoolId, socket=Socket, requests=[], leftover = <<>>}}.
 
 handle_info({tcp, _Socket, Data}, State) ->
 	Leftover = <<(State#state.leftover)/binary, Data/binary>>,
-	
+
 	case emongo_packet:decode_response(Leftover) of
 		undefined ->
 			{noreply, State#state{leftover=Leftover}};
 		{Resp, Tail} ->
 			ResponseTo = (Resp#response.header)#header.response_to,
-			
+
 			case lists:keytake(ResponseTo, 1, State#state.requests) of
 				false ->
 					exit({unexpected_response, Resp});

src/emongo_server_sup.erl

+%% simple_one_for_one supervisor for emongo_server instances
+%% there should be one emongo_server_sup instance for each pool, that then
+%% supervises emongo_server instances based on the size of pool
+-module(emongo_server_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/3, child_count/1, start_child/1, nth_child_pid/2, init/1]).
+
+%%%%%%%%%%%%%%%%
+%% public api %%
+%%%%%%%%%%%%%%%%
+
+start_link(PoolId, Host, Port) ->
+	supervisor:start_link({local, PoolId}, ?MODULE, [PoolId, Host, Port]).
+
+child_count(PoolId) -> length(supervisor:which_children(PoolId)).
+
+start_child(PoolId) -> supervisor:start_child(PoolId, []).
+
+nth_child_pid(PoolId, N) ->
+	Children = supervisor:which_children(PoolId),
+	
+	if
+		N > length(Children) ->
+			throw(badarg);
+		true ->
+			{undefined, Pid, worker, [emongo_server]} = lists:nth(N, Children),
+			Pid
+	end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% supervisor callbacks %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+init([PoolId, Host, Port]) ->
+	{ok, {{simple_one_for_one, 10, 10}, [
+		{emongo_server, {emongo_server, start_link, [PoolId, Host, Port]},
+		 permanent, brutal_kill, worker, [emongo_server]}
+	]}}.

src/emongo_sup.erl

 
 -behaviour(supervisor).
 
--export([start_link/0, init/1]).
+-export([start_link/0, start_pool/3, init/1]).
+
+%%%%%%%%%%%%%%%%
+%% public api %%
+%%%%%%%%%%%%%%%%
 
 start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+start_pool(PoolId, Host, Port) ->
+	% emongo_server_sup instances are added dynamically, one for each pool
+	supervisor:start_child(?MODULE, {PoolId,
+		{emongo_server_sup, start_link, [PoolId, Host, Port]},
+		permanent, infinity, supervisor, [emongo_server_sup]
+	}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% supervisor callbacks %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+
 init(_) ->
 	{ok, {{one_for_one, 10, 10}, [
-		{emongo, {emongo, start_link, []}, permanent, 5000, worker, [emongo]}
+		{emongo, {emongo, start_link, []},
+		 permanent, 5000, worker, [emongo]}
 	]}}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.