Commits

Mike Bayer committed 2b0f2f2

- a new mutex that was added in 0.3.9 causes the pool_timeout
feature to fail during a race condition; threads would
raise TimeoutError immediately with no delay if many threads
push the pool into overflow at the same time. this issue has been
fixed.

Comments (0)

Files changed (4)

 0.3.10
+- general
+    - a new mutex that was added in 0.3.9 causes the pool_timeout
+      feature to fail during a race condition; threads would 
+      raise TimeoutError immediately with no delay if many threads 
+      push the pool into overflow at the same time.  this issue has been
+      fixed.
 - sql
     - better quoting of identifiers when manipulating schemas
     - got connection-bound metadata to work with implicit execution

lib/sqlalchemy/pool.py

         try:
             return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
         except Queue.Empty:
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+
             if self._overflow_lock is not None:
                 self._overflow_lock.acquire()
+
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                if self._overflow_lock is not None:
+                    self._overflow_lock.release()
+                return self.do_get()
+
             try:
-                if self._max_overflow > -1 and self._overflow >= self._max_overflow:
-                    raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
                 con = self.create_connection()
                 self._overflow += 1
             finally:

test/engine/pool.py

 class MockDBAPI(object):
     def __init__(self):
         self.throw_error = False
-    def connect(self, argument):
+    def connect(self, argument, delay=0):
         if self.throw_error:
             raise Exception("couldnt connect !")
+        if delay:
+            time.sleep(delay)
         return MockConnection()
 class MockConnection(object):
     def __init__(self):
     
     def test_timeout(self):
         p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
-        c1 = p.get()
-        c2 = p.get()
-        c3 = p.get()
+        c1 = p.connect()
+        c2 = p.connect()
+        c3 = p.connect()
         now = time.time()
         try:
-            c4 = p.get()
+            c4 = p.connect()
             assert False
         except exceptions.TimeoutError, e:
             assert int(time.time() - now) == 2
 
+    def test_timeout_race(self):
+        # test a race condition where the initial connecting threads all race to queue.Empty, then block on the mutex.
+        # each thread consumes a connection as they go in.  when the limit is reached, the remaining threads
+        # go in, and get TimeoutError; even though they never got to wait for the timeout on queue.get().
+        # the fix involves checking the timeout again within the mutex, and if so, unlocking and throwing them back to the start
+        # of do_get()
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db', delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3)
+        timeouts = []
+        def checkout():
+            for x in xrange(1):
+                now = time.time()
+                try:
+                    c1 = p.connect()
+                except exceptions.TimeoutError, e:
+                    timeouts.append(int(time.time()) - now)
+                    continue
+                time.sleep(4)
+                c1.close()
+            
+        threads = []
+        for i in xrange(10):
+            th = threading.Thread(target=checkout)
+            th.start()
+            threads.append(th)
+        for th in threads:
+            th.join()
+        
+        print timeouts
+        assert len(timeouts) > 0
+        for t in timeouts:
+            assert abs(t - 2) < 1
+        
     def _test_overflow(self, thread_count, max_overflow):
         def creator():
             time.sleep(.05)

test/perf/poolload.py

-# this program should open three connections.  then after five seconds, the remaining
-# 45 threads should receive a timeout error.  then the program will just stop until
-# ctrl-C is pressed.  it should *NOT* open a bunch of new connections.
+# load test of connection pool
 
 from sqlalchemy import *
 import sqlalchemy.pool as pool
-import psycopg2 as psycopg
 import thread,time
-psycopg = pool.manage(psycopg,pool_size=2,max_overflow=1, timeout=5, echo=True)
-print psycopg
-db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal')
-print db.connection_provider._pool
+db = create_engine('mysql://scott:tiger@127.0.0.1/test', pool_timeout=30, echo_pool=True)
+
 metadata = MetaData(db)
 
 users_table = Table('users', metadata,
   Column('user_id', Integer, primary_key=True),
   Column('user_name', String(40)),
   Column('password', String(10)))
+metadata.drop_all()
 metadata.create_all()
 
-class User(object):
-    pass
-usermapper = mapper(User, users_table)
+users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)])
 
-#Then i create loads of threads and in run() of each thread:
-def run():
-    session = create_session()
-    transaction = session.create_transaction()
-    query = session.query(User)
-    u1=query.select(User.c.user_id==3)
-    
-for x in range(0,50):
-    thread.start_new_thread(run, ())
+def runfast():
+    while True:
+        c = db.connection_provider._pool.connect()
+        time.sleep(.5)
+        c.close()
+#        result = users_table.select(limit=100).execute()
+#        d = {}
+#        for row in result:
+#            for col in row.keys():
+#                d[col] = row[col]
+#        time.sleep(.005)
+#        result.close()
+        print "runfast cycle complete"
+        
+#thread.start_new_thread(runslow, ())                
+for x in xrange(0,50):
+    thread.start_new_thread(runfast, ())
 
-while True:
-    time.sleep(5)
+time.sleep(100)