Commits

Anonymous committed 5154bfe

Added functions riak_client:filter_keys and riak_client:list_buckets; updated ets, dets, osmos & fs backends to support them.

Comments (0)

Files changed (5)

src/riak_client.erl

 -export([put/2,put/3,put/4]).
 -export([delete/3,delete/4]).
 -export([list_keys/1,list_keys/2]).
+-export([filter_keys/2,filter_keys/3]).
+-export([list_buckets/0,list_buckets/1]).
 -export([set_bucket/2,get_bucket/1]).
 -export([reload_all/1]).
 -export([remove_from_cluster/1]).
             {error, timeout}
     end.
 
+%% @spec filter_keys(riak_object:bucket(), Fun :: function()) ->
+%%       {ok, [Key :: riak_object:key()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List the keys known to be present in Bucket, 
+%%      filtered at the vnode according to Fun, via lists:filter.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after a put or delete.
+%% @equiv filter_keys(Bucket, Fun, default_timeout()*8)
+filter_keys(Bucket, Fun) -> 
+    list_keys({filter, Bucket, Fun}, ?DEFAULT_TIMEOUT*8).
+
+%% @spec filter_keys(riak_object:bucket(), Fun :: function(), TimeoutMillisecs :: integer()) ->
+%%       {ok, [Key :: riak_object:key()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List the keys known to be present in Bucket, 
+%%      filtered at the vnode according to Fun, via lists:filter.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after a put or delete.
+filter_keys(Bucket, Fun, Timeout) -> 
+    list_keys({filter, Bucket, Fun}, Timeout).
+
+%% @spec list_buckets() ->
+%%       {ok, [Bucket :: riak_object:bucket()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List buckets known to have keys.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after any operation that
+%%      either adds the first key or removes the last remaining key from
+%%      a bucket.
+%% @equiv list_buckets(default_timeout()*8)
+list_buckets() -> 
+    list_buckets(?DEFAULT_TIMEOUT*8).
+
+%% @spec list_buckets(TimeoutMillisecs :: integer()) ->
+%%       {ok, [Bucket :: riak_object:bucket()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List buckets known to have keys.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after any operation that
+%%      either adds the first key or removes the last remaining key from
+%%      a bucket.
+list_buckets(Timeout) -> 
+    list_keys('_', Timeout).
+
 %% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
 %% @doc Set the given properties for Bucket.
 %%      This is generally best if done at application start time,

src/riak_dets_backend.erl

 list([],Acc) -> Acc;
 list([[K]|Rest],Acc) -> list(Rest,[K|Acc]).
 
+list_bucket(#state{table=T}, {filter, Bucket, Fun}) ->
+    MList = lists:filter(Fun, dets:match(T,{{Bucket,'$1'},'_'})),
+    list(MList,[]);
 list_bucket(#state{table=T}, Bucket) ->
-    MList = dets:match(T,{{Bucket,'$1'},'_'}),
+    case Bucket of
+        '_' -> MatchSpec = {{'$1','_'},'_'};
+        _ -> MatchSpec = {{Bucket,'$1'},'_'}
+    end,
+    MList = dets:match(T,MatchSpec),
     list(MList,[]).
+

src/riak_ets_backend.erl

 % list_bucket(Bucket :: atom(), state()) -> [Key :: binary()]
 list_bucket(SrvRef, Bucket) ->
     gen_server:call(SrvRef,{list_bucket, Bucket}).
+srv_list_bucket(State, {filter, Bucket, Fun}) ->
+    MList = lists:filter(Fun, ets:match(State#state.t,{{Bucket,'$1'},'_'})),
+    list(MList,[]);
 srv_list_bucket(State, Bucket) ->
-    MList = ets:match(State#state.t,{{Bucket,'$1'},'_'}),
+    case Bucket of
+        '_' -> MatchSpec = {{'$1','_'},'_'};
+        _ -> MatchSpec = {{Bucket,'$1'},'_'}
+    end,
+    MList = ets:match(State#state.t,MatchSpec),
     list(MList,[]).
 
 %% @private

src/riak_fs_backend.erl

 %%           [riak_object:key()]
 %% @doc Get a list of the keys in a bucket
 list_bucket(State, Bucket) ->
-    B64 = encode_bucket(Bucket),
-    L = length(State#state.dir),
-    [ K || {_,K} <- [ location_to_bkey(lists:nthtail(L, X)) ||
-                        X <- filelib:wildcard(
-                               filename:join([State#state.dir,
-                                              B64,"*/*/*/*"])) ]].
+    case Bucket of
+        '_' ->
+            lists:usort(lists:map(fun({B, _}) -> B end, list(State)));
+        {filter, B, Fun} ->
+            [ hd(K) || K <-
+                lists:filter(Fun,
+                    [ EV || EV <- lists:map(fun(K) ->
+                                                case K of
+                                                    {B, Key} -> [Key];
+                                                    _ -> []
+                                                end
+                                            end, list(State)),
+                            EV /= [] ]) ];
+        _ ->
+            B64 = encode_bucket(Bucket),
+            L = length(State#state.dir),
+            [ K || {_,K} <- [ location_to_bkey(lists:nthtail(L, X)) ||
+                                X <- filelib:wildcard(
+                                       filename:join([State#state.dir,
+                                                      B64,"*/*/*/*"])) ]]
+    end.
 
 %% @spec location(state(), {riak_object:bucket(), riak_object:key()})
 %%          -> string()

src/riak_osmos_backend.erl

 %% list_bucket(state(), Bucket :: atom()) -> [Key :: binary()]
 list_bucket(#state{table=Table}, Bucket) ->
     accum(Table,
-          fun(K,_) ->
-                  case binary_to_term(K) of
-                      {Bucket, Key} -> {true, Key};
-                      _             -> false
+          fun(Key,_) ->
+                  {B, K} = binary_to_term(Key),
+                  case Bucket of
+                      '_' -> {true, B};
+                      {filter, B, Fun} -> 
+                                  case Fun([K]) of
+                                      true -> {true, K};
+                                      _    -> false
+                                  end;
+                      {filter, _, _} -> false;
+                      B -> {true, K};
+                      _ -> false
                   end
           end).