Commits

Anonymous committed 93750f3

moving vclock pruning from FSM into vclock module and pushing it to backend-storage time

Comments (0)

Files changed (5)

src/riak_get_fsm.erl

                           bkey=BKey,
                           req_id=ReqId,
                           replied_notfound=NotFound,
-                          ring=Ring}) ->
+                          ring=Ring,
+                          starttime=StartTime}) ->
     case Final of
         {error, notfound} ->
             maybe_finalize_delete(StateData);
         {ok,_} ->
-            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId);
+            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,
+                                 ReqId,StartTime);
         _ -> nop
     end,
     {stop,normal,StateData}.
     end
     end).
 
-maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId) ->
+maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId,StartTime) ->
     Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound,
     Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0],
     {ok, FinalRObj} = Final,
-    Msg = {self(), BKey, FinalRObj, ReqId},
+    Msg = {self(), BKey, FinalRObj, ReqId, StartTime},
     case Targets of
         [] -> nop;
         _ ->

src/riak_put_fsm.erl

 %% @private
 initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
+    RObj = update_metadata(RObj0),
     RealStartTime = riak_util:moment(),
-    Bucket = riak_object:bucket(RObj0),
+    Bucket = riak_object:bucket(RObj),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
-    RObj = prune_vclock(update_metadata(RObj0),BucketProps),
     Key = riak_object:key(RObj),
     riak_eventer:notify(riak_put_fsm, put_fsm_start,
                         {ReqId, RealStartTime, Bucket, Key}),
     DocIdx = riak_util:chash_key({Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, RObj, ReqId},
+    Msg = {self(), {Bucket,Key}, RObj, ReqId, RealStartTime},
     N = proplists:get_value(n_val,BucketProps),
     Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
     {Targets, Fallbacks} = lists:split(N, Preflist),
     end,
     riak_object:apply_updates(riak_object:update_metadata(RObj, NewMD)).
 
-prune_vclock(RObj,BucketProps) ->
-    % This function is a little bit evil, as it relies on the
-    % internal structure of vclocks.
-    % That structure being [{Id, {Vsn, Timestamp}}]
-    V = riak_object:vclock(RObj),
-    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
-    Now = calendar:datetime_to_gregorian_seconds(erlang:universaltime()),
-    case prune_vclock1(Now,SortV,BucketProps,no_change) of
-        {no_change, _} -> RObj;
-        {pruned, NewV} -> riak_object:set_vclock(RObj,NewV)
-    end.
-
-prune_vclock1(Now,V,BProps,Changed) ->
-    case length(V) =< proplists:get_value(small_vclock,BProps) of
-        true -> {Changed, V};
-        false ->
-            {_,{_,HeadTime}} = hd(V),
-            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
-                true -> {Changed, V};
-                false -> prune_vclock1(Now,V,BProps,Changed,HeadTime)
-            end
-    end.
-prune_vclock1(Now,V,BProps,Changed,HeadTime) ->
-    % has a precondition that V is longer than small and older than young
-    case length(V) > proplists:get_value(big_vclock,BProps) of
-        true -> prune_vclock1(Now,tl(V),BProps,pruned);
-        false ->
-            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
-                true -> prune_vclock1(Now,tl(V),BProps,pruned);
-                false -> {Changed, V}
-            end
-    end.
-
 make_vtag(RObj) ->
     <<HashAsNum:128/integer>> = crypto:md5(term_to_binary(riak_object:vclock(RObj))),
     riak_util:integer_to_list(HashAsNum,62).
 
-% following two are just utility functions for test assist
-vc_obj(VC) -> riak_object:set_vclock(riak_object:new(<<"b">>,<<"k">>,<<"v">>), VC).
-obj_vc(OB) -> riak_object:vclock(OB).
-
-prune_small_vclock_test() ->
-    % vclock with less entries than small_vclock will be untouched
-    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 32000000,
-    SmallVC = [{<<"1">>, {1, OldTime}},
-               {<<"2">>, {2, OldTime}},
-               {<<"3">>, {3, OldTime}}],
-    Props = [{small_vclock,4}],
-    ?assertEqual(SmallVC, obj_vc(prune_vclock(vc_obj(SmallVC), Props))).
-
-prune_young_vclock_test() ->
-    % vclock with all entries younger than young_vclock will be untouched
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1,
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, NewTime}},
-          {<<"3">>, {3, NewTime}}],
-    Props = [{small_vclock,1},{young_vclock,1000}],
-    ?assertEqual(VC, obj_vc(prune_vclock(vc_obj(VC), Props))).
-
-prune_big_vclock_test() ->
-    % vclock not preserved by small or young will be pruned down to
-    % no larger than big_vclock entries
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1000,
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, NewTime}},
-          {<<"3">>, {3, NewTime}}],
-    Props = [{small_vclock,1},{young_vclock,1},
-             {big_vclock,2},{old_vclock,100000}],
-    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 2).
-
-prune_old_vclock_test() ->
-    % vclock not preserved by small or young will be pruned down to
-    % no larger than big_vclock and no entries more than old_vclock ago
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1000,
-    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 100000,    
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, OldTime}},
-          {<<"3">>, {3, OldTime}}],
-    Props = [{small_vclock,1},{young_vclock,1},
-             {big_vclock,2},{old_vclock,10000}],
-    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 1).
-
 make_vtag_test() ->
     Obj = riak_object:new(<<"b">>,<<"k">>,<<"v1">>),
     ?assertNot(make_vtag(Obj) =:= 

src/riak_vnode.erl

     VNode = self(),
     do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,VNode),
     {noreply, State};
-handle_cast({put, FSM_pid, BKey, RObj, ReqID},
+handle_cast({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
             State=#state{mapcache=Cache,idx=Idx}) ->
     riak_eventer:notify(riak_vnode, put, {ReqID, Idx}),
     gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
-    do_put(FSM_pid, BKey, RObj, ReqID, State),
+    do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, State),
     {noreply, State#state{mapcache=dict:erase(BKey,Cache)}};
 handle_cast({get, FSM_pid, BKey, ReqID}, State=#state{idx=Idx}) ->
     riak_eventer:notify(riak_vnode, get, {ReqID, Idx}),
 simple_binary_put(BKey, Val, Mod, ModState) ->
     Mod:put(ModState, BKey, Val).
 
-do_put(FSM_pid, BKey, RObj, ReqID,
+do_put(FSM_pid, BKey, RObj, ReqID, PruneTime, 
        _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
+    {ok,Ring} = riak_ring_manager:get_my_ring(),    
+    {Bucket,_Key} = BKey,
+    BProps = riak_bucket:get_bucket(Bucket, Ring),
     case syntactic_put_merge(Mod, ModState, BKey, RObj, ReqID) of
         oldobj -> 
             riak_eventer:notify(riak_vnode,put_reply,ReqID),
             gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
-        {newobj, ObjToStore} ->
+        {newobj, NewObj} ->
+            VC = riak_object:vclock(NewObj),
+            ObjToStore = riak_object:set_vclock(NewObj,
+                                           vclock:prune(VC,PruneTime,BProps)),
             Val = term_to_binary(ObjToStore, [compressed]),
             case simple_binary_put(BKey, Val, Mod, ModState) of
                 ok ->
                 false -> {newobj, ResObj}
             end    
     end.
+

src/riak_vnode_master.erl

     % (obligation done, now the problem of the vnodes)
     {noreply, State};
 handle_cast({vnode_put, {Partition,_Node},
-             {FSM_pid,BKey,RObj,ReqID}}, State) ->
+             {FSM_pid,BKey,RObj,ReqID,FSMTime}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID}),
+    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}),
     % (obligation done, now the problem of the vnodes)
     {noreply, State};
 handle_cast({vnode_get, {Partition,_Node},
 -author('Andy Gross <andy@basho.com>').
 
 -export([fresh/0,descends/2,merge/1,get_counter/2,get_timestamp/2,
-	increment/2,all_nodes/1,equal/2]).
+	increment/2,all_nodes/1,equal/2,prune/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
                 false -> true
             end
     end.
+
+% @doc Possibly shrink the size of a vclock, depending on current age and size.
+% @spec prune(V::vclock(), Now::integer(), BucketProps::term()) -> vclock()
+prune(V,Now,BucketProps) ->
+    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
+    prune_vclock1(SortV,Now,BucketProps).
+% @private
+prune_vclock1(V,Now,BProps) ->
+    case length(V) =< proplists:get_value(small_vclock,BProps) of
+        true -> V;
+        false ->
+            {_,{_,HeadTime}} = hd(V),
+            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
+                true -> V;
+                false -> prune_vclock1(V,Now,BProps,HeadTime)
+            end
+    end.
+% @private
+prune_vclock1(V,Now,BProps,HeadTime) ->
+    % has a precondition that V is longer than small and older than young
+    case length(V) > proplists:get_value(big_vclock,BProps) of
+        true -> prune_vclock1(tl(V),Now,BProps);
+        false ->
+            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
+                true -> prune_vclock1(tl(V),Now,BProps);
+                false -> V
+            end
+    end.
+
+prune_small_test() ->
+    % vclock with less entries than small_vclock will be untouched
+    Now = riak_util:moment(),
+    OldTime = Now - 32000000,
+    SmallVC = [{<<"1">>, {1, OldTime}},
+               {<<"2">>, {2, OldTime}},
+               {<<"3">>, {3, OldTime}}],
+    Props = [{small_vclock,4}],
+    ?assertEqual(lists:sort(SmallVC), lists:sort(prune(SmallVC, Now, Props))).
+
+prune_young_test() ->
+    % vclock with all entries younger than young_vclock will be untouched
+    Now = riak_util:moment(),
+    NewTime = Now - 1,
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, NewTime}},
+          {<<"3">>, {3, NewTime}}],
+    Props = [{small_vclock,1},{young_vclock,1000}],
+    ?assertEqual(lists:sort(VC), lists:sort(prune(VC, Now, Props))).
+
+prune_big_test() ->
+    % vclock not preserved by small or young will be pruned down to
+    % no larger than big_vclock entries
+    Now = riak_util:moment(),
+    NewTime = Now - 1000,
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, NewTime}},
+          {<<"3">>, {3, NewTime}}],
+    Props = [{small_vclock,1},{young_vclock,1},
+             {big_vclock,2},{old_vclock,100000}],
+    ?assert(length(prune(VC, Now, Props)) =:= 2).
+
+prune_old_test() ->
+    % vclock not preserved by small or young will be pruned down to
+    % no larger than big_vclock and no entries more than old_vclock ago
+    Now = riak_util:moment(),
+    NewTime = Now - 1000,
+    OldTime = Now - 100000,    
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, OldTime}},
+          {<<"3">>, {3, OldTime}}],
+    Props = [{small_vclock,1},{young_vclock,1},
+             {big_vclock,2},{old_vclock,10000}],
+    ?assert(length(prune(VC, Now, Props)) =:= 1).
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.