Commits

Anonymous committed 4560899

Ported r3919:3922,r3924,r3929,r3932 and r3934 to 0.10-stable (''database pool fixes'')

Comments (0)

Files changed (2)

 import os
 import urllib
 
-from trac.config import Option
+from trac.config import Option, IntOption
 from trac.core import *
 from trac.db.pool import ConnectionPool
 
         [wiki:TracEnvironment#DatabaseConnectionStrings string] for this
         project""")
 
+    timeout = IntOption('trac', 'timeout', '20',
+        """Timeout value for database connection, in seconds.
+        Use '0' to specify ''no timeout''. ''(Since 0.11)''""")
+
     def __init__(self):
         self._cnx_pool = None
 
         if not self._cnx_pool:
             connector, args = self._get_connector()
             self._cnx_pool = ConnectionPool(5, connector, **args)
-        return self._cnx_pool.get_cnx()
+        return self._cnx_pool.get_cnx(self.timeout or None)
 
     def shutdown(self, tid=None):
         if self._cnx_pool:
         self.close()
 
 
+def try_rollback(cnx):
+    """Resets the Connection in a safe way, returning True when it succeeds.
+    
+    The rollback we do for safety on a Connection can fail at
+    critical times because of a timeout on the Connection.
+    """
+    try:
+        cnx.rollback() # resets the connection
+        return True
+    except Exception:
+        cnx.close()
+        return False
+
 class ConnectionPool(object):
     """A very simple connection pool implementation."""
 
     def __init__(self, maxsize, connector, **kwargs):
-        self._dormant = [] # inactive connections in pool
+        self._dormant = {} # inactive connections in pool
         self._active = {} # active connections by thread ID
-        self._available = threading.Condition(threading.Lock())
+        self._available = threading.Condition(threading.RLock())
         self._maxsize = maxsize # maximum pool size
         self._cursize = 0 # current pool size, includes active connections
         self._connector = connector
             if tid in self._active:
                 num, cnx = self._active.get(tid)
                 if num == 0: # was pushed back (see _cleanup)
-                    cnx.rollback()
-                self._active[tid][0] = num + 1
-                return PooledConnection(self, cnx, tid)
+                    if not try_rollback(cnx):
+                        del self._active[tid]
+                        cnx = None
+                if cnx:
+                    self._active[tid][0] = num + 1
+                    return PooledConnection(self, cnx, tid)
             while True:
                 if self._dormant:
-                    cnx = self._dormant.pop()
-                    try:
-                        cnx.cursor() # check whether the connection is stale
+                    if tid in self._dormant: # prefer same thread
+                        cnx = self._dormant.pop(tid)
+                    else: # pick a random one
+                        cnx = self._dormant.pop(self._dormant.keys()[0])
+                    if try_rollback(cnx):
                         break
-                    except Exception:
-                        cnx.close()
+                    else:
+                        self._cursize -= 1
                 elif self._maxsize and self._cursize < self._maxsize:
                     cnx = self._connector.get_connection(**self._kwargs)
                     self._cursize += 1
                     break
                 else:
                     if timeout:
+                        if (time.time() - start) >= timeout:
+                            raise TimeoutError('Unable to get database '
+                                               'connection within %d seconds'
+                                                % timeout)
                         self._available.wait(timeout)
-                        if (time.time() - start) >= timeout:
-                            raise TimeoutError, 'Unable to get database ' \
-                                                'connection within %d seconds' \
-                                                % timeout
-                    else:
+                    else: # Warning: without timeout, Trac *might* hang
                         self._available.wait()
             self._active[tid] = [1, cnx]
             return PooledConnection(self, cnx, tid)
             self._available.release()
 
     def _cleanup(self, tid):
-        # Note: self._available *must* be acquired
+        """Note: self._available *must* be acquired when calling this one."""
         if tid in self._active:
             cnx = self._active.pop(tid)[1]
-            if cnx not in self._dormant: # hm, how could that happen?
-                if cnx.poolable: # i.e. we can manipulate it from other threads
-                    cnx.rollback()
-                    self._dormant.append(cnx)
-                elif tid == threading._get_ident():
-                    cnx.rollback() # non-poolable but same thread: close
+            assert tid not in self._dormant # hm, how could that happen?
+            if cnx.poolable: # i.e. we can manipulate it from other threads
+                if try_rollback(cnx):
+                    self._dormant[tid] = cnx
+                else:
+                    self._cursize -= 1
+            elif tid == threading._get_ident():
+                if try_rollback(cnx): # non-poolable but same thread: close
                     cnx.close()
-                    self._cursize -= 1
-                else: # non-poolable, different thread: push it back
-                    self._active[tid] = [0, cnx]
-                self._available.notify()
+                self._cursize -= 1
+            else: # non-poolable, different thread: push it back
+                self._active[tid] = [0, cnx]
+            self._available.notify()
 
     def shutdown(self, tid=None):
         self._available.acquire()
             for tid in cleanup_list:
                 self._cleanup(tid)
             if not tid:
-                for cnx in self._dormant:
+                for _, cnx in self._dormant.iteritems():
                     cnx.close()
         finally:
             self._available.release()