Source

tornadis / redis.py

Full commit
import socket
from collections import deque

from tornado import iostream, ioloop
import swirl

class RedisTornado(object):
    def __init__(self, host = None, port = None):
        """
        Asynchronous Redis client using the Tornado IOLoop
        """
        self.host       = host or 'localhost'
        self.port       = port or 6379
        self._queue     = deque()
        self.callback   = None
        self._waiting   = False
        self.DELIMITER  = '\r\n'
        self.DELBYTES   = len(self.DELIMITER)
        
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.connect((self.host, self.port))
        
        self.stream = iostream.IOStream(sock)
        
        
    @swirl.asynchronous    
    def _work(self):
        self._waiting = True
        try:
            cmd, callback = self._queue.popleft()
        except IndexError:
            self._waiting = False
            return
            
        yield lambda cb: self.stream.write(cmd, cb)
        response = yield lambda cb: self.stream.read_until(self.DELIMITER, cb)
        
        if response[0] == '$':
            num_bytes = int( response[1:] )
            if num_bytes < 1:
                callback(None)
                self.waiting = False
                # ioloop.IOLoop.instance().add_callback(_work)
                self._work()
                return
            response = yield lambda cb : self.stream.read_bytes(num_bytes + self.DELBYTES, cb)
            callback(response[:-2])
            self._waiting = False
            # ioloop.IOLoop.instance().add_callback(_work)
            self._work()
            return
            
        elif response[0] == '+':
            callback(True)
            self._waiting = False
            # ioloop.IOLoop.instance().add_callback(_work)
            self._work()
            return
        
        elif response[0] == '-':
            callback(False)
            self._waiting = False
            # ioloop.IOLoop.instance().add_callback(_work)
            self._work()
            return
        
    def set(self, key, value, callback = None):
        cmd = '%s %s %s\r\n%s\r\n' % ('SET', key, len(value), value)
        self._queue.append((cmd, callback))
        if not self._waiting:
            self._work()
        
    def get(self, key, callback = None):
        cmd = 'GET %s\r\n' % key
        self._queue.append((cmd, callback))
        if not self._waiting:
            self._work()