riak / src / riak_osmos_backend.erl

%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at

%%   http://www.apache.org/licenses/LICENSE-2.0

%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.    

%% @doc riak_osmos_backend is a storage backend that makes use of
%%      the Dukes of Erl "osmos" file storage system.
%%      http://dukesoferl.blogspot.com/2009/07/osmos.html
%%      http://code.google.com/p/osmos/
%%
%% Configuration parameters:
%%
%%  riak_osmos_backend_root:
%%       Directory in which to keep osmos files
%%       Required - riak will exit if this option is not set                    
%%
%%  riak_osmost_backend_block_size:
%%       "Block size" parameter, as documented in osmos's docs
%%       osmos_table_format:new/3
%%       Optional - defaulted to 2048
%%
%% Notes:
%%  - Support for the 'delete' operation is strange in osmos.  As such,
%%    delete is implemented here as writing an empty binary for a key's
%%    value.  This is safe because no riak_object will ever serialize
%%    to an empty binary.
%%  - The osmos documentation is not explicit about it, but opening two
%%    tables in the same directory will cause osmos to become confused.
%%    To solve this, each partition opens its table in a unique subdirectory
%%    under riak_osmos_backend_root.
-module(riak_osmos_backend).

-export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
-include_lib("eunit/include/eunit.hrl").
-record(state, {table}).

%% @spec start(Partition :: integer()) ->
%%                        {ok, state()} | {{error, Reason :: term()}, state()}
start(Partition) ->
    ConfigRoot = riak:get_app_env(riak_osmos_backend_root),
    if ConfigRoot =:= undefined ->
            riak:stop("riak_osmos_backend_root unset, failing.");
       true -> ok
    end,

    %% put each table in its own directory, named after the partition
    %% osmos requires this - each table must have its own directory
    TableRoot = filename:join([ConfigRoot, integer_to_list(Partition)]),
    case filelib:ensure_dir(TableRoot) of
        ok -> ok;
        _Error ->
            riak:stop("riak_osmos_backend could not ensure"
                      " the existence of its root directory")
    end,

    case application:start(osmos) of
        ok -> ok;
        {error,{already_started,osmos}} -> ok
    end,

    BlockSize = riak:get_app_env(riak_osmos_backend_block_size, 2048),
    Format = osmos_table_format:new(binary, binary_replace, BlockSize),

    Ready = case osmos:open(Partition, 
                            [{directory, TableRoot},
                             {format, Format}]) of
                {ok, Partition} ->
                    ok;
                {error, Reason} ->
                    riak:stop("osmos:open failed"),
                    {error, Reason}
            end,
    {Ready, #state{table=Partition}}.

%% @spec stop(state()) -> ok | {error, Reason :: term()}
stop(#state{table=Table}) ->
    osmos:close(Table).

%% get(state(), BKey :: riak_object:bkey()) ->
%%   {ok, Val :: binary()} | {error, Reason :: term()}
%% key must be 160b
get(#state{table=Table}, BKey) ->
    case osmos:read(Table, term_to_binary(BKey)) of
        {ok, <<>>}  -> {error, notfound}; %% sentinal for delete
        {ok, Value} -> {ok, Value};
        not_found   -> {error, notfound}
    end.

%% put(state(), BKey :: riak_object:bkey(), Val :: binary()) ->
%%   ok | {error, Reason :: term()}
%% key must be 160b
put(#state{table=Table},BKey,Val) ->       
    osmos:write(Table, term_to_binary(BKey), Val).

%% delete(state(), Key :: riak_object:key()) ->
%%   ok | {error, Reason :: term()}
%% key must be 160b
delete(#state{table=Table}, BKey) ->
    osmos:write(Table, term_to_binary(BKey), <<>>). %% sentinal for delete

-define(SELECT_CHUNK, 1000).

%% list(state()) -> [Key :: riak_object:key()]
list(#state{table=Table}) ->
    accum(Table, fun(K,_) -> {true, binary_to_term(K)} end).

%% list_bucket(state(), Bucket :: riak_object:bucket()) -> [Key :: binary()]
list_bucket(#state{table=Table}, Bucket) ->
    accum(Table,
          fun(Key,_) ->
                  {B, K} = binary_to_term(Key),
                  case Bucket of
                      '_' -> {true, B};
                      {filter, B, Fun} -> 
                                  case Fun([K]) of
                                      true -> {true, K};
                                      _    -> false
                                  end;
                      {filter, _, _} -> false;
                      B -> {true, K};
                      _ -> false
                  end
          end).

%% @spec accum(osmos_table(), function()) -> [term()]
%% @doc map across the rows in the osmos table, and return
%%      terms R for which Fun returns {true, R}
%% Explanation of osmos:select_range params:
%%   The three functions are, in order, LessLo, LessHi, and Select.
%%   We are trying to select *all* keys, not a limited range, thus,
%%   LessLo always returns 'false' to say "all keys are not less than
%%   our desired lower bound" and LessHi always returns 'true' to say
%%   "all keys are less than our desired upper bound".
%%   Select's only function is to throw away the keys that have been
%%   deleted (i.e. that have the delete sentinal stored as their value).
accum(Table, Fun) ->
    accum2(Table, Fun,
           osmos:select_range(Table,
                              fun(_) -> false end,
                              fun(_) -> true end,
                              fun(_,V) -> V /= <<>> end,
                              ?SELECT_CHUNK),
           []).
                          
%% simple accumulator to exhaust select_range's continuation should
%% there be more than SELECT_CHUNK keys to return
accum2(_, _, {ok, [], _}, Acc) -> lists:append(Acc);
accum2(_, _, {error, _}, Acc)  -> lists:append(Acc);
accum2(Table, Fun, {ok, NewList, Continue}, Acc) ->
    accum2(Table, Fun,
           osmos:select_continue(Table, Continue, ?SELECT_CHUNK),
           [[ A || {true, A} <- [ Fun(K,V) || {K,V} <- NewList]]|Acc]).

%%
%% Test
%%

simple_test() ->
    case application:start(osmos) of
       ok ->
            application:set_env(riak, riak_osmos_backend_root,
                                "test/osmos-backend"),
            ?assertCmd("rm -rf test/osmos-backend"),
            riak_test_util:standard_backend_test(riak_osmos_backend);
       Error ->
            ?debugFmt("Skipping osmos tests: ~p", [Error])
    end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.