Source

beanstalkt / beanstalkt / beanstalkt.py

Diff from to

beanstalkt/beanstalkt.py

 limitations under the License.
 """
 
-__version__ = '0.5.0'
+__version__ = '0.6.0'
 
 import socket
 import time
             # already closed
             callback()
         else:
-            self._stream.write('quit\r\n', self._stream.close)
+            self._stream.write(b'quit\r\n', self._stream.close)
 
     def closed(self):
         """"Returns True if the connection is closed."""
         self._talking = True
         with stack_context.NullContext():
             req, cb = self._queue.popleft()
-            command = req.cmd + '\r\n'
+            command = req.cmd + b'\r\n'
             if req.body:
-                command += req.body + '\r\n'
+                command += req.body + b'\r\n'
 
             # write command and body to socket stream
             self._stream.write(command,
                     # when command is written: read line from socket stream
-                    lambda: self._stream.read_until('\r\n',
+                    lambda: self._stream.read_until(b'\r\n',
                     # when a line has been read: return status and results
                     lambda data: self._recv(req, data, cb)))
 
     def _recv(self, req, data, cb):
         # parse the data received as server response
-        spl = data.split()
+        spl = data.decode('utf8').split()
         status, values = spl[0], spl[1:]
 
         error = None
     def _parse_yaml(self, data, resp, cb):
         # dirty parsing of yaml data
         # (assumes that data is a yaml encoded list or dict)
-        spl = data.split('\n')[1:-1]
+        spl = data.decode('utf8').split('\n')[1:-1]
         if spl[0].startswith('- '):
             # it is a list
             resp.body = [s[2:] for s in spl]
         buried when either the body is too big, so server ran out of memory,
         or when the server is in draining mode.
         """
-        request = Bunch(cmd='put {} {} {} {}'.format(priority, delay, ttr,
-                len(body)), ok=['INSERTED'], err=['BURIED', 'JOB_TOO_BIG',
+        cmd = 'put {} {} {} {}'.format(priority, delay, ttr,
+            len(body)).encode('utf8')
+        assert isinstance(body, bytes)
+        request = Bunch(cmd=cmd, ok=['INSERTED'], err=['BURIED', 'JOB_TOO_BIG',
                 'DRAINING'], body=body, read_value=True)
         self._interact(request, callback)
 
                 self._using = resp
             if callback:
                 callback(resp)
-
-        request = Bunch(cmd='use {}'.format(name), ok=['USING'],
+        cmd = 'use {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['USING'],
                 read_value=True)
         self._interact(request, using)
 
         within the next second, the callback gets a DeadlineSoon exception.
         """
         if timeout is not None:
-            command = 'reserve-with-timeout {}'.format(timeout)
+            cmd = 'reserve-with-timeout {}'.format(timeout).encode('utf8')
         else:
-            command = 'reserve'
-        request = Bunch(cmd=command, ok=['RESERVED'], err=['DEADLINE_SOON',
+            cmd = b'reserve'
+        request = Bunch(cmd=cmd, ok=['RESERVED'], err=['DEADLINE_SOON',
                 'TIMED_OUT'], read_body=True)
         self._interact(request, callback)
 
         neither reserved by the client, ready or buried; the callback gets a
         CommandFailed exception.
         """
-        request = Bunch(cmd='delete {}'.format(job_id), ok=['DELETED'],
-                err=['NOT_FOUND'])
+        cmd = 'delete {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['DELETED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0,
         gets a Buried exception. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='release {} {} {}'.format(job_id, priority, delay),
-                ok=['RELEASED'], err=['BURIED', 'NOT_FOUND'])
+        cmd = 'release {} {} {}'.format(job_id, priority, delay).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['RELEASED'], err=['BURIED', 'NOT_FOUND'])
         self._interact(request, callback)
 
     def bury(self, job_id, priority=DEFAULT_PRIORITY, callback=None):
         Calls back when job is burried. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='bury {} {}'.format(job_id, priority),
-                ok=['BURIED'], err=['NOT_FOUND'])
+        cmd = 'bury {} {}'.format(job_id, priority).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['BURIED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def touch(self, job_id, callback=None):
         Calls back when job is touched. If the job does not exist, or it is not
         reserved by the client, the callback gets a CommandFailed exception.
         """
-        request = Bunch(cmd='touch {}'.format(job_id), ok=['TOUCHED'],
-                err=['NOT_FOUND'])
+        cmd = 'touch {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['TOUCHED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def watch(self, name, callback=None):
             if callback:
                 callback(count)
 
-        request = Bunch(cmd='watch {}'.format(name), ok=['WATCHING'],
-                read_value=True)
+        cmd = 'watch {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['WATCHING'], read_value=True)
         self._interact(request, watching)
 
     def ignore(self, name, callback=None):
             if callback:
                 callback(count)
 
-        request = Bunch(cmd='ignore {}'.format(name), ok=['WATCHING'],
-                err=['NOT_IGNORED'], read_value=True)
+        cmd = 'ignore {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['WATCHING'], err=['NOT_IGNORED'],
+                read_value=True)
         self._interact(request, ignoring)
 
     #
 
     def _peek(self, variant, callback):
         # a shared gateway for the peek* commands
-        request = Bunch(cmd='peek{}'.format(variant), ok=['FOUND'],
-                err=['NOT_FOUND'], read_body=True)
+        cmd = 'peek{}'.format(variant).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['FOUND'], err=['NOT_FOUND'],
+                read_body=True)
         self._interact(request, callback)
 
     def peek(self, job_id, callback=None):
 
         Calls back with the number of jobs actually kicked.
         """
-        request = Bunch(cmd='kick {}'.format(bound), ok=['KICKED'],
-                read_value=True)
+        cmd = 'kick {}'.format(bound).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['KICKED'], read_value=True)
         self._interact(request, callback)
 
     def kick_job(self, job_id, callback=None):
         job is not in a kickable state, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='kick-job {}'.format(job_id), ok=['KICKED'],
-                err=['NOT_FOUND'])
+        cmd = 'kick-job {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['KICKED'], err=['NOT_FOUND'])
         self._interact(request, callback)
 
     def stats_job(self, job_id, callback=None):
         If no job exists with that id, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='stats-job {}'.format(job_id), ok=['OK'],
-                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        cmd = 'stats-job {}'.format(job_id).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['OK'], err=['NOT_FOUND'], read_body=True,
+                parse_yaml=True)
         self._interact(request, callback)
 
     def stats_tube(self, name, callback=None):
         If no tube exists with that name, the callback gets a CommandFailed
         exception.
         """
-        request = Bunch(cmd='stats-tube {}'.format(name), ok=['OK'],
-                err=['NOT_FOUND'], read_body=True, parse_yaml=True)
+        cmd = 'stats-tube {}'.format(name).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['OK'], err=['NOT_FOUND'], read_body=True,
+                parse_yaml=True)
         self._interact(request, callback)
 
     def stats(self, callback=None):
         """A dict of beanstalkd statistics."""
-        request = Bunch(cmd='stats', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'stats', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
     def list_tubes(self, callback=None):
         """List of all existing tubes."""
-        request = Bunch(cmd='list-tubes', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'list-tubes', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
     def list_tube_used(self, callback=None):
         """Name of the tube currently being used."""
-        request = Bunch(cmd='list-tube-used', ok=['USING'], read_value=True)
+        request = Bunch(cmd=b'list-tube-used', ok=['USING'], read_value=True)
         self._interact(request, callback)
 
     def list_tubes_watched(self, callback=None):
         """List of tubes currently being watched."""
-        request = Bunch(cmd='list-tubes-watched', ok=['OK'], read_body=True,
+        request = Bunch(cmd=b'list-tubes-watched', ok=['OK'], read_body=True,
                 parse_yaml=True)
         self._interact(request, callback)
 
         Calls back when tube is paused. If tube does not exists, the callback
         will get a CommandFailed exception.
         """
-        request = Bunch(cmd='pause-tube {} {}'.format(name, delay),
-                ok=['PAUSED'], err=['NOT_FOUND'])
+        cmd = 'pause-tube {} {}'.format(name, delay).encode('utf8')
+        request = Bunch(cmd=cmd, ok=['PAUSED'], err=['NOT_FOUND'])
         self._interact(request, callback)