Commits

Anonymous committed 466208b

Additional changes to fix #274.

handle_info messages are passed down through riak_kv_multi_backend
to all the backends.
multi_backend filtersi handle_info messages based on the Module tag and the
specific backends (currently only bitcask backend uses handle_info messages)
based on the reference tag. This is an enabler for #210 as well.

  • Participants
  • Parent commits 5c7966f

Comments (0)

Files changed (3)

File apps/riak_kv/src/riak_kv_bitcask_backend.erl

 
 start(Partition, _Config) ->
     %% Schedule sync (if necessary)
+    TimerRef = make_ref(),
     case application:get_env(bitcask, sync_strategy) of
         {ok, {seconds, Seconds}} ->
             SyncIntervalMs = timer:seconds(Seconds),
             erlang:send_after(SyncIntervalMs, self(),
-                              {?MODULE, {sync, SyncIntervalMs}});
+                              {{?MODULE, TimerRef}, {sync, SyncIntervalMs}});
         _ ->
             ok
     end,
 
     %% Schedule merge checks
-    erlang:send_after(?MERGE_CHECK_INTERVAL, self(), {?MODULE, merge_check}),
+    erlang:send_after(?MERGE_CHECK_INTERVAL, self(), 
+                      {{?MODULE, TimerRef}, merge_check}),
 
     %% Get the data root directory
     DataDir =
 
     case bitcask:open(BitcaskRoot, [{read_write, true}]) of
         Ref when is_reference(Ref) ->
-            {ok, {Ref, BitcaskRoot}};
+            {ok, {Ref, BitcaskRoot, TimerRef}};
         {error, Reason2} ->
             {error, Reason2}
     end.
 
 
-stop({Ref, _}) ->
+stop({Ref, _, _}) ->
     bitcask:close(Ref).
 
 
-get({Ref, _}, BKey) ->
+get({Ref, _, _}, BKey) ->
     Key = term_to_binary(BKey),
     case bitcask:get(Ref, Key) of
         {ok, Value} ->
             {error, Reason}
     end.
 
-put({Ref, _}, BKey, Val) ->
+put({Ref, _, _}, BKey, Val) ->
     Key = term_to_binary(BKey),
     case bitcask:put(Ref, Key, Val) of
         ok -> ok;
             {error, Reason}
     end.
 
-delete({Ref, _}, BKey) ->
+delete({Ref, _, _}, BKey) ->
     case bitcask:delete(Ref, term_to_binary(BKey)) of
         ok -> ok;
         {error, Reason} ->
             {error, Reason}
     end.
 
-list({Ref, _}) ->
+list({Ref, _, _}) ->
     case bitcask:list_keys(Ref) of
         KeyList when is_list(KeyList) ->
             [binary_to_term(K) || K <- KeyList];
     [K || {B, K} <- ?MODULE:list(State), B =:= Bucket].
 
 
-fold({Ref, _}, Fun0, Acc0) ->
+fold({Ref, _, _}, Fun0, Acc0) ->
     %% When folding across the bitcask, the bucket/key tuple must
     %% be decoded. The intermediate binary_to_term call handles this
     %% and yields the expected fun({B, K}, Value, Acc)
                  end,
                  Acc0).
 
-drop({Ref, BitcaskRoot}) ->
+drop({Ref, BitcaskRoot, _}) ->
     %% todo: once bitcask has a more friendly drop function
     %%  of its own, use that instead.
     bitcask:close(Ref),
     file:del_dir(BitcaskRoot),
     ok.
 
-is_empty({Ref, _}) ->
+is_empty({Ref, _, _}) ->
     %% Determining if a bitcask is empty requires us to find at least
     %% one value that is NOT a tombstone. Accomplish this by doing a fold
     %% that forcibly bails on the very first k/v encountered.
             true
     end.
 
-handle_info({Ref, _}, {sync, SyncInterval}) ->
+handle_info({Ref, _, TimerRef}, {{?MODULE, TimerRef}, {sync, SyncInterval}}) ->
     bitcask:sync(Ref),
     erlang:send_after(SyncInterval, self(),
-                      {?MODULE, {sync, SyncInterval}});
+                      {{?MODULE, TimerRef}, {sync, SyncInterval}});
 
-handle_info({Ref, BitcaskRoot}, merge_check) ->
+handle_info({Ref, BitcaskRoot, TimerRef}, {{?MODULE, TimerRef}, merge_check}) ->
     case bitcask:needs_merge(Ref) of
         {true, Files} ->
             bitcask_merge_worker:merge(BitcaskRoot, [], Files);
         false ->
             ok
     end,
-    erlang:send_after(?MERGE_CHECK_INTERVAL, self(), {?MODULE, merge_check}).
+    erlang:send_after(?MERGE_CHECK_INTERVAL, self(),
+                      {{?MODULE, TimerRef}, merge_check});
+
+handle_info(_State, _TaggedMsg) ->
+    ok.
 
 
 %% ===================================================================

File apps/riak_kv/src/riak_kv_multi_backend.erl

                         Module:fold(SubState, Fun, Acc)
                 end, Extra, State#state.backends).
 
-handle_info(State, Msg) ->
-    F = fun(_Name, Module, SubState) ->
-                Module:handle_info(SubState, Msg)
+handle_info(State, TaggedMsg = {{Mod, _Ref}, _Msg}) ->
+    F = fun({_Name, Module, SubState}) ->
+                Module:handle_info(SubState, TaggedMsg)
         end,
-    [F(X) || X <- State#state.backends],
+    [F(X) || X = {_, Module, _} <- State#state.backends, Mod =:= Module],
     ok.
 
 % Given a Bucket name and the State, return the

File apps/riak_kv/src/riak_kv_vnode.erl

     {stop,normal,StateData};
 handle_info(ok, StateName, StateData) ->
     {next_state, StateName, StateData, ?TIMEOUT};
-handle_info({Mod, Msg}, StateName, #state { mod = Mod } = StateData) ->
-    Mod:handle_info(StateData#state.modstate, Msg),
+handle_info(TaggedMsg, StateName, #state { mod = Mod } = StateData) ->
+    Mod:handle_info(StateData#state.modstate, TaggedMsg),
     {next_state, StateName, StateData, ?TIMEOUT}.
 
 %% @private