Anonymous avatar Anonymous committed 43e574a Merge

merge

Comments (0)

Files changed (1)

src/riak_bucketkeys.erl

 
 %% @doc Management of keylists for Riak buckets.
 -module(riak_bucketkeys).
--behavior(gen_fsm).
+-behavior(gen_server2).
 
--export([start_link/0]).
--export([init/1,waiting/2,terminate/3]).
--export([code_change/4,handle_event/3,handle_info/3,handle_sync_event/4]).
--export([put_key/2,get_keys/1,del_key/2]).
+-export([put_key/2,
+         del_key/2,
+         get_keys/1]).
+-export([start_link/0,
+         init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         code_change/3,
+         terminate/2]).
 
 -define(BUCK, ' bucketkeys').
 -define(QSIZE, 1000).
 -define(NFRAGS, 1024).
 -define(FLUSH_INTERVAL, 1000).
--record(state, {ops, ring}).
+-record(state, {ops, ring, tref}).
 
 % State.ops is a dict, keys are bucketname atoms, vals are lists of operations,
 % where operations are tuples of {Op::[add|del], riak_object:key()}
 %% @spec put_key(riak_object:bucket(), riak_object:key()) -> ok
 %% @doc Add Key to the keylist for Bucket.
 put_key(Bucket,Key) -> 
-    gen_fsm:send_event(?MODULE,{add,Bucket,Key}).
+    gen_server2:cast(?MODULE,{add,Bucket,Key}).
 
 %% @spec del_key(riak_object:bucket(), riak_object:key()) -> ok
 %% @doc Remove Key from the keylist for Bucket.
-del_key(Bucket,Key) -> gen_fsm:send_event(?MODULE,{del,Bucket,Key}).
+del_key(Bucket,Key) -> 
+    gen_server2:cast(?MODULE,{del,Bucket,Key}).
 
 %% @private
-start_link() -> gen_fsm:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link() -> 
+    gen_server2:start_link({local, ?MODULE}, ?MODULE, [], 
+                           [{fullsweep_after, 0}]).
 
 %% @private
 init(_) -> 
-    {ok, waiting, #state{ops=dict:new(),ring=undefined}, ?FLUSH_INTERVAL}.
+    {ok, TRef} = timer:send_interval(1000, timeout),
+    {ok, #state{ops=dict:new(),ring=undefined, tref=TRef}}.
 
-%% @private
-waiting(timeout, State=#state{ops=Ops}) ->
-    NewState = ensure_ring(State),
-    do_write_all(Ops, NewState#state.ring),
-    {next_state, waiting, NewState#state{ops=dict:new()}, ?FLUSH_INTERVAL};
+handle_call(_,_,State) -> {reply,no_call_support,State}.
 
-%% @private
-waiting({OpType,Bucket,Key}, State=#state{ops=Ops}) ->
+handle_cast({OpType,Bucket,Key},State=#state{ops=Ops}) ->
     BucketFrag = lists:flatten(
                    io_lib:format("~s-~b", 
                                  [Bucket, 
                                   erlang:phash2(Key) rem ?NFRAGS])),
     NewState = ensure_ring(State),
     OpList = case dict:find(BucketFrag, Ops) of
-        error -> [{OpType,Key}];
+        error ->   [{OpType,Key}];
         {ok, L} -> [{OpType,Key}|L]
     end,
     case length(OpList) > ?QSIZE of
         true ->
             do_write_all(dict:store(BucketFrag, OpList, Ops), 
                          NewState#state.ring),
-            {next_state,waiting,NewState#state{ops=dict:new()},?FLUSH_INTERVAL};
+            {noreply, NewState#state{ops=dict:new()}};
         false ->
-            {next_state,waiting,
-             NewState#state{ops=dict:store(BucketFrag,OpList,Ops)},
-             ?FLUSH_INTERVAL}
+            {noreply,NewState#state{ops=dict:store(BucketFrag,OpList,Ops)}}
     end.
 
 %% @private
 %% @private 
 all_frags(Bucket) when is_atom(Bucket) ->
     [atom_to_list(Bucket) ++ "-" ++ X || X <- [integer_to_list(I) || 
-                                                  I <- lists:seq(0, ?NFRAGS-1)]].
-
+                                               I <- lists:seq(0, ?NFRAGS-1)]].
 %% @private
 ensure_ring(State=#state{ring=undefined}) ->
     riak_ring_manager:subscribe(self()),
 ensure_ring(State) -> State.
 
 %% @private
-terminate(_,_,_) -> ok.
+terminate(_,#state{tref=TRef}) -> timer:cancel(TRef), ok.
 
 %% @private
-code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
 %% @private
-handle_event(_, _, StateData) -> {stop,badmsg,StateData}.
-
-%% @private
-handle_sync_event(_, _, _, StateData) -> {stop,badmsg,StateData}.
-
-%% @private
-handle_info({set_ring, Ring}, StateName, State) -> 
-    {next_state, StateName, State#state{ring=Ring}, ?FLUSH_INTERVAL};
-handle_info(_, _, StateData) -> {stop,badmsg,StateData}.
+handle_info({set_ring, Ring}, State) -> {noreply, State#state{ring=Ring}};
+handle_info(timeout, State=#state{ops=Ops}) ->
+    NewState = ensure_ring(State),
+    do_write_all(Ops, NewState#state.ring),
+    {noreply, NewState#state{ops=dict:new()}};
+handle_info(_,State) -> {stop,badmsg,State}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.