Anonymous avatar Anonymous committed 332a351 Merge

merge

Comments (0)

Files changed (16)

 Rusty Klophaus
 Jay Doane
 Martin Scholl
+Jayson Baird
 
+

client_lib/jiak.py

 #!/usr/bin/env python
+"""
+Copyright 2009 Jay Baird <jay@mochimedia.com>
 
-# This file is provided to you under the Apache License,
-# Version 2.0 (the "License"); you may not use this file
-# except in compliance with the License.  You may obtain
-# a copy of the License at
+This file is provided to you under the Apache License,
+Version 2.0 (the "License"); you may not use this file
+except in compliance with the License.  You may obtain
+a copy of the License at
 
-#   http://www.apache.org/licenses/LICENSE-2.0
+  http://www.apache.org/licenses/LICENSE-2.0
 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
 
-import httplib
 import urllib
+from cStringIO import StringIO
+
 try:
-    import json
+    import pycurl
+    HAS_PYCURL = True
+except ImportError:
+    import httplib
+    HAS_PYCURL = False
+    
+try:
+	import json
 except ImportError:
     import simplejson as json
 
-class JiakClient:
-    '''A Python interface for speaking to Riak.
-    (the following doctest only works if you
-     have a running riak cluster with
-     {riak_web_ip, "127.0.0.1"}.
-     {riak_web_port, 8999}.
-    )
+def expect(status):
+    """Wraps a function in a function that guarantees a return code(s) and 
+       if certain conditions are met either return None or a Dictionary from
+       a JSON string.
+    
+    """
+    if not isinstance(status, (list, tuple)):
+        status = [status]
+    def wrapper_func(f):
+        def wrapped_func(*args, **kwargs):
+            code, resp = f(*args, **kwargs)
+            if HAS_PYCURL:
+                resp.reset()
+            if code not in status:
+                raise JiakException(code, resp.read())
+            if code in [404, 204]:
+                return None
+            else:
+                return json.load(resp)
+        return wrapped_func
+    return wrapper_func
 
-    Example usage:
-
-    >>> JC = JiakClient("127.0.0.1",8999)
-    >>> [JC.delete("jiak_example", key) for key in ["doctestkey","jroot","jleaf1","jleaf2","jleaf3"]]
-    [None, None, None, None, None]
-    >>> JO = JiakObject("jiak_example", "doctestkey")
-    >>> JO.object["foo"] = 2
-    >>> JC.store(JO)
-    >>> JC.fetch("jiak_example", "doctestkey").object["foo"] == 2
-    True
-    >>> JRoot = JiakObject("jiak_example","jroot")
-    >>> JRoot.object["foo"] = 0
-    >>> JLeaf1 = JiakObject("jiak_example","jleaf1")
-    >>> JLeaf1.object["foo"] = "in results"
-    >>> JLeaf2 = JiakObject("jiak_example","jleaf2")
-    >>> JLeaf2.object["foo"] = "in results"
-    >>> JLeaf3 = JiakObject("jiak_example","jleaf3")
-    >>> JLeaf3.object["foo"] = "not in results"
-    >>> JRoot.links = [("jiak_example", "jleaf1", "tag_one"), ("jiak_example", "jleaf2", "tag_one"), ("jiak_example", "jleaf3", "tag_other")]
-    >>> [JC.store(xobj) for xobj in [JRoot, JLeaf1, JLeaf2, JLeaf3]]
-    [None, None, None, None]
-    >>> [O.object["foo"] for O in JC.walk("jiak_example","jroot",[("jiak_example","tag_one","1")])[0]]
-    [u'in results', u'in results']
-    >>> [JC.delete("jiak_example", key) for key in ["doctestkey","jroot","jleaf1","jleaf2","jleaf3"]]
-    [None, None, None, None, None]
-    '''
-
-    def __init__(self, IP, Port, JiakPrefix="/jiak/"):
-        self.IP = IP
-        self.Port = Port
-        self.JKP = JiakPrefix
-    def _do_req(self, method, uri, body="", headers={}):
-        C = httplib.HTTPConnection(self.IP, self.Port)
-        C.request(method, uri, body, headers)
-        return C.getresponse()
-    def _expect(self, Status, Resp):
-        if Resp.status == Status:
-            return json.loads(Resp.read())
-        raise JiakException(Resp.status, Resp.reason, Resp.read())
-    def set_bucket_schema(self, Bucket, allowed_fields,
-                write_mask=None, read_mask=None, required_fields=None):
-        if required_fields == None: required_fields = []
-        if write_mask == None: write_mask = allowed_fields
-        if read_mask == None: read_mask = allowed_fields
-        Body = json.dumps({'schema': {'allowed_fields': allowed_fields,
-                                      'required_fields': required_fields,
-                                      'write_mask': write_mask,
-                                      'read_mask': read_mask}})
-        Resp = self._do_req("PUT",
-                            self.JKP + urllib.quote_plus(Bucket),
-                            Body,
-                            {"Content-Type": "application/json"})
-        if Resp.status == 204:
-            return None
-        raise JiakException(Resp.status, Resp.reason, Resp.read())
-    def list_bucket(self, Bucket):
-        return self._expect(200,
-                 self._do_req("GET", self.JKP + urllib.quote_plus(Bucket)))
-    def store(self, JObj, W=2,DW=2):
-        NewData = self._expect(200,
-                     self._do_req("PUT",
-                                  self.JKP
-                                  + urllib.quote_plus(JObj.bucket) + "/"
-                                  + urllib.quote_plus(JObj.key)
-                                  + "?returnbody=true"
-                                  + "&w=" + str(W)
-                                  + "&dw=" + str(DW),
-                                  JObj.to_json(),
-                                  {"Content-Type": "application/json"}))
-        JObj.update(NewData)
-    def fetch(self, bucket, key, R=2):
-        Resp = self._do_req("GET",
-                            self.JKP + urllib.quote_plus(bucket)
-                            + "/" + urllib.quote_plus(key)
-                            + "?r=" + str(R))
-        if Resp.status == 404:
-            return None
-        Data = self._expect(200,Resp)
-        Obj = JiakObject(bucket, key)
-        Obj.update(Data)
-        return Obj
-    def delete(self, bucket, key, DW=2):
-        Resp = self._do_req("DELETE",
-                            self.JKP + urllib.quote_plus(bucket)
-                            + "/" + urllib.quote_plus(key)
-                            + "?dw=" + str(DW))
-        if Resp.status == 404:
-            return None
-        elif Resp.status == 204:
-            return None
-        raise JiakException(Resp.status, Resp.reason, Resp.read())
-    def walk(self, bucket, key, spec):
-        # spec should be a list of tuples, each of the form:
-        # (bucket, tag, acc) where
-        # bucket is a string name of a bucket, or "_" to match any bucket
-        # tag is a string tag name, or "_" to match any link tag
-        # acc is either the string "1" or "0"
-        #
-        # if the walk succeeds, this will return a list, where each
-        #  element is a list of JiakObjects corresponding to a spec
-        #  element that had acc == "1"
-        Resp = self._do_req("GET",
-                            self.JKP + urllib.quote_plus(bucket)
-                            + "/" + urllib.quote_plus(key) + "/"
-                            + _convert_walk_spec(spec))
-        if Resp.status == 404:
-            return None
-        Data = self._expect(200,Resp)
-        objlists = Data['results']
-        return _convert_objlists(objlists)
-
-def _convert_objlists(objlists):
-    return [[_make_object(objdata) for objdata in objlist]
-            for objlist in objlists]
-
-def _make_object(data):
-    O = JiakObject(data['bucket'], data['key'])
-    O.update(data)
-    return O
-
-def _convert_walk_spec(spec):
-    return "/".join([urllib.quote_plus(b) + "," + urllib.quote_plus(t)
-                     + "," + a for (b,t,a) in spec])
-
-class JiakObject:
-    def __init__(self, bucket, key, links=None, obj=None):
-        self.bucket = bucket
-        self.key = key
-        if links == None: links = []
-        self.links = links
-        if obj == None: obj = {}
-        self.object = obj
-    def update(self, Data):
-        self.vclock = Data["vclock"]
-        self.lastmod = Data["lastmod"]
-        self.vtag = Data["vtag"]
-        self.object = Data["object"]
-        self.links = Data["links"]
-    def to_json(self):
-        return json.dumps(self.__dict__)
-
+def build_headers(h, disable_continue=True):
+    headers = ['Expect:'] if disable_continue else []
+    for k,v in h.iteritems():
+        headers.append('%s: %s' % (k, v))
+    return headers
 
 class JiakException(Exception): pass
 
+class Jiak(object):
+    """A Python interface for the Riak (http://riak.basho.com/) key-value store.
+       The Riak source does ship with a client library for python, but I wanted
+       something more Pythonic and I wanted to use pycURL.
+
+       Example Usage:
+       
+       >>> client = Jiak('127.0.0.1', 8098, 'jiak')
+       >>> [client.delete('jiak_example', key) for key in ['doctestkey', 'jroot', 'jleaf1', 'jleaf2', 'jleaf3']]
+       [None, None, None, None, None]
+       >>> obj = client.store('jiak_example', 'doctestkey', {'foo':2})
+       >>> client.fetch('jiak_example', 'doctestkey').get('object', {}).get('foo') == 2
+       True
+       
+    """
+    def __init__(self, host, port, prefix="jiak"):
+        self.host = host
+        self.port = port
+        self.prefix = prefix
+        if HAS_PYCURL:
+            self._request = self._pycurl_request
+        else:
+            self._request = self._httplib_request
+    
+    def _build_path(self, bucket, key=''):
+        return 'http://%s:%d/%s/%s/%s' % (self.host, self.port, self.prefix,
+                                            urllib.quote_plus(bucket),
+                                            urllib.quote_plus(key))
+    
+    def _httplib_request(self, method, uri, body="", headers={}):
+        client = httplib.HTTPConnection(self.host, self.port)
+        client.request(method, uri, body, headers)
+        response = client.getresponse()
+        return response.status, response.getheaders(), response
+        
+    def _pycurl_request(self, method, uri, body="", headers={}):
+        resp_headers = StringIO()
+        response = StringIO()
+        client = pycurl.Curl()
+        if method in ("PUT", "POST"):
+            if method == "POST":
+                client.setopt(pycurl.POST, 1)
+            else:
+                client.setopt(pycurl.CUSTOMREQUEST, method)
+            client.setopt(pycurl.POSTFIELDS, body)
+        elif method in ("DELETE",):
+            client.setopt(pycurl.CUSTOMREQUEST, method)
+        client.setopt(pycurl.URL, uri)
+        client.setopt(pycurl.HTTPHEADER, build_headers(headers))
+        client.setopt(pycurl.WRITEFUNCTION, response.write)
+        client.setopt(pycurl.HEADERFUNCTION, resp_headers.write)
+        client.perform()
+        code = client.getinfo(pycurl.HTTP_CODE)
+        
+        return code, resp_headers, response
+    
+    @expect(204)
+    def set_bucket_schema(self, bucket, allowed_fields, required_fields=[], write_mask=None, read_mask=None):
+        write_mask = allowed_fields if write_mask is None else write_mask
+        read_mask = allowed_fields if read_mask is None else read_mask
+        body = json.dumps(dict(
+            schema=dict(
+                allowed_fields=allowed_fields,
+                required_fields=required_fields,
+                write_mask=write_mask,
+                read_mask=read_mask)))
+        code, _, resp = self._request("PUT", self._build_path(bucket), body, {'Content-Type': "application/json"})
+        
+        return code, resp
+    
+    @expect(200)
+    def list_bucket(self, bucket):
+        code, _, resp = self._request("GET", self._build_path(bucket))
+        return code, resp
+     
+    @expect(200)    
+    def store(self, bucket, key, obj, links=[], w=2, dw=2):
+        obj = dict(
+            bucket=bucket,
+            key=key,
+            object=obj,
+            links=links)
+        code, _, resp = self._request("PUT", '%s?%s' % (self._build_path(bucket, key),
+                                        urllib.urlencode(dict(
+                                            returnbody='true',
+                                            w=w,
+                                            dw=dw
+                                        ))), json.dumps(obj), {'Content-Type': 'application/json'})
+        return code, resp
+    
+    @expect([200, 404])    
+    def fetch(self, bucket, key, r=2):
+        code, headers, resp = self._request("GET", '%s?r=%d' % (self._build_path(bucket, key), r))
+        return code, resp
+    
+    @expect([204, 404])
+    def delete(self, bucket, key, dw=2):
+        code, _, resp = self._request("DELETE", '%s?dw=%d' % (self._build_path(bucket, key), dw))
+        return code, resp
+        
+    @expect([200, 404])
+    def walk(self, bucket, key, spec):
+        """spec should be a list of tuples, each of the form:
+           (bucket, tag, acc) where bucket is a string name of 
+           a bucket, or "_" to match any bucket tag is a string 
+           tag name, or "_" to match any link tag acc is either 
+           the string "1" or "0"
+       
+           if the walk succeeds, this will return a list, where
+           each element is a list of JiakObjects corresponding to
+           a spec element that had acc == "1"
+        
+        """
+        def build_spec(spec):
+            return "/".join(['%s,%s,%s' % tuple(map(urllib.quote_plus, [b,t,a])) for b,t,a in spec])
+        
+        code, _, resp = self._request("GET", '%s/%s' % (self._build_path(bucket, key), build_spec(spec)))
+        return code, resp
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()

config/riak-demo.erlenv

 %% open files (using ulimit/sysctl) when creating larger ring sizes. 
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_ets_backend}.
 %{riak_dets_backend_root, "priv/store/dets"}.
 {riak_cookie, riak_demo_cookie}.

config/riak-dets.erlenv

 %% open files (using ulimit/sysctl) when creating larger ring sizes. 
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_dets_backend}.
 {riak_dets_backend_root, "/path/to/riak/dets-store"}.
 {riak_cookie, default_riak_cookie}.

config/riak-ets-jiak.erlenv

 %% open files (using ulimit/sysctl) when creating larger ring sizes. 
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_ets_backend}.
 {riak_cookie, default_riak_cookie}.
 {riak_heart_command, "(cd /path/to/riak; ./start-restart.sh /path/to/riak/config/riak-ets-jiak.erlenv)"}.

config/riak-osmos.erlenv

 %% open files (using ulimit/sysctl) when creating larger ring sizes. 
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_osmos_backend}.
 {riak_osmos_backend_root, "/path/to/riak/osmos-store"}.
 {riak_cookie, default_riak_cookie}.

config/riak.erlenv

 %%  Making it larger will reduce chatter, but will cause greater delays
 %%  in ring convergence between nodes.
 {gossip_interval, 60000}.
-%% doorbell_port is the UDP port used by clients and other nodes to connect
-%%  to the cluster via this node.  If this is undefined the node will be
-%%  storage-only and cannot be an entry point for API requests or joins.
-{doorbell_port, 9000}.
 %% storage_backend specifies the Erlang module defining the storage mechanism
 %%  that will be used on this node. 
 {storage_backend, riak_dets_backend}.

demo/stickynotes/riak-config.erlenv

 {ring_state_dir, "priv/ringstate"}.
 {ring_creation_size, 16}.
 {gossip_interval, 60000}.
-{doorbell_port, 9000}.
 {storage_backend, riak_ets_backend}.
 {riak_cookie, stickynotes_cookie}.
 {riak_heart_command, "(cd $RIAK_HOME; ./start-restart.sh $RIAK_HOME/demo/stickynotes/riak-config.erlenv)"}.

deps/webmachine/src/webmachine_decision_core.erl

 -export([do_log/1]).
 -include("webmachine_logger.hrl").
 
-
 handle_request(Resource, ReqState) ->
     put(resource, Resource),
     put(reqstate, ReqState),
 	404 ->
 	    {ok, ErrorHandler} = application:get_env(webmachine, error_handler),
 	    Reason = {none, none, []},
-	    ErrorHTML = ErrorHandler:render_error(
+	    {ErrorHTML,ReqState} = ErrorHandler:render_error(
                           Code, {webmachine_request,get(reqstate)}, Reason),
+            put(reqstate, ReqState), 
             wrcall({set_resp_body, ErrorHTML});
         304 ->
             wrcall({remove_resp_header, "Content-Type"}),

deps/webmachine/src/webmachine_mochiweb.erl

                                               PathTokens,AppRoot,StringPath),
             XReq1 = {webmachine_request,RS1},
             {ok,RS2} = XReq1:set_metadata('resource_module', Mod),
-            webmachine_decision_core:handle_request(Resource, RS2)
+            try 
+                webmachine_decision_core:handle_request(Resource, RS2)
+            catch
+                error:_ -> 
+                    FailReq = {webmachine_request,RS2},
+                    {ok,RS3} = FailReq:send_response(500),
+                    PostFailReq = {webmachine_request,RS3},
+                    {LogData,_RS4} = PostFailReq:log_data(),
+                    case application:get_env(webmachine,
+                                             webmachine_logger_module) of
+                        {ok, LogModule} ->
+                            spawn(LogModule, log_access, [LogData]);
+                        _ -> nop
+                    end
+            end
     end.
 
 get_option(Option, Options) ->

src/riak_connect.erl

 send_ring(Node, Node) -> ok;
 send_ring(FromNode, ToNode) ->
     gen_server:cast({?SERVER, FromNode}, {send_ring_to, ToNode}).
-    
+
 
 %% @private
 start_link() -> 
 %% @private
 init(_State) -> 
     schedule_next_gossip(),
-    {ok, false}.
+    {ok, true}.
     
 schedule_next_gossip() ->
     MaxInterval = riak:get_app_env(gossip_interval),

src/riak_doorbell.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
-%% @doc The doorbell (and door knocker) is a UDP socket server that provides a discovery mechanism for other nodes to connect into the Riak cluster.
-
--module(riak_doorbell).
-
--behaviour(gen_server).
--export([start_link/0]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-	 terminate/2, code_change/3]).
--export([ring/2,knock/3, stop/0]).
-
--include_lib("eunit/include/eunit.hrl").
-	 
--record(state, {port, sock}).
-
-%% @spec start_link() -> {ok, pid()}
-start_link() ->
-    Port = riak:get_app_env(doorbell_port),
-    pong = net_adm:ping(node()), % fail if not distributed
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port], []).
-
-%% @spec knock(IP :: list(), Port :: integer(), RiakCookie :: atom()) ->
-%%       ok | {error, Reason}
-%% @doc This is used by a node not seeking to be a member of the cluster,
-%%      to establish a distributed-erlang connection used by a riak_client.
-knock(IP, Port, RiakCookie) ->
-    % for non-riak nodes seeking a client proc
-    Nonce = random:uniform(),
-    {ok, SendSock} = gen_udp:open(0),
-    gen_udp:send(SendSock, IP, Port,term_to_binary({
-              {knock,Nonce,self(),erlang:get_cookie(),RiakCookie},node()})),
-    gen_udp:close(SendSock),
-    Nonce.
-
-%% @spec ring(IP :: list(), Port :: integer()) ->
-%%       ok | {error, Reason}
-%% @doc This is used by a node joining the riak cluster.
-ring(IP, Port) ->
-    % for riak nodes joining
-    {ok, SendSock} = gen_udp:open(0),
-    Res = gen_udp:send(SendSock, IP, Port, term_to_binary({ring,node()})),
-    gen_udp:close(SendSock),
-    Res.
-
-stop() -> gen_server:cast(?MODULE, stop).
-    
-% @private
-init([Port]) ->
-    Opts = [{active, true},
-            list,
-            {reuseaddr, true}],
-    {ok, Sock} = gen_udp:open(Port, Opts),
-    {ok, #state{port=Port,sock=Sock}}.
-
-% @private
-handle_info({udp, _Socket, IP, _InPortNo, Packet0},State) ->
-    {RingType, Node} = binary_to_term(list_to_binary(Packet0)),
-    case RingType of
-        ring ->
-            case net_adm:ping(Node) of
-                pong -> riak_eventer:notify(riak_doorbell,connected,{IP,Node});
-                pang -> riak_eventer:notify(riak_doorbell,connectfail,{IP,Node})
-            end;
-        {knock,Nonce,Pid,Cookie,RiakCookie} ->
-            case riak:get_app_env(riak_cookie) of
-                RiakCookie ->
-                    riak_eventer:notify(riak_doorbell, client_connected,
-                                        {IP, Pid, Node}),
-                    erlang:set_cookie(Node,Cookie),
-                    Pid ! {riak_connect, Nonce, node()};
-                _ ->
-                    riak_eventer:notify(riak_doorbell, client_connectfail,
-                                        {IP, Pid, Node})
-            end
-    end,
-    {noreply, State};
-handle_info(_Info, State) -> {noreply, State}.
-
-%% @private
-handle_cast(stop, State) -> {stop, normal,State}.
-
-%% @private
-handle_call(_Request, _From, State) -> {reply, no_call, State}.
-
-%% @private
-terminate(_Reason, _State=#state{sock=Sock}) ->
-    gen_udp:close(Sock),
-    ok.
-
-%% @private
-code_change(_OldVsn, State, _Extra) ->  {ok, State}.
-
-knock_test() ->
-    application:set_env(riak, doorbell_port, 9001),
-    application:set_env(riak, riak_cookie, default_riak_cookie),
-    {ok, _Pid} = riak_doorbell:start_link(),
-    Nonce = riak_doorbell:knock("127.0.0.1", 9001, default_riak_cookie),
-    receive
-        {riak_connect, Nonce, _} ->
-            ok
-    after 1000 ->
-            throw(knock_test_timeout)
-    end,
-    riak_doorbell:stop().
-       
-    
-            
-    
-
-    
-

src/riak_get_fsm.erl

                 repair_sent :: list(), 
                 final_obj :: undefined|riak_object:riak_object(),
                 timeout :: pos_integer(),
-                endtime :: pos_integer(),
+                tref    :: reference(),
                 bkey :: {riak_object:bucket(), riak_object:key()},
                 ring :: riak_ring:riak_ring()
                }).
 %% @private
 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
+    TRef = erlang:send_after(Timeout, self(), timeout),
     RealStartTime = riak_util:moment(),
     DocIdx = riak_util:chash_key({Bucket, Key}),
     riak_eventer:notify(riak_get_fsm, get_fsm_start,
                        preflist=Preflist,final_obj=undefined,
                        replied_r=[],replied_fail=[],
                        replied_notfound=[],starttime=riak_util:moment(),
-                       waiting_for=Sent,endtime=Timeout+riak_util:moment()},
-    {next_state,waiting_vnode_r,StateData,Timeout}.
+                       waiting_for=Sent,tref=TRef},
+    {next_state,waiting_vnode_r,StateData}.
 
 waiting_vnode_r({r, {ok, RObj}, Idx, ReqId},
                   StateData=#state{r=R,allowmult=AllowMult,
                                    req_id=ReqId,client=Client,
-                                   replied_r=Replied0, endtime=End}) ->
+                                   replied_r=Replied0}) ->
     Replied = [{RObj,Idx}|Replied0],
     case length(Replied) >= R of
         true ->
                                         {ReqId, ok})
             end,
             NewStateData = StateData#state{replied_r=Replied,final_obj=Final},
-            finalize(NewStateData, End);
+            finalize(NewStateData);
         false ->
             NewStateData = StateData#state{replied_r=Replied},
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_r,NewStateData}
     end;
 waiting_vnode_r({r, {error, notfound}, Idx, ReqId},
                   StateData=#state{r=R,replied_fail=Fails,
                                    req_id=ReqId,client=Client,n=N,
-                                   replied_notfound=Replied0,endtime=End}) ->
+                                   replied_notfound=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_notfound=Replied},
     case (N - length(Replied) - length(Fails)) >= R of
         true ->
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_r,NewStateData};
         false ->
             riak_stat:update(node_get),
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
 waiting_vnode_r({r, {error, Err}, Idx, ReqId},
                   StateData=#state{r=R,client=Client,n=N,
                                    replied_fail=Replied0,req_id=ReqId,
-                                   replied_notfound=NotFound,endtime=End}) ->
+                                   replied_notfound=NotFound}) ->
     Replied = [{Err,Idx}|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied) - length(NotFound)) >= R of
         true ->
-            {next_state,waiting_vnode_r,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_r,NewStateData};
         false ->
             case length(NotFound) of
                 0 ->
     {stop,normal,StateData}.
 
 waiting_read_repair({r, {ok, RObj}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_r=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_r=[{RObj,Idx}|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_r=Replied0}) ->
+    finalize(StateData#state{replied_r=[{RObj,Idx}|Replied0]});
 waiting_read_repair({r, {error, notfound}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_notfound=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_notfound=[Idx|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_notfound=Replied0}) ->
+    finalize(StateData#state{replied_notfound=[Idx|Replied0]});
 waiting_read_repair({r, {error, Err}, Idx, ReqId},
-                  StateData=#state{req_id=ReqId,replied_fail=Replied0,
-                                   endtime=End}) ->
-    finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]}, End);
+                  StateData=#state{req_id=ReqId,replied_fail=Replied0}) ->
+    finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]});
 waiting_read_repair(timeout, StateData) ->
     finalize(StateData).
 
-finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N},
-         End) ->
+finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N}) ->
     case (length(R) + length(F) + length(NF)) >= N of
-        true -> finalize(StateData);
-        false -> {next_state,waiting_read_repair,
-                  StateData,End-riak_util:moment()}
+        true -> really_finalize(StateData);
+        false -> {next_state,waiting_read_repair,StateData}
     end.
-finalize(StateData=#state{final_obj=Final,
+really_finalize(StateData=#state{final_obj=Final,
                           replied_r=RepliedR,
                           bkey=BKey,
                           req_id=ReqId,
     {stop,badmsg,StateData}.
 
 %% @private
+handle_info(timeout, StateName, StateData) ->
+    ?MODULE:StateName(timeout, StateData);
+%% @private
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 

src/riak_put_fsm.erl

                 replied_dw :: list(), 
                 replied_fail :: list(),
                 timeout :: pos_integer(), 
-                endtime :: pos_integer(), 
+                tref    :: reference(),
                 ring :: riak_ring:riak_ring()
                }).
 
     {ok,Ring} = riak_ring_manager:get_my_ring(),
     StateData = #state{robj=RObj0, client=Client, w=W, dw=DW,
                        req_id=ReqId, timeout=Timeout, ring=Ring},
+    
     {ok,initialize,StateData,0}.
 
 %% @private
 initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
+    TRef = erlang:send_after(Timeout, self(), timeout),
     RObj = update_metadata(RObj0),
     RealStartTime = riak_util:moment(),
     Bucket = riak_object:bucket(RObj),
                   robj=RObj, n=N, preflist=Preflist, bkey={Bucket,Key},
                   waiting_for=Sent, starttime=riak_util:moment(),
                   replied_w=[], replied_dw=[], replied_fail=[],
-                  endtime=Timeout+riak_util:moment()},
-    {next_state,waiting_vnode_w,StateData,Timeout}.
+                  tref=TRef},
+    {next_state,waiting_vnode_w,StateData}.
 
 waiting_vnode_w({w, Idx, ReqId},
                   StateData=#state{w=W,dw=DW,req_id=ReqId,client=Client, bkey={Bucket, Key},
-                                   replied_w=Replied0, endtime=End}) ->
+                                   replied_w=Replied0}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= W of
         true ->
                     {stop,normal,StateData};
                 _ ->
                     NewStateData = StateData#state{replied_w=Replied},
-                    {next_state,waiting_vnode_dw,NewStateData,
-                     End-riak_util:moment()}
+                    {next_state,waiting_vnode_dw,NewStateData}
+
             end;
         false ->
             NewStateData = StateData#state{replied_w=Replied},
-            {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_w,NewStateData}
     end;
 waiting_vnode_w({dw, Idx, _ReqId},
-                  StateData=#state{replied_dw=Replied0, endtime=End}) ->
+                  StateData=#state{replied_dw=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_dw=Replied},
-    {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
+    {next_state,waiting_vnode_w,NewStateData};
 waiting_vnode_w({fail, Idx, ReqId},
                   StateData=#state{n=N,w=W,client=Client,
-                                   replied_fail=Replied0,endtime=End}) ->
+                                   replied_fail=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied)) >= W of
         true ->
-            {next_state,waiting_vnode_w,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_w,NewStateData};
         false ->
             riak_stat:update(node_put),
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
     {stop,normal,StateData}.
 
 waiting_vnode_dw({w, _Idx, ReqId},
-          StateData=#state{req_id=ReqId, endtime=End}) ->
-    {next_state,waiting_vnode_dw,StateData,End-riak_util:moment()};
+          StateData=#state{req_id=ReqId}) ->
+    {next_state,waiting_vnode_dw,StateData};
 waiting_vnode_dw({dw, Idx, ReqId},
                  StateData=#state{dw=DW, client=Client, bkey={Bucket, Key},
-                                   replied_dw=Replied0, endtime=End}) ->
+                                   replied_dw=Replied0}) ->
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
             {stop,normal,StateData};
         false ->
             NewStateData = StateData#state{replied_dw=Replied},
-            {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()}
+            {next_state,waiting_vnode_dw,NewStateData}
     end;
 waiting_vnode_dw({fail, Idx, ReqId},
                   StateData=#state{n=N,dw=DW,client=Client,
-                                   replied_fail=Replied0,endtime=End}) ->
+                                   replied_fail=Replied0}) ->
     Replied = [Idx|Replied0],
     NewStateData = StateData#state{replied_fail=Replied},
     case (N - length(Replied)) >= DW of
         true ->
-            {next_state,waiting_vnode_dw,NewStateData,End-riak_util:moment()};
+            {next_state,waiting_vnode_dw,NewStateData};
         false ->
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqId, {error,too_many_fails,Replied}}),
     {stop,badmsg,StateData}.
 
 %% @private
+
+handle_info(timeout, StateName, StateData) ->
+    ?MODULE:StateName(timeout, StateData);
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
Add a comment to this file

src/riak_vnode_master.erl

File contents unchanged.

www/basic-setup.html

   ring configuration.  All nodes in a cluster should have the same
   cluster name.  This parameter is required.
 </dd>
-<dt><code>doorbell_port: integer</code></dt>
-<dd>
-  Network port (UDP) on which this node will listen for connections from
-  clients and other nodes.  If this parameter is not specified, the node
-  cannot be used to service client requests, but can still participate
-  in storage.
-</dd>
 <dt><code>riak_cookie: atom</code></dt>
 <dd>
   The Erlang cookie for the riak cluster.  All nodes in a cluster
 <pre>
 {cluster_name, "default"}.
 {ring_creation_size, 16}.
-{doorbell_port, 9000}.
 {storage_backend, riak_dets_backend}.
 {riak_dets_backend_root, "/var/riak/store"}.
 {riak_cookie, default_riak_cookie}.
 <pre>
 {cluster_name, "default"}.
 {ring_creation_size, 1024}.
-{doorbell_port, 9000}.
 {storage_backend, riak_dets_backend}.
 {riak_dets_backend_root, "/var/riak/store"}.
 {riak_cookie, default_riak_cookie}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.