Commits

Anonymous committed de85e9b

New sequenced interface for combining updates and reads. New worker queue with priorities.

  • Participants
  • Parent commits 30a5490

Comments (0)

Files changed (11)

File ebin/emongo.app

 	{vsn, "0.0.5"},
 	{modules, [
 		emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet,
-		emongo_server, emongo_pool, emongo_collection
+		emongo_server, emongo_pool, emongo_router, pqueue
 	]},
 	{registered, [emongo_sup, emongo]},
 	{mod, {emongo_app, []}},

File ebin/emongo.appup

-{"0.0.5", [
-	{"0.0.4", [
-		{load_module, emongo_bson},
-		{load_module, emongo}
-	]},
-	{"0.0.3", [{add_module, emongo_collection}]},
-	{"0.0.2", [
-		{load_module, emongo},
-		{load_module, emongo_app},
-		{load_module, emongo_server},
-		{add_module, emongo_server_sup},
-		{update, emongo_sup, supervisor},
-		{apply, {supervisor, terminate_child, [emongo_sup, emongo]}},
-		{apply, {supervisor, restart_child, [emongo_sup, emongo]}},
-		{apply, {emongo_app, initialize_pools, []}}
-	]}
-], [
-	{"0.0.4", [
-		{load_module, emongo_bson},
-		{load_module, emongo}
-	]},
-	{"0.0.3", [{delete_module, emongo_collection}]},
-	{"0.0.2", [
-		{load_module, emongo},
-		{load_module, emongo_app},
-		{load_module, emongo_server},
-		{delete_module, emongo_server_sup},
-		{update, emongo_sup, supervisor},
-		{apply, {supervisor, terminate_child, [emongo_sup, emongo]}},
-		{apply, {supervisor, restart_child, [emongo_sup, emongo]}}
-	]}
-]}.
+{"0.0.5",
+ [{"0.0.4", [
+             {add_module, pqueue},
+             {load_module, emongo_sup},
+             {load_module, emongo_server},
+             {load_module, emongo_router},
+             {load_module, emongo_bson},
+             {load_module, emongo},
+             {update, emongo_pool, {advanced, []}},
+             {delete_module, emongo_collection}
+            ]}
+ ],
+ [{"0.0.4", [
+             {delete_module, pqueue},
+             {load_module, emongo_sup},
+             {load_module, emongo_server},
+             {load_module, emongo_router},
+             {load_module, emongo_bson},
+             {load_module, emongo},
+             {load_module, emongo_pool},
+             {add_module, emongo_collection}
+            ]}
+ ]
+}.

File src/emongo.erl

 %% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%% Jacob Perkins <japerk@gmail.com>
+%% Belyaev Dmitry <rumata-estor@nm.ru>
 %%
 %% Permission is hereby granted, free of charge, to any person
 %% obtaining a copy of this software and associated documentation
 -module(emongo).
 -behaviour(gen_server).
 
--export([start_link/0, init/1, handle_call/3, handle_cast/2,
-		 handle_info/2, terminate/2, code_change/3]).
+-export([pools/0, oid/0, add_pool/5, del_pool/1]).
 
--export([pools/0, oid/0, add_pool/5, del_pool/1, fold_all/6,
+-export([fold_all/6,
          find_all/2, find_all/3, find_all/4,
-         find_one/3, find_one/4,
-         insert/3, update/4, update/5, delete/2, delete/3,
-		 ensure_index/3, count/2, count/3, distinct/3, distinct/4,
-		 dec2hex/1, hex2dec/1]).
+         find_one/3, find_one/4]).
+
+-export([insert/3, update/4, update/5, delete/2, delete/3]).
+
+-export([ensure_index/3, count/2, count/3, distinct/3, distinct/4]).
+
+-export([dec2hex/1, hex2dec/1]).
+
+-export([sequence/2, synchronous/0, no_response/0,
+         find_all_seq/3, fold_all_seq/5,
+         insert_seq/3, update_seq/5, delete_seq/3]).
 
 -export([update_sync/5, delete_sync/3]).
 
+-deprecated([update_sync/5, delete_sync/3]).
+
+%% internal
+-export([start_link/0, init/1, handle_call/3, handle_cast/2,
+         handle_info/2, terminate/2, code_change/3]).
+
 -include("emongo.hrl").
 
 -record(state, {oid_index, hashed_hostn}).
 %% Description: Starts the server
 %%--------------------------------------------------------------------
 start_link() ->
-	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 pools() ->
     emongo_sup:pools().
 
 oid() ->
-	gen_server:call(?MODULE, oid, infinity).
+    gen_server:call(?MODULE, oid, infinity).
 
 add_pool(PoolId, Host, Port, Database, Size) ->
     emongo_sup:start_pool(PoolId, Host, Port, Database, Size).
 del_pool(PoolId) ->
     emongo_sup:stop_pool(PoolId).
 
+
 %%------------------------------------------------------------------------------
-%% find
+%% sequences of operations
 %%------------------------------------------------------------------------------
-%% find(PoolId, Collection) ->
-%% 	find(PoolId, Collection, [], [{timeout, ?TIMEOUT}]).
 
-%% find(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) ->
-%% 	find(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]).
+sequence(PoolId, Sequence) ->
+    Len = length(Sequence),
+    {Pid, Database, ReqId} = get_pid_pool(PoolId, Len),
+    lists:foldl(fun(Operation, NewReqId) ->
+                        Operation(Pid, Database, NewReqId)
+                end, ReqId, Sequence).
+
+
+synchronous() ->
+    synchronous(?TIMEOUT).
+
+synchronous(Timeout) ->
+    [fun(_, _, ReqId) -> ReqId end,
+     fun(Pid, Database, ReqId) ->
+             PacketGetLastError = emongo_packet:get_last_error(Database, ReqId),
+             emongo_server:send_recv(Pid, ReqId, PacketGetLastError, Timeout)
+     end].
+
+no_response() ->
+    [].
+
 
 %% @spec find(PoolId, Collection, Selector, Options) -> Result
-%%		 PoolId = atom()
-%%		 Collection = string()
-%%		 Selector = document()
-%%		 Options = [Option]
-%%		 Option = {timeout, Timeout} | {limit, Limit} | {offset, Offset} | {orderby, Orderby} | {fields, Fields} | response_options
-%%		 Timeout = integer (timeout in milliseconds)
-%%		 Limit = integer
-%%		 Offset = integer
-%%		 Orderby = [{Key, Direction}]
-%%		 Key = string() | binary() | atom() | integer()
-%%		 Direction = asc | desc
-%%		 Fields = [Field]
-%%		 Field = string() | binary() | atom() | integer() = specifies a field to return in the result set
-%%		 response_options = return {response, header, response_flag, cursor_id, offset, limit, documents}
-%%		 Result = documents() | response()
-%% find(PoolId, Collection, Selector, Options) ->
-%% 	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-%% 	Query = create_query(Options, Selector),
-%% 	Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
-%% 	Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
-%% 	% TODO: generalize this for all send_recv calls
-%% 	try emongo_server:send_recv(Pid, ReqId, Packet, Timeout) of
-%% 		Resp ->
-%% 			case lists:member(response_options, Options) of
-%% 				true -> Resp;
-%% 				false -> Resp#response.documents
-%% 			end
-%% 	catch
-%% 		exit:{timeout, Reason} ->
-%%                 %% force restart of affected emongo_server and try again with a new
-%%                 %% emongo_server pid
-%% 			error_logger:warning_report([{timeout, Pid}, {reason, Reason}]),
-%% 			exit(Pid, Reason),
-%% 			find(PoolId, Collection, Selector, Options)
-%% 	end.
+%%   PoolId = atom()
+%%   Collection = string()
+%%   Selector = document()
+%%   Options = [Option]
+%%   Option = {timeout, Timeout} | {limit, Limit} | {offset, Offset} | {orderby, Orderby} | {fields, Fields} | response_options
+%%   Timeout = integer (timeout in milliseconds)
+%%   Limit = integer
+%%   Offset = integer
+%%   Orderby = [{Key, Direction}]
+%%   Key = string() | binary() | atom() | integer()
+%%   Direction = asc | desc
+%%   Fields = [Field]
+%%   Field = string() | binary() | atom() | integer() = specifies a field to return in the result set
+%%   response_options = return {response, header, response_flag, cursor_id, offset, limit, documents}
+%%   Result = documents() | response()
 
 %%------------------------------------------------------------------------------
 %% find_all
 %%------------------------------------------------------------------------------
-fold_all(F, Value, PoolId, Collection, Selector, Options) ->
-    Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
-    Resp = find_start(PoolId, Collection, Selector, Options, Timeout),
-    NewValue = lists:foldl(F, Value, Resp#response.documents),
-    
-    fold_more(F, NewValue, Collection, Resp#response{documents=[]}, Timeout).
-
-
 find_all(PoolId, Collection) ->
-	find_all(PoolId, Collection, [], []).
+    find_all(PoolId, Collection, [], []).
 
 find_all(PoolId, Collection, Selector) ->
-	find_all(PoolId, Collection, Selector, []).
+    find_all(PoolId, Collection, Selector, []).
 
 find_all(PoolId, Collection, Selector, Options) ->
-    fold_all(fun(I, A) -> [I | A] end, [],
-             PoolId, Collection, Selector, Options).
+    sequence(PoolId, find_all_seq(Collection, Selector, Options)).
 
 
-find_start(PoolId, Collection, Selector, Options, Timeout) ->
-    {Pid, Database, ReqId} = get_pid_pool(PoolId),
+find_all_seq(Collection, Selector, Options) ->
+    fold_all_seq(fun(I, A) -> [I | A] end, [], Collection, Selector, Options).
+
+%%------------------------------------------------------------------------------
+%% fold_all
+%%------------------------------------------------------------------------------
+fold_all(F, Value, PoolId, Collection, Selector, Options) ->
+    sequence(PoolId, fold_all_seq(F, Value, Collection, Selector, Options)).
+
+
+fold_all_seq(F, Value, Collection, Selector, Options) ->
+    Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
     Query = create_query(Options, Selector),
-    Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
-    
-    emongo_server:send_recv(Pid, ReqId, Packet, Timeout).
+    [fun(_, _, ReqId) -> ReqId end,
+     fun(Pid, Database, ReqId) ->
+             Packet = emongo_packet:do_query(Database, Collection, ReqId, Query),
+
+             Resp = emongo_server:send_recv(Pid, ReqId, Packet, Timeout),
+
+             NewValue = lists:foldl(F, Value, Resp#response.documents),
+             fold_more(F, NewValue, Collection, Resp#response{documents=[]}, Timeout)
+     end].
 
 
 fold_more(_F, Value, _Collection, #response{cursor_id=0}, _Timeout) ->
     Value;
 
 fold_more(F, Value, Collection, #response{pool_id=PoolId, cursor_id=CursorID}, Timeout) ->
-    {Pid, Database, ReqId} = get_pid_pool(PoolId),
+    {Pid, Database, ReqId} = get_pid_pool(PoolId, 2),
     Packet = emongo_packet:get_more(Database, Collection, ReqId, 0, CursorID),
     Resp1 = emongo_server:send_recv(Pid, ReqId, Packet, Timeout),
-    
+
     NewValue = lists:foldl(F, Value, Resp1#response.documents),
     fold_more(F, NewValue, Collection, Resp1#response{documents=[]}, Timeout).
 
 %% find_one
 %%------------------------------------------------------------------------------
 find_one(PoolId, Collection, Selector) ->
-	find_one(PoolId, Collection, Selector, []).
+    find_one(PoolId, Collection, Selector, []).
 
 find_one(PoolId, Collection, Selector, Options) ->
-	Options1 = [{limit, 1} | lists:keydelete(limit, 1, Options)],
-	find_all(PoolId, Collection, Selector, Options1).
-
-%%------------------------------------------------------------------------------
-%% get_more
-%%------------------------------------------------------------------------------
-%% get_more(PoolId, Collection, CursorID, Timeout) ->
-%% 	get_more(PoolId, Collection, CursorID, 0, Timeout).
-
-%% get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) ->
-
-%% kill_cursors(PoolId, CursorID) when is_integer(CursorID) ->
-%% 	kill_cursors(PoolId, [CursorID]);
-
-%% kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) ->
-%% 	{Pid, _Database, ReqId} = get_pid_pool(PoolId),
-%% 	Packet = emongo_packet:kill_cursors(ReqId, CursorIDs),
-%% 	emongo_server:send(Pid, ReqId, Packet).
+    Options1 = [{limit, 1} | lists:keydelete(limit, 1, Options)],
+    find_all(PoolId, Collection, Selector, Options1).
 
 %%------------------------------------------------------------------------------
 %% insert
 %%------------------------------------------------------------------------------
-insert(PoolId, Collection, [[_|_]|_]=Documents) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Packet = emongo_packet:insert(Database, Collection, ReqId, Documents),
-	emongo_server:send(Pid, ReqId, Packet);
+insert(PoolId, Collection, Documents) ->
+    sequence(PoolId, insert_seq(Collection, Documents, no_response())).
 
-insert(PoolId, Collection, Document) ->
-	insert(PoolId, Collection, [Document]).
-
+insert_seq(Collection, [[_|_]|_]=Documents, Next) ->
+    [fun(Pid, Database, ReqId) ->
+             Packet = emongo_packet:insert(Database, Collection, ReqId, Documents),
+             emongo_server:send(Pid, Packet),
+             ReqId + 1
+     end | Next];
+insert_seq(Collection, Document, Next) ->
+    insert_seq(Collection, [Document], Next).
 
 %%------------------------------------------------------------------------------
 %% update
 %%------------------------------------------------------------------------------
 update(PoolId, Collection, Selector, Document) ->
-	update(PoolId, Collection, Selector, Document, false).
+    update(PoolId, Collection, Selector, Document, false).
 
 update(PoolId, Collection, Selector, Document, Upsert) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
-	emongo_server:send(Pid, ReqId, Packet).
+    sequence(PoolId, update_seq(Collection, Selector, Document, Upsert, no_response())).
+
+
+update_seq(Collection, Selector, Document, Upsert, Next) ->
+    [fun(Pid, Database, ReqId) ->
+             Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
+             emongo_server:send(Pid, Packet),
+             ReqId + 1
+     end | Next].
+
 
 update_sync(PoolId, Collection, Selector, Document, Upsert) ->
-    {Pid, Database, ReqId} = get_pid_pool(PoolId),
-    Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, Selector, Document),
-    do_sync(Packet, Database, ReqId, Pid).
-
-do_sync(Packet, Database, ReqId, Pid) ->
-    PacketGetLastError = emongo_packet:get_last_error(Database, ReqId),
-    emongo_server:send_recv(Pid, ReqId, [Packet, PacketGetLastError], ?TIMEOUT).
-
+    sequence(PoolId, update_seq(Collection, Selector, Document, Upsert, synchronous())).
 
 %%------------------------------------------------------------------------------
 %% delete
 %%------------------------------------------------------------------------------
 delete(PoolId, Collection) ->
-	delete(PoolId, Collection, []).
+    delete(PoolId, Collection, []).
 
 delete(PoolId, Collection, Selector) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)),
-	emongo_server:send(Pid, ReqId, Packet).
+    sequence(PoolId, delete_seq(Collection, Selector, no_response())).
+
+
+delete_seq(Collection, Selector, Next) ->
+    [fun(Pid, Database, ReqId) ->
+             Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)),
+             emongo_server:send(Pid, Packet),
+             ReqId + 1
+     end | Next].
+
 
 delete_sync(PoolId, Collection, Selector) ->
-    {Pid, Database, ReqId} = get_pid_pool(PoolId),
-    Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)),
-    do_sync(Packet, Database, ReqId, Pid).
+    sequence(PoolId, delete_seq(Collection, Selector, synchronous())).
 
 
 %%------------------------------------------------------------------------------
 %% ensure index
 %%------------------------------------------------------------------------------
 ensure_index(PoolId, Collection, Keys) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Packet = emongo_packet:ensure_index(Database, Collection, ReqId, Keys),
-	emongo_server:send(Pid, ReqId, Packet).
+    {Pid, Database, ReqId} = get_pid_pool(PoolId, 1),
+    Packet = emongo_packet:ensure_index(Database, Collection, ReqId, Keys),
+    emongo_server:send(Pid, Packet).
 
 
 count(PoolId, Collection) -> count(PoolId, Collection, []).
 
 
 count(PoolId, Collection, Selector) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Q = [{<<"count">>, Collection}, {<<"ns">>, Database},
-		 {<<"query">>, transform_selector(Selector)}],
-	Query = #emo_query{q=Q, limit=1},
-	Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query),
-	case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of
-		#response{documents=[[{<<"n">>,Count}|_]]} ->
-			round(Count);
-		_ ->
-			undefined
-	end.
+    {Pid, Database, ReqId} = get_pid_pool(PoolId, 2),
+    Q = [{<<"count">>, Collection}, {<<"ns">>, Database},
+         {<<"query">>, transform_selector(Selector)}],
+    Query = #emo_query{q=Q, limit=1},
+    Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query),
+    case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of
+        #response{documents=[[{<<"n">>,Count}|_]]} ->
+            round(Count);
+        _ ->
+            undefined
+    end.
 
 
 distinct(PoolId, Collection, Key) -> distinct(PoolId, Collection, Key, []).
 
 distinct(PoolId, Collection, Key, Selector) ->
-	{Pid, Database, ReqId} = get_pid_pool(PoolId),
-	Q = [{<<"distinct">>, Collection}, {<<"key">>, Key}, {<<"ns">>, Database},
-		 {<<"query">>, transform_selector(Selector)}],
-	Query = #emo_query{q=Q, limit=1},
-	Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query),
-	case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of
-		#response{documents=[[{<<"values">>, {array, Vals}} | _]]} ->
-			Vals;
-		_ ->
-			undefined
-	end.
+    {Pid, Database, ReqId} = get_pid_pool(PoolId, 2),
+    Q = [{<<"distinct">>, Collection}, {<<"key">>, Key}, {<<"ns">>, Database},
+         {<<"query">>, transform_selector(Selector)}],
+    Query = #emo_query{q=Q, limit=1},
+    Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query),
+    case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of
+        #response{documents=[[{<<"values">>, {array, Vals}} | _]]} ->
+            Vals;
+        _ ->
+            undefined
+    end.
 
 
-%drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) ->
 
 %%====================================================================
 %% gen_server callbacks
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
 init(_) ->
-	{ok, HN} = inet:gethostname(),
-	<<HashedHN:3/binary,_/binary>> = erlang:md5(HN),
-	{ok, #state{oid_index=1, hashed_hostn=HashedHN}}.
+    {ok, HN} = inet:gethostname(),
+    <<HashedHN:3/binary,_/binary>> = erlang:md5(HN),
+    {ok, #state{oid_index=1, hashed_hostn=HashedHN}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
 handle_call(oid, _From, State) ->
-	{Total_Wallclock_Time, _} = erlang:statistics(wall_clock),
-	Front = Total_Wallclock_Time rem 16#ffffffff,
-	<<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()),
-	Index = State#state.oid_index rem 16#ffffff,
-	{reply, <<Front:32, (State#state.hashed_hostn)/binary, PID/binary, Index:24>>, State#state{oid_index = State#state.oid_index + 1}};
+    {Total_Wallclock_Time, _} = erlang:statistics(wall_clock),
+    Front = Total_Wallclock_Time rem 16#ffffffff,
+    <<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()),
+    Index = State#state.oid_index rem 16#ffffff,
+    {reply, <<Front:32, (State#state.hashed_hostn)/binary, PID/binary, Index:24>>, State#state{oid_index = State#state.oid_index + 1}};
 
 handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-get_pid_pool(PoolId) ->
-    emongo_sup:worker_pid(PoolId, emongo_sup:pools()).
+get_pid_pool(PoolId, RequestCount) ->
+    emongo_sup:worker_pid(PoolId, emongo_sup:pools(), RequestCount).
 
 dec2hex(Dec) ->
-	dec2hex(<<>>, Dec).
+    dec2hex(<<>>, Dec).
 
 dec2hex(N, <<I:8,Rem/binary>>) ->
-	dec2hex(<<N/binary, (hex0((I band 16#f0) bsr 4)):8, (hex0((I band 16#0f))):8>>, Rem);
+    dec2hex(<<N/binary, (hex0((I band 16#f0) bsr 4)):8, (hex0((I band 16#0f))):8>>, Rem);
 dec2hex(N,<<>>) ->
-	N.
+    N.
 
 hex2dec(Hex) when is_list(Hex) ->
-	hex2dec(list_to_binary(Hex));
+    hex2dec(list_to_binary(Hex));
 
 hex2dec(Hex) ->
-	hex2dec(<<>>, Hex).
+    hex2dec(<<>>, Hex).
 
 hex2dec(N,<<A:8,B:8,Rem/binary>>) ->
-	hex2dec(<<N/binary, ((dec0(A) bsl 4) + dec0(B)):8>>, Rem);
+    hex2dec(<<N/binary, ((dec0(A) bsl 4) + dec0(B)):8>>, Rem);
 hex2dec(N,<<>>) ->
-	N.
+    N.
 
 create_query(Options, Selector) ->
-	Selector1 = transform_selector(Selector),
-	create_query(Options, #emo_query{}, Selector1, []).
+    Selector1 = transform_selector(Selector),
+    create_query(Options, #emo_query{}, Selector1, []).
 
 create_query([], QueryRec, QueryDoc, []) ->
-	QueryRec#emo_query{q=QueryDoc};
+    QueryRec#emo_query{q=QueryDoc};
 
 create_query([], QueryRec, [], OptDoc) ->
-	QueryRec#emo_query{q=OptDoc};
+    QueryRec#emo_query{q=OptDoc};
 
 create_query([], QueryRec, QueryDoc, OptDoc) ->
-	QueryRec#emo_query{q=(OptDoc ++ [{<<"query">>, QueryDoc}])};
+    QueryRec#emo_query{q=(OptDoc ++ [{<<"query">>, QueryDoc}])};
 
 create_query([{limit, Limit}|Options], QueryRec, QueryDoc, OptDoc) ->
-	QueryRec1 = QueryRec#emo_query{limit=Limit},
-	create_query(Options, QueryRec1, QueryDoc, OptDoc);
+    QueryRec1 = QueryRec#emo_query{limit=Limit},
+    create_query(Options, QueryRec1, QueryDoc, OptDoc);
 
 create_query([{offset, Offset}|Options], QueryRec, QueryDoc, OptDoc) ->
-	QueryRec1 = QueryRec#emo_query{offset=Offset},
-	create_query(Options, QueryRec1, QueryDoc, OptDoc);
+    QueryRec1 = QueryRec#emo_query{offset=Offset},
+    create_query(Options, QueryRec1, QueryDoc, OptDoc);
 
 create_query([{orderby, Orderby}|Options], QueryRec, QueryDoc, OptDoc) ->
-	Orderby1 = [{Key, case Dir of desc -> -1; _ -> 1 end}|| {Key, Dir} <- Orderby],
-	OptDoc1 = [{<<"orderby">>, Orderby1}|OptDoc],
-	create_query(Options, QueryRec, QueryDoc, OptDoc1);
+    Orderby1 = [{Key, case Dir of desc -> -1; _ -> 1 end}|| {Key, Dir} <- Orderby],
+    OptDoc1 = [{<<"orderby">>, Orderby1}|OptDoc],
+    create_query(Options, QueryRec, QueryDoc, OptDoc1);
 
 create_query([{fields, Fields}|Options], QueryRec, QueryDoc, OptDoc) ->
-	QueryRec1 = QueryRec#emo_query{field_selector=[{Field, 1} || Field <- Fields]},
-	create_query(Options, QueryRec1, QueryDoc, OptDoc);
+    QueryRec1 = QueryRec#emo_query{field_selector=[{Field, 1} || Field <- Fields]},
+    create_query(Options, QueryRec1, QueryDoc, OptDoc);
 
 create_query([_|Options], QueryRec, QueryDoc, OptDoc) ->
-	create_query(Options, QueryRec, QueryDoc, OptDoc).
+    create_query(Options, QueryRec, QueryDoc, OptDoc).
 
 transform_selector(Selector) ->
-	transform_selector(Selector, []).
+    transform_selector(Selector, []).
 
 transform_selector([], Acc) ->
-	lists:reverse(Acc);
+    lists:reverse(Acc);
 
 transform_selector([{where, Val}|Tail], Acc) when is_list(Val) ->
-	transform_selector(Tail, [{<<"$where">>, Val}|Acc]);
+    transform_selector(Tail, [{<<"$where">>, Val}|Acc]);
 
 transform_selector([{Key, [{_,_}|_]=Vals}|Tail], Acc) ->
-	Vals1 =
-		[case Operator of
-			O when O == '>'; O == gt ->
-				{<<"$gt">>, Val};
-			O when O == '<'; O == lt ->
-				{<<"$lt">>, Val};
-			O when O == '>='; O == gte ->
-				{<<"$gte">>, Val};
-			O when O == '=<'; O == lte ->
-				{<<"$lte">>, Val};
-			O when O == '=/='; O == '/='; O == ne ->
-				{<<"$ne">>, Val};
-			in when is_list(Val) ->
-				{<<"$in">>, {array, Val}};
-			nin when is_list(Val) ->
-				{<<"$nin">>, {array, Val}};
-			mod when is_list(Val), length(Val) == 2 ->
-				{<<"$mod">>, {array, Val}};
-			all when is_list(Val) ->
-				{<<"$all">>, {array, Val}};
-			size when is_integer(Val) ->
-				{<<"$size">>, Val};
-			exists when is_boolean(Val) ->
-				{<<"$exists">>, Val};
-			_ ->
-				{Operator, Val}
-		 end || {Operator, Val} <- Vals],
-	transform_selector(Tail, [{Key, Vals1}|Acc]);
+    Vals1 =
+        [case Operator of
+             O when O == '>'; O == gt ->
+                 {<<"$gt">>, Val};
+             O when O == '<'; O == lt ->
+                 {<<"$lt">>, Val};
+             O when O == '>='; O == gte ->
+                 {<<"$gte">>, Val};
+             O when O == '=<'; O == lte ->
+                 {<<"$lte">>, Val};
+             O when O == '=/='; O == '/='; O == ne ->
+                 {<<"$ne">>, Val};
+             in when is_list(Val) ->
+                 {<<"$in">>, {array, Val}};
+             nin when is_list(Val) ->
+                 {<<"$nin">>, {array, Val}};
+             mod when is_list(Val), length(Val) == 2 ->
+                 {<<"$mod">>, {array, Val}};
+             all when is_list(Val) ->
+                 {<<"$all">>, {array, Val}};
+             size when is_integer(Val) ->
+                 {<<"$size">>, Val};
+             exists when is_boolean(Val) ->
+                 {<<"$exists">>, Val};
+             _ ->
+                 {Operator, Val}
+         end || {Operator, Val} <- Vals],
+    transform_selector(Tail, [{Key, Vals1}|Acc]);
 
 transform_selector([Other|Tail], Acc) ->
-	transform_selector(Tail, [Other|Acc]).
+    transform_selector(Tail, [Other|Acc]).
 
-dec0($a) ->	10;
-dec0($b) ->	11;
-dec0($c) ->	12;
-dec0($d) ->	13;
-dec0($e) ->	14;
-dec0($f) ->	15;
-dec0(X) ->	X - $0.
+dec0($a) -> 10;
+dec0($b) -> 11;
+dec0($c) -> 12;
+dec0($d) -> 13;
+dec0($e) -> 14;
+dec0($f) -> 15;
+dec0(X) -> X - $0.
 
 hex0(10) -> $a;
 hex0(11) -> $b;

File src/emongo_bson.erl

 	{Int, Tail};
 	
 decode_value(_, _) ->
-	exit(oh_fuck).
+	exit(unknown_type).

File src/emongo_collection.erl

-%% @doc Collection utility functions
-%% @author Jacob Perkins <japerk@gmail.com>
--module(emongo_collection).
-
--include("emongo.hrl").
-
--export([distinct_set/4, distinct_list/4]).
--export([fold/4, fold/5, fold/6]).
--export([foreach/3, foreach/4, foreach/5]).
-
-%%%%%%%%%%%%%%
-%% distinct %%
-%%%%%%%%%%%%%%
-
-distinct_set(PoolId, Collection, Key, Selector) ->
-	F = fun({array, List}, Set) -> sets:union(Set, sets:from_list(List)) end,
-	Arrays = emongo:distinct(PoolId, Collection, Key, Selector),
-	lists:foldl(F, sets:new(), Arrays).
-
-distinct_list(PoolId, Collection, Key, Selector) ->
-	lists:sort(sets:to_list(distinct_set(PoolId, Collection, Key, Selector))).
-
-%%%%%%%%%%
-%% fold %%
-%%%%%%%%%%
-
-fold(F, Acc0, PoolId, Collection) ->
-	fold(F, Acc0, PoolId, Collection, [], [{timeout, ?TIMEOUT}]).
-
-fold(F, Acc0, PoolId, Collection, Selector) ->
-	fold(F, Acc0, PoolId, Collection, Selector, []).
-
-fold(F, Acc0, PoolId, Collection, Selector, Options) ->
-    Documents = emongo:find_all(PoolId, Collection, Selector, Options),
-    lists:foldl(F, Acc0, Documents).
-
-
-%%%%%%%%%%%%%
-%% foreach %%
-%%%%%%%%%%%%%
-
-foreach(F, PoolId, Collection) ->
-	foreach(F, PoolId, Collection, [], []).
-
-foreach(F, PoolId, Collection, Selector) ->
-	foreach(F, PoolId, Collection, Selector, []).
-
-foreach(F, PoolId, Collection, Selector, Options) ->
-    Documents = emongo:find_all(PoolId, Collection, Selector, Options),
-    lists:foreach(F, Documents).

File src/emongo_pool.erl

 -behaviour(gen_server).
 
 %% API
--export([start_link/5, pid/1]).
+-export([start_link/5, pid/1, pid/2]).
+
+-deprecated([pid/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
                size,
                active=false,
                poll=none,
-               conn_pid=queue:new(),
+               conn_pid=pqueue:new(),
                req_id=1}).
 
+%% messages
+-define(pid(RequestCount), {pid, RequestCount}).
+-define(poll(), poll).
+-define(poll_timeout(Pid, ReqId, Tag), {poll_timeout, Pid, ReqId, Tag}).
+
+%% to be removed next release
+-define(old_pid(), pid).
+
+
 %%%%%%%%%%%%%%%%
 %% public api %%
 %%%%%%%%%%%%%%%%
 pid(Pid) ->
     gen_server:call(Pid, pid).
 
+pid(Pid, RequestCount) ->
+    gen_server:call(Pid, {pid, RequestCount}).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%
 %% gen_server callbacks %%
 %%%%%%%%%%%%%%%%%%%%%%%%%%
                  size = Size
                 },
     
-    {noreply, Pool} = handle_info(poll, Pool0),
+    {noreply, Pool} = handle_info(?poll(), Pool0),
     {ok, Pool}.
 
 %%--------------------------------------------------------------------
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call(pid, _From, #pool{active=true}=State) ->
-    {Reply, NewState} = get_pid(State),
+handle_call(?old_pid(), _From, #pool{active=true}=State) ->
+    {Reply, NewState} = get_pid(State, 1),
+    {reply, Reply, NewState};
+
+handle_call(?pid(RequestCount), _From, #pool{active=true}=State) ->
+    {Reply, NewState} = get_pid(State, RequestCount),
     {reply, Reply, NewState};
 
 handle_call(_Request, _From, State) ->
     error_logger:error_msg("Pool ~p deactivated by worker death: ~p~n",
                            [State#pool.id, Reason]),
     
-    Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pids),
+    Pids1 = pqueue:filter(fun(Item) -> Item =/= Pid end, Pids),
     {noreply, State#pool{conn_pid = Pids1, active=false}};
 
-handle_info(poll, State) ->
+handle_info(?poll(), State) ->
     erlang:send_after(?POLL_INTERVAL, self(), poll),
     NewState = do_open_connections(State),
     {noreply, NewState};
 
-handle_info({poll_timeout, Pid, ReqId, Tag}, #pool{poll={Tag, _}}=State) ->
+handle_info(?poll_timeout(Pid, ReqId, Tag), #pool{poll={Tag, _}}=State) ->
     case catch emongo_server:recv(Pid, ReqId, 0, Tag) of
         #response{} ->
             {noreply, State#pool{active=true, poll=none}};
 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
 %% Description: Convert process state when code is changed
 %%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
+code_change(OldVsn, State, _Extra) ->
+    error_logger:info_msg("emongo_pool:code_change(~p, ...)~n", [OldVsn]),
+    
+    State1 = case queue:is_queue(State#pool.conn_pid) of
+                 false ->
+                     State;
+                 true ->
+                     State#pool{conn_pid = queue2pqueue(State#pool.conn_pid, pqueue:new())}
+             end,
+    
+    {ok, State1}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
+queue2pqueue(Queue, PQueue) ->
+    case queue:out(Queue) of
+        {empty, _} ->
+            PQueue;
+        {{value, Item}, NewQueue} ->
+            queue2pqueue(NewQueue, pqueue:push(1, Item, PQueue))
+    end.
 
-get_pid(#pool{database=Database, conn_pid=Pids, req_id=ReqId}=State) ->
-    case queue:out(Pids) of
-        {{value, Pid}, Q2} ->
-            NewState = State#pool{conn_pid=queue:in(Pid, Q2), req_id=(ReqId + 1)},
+get_pid(#pool{database=Database, conn_pid=Pids, req_id=ReqId}=State, RequestCount) ->
+    case pqueue:pop(Pids) of
+        {Pid, Q2} ->
+            NewState = State#pool{conn_pid=pqueue:push(RequestCount, Pid, Q2),
+                                  req_id=(ReqId + RequestCount)},
             {{Pid, Database, ReqId}, NewState};
-        {empty, _} ->
+        empty ->
             {undefined, State}
     end.
 
 do_open_connections(#pool{conn_pid=Pids, size=Size}=Pool) -> 
-    case queue:len(Pids) < Size of
+    case pqueue:size(Pids) < Size of
         true ->
             case emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port) of
                 {error, _Reason} ->
                     Pool#pool{active=false};
                 {ok, Pid} ->
-                    do_open_connections(Pool#pool{conn_pid = queue:in(Pid, Pids)})
+                    do_open_connections(Pool#pool{conn_pid = pqueue:push(1, Pid, Pids)})
             end;
         false ->
             do_poll(Pool)
     end.
 
 do_poll(Pool) ->
-    case get_pid(Pool) of
+    case get_pid(Pool, 2) of
         {{Pid, Database, ReqId}, NewPool} ->
             PacketLast = emongo_packet:get_last_error(Database, ReqId),
             Tag = emongo_server:send_recv_nowait(Pid, ReqId, PacketLast),
-            TimerRef = erlang:send_after(?POLL_TIMEOUT, self(), {poll_timeout, Pid, ReqId, Tag}),
+            TimerRef = erlang:send_after(?POLL_TIMEOUT, self(), ?poll_timeout(Pid, ReqId, Tag)),
             NewPool#pool{poll={Tag, TimerRef}};
         _ ->
             Pool#pool{active=false}

File src/emongo_router.erl

 -behaviour(gen_server).
 
 %% API
--export([start_link/2, pid/1]).
+-export([start_link/2, pid/1, pid/2]).
+
+-deprecated([pid/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 -define(POOL_ID(BalancerId, PoolIdx), {BalancerId, PoolIdx}).
 -define(RECHECK_TIME, 9500).
 
+%% messages
+-define(pid(RequestCount), {pid, RequestCount}).
+
+%% to be removed next release
+-define(old_pid(), pid).
+
 %%====================================================================
 %% API
 %%====================================================================
 start_link(BalId, Pools) ->
     gen_server:start_link(?MODULE, [BalId, Pools], []).
 
+
 pid(BalancerPid) ->
-    gen_server:call(BalancerPid, pid).
+    pid(BalancerPid, 1).
 
+
+pid(BalancerPid, RequestCount) ->
+    gen_server:call(BalancerPid, {pid, RequestCount}).
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
 %%                                      {stop, Reason, State}
 %% Description: Handling call messagesp
 %%--------------------------------------------------------------------
-handle_call(pid, _From, State) ->
-    {Pid, NewState} = get_pid(State, emongo_sup:pools()),
+handle_call(?old_pid(), _From, State) ->
+    {Pid, NewState} = get_pid(State, emongo_sup:pools(), 1),
+    {reply, Pid, NewState};
+
+handle_call(?pid(RequestCount), _From, State) ->
+    {Pid, NewState} = get_pid(State, emongo_sup:pools(), RequestCount),
     {reply, Pid, NewState};
 
 handle_call(stop_children, _, #state{id=BalId, active=Active, passive=Passive}=State) ->
 %%% Internal functions
 %%--------------------------------------------------------------------
 
-get_pid(#state{id=BalId, active=Active, passive=Passive, timer=Timer}=State, Pools) ->
+get_pid(#state{id=BalId, active=Active, passive=Passive, timer=Timer}=State, Pools, RequestCount) ->
     case Active of
         [PoolIdx | Active2] ->
-            case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools) of
+            case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools, RequestCount) of
                 undefined ->
                     error_logger:info_msg("pool ~p is disabled!~n", [?POOL_ID(BalId, PoolIdx)]),
                     
                     get_pid(State#state{active=Active2,
                                         passive=[PoolIdx | Passive],
                                         timer=set_timer(Timer)
-                                       }, Pools);
+                                       }, Pools, RequestCount);
                 Pid ->
                     {Pid, State}
             end;
     State#state{passive=Passive, timer=erlang:send_after(?RECHECK_TIME, self(), recheck)};
 
 activate(#state{id=BalId, active=Active, passive=[PoolIdx | Passive]}=State, Acc, Pools) ->
-    case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools) of
+    case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools, 0) of
         undefined ->
             activate(State#state{passive=Passive}, [PoolIdx | Acc], Pools);
         _ ->

File src/emongo_server.erl

 
 -include("emongo.hrl").
 
--export([start_link/3, send/3, send_recv/4]).
+-export([start_link/3]).
+
+-export([send/3, send/2, send_recv/4]).
 -export([send_recv_nowait/3, recv/4]).
 
+-deprecated([send/3]).
+
+%% gen_server
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
 -record(state, {pool_id, socket, requests, leftover}).
 
+%% messages
 -define(abort(ReqId), {abort, ReqId}).
+-define(send(Packet), {send, Packet}).
+-define(send_recv(ReqId, Packet, From),
+        {send_recv, ReqID, Packet, From}).
+
+%% to be removed next release
+-define(old_send(ReqId, Packet), {send, ReqId, Packet}).
+
 
 start_link(PoolId, Host, Port) ->
     gen_server:start_link(?MODULE, [PoolId, Host, Port], []).
 
 
-send(Pid, ReqID, Packet) ->
-    gen_server:cast(Pid, {send, ReqID, Packet}).
+send(Pid, _ReqID, Packet) ->
+    send(Pid, Packet).
+
+send(Pid, Packet) ->
+    gen_server:cast(Pid, ?send(Packet)).
 
 
 send_recv_nowait(Pid, ReqID, Packet) ->
     Tag = make_ref(),
-    gen_server:cast(Pid, {send_recv, ReqID, Packet, {self(), Tag}}),
+    gen_server:cast(Pid, ?send_recv(ReqID, Packet, {self(), Tag})),
     Tag.
 
 
     {reply, undefined, State}.
 
 
-handle_cast({send_recv, ReqID, Packet, From}, State) ->
+handle_cast(?send_recv(ReqID, Packet, From), State) ->
     gen_tcp:send(State#state.socket, Packet),
     State1 = State#state{requests=[{ReqID, From} | State#state.requests]},
     {noreply, State1};
 
-handle_cast({send, _, Packet}, State) ->
+handle_cast(?old_send(_ReqId, Packet), State) ->
+    gen_tcp:send(State#state.socket, Packet),
+    {noreply, State};
+
+handle_cast(?send(Packet), State) ->
     gen_tcp:send(State#state.socket, Packet),
     {noreply, State}.
 

File src/emongo_sup.erl

 
 -behaviour(supervisor).
 
--export([start_link/0, start_pool/5, stop_pool/1, pools/0, worker_pid/2]).
+-export([start_link/0, pools/0, worker_pid/2, worker_pid/3]).
+-export([start_pool/5, stop_pool/1]).
 -export([start_router/2, stop_router/1]).
 
+-deprecated([worker_pid/2]).
+
 %% supervisor exports
 -export([init/1]).
 
                               <- supervisor:which_children(?MODULE)].
 
 worker_pid(PoolId, Pools) ->
+    worker_pid(PoolId, Pools, 1).
+
+
+worker_pid(PoolId, Pools, RequestCount) ->
     case lists:keyfind(PoolId, 1, Pools) of
         {_, Pid, Module} ->
-            Module:pid(Pid);
+            Module:pid(Pid, RequestCount);
         _ ->
             undefined
     end.

File src/pqueue.erl

+%% Copyright (c) 2010 Belyaev Dmitry <rumata-estor@nm.ru>
+%%
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+
+%% Is pairing heap a better realization?
+-module(pqueue).
+
+-export([
+         new/0,
+         size/1,
+         push/3,
+         pop/1,
+         filter/2
+        ]).
+
+-type tail() :: {integer(), [term()]}.
+
+-record(pqueue, {size :: integer(),
+                 head :: [term()],
+                 tails :: [tail()]}).
+
+-spec new() -> #pqueue{}.
+new() ->
+    #pqueue{size = 0, head = [], tails = []}.
+
+
+-spec size(#pqueue{}) -> integer().
+size(#pqueue{size = Size}) ->
+    Size.
+
+
+-spec push(integer(), term(), #pqueue{}) -> #pqueue{}.
+push(Priority, Item, #pqueue{size = Size, head = Head, tails = Tails}) ->
+    #pqueue{size = Size + 1, head = Head, tails = push_item(Tails, Priority, Item)}.
+
+
+-spec pop(#pqueue{}) -> {term(), #pqueue{}} | empty.
+pop(#pqueue{size = Size, head = [Item | Head], tails = Tails}) ->
+    {Item, #pqueue{size = Size - 1, head = Head, tails = Tails}};
+
+pop(#pqueue{size = Size, head = [], tails = [{_, Tail} | Tails]}) ->
+    [Item | Head] = lists:reverse(Tail),
+    {Item, #pqueue{size = Size - 1, head = Head, tails = Tails}};
+
+pop(_) ->
+    empty.
+
+
+-spec filter(fun((term()) -> boolean()), #pqueue{}) -> #pqueue{}.
+filter(_, #pqueue{size = 0} = Q) ->
+    Q;
+filter(Fun, #pqueue{head = Head, tails = Tails}) ->
+    NewHead = [I || I <- Head, Fun(I)],
+    NewTails = filter_tails(Fun, Tails),
+    
+    NewSize = length(Head) + lists:sum([length(Items) || {_, Items} <- NewTails]),
+    #pqueue{size = NewSize, head = NewHead, tails = NewTails}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+-spec push_item([tail()], integer(), term()) -> [tail()].
+push_item([], Priority, Item) ->
+    [{Priority, [Item]}];
+
+push_item([{Priority, Items} | Tails], Priority, Item) ->
+    [{Priority, [Item | Items]} | Tails];
+
+push_item([{LowerP, _} = LowerI | Tails], Priority, Item)
+  when LowerP < Priority ->
+    [LowerI | push_item(Tails, Priority - LowerP, Item)];
+
+push_item([{HigherP, Items} | Tails], Priority, Item) ->
+    [{Priority, [Item]}, {HigherP - Priority, Items} | Tails].
+
+
+-spec filter_tails(fun((term()) -> boolean()), [tail()]) -> [tail()].
+filter_tails(_, []) ->
+    [];
+filter_tails(Fun, [{Priority, Items} | Tails]) ->
+    case [I || I <- Items, Fun(I)] of
+        [] ->
+            filter_tails(Fun, add_priority(Priority, Tails));
+        NewItems ->
+            [{Priority, NewItems} | filter_tails(Fun, Tails)]
+    end.
+
+
+-spec add_priority(integer(), [tail()]) -> [tail()].
+add_priority(_, []) ->
+    [];
+add_priority(Priority, [{NextP, Items} | Tails]) ->
+    [{NextP + Priority, Items} | Tails].

File t/pqueue_test.erl

+-module(pqueue_test).
+
+-compile(export_all).
+
+test(Data, F) ->
+    {Time1, Sorted1} = timer:tc(lists, sort, [Data]),
+    {Time21, Pushed1} = timer:tc(lists, foldl, [fun({P, I}, Q) ->
+                                                        pqueue:push(P, I, Q)
+                                                end, pqueue:new(), Data]),
+    
+    Sorted = [I || {_, V} = I <- Sorted1, F(V)],
+    Pushed = pqueue:filter(F, Pushed1),
+    
+    {Time22, _} = timer:tc(?MODULE, pop_items, [Pushed]),
+    Time2 = Time21 + Time22,
+    
+    io:format("Time sorted: ~p~nTime pushed: ~p~nDivide: ~p~n", [Time1, Time2, Time1/Time2]),
+    
+    LenS = length(Sorted),
+    LenP = pqueue:size(Pushed),
+    
+    LenS = LenP,
+    io:format("Length: ~p~n", [LenS]),
+
+    lists:foldl(fun({_, I}, Q) ->
+                        {I, NQ} = pqueue:pop(Q),
+                        NQ
+                end, Pushed, Sorted).
+
+test() ->
+    Len = 10000,
+    Dev = 10,
+    Fun = fun(I) -> I rem 3 =/= 0 end,
+    
+    Data = [{random:uniform(Dev), I} || I <- lists:seq(1, Len)],
+    test(Data, Fun).
+
+test2() ->
+    Queue0 = push_items([{1, 1}, {1, 2}, {5, 3}, {2, 4}], pqueue:new()),
+    4 = pqueue:size(Queue0),
+    
+    Queue1 = ensure_items([1, 2, 4], Queue0),
+    1 = pqueue:size(Queue1),
+    
+    Queue2 = push_items([{2, 5}, {3, 6}], Queue1),
+    3 = pqueue:size(Queue2),
+    
+    Queue3 = ensure_items([5, 3, 6], Queue2),
+    
+    Queue3 = pqueue:new().
+    
+push_items(Data, Queue) ->
+    lists:foldl(fun({P, I}, Q) -> pqueue:push(P, I, Q) end,
+                Queue,
+                Data).
+
+ensure_items(Items, Queue) ->
+    lists:foldl(fun(I, Q) -> {I, NQ} = pqueue:pop(Q), NQ end,
+                Queue,
+                Items).
+
+pop_items(Queue) ->
+    case pqueue:is_empty(Queue) of
+        true ->
+            ok;
+        _ ->
+            {_Item, NewQueue} = pqueue:pop(Queue),
+            pop_items(NewQueue)
+    end.