Commits

Dmitry Belyaev  committed 1d998d6

internal round-robin balancer

  • Participants
  • Parent commits ffdefe9

Comments (0)

Files changed (6)

File ebin/emongo.app

 	{vsn, "0.0.4"},
 	{modules, [
 		emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet,
-		emongo_server, emongo_server_sup, emongo_collection
+		emongo_server, emongo_pool, emongo_collection, emongo_balancer
 	]},
 	{registered, [emongo_sup, emongo]},
 	{mod, {emongo_app, []}},

File src/emongo.erl

 %%% Internal functions
 %%--------------------------------------------------------------------
 get_pid_pool(PoolId) ->
-    PoolPid = emongo_sup:pool_pid(PoolId),
-    gen_server:call(PoolPid, pid, infinity).
+    emongo_sup:worker_pid(PoolId).
 
 dec2hex(Dec) ->
 	dec2hex(<<>>, Dec).

File src/emongo_balancer.erl

+%%%-------------------------------------------------------------------
+%%% Description : balancer for emongo_pool connections
+%%%-------------------------------------------------------------------
+-module(emongo_balancer).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, pid/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {id,
+                active,
+                passive,
+                timer=undefined
+               }).
+
+-define(POOL_ID(BalancerId, PoolIdx), {BalancerId, PoolIdx}).
+-define(RECHECK_TIME, 10000).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link(BalId, Pools) ->
+    gen_server:start_link(?MODULE, [BalId, Pools], []).
+
+pid(BalancerPid) ->
+    gen_server:call(BalancerPid, pid, infinity).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([BalId, Pools]) ->
+    process_flag(trap_exit, true),
+    self() ! {init, Pools},
+    {ok, #state{id = BalId,
+                active = queue:new(),
+                passive = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messagesp
+%%--------------------------------------------------------------------
+handle_call(pid, From, #state{id=BalId, active=Active, passive=Passive, timer=Timer}=State) ->
+    case queue:out(Active) of
+        {{value, PoolIdx}, Q2} ->
+            case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx)) of
+                undefined ->
+                    handle_call(pid, From,
+                                State#state{active=Q2,
+                                            passive=[PoolIdx | Passive],
+                                            timer=set_timer(Timer)
+                                           });
+                Pid ->
+                    {reply, Pid, State#state{active=queue:in(PoolIdx, Q2)}}
+            end;
+        {empty, _} ->
+            {reply, undefined, State}
+    end;
+
+handle_call(stop_children, _, #state{id=BalId, active=Active, passive=Passive}=State) ->
+    Fun = fun(PoolIdx) ->
+                  emongo_sup:stop_pool(?POOL_ID(BalId, PoolIdx)),
+                  false
+          end,
+    lists:foreach(Fun, Passive),
+    
+    {reply, ok, State#state{active=queue:filter(Fun, Active), passive=[]}};
+
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({init, Pools}, #state{id=BalId, active=Active}=State) ->
+    Fun = fun({Host, Port, Database, Size}, {PoolIdx, PoolQueue}) ->
+                  case emongo_sup:start_pool(?POOL_ID(BalId, PoolIdx),
+                                             Host, Port, Database, Size) of
+                      {ok, PoolPid} ->
+                          link(PoolPid),
+                          {PoolIdx + 1, queue:in(PoolIdx, PoolQueue)};
+                      _ ->
+                          {PoolIdx + 1, PoolQueue}
+                  end
+          end,
+    {_, PoolQueue} = lists:foldl(Fun, {1, Active}, Pools),
+    {noreply, State#state{active=PoolQueue}};
+
+handle_info(recheck, State) ->
+    {noreply, activate(State, [])};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+set_timer(undefined) ->
+    erlang:send_after(?RECHECK_TIME, self(), recheck);
+set_timer(TimerRef) ->
+    TimerRef.
+
+
+activate(#state{passive=[], timer=_TimerRef}=State, []) ->
+    State#state{timer=undefined};
+
+activate(#state{passive=[]}=State, Passive) ->
+    State#state{passive=Passive, timer=erlang:send_after(?RECHECK_TIME, self(), recheck)};
+
+activate(#state{id=BalId, active=Active, passive=[PoolIdx | Passive]}=State, Acc) ->
+    case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx)) of
+        undefined ->
+            activate(State#state{passive=Passive}, [PoolIdx | Acc]);
+        _ ->
+            activate(State#state{active=queue:in(PoolIdx, Active), passive=Passive}, Acc)
+    end.

File src/emongo_pool.erl

 -behaviour(gen_server).
 
 %% API
--export([start_link/5]).
+-export([start_link/5, pid/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 start_link(PoolId, Host, Port, Database, Size) ->
     gen_server:start_link(?MODULE, [PoolId, Host, Port, Database, Size], []).
 
+pid(Pid) ->
+    gen_server:call(Pid, pid, infinity).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 %% gen_server callbacks %%
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call(state, _From, State) ->
-    {reply, State, State};
 handle_call(pid, _From, #pool{conn_pid=Pids, req_id=ReqId}=State) ->
     case queue:out(Pids) of
         {{value, Pid}, Q2} ->
 %%                                       {stop, Reason, State}
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
-handle_info({'EXIT', Pid, {PoolId, tcp_closed}}, #pool{conn_pid=Pids}=State) ->
-    Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pids),
-    
-    NewState = do_open_connections(State#pool{conn_pid = Pids1}),
-    {noreply, NewState};
+handle_info({'EXIT', Pid, Reason}, #pool{conn_pid=Pids}=State) ->
+    case case Reason of
+             tcp_closed -> true;
+             {tcp_error, _Reason} -> true;
+             _ -> false
+         end
+        of
+        true ->
+            Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pids),
+
+            NewState = do_open_connections(State#pool{conn_pid = Pids1}),
+            {noreply, NewState};
+        _ ->
+            {noreply, State}
+    end;
 
 handle_info(reconnect, State) ->
     NewState = do_open_connections(State),

File src/emongo_server.erl

 					{noreply, State#state{requests=Requests, leftover=Tail}}
 			end
 	end;
-handle_info({tcp_closed, _Socket}, State) ->
-	exit({State#state.pool_id, tcp_closed});
-handle_info({tcp_error, _Socket, Reason}, State) ->
-	exit({State#state.pool_id, Reason}).
+handle_info({tcp_closed, _Socket}, _State) ->
+	exit(tcp_closed);
+handle_info({tcp_error, _Socket, Reason}, _State) ->
+	exit({tcp_error, Reason}).
 
 terminate(_, State) -> gen_tcp:close(State#state.socket).
 
-code_change(_Old, State, _Extra) -> {ok, State}.
+code_change(_Old, State, _Extra) -> {ok, State}.

File src/emongo_sup.erl

 
 -behaviour(supervisor).
 
--export([start_link/0, start_pool/5, stop_pool/1, pools/0, pool_pid/1]).
+-export([start_link/0, start_pool/5, stop_pool/1, pools/0, worker_pid/1]).
+-export([start_balancer/2, stop_balancer/1]).
 
 %% supervisor exports
 -export([init/1]).
 
 start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+start_balancer(BalId, Pools) ->
+    supervisor:start_child(?MODULE,
+                           {BalId,
+                            {emongo_balancer, start_link, [BalId, Pools]},
+                            permanent, 10000, worker, [emongo_balancer]
+                           }).
+
+stop_balancer(BalId) ->
+    case [Pid || {PoolId, Pid, _, [emongo_balancer]} <- supervisor:which_children(?MODULE), PoolId =:= BalId] of
+        [Pid] ->
+            gen_server:call(Pid, stop_children),
+            stop_pool(BalId)
+    end.
+
+
 start_pool(PoolId, Host, Port, Database, Size) ->
     supervisor:start_child(?MODULE, {PoolId,
 		{emongo_pool, start_link, [PoolId, Host, Port, Database, Size]},
 pools() ->
     [{PoolId, Pid} || {PoolId, Pid, _, [emongo_pool]} <- supervisor:which_children(?MODULE)].
 
-pool_pid(PoolId) ->
-    case [Pid || {Id, Pid, _, [emongo_pool]} <- supervisor:which_children(?MODULE), Id =:= PoolId] of
-        [Pid] ->
-            Pid;
+worker_pid(PoolId) ->
+    case [{Pid, Module} || {Id, Pid, _, [Module]} <- supervisor:which_children(?MODULE), Id =:= PoolId] of
+        [{Pid, Module}] ->
+            Module:pid(Pid);
         _ ->
             undefined
     end.