1. Justin Sheehy
  2. riak

Commits

Bryan Fink  committed 588fff8

new bucket-keys support for osmos

  • Participants
  • Parent commits b5ff098
  • Branches default

Comments (0)

Files changed (1)

File src/riak_osmos_backend.erl

View file
 %%    under riak_osmos_backend_root.
 -module(riak_osmos_backend).
 
--export([start/1,stop/1,get/2,put/3,list/1,delete/2]).
+-export([start/1,stop/1,get/2,put/4,list/1,list_bucket/2,delete/2]).
 -record(state, {table}).
 
 %% @spec start(Partition :: integer()) ->
 get(#state{table=Table}, Key) ->
     case osmos:read(Table, Key) of
         {ok, <<>>}  -> {error, notfound}; %% sentinal for delete
-        {ok, Value} -> {ok, Value};
+        {ok, Value} ->
+            {_,_,RVal} = binary_to_term(Value),
+            {ok, RVal};
         not_found   -> {error, notfound}
     end.
 
 %% put(state(), Key :: binary(), Val :: binary()) ->
 %%   ok | {error, Reason :: term()}
 %% key must be 160b
-put(#state{table=Table},Key,Val) ->       
-    osmos:write(Table, Key, Val).
+put(#state{table=Table},{B,K},Key,Val) ->       
+    osmos:write(Table, Key, term_to_binary({B,K,Val})).
 
 %% delete(state(), Key :: binary()) ->
 %%   ok | {error, Reason :: term()}
 -define(SELECT_CHUNK, 1000).
 
 %% list(state()) -> [Key :: binary()]
+list(#state{table=Table}) ->
+    accum(Table, fun(K,_) -> {true, K} end).
+
+%% list_bucket(state(), Bucket :: atom()) -> [Key :: binary()]
+list_bucket(#state{table=Table}, Bucket) ->
+    accum(Table,
+          fun(_,V) ->
+                  case binary_to_term(V) of
+                      {Bucket, Key, _} -> {true, Key};
+                      _                -> false
+                  end
+          end).
+
+%% @spec accum(osmos_table(), function()) -> [term()]
+%% @doc map across the rows in the osmos table, and return
+%%      terms R for which Fun returns {true, R}
 %% Explanation of osmos:select_range params:
 %%   The three functions are, in order, LessLo, LessHi, and Select.
 %%   We are trying to select *all* keys, not a limited range, thus,
 %%   "all keys are less than our desired upper bound".
 %%   Select's only function is to throw away the keys that have been
 %%   deleted (i.e. that have the delete sentinal stored as their value).
-list(#state{table=Table}) ->
-    accum_keys(Table,
-               osmos:select_range(Table,
-                                  fun(_) -> false end,
-                                  fun(_) -> true end,
-                                  fun(_,V) -> V /= <<>> end,
-                                  ?SELECT_CHUNK),
-               []).
-
+accum(Table, Fun) ->
+    accum2(Table, Fun,
+           osmos:select_range(Table,
+                              fun(_) -> false end,
+                              fun(_) -> true end,
+                              fun(_,V) -> V /= <<>> end,
+                              ?SELECT_CHUNK),
+           []).
+                          
 %% simple accumulator to exhaust select_range's continuation should
 %% there be more than SELECT_CHUNK keys to return
-accum_keys(_, {ok, [], _}, Acc) -> lists:append(Acc);
-accum_keys(_, {error, _}, Acc)  -> lists:append(Acc);
-accum_keys(Table, {ok, NewList, Continue}, Acc) ->
-    accum_keys(Table,
-               osmos:select_continue(Table, Continue, ?SELECT_CHUNK),
-               [[ K || {K,_} <- NewList]|Acc]).
+accum2(_, _, {ok, [], _}, Acc) -> lists:append(Acc);
+accum2(_, _, {error, _}, Acc)  -> lists:append(Acc);
+accum2(Table, Fun, {ok, NewList, Continue}, Acc) ->
+    accum2(Table, Fun,
+           osmos:select_continue(Table, Continue, ?SELECT_CHUNK),
+           [[ A || {true, A} <- [ Fun(K,V) || {K,V} <- NewList]]|Acc]).