Commits

Anonymous committed c515d2c

Two things: give Riak the ability to run multiple backends (riak_multi_backend), and add an in-memory cache backend with LRU ejection (riak_cache_backend).
Backends are configured in etc/app.config (documented in riak_multi_backend). A backend is chosen based on the 'backend' bucket parameter.

Comments (0)

Files changed (11)

apps/riak/ebin/riak.app

              riak_app,
              riak_backup,
              riak_bucket,
+			 riak_cache_backend,
              riak_claim,
              riak_client,
              riak_connect,
              riak_map_phase_fsm,
              riak_mapreduce,
              riak_mapreduce_fsm,
+			 riak_multi_backend,
              riak_object,
              riak_osmos_backend,
              riak_put_fsm,

apps/riak/src/riak.erl

 -author('Justin Sheehy <justin@basho.com>').
 -author('Bryan Fink <bryan@basho.com>').
 -export([stop/0, stop/1]).
--export([get_app_env/1,get_app_env/2]).
+-export([get_app_env/0, get_app_env/1,get_app_env/2]).
 -export([client_connect/1,client_connect/2,
          client_test/1,
          local_client/0,local_client/1,
     % we never do an application:stop because that makes it very hard
     %  to really halt the runtime, which is what we need here.
     error_logger:info_msg(io_lib:format("~p~n",[Reason])),
-    init:stop().    
+    init:stop().
+    
+%% @spec get_app_env() -> [{Key :: atom(), Value :: term()}]
+%% @doc Collect every configuration value known to riak: the 'riak'
+%%      application environment followed by the emulator's command-line
+%%      flags. Returns a list of Key/Value pairs.
+get_app_env() ->
+    AppEnv = application:get_all_env(riak),
+    CmdLineArgs = init:get_arguments(),
+    lists:append(AppEnv, CmdLineArgs).
 
 %% @spec get_app_env(Opt :: atom()) -> term()
 %% @doc The official way to get the values set in riak's configuration file.

apps/riak/src/riak_cache_backend.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc
+%% riak_cache_backend is a backend that turns a bucket into a 
+%% memcached-type memory cache. Ejects the least recently used
+%% objects either when cache gets full or the object's lease
+%% expires.
+%%
+%% 
+%% === Config Settings ===
+%% 
+%% * 'riak_cache_backend_memory' - Specifies the maximum amount of
+%%   memory allocated to the cache, in MB.
+%% * 'riak_cache_backend_ttl' - When an object is accessed, renew the
+%%   lease for this many seconds.
+%% * 'riak_cache_backend_max_ttl' - Don't allow the object's lease to 
+%%   be renewed after this many seconds.
+
+-module (riak_cache_backend).
+-export([start/2, stop/1, get/2, put/3, list/1, list_bucket/2, delete/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-define(PRINT(Var), error_logger:info_msg("DEBUG: ~p:~p - ~p: ~p~n", [?MODULE, ?LINE, ??Var, Var])).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+% Defaults for the config settings. Both TTL values are expressed in
+% seconds: extend_lease/2 converts the TTL with timer:seconds/1 and
+% exceeds_max_ttl/2 compares max_ttl against an age in seconds, so the
+% defaults must not be pre-converted to milliseconds.
+-define (DEFAULT_TTL,     600).   % 10 minutes, in seconds
+-define (DEFAULT_MAX_TTL, 3600).  % 1 hour, in seconds
+-define (DEFAULT_MEMORY,  100).   % 100MB
+
+% obj_tree two levels of gb_tree
+%   - first level is buckets
+%   - second level is keys
+-record (state, {ttl, max_ttl, max_memory, used_memory, obj_tree, time_tree}).
+-record (entry, {bucket, key, value, tref, created, now}).
+
+%%% RIAK BACKEND INTERFACE %%%
+
+% @spec start(Partition :: integer(), Config :: proplist()) ->
+%                        {ok, state()} | {{error, Reason :: term()}, state()}
+% Start a cache server linked to the caller. The partition index is
+% ignored and Config is handed to init/1 unchanged. Returns the result
+% of gen_server:start_link/3; on success the pid is what the other API
+% functions expect as their first argument.
+start(_Partition, Config) ->
+    InitArgs = [Config],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+% Ask the cache server for the value stored under BKey ({Bucket, Key}).
+% The server replies {ok, Value} or {error, notfound}.
+get(Server, BKey) ->
+    Request = {get, BKey},
+    gen_server:call(Server, Request).
+    
+% Ask the cache server to store Val under BKey ({Bucket, Key}),
+% replacing any previous value. The server replies ok.
+put(Server, BKey, Val) ->
+    Request = {put, BKey, Val},
+    gen_server:call(Server, Request).
+    
+% Ask the cache server for every {Bucket, Key} pair it currently holds.
+list(Server) ->
+    Request = list,
+    gen_server:call(Server, Request).
+
+% Ask the cache server to list a bucket. Bucket may be a bucket name,
+% the wildcard '_' (list bucket names), or {filter, Bucket, Fun}; the
+% server's handle_call/3 clauses decide how each form is answered.
+list_bucket(Server, Bucket) ->
+    Request = {list_bucket, Bucket},
+    gen_server:call(Server, Request).
+
+% Ask the cache server to drop the object stored under BKey
+% ({Bucket, Key}). The server replies ok whether or not it existed.
+delete(Server, BKey) ->
+    Request = {delete, BKey},
+    gen_server:call(Server, Request).
+
+% Send the 'stop' request to the cache server and return its reply.
+stop(Server) ->
+    Request = stop,
+    gen_server:call(Server, Request).
+
+
+%%% GEN_SERVER %%%
+
+%% @private
+%% Build the initial server state from Config. Settings that are not
+%% present fall back to the ?DEFAULT_* macros. The memory limit is
+%% configured in MB and multiplied out before being stored; both trees
+%% start empty and no memory is accounted as used.
+init([Config]) ->
+    Setting = fun(Name, Default) ->
+        proplists:get_value(Name, Config, Default)
+    end,
+    MaxTTL = Setting(riak_cache_backend_max_ttl, ?DEFAULT_MAX_TTL),
+    TTL = Setting(riak_cache_backend_ttl, ?DEFAULT_TTL),
+    MemoryMB = Setting(riak_cache_backend_memory, ?DEFAULT_MEMORY),
+    EmptyTree = gb_trees:empty(),
+    InitialState = #state {
+        ttl = TTL,
+        max_ttl = MaxTTL,
+        max_memory = MemoryMB * 1024 * 1024,
+        used_memory = 0,
+        obj_tree = EmptyTree,
+        time_tree = EmptyTree
+    },
+    {ok, InitialState}.
+
+
+%% @private
+%% {get, {Bucket, Key}}: reply {ok, Value} for a cached object, or
+%% {error, notfound} when it is missing or older than max_ttl (in which
+%% case it is also removed). A successful read renews the lease AND
+%% re-keys the entry in time_tree with a fresh stamp, so that
+%% trim_while_too_big/1 evicts the least recently *used* object rather
+%% than the least recently *written* one.
+handle_call({get, {Bucket, Key}}, From, State) ->
+    case obj_find(Bucket, Key, State) of
+        {value, Entry} ->
+            case exceeds_max_ttl(Entry, State#state.max_ttl) of
+                true ->
+                    % Past max_ttl: the lease may not be renewed, so
+                    % drop the object and report a miss.
+                    {_, ok, NewState} = handle_call({delete, {Bucket, Key}}, From, State),
+                    {reply, {error, notfound}, NewState};
+                false ->
+                    % Remove the old time_tree key before the stamp
+                    % changes; time_delete/2 looks it up via #entry.now.
+                    State1 = time_delete(Entry, State),
+                    Renewed = extend_lease(Entry, State#state.ttl),
+                    NewEntry = Renewed#entry { now={now(), make_ref()} },
+                    State2 = obj_store(NewEntry, State1),
+                    State3 = time_store(NewEntry, State2),
+                    {reply, {ok, NewEntry#entry.value}, State3}
+            end;
+        none ->
+            {reply, {error, notfound}, State}
+    end;
+    
+% {put, {Bucket, Key}, Value}: store Value, replacing any existing
+% object under the same key, then evict old objects until the cache is
+% back under max_memory. Always replies ok.
+handle_call({put, {Bucket, Key}, Value}, From, State) ->
+    % Reuse the delete clause so the old entry's timer, tree slots and
+    % memory accounting are all released first.
+    {_, ok, Cleared} = handle_call({delete, {Bucket, Key}}, From, State),
+
+    % The stamp is the entry's key in time_tree; the ref keeps it unique.
+    Stamp = {now(), make_ref()},
+    Fresh = #entry { bucket=Bucket, key=Key, value=Value, created=now(), now=Stamp },
+    Leased = extend_lease(Fresh, State#state.ttl),
+    Indexed = time_store(Leased, obj_store(Leased, Cleared)),
+
+    % Charge the new object against the memory budget.
+    Footprint = size(Bucket) + size(Key) + size(Value),
+    Charged = Indexed#state { used_memory = Indexed#state.used_memory + Footprint },
+
+    {reply, ok, trim_while_too_big(Charged)};
+    
+% {delete, {Bucket, Key}}: remove the object from the obj_tree and
+% time_tree, cancel its lease timer and give its bytes back to the
+% memory budget. Replies ok even when nothing was stored.
+handle_call({delete, {Bucket, Key}}, _From, State) ->
+    case obj_find(Bucket, Key, State) of
+        none ->
+            % Nothing stored under this key; deleting is a no-op.
+            {reply, ok, State};
+        {value, Entry} ->
+            cancel_timer(Entry),
+            Unindexed = time_delete(Entry, obj_delete(Entry, State)),
+            Freed = size(Bucket) + size(Key) + size(Entry#entry.value),
+            Remaining = Unindexed#state.used_memory - Freed,
+            {reply, ok, Unindexed#state { used_memory=Remaining }}
+    end;
+    
+% {eject_from_cache, {Bucket, Key}, UniqueRef}: request issued by the
+% lease timer created in extend_lease/2. The object is deleted only if
+% it still carries the same UniqueRef; a missing object or a different
+% ref (the lease was renewed since) is ignored.
+handle_call({eject_from_cache, {Bucket, Key}, UniqueRef}, _From, State) ->
+    case obj_find(Bucket, Key, State) of
+        {value, #entry { tref={_, UniqueRef} }} ->
+            handle_call({delete, {Bucket, Key}}, self(), State);
+        _StaleOrMissing ->
+            {reply, ok, State}
+    end;
+    
+% list: reply with every {Bucket, Key} pair in the cache, buckets and
+% keys in gb_trees order.
+handle_call(list, _From, State) ->
+    % Collect one key list per bucket by consing (so the lists pile up
+    % in reverse bucket order), then reverse and concatenate once.
+    % Appending to the accumulator for every bucket would re-copy it
+    % each time.
+    F = fun(Bucket, Tree, Acc) ->
+        [[{Bucket, Key} || Key <- gb_trees:keys(Tree)] | Acc]
+    end,
+    PerBucket = gb_trees_fold(F, [], State#state.obj_tree),
+    AllKeys = lists:append(lists:reverse(PerBucket)),
+    {reply, AllKeys, State};
+    
+% {list_bucket, '_'}: the wildcard asks for bucket names, which are
+% the first-level keys of obj_tree.
+handle_call({list_bucket, '_'}, _From, State) ->
+    BucketNames = gb_trees:keys(State#state.obj_tree),
+    {reply, BucketNames, State};
+    
+% {list_bucket, {filter, Bucket, Fun}}: reply with the keys of Bucket
+% for which Fun(Key) holds; an unknown bucket yields [].
+handle_call({list_bucket, {filter, Bucket, Fun}}, _From, State) ->
+    Matching = case gb_trees:lookup(Bucket, State#state.obj_tree) of
+        none ->
+            [];
+        {value, KeyTree} ->
+            lists:filter(Fun, gb_trees:keys(KeyTree))
+    end,
+    {reply, Matching, State};
+    
+% {list_bucket, Bucket}: reply with all keys stored in Bucket; an
+% unknown bucket yields [].
+handle_call({list_bucket, Bucket}, _From, State) ->
+    Keys = case gb_trees:lookup(Bucket, State#state.obj_tree) of
+        none ->
+            [];
+        {value, KeyTree} ->
+            gb_trees:keys(KeyTree)
+    end,
+    {reply, Keys, State};
+
+% stop: reply ok and shut the server down. Returning {stop, ...} makes
+% gen_server send the reply, run terminate/2 and exit with reason
+% 'normal'; a plain {reply, ...} would leave the process (and its
+% cached data) alive after the backend was asked to stop.
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+%% @private
+%% This backend defines no casts; any cast is dropped and the state is
+%% left untouched.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+%% @private
+%% Out-of-band messages are ignored; the state is left untouched.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% @private
+%% Cancel the lease timer of every cached entry before the server goes
+%% away. Each timer would otherwise stay armed and later issue a
+%% gen_server:call/2 to this (by then dead) process. Always returns ok.
+terminate(_Reason, State) ->
+    CancelBucket = fun(_Bucket, KeyTree, ok) ->
+        lists:foreach(fun cancel_timer/1, gb_trees:values(KeyTree))
+    end,
+    gb_trees_fold(CancelBucket, ok, State#state.obj_tree).
+
+%% @private
+%% No state migration is performed on code upgrade; the state is kept
+%% exactly as it is.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%
+%% Private Functions
+%%
+
+% Two-step lookup of the #entry record for Bucket/Key: first the
+% bucket's key tree in obj_tree, then the key inside it.
+% Returns {value, Entry} or none.
+obj_find(Bucket, Key, State) ->
+    case gb_trees:lookup(Bucket, State#state.obj_tree) of
+        none -> none;
+        {value, KeyTree} -> gb_trees:lookup(Key, KeyTree)
+    end.
+
+% Insert or overwrite the #entry record in the obj_tree, creating the
+% bucket's key tree on first use.
+% Returns NewState.
+obj_store(Entry, State) ->
+    Buckets = State#state.obj_tree,
+    KeyTree = case gb_trees:lookup(Entry#entry.bucket, Buckets) of
+        none -> gb_trees:empty();
+        {value, Existing} -> Existing
+    end,
+    UpdatedKeyTree = gb_trees:enter(Entry#entry.key, Entry, KeyTree),
+    UpdatedBuckets = gb_trees:enter(Entry#entry.bucket, UpdatedKeyTree, Buckets),
+    State#state { obj_tree=UpdatedBuckets }.
+    
+% Remove the #entry record from the obj_tree. Assumes the entry exists
+% (the bucket lookup and gb_trees:delete/2 both crash otherwise).
+% When the bucket's last key is removed, the bucket itself is dropped
+% from the first level, so that listing buckets never reports a bucket
+% with no objects and empty key trees do not accumulate.
+% Returns NewState.
+obj_delete(Entry, State) ->
+    Bucket = Entry#entry.bucket,
+    Key = Entry#entry.key,
+    Level1 = State#state.obj_tree,
+    {value, Level2} = gb_trees:lookup(Bucket, Level1),
+    NewLevel2 = gb_trees:delete(Key, Level2),
+    NewLevel1 = case gb_trees:is_empty(NewLevel2) of
+        true  -> gb_trees:delete(Bucket, Level1);
+        false -> gb_trees:update(Bucket, NewLevel2, Level1)
+    end,
+    State#state { obj_tree=NewLevel1 }.
+    
+    
+% Return the entry stored under the smallest key of the time_tree,
+% i.e. the oldest object in the cache. The tree must not be empty.
+% Returns Entry.
+time_oldest(State) ->
+    element(2, gb_trees:smallest(State#state.time_tree)).
+    
+% Add the #entry record to the time_tree, keyed by its 'now' stamp.
+% The stamp must not already be present (gb_trees:insert/3 crashes on
+% a duplicate key).
+% Returns NewState.
+time_store(Entry, State) ->
+    Stamped = gb_trees:insert(Entry#entry.now, Entry, State#state.time_tree),
+    State#state { time_tree = Stamped }.
+    
+% Drop the #entry record from the time_tree, looked up by its 'now'
+% stamp. The stamp must be present (gb_trees:delete/2 crashes
+% otherwise).
+% Returns NewState.
+time_delete(Entry, State) ->
+    Pruned = gb_trees:delete(Entry#entry.now, State#state.time_tree),
+    State#state { time_tree = Pruned }.
+
+% Decide whether the object has lived longer than MaxTTL. The age is
+% the time since #entry.created, converted from microseconds to
+% seconds before the comparison.
+% Returns 'true' or 'false'.
+exceeds_max_ttl(Entry, MaxTTL) ->
+    Created = Entry#entry.created,
+    AgeMicros = timer:now_diff(now(), Created),
+    AgeSeconds = AgeMicros / 1000 / 1000,
+    AgeSeconds > MaxTTL.
+
+
+% Give the object a new lease of TTL seconds: cancel whatever timer
+% the entry carries, arm a new one that will send an eject_from_cache
+% request to this process, and remember the timer together with a
+% fresh unique ref in the entry. The ref lets the eject handler tell
+% a current lease from a superseded one.
+% Returns the new #entry record.
+extend_lease(Entry, TTL) ->
+    cancel_timer(Entry),
+
+    DelayMs = trunc(timer:seconds(TTL)),
+    LeaseRef = make_ref(),
+    BKey = {Entry#entry.bucket, Entry#entry.key},
+    EjectRequest = {eject_from_cache, BKey, LeaseRef},
+    {ok, TRef} = timer:apply_after(DelayMs, gen_server, call, [self(), EjectRequest]),
+
+    Entry#entry { tref={TRef, LeaseRef} }.
+    
+% Cancel the entry's lease timer, if it has one. An entry that was
+% never leased (tref still undefined) is left alone.
+cancel_timer(Entry) ->
+    case Entry#entry.tref of
+        undefined -> ok;
+        {ActiveTRef, _LeaseRef} -> timer:cancel(ActiveTRef)
+    end.
+
+% Evict objects, oldest first, until used_memory no longer exceeds
+% max_memory. Each eviction goes through the delete clause of
+% handle_call/3 so timers and accounting stay consistent.
+% Returns NewState.
+trim_while_too_big(State) ->
+    case State#state.used_memory > State#state.max_memory of
+        false ->
+            State;
+        true ->
+            Victim = time_oldest(State),
+            EvictKey = {Victim#entry.bucket, Victim#entry.key},
+            {_, ok, Trimmed} = handle_call({delete, EvictKey}, self(), State),
+            trim_while_too_big(Trimmed)
+    end.
+
+% Left fold over a gb_tree in key order: Fun(Key, Value, Acc) is
+% called for every element and its result becomes the next Acc.
+gb_trees_fold(Fun, Acc, Tree) ->
+    gb_trees_fold_inner(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
+
+% Worker for gb_trees_fold/3; Step is the result of gb_trees:next/1.
+gb_trees_fold_inner(Fun, Acc, Step) ->
+    case Step of
+        none ->
+            Acc;
+        {Key, Val, Rest} ->
+            gb_trees_fold_inner(Fun, Fun(Key, Val, Acc), gb_trees:next(Rest))
+    end.
+
+
+%%
+%% Test
+%%
+
+% @private
+% Run the shared backend test suite against this module with an empty
+% config, i.e. with all default settings.
+simple_test() ->
+    riak_test_util:standard_backend_test(?MODULE, []).
+    
+% @private
+% An object that is no longer accessed must disappear once its lease
+% (TTL) has run out.
+ttl_test() ->
+    % Lease objects for 0.02 seconds...
+    {ok, Cache} = start(42, [{riak_cache_backend_ttl, 0.02}]),
+    BKey = {<<"Bucket">>, <<"Key">>},
+    Value = <<"Value">>,
+
+    put(Cache, BKey, Value),
+
+    % Three immediate reads must all hit...
+    {ok, Value} = get(Cache, BKey),
+    {ok, Value} = get(Cache, BKey),
+    {ok, Value} = get(Cache, BKey),
+
+    % After 0.05 seconds without access the lease has expired...
+    timer:sleep(trunc(timer:seconds(0.05))),
+    {error, notfound} = get(Cache, BKey),
+
+    ok.
+    
+% Reads keep renewing the lease, but only until the object is older
+% than max_ttl; after that a read must miss.
+max_ttl_test() ->
+    % Lease for 0.04 seconds, never keep an object beyond 0.09 seconds...
+    Config = [{riak_cache_backend_ttl, 0.04}, {riak_cache_backend_max_ttl, 0.09}],
+    {ok, Cache} = start(42, Config),
+    BKey = {<<"Bucket">>, <<"Key">>},
+    Value = <<"Value">>,
+    Pause = fun(Seconds) -> timer:sleep(trunc(timer:seconds(Seconds))) end,
+
+    put(Cache, BKey, Value),
+
+    % Two reads 0.03 seconds apart, each inside the current lease...
+    Pause(0.03),
+    {ok, Value} = get(Cache, BKey),
+    Pause(0.03),
+    {ok, Value} = get(Cache, BKey),
+
+    % Another 0.05 seconds takes the object past max_ttl...
+    Pause(0.05),
+    {error, notfound} = get(Cache, BKey),
+
+    ok.
+    
+% @private
+% With room for only one of two 1kb values, writing the second must
+% push the first out of the cache.
+max_memory_test() ->
+    % The setting is in MB, so this limits the cache to 1.5kb...
+    {ok, Cache} = start(42, [{riak_cache_backend_memory, 1.5 * (1 / 1024)}]),
+
+    Bucket = <<"Bucket">>,
+    BKey1 = {Bucket, <<"Key1">>},
+    BKey2 = {Bucket, <<"Key2">>},
+    Value1 = list_to_binary(string:copies("1", 1024)),
+    Value2 = list_to_binary(string:copies("2", 1024)),
+
+    % Store the first value and read it back...
+    put(Cache, BKey1, Value1),
+    {ok, Value1} = get(Cache, BKey1),
+
+    % Pause for a second to let clocks increment...
+    timer:sleep(timer:seconds(1)),
+
+    % Storing the second value exceeds the limit...
+    put(Cache, BKey2, Value2),
+
+    % ...so the first one is gone and the second one remains.
+    {error, notfound} = get(Cache, BKey1),
+    {ok, Value2} = get(Cache, BKey2),
+
+    ok.

apps/riak/src/riak_dets_backend.erl

 -module(riak_dets_backend).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
 
 % @type state() = term().
 -record(state, {table}).
 
-% @private
-simple_test() ->
-    application:set_env(riak, riak_dets_backend_root,
-                        "test/dets-backend"),
-    ?assertCmd("rm -rf test/dets-backend"),
-    riak_test_util:standard_backend_test(riak_dets_backend).
-
-% @spec start(Partition :: integer()) ->
+% @spec start(Partition :: integer(), Config :: proplist()) ->
 %                        {ok, state()} | {{error, Reason :: term()}, state()}
-start(Partition) ->
-    ConfigRoot = riak:get_app_env(riak_dets_backend_root),
+start(Partition, Config) ->
+    ConfigRoot = proplists:get_value(riak_dets_backend_root, Config),
     if ConfigRoot =:= undefined ->
             riak:stop("riak_dets_backend_root unset, failing.~n");
        true -> ok
     MList = dets:match(T,MatchSpec),
     list(MList,[]).
 
+%%
+%% Test
+%%
+
+% @private
+% Wipe any data left by an earlier run, then run the shared backend
+% test suite with the dets root pointed at the test directory.
+simple_test() ->
+    ?assertCmd("rm -rf test/dets-backend"),
+    riak_test_util:standard_backend_test(
+        ?MODULE, [{riak_dets_backend_root, "test/dets-backend"}]).

apps/riak/src/riak_ets_backend.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 % @doc riak_ets_backend is a Riak storage backend using ets.
 
 -behaviour(gen_server).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 % @type state() = term().
 -record(state, {t}).
 
-% @private
-simple_test() ->
-    riak_test_util:standard_backend_test(riak_ets_backend).
-
-% @spec start(Partition :: integer()) ->
+% @spec start(Partition :: integer(), Config :: proplist()) ->
 %                        {ok, state()} | {{error, Reason :: term()}, state()}
-start(Partition) ->
+start(Partition, _Config) ->
     gen_server:start_link(?MODULE, [Partition], []).
 
 %% @private
 
 %% @private
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%
+%% Test
+%%
+
+% @private
+% Run the shared backend test suite against this module; the ets
+% backend needs no configuration.
+simple_test() ->
+    riak_test_util:standard_backend_test(?MODULE, []).

apps/riak/src/riak_fs_backend.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.
+%% under the License.    
 
 % @doc riak_fs_backend is a simple filesystem storage system.
 
 -module(riak_fs_backend).
--export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
 -include_lib("eunit/include/eunit.hrl").
 % @type state() = term().
 -record(state, {dir}).
 
-%% @spec start(Partition :: integer()) ->
+%% @spec start(Partition :: integer(), Config :: proplist()) ->
 %%          {ok, state()} | {{error, Reason :: term()}, state()}
 %% @doc Start this backend.  'riak_fs_backend_root' must be set in
 %%      Riak's application environment.  It must be set to a string
 %%      representing the base directory where this backend should
 %%      store its files.
-start(Partition) ->
+start(Partition, Config) ->
     PartitionName = integer_to_list(Partition),
-    ConfigRoot = riak:get_app_env(riak_fs_backend_root),
+    ConfigRoot = proplists:get_value(riak_fs_backend_root, Config),
     if
         ConfigRoot =:= undefined ->
             riak:stop("riak_fs_backend_root unset, failing.");
         true -> file:read_file(File)
     end.
 
-%% @spec atomic_write(state(), File :: string(), Val :: binary()) ->
+%% @spec atomic_write(File :: string(), Val :: binary()) ->
 %%       ok | {error, Reason :: term()}
 %% @doc store a atomic value to disk. Write to temp file and rename to
 %%       normal path.
-atomic_write(_State, File, Val) ->
+atomic_write(File, Val) ->
     FakeFile = File ++ ".tmpwrite",
     case file:write_file(FakeFile, Val) of
         ok ->
 %% @spec put(state(), BKey :: riak_object:bkey(), Val :: binary()) ->
 %%         ok | {error, Reason :: term()}
 %% @doc Store Val under Bkey
-put(State,BKey,Val) ->
+put(State,BKey,Val) ->       
     File = location(State,BKey),
     case filelib:ensure_dir(File) of
-        ok -> atomic_write(State, File, Val);
+        ok -> atomic_write(File, Val);
         X -> X
     end.
 
 %%
 
 simple_test() ->
-    application:set_env(riak, riak_fs_backend_root,
-                        "test/fs-backend"),
-    ?assertCmd("rm -rf test/fs-backend"),
-    riak_test_util:standard_backend_test(riak_fs_backend).
+   ?assertCmd("rm -rf test/fs-backend"),
+   Config = [{riak_fs_backend_root, "test/fs-backend"}],
+   riak_test_util:standard_backend_test(riak_fs_backend, Config).
 
 dirty_clean_test() ->
     Dirty = "abc=+/def",

apps/riak/src/riak_gb_trees_backend.erl

 -module(riak_gb_trees_backend).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
 
 % @type state() = term().
 -record(state, {pid}).
 
 
-% @private
-simple_test() ->
-    application:set_env(riak, riak_gb_trees_backend_root,
-                        "test/gb_trees-backend"),
-    ?assertCmd("rm -rf test/gb_trees-backend"),
-    riak_test_util:standard_backend_test(riak_gb_trees_backend).
-
-% @spec start(Partition :: integer()) ->
+% @spec start(Partition :: integer(), Config :: proplist()) ->
 %                        {ok, state()} | {{error, Reason :: term()}, state()}
-start(_Partition) ->
+start(_Partition, _Config) ->
     Pid = spawn(fun() -> 
         {A1,A2,A3} = now(),
         random:seed(A1, A2, A3),
 srv_list_bucket(Tree, Bucket) ->
     [ Key || {B, Key} <- gb_trees:keys(Tree), B == Bucket ].
 
-% riak_gb_trees_backend does not currently serialize. But if it did,
-% it could use the functions below.
-%
-% read_tree(Filename) ->
-%     case file:read_file(Filename) of
-%         {ok, B} -> binary_to_term(zlib:unzip(B));
-%         _ -> gb_trees:empty()
-%     end.        
-%     
-% write_tree(Tree, Filename) ->
-%     B = term_to_binary(Tree),
-%     BC = zlib:zip(B),
-%     ok = file:write_file(Filename, BC).
+
+%%
+%% Test
+%%
+
+% @private
+% Run the shared backend test suite against this module; the gb_trees
+% backend needs no configuration.
+simple_test() ->
+    riak_test_util:standard_backend_test(?MODULE, []).
+

apps/riak/src/riak_multi_backend.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc riak_multi_backend is a Riak storage backend that dispatches to several other backends.
+
+-module (riak_multi_backend).
+-export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-record (state, {backends, default_backend}).
+
+%% @doc
+%% riak_multi_backend allows you to run multiple backends within a 
+%% single Riak instance. The 'backend' property of a bucket specifies
+%% the backend in which the object should be stored. If no 'backend'
+%% is specified, then the 'multi_backend_default' setting is used.
+%% If this is unset, then the first defined backend is used.
+%% 
+%% === Configuration ===
+%% 
+%%     {storage_backend, riak_multi_backend},
+%%     {multi_backend_default, first_backend},
+%%     {multi_backend, [ 
+%%       % format: {name, module, [Configs]}
+%%       {first_backend, riak_xxx_backend, [
+%%         {config1, ConfigValue1},
+%%         {config2, ConfigValue2}
+%%       ]},
+%%       {second_backend, riak_yyy_backend, [
+%%         {config1, ConfigValue1},
+%%         {config2, ConfigValue2}
+%%       ]}
+%%     ]}
+%%
+%%
+%% Then, tell a bucket which one to use...
+%%
+%%     riak_bucket:set_bucket(<<"MY_BUCKET">>, [{backend, second_backend}])
+%%
+
+
+% @spec start(Partition :: integer(), Config :: proplist()) ->
+%                        {ok, state()} | {{error, Reason :: term()}, state()}
+% Validate the 'multi_backend' definitions in Config, work out the
+% default backend (the 'multi_backend_default' setting, else the first
+% definition) and start every defined backend for this partition.
+% Invalid settings are reported by throwing via assert/2.
+start(Partition, Config) ->
+    % The definitions must be a non-empty list...
+    Defs = proplists:get_value(multi_backend, Config),
+    assert(is_list(Defs), {invalid_config_setting, multi_backend, list_expected}),
+    assert(length(Defs) > 0, {invalid_config_setting, multi_backend, list_is_empty}),
+    [{FirstName, _, _} | _] = Defs,
+
+    % ...and the default must name one of them.
+    DefaultBackend = proplists:get_value(multi_backend_default, Config, FirstName),
+    IsDefined = lists:keymember(DefaultBackend, 1, Defs),
+    assert(IsDefined, {invalid_config_setting, multi_backend_default, backend_not_found}),
+
+    % Start each backend with its own sub-config.
+    StartOne = fun(Name, Module, SubConfig) ->
+        {ok, SubState} = Module:start(Partition, SubConfig),
+        {Name, Module, SubState}
+    end,
+    Backends = [StartOne(Name, Module, SubConfig) || {Name, Module, SubConfig} <- Defs],
+
+    {ok, #state { backends=Backends, default_backend=DefaultBackend}}.
+
+% @spec stop(state()) -> ok | {error, Reason :: term()}
+% Stop every backend, in definition order. Returns ok when all of
+% them answered ok, otherwise {error, Results} with every non-ok
+% answer.
+stop(State) ->
+    Failures = [Result || {_, Module, SubState} <- State#state.backends,
+                          Result <- [Module:stop(SubState)],
+                          Result /= ok],
+    case Failures of
+        [] -> ok;
+        _ -> {error, Failures}
+    end.
+
+% get(state(), BKey :: {Bucket, Key}) ->
+%   {ok, Val :: binary()} | {error, Reason :: term()}
+% Forward the read to the backend selected for Bucket.
+get(State, {Bucket, _Key} = BKey) ->
+    {_Name, Module, SubState} = get_backend(Bucket, State),
+    Module:get(SubState, BKey).
+
+% put(state(), BKey :: {Bucket, Key}, Val :: binary()) ->
+%   ok | {error, Reason :: term()}
+% Forward the write to the backend selected for Bucket.
+put(State, {Bucket, _Key} = BKey, Value) ->
+    {_Name, Module, SubState} = get_backend(Bucket, State),
+    Module:put(SubState, BKey, Value).
+
+% delete(state(), BKey :: {Bucket, Key}) ->
+%   ok | {error, Reason :: term()}
+% Forward the delete to the backend selected for Bucket.
+delete(State, {Bucket, _Key} = BKey) ->
+    {_Name, Module, SubState} = get_backend(Bucket, State),
+    Module:delete(SubState, BKey).
+
+% list(state()) -> [BKey :: {Bucket, Key}]
+% Ask every backend for its contents and concatenate the answers;
+% the answer of the last backend comes first in the result.
+list(State) ->
+    ListOne = fun({_, Module, SubState}) -> Module:list(SubState) end,
+    PerBackend = lists:map(ListOne, State#state.backends),
+    lists:append(lists:reverse(PerBackend)).
+
+% list_bucket(state(), '_') -> [Bucket :: binary()]
+% list_bucket(state(), {filter, Bucket :: binary(), F :: function()}) -> [Key :: binary()]
+% list_bucket(state(), Bucket :: binary()) -> [Key :: binary()]
+% The wildcard and filter forms are put to every backend and the
+% answers are concatenated (answer of the last backend first). A plain
+% bucket name is routed to the single backend chosen by get_backend/2.
+list_bucket(State, '_' = Query) ->
+    list_bucket_on_all(State, Query);
+list_bucket(State, {filter, _Bucket, _FilterFun} = Query) ->
+    list_bucket_on_all(State, Query);
+list_bucket(State, Bucket) ->
+    {_Name, Module, SubState} = get_backend(Bucket, State),
+    Module:list_bucket(SubState, Bucket).
+
+% Run the same list_bucket query against every backend and join the
+% results, last backend first.
+list_bucket_on_all(State, Query) ->
+    AskOne = fun({_, Module, SubState}) -> Module:list_bucket(SubState, Query) end,
+    PerBackend = lists:map(AskOne, State#state.backends),
+    lists:append(lists:reverse(PerBackend)).
+    
+
+% Resolve the backend definition ({Name, Module, SubState}) that
+% serves Bucket. The name comes from the bucket's 'backend' property,
+% falling back to the configured default. Throws
+% {?MODULE, undefined_backend, Name} when no started backend has
+% that name.
+get_backend(Bucket, State) ->
+    Fallback = State#state.default_backend,
+    Props = riak_bucket:get_bucket(Bucket),
+    Wanted = proplists:get_value(backend, Props, Fallback),
+    case lists:keyfind(Wanted, 1, State#state.backends) of
+        false -> throw({?MODULE, undefined_backend, Wanted});
+        Found -> Found
+    end.
+
+% Sanity-check helper: returns ok when Condition is true and throws
+% {?MODULE, Error} when it is false.
+assert(Condition, Error) ->
+    case Condition of
+        true -> ok;
+        false -> throw({?MODULE, Error})
+    end.
+    
+% @private
+% Run the shared backend test suite through the multi backend, with
+% the suite's two buckets mapped to different sub-backends.
+simple_test() ->
+    % Bucket properties live in the ring, so the ring manager is needed...
+    crypto:start(),
+    riak_ring_manager:start_link(test),
+
+    % Pin each bucket to its own backend...
+    Assignments = [{<<"b1">>, first_backend}, {<<"b2">>, second_backend}],
+    lists:foreach(fun({Bucket, Backend}) ->
+        riak_bucket:set_bucket(Bucket, [{backend, Backend}])
+    end, Assignments),
+
+    riak_test_util:standard_backend_test(?MODULE, sample_config()).
+
+% get_backend/2 must honour a bucket's 'backend' property and fall
+% back to the configured default for a bucket that has none.
+get_backend_test() ->
+    % Bucket properties live in the ring, so the ring manager is needed...
+    crypto:start(),
+    riak_ring_manager:start_link(test),
+
+    % Pin each bucket to its own backend...
+    Assignments = [{<<"b1">>, first_backend}, {<<"b2">>, second_backend}],
+    lists:foreach(fun({Bucket, Backend}) ->
+        riak_bucket:set_bucket(Bucket, [{backend, Backend}])
+    end, Assignments),
+
+    {ok, Multi} = start(42, sample_config()),
+
+    % Explicitly assigned buckets...
+    {first_backend, riak_gb_trees_backend, _} = get_backend(<<"b1">>, Multi),
+    {second_backend, riak_ets_backend, _} = get_backend(<<"b2">>, Multi),
+
+    % ...and an unassigned bucket, which gets the default.
+    {second_backend, riak_ets_backend, _} = get_backend(<<"b3">>, Multi),
+
+    ok.
+
+
+% Configuration shared by the tests: two sub-backends without any
+% settings of their own, the second one being the default.
+sample_config() ->
+    BackendDefs = [
+        {first_backend, riak_gb_trees_backend, []},
+        {second_backend, riak_ets_backend, []}
+    ],
+    [
+        {storage_backend, riak_multi_backend},
+        {multi_backend_default, second_backend},
+        {multi_backend, BackendDefs}
+    ].

apps/riak/src/riak_osmos_backend.erl

 %%    under riak_osmos_backend_root.
 -module(riak_osmos_backend).
 
--export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
 -include_lib("eunit/include/eunit.hrl").
 -record(state, {table}).
 
-%% @spec start(Partition :: integer()) ->
+%% @spec start(Partition :: integer(), Config :: proplist()) ->
 %%                        {ok, state()} | {{error, Reason :: term()}, state()}
-start(Partition) ->
-    ConfigRoot = riak:get_app_env(riak_osmos_backend_root),
+start(Partition, Config) ->
+    ConfigRoot = proplists:get_value(riak_osmos_backend_root, Config),
     if ConfigRoot =:= undefined ->
             riak:stop("riak_osmos_backend_root unset, failing.");
        true -> ok
 simple_test() ->
     case application:start(osmos) of
        ok ->
-            application:set_env(riak, riak_osmos_backend_root,
-                                "test/osmos-backend"),
             ?assertCmd("rm -rf test/osmos-backend"),
-            riak_test_util:standard_backend_test(riak_osmos_backend);
+            Config = [{riak_osmos_backend_root, "test/osmos-backend"}],
+            riak_test_util:standard_backend_test(riak_osmos_backend, Config);
        Error ->
             ?debugFmt("Skipping osmos tests: ~p", [Error])
     end.

apps/riak/src/riak_test_util.erl

 -module(riak_test_util).
--export([standard_backend_test/1,setup_mockring1/0]).
+-export([standard_backend_test/2,setup_mockring1/0]).
 -include_lib("eunit/include/eunit.hrl").
 
-standard_backend_test(BackendMod) ->
-    {ok, S} = BackendMod:start(42),
+standard_backend_test(BackendMod, Config) ->
+    {ok, S} = BackendMod:start(42, Config),
     ?assertEqual(ok, BackendMod:put(S,{<<"b1">>,<<"k1">>},<<"v1">>)),
     ?assertEqual(ok, BackendMod:put(S,{<<"b2">>,<<"k2">>},<<"v2">>)),
     ?assertEqual({ok,<<"v2">>}, BackendMod:get(S,{<<"b2">>,<<"k2">>})),

apps/riak/src/riak_vnode.erl

     gen_fsm:start(?MODULE, [Idx], []).
 init([VNodeIndex]) ->
     Mod = riak:get_app_env(storage_backend),
-    {ok, ModState} = Mod:start(VNodeIndex),
+    Configuration = riak:get_app_env(),
+    {ok, ModState} = Mod:start(VNodeIndex, Configuration),
     StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState},
     {next_state, StateName, StateData, Timeout} = hometest(StateData0),
     {ok, StateName, StateData, Timeout}.