Commits

Jacob Sondergaard committed 19b2f37

Much improved async behaviour, exceptions are now passed to callback, and commands are queued for sending in FIFO order.

Comments (0)

Files changed (5)

       client.close(ioloop.stop)
     
     def put():
-      client.put('A job to work on', callback=lambda s: show(
-          'Queued a job with jid %d', s, reserve))
+      client.put("A job to work on", callback=lambda s: show(
+          "Queued a job with jid %d", s, reserve))
     
     def reserve():
       client.reserve(callback=lambda s: show(
-          'Reserved job %s', s, lambda: delete(s['jid'])))
+          "Reserved job %s", s, lambda: delete(s["jid"])))
     
     def delete(jid):
       client.delete(jid, callback=lambda: show(
-          'Deleted job with jid %d', jid, stop))
+          "Deleted job with jid %d", jid, stop))
 
     client = beanstalktc.Client()
     client.connect(put)
 Executing the script (`python demo.py`) with beanstalkd running produces the following output:
 
     Queued a job with jid 1
-    Reserved job {'body': 'A job to work on', 'jid': 1, 'reserved': True}
+    Reserved job {"body": "A job to work on", "jid": 1}
     Deleted job with jid 1
 
 Where `jid` is the job id that beanstalkd has given the job when putting in on the queue.
 
+The client will attempt to automatically re-connect if the socket connection to beanstalkd is closed unexpectedly. In other cases where an error occur, an exception will be passed to the callback function.
+
 ## The job lifecycle
 
 A job in beanstalk gets created by a client with the put command. During its life it can be in one of four states:
 **`buried`**  
 The job is in a FIFO linked list that will not be touched by the server until a client kicks them with the "kick" command  
 
-
 ## Tubes
 
 The system has one or more tubes. Each tube consists of a ready queue and a delay queue. Each job spends its entire life in one tube. Consumers can show interest in tubes by sending the `watch` command; they can show disinterest by sending the `ignore` command. This set of interesting tubes is said to be a consumer’s `watch` list. When a client reserves a job, it may come from any of the tubes in its watch list.
 **`closed()`**
 Return True if the connection is established, otherwise returns False.
 
-If the connection is down (also while re-connecting), any attempt to communicate with beanstalkd, using methods in the following sections, will raise an IOError exception.
+If the connection is down (also while re-connecting), any attempt to communicate with beanstalkd, using methods in the following sections, will likely raise an IOError exception.
 
 ### Producer methods
 
 ### Worker methods
 
 **`reserve(timeout=None, callback=None)`**  
-Calls back with a newly-reserved job. If no job is available to be reserved, beanstalkd will wait to send a response until one becomes available. Once a job is reserved for the client, the client has limited time to run (TTR) the job before the job times out. When the job times out, the server will put the job back into the ready queue. Both the TTR and the actual time left can be found in response to the `stats-job` command.
-  
+Reserve a job from one of the watched tubes, with optional timeout
+in seconds. Calls back with a newly-reserved job.
+
+If no timeout is given, and no job is available to be reserved, beanstalkd will wait to send a response until one becomes available. Commands issued while waiting for the `reserve` callback will be queued and sent in FIFO order, when communication is resumed.
+
+A timeout value of 0 will cause the server to immediately return either a response or TIMED_OUT. A positive value of timeout will limit the amount of time the client will hold communication until a job becomes available.
+
+Once a job is reserved for the client, the client has limited time to run (TTR) the job before the job times out. When the job times out, the server will put the job back into the ready queue. Both the TTR and the actual time left can be found in response to the `stats-job` command.
+
 **`delete(jid, callback=None)`**  
 Removes a job from the server entirely. It is normally used by the client when the job has successfully run to completion. A client can delete jobs that it has `reserved`, `ready` jobs, `delayed` jobs, and jobs that are `buried`.
 

beanstalktc/__init__.py

-from beanstalktc import (Client, BeanstalktcException, UnexpectedResponse,
-        CommandFailed, Buried, DeadlineSoon)
+from beanstalktc import (Client, BeanstalkException, UnexpectedResponse,
+        CommandFailed, Buried, DeadlineSoon, TimedOut)

beanstalktc/beanstalktc.py

 #!/usr/bin/env python
 """beanstalktc - An async beanstalkd client for Tornado"""
 
-__license__ = '''
-Copyright (C) 2012 Nephics AB
+__license__ = """
+Copyright (C) 2012-2013 Nephics AB
 
 Parts of the code adopted from the beanstalkc project are:
     Copyright (C) 2008-2012 Andreas Bolka
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 
-__version__ = '0.2.2'
+__version__ = '0.3.0'
 
 import socket
 import time
 
+from collections import deque
+
 from tornado.ioloop import IOLoop
 from tornado.iostream import IOStream
+from tornado import stack_context
 
 
 DEFAULT_PRIORITY = 2 ** 31
 RECONNECT_TIMEOUT = 1  # Time (in seconds) between re-connection attempts
 
 
-class BeanstalktcException(Exception): pass
-class UnexpectedResponse(BeanstalktcException): pass
-class CommandFailed(BeanstalktcException): pass
-class Buried(BeanstalktcException): pass
-class DeadlineSoon(BeanstalktcException): pass
+class Bunch:
+    """Create a bunch to group a few variables.
+    Undefined attributes have the default value of None.
+    """
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
+
+    def __getattr__(self, name):
+        return None
+
+
+class BeanstalkException(Bunch, Exception):
+    def __str__(self):
+        return '{}: {} returned for command {}'.format(
+            self.__class__.__name__, self.status, self.request.cmd)
+
+
+class UnexpectedResponse(BeanstalkException): pass
+class CommandFailed(BeanstalkException): pass
+class Buried(BeanstalkException): pass
+class DeadlineSoon(BeanstalkException): pass
+class TimedOut(BeanstalkException): pass
 
 
 class Client(object):
         self.io_loop = io_loop or IOLoop.instance()
         self._stream = None
         self._using = 'default'  # current tube
-        self._watching = set()   # set of watched tubes
+        self._watching = set(['default'])   # set of watched tubes
+        self._queue = deque()
+        self._talking = False
 
     def _reconnect(self):
         # wait some time before trying to re-connect
 
     def _reconnected(self):
         # re-establish the used tube and tubes being watched
-        watch_list = list(self._watching)
+        watch = self._watching.difference(['default'])
+        # ignore "default", if it is not in the client's watch list
+        ignore = set(['default']).difference(self._watching)
 
         def do_next(_=None):
             try:
-                if watch_list:
-                    self.watch(watch_list.pop(), do_next)
+                if watch:
+                    self.watch(watch.pop(), do_next)
+                elif ignore:
+                    self.ignore(ignore.pop(), do_next)
                 elif self._using != 'default':
+                    # change the tube used
                     self.use(self._using)
             except:
                 # ignored, as next re-connect will retry the operation
 
     def connect(self, callback=None):
         """Connect to beanstalkd server."""
-        if self.closed():
-            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
-                    socket.IPPROTO_TCP)
-            self._stream = IOStream(self._socket, io_loop=self.io_loop)
-            self._stream.set_close_callback(self._reconnect)
+        if not self.closed():
+            return
+        self._talking = False
+        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
+                socket.IPPROTO_TCP)
+        self._stream = IOStream(self._socket, io_loop=self.io_loop)
+        self._stream.set_close_callback(self._reconnect)
         self._stream.connect((self.host, self.port), callback)
 
     def close(self, callback=None):
         """Close connection to server."""
-        self._stream.set_close_callback(callback)
-        self._stream.write('quit\r\n', self._stream.close)
+        if self._stream:
+            self._stream.set_close_callback(callback)
+        if self.closed():
+            # already closed
+            callback()
+        else:
+            self._stream.write('quit\r\n', self._stream.close)
 
     def closed(self):
-        '''Returns True if the connection is closed.'''
+        """"Returns True if the connection is closed."""
         return not self._stream or self._stream.closed()
 
-    def _interact(self, command, callback, expected_ok=[], expected_err=[]):
-        # write command to socket stream
-        self._stream.write(command,
-                # when command is written: read line from socket stream
-                lambda: self._stream.read_until('\r\n',
-                # when a line has been read: return status and results
-                lambda resp: self._parse_response(resp, command, expected_ok,
-                expected_err, callback)))
+    def _interact(self, request, callback):
+        # put the interaction request into the FIFO queue
+        cb = stack_context.wrap(callback)
+        self._queue.append((request, cb))
+        self._process_queue()
 
-    def _parse_response(self, resp, command, expected_ok, expected_err,
-            callback):
-        spl = resp.split()
-        status, results = spl[0], spl[1:]
-        if status in expected_ok:
-            callback(results)
+    def _process_queue(self):
+        if self._talking or not self._queue:
             return
-        cmd = command.split('\r\n')[0]
+        # pop a request of the queue and perform the send-receive interaction
+        self._talking = True
+        with stack_context.NullContext():
+            req, cb = self._queue.popleft()
+            command = req.cmd + '\r\n'
+            if req.body:
+                command += req.body + '\r\n'
+
+            # write command and body to socket stream
+            self._stream.write(command,
+                    # when command is written: read line from socket stream
+                    lambda: self._stream.read_until('\r\n',
+                    # when a line has been read: return status and results
+                    lambda data: self._recv(req, data, cb)))
+
+    def _recv(self, req, data, cb):
+        # parse the data received as server response
+        spl = data.split()
+        status, values = spl[0], spl[1:]
+
+        error = None
+        err_args = dict(request=req, status=status, values=values)
+
         if status == 'BURIED':
-            raise Buried(cmd, status, int(results[0])
-                    if results and results[0].isdigit() else results)
-        elif status in expected_err:
-            raise CommandFailed(cmd, status, results)
+            error = Buried(**err_args)
+        elif status == 'TIMED_OUT':
+            error = TimedOut(**err_args)
+        elif status == 'DEADLINE_SOON':
+            error = DeadlineSoon(**err_args)
+        elif req.err and status in req.err:
+            error = CommandFailed(**err_args)
+        elif not req.ok or status not in req.ok:
+            error = UnexpectedResponse(**err_args)
+
+        resp = Bunch(req=req, status=status, values=values, error=error)
+
+        if error or not req.read_body:
+            # end the request and callback with results
+            self._do_callback(cb, resp)
         else:
-            raise UnexpectedResponse(cmd, status, results)
+            # read the body including the terminating two bytes of crlf
+            if len(values) == 2:
+                jid, size = int(values[0]), int(values[1])
+                resp.jid = int(jid)
+            else:
+                size = int(values[0])
+            self._stream.read_bytes(size + 2,
+                    lambda data: self._recv_body(data[:-2], resp, cb))
 
-    def _interact_success(self, command, callback, expected_ok=[],
-            expected_err=[]):
-        self._interact(command, lambda r: callback and callback(), expected_ok,
-                expected_err)
+    def _recv_body(self, data, resp, cb):
+        if resp.req.parse_yaml:
+            # parse the yaml encoded body
+            self._parse_yaml(data, resp, cb)
+        else:
+            # don't parse body, it is a job!
+            # end the request and callback with results
+            resp.body = {'jid': resp.jid, 'body': data}
+            self._do_callback(cb, resp)
 
-    def _interact_value(self, command, callback, expected_ok=[],
-            expected_err=[]):
-        # callback with a string or integer value
-        self._interact(command, lambda r: callback and callback(int(r[0])
-                if r[0].isdigit() else r[0]), expected_ok, expected_err)
-
-    def _interact_peek(self, command, callback):
-        # callback with a job or None
-        try:
-            self._interact(command,
-                lambda r: self._read_job(int(r[0]), int(r[1]), callback,
-                reserved=False), ['FOUND'], ['NOT_FOUND'])
-        except CommandFailed:
-            callback and callback(None)
-
-    def _read_job(self, jid, size, callback, reserved=True):
-        # read the body including the terminating two bytes of crlf
-        self._stream.read_bytes(size + 2, lambda resp: callback and callback({
-                'jid': jid, 'body': resp[:-2], 'reserved': reserved}))
-
-    def _interact_data(self, command, callback, expected_ok=[],
-            expected_err=[]):
-        self._interact(command,
-                # on success, read data bytes including the terminating two
-                # bytes of crlf, and parse the yaml data
-                lambda r: self._stream.read_bytes(int(r[0]) + 2,
-                lambda data: self._parse_yaml(data, callback)),
-                expected_ok, expected_err)
-
-    def _parse_yaml(self, data, callback):
-        # dirty parsing of yaml data (assume either list or dict)
-        spl = data[:-2].split('\n')[1:-1]
+    def _parse_yaml(self, data, resp, cb):
+        # dirty parsing of yaml data
+        # (assumes that data is a yaml encoded list or dict)
+        spl = data.split('\n')[1:-1]
         if spl[0].startswith('- '):
             # it is a list
-            callback([s[2:] for s in spl])
+            resp.body = [s[2:] for s in spl]
         else:
             # it is a dict
             conv = lambda v: ((float(v) if '.' in v else int(v))
                 if v[0].isdigit() or v[-1].isdigit() else v)
-            callback(dict((k, conv(v.strip())) for k, v in
-                    (s.split(':') for s in spl)))
+            resp.body = dict((k, conv(v.strip())) for k, v in
+                    (s.split(':') for s in spl))
+        self._do_callback(cb, resp)
+
+    def _do_callback(self, cb, resp):
+        # end the request and process next item in the queue
+        # and callback with results
+        self._talking = False
+        self.io_loop.add_callback(self._process_queue)
+
+        if not cb:
+            return
+
+        # default is to callback with error state (None or exception)
+        obj = None
+        req = resp.req
+
+        if resp.error:
+            obj = resp.error
+
+        elif req.read_value:
+            # callback with an integer value or a string
+            if resp.values[0].isdigit():
+                obj = int(resp.values[0])
+            else:
+                obj = resp.values[0]
+
+        elif req.read_body:
+            # callback with the body (job or parsed yaml)
+            obj = resp.body
+
+        self.io_loop.add_callback(lambda: cb(obj))
 
     #
     #  Producer commands
 
     def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=120,
             callback=None):
-        """Put a job body (a string) into the current tube.
+        """Put a job body (a byte string) into the current tube.
 
         The job can be delayed a number of seconds, before it is put in the
         ready queue, default is no delay.
 
         The job is assigned a Time To Run (ttr, in seconds), the mininum is
-        1 sec., default ttr=120 sec.
-        
-        Calls back with job id when inserted.
-        
-        Raises a Buried or CommandFailed exception if job is buried (the
-        server ran out of memory trying to grow the priority queue data 
-        structure), the body is too big or the server is in draining mode.
+        1 sec., default is ttr=120 sec.
+
+        Calls back with jid (job id) when job is inserted. If an error occured,
+        the callback gets a Buried or CommandFailed exception. The job is
+        buried when either the body is too big, so server ran out of memory,
+        or when the server is in draining mode.
         """
-        self._interact_value('put %d %d %d %d\r\n%s\r\n' %
-                (priority, delay, ttr, len(body), body),
-                callback, ['INSERTED'], ['BURIED', 'JOB_TOO_BIG',
-                'DRAINING'])
+        request = Bunch(cmd='put {} {} {} {}'.format(priority, delay, ttr,
+                len(body)), ok=['INSERTED'], err=['BURIED', 'JOB_TOO_BIG',
+                'DRAINING'], body=body, read_value=True)
+        self._interact(request, callback)
 
     def use(self, name, callback=None):
         """Use the tube with given name.
-    
+
         Calls back with the name of the tube now being used.
         """
-        def using(name):
-            self._using = name
+        def using(resp):
+            if not isinstance(resp, Exception):
+                self._using = resp
             if callback:
-                callback(name)
+                callback(resp)
 
-        self._interact_value('use %s\r\n' % name, using, ['USING'])
-    
+        request = Bunch(cmd='use {}'.format(name), ok=['USING'],
+                read_value=True)
+        self._interact(request, using)
+
     #
     #  Worker commands
     #
         """Reserve a job from one of the watched tubes, with optional timeout
         in seconds.
 
-        Calls back with a job dict, with keys jid (job id), body (string), and
-        reserved (boolean). If request times out, call back with None.
+        Not specifying a timeout (timeout=None, the default) will make the
+        client put the communication with beanstalkd on hold, until either a
+        job is reserved, or a already reserved job is approaching it's TTR
+        deadline. Commands issued while waiting for the "reserve" callback will
+        be queued and sent in FIFO order, when communication is resumed.
 
-        Raises a DeadlineSoon exception if ....
+        A timeout value of 0 will cause the server to immediately return either
+        a response or TIMED_OUT. A positive value of timeout will limit the
+        amount of time the client will will hold communication until a job
+        becomes available.
+
+        Calls back with a job dict (jid and body). If the request timed out,
+        the callback gets a TimedOut exception. If a reserved job has deadline
+        within the next second, the callback gets a DeadlineSoon exception.
         """
         if timeout is not None:
-            command = 'reserve-with-timeout %d\r\n' % timeout
+            command = 'reserve-with-timeout {}'.format(timeout)
         else:
-            command = 'reserve\r\n'
-        try:
-            self._interact(command,
-                    lambda r: self._read_job(int(r[0]), int(r[1]), callback),
-                    ['RESERVED'], ['DEADLINE_SOON', 'TIMED_OUT'])
-        except CommandFailed as (_, status, results):
-            if status == 'TIMED_OUT':
-                callback and callback(None)
-            elif status == 'DEADLINE_SOON':
-                raise DeadlineSoon(results)
+            command = 'reserve'
+        request = Bunch(cmd=command, ok=['RESERVED'], err=['DEADLINE_SOON',
+                'TIMED_OUT'], read_body=True)
+        self._interact(request, callback)
 
     def delete(self, jid, callback=None):
-        """Delete a job, by job id.
-    
-        Call back when job is deleted.
-    
-        A CommandFailed exception is raised if job id does not exists.
+        """Delete job with given jid.
+
+        Calls back when job is deleted. If the job does not exist, or it is not
+        neither reserved by the client, ready or buried; the callback gets a
+        CommandFailed exception.
         """
-        self._interact_success('delete %d\r\n' % jid, callback, ['DELETED'],
-                ['NOT_FOUND'])
-    
+        request = Bunch(cmd='delete {}'.format(jid), ok=['DELETED'],
+                err=['NOT_FOUND'])
+        self._interact(request, callback)
+
     def release(self, jid, priority=DEFAULT_PRIORITY, delay=0, callback=None):
         """Release a reserved job back into the ready queue.
-    
-        Call back when job is released or buried (if the server ran out of
-        memory trying to grow the priority queue data structure).
-    
-        A CommandFailed exception is raised if the job does not exist or is not
-        reserved by the client.
+
+        A new priority can be assigned to the job.
+
+        It is also possible to specify a delay (in seconds) to wait before
+        putting the job in the ready queue. The job will be in the "delayed"
+        state during this time.
+
+        Calls back when job is released. If the job was buried, the callback
+        gets a Buried exception. If the job does not exist, or it is not
+        reserved by the client, the callback gets a CommandFailed exception.
         """
-        self._interact_success('release %d %d %d\r\n' % (jid, priority, delay),
-                callback, ['RELEASED'], ['BURIED', 'NOT_FOUND'])
-    
+        request = Bunch(cmd='release {} {} {}'.format(jid, priority, delay),
+                ok=['RELEASED'], err=['BURIED', 'NOT_FOUND'])
+        self._interact(request, callback)
+
     def bury(self, jid, priority=DEFAULT_PRIORITY, callback=None):
-        """Bury a job, by job id.
-    
-        Call back when job is buried.
-    
-        A CommandFailed exception is raised if the job does not exist or is not
-        reserved by the client.
+        """Bury job with given jid.
+
+        A new priority can be assigned to the job.
+
+        Calls back when job is burried. If the job does not exist, or it is not
+        reserved by the client, the callback gets a CommandFailed exception.
         """
-        self._interact_success('bury %d %d\r\n' % (jid, priority), callback,
-                ['BURIED'], ['NOT_FOUND'])
-    
+        request = Bunch(cmd='bury {} {}'.format(jid, priority), ok=['BURIED'],
+                err=['NOT_FOUND'])
+        self._interact(request, callback)
+
     def touch(self, jid, callback=None):
-        """Touch a job, by job id, requesting more time to work on a reserved
-        job before it expires.
-    
-        Call back when job is touched.
-    
-        A CommandFailed exception is raised if the job does not exist or is not
-        reserved by the client.
+        """Touch job with given jid.
+
+        This is for requesting more time to work on a reserved job before it
+        expires.
+
+        Calls back when job is touched. If the job does not exist, or it is not
+        reserved by the client, the callback gets a CommandFailed exception.
         """
-        self._interact_success('touch %d\r\n' % jid, callback, ['TOUCHED'],
-                ['NOT_FOUND'])
+        request = Bunch(cmd='touch {}'.format(jid), ok=['TOUCHED'],
+                err=['NOT_FOUND'])
+        self._interact(request, callback)
 
     def watch(self, name, callback=None):
-        """Watch a given tube.
-    
-        Call back with number of tubes currently in the watch list.
+        """Watch tube with given name.
+
+        Calls back with number of tubes currently in the watch list.
         """
         def watching(count):
-            self._watching.add(name)
+            if not isinstance(count, Exception):
+                # add to the client's watch list
+                self._watching.add(name)
             if callback:
                 callback(count)
 
-        self._interact_value('watch %s\r\n' % name, callback, ['WATCHING'])
+        request = Bunch(cmd='watch {}'.format(name), ok=['WATCHING'],
+                read_value=True)
+        self._interact(request, callback)
 
     def ignore(self, name, callback=None):
-        """Stop watching a given tube.
-    
-        Call back with number of tubes currently in the watch list.
-    
-        A CommandFailed exception is raised on an attempt to ignore the only
-        tube in the watch list.
+        """Stop watching tube with given name.
+
+        Calls back with the number of tubes currently in the watch list. On an
+        attempt to ignore the only tube in the watch list, the callback gets a
+        CommandFailed exception.
         """
         def ignoring(count):
-            self._watching.remove(name)
+            if not isinstance(count, Exception) and name in self._watching:
+                # remove from the client's watch list
+                self._watching.remove(name)
             if callback:
                 callback(count)
 
-        self._interact_value('ignore %s\r\n' % name, ignoring, ['WATCHING'],
-            ['NOT_IGNORED'])
+        request = Bunch(cmd='ignore {}'.format(name), ok=['WATCHING'],
+                err=['NOT_IGNORED'], read_value=True)
+        self._interact(request, ignoring)
 
     #
     #  Other commands
     #
 
+    def _peek(self, variant, callback):
+        # a shared gateway for the peek* commands
+        request = Bunch(cmd='peek{}'.format(variant), ok=['FOUND'],
+                err=['NOT_FOUND'], read_body=True)
+        self._interact(request, callback)
+
     def peek(self, jid, callback=None):
-        """Peek at a job.
-    
-        Call back with a job dict or None.
+        """Peek at job with given jid.
+
+        Calls back with a job dict (jid and body). If no job exists with that
+        jid, the callback gets a CommandFailed exception.
         """
-        self._interact_peek('peek %d\r\n' % jid, callback)
-    
+        self._peek(' {}'.format(jid), callback)
+
     def peek_ready(self, callback=None):
         """Peek at next ready job in the current tube.
-    
-        Call back with a job dict or None.
+
+        Calls back with a job dict (jid and body). If no ready jobs exist,
+        the callback gets a CommandFailed exception.
         """
-        self._interact_peek('peek-ready\r\n', callback)
-    
+        self._peek('-ready', callback)
+
     def peek_delayed(self, callback=None):
         """Peek at next delayed job in the current tube.
-    
-        Call back with a job dict or None.
+
+        Calls back with a job dict (jid and body). If no delayed jobs exist,
+        the callback gets a CommandFailed exception.
         """
-        self._interact_peek('peek-delayed\r\n', callback)
-    
+        self._peek('-delayed', callback)
+
     def peek_buried(self, callback=None):
         """Peek at next buried job in the current tube.
-    
-        Call back with a job dict or None.
+
+        Calls back with a job dict (jid and body). If no buried jobs exist,
+        the callback gets a CommandFailed exception.
         """
-        self._interact_peek('peek-buried\r\n', callback)
-    
+        self._peek('-buried', callback)
+
     def kick(self, bound=1, callback=None):
-        """Kick at most bound jobs into the ready queue.
+        """Kick at most `bound` jobs into the ready queue.
 
-        Call back with the number of jobs actually kicked.
+        Calls back with the number of jobs actually kicked.
         """
-        self._interact_value('kick %d\r\n' % bound, callback, ['KICKED'])
+        request = Bunch(cmd='kick {}'.format(bound), ok=['KICKED'],
+                read_value=True)
+        self._interact(request, callback)
 
     def kick_job(self, jid, callback=None):
         """Kick job with given id into the ready queue.
         (Requires Beanstalkd version >= 1.8)
 
-        Call back if job is kicked.
-
-        A CommandFailed exception is raised if job does not exists or is not in
-        a kickable state.
+        Calls back when job is kicked. If no job exists with that jid, or if
+        job is not in a kickable state, the callback gets a CommandFailed
+        exception.
         """
-        self._interact_success('kick-job %d\r\n' % jid, callback, ['KICKED'],
-                ['NOT_FOUND'])
+        request = Bunch(cmd='kick-job {}'.format(jid), ok=['KICKED'],
+                err=['NOT_FOUND'])
+        self._interact(request, callback)
 
     def stats_job(self, jid, callback=None):
-        """Return a dict of stats about a job, by job id."""
-        self._interact_data('stats-job %d\r\n' % jid, callback,
-                ['OK'], ['NOT_FOUND'])
+        """A dict of stats about the job with given jid.
+
+        If no job exists with that jid, the callback gets a CommandFailed
+        exception.
+        """
+        request = Bunch(cmd='stats-job {}'.format(jid), ok=['OK'],
+                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        self._interact(request, callback)
 
     def stats_tube(self, name, callback=None):
-        """Return a dict of stats about a given tube.
-    
-        A CommandFailed exception is raised if tube does not exists.
+        """A dict of stats about the tube with given name.
+
+        If no tube exists with that name, the callback gets a CommandFailed
+        exception.
         """
-        self._interact_data('stats-tube %s\r\n' % name, callback,
-                ['OK'], ['NOT_FOUND'])
-    
+        request = Bunch(cmd='stats-tube {}'.format(name), ok=['OK'],
+                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        self._interact(request, callback)
+
     def stats(self, callback=None):
-        """Call back with a dict of beanstalkd statistics."""
-        return self._interact_data('stats\r\n', callback, ['OK'])
-    
+        """A dict of beanstalkd statistics."""
+        request = Bunch(cmd='stats', ok=['OK'], read_body=True,
+                parse_yaml=True)
+        self._interact(request, callback)
+
     def list_tubes(self, callback=None):
-        """Call back with a list of all existing tubes."""
-        self._interact_data('list-tubes\r\n', callback, ['OK'])
+        """List of all existing tubes."""
+        request = Bunch(cmd='list-tubes', ok=['OK'], read_body=True,
+                parse_yaml=True)
+        self._interact(request, callback)
 
     def list_tube_used(self, callback=None):
-        """Call back with a name of tube currently being used."""
-        self._interact_value('list-tube-used\r\n', callback, ['USING'])
+        """Name of the tube currently being used."""
+        request = Bunch(cmd='list-tube-used', ok=['USING'], read_value=True)
+        self._interact(request, callback)
 
     def list_tubes_watched(self, callback=None):
-        """Call back with a list of all tubes being watched."""
-        self._interact_data('list-tubes-watched\r\n', callback, ['OK'])
+        """List of all tubes being watched."""
+        request = Bunch(cmd='list-tubes-watched', ok=['OK'], read_body=True,
+                parse_yaml=True)
+        self._interact(request, callback)
 
     def pause_tube(self, name, delay, callback=None):
-        """Pause a tube for a given delay time, in seconds.
+        """Delay any new job being reserved from the tube for a given time.
 
-        Call back when tube is paused.
+        The delay is an integer number of seconds to wait before reserving any
+        more jobs from the queue.
 
-        A CommandFailed exception is raised if tube does not exists.
+        Calls back when tube is paused. If tube does not exists, the callback
+        will get a CommandFailed exception.
         """
-        self._interact_success('pause-tube %s %d\r\n' % (name, delay),
-                callback, ['PAUSED'], ['NOT_FOUND'])
+        request = Bunch(cmd='pause-tube {} {}'.format(name, delay),
+                ok=['PAUSED'], err=['NOT_FOUND'])
+        self._interact(request, callback)

beanstalktc/btc_test.py

         self.btc.put(body, callback=self.stop)
         jid = self.wait()
         self.assertIsInstance(jid, int)
-        
+
         # reserve the job
         self.btc.reserve(callback=self.stop)
         job = self.wait()
         self.assertIsNotNone(job)
         self.assertEqual(job['jid'], jid)
         self.assertEqual(job['body'], body)
-        self.assertTrue(job['reserved'])
-        
+
         # delete the job
         self.btc.delete(jid, callback=self.stop)
         self.wait()
         self.btc.put(body, delay=1, callback=self.stop)
         jid = self.wait()
 
-        def check(job, reserved=False):
-            self.assertIsNotNone(job)
+        def check(job):
+            self.assertNotIsInstance(job, Exception)
             self.assertEqual(job['jid'], jid)
             self.assertEqual(job['body'], body)
-            if reserved:
-                self.assertTrue(job['reserved'])
-            else:
-                self.assertFalse(job['reserved'])
 
         # peak the next delayed job
         self.btc.peek_delayed(callback=self.stop)
             # kick-job command is not available in Beanstalkd version <= 1.7
             self.btc.kick(callback=self.stop)
             self.wait()
-        
+
         # peak next ready
         self.btc.peek_ready(callback=self.stop)
         check(self.wait())
         # reserve and bury the job
         self.btc.reserve(callback=self.stop)
         job = self.wait()
-        check(job, True)
+        check(job)
         self.btc.bury(jid, callback=self.stop)
         self.wait()
 
         # kick the job to ready
         self.btc.kick(callback=self.stop)
         self.assertEqual(self.wait(), 1)
-        
+
         # delete the job
         self.btc.delete(jid, callback=self.stop)
         self.wait()
 from setuptools import setup
 
 setup(name='beanstalktc',
-      version='0.2.2',
+      version='0.3.0',
       description='An async beanstalkd client for Tornado',
       author='Jacob Sondergaard',
       author_email='jacob@nephics.com',
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.