Commits

Anonymous committed f94fa81

replace riak_bucketkeys with asking vnodes to return a list of the bucket-keys they have stored

only the ets backend implements the necessary bits as of this commit

  • Participants
  • Parent commits a2e9022

Comments (0)

Files changed (11)

File src/riak_api.erl

                            riak_bucket:get_bucket(BucketName))
           end),
     {noreply, State};
-handle_call({list_keys,Bucket}, From, State) ->
-    spawn(fun() ->
-        gen_server2:reply(From, riak_bucketkeys:get_keys(Bucket))
-          end),
+%% List the keys of Bucket: hand the request to a riak_keys_fsm, passing
+%% From along so the FSM can reply to the caller; this server therefore
+%% returns noreply.
+handle_call({list_keys,Bucket,Timeout}, From, State) ->
+    NewState = ensure_ring(State),
+    riak_keys_fsm:start(NewState#state.ring, Bucket, Timeout, From),
+    %% Keep the state returned by ensure_ring/1 instead of discarding it.
-    {noreply, State}.
+    {noreply, NewState}.
 
 %% @private

File src/riak_bucketkeys.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
-%% @doc Management of keylists for Riak buckets.
--module(riak_bucketkeys).
--behavior(gen_server2).
-
--export([put_key/2,
-         del_key/2,
-         get_keys/1]).
--export([start_link/0,
-         init/1,
-         handle_call/3,
-         handle_cast/2,
-         handle_info/2,
-         code_change/3,
-         terminate/2]).
-
--define(BUCK, ' bucketkeys').
--define(QSIZE, 1000).
--define(NFRAGS, 1024).
--define(FLUSH_INTERVAL, 1000).
--record(state, {ops, ring, count=0}).
-
-% State.ops is a dict, keys are bucketname atoms, vals are lists of operations,
-% where operations are tuples of {Op::[add|del], riak_object:key()}
-
-%% @spec put_key(riak_object:bucket(), riak_object:key()) -> ok
-%% @doc Add Key to the keylist for Bucket.
-put_key(Bucket,Key) -> 
-    gen_server2:cast(?MODULE,{add,Bucket,Key}).
-
-%% @spec del_key(riak_object:bucket(), riak_object:key()) -> ok
-%% @doc Remove Key from the keylist for Bucket.
-del_key(Bucket,Key) -> 
-    gen_server2:cast(?MODULE,{del,Bucket,Key}).
-
-%% @private
-start_link() -> 
-    gen_server2:start_link({local, ?MODULE}, ?MODULE, [], 
-                           [{fullsweep_after, 0}]).
-
-%% @private
-init(_) -> 
-    {ok, #state{ops=dict:new(),ring=undefined, count=0}}.
-
-handle_call(_,_,State) -> {reply,no_call_support,State}.
-
-handle_cast({OpType,Bucket,Key},State=#state{ops=Ops, count=Count}) ->
-    BucketFrag = list_to_binary(
-                   [atom_to_list(Bucket),"-",
-                    integer_to_list(erlang:phash2(Key) rem ?NFRAGS)]),
-    NewState = ensure_ring(State),
-    OpList = case dict:find(BucketFrag, Ops) of
-        error ->   [{OpType,Key}];
-        {ok, L} -> [{OpType,Key}|L]
-    end,
-    NewOps = dict:store(BucketFrag, OpList, Ops),
-    NewCount = Count + 1,
-    case NewCount > ?QSIZE of
-        true ->
-            do_write_all(NewOps,NewState#state.ring),
-            {noreply, NewState#state{ops=dict:new(), count=0}, ?FLUSH_INTERVAL};
-        false ->
-            {noreply,NewState#state{ops=NewOps, count=0}, ?FLUSH_INTERVAL}
-    end.
-
-%% @private
-sort_contents([], Acc) ->
-    [V || {_, V} <- lists:sort(Acc)];
-sort_contents([{M,V}|T], Acc) ->
-    LM = 
-        case dict:find(<<"X-Riak-Last-Modified">>, M) of
-            {ok, Val} -> Val;
-            error -> httpd_util:rfc1123_date()
-        end,
-    sort_contents(
-      T,
-      [{calendar:datetime_to_gregorian_seconds(httpd_util:convert_request_date(LM)), V}|Acc]
-     ).
-
-
-%% @private
-replay_changes([], Set) -> Set;
-replay_changes([{add,K}|T], Set) -> replay_changes(T, sets:add_element(K, Set));
-replay_changes([{del,K}|T], Set) -> replay_changes(T, sets:del_element(K, Set)).
-
-%% @private
-do_write_all(Ops, Ring) ->
-    fix_bucket(Ring),
-    [do_write_bucket(BucketOps, Ring) || BucketOps <- dict:to_list(Ops)].
-
-%% @private
-do_write_bucket({BucketName,OpList}, Ring) ->
-    Obj = case get_keysobj(BucketName) of
-              undefined ->
-                  riak_object:new(?BUCK,BucketName,{sets:new(), []});
-              O -> O
-          end,
-    NewSet = merge_keysobj(Obj, OpList),
-    NewV = {NewSet, OpList},
-    NewObj = riak_object:update_value(Obj,NewV),
-    put_keysobj(NewObj, Ring).
-
-%% @private
-get_keysobj(Bucket) ->
-    case gen_server2:call({riak_api,node()},
-                          {get,?BUCK,Bucket,1,120000}) of
-        {error, notfound} ->
-            undefined;
-        {error, Err} -> {error, Err};
-        {ok,Obj} -> Obj
-    end.
-
-%% @private
-get_all_keyfrags(Bucket, Ring) ->
-    fix_bucket(Ring),
-    [O || O <- [get_keysobj(Frag) || Frag <- all_frags(Bucket)],
-          O /= undefined].
-
-%% @private
-merge_keysobj(KeysObj, NewReplays) ->
-    Sorted = sort_contents(riak_object:get_contents(KeysObj), []),
-    {Sets, Replays0} = lists:unzip(Sorted),
-    UnionSet = sets:union(Sets), 
-    AllReplays = lists:flatten(Replays0 ++ lists:reverse(NewReplays)),
-    replay_changes(AllReplays, UnionSet).
-
-%% @private
-put_keysobj(KeysObj, Ring) ->
-    fix_bucket(Ring),
-    {ok, C} = riak:local_client(),
-    C:put(KeysObj, 1, 1, 120000).
-
-%% @spec get_keys(Bucket::atom()) -> 
-%%               {ok, [riak_object:key()]} | {error, Reason::term()}
-%% @doc Return the keylist for Bucket.
-get_keys(Bucket) ->            
-    % this one will cause a put if merge is needed
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
-    AllFrags = get_all_keyfrags(Bucket, Ring),
-    FragErrs = [F || F <- AllFrags, element(1, F) =:= error],
-    case FragErrs of
-        [] ->
-            {ok, merge_frags(AllFrags, Ring, [])};
-        [E|_] -> E
-    end.
-    
-%% @private
-merge_frags([], _Ring, Acc) ->
-    sets:to_list(sets:union(Acc));
-merge_frags([F|T], Ring, Acc) ->
-    Contents = riak_object:get_values(F),
-    FSet = 
-        case length(Contents) of
-            1 ->
-                merge_keysobj(F, []);
-            _ ->
-                NewSet = merge_keysobj(F, []),
-                NewV = {NewSet, []},
-                NewObj = riak_object:update_value(F, NewV),
-                spawn(fun() -> put_keysobj(NewObj, Ring) end),
-                NewSet
-        end,
-    merge_frags(T, Ring, [FSet|Acc]).
-
-%% @private                    
-fix_bucket(Ring) ->
-    Bucket = riak_bucket:get_bucket(?BUCK, Ring),
-    Change = case proplists:get_value(n_val,Bucket) of
-        5 -> case proplists:get_value(allow_mult,Bucket) of
-                 true -> false;
-                 _ -> true
-             end;
-        _ -> true
-    end,
-    case Change of
-        false -> nop;
-        true ->
-            riak_bucket:set_bucket(?BUCK,
-                            [{n_val,5},{allow_mult,true},{has_links,false}])
-    end.
-
-%% @private 
-all_frags(Bucket) when is_atom(Bucket) ->
-    LB = atom_to_list(Bucket)++"-",
-    [list_to_binary(LB++integer_to_list(I)) || I <- lists:seq(0, ?NFRAGS-1)].
-%% @private
-ensure_ring(State=#state{ring=undefined}) ->
-    riak_ring_manager:subscribe(self()),
-    {ok, Ring} = riak_ring_manager:get_my_ring(),
-    State#state{ring=Ring};
-ensure_ring(State) -> State.
-
-%% @private
-terminate(_,_) -> ok.
-
-%% @private
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%% @private
-handle_info({set_ring, Ring}, State) -> {noreply, State#state{ring=Ring}};
-handle_info(timeout, State=#state{ops=Ops}) ->
-    NewState = ensure_ring(State),
-    do_write_all(Ops, NewState#state.ring),
-    {noreply, NewState#state{ops=dict:new(), count=0}};
-handle_info(_,State) -> {stop,badmsg,State}.

File src/riak_client.erl

 %% @doc List the keys known to be present in Bucket.
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
+%% @equiv list_keys(Bucket, default_timeout()*8)
 list_keys(Bucket) -> 
-    gen_server2:call({riak_api,Node}, {list_keys,Bucket}, ?DEFAULT_TIMEOUT*8).
+    list_keys(Bucket, ?DEFAULT_TIMEOUT*8).
+
+%% @spec list_keys(riak_object:bucket(), TimeoutMillisecs :: integer()) ->
+%%       {ok, [Key :: riak_object:key()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List the keys known to be present in Bucket.
+%%      The list is requested from riak_api on Node at call time; Timeout
+%%      is passed both in the list_keys request and to gen_server2:call.
+list_keys(Bucket, Timeout) -> 
+    gen_server2:call({riak_api,Node}, {list_keys,Bucket,Timeout}, Timeout).
 
 %% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
 %% @doc Set the given properties for Bucket.

File src/riak_delete.erl

             Reply = C:put(NewObj, RW, RW, RemainingTime),
             case Reply of
                 ok -> 
-                    spawn(fun()-> riak_bucketkeys:del_key(Bucket,Key) end),
                     spawn(
                       fun()-> reap(Bucket,Key,RemainingTime,Timeout,ReqID) end);
                 _ -> nop
             riak_eventer:notify(riak_delete, delete_reply, {ReqID, Reply}),
             gen_server2:reply(Client, Reply);
         {error, notfound} ->
-            spawn(fun()-> riak_bucketkeys:del_key(Bucket,Key) end),
             riak_eventer:notify(riak_delete, delete_reply,
                                 {ReqID, {error, notfound}}),
             gen_server2:reply(Client, {error, notfound});

File src/riak_ets_backend.erl

 -behaviour(gen_server).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/1,stop/1,get/2,put/3,list/1,delete/2]).
+-export([start/1,stop/1,get/2,put/4,list/1,list_bucket/2,delete/2]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 %% @private
 handle_call(stop,_From,State) -> {reply, srv_stop(State), State};
 handle_call({get,Key},_From,State) -> {reply, srv_get(State,Key), State};
-handle_call({put,Key,Val},_From,State) -> {reply, srv_put(State,Key,Val),State};
+handle_call({put,{B,K},Key,Val},_From,State) ->
+    {reply, srv_put(State,{B,K},Key,Val),State};
 handle_call({delete,Key},_From,State) -> {reply, srv_delete(State,Key),State};
-handle_call(list,_From,State) -> {reply, srv_list(State), State}.
+handle_call(list,_From,State) -> {reply, srv_list(State), State};
+handle_call({list_bucket,Bucket},_From,State) ->
+    {reply, srv_list_bucket(State, Bucket), State}.
 
 % @spec stop(state()) -> ok | {error, Reason :: term()}
 stop(SrvRef) -> gen_server:call(SrvRef,stop).
 srv_get(State, Key) ->
     case ets:lookup(State#state.t,Key) of
         [] -> {error, notfound};
-        [{Key,Val}] -> {ok, Val};
+        [{Key,{_,_,Val}}] -> {ok, Val};
         Err -> {error, Err}
     end.
 
-% put(state(), Key :: binary(), Val :: binary()) ->
+% put(state(), {B :: atom(), K :: binary()}, Key :: binary(),
+%     Val :: binary()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
-put(SrvRef, Key, Val) -> gen_server:call(SrvRef,{put,Key,Val}).
-srv_put(State,Key,Val) ->       
-   case ets:insert(State#state.t, {Key,Val}) of
+put(SrvRef, {B,K}, Key, Val) -> gen_server:call(SrvRef,{put,{B,K},Key,Val}).
+srv_put(State,{B,K},Key,Val) ->       
+   case ets:insert(State#state.t, {Key,{B,K,Val}}) of
         true -> ok;
         Err -> {error, Err}
     end.
 list([],Acc) -> Acc;
 list([[K]|Rest],Acc) -> list(Rest,[K|Acc]).
 
+% list_bucket(state(), Bucket :: atom()) -> [Key :: binary()]
+list_bucket(SrvRef, Bucket) ->
+    gen_server:call(SrvRef,{list_bucket, Bucket}).
+srv_list_bucket(State, Bucket) ->
+    MList = ets:match(State#state.t,{'_',{Bucket,'$1','_'}}),
+    list(MList,[]).
+
 %% @private
 handle_info(_Msg, State) -> {noreply, State}.
 
 
 %% @private
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
-

File src/riak_get_fsm.erl

                             riak_eventer:notify(riak_get_fsm,
                                                 delete_finalize_start,
                                                 {ReqID, Bucket, Key}),
-                            riak_bucketkeys:del_key(Bucket,Key),
                             [gen_server2:call({riak_vnode_master, Node},
                                              {vnode_del, {Idx,Node},
                                               {Storekey,ReqID}}) ||

File src/riak_keys_fsm.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+-module(riak_keys_fsm).
+-behaviour(gen_fsm).
+
+-export([start/4]).
+-export([init/1, handle_event/3, handle_sync_event/4,
+         handle_info/3, terminate/3, code_change/4]).
+-export([waiting_kl/2]).
+
+%% Per-request state of the key-listing FSM.
+-record(state, {client :: {pid(), reference()}, % From handle given to gen_server2:reply/2
+                keys :: [set()],                % one key set per vnode answer received
+                waiting :: [term()],            % partition indices not yet heard from
+                endtime :: pos_integer(),       % Timeout + riak_util:moment() at start
+                req_id :: non_neg_integer()     % erlang:phash2/1 result, so it may be 0
+               }).
+
+%% Start an unlinked, unregistered riak_keys_fsm for one list-keys request.
+%% From is the caller handle the FSM later replies to.
+%% Returns whatever gen_fsm:start/3 returns.
+start(Ring,Bucket,Timeout,From) ->
+    InitArgs = [Ring,Bucket,Timeout,From],
+    gen_fsm:start(?MODULE, InitArgs, []).
+
+%% @private
+%% Ask the vnode of every partition in Ring for the keys it stores in
+%% Bucket, then enter waiting_kl to collect the answers.  Partitions whose
+%% owner node does not answer a ping are skipped rather than waited for.
+init([Ring,Bucket,Timeout,Client]) ->
+    RealStartTime = riak_util:moment(),
+    ReqID = erlang:phash2({random:uniform(), self(), Bucket, RealStartTime}),
+    riak_eventer:notify(riak_keys_fsm, keys_fsm_start,
+                        {ReqID, RealStartTime, Bucket}),
+    Msg = {self(), Bucket, ReqID},
+    Owners = riak_ring:all_owners(Ring),
+    %% Ping each distinct node once instead of once per partition it owns.
+    UpNodes = [N || N <- lists:usort([Node || {_Index, Node} <- Owners]),
+                    net_adm:ping(N) =:= pong],
+    Asked = lists:foldl(
+              fun({Index, Node}, Acc) ->
+                      case lists:member(Node, UpNodes) of
+                          false -> Acc;
+                          true ->
+                              %% riak_vnode_master matches this element as
+                              %% {Partition,_Node}, so send the node rather
+                              %% than the request id (which is in Msg).
+                              gen_server:cast(
+                                {riak_vnode_master, Node},
+                                {vnode_list_bucket,{Index,Node},Msg}),
+                              [Index|Acc]
+                      end
+              end,
+              [],
+              Owners),
+    %% NOTE(review): endtime adds Timeout to riak_util:moment(); confirm
+    %% that both are expressed in the same time unit.
+    StateData = #state{waiting=Asked, keys=[], client=Client,
+                       endtime=Timeout+riak_util:moment(),
+                       req_id=ReqID},
+    %% With nobody to wait for, time out at once so the client gets its
+    %% (empty) answer immediately instead of after the full Timeout.
+    FirstTimeout = case Asked of
+                       [] -> 0;
+                       _ -> Timeout
+                   end,
+    {ok, waiting_kl, StateData, FirstTimeout}.
+
+%% @private
+%% State waiting_kl: collect key lists from the vnodes that were asked.
+%% {kl, Keys, Idx, ReqID} is the answer for partition Idx; once no
+%% partition is left in waiting, the union of all keys is sent to the
+%% client.  On timeout, whatever has arrived so far is sent instead.
+waiting_kl({kl, Keys, Idx, ReqID},
+           StateData=#state{keys=Acc,waiting=Waiting,endtime=End}) ->
+    NewAcc = [sets:from_list(Keys)|Acc],
+    case lists:delete(Idx, Waiting) of
+        [] ->
+            riak_eventer:notify(riak_keys_fsm, finish, {ReqID, normal}),
+            respond(StateData#state.client,NewAcc),
+            {stop, normal, StateData};
+        StillWaiting ->
+            %% Never hand gen_fsm a negative timeout: an answer handled
+            %% after the deadline must lead to the timeout clause below,
+            %% not to a crash on an invalid timeout value.
+            Remaining = erlang:max(0, End-riak_util:moment()),
+            {next_state, waiting_kl,
+             StateData#state{keys=NewAcc,
+                             waiting=StillWaiting},
+             Remaining}
+    end;
+waiting_kl(timeout, StateData=#state{keys=Acc,client=Client,req_id=ReqID}) ->
+    riak_eventer:notify(riak_keys_fsm, finish, {ReqID, timeout}),
+    respond(Client, Acc),
+    {stop, normal, StateData}.
+
+%% @private
+%% Merge the collected key sets into one duplicate-free list, send
+%% {ok, Keys} to Client via gen_server2:reply/2, and return Keys.
+respond(Client, KeyLists) ->
+    Merged = sets:union(KeyLists),
+    AllKeys = sets:to_list(Merged),
+    gen_server2:reply(Client, {ok, AllKeys}),
+    AllKeys.
+
+%% @private Any all-state event stops the FSM with reason badmsg.
+handle_event(_Event, _StateName, StateData) ->
+    {stop,badmsg,StateData}.
+
+%% @private Any synchronous all-state event stops the FSM with reason badmsg, without a reply.
+handle_sync_event(_Event, _From, _StateName, StateData) ->
+    {stop,badmsg,StateData}.
+
+%% @private Any plain message sent to the process stops the FSM with reason badmsg.
+handle_info(_Info, _StateName, StateData) ->
+    {stop,badmsg,StateData}.
+
+%% @private Emit a key_fsm_end event with the request id and stop reason; returns Reason.
+terminate(Reason, _StateName, _State=#state{req_id=ReqID}) ->
+    riak_eventer:notify(riak_keys_fsm, key_fsm_end,
+                        {ReqID, Reason}),
+    Reason.
+
+%% @private No conversion on code upgrade: state name and state data pass through unchanged.
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.

File src/riak_put_fsm.erl

         true ->
             case DW of
                 0 ->
-                    send_key_update(RObj),
                     gen_server2:reply(Client,ok),
                     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                         {ReqID, ok}),
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
-            send_key_update(RObj),
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqID, ok}),
             gen_server2:reply(Client,ok),
     <<HashAsNum:128/integer>> = crypto:md5(iolist_to_binary(io_lib:format("~p",
                                                  [riak_object:vclock(RObj)]))),
     riak_util:integer_to_list(HashAsNum,62).
-
-send_key_update(RObj) ->
-    case riak_util:is_x_deleted(RObj) of
-        true -> nop;
-        false ->
-            spawn(fun() ->
-                case riak_object:bucket(RObj) of
-                    ' bucketkeys' -> nop; % special keylist bucket, ignore
-                    Bucket ->
-                        riak_bucketkeys:put_key(Bucket,
-                                                  riak_object:key(RObj))
-                          end
-                  end)
-    end.

File src/riak_sup.erl

     EventGuard = {riak_event_guard,
                  {riak_event_guard, start_link, []},
                  permanent, 5000, worker, dynamic},
-    BucketKeys = {riak_bucketkeys,
-                 {riak_bucketkeys, start_link, []},
-                  permanent, 5000, worker, dynamic},
     LocalLogger = {riak_local_logger,
                    {riak_local_logger, start_link, []},
                    permanent, 5000, worker, dynamic},
     Processes0 = 
     case riak:get_app_env(riak_web_ip) of
         "undefined" ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys];
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger];
         undefined ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys];
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger];
         _ ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys,
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,
              RiakWeb]
     end,
     Processes1 = 

File src/riak_vnode.erl

 handle_cast({delete, Client, Storekey, ReqID}, State=#state{idx=Idx}) ->
     riak_eventer:notify(riak_vnode, delete, {ReqID, Idx}),
     do_delete(Client, Storekey, ReqID, State),
+    {noreply, State};
+handle_cast({list_bucket, Client, Bucket, ReqID},
+            State=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
+    riak_eventer:notify(riak_vnode, list_bucket, {ReqID, Idx}),
+    do_list_bucket(Client,ReqID,Bucket,Mod,ModState,Idx),
     {noreply, State}.
 
 %% @private
                   gen_server2:reply(From, RetVal)
           end).
 
+%% Ask the backend Mod for the keys it holds in Bucket and send them to
+%% the requesting FSM as a {kl, Keys, Idx, ReqID} event.
+do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx) ->
+    Keys = Mod:list_bucket(ModState,Bucket),
+    riak_eventer:notify(riak_vnode, keys_reply, {ReqID, FSM_pid}),
+    KeyListEvent = {kl, Keys, Idx, ReqID},
+    gen_fsm:send_event(FSM_pid, KeyListEvent).
+
 do_get_binary(Storekey, Mod, ModState) ->
     Mod:get(ModState,Storekey).
 
             gen_server2:reply(Client, {fail, Idx, ReqID})
     end.
 
-simple_binary_put(Storekey, Val, Mod, ModState) ->
-    Mod:put(ModState, Storekey, Val).
+%% Store Val under Storekey in the backend Mod, passing along the
+%% {Bucket, Key} pair the value belongs to.  Returns Mod:put/4's result.
+simple_binary_put({_Bucket, _Key}=BKey, Storekey, Val, Mod, ModState) ->
+    Mod:put(ModState, BKey, Storekey, Val).
 
 do_put(FSM_pid, Storekey, RObj, ReqID,
        _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
             gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
         {newobj, ObjToStore} ->
             Val = term_to_binary(ObjToStore, [compressed]),
-            case simple_binary_put(Storekey, Val, Mod, ModState) of
+            case simple_binary_put({riak_object:bucket(RObj),
+                                    riak_object:key(RObj)},
+                                   Storekey, Val, Mod, ModState) of
                 ok ->
                     riak_eventer:notify(riak_vnode,put_reply,ReqID),
                     gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});

File src/riak_vnode_master.erl

     Pid = get_vnode(Partition, State),
     gen_server2:cast(Pid, {vnode_merkle, {RemoteVN,Merkle}}),
     % (obligation done, now the problem of the vnodes)
+    {noreply, State};
+handle_cast({vnode_list_bucket, {Partition,_Node},
+            {FSM_pid, Bucket, ReqID}}, State) ->
+    Pid = get_vnode(Partition, State),
+    gen_server2:cast(Pid, {list_bucket, FSM_pid, Bucket, ReqID}),
     {noreply, State}.