Commits

Anonymous committed a3bebee

allow streaming of keylists and map/red over whole buckets

With this commit, the key-listing of buckets is performed in an
incremental streaming fashion without ever collecting the entire list
in one memory location. This allows for the useful listing of much
larger buckets, especially if you use the streaming interface instead
of just the list_keys collector.

Also, the streaming keylist can be directed at a mapreduce job,
allowing mapreduce to be performed over a whole bucket in one go
instead of the more usual "targeted" mapreduce form. Please note that
some (planned) optimizations in the mapreduce FSMs are needed before
this will have acceptable performance on very large buckets.

A demonstration, first that the existing list_keys behavior is
unchanged and then of the bucket mapreduce:

1> {ok,C} = riak:local_client().
-> {ok,{riak_client,'riak1@127.0.0.1',<<4,180,80,101>>}}
2> lists:foldl(fun(E, C) ->
O = riak_object:new(<<"testbuck">>, <<E:32>>, E),
C:put(O, 3),
C
end,
C,
lists:seq(1, 1000)).
-> {riak_client,'riak1@127.0.0.1',<<4,180,80,101>>}
3> {ok,L} = C:list_keys(<<"testbuck">>).
-> {ok,[...]}
4> length(L).
-> 1000
5> {ok,Res} =
C:mapred_bucket(<<"testbuck">>,
[riak_mapreduce:map_object_value(false),
riak_mapreduce:reduce_sum(true)]).
-> {ok,[500500]}

If you wish to consume the stream of keys directly, see the new
stream_list_keys functions in riak_client.

  • Participants
  • Parent commits a5ffbbc

Comments (0)

Files changed (2)

src/riak_client.erl

 
 -export([mapred/2,mapred/3]).
 -export([mapred_stream/2,mapred_stream/3]).
+-export([mapred_bucket/2,mapred_bucket/3,mapred_bucket/4]).
+-export([mapred_bucket_stream/3,mapred_bucket_stream/4,mapred_bucket_stream/5]).
 -export([get/3,get/4]).
 -export([put/2,put/3,put/4]).
 -export([delete/3,delete/4]).
--export([list_keys/1,list_keys/2]).
+-export([list_keys/1,list_keys/2,list_keys/3]).
+-export([stream_list_keys/1,stream_list_keys/2,stream_list_keys/3,
+         stream_list_keys/4,stream_list_keys/5]).
 -export([filter_keys/2,filter_keys/3]).
 -export([list_buckets/0,list_buckets/1]).
 -export([set_bucket/2,get_bucket/1]).
 -export ([remove_event_handler/3]).
 %% @type default_timeout() = 15000
 -define(DEFAULT_TIMEOUT, 15000).
+-define(DEFAULT_ERRTOL, 0.00003).
 
 %% @spec mapred(Inputs :: list(),
 %%              Query :: [riak_mapreduce_fsm:mapred_queryterm()]) ->
                             [ReqId,Query,Timeout,ClientPid]),
     {ok, {ReqId, MR_FSM}}.
 
+mapred_bucket_stream(Bucket, Query, ClientPid) ->
+    mapred_bucket_stream(Bucket, Query, ClientPid, ?DEFAULT_TIMEOUT).
+
+mapred_bucket_stream(Bucket, Query, ClientPid, Timeout) ->
+    mapred_bucket_stream(Bucket, Query, ClientPid, Timeout, ?DEFAULT_ERRTOL).
+
+mapred_bucket_stream(Bucket, Query, ClientPid, Timeout, ErrorTolerance) ->
+    {ok,{MR_ReqId,MR_FSM}} = mapred_stream(Query,ClientPid,Timeout),
+    {ok,_Stream_ReqID} = stream_list_keys(Bucket, Timeout, ErrorTolerance,
+                                  MR_FSM, mapred),
+    {ok,MR_ReqId}.
+
+mapred_bucket(Bucket, Query) ->
+    mapred_bucket(Bucket, Query, ?DEFAULT_TIMEOUT).
+
+mapred_bucket(Bucket, Query, Timeout) ->
+    mapred_bucket(Bucket, Query, Timeout, ?DEFAULT_ERRTOL).
+
+mapred_bucket(Bucket, Query, Timeout, ErrorTolerance) ->
+    Me = self(),
+    {ok,MR_ReqId} = mapred_bucket_stream(Bucket, Query, Me,
+                                         Timeout, ErrorTolerance),
+    collect_mr_results(MR_ReqId, Timeout, []).
+
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
 %%       {error, notfound} |
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
 list_keys(Bucket, Timeout) -> 
+    list_keys(Bucket, Timeout, ?DEFAULT_ERRTOL).
+list_keys(Bucket, Timeout, ErrorTolerance) -> 
     Me = self(),
     ReqId = mk_reqid(),
-    spawn(Node, riak_keys_fsm, start, [ReqId,Bucket,Timeout,Me]),
-    wait_for_reqid(ReqId, Timeout).
+    spawn(Node, riak_keys_fsm, start,
+          [ReqId,Bucket,Timeout,plain,ErrorTolerance,Me]),
+    wait_for_listkeys(ReqId, Timeout).
+
+stream_list_keys(Bucket) -> 
+    stream_list_keys(Bucket, ?DEFAULT_TIMEOUT).
+
+stream_list_keys(Bucket, Timeout) -> 
+    stream_list_keys(Bucket, Timeout, ?DEFAULT_ERRTOL).
+
+stream_list_keys(Bucket, Timeout, ErrorTolerance) -> 
+    Me = self(),
+    stream_list_keys(Bucket, Timeout, ErrorTolerance, Me).
+
+stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) -> 
+    stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, plain).
+
+%% @spec stream_list_keys(riak_object:bucket(),
+%%                        TimeoutMillisecs :: integer(),
+%%                        ErrorTolerance :: float(),
+%%                        Client :: pid(),
+%%                        ClientType :: atom()) ->
+%%       {ok, ReqId :: term()}
+%% @doc List the keys known to be present in Bucket.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after a put or delete.
+%%      The list will not be returned directly, but will be sent
+%%      to Client in a sequence of {ReqId, {keys,Keys}} messages
+%%      and a final {ReqId, done} message.
+%%      None of the Keys lists will be larger than the number of
+%%      keys in Bucket on any single vnode.
+%%      If ClientType is set to 'mapred' instead of 'plain', then the
+%%      messages will be sent in the form of a MR input stream.
+stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, ClientType) -> 
+    ReqId = mk_reqid(),
+    spawn(Node, riak_keys_fsm, start,
+          [ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]),
+    {ok, ReqId}.
 
 %% @spec filter_keys(riak_object:bucket(), Fun :: function()) ->
 %%       {ok, [Key :: riak_object:key()]} |
             {error, timeout}
     end.
 
+%% @private
+wait_for_listkeys(ReqId, Timeout) ->
+    wait_for_listkeys(ReqId,Timeout,[]).
+%% @private
+wait_for_listkeys(ReqId,Timeout,Acc) ->
+    receive
+        {ReqId, done} -> {ok, Acc};
+        {ReqId,{keys,Res}} -> wait_for_listkeys(ReqId,Timeout,Acc++Res)
+    after Timeout ->
+            {error, timeout, Acc}
+    end.
 
-            
-            

src/riak_keys_fsm.erl

 -module(riak_keys_fsm).
 -behaviour(gen_fsm).
 
--export([start/4]).
+-export([start/6]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 -export([initialize/2,waiting_kl/2]).
 
 -record(state, {client :: {pid(), reference()},
-                keys :: [set()],
-                waiting :: [node()],
+                client_type :: atom(),
+                bloom :: term(),
+                vnodes_left :: [node()],
                 bucket :: riak_object:bucket(),
                 timeout :: pos_integer(),
-                endtime :: pos_integer(),
                 req_id :: pos_integer(),
                 ring :: riak_ring:riak_ring()
                }).
 
-start(ReqId,Bucket,Timeout,From) ->
-    gen_fsm:start(?MODULE, [ReqId,Bucket,Timeout,From], []).
+start(ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From) ->
+    gen_fsm:start(?MODULE,
+                  [ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From], []).
 
 %% @private
-init([ReqId,Bucket,Timeout,Client]) ->
+init([ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    StateData = #state{client=Client, timeout=Timeout,
-                       req_id=ReqId, bucket=Bucket, ring=Ring},
+    Bloom = bloom:sbf(10000000,ErrorTolerance),
+    StateData = #state{client=Client, client_type=ClientType, timeout=Timeout,
+                       bloom=Bloom, req_id=ReqId, bucket=Bucket, ring=Ring},
     {ok,initialize,StateData,0}.
 
+ask_vn({Index,Node},ReqId,Msg) ->
+    gen_server:cast({riak_vnode_master, Node},
+                    {vnode_list_bucket,{Index,ReqId},Msg}).
+
 %% @private
 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bucket=Bucket, ring=Ring}) ->
-    RealStartTime = riak_util:moment(),
-    riak_eventer:notify(riak_keys_fsm, keys_fsm_start,
-                        {ReqId, RealStartTime, Bucket}),
-    Msg = {self(), Bucket, ReqId},
-    NodeList = riak_ring:all_owners(Ring),
-    Asked = lists:foldl(
-              fun({Index, Node}, Acc) ->
-                      case net_adm:ping(Node) of
-                          pang -> Acc;
-                          pong ->
-                              gen_server:cast(
-                                {riak_vnode_master, Node},
-                                {vnode_list_bucket,{Index,ReqId},Msg}),
-                              [Index|Acc]
-                      end
-              end,
-              [],
-              NodeList),
-    StateData = StateData0#state{waiting=Asked, keys=[],
-                       endtime=Timeout+riak_util:moment()},
+    [FirstVN|VNodes] = riak_ring:all_owners(Ring),
+    ask_vn(FirstVN,ReqId,{self(), Bucket, ReqId}),
+    StateData = StateData0#state{vnodes_left=VNodes},
     {next_state, waiting_kl, StateData, Timeout}.
 
-waiting_kl({kl, Keys, Idx, ReqId},
-           StateData=#state{keys=Acc,waiting=Waiting,endtime=End}) ->
-    NewAcc = [sets:from_list(Keys)|Acc],
-    case lists:delete(Idx, Waiting) of
-        [] ->
-            riak_eventer:notify(riak_keys_fsm, finish, {ReqId, normal}),
-            respond(StateData#state.client,NewAcc,ReqId),
-            {stop, normal, StateData};
-        StillWaiting ->
-            {next_state, waiting_kl,
-             StateData#state{keys=NewAcc,
-                             waiting=StillWaiting},
-             End-riak_util:moment()}
-    end;
-waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqId}) ->
-    riak_eventer:notify(riak_keys_fsm, finish, {ReqId, timeout}),
-    respond(Client, Acc, ReqId),
-    {stop, normal, StateData}.
+waiting_kl({kl, Keys, _Idx, ReqId},
+           StateData=#state{vnodes_left=[],bucket=Bucket,
+            bloom=Bloom,req_id=ReqId,client=Client,client_type=ClientType}) ->
+    process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client),
+    case ClientType of
+        mapred -> gen_fsm:send_event(Client,input_done);
+        plain -> Client ! {ReqId, done}
+    end,
+    {stop,normal,StateData};
+waiting_kl({kl, Keys, _Idx, ReqId},
+           StateData=#state{vnodes_left=[FirstVN|VNodes],bloom=Bloom,
+                   req_id=ReqId,client=Client,timeout=Timeout,
+                            bucket=Bucket,client_type=ClientType}) ->
+    ask_vn(FirstVN,ReqId,{self(), Bucket, ReqId}),
+    {next_state, waiting_kl,
+     StateData#state{
+       bloom=process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client),
+       vnodes_left=VNodes},
+     Timeout}.
 
 %% @private
-respond(Client, KeyLists, ReqId) ->
-    Reply = sets:to_list(sets:union(KeyLists)),
-    Client ! {ReqId, {ok, Reply}},
-    Reply.
+process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client) ->
+    process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client,[]).
+%% @private
+process_keys([],Bucket,ClientType,Bloom,ReqId,Client,Acc) ->
+    case ClientType of
+        mapred -> gen_fsm:send_event(Client,{input,[{Bucket,K} || K <- Acc]});
+        plain -> Client ! {ReqId, {keys, Acc}}
+    end,
+    lists:foldl(fun(E,A) -> bloom:add(E,A) end, Bloom, Acc);
+process_keys([K|Rest],Bucket,ClientType,Bloom,ReqId,Client,Acc) ->
+    case bloom:member(K,Bloom) of
+        true -> process_keys(Rest,Bucket,ClientType,Bloom,ReqId,Client,Acc);
+        false -> process_keys(Rest,Bucket,ClientType,Bloom,ReqId,Client,[K|Acc])
+    end.
 
 %% @private
 handle_event(_Event, _StateName, StateData) ->