1. paulgb
  2. sqlalchemy

Commits

Mike Bayer  committed a27e52c

- NullPool supports reconnect on failure behavior.
[ticket:1094]

  • Participants
  • Parent commits 1b386c3
  • Branches rel_0_4

Comments (0)

Files changed (4)

File CHANGES

View file
     - Connection.invalidate() checks for closed status 
       to avoid attribute errors. [ticket:1246]
 
+    - NullPool supports reconnect on failure behavior.
+      [ticket:1094]
+
 - postgres
     - Added Index reflection support to Postgres, using a
       great patch we long neglected, submitted by 

File lib/sqlalchemy/pool.py

View file
     def do_get(self):
         return self.create_connection()
 
+    def recreate(self):
+        self.log("Pool recreating")
+
+        return NullPool(self._creator, 
+            recycle=self._recycle, 
+            echo=self._should_log_info, 
+            use_threadlocal=self._use_threadlocal, 
+            listeners=self.listeners)
+
+    def dispose(self):
+        pass
+        
 class StaticPool(Pool):
     """A Pool of exactly one connection, used for all requests."""
 

File test/engine/pool.py

View file
         c.close()
         assert counts == [1, 2, 3]
 
-
-
     def tearDown(self):
        pool.clear_managers()
 
+class NullPoolTest(TestBase):
+   def test_reconnect(self):
+       dbapi = MockDBAPI()
+       p = pool.NullPool(creator = lambda: dbapi.connect('foo.db'))
+       c1 = p.connect()
+       c_id = c1.connection.id
+       c1.close(); c1=None
+
+       c1 = p.connect()
+       dbapi.raise_error = True
+       c1.invalidate()
+       c1 = None
+
+       c1 = p.connect()
+       assert c1.connection.id != c_id
+
+
 
 if __name__ == "__main__":
     testenv.main()

File test/engine/reconnect.py

View file
 import testenv; testenv.configure_for_tests()
 import sys, weakref
-from sqlalchemy import create_engine, exceptions, select, MetaData, Table, Column, Integer, String
+from sqlalchemy import create_engine, exceptions, select, MetaData, Table, Column, Integer, String, pool
 from testlib import *
 import time
 import gc
 
         conn.close()
     
+    def test_null_pool(self):
+        engine = engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
+        conn = engine.connect()
+        self.assertEquals(conn.execute(select([1])).scalar(), 1)
+        assert not conn.closed
+        engine.test_shutdown()
+        try:
+            conn.execute(select([1]))
+            assert False
+        except exceptions.DBAPIError, e:
+            if not e.connection_invalidated:
+                raise
+        assert not conn.closed
+        assert conn.invalidated
+        self.assertEquals(conn.execute(select([1])).scalar(), 1)
+        assert not conn.invalidated
+    
     def test_close(self):
         conn = engine.connect()
         self.assertEquals(conn.execute(select([1])).scalar(), 1)