Commits

Anonymous committed 1af6409

checking in core code

  • Participants
  • Parent commits 8be066f

Comments (0)

Files changed (7)

File include/emongo.hrl

+-record(pool, {id, host, port, database, size=1, conn_pids=[], req_id=1}).
+-record(header, {messageLength, requestID, responseTo, opCode}).
+-record(response, {header, responseFlag, cursorID, startingFrom, numberReturned, documents}).
+
+-define(OP_REPLY, 1).
+-define(OP_MSG, 1000).
+-define(OP_UPDATE, 2001).
+-define(OP_INSERT, 2002).
+-define(OP_QUERY, 2004).
+-define(OP_GET_MORE, 2005).
+-define(OP_DELETE, 2006).
+-define(OP_KILL_CURSORS, 2007).

File priv/example.config

+[{emongo, [
+	{pools, [
+		{test1, [
+			{size, 1},
+			{host, "localhost"},
+			{port, 27017},
+			{database, "testdatabase"}
+		]}
+	]}
+]}].

File src/emongo.erl

 -export([start_link/0, init/1, handle_call/3, handle_cast/2, 
 		 handle_info/2, terminate/2, code_change/3]).
 
--export([find/2, find_one/2, insert/2, save/2, remove/2]).
+-export([pools/0, add_pool/5, insert/3]).
+
+-include("emongo.hrl").
 
 %%====================================================================
 %% API
 start_link() ->
 	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 		
+pools() ->
+	gen_server:call(?MODULE, pools, infinity).
+	
+add_pool(PoolId, Host, Port, Database, Size) ->
+	gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity).
+	
 %%show_dbs() -> ok.
 
 %%show_collections(Database) -> ok.
 
 %%use_db(PoolId) -> ok.
 
-find(PoolId, {obj, Props}) ->
-	gen_server:call(?MODULE, {PoolId, {find, {obj, Props}}}, infinity).
+%find(PoolId, Collection, {obj, _}=Obj) ->
 
-find_one(PoolId, {obj, Props}) -> ok.
+%find_one(PoolId, Collection, {obj, _}=Obj) ->
 
-insert(PoolId, {obj, Props}) -> ok.
+insert(PoolId, Collection, {obj, _}=Obj) ->
+	{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
+	Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Obj),
+	emongo_conn:send(Pid, Pool#pool.req_id, Packet).
 
 %%update
 
-save(PoolId, {obj, Props}) -> ok.
+%save(PoolId, {obj, Props}) -> ok.
 
-remove(PoolId, {obj, Props}) -> ok.
+%remove(PoolId, {obj, Props}) -> ok.
 
 %%ensure_index
 
 %%count
 
-%%drop_collection
-
-%%clear
+%drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) ->
 
 %%====================================================================
 %% gen_server callbacks
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
 init(_) ->
-	{ok, []}.
+	{ok, initialize_pools()}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call({PoolId, {find, {obj, Props}}}, _From, Pools) ->
+handle_call(pools, _From, Pools) ->
+	{reply, Pools, Pools};
+	
+handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, Pools) ->
+	{Result, Pools1} = 
+		case proplists:is_defined(PoolId, Pools) of
+			true ->
+				{{error, pool_already_exists}, Pools};
+			false ->
+				Pool = #pool{
+					id=PoolId,
+					host=Host,
+					port=Port,
+					database=Database,
+					size=Size
+				},
+				Pool1 = open_connections(Pool),
+				{ok, [{PoolId, Pool1}|Pools]}
+		end,
+	{reply, Result, Pools1};
+	
+handle_call({pid, PoolId}, _From, Pools) ->
 	Pool = proplists:get_value(PoolId, Pools),
-	
-	{reply, ok, Pools};
-	
-	
-
+	Pool1 = Pool#pool{req_id = ((Pool#pool.req_id)+1)},
+	OtherPools = proplists:delete(PoolId, Pools),
+	Pid = get_conn_pid(Pool),
+	{reply, {Pid, Pool}, [Pool1|OtherPools]};
 	
 handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
+initialize_pools() ->
+	case application:get_env(emongo, pools) of
+		undefined ->
+			[];
+		{ok, Pools} ->
+			[begin
+				Pool = #pool{
+					id = PoolId, 
+					size = proplists:get_value(size, Props, 1),
+					host = proplists:get_value(host, Props, "localhost"), 
+					port = proplists:get_value(port, Props, 27017), 
+					database = proplists:get_value(database, Props, "test")
+				},
+				{PoolId, open_connections(Pool)}
+			 end || {PoolId, Props} <- Pools]
+	end.
+	
+open_connections(Pool) ->
+	ConnPids = [begin
+		emongo_conn:start_link(Pool#pool.host, Pool#pool.port)
+	 end || _ <- lists:seq(1, Pool#pool.size)],
+	Pool#pool{conn_pids=ConnPids}.
 
-
-
+get_conn_pid(#pool{conn_pids=[Pid|_]}) -> Pid.

File src/emongo_app.erl

 
 init(_) ->
    {ok, {{one_for_one, 10, 10}, [
-       %{ex_scheduler, {ex_scheduler, start_link, []}, permanent, 5000, worker, [ex_scheduler]}
+       {emongo, {emongo, start_link, []}, permanent, 5000, worker, [emongo]}
    ]}}.
 
 build_rel() ->

File src/emongo_conn.erl

+%% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%% 
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%% 
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%% 
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+-module(emongo_conn).
+
+-export([start_link/2, init/3, send/3, send_recv/3]).
+
+-record(request, {req_id, requestor}).
+-record(state, {socket, requests}).
+
+-include("emongo.hrl").
+
+start_link(Host, Port) ->
+	proc_lib:start_link(?MODULE, init, [Host, Port, self()]).
+	
+init(Host, Port, Parent) ->
+	Socket = open_socket(Host, Port),
+	proc_lib:init_ack(Parent, self()),
+	loop(#state{socket=Socket, requests=[]}).
+	
+send(Pid, ReqID, Packet) ->
+	gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}).
+	
+send_recv(Pid, ReqID, Packet) ->
+	gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}).
+	
+loop(State) ->
+	receive
+		{'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} ->
+			gen_tcp:send(State#state.socket, Packet),
+			gen:reply({From, Mref}, ok),
+			loop(State);
+		{'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} -> 
+			gen_tcp:send(State#state.socket, Packet),
+			gen:reply({From, Mref}, ok),
+			Request = #request{req_id=ReqID, requestor={From, Mref}},
+			State1 = State#state{requests=[Request|State#state.requests]},
+			loop(State1);
+		{tcp, _Sock, Data} ->
+			Resp = emongo_packet:decode_response(Data),
+			ResponseTo = (Resp#response.header)#header.responseTo,
+			case proplists:get_value(ResponseTo, State#state.requests) of
+				undefined ->
+					ok;
+				Requestor ->
+					gen:reply(Requestor, Resp)
+			end,
+			loop(State)
+	end.
+	
+open_socket(Host, Port) ->
+	case gen_tcp:connect(Host, Port, [binary, {active, true}]) of
+		{ok, Sock} ->
+			Sock;
+		{error, Reason} ->
+			exit({failed_to_open_socket, Reason})
+	end.

File src/emongo_packet.erl

+%% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%% 
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%% 
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%% 
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+-module(emongo_packet).
+
+-export([insert/4, decode_response/1]).
+
+-include("emongo.hrl").
+	
+insert(Database, Collection, ReqID, {obj, Props}) ->
+	FullName = mongodb_bson:encode_cstring(lists:concat([Database, ".", Collection])),
+	EncodedDocument = mongodb_bson:encode({obj, Props}),
+	Message = <<0:32, FullName/binary, EncodedDocument/binary>>,
+	Length = byte_size(Message),
+    <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_INSERT:32/little-signed, Message/binary>>.
+
+decode_response(<<Length:32/little-signed, ReqID:32/little-signed, RespTo:32/little-signed, Op:32/little-signed, Message/binary>>) ->
+	<<RespFlag:32/little-signed, 
+	  CursorID:64/little-signed, 
+	  StartingFrom:32/little-signed, 
+	  NumRet:32/little-signed, 
+	  Documents/binary>> = Message,
+	#response{
+		header = {header, Length, ReqID, RespTo, Op}, 
+		responseFlag = RespFlag, 
+		cursorID = CursorID, 
+		startingFrom = StartingFrom, 
+		numberReturned = NumRet, 
+		documents = mongodb_bson:decode(Documents)
+	}.

File t/001-load.t

 #!/usr/local/bin/escript
 %% -*- erlang -*-
-%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell
+%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example
 
 main(_) ->
 	etap:plan(unknown),
     error_logger:tty(false),
     etap_application:start_ok(emongo, "application 'emongo' started ok"),
 
+	etap:is(length(emongo:pools()), 1, "one pool exists in state"),
+
    	etap:end_tests().