Source

riak / src / riak_keys_fsm.erl

Diff from to

src/riak_keys_fsm.erl

 -module(riak_keys_fsm).
 -behaviour(gen_fsm).
 
--export([start/4]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_kl/2]).
 
 -record(state, {client :: {pid(), reference()},
-                keys :: [set()],
-                waiting :: [node()],
+                client_type :: atom(),
+                bloom :: term(),
+                vnodes_left :: [node()],
                 bucket :: riak_object:bucket(),
                 timeout :: pos_integer(),
-                endtime :: pos_integer(),
                 req_id :: pos_integer(),
                 ring :: riak_ring:riak_ring()
                }).
 
-start(ReqId,Bucket,Timeout,From) ->
-    gen_fsm:start(?MODULE, [ReqId,Bucket,Timeout,From], []).
+start(ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From) ->
+    gen_fsm:start(?MODULE,
+                  [ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From], []).
 
 %% @private
-init([ReqId,Bucket,Timeout,Client]) ->
+init([ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    StateData = #state{client=Client, timeout=Timeout,
-                       req_id=ReqId, bucket=Bucket, ring=Ring},
+    Bloom = bloom:sbf(10000000,ErrorTolerance),
+    StateData = #state{client=Client, client_type=ClientType, timeout=Timeout,
+                       bloom=Bloom, req_id=ReqId, bucket=Bucket, ring=Ring},
     {ok,initialize,StateData,0}.
 
+ask_vn({Index,Node},ReqId,Msg) ->
+    gen_server:cast({riak_vnode_master, Node},
+                    {vnode_list_bucket,{Index,ReqId},Msg}).
+
 %% @private
 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bucket=Bucket, ring=Ring}) ->
-    RealStartTime = riak_util:moment(),
-    riak_eventer:notify(riak_keys_fsm, keys_fsm_start,
-                        {ReqId, RealStartTime, Bucket}),
-    Msg = {self(), Bucket, ReqId},
-    NodeList = riak_ring:all_owners(Ring),
-    Asked = lists:foldl(
-              fun({Index, Node}, Acc) ->
-                      case net_adm:ping(Node) of
-                          pang -> Acc;
-                          pong ->
-                              gen_server:cast(
-                                {riak_vnode_master, Node},
-                                {vnode_list_bucket,{Index,ReqId},Msg}),
-                              [Index|Acc]
-                      end
-              end,
-              [],
-              NodeList),
-    StateData = StateData0#state{waiting=Asked, keys=[],
-                       endtime=Timeout+riak_util:moment()},
+    [FirstVN|VNodes] = riak_ring:all_owners(Ring),
+    ask_vn(FirstVN,ReqId,{self(), Bucket, ReqId}),
+    StateData = StateData0#state{vnodes_left=VNodes},
     {next_state, waiting_kl, StateData, Timeout}.
 
-waiting_kl({kl, Keys, Idx, ReqId},
-           StateData=#state{keys=Acc,waiting=Waiting,endtime=End}) ->
-    NewAcc = [sets:from_list(Keys)|Acc],
-    case lists:delete(Idx, Waiting) of
-        [] ->
-            riak_eventer:notify(riak_keys_fsm, finish, {ReqId, normal}),
-            respond(StateData#state.client,NewAcc,ReqId),
-            {stop, normal, StateData};
-        StillWaiting ->
-            {next_state, waiting_kl,
-             StateData#state{keys=NewAcc,
-                             waiting=StillWaiting},
-             End-riak_util:moment()}
-    end;
-waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqId}) ->
-    riak_eventer:notify(riak_keys_fsm, finish, {ReqId, timeout}),
-    respond(Client, Acc, ReqId),
-    {stop, normal, StateData}.
+waiting_kl({kl, Keys, _Idx, ReqId},
+           StateData=#state{vnodes_left=[],bucket=Bucket,
+            bloom=Bloom,req_id=ReqId,client=Client,client_type=ClientType}) ->
+    process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client),
+    case ClientType of
+        mapred -> gen_fsm:send_event(Client,input_done);
+        plain -> Client ! {ReqId, done}
+    end,
+    {stop,normal,StateData};
+waiting_kl({kl, Keys, _Idx, ReqId},
+           StateData=#state{vnodes_left=[FirstVN|VNodes],bloom=Bloom,
+                   req_id=ReqId,client=Client,timeout=Timeout,
+                            bucket=Bucket,client_type=ClientType}) ->
+    ask_vn(FirstVN,ReqId,{self(), Bucket, ReqId}),
+    {next_state, waiting_kl,
+     StateData#state{
+       bloom=process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client),
+       vnodes_left=VNodes},
+     Timeout}.
 
 %% @private
-respond(Client, KeyLists, ReqId) ->
-    Reply = sets:to_list(sets:union(KeyLists)),
-    Client ! {ReqId, {ok, Reply}},
-    Reply.
+process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client) ->
+    process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client,[]).
+%% @private
+process_keys([],Bucket,ClientType,Bloom,ReqId,Client,Acc) ->
+    case ClientType of
+        mapred -> gen_fsm:send_event(Client,{input,[{Bucket,K} || K <- Acc]});
+        plain -> Client ! {ReqId, {keys, Acc}}
+    end,
+    lists:foldl(fun(E,A) -> bloom:add(E,A) end, Bloom, Acc);
+process_keys([K|Rest],Bucket,ClientType,Bloom,ReqId,Client,Acc) ->
+    case bloom:member(K,Bloom) of
+        true -> process_keys(Rest,Bucket,ClientType,Bloom,ReqId,Client,Acc);
+        false -> process_keys(Rest,Bucket,ClientType,Bloom,ReqId,Client,[K|Acc])
+    end.
 
 %% @private
 handle_event(_Event, _StateName, StateData) ->