Commits

Anonymous committed a2e9022

use gen_server timeouts instead of timer:send_interval for riak_bucketkeys

Comments (0)

Files changed (1)

src/riak_bucketkeys.erl

 -define(QSIZE, 1000).
 -define(NFRAGS, 1024).
 -define(FLUSH_INTERVAL, 1000).
--record(state, {ops, ring, tref}).
+-record(state, {ops, ring, count=0}).
 
 % State.ops is a dict, keys are bucketname atoms, vals are lists of operations,
 % where operations are tuples of {Op::[add|del], riak_object:key()}
 
 %% @private
 init(_) -> 
-    {ok, TRef} = timer:send_interval(1000, timeout),
-    {ok, #state{ops=dict:new(),ring=undefined, tref=TRef}}.
+    {ok, #state{ops=dict:new(),ring=undefined, count=0}}.
 
 handle_call(_,_,State) -> {reply,no_call_support,State}.
 
-handle_cast({OpType,Bucket,Key},State=#state{ops=Ops}) ->
+handle_cast({OpType,Bucket,Key},State=#state{ops=Ops, count=Count}) ->
     BucketFrag = list_to_binary(
                    [atom_to_list(Bucket),"-",
                     integer_to_list(erlang:phash2(Key) rem ?NFRAGS)]),
         error ->   [{OpType,Key}];
         {ok, L} -> [{OpType,Key}|L]
     end,
-    case length(OpList) > ?QSIZE of
+    NewOps = dict:store(BucketFrag, OpList, Ops),
+    NewCount = Count + 1,
+    case NewCount > ?QSIZE of
         true ->
-            do_write_all(dict:store(BucketFrag, OpList, Ops), 
-                         NewState#state.ring),
-            {noreply, NewState#state{ops=dict:new()}};
+            do_write_all(NewOps,NewState#state.ring),
+            {noreply, NewState#state{ops=dict:new(), count=0}, ?FLUSH_INTERVAL};
         false ->
-            {noreply,NewState#state{ops=dict:store(BucketFrag,OpList,Ops)}}
+            {noreply,NewState#state{ops=NewOps, count=0}, ?FLUSH_INTERVAL}
     end.
 
 %% @private
 
 %% @private
 do_write_all(Ops, Ring) ->
+    fix_bucket(Ring),
     [do_write_bucket(BucketOps, Ring) || BucketOps <- dict:to_list(Ops)].
 
 %% @private
 do_write_bucket({BucketName,OpList}, Ring) ->
-    Obj = get_keysobj(BucketName, Ring),
+    Obj = case get_keysobj(BucketName) of
+              undefined ->
+                  riak_object:new(?BUCK,BucketName,{sets:new(), []});
+              O -> O
+          end,
     NewSet = merge_keysobj(Obj, OpList),
     NewV = {NewSet, OpList},
     NewObj = riak_object:update_value(Obj,NewV),
     put_keysobj(NewObj, Ring).
 
 %% @private
-get_keysobj(Bucket, Ring) ->
-    fix_bucket(Ring),
+get_keysobj(Bucket) ->
     case gen_server2:call({riak_api,node()},
                           {get,?BUCK,Bucket,1,120000}) of
         {error, notfound} ->
-            riak_object:new(?BUCK,Bucket,{sets:new(), []});
+            undefined;
         {error, Err} -> {error, Err};
         {ok,Obj} -> Obj
     end.
 %% @private
 get_all_keyfrags(Bucket, Ring) ->
     fix_bucket(Ring),
-    [get_keysobj(Frag, Ring) || Frag <- all_frags(Bucket)].
+    [O || O <- [get_keysobj(Frag) || Frag <- all_frags(Bucket)],
+          O /= undefined].
 
 %% @private
 merge_keysobj(KeysObj, NewReplays) ->
     FragErrs = [F || F <- AllFrags, element(1, F) =:= error],
     case FragErrs of
         [] ->
-            {ok, merge_frags(AllFrags, Ring, sets:new())};
+            {ok, merge_frags(AllFrags, Ring, [])};
         [E|_] -> E
     end.
     
 %% @private
 merge_frags([], _Ring, Acc) ->
-    sets:to_list(Acc);
+    sets:to_list(sets:union(Acc));
 merge_frags([F|T], Ring, Acc) ->
     Contents = riak_object:get_values(F),
     FSet = 
                 spawn(fun() -> put_keysobj(NewObj, Ring) end),
                 NewSet
         end,
-    merge_frags(T, Ring, sets:union([FSet, Acc])).
+    merge_frags(T, Ring, [FSet|Acc]).
 
 %% @private                    
 fix_bucket(Ring) ->
 ensure_ring(State) -> State.
 
 %% @private
-terminate(_,#state{tref=TRef}) -> timer:cancel(TRef), ok.
+terminate(_,_) -> ok.
 
 %% @private
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
 handle_info(timeout, State=#state{ops=Ops}) ->
     NewState = ensure_ring(State),
     do_write_all(Ops, NewState#state.ring),
-    {noreply, NewState#state{ops=dict:new()}};
+    {noreply, NewState#state{ops=dict:new(), count=0}};
 handle_info(_,State) -> {stop,badmsg,State}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.