riak / src / riak_osmos_backend.erl

Full commit
justin 1c37b76 

Bryan Fink b7a6ef6 
Bryan Fink 2e25667 
justin 1c37b76 

Bryan Fink 0b02326 
justin 1c37b76 

Bryan Fink b7a6ef6 

justin 1c37b76 
Bryan Fink b7a6ef6 
justin 1c37b76 

Bryan Fink 0b02326 
justin 1c37b76 

Bryan Fink b7a6ef6 

justin 1c37b76 
Bryan Fink 0b02326 
justin 1c37b76 

Bryan Fink b7a6ef6 

justin 1c37b76 

Bryan Fink 0b02326 
Bryan Fink 588fff8 
Bryan Fink b7a6ef6 
Bryan Fink 588fff8 
Bryan Fink 0b02326 
Bryan Fink 588fff8 

John Muellerleil… 5154bfe 

Bryan Fink 588fff8 

justin 1c37b76 

Bryan Fink 588fff8 

justin 1c37b76 

Bryan Fink 588fff8 

Bryan Fink 2e25667 

Bryan Fink 55097f7 

%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at


%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.    

%% @doc riak_osmos_backend is a storage backend that makes use of
%%      the Dukes of Erl "osmos" file storage system.
%% Configuration parameters:
%%  riak_osmos_backend_root:
%%       Directory in which to keep osmos files
%%       Required - riak will exit if this option is not set                    
%%  riak_osmost_backend_block_size:
%%       "Block size" parameter, as documented in osmos's docs
%%       osmos_table_format:new/3
%%       Optional - defaulted to 2048
%% Notes:
%%  - Support for the 'delete' operation is strange in osmos.  As such,
%%    delete is implemented here as writing an empty binary for a key's
%%    value.  This is safe because no riak_object will ever serialize
%%    to an empty binary.
%%  - The osmos documentation is not explicit about it, but opening two
%%    tables in the same directory will cause osmos to become confused.
%%    To solve this, each partition opens its table in a unique subdirectory
%%    under riak_osmos_backend_root.

-record(state, {table}).

%% @spec start(Partition :: integer()) ->
%%                        {ok, state()} | {{error, Reason :: term()}, state()}
start(Partition) ->
    ConfigRoot = riak:get_app_env(riak_osmos_backend_root),
    if ConfigRoot =:= undefined ->
            riak:stop("riak_osmos_backend_root unset, failing.");
       true -> ok

    %% put each table in its own directory, named after the partition
    %% osmos requires this - each table must have its own directory
    TableRoot = filename:join([ConfigRoot, integer_to_list(Partition)]),
    case filelib:ensure_dir(TableRoot) of
        ok -> ok;
        _Error ->
            riak:stop("riak_osmos_backend could not ensure"
                      " the existence of its root directory")

    case application:start(osmos) of
        ok -> ok;
        {error,{already_started,osmos}} -> ok

    BlockSize = riak:get_app_env(riak_osmos_backend_block_size, 2048),
    Format = osmos_table_format:new(binary, binary_replace, BlockSize),

    Ready = case osmos:open(Partition, 
                            [{directory, TableRoot},
                             {format, Format}]) of
                {ok, Partition} ->
                {error, Reason} ->
                    riak:stop("osmos:open failed"),
                    {error, Reason}
    {Ready, #state{table=Partition}}.

%% @spec stop(state()) -> ok | {error, Reason :: term()}
stop(#state{table=Table}) ->

%% get(state(), BKey :: riak_object:bkey()) ->
%%   {ok, Val :: binary()} | {error, Reason :: term()}
%% key must be 160b
get(#state{table=Table}, BKey) ->
    case osmos:read(Table, term_to_binary(BKey)) of
        {ok, <<>>}  -> {error, notfound}; %% sentinal for delete
        {ok, Value} -> {ok, Value};
        not_found   -> {error, notfound}

%% put(state(), BKey :: riak_object:bkey(), Val :: binary()) ->
%%   ok | {error, Reason :: term()}
%% key must be 160b
put(#state{table=Table},BKey,Val) ->       
    osmos:write(Table, term_to_binary(BKey), Val).

%% delete(state(), Key :: riak_object:key()) ->
%%   ok | {error, Reason :: term()}
%% key must be 160b
delete(#state{table=Table}, BKey) ->
    osmos:write(Table, term_to_binary(BKey), <<>>). %% sentinal for delete

-define(SELECT_CHUNK, 1000).

%% list(state()) -> [Key :: riak_object:key()]
list(#state{table=Table}) ->
    accum(Table, fun(K,_) -> {true, binary_to_term(K)} end).

%% list_bucket(state(), Bucket :: riak_object:bucket()) -> [Key :: binary()]
list_bucket(#state{table=Table}, Bucket) ->
          fun(Key,_) ->
                  {B, K} = binary_to_term(Key),
                  case Bucket of
                      '_' -> {true, B};
                      {filter, B, Fun} -> 
                                  case Fun([K]) of
                                      true -> {true, K};
                                      _    -> false
                      {filter, _, _} -> false;
                      B -> {true, K};
                      _ -> false

%% @spec accum(osmos_table(), function()) -> [term()]
%% @doc map across the rows in the osmos table, and return
%%      terms R for which Fun returns {true, R}
%% Explanation of osmos:select_range params:
%%   The three functions are, in order, LessLo, LessHi, and Select.
%%   We are trying to select *all* keys, not a limited range, thus,
%%   LessLo always returns 'false' to say "all keys are not less than
%%   our desired lower bound" and LessHi always returns 'true' to say
%%   "all keys are less than our desired upper bound".
%%   Select's only function is to throw away the keys that have been
%%   deleted (i.e. that have the delete sentinal stored as their value).
accum(Table, Fun) ->
    accum2(Table, Fun,
                              fun(_) -> false end,
                              fun(_) -> true end,
                              fun(_,V) -> V /= <<>> end,
%% simple accumulator to exhaust select_range's continuation should
%% there be more than SELECT_CHUNK keys to return
accum2(_, _, {ok, [], _}, Acc) -> lists:append(Acc);
accum2(_, _, {error, _}, Acc)  -> lists:append(Acc);
accum2(Table, Fun, {ok, NewList, Continue}, Acc) ->
    accum2(Table, Fun,
           osmos:select_continue(Table, Continue, ?SELECT_CHUNK),
           [[ A || {true, A} <- [ Fun(K,V) || {K,V} <- NewList]]|Acc]).

%% Test

simple_test() ->
    case application:start(osmos) of
       ok ->
            application:set_env(riak, riak_osmos_backend_root,
            ?assertCmd("rm -rf test/osmos-backend"),
       Error ->
            ?debugFmt("Skipping osmos tests: ~p", [Error])