Source

txMysql / txmysql / adbapi.py

Full commit
"""
Asynchronous DB-API compatible interface for txMysql

"""
import collections
import Queue

from twisted.internet import defer
from twisted.internet import reactor
from twisted.enterprise import adbapi
from twisted.python import log
from zope import interface as zinterface

from txmysql import imysql
from txmysql import protocol

ConnectionLost = adbapi.ConnectionLost


class Connection(object):
    def __init__(self, proto):
        self.proto = proto

    def isAvailable(self):
        return (self.proto.state == self.proto.AVAILABLE)

    def _cbRun(self, results, query):
        query.deferred.callback(results)

    def _ebRun(self, failure, query):
        query.deferred.errback(failure)

    def run(self, query):
        #import pdb; pdb.set_trace()
        # maybe escape the sql? ;-)
        d = self.proto.query(query.sql)
        d.addCallback(self._cbRun, query)
        d.addErrback(self._ebRun, query)
        return d


class Transaction(object):
    pass


class Pool(object):
    """
    Pool is responsible for maintaining a pool of connections

    Attributes:
     - started:
        * process failures during startup cause us to give up and shutdown,
        * process failrues after startup cause us to simply restart
          those processes
     - shuttingDown:
        * is set only when shutdown is called and then is true
          until the service exits.
        * disables process restarting during shutdown
     - connections: dict of information describing all processes
       key is process name.

    Args:
     - size

    Example:
    >>> Pool(1)
    """
    connectionFactory = protocol.MysqlProtocolFactory

    system = "pool" # for debugging

    def __init__(self, host, port,
                 username, password,
                 database=None,
                 size=1):
        self.started = False
        self.shuttingDown = False

        # operating information
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.size = size
        self.connections = []
        self.restartCallbacks = []
        
    def _cbConnectionStarted(self, proto):
        c = Connection(proto)
        self.connections.append(c)

    def start(self):
        """Start up all the pools connections
        """
        dl = []
        for i in xrange(self.size):
            f = self.connectionFactory(self.username,
                                       self.password)
            d = f.getDeferred()
            d.addCallback(self._cbConnectionStarted)
            dl.append(d)
            reactor.connectTCP(self.host, self.port, f)
        dl = defer.DeferredList(dl, fireOnOneErrback=True)
        dl.addCallback(
            self._cbStart
        )
        dl.addErrback(
            self._ebStart
        )
        return dl

    def _cbStart(self, _):
        log.msg("connection pool ready!", system=self.system)
        self.started = True

    def _ebStart(self, failure):
        log.err("failed to start connection pool...", system=self.system)
        return failure

    def anyFree(self):
        for conn in self.connections:
            if conn.isAvailable():
                return True
        return False
    
    def process(self, query):
        for connection in self.connections:
            if connection.isAvailable():
                connection.run(query)
                pd = defer.Deferred()
                pd.addCallback(self._cbProcess)
                pd.addErrback(self._ebProcess)
                return pd

    def _cbProcess(self, success):
        return success

    def _ebProcess(self, failure):
        return failure


class Query(object):
    def __init__(self, sql):
        self.sql = sql
        self.deferred = defer.Deferred()


class ConnectionPool(object):
    """Handles interaction with db by transparently queuing
    requests and dispatching as connections become available
    in the pool.
    """
    poolFactory = Pool
    queueFactory = Queue.Queue

    def __init__(self, host, port,
                 username, password,
                 database=None,
                 size=1):
        self.pool = self.poolFactory(host, port,
                                     username, password,
                                     database=database,
                                     size=size)
        self.queue = self.queueFactory()

    def start(self):
        return self.pool.start()

    def _drain(self):
        while self.pool.anyFree() and self.queue.qsize > 0:
            d = self.queue.get_nowait()
            d.callback(True)

    def _fromQueue(self, _, run, interaction, *args, **kwargs):
        return run(interaction, *args, **kwargs)

    def runQuery(self, *args, **kwargs):
        self.runInteraction(self._runQuery, *args, **kwargs)

    def _runQuery(self, trans, *args, **kw):
        trans.execute(*args, **kw)

    def runInteraction(self, interaction, *args, **kwargs):
        if self.pool.anyFree():
            d = self.pool.connect(interaction, *args, **kwargs)
        else:
            d = defer.Deferred()
            d.addCallback(self._fromQueue, self.runInteraction, interaction, *args, **kwargs)
            self.queue.put_nowait(d)
        d.addCallback(self._drain)
        return d