Commits

jus...@basho.com  committed 5ce340d Merge

merge

  • Participants
  • Parent commits 0eaa5da, 93750f3

Comments (0)

Files changed (18)

File client_lib/jiak.js

         this.baseurl += '/';
 
     this.opts = Opts||{};
+
+    // utility to convert an integer to base64-encoded 32-bits
+    base64 = function(N) {
+        var base64digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
+        return base64digits[(N >>> 26)]
+            +base64digits[((N >>> 20)&63)]
+            +base64digits[((N >>> 14)&63)]
+            +base64digits[((N >>> 8)&63)]
+            +base64digits[((N >>> 2)&63)]
+            +base64digits[((N << 4)&63)]
+            +'==';
+    }
+
+    if (('clientId' in this.opts) && !!this.opts.clientId) {
+        if (typeof this.opts.clientId == "number"
+            && this.opts.clientId > 0 && this.opts.clientId < 4294967296) {
+            this.opts.clientId = base64(this.opts.clientId);
+        }
+        //otherwise, just use whatever clientId was given
+    } else {
+        //choose a client id if the caller didn't provide one
+        this.opts.clientId = base64(Math.floor(Math.random()*4294967296));
+    }
 }
 
 JiakClient.prototype.store = function(Object, Callback, NoReturnBody, W, DW, R) {
+    var cid = this.opts.clientId;
     var req = {
         contentType: "application/json",
-        dataType: "json"
+        dataType: "json",
+        beforeSend: function(xhr) {
+            xhr.setRequestHeader("X-Riak-ClientId", cid);
+        }
     };
 
     if (this.opts.alwaysPost || !Object.key)
 }
 
 JiakClient.prototype.remove = function(Bucket, Key, Callback, RW) {
+    var cid = this.opts.clientId;
     return $.ajax({
         type:    'DELETE',
         url:     this.path(Bucket, Key)+
                    ((RW||this.opts.rw)?('?rw='+(RW||this.opts.rw)):''),
-        success: Callback
+        success: Callback,
+        beforeSend: function(xhr) {
+            xhr.setRequestHeader("X-Riak-ClientId", cid);
+        }
     });
 }
 

File client_lib/jiak.php

                 array("Content-type: application/json")));
     }
     
-    function bucket_info($bucket) {
-        return $this->_expect(200,
-            $this->_do_req("GET", $this->JKP . urlencode($bucket)));
-    }
-    
     function store_all($ar, $w=2, $dw=2) {
         foreach($ar as $o) {
             $this->store($o, $w, $dw);
         $data = $this->_expect(200, $resp);
         return $this->_make_object($data);
     }
+
+    function list_bucket($bucket) {
+        $o = $this->bucket_info($bucket, true);
+        return $o['keys'];
+    }
+
+    function get_bucket_schema($bucket) {
+        $o = $this->bucket_info($bucket, false);
+        return $o['schema'];
+    }
+
+    function bucket_info($bucket, $with_keys=true) {
+        $resp = $this->_do_req("GET",
+                    $this->JKP . urlencode($bucket) . "/" .
+                    ($with_keys===true?"":"?keys=false"));
+        if ($resp['http_code'] == 404) {
+            return null;
+        }
+        $data = $this->_expect(200, $resp);
+        return $data;
+    }
     
     function delete($bucket, $key, $dw=2) {
         $resp = $this->_do_req("DELETE", $this->JKP .
     }
 }
 
+
 /*
     // jiak.php example
     //
         
         print ("\nDeleting jiak_test object ('test_object')...\n");
         $JC->delete("jiak_test", "test_object");
-        
+ 
+        print ("\nListing keys for bucket '" . $demo_bucket . "'...\n");
+        print_r($JC->list_bucket($demo_bucket));
+
+        print ("\nGetting jiak bucket schema for bucket '" . $demo_bucket . "'...\n");
+        print_r($JC->get_bucket_schema($demo_bucket));
+       
         print ("\nDone!\n\n");
         
     } catch (Exception $ex) {
     }
 */
 
-
 ?>

File client_lib/jiak.rb

     @port = port
     @prefix = jiakPrefix
     @opts = options
+
+    if (@opts['clientId'])
+      if (@opts['clientId'].kind_of? Integer &&
+          @opts['clientId'] > 0 &&
+          @opts['clientId'] < 4294967296)
+        @opts['clientId'] = base64(@opts['clientId'])
+      end
+    else
+      @opts['clientId'] = base64(rand(4294967296))
+    end
   end
 
   # Set the schema for 'bucket'.  The schema parameter
       'r'=>(r||@opts['r'])
     }
     if (object['key'])
-      req = Net::HTTP::Put.new(path(object['bucket'], object['key'], q))
+      req = Net::HTTP::Put.new(path(object['bucket'], object['key'], q),
+                               initheader={"X-Riak-ClientId" => @opts['clientId']})
       code = '200'
     else
-      req = Net::HTTP::Post.new(path(object['bucket'], nil, q))
+      req = Net::HTTP::Post.new(path(object['bucket'], nil, q),
+                                initheader={"X-Riak-ClientId" => @opts['clientId']})
       code = '201'
     end
 
   # Delete the data stored in 'bucket' at 'key'
   def delete(bucket, key, rw=nil)
     do_req(Net::HTTP::Delete.new(path(bucket, key,
-                                      {'rw'=>(rw||@opts['rw'])})),
+                                      {'rw'=>(rw||@opts['rw'])}),
+                                 initheader={"X-Riak-ClientId" => @opts['clientId']}),
            '204')
   end
 
       raise JiakException.new(res.code+' '+res.message+' '+res.body)
     end
   end
+
+  def base64(n)
+    base64digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
+    "%c%c%c%c%c%c==" %
+      [base64digits[(n >> 26)],
+       base64digits[((n >> 20)&63)],
+       base64digits[((n >> 14)&63)],
+       base64digits[((n >> 8)&63)],
+       base64digits[((n >> 2)&63)],
+       base64digits[((n << 4)&63)]]
+  end
   private:convert_walk_spec
   private:path
   private:set_data
   private:do_req
+  private:base64
 end
 
 class JiakException<Exception

File demo/stickynotes/priv/www/js/jiak.js

         this.baseurl += '/';
 
     this.opts = Opts||{};
+
+    // utility to convert an integer to base64-encoded 32-bits
+    base64 = function(N) {
+        var base64digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
+        return base64digits[(N >>> 26)]
+            +base64digits[((N >>> 20)&63)]
+            +base64digits[((N >>> 14)&63)]
+            +base64digits[((N >>> 8)&63)]
+            +base64digits[((N >>> 2)&63)]
+            +base64digits[((N << 4)&63)]
+            +'==';
+    }
+
+    if (('clientId' in this.opts) && !!this.opts.clientId) {
+        if (typeof this.opts.clientId == "number"
+            && this.opts.clientId > 0 && this.opts.clientId < 4294967296) {
+            this.opts.clientId = base64(this.opts.clientId);
+        }
+        //otherwise, just use whatever clientId was given
+    } else {
+        //choose a client id if the caller didn't provide one
+        this.opts.clientId = base64(Math.floor(Math.random()*4294967296));
+    }
 }
 
 JiakClient.prototype.store = function(Object, Callback, NoReturnBody, W, DW, R) {
+    var cid = this.opts.clientId;
     var req = {
         contentType: "application/json",
-        dataType: "json"
+        dataType: "json",
+        beforeSend: function(xhr) {
+            xhr.setRequestHeader("X-Riak-ClientId", cid);
+        }
     };
 
     if (this.opts.alwaysPost || !Object.key)
 }
 
 JiakClient.prototype.remove = function(Bucket, Key, Callback, RW) {
+    var cid = this.opts.clientId;
     return $.ajax({
         type:    'DELETE',
         url:     this.path(Bucket, Key)+
                    ((RW||this.opts.rw)?('?rw='+(RW||this.opts.rw)):''),
-        success: Callback
+        success: Callback,
+        beforeSend: function(xhr) {
+            xhr.setRequestHeader("X-Riak-ClientId", cid);
+        }
     });
 }
 

File doc/basic-client.txt

 
 To talk to riak, all you need is an Erlang node with riak/ebin in its 
 code path.  Once this shell is up,
-use riak:client_connect/3 to get connected.  The client returned from
+use riak:client_connect/1 to get connected.  The client returned from
 client_connect is defined by the riak_client module, and supports the
 simple functions get, put, delete, and others.
 
 Riak client nodes must use "long names" and have riak/ebin in their
 code path.  The easiest way to start a node of this nature is:
 
-$ erl -name myclient@127.0.0.1 -pa $PATH_TO_RIAK/ebin 
+$ erl -name myclient@127.0.0.1 -setcookie cookie -pa $PATH_TO_RIAK/ebin 
 
 You'll know you've done this correctly if you can execute the
 following commands and get a path to a beam file, instead of the atom
 Connecting
 ---
 
-Once you have your node running, pass your Riak server's IP, port, and
-cookie to riak:client_connect/3 to connect and get a client.  This can
+Once you have your node running, pass your Riak server nodename
+to riak:client_connect/1 to connect and get a client.  This can
 be as simple as:
 
-3> {ok, Client} = riak:client_connect("127.0.0.1", 9000, mycookie).
-{ok,{riak_client,'riak@127.0.0.1',
-                 "20090722141126-myclient@127.0.0.1-riak@127.0.0.1-928359"}}
+3> {ok, Client} = riak:client_connect('riak@127.0.0.1').
+{ok,{riak_client,'riak@127.0.0.1', <<1,112,224,226>>}}
 
 
 Storing New Data
 happen if you just fired up a new client, and overwrote the existing
 list with a new one?
 
-1> {ok, C} = riak:client_connect("127.0.0.1", 9000, mycookie).
+1> {ok, C} = riak:client_connect('riak@127.0.0.1').
 ...
 2> ok = C:put(riak_object:new(<<"groceries">>, <<"mine">>, ["bread","cheese"]), 1).
 ...
  {allow_mult,true},
  {linkfun,{no_mod,no_fun}},
  {old_vclock,86400},
- {young_vclock,21600},
+ {young_vclock,20},
  {big_vclock,50},
  {small_vclock,10}]
 
 new client.
 
 
-riak:client_connect/3 returns {error,timeout}
+riak:client_connect/1 returns {error,timeout}
 -
 
-This riak cluster is probably down.  It may also just be overloaded,
-but it's more likely down.
+The Riak node you are connecting to is down.
 
 
 {error,notfound}

File doc/basic-mapreduce.txt

 
 - If you need to do a Riak 'get' inside of a map or reduce function,
   you can use riak:local_client/0 to get a Riak client instead of
-  riak:client_connect/3.  The code is already running on a connected
-  node -- there's no need to go through the doorbell/connect
-  proceedure.
+  riak:client_connect/1.
 
 - Your map and reduce functions are running on a Riak node, which
   means that that Riak node is spending CPU time doing something other

File doc/basic-setup.txt

   The name of the cluster.  Can be anything.  Used mainly in saving
   ring configuration.  All nodes should have the same cluster name.
 
-doorbell_port: integer
-  Network port on which this node will listen for connections from
-  clients and other nodes.
-
 gossip_interval: integer
   The period, in milliseconds, at which ring state gossiping will
   happen.  A good default is 60000 (sixty seconds).  Best not to
 {ring_state_dir, "priv/ringstate"}.
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_fs_backend}.
 {riak_fs_backend_root, "/var/riak/store"}.
 {riak_cookie, default_riak_cookie}.
 
 1. cd /usr/local/riak
 2. cp config/riak.erlenv config/riak2.erlenv
-3. Edit riak2.erlenv, and change doorbell_port, riak_fs_backend_root,
+3. Edit riak2.erlenv, and change riak_fs_backend_root
    and riak_nodename to something unique.
-3. ./start-join config/riak2.erlenv 127.0.0.1 9000
+3. ./start-join config/riak2.erlenv riak@127.0.0.1
 
 That node will also start and background itself.  You cluster will
 still be ready to accept requests, with no further changes.
 {ring_state_dir, "priv/ringstate"}.
 {ring_creation_size, 1024}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_fs_backend}.
 {riak_fs_backend_root, "/var/riak/store"}.
 {riak_cookie, default_riak_cookie}.
 1. Install Riak on another host.
 2. Copy riak.erlenv from your original host to the new host.
 3. Edit riak.erlenv and change riak_hostname to match the new host.
-4. ./start-join config/riak.erlenv prod0.domain.net 9000
+4. ./start-join config/riak.erlenv riak@prod0.domain.net
 
 That node will also start and background itself.  You cluster will
 still be ready to accept requests, with no further changes.
 
-Notice that there is no need to change doorbell_port,
-riak_fs_backend_port, or riak_nodename on the new host, because they
-won't conflict with those settings on the original host, unlike the
-development configuration.
+Notice that there is no need to change riak_fs_backend_root, or
+riak_nodename on the new host, because they won't conflict with 
+those settings on the original host, unlike the development configuration.
 
 
 Logging
 
 Riak doesn't do any persistent logging in the default configuration.
 Instead, logging can be "enabled" and "disabled" by connecting and
-disconnecting an "eventer".  Eventers will be described more fully in
-another document, but this simple steps for starting the default
-logging eventer are:
+disconnecting an "eventer".  Eventers are described more fully in
+the riak_eventer module documentation, but this simple steps for starting 
+the default logging eventer are:
 
 1. cd /usr/local/riak
-2. ./start-eventer.sh default default_riak_cookie 127.0.0.1 9000 \
-   evt riak_event_logger /tmp/riakevt.log
+2. ./start-logger.sh node@hostname cookie filename
+   Filename can be left blank to log to stdout.
 
-That command will start and Erlang node, named 'evt' that will stay
-running.  It will connect to the cluster "default" at 127.0.0.1:9000,
-with Erlang cookie 'default_riak_cookie'.  It will then begin spewing
-data into /tmp/riakevt.log.  Use 'tail -F /tmp/riakevt.log' to watch
-it fly by.  Note that it is not recommended that you connect this
+This command will start an Erlang node, named riak_logger@localhost,
+that will stay running. It will connect to the cluster through the
+specified node, and attach an eventer to the cluster. The eventer will
+capture all events about the system and either dump them to the provided file,
+or print them to the screen.
+
+Note that it is not recommended that you connect this
 particular logger to an active production cluster, as it generates a
 *lot* of data, and has no provision for things like log file rollover.

File src/jiak.erl

 %%      links for following.
 -module(jiak).
 
--export([local_client/0, client_connect/1]).
+-export([local_client/0, local_client/1,
+         client_connect/1, client_connect/2]).
 -export([default_jiak_bucket_props/0]).
 -export([standard_sibling_merge/1]).
 
 -include_lib("eunit/include/eunit.hrl").
 
 %% @spec local_client() -> {ok, jiak_client()}|error_term()
+%% @equiv local_client(undefined)
+local_client() -> local_client(undefined).
+
+%% @spec local_client(binary()|undefined) ->
+%%         {ok, jiak_client()}|error_term()
 %% @doc Open a Riak client for modifying Jiak objects.
-%% @see riak:local_client/0
-local_client() -> client_connect(node()).
+%% @see riak:local_client/1
+local_client(ClientId) -> client_connect(node(), ClientId).
 
 %% @spec client_connect(Node :: node())
 %%        -> {ok, Client :: jiak_client()} | exception
+%% @equiv client_connect(Node, undefined)
+client_connect(Node) -> client_connect(Node, undefined).
+
+%% @spec client_connect(Node :: node(), ClientId :: binary()|undefined)
+%%        -> {ok, Client :: jiak_client()} | exception
 %% @doc The usual way to get a client.  Timeout often means either a bad
 %%      cookie or a poorly-connected distributed erlang network.
-client_connect(Node) -> 
-    case riak:client_connect(Node) of
+client_connect(Node, ClientId) ->
+    case riak:client_connect(Node, ClientId) of
         {ok, C} -> {ok, jiak_client:new(C)};
         Error -> Error
     end.

File src/jiak_resource.erl

 %%      attempt to open a client to Riak, and will fail if it is
 %%      unable to do so.
 init(Props) ->
-    {ok, JiakClient} = 
+    ClientType = 
         case proplists:get_value(riak_local, Props) of
-            true ->
-                jiak:local_client();
+            true -> local;
             _ ->
                 Node = proplists:get_value(riak_node, Props),
                 Cookie = proplists:get_value(riak_cookie, Props),
                 erlang:set_cookie(node(), Cookie),
-                jiak:client_connect(Node)
+                {remote, Node}
         end,
     {ok, #ctx{jiak_name=proplists:get_value(jiak_name, Props),
               key=proplists:get_value(key_type, Props),
-              jiak_client=JiakClient}}.
+              jiak_client=ClientType}}.
 
 %% @spec service_available(webmachine:wrq(), context()) -> 
 %%           {boolean, webmachine:wrq(), context()}
 %%      same name as the bucket.  If no module is found, the bucket 
 %%      configuration metadata in the ring is used, and must contain
 %%      a valid Jiak schema.  
-service_available(ReqData, Context=#ctx{key=container}) ->
+service_available(ReqData, Context=#ctx{jiak_client=ClientType}) ->
+    {ok, Client} = case ClientType of
+                       local ->
+                           jiak:local_client(get_client_id(ReqData));
+                       {remote, Node} ->
+                           jiak:client_connect(Node, get_client_id(ReqData))
+                   end,
+    service_available2(ReqData, Context#ctx{jiak_client=Client}).
+
+%% @spec service_available2(webmachine:wrq(), context()) -> 
+%%           {boolean, webmachine:wrq(), context()}
+%% @doc Continue service_available processing after the riak_client
+%%      is created.
+service_available2(ReqData, Context=#ctx{key=container}) ->
     {ServiceAvailable, NewCtx} = 
         case wrq:method(ReqData) of
             'PUT' -> 
                 {true, Context#ctx{module=Mod}}
         end,
     {ServiceAvailable, ReqData, NewCtx};
-service_available(ReqData, Context) ->
+service_available2(ReqData, Context) ->
     Mod = jiak_util:get_jiak_module(ReqData),
     {true, ReqData, Context#ctx{module=Mod}}.
 
+%% @spec get_client_id(reqdata()) -> term()
+%% @doc Extract the request's preferred client id from the
+%%      X-Riak-ClientId header.  Return value will be:
+%%        'undefined' if no header was found
+%%        32-bit binary() if the header could be base64-decoded
+%%           into a 32-bit binary
+%%        string() if the header could not be base64-decoded
+%%           into a 32-bit binary
+get_client_id(RD) ->
+    case wrq:get_req_header("X-Riak-ClientId", RD) of
+        undefined -> undefined;
+        RawId ->
+            case catch base64:decode(RawId) of
+                ClientId= <<_:32>> -> ClientId;
+                _ -> RawId
+            end
+    end.
+
 %% @spec allowed_methods(webmachine:wrq(), context()) ->
 %%          {[http_method()], webmachine:wrq(), context()}
 %% @type http_method() = 'HEAD'|'GET'|'POST'|'PUT'|'DELETE'

File src/raw_http.hrl

 -define(HEAD_VCLOCK,   "X-Riak-Vclock").
 -define(HEAD_LINK,     "Link").
 -define(HEAD_ENCODING, "Content-Encoding").
+-define(HEAD_CLIENT,   "X-Riak-ClientId").
 
 %% Names of JSON fields in bucket properties
 -define(JSON_PROPS,   <<"props">>).

File src/raw_http_resource.erl

 %%      bindings from the dispatch, as well as any vtag
 %%      query parameter.
 service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
-    case get_riak_client(RiakProps) of
+    case get_riak_client(RiakProps, get_client_id(RD)) of
         {ok, C} ->
             {true,
              RD,
              Ctx}
     end.
 
-%% @spec get_riak_client(local|{node(),Cookie::atom()}) ->
+%% @spec get_riak_client(local|{node(),Cookie::atom()}, term()) ->
 %%          {ok, riak_client()} | error()
 %% @doc Get a riak_client.
-get_riak_client(local) ->
-    riak:local_client();
-get_riak_client({Node, Cookie}) ->
+get_riak_client(local, ClientId) ->
+    riak:local_client(ClientId);
+get_riak_client({Node, Cookie}, ClientId) ->
     erlang:set_cookie(node(), Cookie),
-    riak:client_connect(Node).
+    riak:client_connect(Node, ClientId).
+
+%% @spec get_client_id(reqdata()) -> term()
+%% @doc Extract the request's preferred client id from the
+%%      X-Riak-ClientId header.  Return value will be:
+%%        'undefined' if no header was found
+%%        32-bit binary() if the header could be base64-decoded
+%%           into a 32-bit binary
+%%        string() if the header could not be base64-decoded
+%%           into a 32-bit binary
+get_client_id(RD) ->
+    case wrq:get_req_header(?HEAD_CLIENT, RD) of
+        undefined -> undefined;
+        RawId ->
+            case catch base64:decode(RawId) of
+                ClientId= <<_:32>> -> ClientId;
+                _ -> RawId
+            end
+    end.
 
 %% @spec allowed_methods(reqdata(), context()) ->
 %%          {[method()], reqdata(), context()}

File src/riak.erl

 -author('Bryan Fink <bryan@basho.com>').
 -export([start/0, start/1, stop/0, stop/1]).
 -export([get_app_env/1,get_app_env/2]).
--export([client_connect/1,local_client/0]).
+-export([client_connect/1,client_connect/2,
+         local_client/0,local_client/1]).
 
 -include_lib("eunit/include/eunit.hrl").
 
     end.
 
 %% @spec local_client() -> {ok, Client :: riak_client()}
+%% @equiv local_client(undefined)
+local_client() ->
+    local_client(undefined).
+
+%% @spec local_client(binary()|undefined) -> {ok, Client :: riak_client()}
 %% @doc When you want a client for use on a running Riak node.
-local_client() -> client_connect(node()).
-
+%%      ClientId should be a 32-bit binary.  If it is not, a
+%%      32-bit binary will be created from ClientId by phash2/1.
+%%      If ClientId is the atom 'undefined', a random ClientId will
+%%      be chosen.
+local_client(ClientId) ->
+    client_connect(node(), ClientId).
 
 %% @spec client_connect(Node :: node())
 %%        -> {ok, Client :: riak_client()} | {error, timeout}
+%% @equiv client_connect(Node, undefined)
+client_connect(Node) -> 
+    client_connect(Node, undefined).
+
+%% @spec client_connect(node(), binary()|undefined)
+%%         -> {ok, Client :: riak_client} | {error, timeout}
 %% @doc The usual way to get a client.  Timeout often means either a bad
 %%      cookie or a poorly-connected distributed erlang network.
-client_connect(Node) -> 
+%%      ClientId should be a 32-bit binary.  If it is not, a
+%%      32-bit binary will be created from ClientId by phash2/1.
+%%      If ClientId is the atom 'undefined', a random ClientId will
+%%      be chosen.
+client_connect(Node, ClientId= <<_:32>>) ->
     % Make sure we can reach this node...
     case net_adm:ping(Node) of
         pang -> {error, {could_not_reach_node, Node}};
-        pong -> {ok, riak_client:new(Node, riak_util:mkclientid(Node))}
-    end.
-
+        pong -> {ok, riak_client:new(Node, ClientId)}
+    end;
+client_connect(Node, undefined) ->
+    client_connect(Node, riak_util:mkclientid(Node));
+client_connect(Node, Other) ->
+    client_connect(Node, <<(erlang:phash2(Other)):32>>).
 
 %% @spec ensure_started(Application :: atom()) -> ok
 %% @doc Start the named application if not already started.

File src/riak_bucket.erl

      {linkfun,{modfun, jiak_object, mapreduce_linkfun}},
      {chash_keyfun, {riak_util, chash_std_keyfun}},
      {old_vclock, 86400},
-     {young_vclock, 21600},
+     {young_vclock, 20},
      {big_vclock, 50},
      {small_vclock, 10}].
 

File src/riak_get_fsm.erl

                           bkey=BKey,
                           req_id=ReqId,
                           replied_notfound=NotFound,
-                          ring=Ring}) ->
+                          ring=Ring,
+                          starttime=StartTime}) ->
     case Final of
         {error, notfound} ->
             maybe_finalize_delete(StateData);
         {ok,_} ->
-            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId);
+            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,
+                                 ReqId,StartTime);
         _ -> nop
     end,
     {stop,normal,StateData}.
     end
     end).
 
-maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId) ->
+maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId,StartTime) ->
     Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound,
     Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0],
     {ok, FinalRObj} = Final,
-    Msg = {self(), BKey, FinalRObj, ReqId},
+    Msg = {self(), BKey, FinalRObj, ReqId, StartTime},
     case Targets of
         [] -> nop;
         _ ->

File src/riak_put_fsm.erl

 %% @private
 initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
+    RObj = update_metadata(RObj0),
     RealStartTime = riak_util:moment(),
-    Bucket = riak_object:bucket(RObj0),
+    Bucket = riak_object:bucket(RObj),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
-    RObj = prune_vclock(update_metadata(RObj0),BucketProps),
     Key = riak_object:key(RObj),
     riak_eventer:notify(riak_put_fsm, put_fsm_start,
                         {ReqId, RealStartTime, Bucket, Key}),
     DocIdx = riak_util:chash_key({Bucket, Key}),
-    Msg = {self(), {Bucket,Key}, RObj, ReqId},
+    Msg = {self(), {Bucket,Key}, RObj, ReqId, RealStartTime},
     N = proplists:get_value(n_val,BucketProps),
     Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
     {Targets, Fallbacks} = lists:split(N, Preflist),
     end,
     riak_object:apply_updates(riak_object:update_metadata(RObj, NewMD)).
 
-prune_vclock(RObj,BucketProps) ->
-    % This function is a little bit evil, as it relies on the
-    % internal structure of vclocks.
-    % That structure being [{Id, {Vsn, Timestamp}}]
-    V = riak_object:vclock(RObj),
-    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
-    Now = calendar:datetime_to_gregorian_seconds(erlang:universaltime()),
-    case prune_vclock1(Now,SortV,BucketProps,no_change) of
-        {no_change, _} -> RObj;
-        {pruned, NewV} -> riak_object:set_vclock(RObj,NewV)
-    end.
-
-prune_vclock1(Now,V,BProps,Changed) ->
-    case length(V) =< proplists:get_value(small_vclock,BProps) of
-        true -> {Changed, V};
-        false ->
-            {_,{_,HeadTime}} = hd(V),
-            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
-                true -> {Changed, V};
-                false -> prune_vclock1(Now,V,BProps,Changed,HeadTime)
-            end
-    end.
-prune_vclock1(Now,V,BProps,Changed,HeadTime) ->
-    % has a precondition that V is longer than small and older than young
-    case length(V) > proplists:get_value(big_vclock,BProps) of
-        true -> prune_vclock1(Now,tl(V),BProps,pruned);
-        false ->
-            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
-                true -> prune_vclock1(Now,tl(V),BProps,pruned);
-                false -> {Changed, V}
-            end
-    end.
-
 make_vtag(RObj) ->
     <<HashAsNum:128/integer>> = crypto:md5(term_to_binary(riak_object:vclock(RObj))),
     riak_util:integer_to_list(HashAsNum,62).
 
-% following two are just utility functions for test assist
-vc_obj(VC) -> riak_object:set_vclock(riak_object:new(<<"b">>,<<"k">>,<<"v">>), VC).
-obj_vc(OB) -> riak_object:vclock(OB).
-
-prune_small_vclock_test() ->
-    % vclock with less entries than small_vclock will be untouched
-    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 32000000,
-    SmallVC = [{<<"1">>, {1, OldTime}},
-               {<<"2">>, {2, OldTime}},
-               {<<"3">>, {3, OldTime}}],
-    Props = [{small_vclock,4}],
-    ?assertEqual(SmallVC, obj_vc(prune_vclock(vc_obj(SmallVC), Props))).
-
-prune_young_vclock_test() ->
-    % vclock with all entries younger than young_vclock will be untouched
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1,
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, NewTime}},
-          {<<"3">>, {3, NewTime}}],
-    Props = [{small_vclock,1},{young_vclock,1000}],
-    ?assertEqual(VC, obj_vc(prune_vclock(vc_obj(VC), Props))).
-
-prune_big_vclock_test() ->
-    % vclock not preserved by small or young will be pruned down to
-    % no larger than big_vclock entries
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1000,
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, NewTime}},
-          {<<"3">>, {3, NewTime}}],
-    Props = [{small_vclock,1},{young_vclock,1},
-             {big_vclock,2},{old_vclock,100000}],
-    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 2).
-
-prune_old_vclock_test() ->
-    % vclock not preserved by small or young will be pruned down to
-    % no larger than big_vclock and no entries more than old_vclock ago
-    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 1000,
-    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
-               - 100000,    
-    VC = [{<<"1">>, {1, NewTime}},
-          {<<"2">>, {2, OldTime}},
-          {<<"3">>, {3, OldTime}}],
-    Props = [{small_vclock,1},{young_vclock,1},
-             {big_vclock,2},{old_vclock,10000}],
-    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 1).
-
 make_vtag_test() ->
     Obj = riak_object:new(<<"b">>,<<"k">>,<<"v1">>),
     ?assertNot(make_vtag(Obj) =:= 

File src/riak_vnode.erl

     VNode = self(),
     do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,VNode),
     {noreply, State};
-handle_cast({put, FSM_pid, BKey, RObj, ReqID},
+handle_cast({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
             State=#state{mapcache=Cache,idx=Idx}) ->
     riak_eventer:notify(riak_vnode, put, {ReqID, Idx}),
     gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
-    do_put(FSM_pid, BKey, RObj, ReqID, State),
+    do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, State),
     {noreply, State#state{mapcache=dict:erase(BKey,Cache)}};
 handle_cast({get, FSM_pid, BKey, ReqID}, State=#state{idx=Idx}) ->
     riak_eventer:notify(riak_vnode, get, {ReqID, Idx}),
 simple_binary_put(BKey, Val, Mod, ModState) ->
     Mod:put(ModState, BKey, Val).
 
-do_put(FSM_pid, BKey, RObj, ReqID,
+do_put(FSM_pid, BKey, RObj, ReqID, PruneTime, 
        _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
+    {ok,Ring} = riak_ring_manager:get_my_ring(),    
+    {Bucket,_Key} = BKey,
+    BProps = riak_bucket:get_bucket(Bucket, Ring),
     case syntactic_put_merge(Mod, ModState, BKey, RObj, ReqID) of
         oldobj -> 
             riak_eventer:notify(riak_vnode,put_reply,ReqID),
             gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
-        {newobj, ObjToStore} ->
+        {newobj, NewObj} ->
+            VC = riak_object:vclock(NewObj),
+            ObjToStore = riak_object:set_vclock(NewObj,
+                                           vclock:prune(VC,PruneTime,BProps)),
             Val = term_to_binary(ObjToStore, [compressed]),
             case simple_binary_put(BKey, Val, Mod, ModState) of
                 ok ->
                 false -> {newobj, ResObj}
             end    
     end.
+

File src/riak_vnode_master.erl

     % (obligation done, now the problem of the vnodes)
     {noreply, State};
 handle_cast({vnode_put, {Partition,_Node},
-             {FSM_pid,BKey,RObj,ReqID}}, State) ->
+             {FSM_pid,BKey,RObj,ReqID,FSMTime}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID}),
+    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}),
     % (obligation done, now the problem of the vnodes)
     {noreply, State};
 handle_cast({vnode_get, {Partition,_Node},

File src/vclock.erl

 -author('Andy Gross <andy@basho.com>').
 
 -export([fresh/0,descends/2,merge/1,get_counter/2,get_timestamp/2,
-	increment/2,all_nodes/1,equal/2]).
+	increment/2,all_nodes/1,equal/2,prune/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
                 false -> true
             end
     end.
+
+% @doc Possibly shrink the size of a vclock, depending on current age and size.
+% @spec prune(V::vclock(), Now::integer(), BucketProps::term()) -> vclock()
+prune(V,Now,BucketProps) ->
+    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
+    prune_vclock1(SortV,Now,BucketProps).
+% @private
+prune_vclock1(V,Now,BProps) ->
+    case length(V) =< proplists:get_value(small_vclock,BProps) of
+        true -> V;
+        false ->
+            {_,{_,HeadTime}} = hd(V),
+            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
+                true -> V;
+                false -> prune_vclock1(V,Now,BProps,HeadTime)
+            end
+    end.
+% @private
+prune_vclock1(V,Now,BProps,HeadTime) ->
+    % has a precondition that V is longer than small and older than young
+    case length(V) > proplists:get_value(big_vclock,BProps) of
+        true -> prune_vclock1(tl(V),Now,BProps);
+        false ->
+            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
+                true -> prune_vclock1(tl(V),Now,BProps);
+                false -> V
+            end
+    end.
+
+prune_small_test() ->
+    % vclock with less entries than small_vclock will be untouched
+    Now = riak_util:moment(),
+    OldTime = Now - 32000000,
+    SmallVC = [{<<"1">>, {1, OldTime}},
+               {<<"2">>, {2, OldTime}},
+               {<<"3">>, {3, OldTime}}],
+    Props = [{small_vclock,4}],
+    ?assertEqual(lists:sort(SmallVC), lists:sort(prune(SmallVC, Now, Props))).
+
+prune_young_test() ->
+    % vclock with all entries younger than young_vclock will be untouched
+    Now = riak_util:moment(),
+    NewTime = Now - 1,
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, NewTime}},
+          {<<"3">>, {3, NewTime}}],
+    Props = [{small_vclock,1},{young_vclock,1000}],
+    ?assertEqual(lists:sort(VC), lists:sort(prune(VC, Now, Props))).
+
+prune_big_test() ->
+    % vclock not preserved by small or young will be pruned down to
+    % no larger than big_vclock entries
+    Now = riak_util:moment(),
+    NewTime = Now - 1000,
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, NewTime}},
+          {<<"3">>, {3, NewTime}}],
+    Props = [{small_vclock,1},{young_vclock,1},
+             {big_vclock,2},{old_vclock,100000}],
+    ?assert(length(prune(VC, Now, Props)) =:= 2).
+
+prune_old_test() ->
+    % vclock not preserved by small or young will be pruned down to
+    % no larger than big_vclock and no entries more than old_vclock ago
+    Now = riak_util:moment(),
+    NewTime = Now - 1000,
+    OldTime = Now - 100000,    
+    VC = [{<<"1">>, {1, NewTime}},
+          {<<"2">>, {2, OldTime}},
+          {<<"3">>, {3, OldTime}}],
+    Props = [{small_vclock,1},{young_vclock,1},
+             {big_vclock,2},{old_vclock,10000}],
+    ?assert(length(prune(VC, Now, Props)) =:= 1).