Commits

Jacob Söndergaard committed 79feea5

updated to work with Python 3.3 (as well as Python 2.7), also: version bump

  • Participants
  • Parent commits f3b50e3

Comments (0)

Files changed (6)

 
 This module contains a beanstalkd client for [Tornado](http://www.tornadoweb.org) implemented using the asynchronous (non-blocking) [IOStream](http://www.tornadoweb.org/documentation/iostream.html) socket wrapper.
 
+The code has been tested with Python 3.3 and 2.7.
+
 The code and documentation is licensed under the Apache Licence, Version 2.0 ([http://www.apache.org/licenses/LICENSE-2.0.html]()).
 
 ## Example client usage
     import beanstalkt
 
     def show(msg, value, cb):
-      print msg % value
+      print(msg % value)
       cb()
     
     def stop():
       client.close(ioloop.stop)
     
     def put():
-      client.put("A job to work on", callback=lambda s: show(
+      client.put(b"A job to work on", callback=lambda s: show(
           "Queued a job with id %d", s, reserve))
     
     def reserve():
 Executing the script (`python demo.py`) with beanstalkd running produces the following output:
 
     Queued a job with id 1
-    Reserved job {"body": "A job to work on", "id": 1}
+    Reserved job {'body': b'A job to work on', 'id': 1}
     Deleted job with id 1
 
 Where `id` is the job id that beanstalkd has given the job when putting in on the queue.

File beanstalkt/beanstalkt.py

 limitations under the License.
 """
 
-__version__ = '0.5.0'
+__version__ = '0.6.0'
 
 import socket
 import time
             # already closed
             callback()
         else:
-            self._stream.write('quit\r\n', self._stream.close)
+            self._stream.write(b'quit\r\n', self._stream.close)
 
     def closed(self):
         """"Returns True if the connection is closed."""
         self._talking = True
         with stack_context.NullContext():
             req, cb = self._queue.popleft()
-            command = req.cmd + '\r\n'
+            command = req.cmd + b'\r\n'
             if req.body:
-                command += req.body + '\r\n'
+                command += req.body + b'\r\n'
 
             # write command and body to socket stream
             self._stream.write(command,
                     # when command is written: read line from socket stream
-                    lambda: self._stream.read_until('\r\n',
+                    lambda: self._stream.read_until(b'\r\n',
                     # when a line has been read: return status and results
                     lambda data: self._recv(req, data, cb)))
 
     def _recv(self, req, data, cb):
         # parse the data received as server response
-        spl = data.split()
+        spl = data.decode('utf8').split()
         status, values = spl[0], spl[1:]
 
         error = None
     def _parse_yaml(self, data, resp, cb):
         # dirty parsing of yaml data
         # (assumes that data is a yaml encoded list or dict)
-        spl = data.split('\n')[1:-1]
+        spl = data.decode('utf8').split('\n')[1:-1]
         if spl[0].startswith('- '):
             # it is a list
             resp.body = [s[2:] for s in spl]
         buried when either the body is too big, so server ran out of memory,
         or when the server is in draining mode.
         """
-        request = Bunch(cmd='put {} {} {} {}'.format(priority, delay, ttr,
-                len(body)), ok=['INSERTED'], err=['BURIED', 'JOB_TOO_BIG',
+        cmd = 'put {} {} {} {}'.format(priority, delay, ttr,
+            len(body)).encode('utf8')
+        assert isinstance(body, bytes)
+        request = Bunch(cmd=cmd, ok=['INSERTED'], err=['BURIED', 'JOB_TOO_BIG',
                 'DRAINING'], body=body, read_value=True)
         self._interact(request, callback)
 
                 self._using = resp
             if callback:
                 callback(resp)
-
-        request = Bunch(cmd='use {}'.format(name), ok=['USING'],
+        cmd = 'use {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['USING'],
                 read_value=True)
         self._interact(request, using)
 
         within the next second, the callback gets a DeadlineSoon exception.
         """
         if timeout is not None:
-            command = 'reserve-with-timeout {}'.format(timeout)
+            cmd = 'reserve-with-timeout {}'.format(timeout).encode('utf8')
         else:
-            command = 'reserve'
-        request = Bunch(cmd=command, ok=['RESERVED'], err=['DEADLINE_SOON',
+            cmd = b'reserve'
+        request = Bunch(cmd=cmd, ok=['RESERVED'], err=['DEADLINE_SOON',
                 'TIMED_OUT'], read_body=True)
         self._interact(request, callback)
 
         neither reserved by the client, ready or buried; the callback gets a
         CommandFailed exception.
         """
-        request = Bunch(cmd='delete {}'.format(job_id), ok=['DELETED'],
-                err=['NOT_FOUND'])
+        cmd = 'delete {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['DELETED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0,
         gets a Buried exception. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='release {} {} {}'.format(job_id, priority, delay),
-                ok=['RELEASED'], err=['BURIED', 'NOT_FOUND'])
+        cmd = 'release {} {} {}'.format(job_id, priority, delay).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['RELEASED'], err=['BURIED', 'NOT_FOUND'])
         self._interact(request, callback)
 
     def bury(self, job_id, priority=DEFAULT_PRIORITY, callback=None):
         Calls back when job is burried. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='bury {} {}'.format(job_id, priority),
-                ok=['BURIED'], err=['NOT_FOUND'])
+        cmd = 'bury {} {}'.format(job_id, priority).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['BURIED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def touch(self, job_id, callback=None):
         Calls back when job is touched. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='touch {}'.format(job_id), ok=['TOUCHED'],
-                err=['NOT_FOUND'])
+        cmd = 'touch {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['TOUCHED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def watch(self, name, callback=None):
             if callback:
                 callback(count)
 
-        request = Bunch(cmd='watch {}'.format(name), ok=['WATCHING'],
-                read_value=True)
+        cmd = 'watch {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['WATCHING'], read_value=True)
         self._interact(request, watching)
 
     def ignore(self, name, callback=None):
             if callback:
                 callback(count)
 
-        request = Bunch(cmd='ignore {}'.format(name), ok=['WATCHING'],
-                err=['NOT_IGNORED'], read_value=True)
+        cmd = 'ignore {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['WATCHING'], err=['NOT_IGNORED'],
+                read_value=True)
         self._interact(request, ignoring)
 
     #
 
     def _peek(self, variant, callback):
         # a shared gateway for the peek* commands
-        request = Bunch(cmd='peek{}'.format(variant), ok=['FOUND'],
-                err=['NOT_FOUND'], read_body=True)
+        cmd = 'peek{}'.format(variant).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['FOUND'], err=['NOT_FOUND'],
+                read_body=True)
         self._interact(request, callback)
 
     def peek(self, job_id, callback=None):
 
         Calls back with the number of jobs actually kicked.
         """
-        request = Bunch(cmd='kick {}'.format(bound), ok=['KICKED'],
-                read_value=True)
+        cmd = 'kick {}'.format(bound).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['KICKED'], read_value=True)
         self._interact(request, callback)
 
     def kick_job(self, job_id, callback=None):
         job is not in a kickable state, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='kick-job {}'.format(job_id), ok=['KICKED'],
-                err=['NOT_FOUND'])
+        cmd = 'kick-job {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['KICKED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def stats_job(self, job_id, callback=None):
         If no job exists with that id, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='stats-job {}'.format(job_id), ok=['OK'],
-                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        cmd = 'stats-job {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['OK'], err=['NOT_FOUND'], read_body=True,
+                parse_yaml=True)
         self._interact(request, callback)
 
     def stats_tube(self, name, callback=None):
         If no tube exists with that name, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='stats-tube {}'.format(name), ok=['OK'],
-                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        cmd = 'stats-tube {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['OK'], err=['NOT_FOUND'], read_body=True,
+                parse_yaml=True)
         self._interact(request, callback)
 
     def stats(self, callback=None):
         """A dict of beanstalkd statistics."""
-        request = Bunch(cmd='stats', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'stats', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
     def list_tubes(self, callback=None):
         """List of all existing tubes."""
-        request = Bunch(cmd='list-tubes', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'list-tubes', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
     def list_tube_used(self, callback=None):
         """Name of the tube currently being used."""
-        request = Bunch(cmd='list-tube-used', ok=['USING'], read_value=True)
+        request = Bunch(cmd=b'list-tube-used', ok=['USING'], read_value=True)
         self._interact(request, callback)
 
     def list_tubes_watched(self, callback=None):
         """List of tubes currently being watched."""
-        request = Bunch(cmd='list-tubes-watched', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'list-tubes-watched', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
         Calls back when tube is paused. If tube does not exists, the callback
         will get a CommandFailed exception.
         """
-        request = Bunch(cmd='pause-tube {} {}'.format(name, delay),
-                ok=['PAUSED'], err=['NOT_FOUND'])
+        cmd = 'pause-tube {} {}'.format(name, delay).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['PAUSED'], err=['NOT_FOUND'])
         self._interact(request, callback)

File beanstalkt/bt_test.py

     def test_basics(self):
         '''Test put-reserve-delete cycle'''
         # put the job on the queue
-        body = 'test job'
+        body = b'test job'
         self.btc.put(body, callback=self.stop)
         job_id = self.wait()
         self.assertIsInstance(job_id, int)
     def test_peek_bury_kick(self):
         '''Test peeking, burying and kicking'''
         # put the job on the queue with 1 sec delay
-        body = 'test job'
+        body = b'test job'
         self.btc.put(body, delay=1, callback=self.stop)
         job_id = self.wait()
 
         self.btc.kick_job(job_id, callback=self.stop)
         try:
             self.wait()
-        except beanstalkt.UnexpectedResponse as (_, status, __):
+        except beanstalkt.UnexpectedResponse as e:
+            status = e[1]
             if status != 'UNKNOWN_COMMAND':
                 raise
             # kick-job command is not available in Beanstalkd version <= 1.7

File beanstalkt/cmd.py

 def success(callback, last=True):
     def _check_point(data):
         if isinstance(data, Exception):
-            print str(data)
+            print(data)
             stop()
         else:
             callback(data)
 
 def put(body, priority, use, delay, ttr, func):
     def step1(_):
-        client.put(body, priority=priority, delay=delay, ttr=ttr,
+        client.put(body.encode('utf8'), priority=priority, delay=delay, ttr=ttr,
                 callback=success(step2))
     def step2(data):
-        print data
+        print(data)
     start(lambda: client.use(use, step1))
 
 
         client.reserve(timeout, success(step3, last=False))
 
     def step3(data):
-        print json.dumps(data, indent=2)
+        data['body'] = data['body'].decode('utf8')
+        print(json.dumps(data, indent=2))
 
         cb = success(lambda _: None)
         if action == 'delete':
 
 def peek(job_id, func):
     def step2(data):
-        print json.dumps(data, indent=2)
+        data['body'] = data['body'].decode('utf8')
+        print(json.dumps(data, indent=2))
     start(lambda: client.peek(job_id, success(step2)))
 
 
     def step1(_):
         client.peek_ready(success(step2))
     def step2(data):
-        print json.dumps(data, indent=2)
+        data['body'] = data['body'].decode('utf8')
+        print(json.dumps(data, indent=2))
     start(lambda: client.use(use, step1))
 
 
     def step1(_):
         client.peek_delayed(success(step2))
     def step2(data):
-        print json.dumps(data, indent=2)
+        data['body'] = data['body'].decode('utf8')
+        print(json.dumps(data, indent=2))
     start(lambda: client.use(use, step1))
 
 
     def step1(_):
         client.peek_buried(success(step2))
     def step2(data):
-        print json.dumps(data, indent=2)
+        data['body'] = data['body'].decode('utf8')
+        print(json.dumps(data, indent=2))
     start(lambda: client.use(use, step1))
 
 
     def step1(_):
         client.kick(bound, success(step2))
     def step2(data):
-        print data
+        print(data)
     start(lambda: client.use(use, step1))
 
 
 
 def stats_job(job_id, func):
     def step2(data):
-        print json.dumps(data, indent=2)
+        print(json.dumps(data, indent=2))
     start(lambda: client.stats_job(job_id, success(step2)))
 
 
 def stats_tube(name, func):
     def step2(data):
-        print json.dumps(data, indent=2)
+        print(json.dumps(data, indent=2))
     start(lambda: client.stats_tube(name, success(step2)))
 
 
 def stats(func):
     def step2(data):
-        print json.dumps(data, indent=2)
+        print(json.dumps(data, indent=2))
     start(lambda: client.stats(success(step2)))
 
 
 def list_tubes(func):
     def step2(data):
-        print json.dumps(data, indent=2)
+        print(json.dumps(data, indent=2))
     start(lambda: client.list_tubes(success(step2)))
 
 

File beanstalkt/demo.py

 import beanstalkt
 
 def show(msg, value, cb):
-  print msg % value
+  print(msg % value)
   cb()
 
 def stop():
   client.close(ioloop.stop)
 
 def put():
-  client.put("A job to work on", callback=lambda s: show(
+  client.put(b"A job to work on", callback=lambda s: show(
       "Queued a job with id %d", s, reserve))
 
 def reserve():
 from setuptools import setup
 
 setup(name='beanstalkt',
-      version='0.5.0',
+      version='0.6.0',
       description='An async beanstalkd client for Tornado',
       author='Jacob Sondergaard',
       author_email='jacob@nephics.com',