Commits

Anonymous committed f14ceb7

[mq]: parameterized chash keying

  • Participants
  • Parent commits 33a5ea0

Comments (0)

Files changed (5)

src/riak_bucket.erl

     [{n_val,3},
      {allow_mult,false},
      {linkfun,{modfun, jiak_object, mapreduce_linkfun}},
+     {chash_keyfun, {riak_util, chash_std_keyfun}},
      {old_vclock, 86400},
      {young_vclock, 21600},
      {big_vclock, 50},

src/riak_get_fsm.erl

 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
     RealStartTime = riak_util:moment(),
-    DocIdx = chash:key_of({Bucket, Key}),
+    DocIdx = riak_util:chash_key({Bucket, Key}),
     riak_eventer:notify(riak_get_fsm, get_fsm_start,
                         {ReqId, RealStartTime, Bucket, Key}),
     Msg = {self(), {Bucket,Key}, ReqId},

src/riak_map_executor.erl

 %% @private
 init([Ring,{{Bucket,Key},KeyData},QTerm0,PhasePid]) ->
     riak_eventer:notify(riak_map_executor, mapexec_start, start),
-    DocIdx = chash:key_of({Bucket,Key}),
+    DocIdx = riak_util:chash_key({Bucket,Key}),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     LinkFun = case QTerm0 of
         {link,_,_,_} -> proplists:get_value(linkfun, BucketProps);

src/riak_put_fsm.erl

     Key = riak_object:key(RObj),
     riak_eventer:notify(riak_put_fsm, put_fsm_start,
                         {ReqId, RealStartTime, Bucket, Key}),
-    DocIdx = chash:key_of({Bucket, Key}),
+    DocIdx = riak_util:chash_key({Bucket, Key}),
     Msg = {self(), {Bucket,Key}, RObj, ReqId},
     N = proplists:get_value(n_val,BucketProps),
     Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),

src/riak_util.erl

 -export([moment/0,make_tmp_dir/0,compare_dates/2,reload_all/1,
          is_x_deleted/1,obj_not_deleted/1,integer_to_list/2,
          unique_id_62/0]).
+-export([chash_key/1,chash_std_keyfun/1,chash_bucketonly_keyfun/1]).
 -export([try_cast/3, fallback/4, mkclientid/1]).
 
 %% @spec moment() -> integer()
            "~4.4.0w~2.2.0w~2.2.0w~2.2.0w~2.2.0w~2.2.0w-~s-~s-~p",
                     [Y,Mo,D,H,Mi,S,node(),RemoteNode,NowPart]))).
 
+%% @spec chash_key(BKey :: riak_object:bkey()) -> chash:index().
+%% @doc Create a binary used for determining replica placement.
+chash_key({Bucket,Key}) ->
+    BucketProps = riak_bucket:get_bucket(Bucket),
+    {chash_keyfun, {M, F}} = proplists:lookup(chash_keyfun, BucketProps),
+    M:F({Bucket,Key}).
+
+%% @spec chash_std_keyfun(BKey :: riak_object:bkey()) -> chash:index().
+%% @doc Default object/ring hashing fun, direct passthrough of bkey.
+chash_std_keyfun({Bucket, Key}) -> chash:key_of({Bucket, Key}).
+    
+%% @spec chash_bucketonly_keyfun(BKey :: riak_object:bkey()) -> chash:index().
+%% @doc Object/ring hashing fun that ignores Key, only uses Bucket.
+chash_bucketonly_keyfun({Bucket, _Key}) -> chash:key_of(Bucket).
+
 %% @spec moment_test() -> boolean()
 moment_test() ->
     M1 = riak_util:moment(),