Commits

Anonymous committed 42f5c3e

now manage pool of connection based on sqlalchemy code.

  • Participants
  • Parent commits 5e7b513

Comments (0)

Files changed (2)

File friendpaste/dbsession.py

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from friendpaste.utils import ScopedRegistry
+import weakref
+import thread
+import threading
+
 from couchdb import Server
 
+from friendpaste.utils import local_manager,ScopedRegistry
+import friendpaste.queue as Queue
+
+class TimeoutErrot(Exception):
+    pass
+
+class nvalidRequestError(Exception):
+    pass
+
 def make_connection(uri, dbname):
     return CouchDBProxy(uri, dbname)
 
     def __init__(self, uri, dbname):
         self.uri = uri
         self.dbname = dbname
+        self.pool =CouchDBPool(uri, dbname, local_manager.get_ident) 
 
     def _get_db(self):
-        couchdb_server = Server(self.uri)
+        """couchdb_server = Server(self.uri)
         try:
             db = couchdb_server.create(self.dbname)
         except:
-            db = couchdb_server[self.dbname]
+            db = couchdb_server[self.dbname]"""
+        con = self.pool.get()
+
+        return con.connection
+    db = property(_get_db)
+        
+class CouchDBPool(object):
+    # started couchdbpool
+
+    def __init__(self, uri, dbname, scopefunc=None, pool_size=5, max_overflow = 10, timeout=30):
+        self.uri = uri
+        self.dbname = dbname
+        if scopefunc is None:
+            self.scopefunc = thread.get_ident
+        else:
+            self.scopefunc = scopefunc
+        self._threadconns = weakref.WeakValueDictionary()
+        self._timeout = timeout
+
+        self._pool = Queue.Queue(pool_size)
+        self._overflow = 0 - pool_size
+        self._max_overflow = max_overflow
+        self._overflow_lock = self._max_overflow > -1 and threading.Lock() or None
+
+        self._on_connect = []
+        self._on_checkout = []
+        self._on_checkin = []
+
+    def connect(self, record):
+        try:
+            return self._threadconns[self.scopefunc].checkout()
+        except:
+            agent = CouchDBConnectionFairy(self)
+            self._threadconns[self.scopefunc] = agent
+            return agent.checkout()
+
+    def create_connection(self):
+        return _CouchDBConnectionRecord(self)
+
+    def return_conn(self, record):
+        if self._use_threadlocal and thread.get_ident() in self._threadconns:
+            del self._threadconns[thread.get_ident()]
+        self.do_return_conn(record)
+    
+    def do_return_conn(self, conn):
+        try:
+            self._pool.put(conn, False)
+        except Queue.Full:
+            if self._overflow_lock is None:
+                self._overflow -= 1
+            else:
+                self._overflow_lock.acquire()
+                try:
+                    self._overflow -= 1
+                finally:
+                    self._overflow_lock.release()
+    
+    def get(self):
+        return self.do_get()
+
+    def do_get(self):
+        try:
+            wait = self._max_overflow > -1 and self._overflow >= self._max_overflow
+            return self._pool.get(wait, self._timeout)
+        except Queue.Empty:
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                if not wait:
+                    return self.do_get()
+                else:
+                    raise TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+
+            if self._overflow_lock is not None:
+                self._overflow_lock.acquire()
+
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                if self._overflow_lock is not None:
+                    self._overflow_lock.release()
+                return self.do_get()
+
+            try:
+                con = self.create_connection()
+                self._overflow += 1
+            finally:
+                if self._overflow_lock is not None:
+                    self._overflow_lock.release()
+            return con
+
+    def dispose(self):
+        while True:
+            try:
+                conn = self._pool.get(False)
+                conn.close()
+            except Queue.Empty:
+                break
+
+        self._overflow = 0 - self.size()
+        if self._should_log_info:
+            self.log("Pool disposed. " + self.status())
+    
+    def status(self):
+        tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
+        return "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
+
+    def size(self):
+        return self._pool.maxsize
+
+    def checkedin(self):
+        return self._pool.qsize()
+
+    def overflow(self):
+        return self._overflow
+
+    def checkedout(self):
+        return self._pool.maxsize - self._pool.qsize() + self._overflow
+
+class _CouchDBConnectionRecord(object):
+
+    def __init__(self, pool):
+        self._pool = pool
+        self.connection = self.__connect()
+
+    def close(self):
+        # for now we do nothing here 
+        pass
+
+    def invalidate(self, e=None):
+        self.close()
+        self.connection = None
+
+    def __connect(self):
+        couchdb_server = Server(self._pool.uri)
+        try:
+            db = couchdb_server.create(self._pool.dbname)
+        except:
+            db = couchdb_server[self._pool.dbname]
 
         return db
-    db = property(_get_db)
+
+    def get_connection(self):
+        # for now recreate a connection each time you need it 
+        # and let the socket die
+        self.connection = self.__connect()
+        return self.connection()
+
+
+def _finalize_fairy(connection, connection_record, pool, ref=None):
+    if ref is not None and connection_record.backref is not ref:
+        return
+    if connection is not None:
+        if connection_record is not None:
+            connection.close()
+
+    if connection_record is not None:
+        connection_record.backref = None
+        pool.return_conn(connection_record)
+
+class _CouchDBConnectionFairy(object):
+
+    def __init__(self, pool):
+        self._pool = pool
+        self.connection = self.__connect()
+        self.__counter = 0
+
+        try:
+            rec = self._connection_record = pool.get()
+            conn = self.connection = self._connection_record.get_connection()
+            self._connection_record.backref = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref))
+
+        except:
+            self.connection = None
+            self._connection_record = None
+            raise
+
+    is_valid = property(lambda self:self.connection is not None)
+
+    def invalidate(self, e=None):
+        if self.connection is None:
+            raise InvalidRequestError("This connection is closed")
+
+        if self._connection_record is not None:
+            self._connection_record.invalidate(e=e)
+            self.connection = None
+            self._close()
+
+    def __getattr__(self, key):
+        return getattr(self.connection, key)
+
+    def checkout(self):
+        if self.connection is None:
+            raise exceptions.InvalidRequestError("This connection is closed")
+
+        if not self._pool._on_checkout or self.__counter != 1:
+            return self
+
+        # Pool listeners can trigger a reconnection on checkout
+        attempts = 2
+        while attempts > 0:
+            try:
+                for l in self._pool._on_checkout:
+                    l.checkout(self.connection, self._connection_record, self)
+                return self
+            except:
+                self._connection_record.invalidate(e)
+                self.connection = self._connection_record.get_connection()
+                attempts -= 1
+        self.invalidate()
+        raise exceptions.InvalidRequestError("This connection is closed")
+
+    def close(self):
+        self.__counter -=1
+        if self.__counter == 0:
+            self._close()
+
+    def _close(self):
+        _finalize_fairy(self.connection, self._connection_record, self._pool)
+        self.connection = None
+        self._connection_record = None
         
-        
-
 
 class ScopedCouchDBSession(object):
     def __init__(self, session_factory, scopefunc=None):

File friendpaste/queue.py

+"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
+behavior, using RLock instead of Lock for its mutex object.
+
+This is to support the connection pool's usage of ``__del__`` to return
+connections to the underlying Queue, which can apparently in extremely
+rare cases be invoked within the ``get()`` method of the Queue itself,
+producing a ``put()`` inside the ``get()`` and therefore a reentrant
+condition."""
+
+from time import time as _time
+
+try:
+    # py2.4 deque class
+    from collections import deque
+except:
+    # roll our own...
+    class deque(list):
+        def popleft(self):
+            return self.pop(0)
+
+__all__ = ['Empty', 'Full', 'Queue']
+
+class Empty(Exception):
+    "Exception raised by Queue.get(block=0)/get_nowait()."
+
+    pass
+
+class Full(Exception):
+    "Exception raised by Queue.put(block=0)/put_nowait()."
+
+    pass
+
+class Queue:
+    def __init__(self, maxsize=0):
+        """Initialize a queue object with a given maximum size.
+
+        If `maxsize` is <= 0, the queue size is infinite.
+        """
+
+        try:
+            import threading
+        except ImportError:
+            import dummy_threading as threading
+        self._init(maxsize)
+        # mutex must be held whenever the queue is mutating.  All methods
+        # that acquire mutex must release it before returning.  mutex
+        # is shared between the two conditions, so acquiring and
+        # releasing the conditions also acquires and releases mutex.
+        self.mutex = threading.RLock()
+        # Notify not_empty whenever an item is added to the queue; a
+        # thread waiting to get is notified then.
+        self.not_empty = threading.Condition(self.mutex)
+        # Notify not_full whenever an item is removed from the queue;
+        # a thread waiting to put is notified then.
+        self.not_full = threading.Condition(self.mutex)
+
+    def qsize(self):
+        """Return the approximate size of the queue (not reliable!)."""
+
+        self.mutex.acquire()
+        n = self._qsize()
+        self.mutex.release()
+        return n
+
+    def empty(self):
+        """Return True if the queue is empty, False otherwise (not reliable!)."""
+
+        self.mutex.acquire()
+        n = self._empty()
+        self.mutex.release()
+        return n
+
+    def full(self):
+        """Return True if the queue is full, False otherwise (not reliable!)."""
+
+        self.mutex.acquire()
+        n = self._full()
+        self.mutex.release()
+        return n
+
+    def put(self, item, block=True, timeout=None):
+        """Put an item into the queue.
+
+        If optional args `block` is True and `timeout` is None (the
+        default), block if necessary until a free slot is
+        available. If `timeout` is a positive number, it blocks at
+        most `timeout` seconds and raises the ``Full`` exception if no
+        free slot was available within that time.  Otherwise (`block`
+        is false), put an item on the queue if a free slot is
+        immediately available, else raise the ``Full`` exception
+        (`timeout` is ignored in that case).
+        """
+
+        self.not_full.acquire()
+        try:
+            if not block:
+                if self._full():
+                    raise Full
+            elif timeout is None:
+                while self._full():
+                    self.not_full.wait()
+            else:
+                if timeout < 0:
+                    raise ValueError("'timeout' must be a positive number")
+                endtime = _time() + timeout
+                while self._full():
+                    remaining = endtime - _time()
+                    if remaining <= 0.0:
+                        raise Full
+                    self.not_full.wait(remaining)
+            self._put(item)
+            self.not_empty.notify()
+        finally:
+            self.not_full.release()
+
+    def put_nowait(self, item):
+        """Put an item into the queue without blocking.
+
+        Only enqueue the item if a free slot is immediately available.
+        Otherwise raise the ``Full`` exception.
+        """
+        return self.put(item, False)
+
+    def get(self, block=True, timeout=None):
+        """Remove and return an item from the queue.
+
+        If optional args `block` is True and `timeout` is None (the
+        default), block if necessary until an item is available. If
+        `timeout` is a positive number, it blocks at most `timeout`
+        seconds and raises the ``Empty`` exception if no item was
+        available within that time.  Otherwise (`block` is false),
+        return an item if one is immediately available, else raise the
+        ``Empty`` exception (`timeout` is ignored in that case).
+        """
+
+        self.not_empty.acquire()
+        try:
+            if not block:
+                if self._empty():
+                    raise Empty
+            elif timeout is None:
+                while self._empty():
+                    self.not_empty.wait()
+            else:
+                if timeout < 0:
+                    raise ValueError("'timeout' must be a positive number")
+                endtime = _time() + timeout
+                while self._empty():
+                    remaining = endtime - _time()
+                    if remaining <= 0.0:
+                        raise Empty
+                    self.not_empty.wait(remaining)
+            item = self._get()
+            self.not_full.notify()
+            return item
+        finally:
+            self.not_empty.release()
+
+    def get_nowait(self):
+        """Remove and return an item from the queue without blocking.
+
+        Only get an item if one is immediately available. Otherwise
+        raise the ``Empty`` exception.
+        """
+
+        return self.get(False)
+
+    # Override these methods to implement other queue organizations
+    # (e.g. stack or priority queue).
+    # These will only be called with appropriate locks held
+
+    # Initialize the queue representation
+    def _init(self, maxsize):
+        self.maxsize = maxsize
+        self.queue = deque()
+
+    def _qsize(self):
+        return len(self.queue)
+
+    # Check whether the queue is empty
+    def _empty(self):
+        return not self.queue
+
+    # Check whether the queue is full
+    def _full(self):
+        return self.maxsize > 0 and len(self.queue) == self.maxsize
+
+    # Put a new item in the queue
+    def _put(self, item):
+        self.queue.append(item)
+
+    # Get an item from the queue
+    def _get(self):
+        return self.queue.popleft()