riak / src / riak_object.erl

%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at

%%   http://www.apache.org/licenses/LICENSE-2.0

%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.    

%% @doc The container for data stored in Riak.
%%      
%%      
-module(riak_object).
-include_lib("eunit/include/eunit.hrl").

%% One stored version of an object: a value together with its metadata.
%% An r_object holds a list of these in its `contents' field.
-record(r_content, {
          %% metadata dictionary for this version (new/3 starts it empty)
          metadata :: dict(),
          %% the stored value itself; any term
          value :: term()
         }).

%% @type riak_object().  Opaque container for Riak objects.
-record(r_object, {
          %% bucket and key as given to new/3
          bucket :: atom(),
          key :: binary(),
          %% stored versions of the object; value_count/1 reports the
          %% length of this list as the number of siblings
          contents :: [#r_content{}],
          %% vector clock; new/3 initialises it with vclock:fresh()
          vclock :: [vclock:vclock()],
          %% pending metadata set by update_metadata/2.  The default dict
          %% carries the key `clean', which apply_updates/1 and is_updated/1
          %% read as "no pending metadata change".
          updatemetadata=dict:store(clean, true, dict:new()) :: dict(),
          %% pending value set by update_value/2; `undefined' (the default)
          %% is read as "no pending value change"
          updatevalue :: term()
         }).

-define(MAX_KEY_SIZE, 65536).

%% @type key()=binary().
%% @type bucket()=atom().
%% @type value()=term().

-export([new/3, ancestors/1, reconcile/2, increment_vclock/2, equal/2]).
-export([key/1, get_metadata/1, get_metadatas/1, get_values/1, get_value/1]).
-export([vclock/1, update_value/2, update_metadata/2, bucket/1, value_count/1]).
-export([get_update_metadata/1, get_update_value/1, get_contents/1]).
-export([merge/2, apply_updates/1, syntactic_merge/3]).
-export([set_contents/2, set_vclock/2]). %% INTERNAL, only for riak_*

%% Build a fresh object and check that bucket, key and value come back
%% unchanged through the accessors.  Returns the object so that the other
%% tests in this chain can build on it.
object_test() ->
    Bucket = buckets_are_atoms,
    Key = <<"keys are binaries">>,
    Value = <<"values are anything">>,
    Obj = riak_object:new(Bucket, Key, Value),
    %% EUnit assertions report expected/actual values on failure, unlike
    %% a bare match which only yields a badmatch.
    ?assertEqual(Bucket, riak_object:bucket(Obj)),
    ?assertEqual(Key, riak_object:key(Obj)),
    ?assertEqual(Value, riak_object:get_value(Obj)),
    Obj.

%% Stage a new value with update_value/2, promote it with apply_updates/1
%% and check that the promoted object exposes the new value.
%% Returns {OriginalObject, UpdatedObject} for the tests that follow.
update_test() ->
    Original = object_test(),
    NewValue = <<"testvalue2">>,
    Pending = riak_object:update_value(Original, NewValue),
    Updated = riak_object:apply_updates(Pending),
    ?assertEqual(NewValue, riak_object:get_value(Updated)),
    {Original, Updated}.

%% After incrementing the updated object's vclock, the original object must
%% be reported as the only ancestor of the pair.
%% Returns {OriginalObject, DescendantObject}.
ancestor_test() ->
    {Original, Updated} = update_test(),
    Descendant = riak_object:increment_vclock(Updated, self()),
    ?assertEqual([Original],
                 riak_object:ancestors([Original, Descendant])),
    {Original, Descendant}.

%% Reconciling an object with its descendant must yield the descendant,
%% whether or not multiple values are allowed.
%% Returns {OriginalObject, DescendantObject}.
reconcile_test() ->
    {Original, Descendant} = ancestor_test(),
    ?assertEqual(Descendant,
                 riak_object:reconcile([Original, Descendant], true)),
    ?assertEqual(Descendant,
                 riak_object:reconcile([Original, Descendant], false)),
    {Original, Descendant}.

%% Syntactically merging an object with its descendant must return the
%% descendant unchanged; the client id is irrelevant on this path.
%% Returns {OriginalObject, DescendantObject}.
merge_test() ->
    {Original, Descendant} = reconcile_test(),
    ?assertEqual(Descendant,
                 riak_object:syntactic_merge(Original, Descendant,
                                             node_does_not_matter_here)),
    {Original, Descendant}.

%% new/3 must reject a key that is one byte larger than ?MAX_KEY_SIZE by
%% throwing {error, key_too_large}.
%% ?assertThrow fails the test when nothing is thrown; a plain try/catch
%% would let a successful new/3 call pass unnoticed.
largekey_test() ->
    TooLargeKey = <<0:((?MAX_KEY_SIZE + 1) * 8)>>,
    ?assertThrow({error, key_too_large},
                 riak_object:new(a, TooLargeKey, <<>>)).
            


%% @spec new(Bucket::bucket(), Key::key(), Value::value()) -> riak_object()
%% @doc Constructor for new riak objects.  Throws {error, key_too_large}
%%      when Key is larger than ?MAX_KEY_SIZE bytes.  The new object has a
%%      single content with empty metadata and a fresh vclock.
new(B, K, _V) when is_atom(B), is_binary(K), byte_size(K) > ?MAX_KEY_SIZE ->
    throw({error,key_too_large});
new(B, K, V) when is_atom(B), is_binary(K) ->
    InitialContent = #r_content{metadata=dict:new(), value=V},
    #r_object{bucket=B,
              key=K,
              contents=[InitialContent],
              vclock=vclock:fresh()}.

%% @spec equal(riak_object(), riak_object()) -> true | false
%% @doc Deep (expensive) comparison of Riak objects.  Bucket and key are
%%      checked first; only when both match does the comparison continue
%%      with the vclock, pending updates and contents (see equal1/2).
equal(Obj1,Obj2) ->
    Obj1#r_object.bucket =:= Obj2#r_object.bucket
        andalso Obj1#r_object.key =:= Obj2#r_object.key
        andalso equal1(Obj1,Obj2).
%% Second stage of equal/2: the vclocks must be equal according to
%% vclock:equal/2 before the pending updates and contents are compared.
equal1(Obj1,Obj2) ->
    vclock:equal(vclock(Obj1),vclock(Obj2)) andalso equal2(Obj1,Obj2).
%% Final stage of equal/2: compare the pending update metadata, the pending
%% update value and finally the stored contents.  Dicts are compared through
%% their sorted key/value lists rather than their internal representation.
equal2(Obj1,Obj2) ->
    UM1 = lists:sort(dict:to_list(Obj1#r_object.updatemetadata)),
    UM2 = lists:sort(dict:to_list(Obj2#r_object.updatemetadata)),
    UM1 =:= UM2
        andalso Obj1#r_object.updatevalue =:= Obj2#r_object.updatevalue
        andalso equal_contents(Obj1#r_object.contents,
                               Obj2#r_object.contents).

%% True when both content lists hold the same {metadata, value} pairs,
%% irrespective of their order.
%% Each content is normalised BEFORE sorting.  Sorting the raw records would
%% order them by the dict's internal structure, so two lists with identical
%% siblings but differently built dicts could be lined up in different
%% orders and wrongly reported as unequal.
equal_contents(Contents1,Contents2) ->
    normalize_contents(Contents1) =:= normalize_contents(Contents2).

%% Turn a content list into a sorted list of {SortedMetadataList, Value}
%% tuples, a form that depends only on what the contents hold.
normalize_contents(Contents) ->
    lists:sort([{lists:sort(dict:to_list(C#r_content.metadata)),
                 C#r_content.value} || C <- Contents]).

%% @spec reconcile([riak_object()], boolean()) -> riak_object()
%% @doc  Reconcile a list of riak objects into one.  Ancestors and duplicates
%%       are dropped first (see reconcile/1); the contents of the remaining
%%       objects are collected and their vclocks merged.  With AllowMultiple
%%       true, all collected contents are kept, so the result may hold
%%       sibling values.  With AllowMultiple false, only the content with
%%       the latest X-Riak-Last-Modified metadata entry is kept.  Pending
%%       updates on the returned object are reset.
reconcile(Objects, AllowMultiple) ->
    Survivors = reconcile(Objects),
    Siblings = lists:flatten([S#r_object.contents || S <- Survivors]),
    Chosen = select_contents(AllowMultiple, Siblings),
    MergedClock = vclock:merge([S#r_object.vclock || S <- Survivors]),
    Base = hd(Survivors),
    Base#r_object{contents=Chosen,
                  vclock=MergedClock,
                  updatemetadata=dict:store(clean, true, dict:new()),
                  updatevalue=undefined}.

%% Keep every sibling, or only the first one after sorting with
%% compare_content_dates/2 (latest modification date first).
select_contents(true, Siblings) ->
    Siblings;
select_contents(false, Siblings) ->
    [hd(lists:sort(fun compare_content_dates/2, Siblings))].

%% @spec ancestors([riak_object()]) -> [riak_object()]
%% @doc  Given a list of riak_object()s, return those that are pure
%%       ancestors of some other object in the list: an object Older is
%%       returned once for every Newer in the list whose vclock descends
%%       from Older's while Older's does not descend from Newer's.  Such
%%       objects can be dropped from the list without losing data.
ancestors(Objects) ->
    [Older || Newer <- Objects,
              Older <- Objects,
              vclock:descends(Newer#r_object.vclock,Older#r_object.vclock),
              vclock:descends(Older#r_object.vclock,
                              Newer#r_object.vclock) =:= false].

%% @spec reconcile([riak_object()]) -> [riak_object()]
%% @doc  Drop every object that ancestors/1 reports as an ancestor of
%%       another one in Objects, then drop objects that are equal/2 to one
%%       already kept.
reconcile(Objects) ->
    Candidates = sets:from_list(Objects),
    Superseded = sets:from_list(ancestors(Objects)),
    Survivors = sets:subtract(Candidates, Superseded),
    remove_duplicate_objects(sets:to_list(Survivors)).

%% Return Os without objects that are equal/2 to an earlier element.
remove_duplicate_objects(Os) -> rem_dup_objs(Os,[]).

%% Walk the objects from the head, pushing each one onto the accumulator
%% unless the accumulator already holds an object equal/2 to it.
rem_dup_objs(Os,Acc0) ->
    KeepIfNew =
        fun(O,Acc) ->
                case [AO || AO <- Acc, riak_object:equal(AO,O) =:= true] of
                    [] -> [O|Acc];
                    [_|_] -> Acc
                end
        end,
    lists:foldl(KeepIfNew, Acc0, Os).

%% Ordering fun for lists:sort/2: true if C1 was modified later than C2,
%% judged by the X-Riak-Last-Modified entry in each content's metadata.
compare_content_dates(C1,C2) ->
    Seconds1 = last_modified_seconds(C1),
    Seconds2 = last_modified_seconds(C2),
    Seconds1 > Seconds2.

%% Fetch the X-Riak-Last-Modified metadata entry of a content (crashing if
%% it is absent, as dict:fetch/2 does) and convert it to gregorian seconds.
last_modified_seconds(Content) ->
    Stamp = dict:fetch(<<"X-Riak-Last-Modified">>,
                       Content#r_content.metadata),
    calendar:datetime_to_gregorian_seconds(
      httpd_util:convert_request_date(Stamp)).

%% @spec merge(riak_object(), riak_object()) -> riak_object()
%% @doc  Merge the contents and vclocks of OldObject and NewObject.
%%       apply_updates/1 is called on NewObject first; its contents are then
%%       placed in front of OldObject's and both vclocks are merged.  The
%%       result is based on OldObject, with pending updates reset.
merge(OldObject, NewObject) ->
    Applied = apply_updates(NewObject),
    MergedContents = Applied#r_object.contents ++ OldObject#r_object.contents,
    MergedClock = vclock:merge([OldObject#r_object.vclock,
                                Applied#r_object.vclock]),
    OldObject#r_object{contents=MergedContents,
                       vclock=MergedClock,
                       updatemetadata=dict:store(clean, true, dict:new()),
                       updatevalue=undefined}.

%% @spec apply_updates(riak_object()) -> riak_object()
%% @doc  Promote pending updates (made with the update_value() and
%%       update_metadata() calls) to this riak_object.  A pending value
%%       replaces all stored values with that single value.  Pending
%%       metadata (an update dict without the `clean' key) is applied to
%%       every resulting value.  Otherwise the stored metadata is kept, or
%%       only the first stored metadata when a value is pending.  The
%%       pending fields are reset afterwards.
apply_updates(Object=#r_object{}) ->
    Stored = Object#r_object.contents,
    PendingValue = Object#r_object.updatevalue,
    PendingMD = Object#r_object.updatemetadata,
    Values = case PendingValue of
                 undefined -> [C#r_content.value || C <- Stored];
                 _ -> [PendingValue]
             end,
    Metadatas = case {dict:find(clean, PendingMD), PendingValue} of
                    {{ok,_}, undefined} ->
                        %% nothing pending at all: keep stored metadata
                        [C#r_content.metadata || C <- Stored];
                    {{ok,_}, _} ->
                        %% only a value is pending: reuse the first metadata
                        [hd([C#r_content.metadata || C <- Stored])];
                    {error, _} ->
                        %% metadata is pending: one copy per resulting value
                        [dict:erase(clean, PendingMD) || _Value <- Values]
                end,
    NewContents = [#r_content{metadata=MD, value=Val}
                   || {MD, Val} <- lists:zip(Metadatas, Values)],
    Object#r_object{contents=NewContents,
                    updatemetadata=dict:store(clean, true, dict:new()),
                    updatevalue=undefined}.

%% @spec bucket(riak_object()) -> bucket()
%% @doc Return the bucket that contains this riak_object.
bucket(Object=#r_object{}) ->
    Object#r_object.bucket.

%% @spec key(riak_object()) -> key()
%% @doc  Return the key under which this riak_object is stored.
key(Object=#r_object{}) ->
    Object#r_object.key.

%% @spec vclock(riak_object()) -> vclock:vclock()
%% @doc  Return the vector clock held by this riak_object.
vclock(Object=#r_object{}) ->
    Object#r_object.vclock.

%% @spec value_count(riak_object()) -> non_neg_integer()
%% @doc  Return how many values (siblings) this riak_object holds, i.e. the
%%       length of its contents list.
value_count(Object=#r_object{}) ->
    length(Object#r_object.contents).

%% @spec get_contents(riak_object()) -> [{dict(), value()}]
%% @doc  Return the contents of this riak_object as a list of
%%       {Metadata, Value} tuples, one per stored value.
get_contents(Object=#r_object{}) ->
    [{C#r_content.metadata, C#r_content.value}
     || C <- Object#r_object.contents].

%% @spec get_metadata(riak_object()) -> dict()
%% @doc  Return the metadata of a riak_object that holds exactly one value.
%%       Fails with a badmatch error when the object does not hold exactly
%%       one content, e.g. when it has siblings (value_count() > 1).
get_metadata(Object=#r_object{}) ->
    % deliberate assertion: anything but a single content is a badmatch
    [{MD,_Value}] = get_contents(Object),
    MD.

%% @spec get_metadatas(riak_object()) -> [dict()]
%% @doc  Return the metadata of every value held by this riak_object, in
%%       contents order.
get_metadatas(Object=#r_object{}) ->
    [C#r_content.metadata || C <- Object#r_object.contents].

%% @spec get_values(riak_object()) -> [value()]
%% @doc  Return every value held by this riak_object, in contents order.
get_values(Object=#r_object{}) ->
    [C#r_content.value || C <- Object#r_object.contents].

%% @spec get_value(riak_object()) -> value()
%% @doc  Return the value of a riak_object that holds exactly one value.
%%       Fails with a badmatch error when the object does not hold exactly
%%       one content, e.g. when it has siblings (value_count() > 1).
get_value(Object=#r_object{}) ->
    % deliberate assertion: anything but a single content is a badmatch
    [{_MD,SingleValue}] = get_contents(Object),
    SingleValue.

%% @spec update_metadata(riak_object(), dict()) -> riak_object()
%% @doc  Set the pending (updated) metadata of an object to M.  Any `clean'
%%       key is removed from M first, since apply_updates/1 treats its
%%       presence as "no metadata update pending".
update_metadata(Object=#r_object{}, M) ->
    Pending = dict:erase(clean, M),
    Object#r_object{updatemetadata=Pending}.

%% @spec update_value(riak_object(), value()) -> riak_object()
%% @doc  Set the pending (updated) value of an object to V.  The stored
%%       contents are not touched until apply_updates/1 is called.
update_value(#r_object{}=Object, V) ->
    Object#r_object{updatevalue = V}.

%% @spec get_update_metadata(riak_object()) -> dict()
%% @doc  Return the pending (updated) metadata of this riak_object.
get_update_metadata(Object=#r_object{}) ->
    Object#r_object.updatemetadata.

%% @spec get_update_value(riak_object()) -> value()
%% @doc  Return the pending (updated) value of this riak_object.
get_update_value(Object=#r_object{}) ->
    Object#r_object.updatevalue.

%% @spec set_vclock(riak_object(), vclock:vclock()) -> riak_object()
%% @doc  INTERNAL USE ONLY.  Replace the vclock of the riak_object with
%%       VClock.
set_vclock(#r_object{}=Object, VClock) ->
    Object#r_object{vclock = VClock}.

%% @spec increment_vclock(riak_object(), term()) -> riak_object()
%% @doc  Return the object with ClientId's entry in its vclock incremented
%%       (via vclock:increment/2).
increment_vclock(Object=#r_object{vclock=Clock}, ClientId) ->
    Bumped = vclock:increment(ClientId, Clock),
    Object#r_object{vclock=Bumped}.

%% @spec set_contents(riak_object(), [{dict(), value()}]) -> riak_object()
%% @doc  INTERNAL USE ONLY.  Replace the contents of the riak_object with
%%       the {Metadata, Value} pairs in MVs.  Elements of MVs that are not
%%       2-tuples are skipped.  Normal clients should change object contents
%%       with update_value() / update_metadata() followed by apply_updates().
set_contents(Object=#r_object{}, MVs) when is_list(MVs) ->
    NewContents = [#r_content{metadata=MD, value=Val} || {MD, Val} <- MVs],
    Object#r_object{contents=NewContents}.

%% True when the object carries pending changes: either its update metadata
%% no longer holds the `clean' key, or a pending value other than
%% `undefined' is set.
is_updated(#r_object{updatemetadata=PendingMD, updatevalue=PendingValue}) ->
    dict:find(clean, PendingMD) =:= error
        orelse PendingValue =/= undefined.
            
%% @spec syntactic_merge(riak_object(), riak_object(), term()) -> riak_object()
%% @doc  Combine CurrentObject and NewObject using their vclocks.  If
%%       ancestors/1 reports one of them as an ancestor of the other, the
%%       descendant wins; when the winner carries pending updates they are
%%       applied and its vclock is incremented for FromClientId.  If neither
%%       is an ancestor, NewObject is returned when the two are equal/2,
%%       otherwise they are merged with merge/2 and the vclock is
%%       incremented for FromClientId.
syntactic_merge(CurrentObject, NewObject, FromClientId) ->
    case ancestors([CurrentObject, NewObject]) of
        [] ->
            merge_siblings(CurrentObject, NewObject, FromClientId);
        [OlderObject] ->
            Winner = descendant_of(OlderObject, CurrentObject, NewObject),
            finalize_winner(is_updated(Winner), Winner, FromClientId)
    end.

%% Pick the object that is not the ancestor: NewObject when the ancestor's
%% vclock is the current object's vclock, CurrentObject otherwise.
descendant_of(OlderObject, CurrentObject, NewObject) ->
    case vclock(OlderObject) =:= vclock(CurrentObject) of
        true -> NewObject;
        false -> CurrentObject
    end.

%% Apply pending updates and increment the vclock only if there are any.
finalize_winner(true, Winner, FromClientId) ->
    increment_vclock(apply_updates(Winner), FromClientId);
finalize_winner(false, Winner, _FromClientId) ->
    Winner.

%% Neither object is an ancestor of the other: keep NewObject if both are
%% equal, otherwise merge them and increment the vclock.
merge_siblings(CurrentObject, NewObject, FromClientId) ->
    case riak_object:equal(CurrentObject, NewObject) of
        true ->
            NewObject;
        false ->
            increment_vclock(merge(CurrentObject, NewObject), FromClientId)
    end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.