Commits

Mike Bayer committed 1fbddf9

- fix to connection pool _close() to properly clean up, fixes
MySQL synchronization errors [ticket:387]

Comments (0)

Files changed (5)

   could be unnecessarily cascaded on the save/update cascade
 - MySQL detects errors 2006 (server has gone away) and 2014
 (commands out of sync) and invalidates the connection on which it occured.
+- fix to connection pool _close() to properly clean up, fixes
+MySQL synchronization errors [ticket:387]
 - added keywords for EXCEPT, INTERSECT, EXCEPT ALL, INTERSECT ALL
 [ticket:247]
 - added label() function to Select class, when scalar=True is used

lib/sqlalchemy/engine/base.py

             self.cursor.close()
             if self.connection.should_close_with_result and self.dialect.supports_autoclose_results:
                 self.connection.close()
-    
+
     def _convert_key(self, key):
         """given a key, which could be a ColumnElement, string, etc., matches it to the 
         appropriate key we got from the result set's metadata; then cache it locally for quick re-access."""

lib/sqlalchemy/pool.py

 be managed automatically, based on module type and connect arguments,
  simply by calling regular DBAPI connect() methods."""
 
-import weakref, string, time, sys
+import weakref, string, time, sys, traceback
 try:
     import cPickle as pickle
 except:
     """proxies a DBAPI connection object and provides return-on-dereference support"""
     def __init__(self, pool):
         self._threadfairy = _ThreadFairy(self)
-        self.cursors = {}
+        self._cursors = {}
         self.__pool = pool
         self.__counter = 0
         try:
             raise exceptions.InvalidRequestError("This connection is closed")
         self._connection_record.invalidate()
         self.connection = None
-        self.cursors = None
+        self._cursors = None
         self._close()
     def cursor(self, *args, **kwargs):
         try:
         self.__counter +=1
         return self    
     def close_open_cursors(self):
-        if self.cursors is not None:
-            for c in list(self.cursors):
+        if self._cursors is not None:
+            for c in list(self._cursors):
                 c.close()
     def close(self):
         self.__counter -=1
     def __del__(self):
         self._close()
     def _close(self):
-        if self.cursors is not None:
+        if self._cursors is not None:
             # cursors should be closed before connection is returned to the pool.  some dbapis like
             # mysql have real issues if they are not.
             if self.__pool.auto_close_cursors:
                 self.close_open_cursors()
             elif self.__pool.disallow_open_cursors:
-                if len(self.cursors):
-                    raise exceptions.InvalidRequestError("This connection still has %d open cursors" % len(self.cursors))
+                if len(self._cursors):
+                    raise exceptions.InvalidRequestError("This connection still has %d open cursors" % len(self._cursors))
         if self.connection is not None:
             try:
                 self.connection.rollback()
             except:
-                # damn mysql -- (todo look for NotSupportedError)
-                pass
+                if self._connection_record is not None:
+                    self._connection_record.invalidate()
         if self._connection_record is not None:
             if self.__pool.echo:
                 self.__pool.log("Connection %s being returned to pool" % repr(self.connection))
             self.__pool.return_conn(self)
+        self.connection = None
         self._connection_record = None
         self._threadfairy = None
-            
+        self._cursors = None
+        
 class _CursorFairy(object):
     def __init__(self, parent, cursor):
         self.__parent = parent
-        self.__parent.cursors[self]=True
+        self.__parent._cursors[self]=True
         self.cursor = cursor
     def invalidate(self):
         self.__parent.invalidate()
     def close(self):
-        if self in self.__parent.cursors:
-            del self.__parent.cursors[self]
+        if self in self.__parent._cursors:
+            del self.__parent._cursors[self]
             self.cursor.close()
     def __getattr__(self, key):
         return getattr(self.cursor, key)

test/engine/pool.py

         mcid += 1
     def close(self):
         pass
+    def rollback(self):
+        pass
     def cursor(self):
         return MockCursor()
 class MockCursor(object):
         c1 = p.connect()
         c_id = c1.connection.id
         c1.close(); c1=None
-
         c1 = p.connect()
         assert c1.connection.id == c_id
         c1.invalidate()

test/perf/wsgi.py

+#!/usr/bin/python
+
+from sqlalchemy import *
+import sqlalchemy.pool as pool
+import thread
+from sqlalchemy import exceptions
+
+import logging
+logging.basicConfig()
+logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
+
+threadids = set()
+#meta = BoundMetaData('postgres://scott:tiger@127.0.0.1/test')
+
+#meta = BoundMetaData('mysql://scott:tiger@localhost/test', poolclass=pool.SingletonThreadPool)
+meta = BoundMetaData('mysql://scott:tiger@localhost/test')
+foo = Table('foo', meta, 
+    Column('id', Integer, primary_key=True),
+    Column('data', String(30)))
+
+meta.drop_all()
+meta.create_all()
+
+data = []
+for x in range(1,500):
+    data.append({'id':x,'data':"this is x value %d" % x})
+foo.insert().execute(data)
+
+class Foo(object):
+    pass
+
+mapper(Foo, foo)
+
+root = './'
+port = 8000
+
+def serve(environ, start_response):
+    sess = create_session()
+    l = sess.query(Foo).select()
+            
+    start_response("200 OK", [('Content-type','text/plain')])
+    threadids.add(thread.get_ident())
+    print "sending response on thread", thread.get_ident(), " total threads ", len(threadids)
+    return ["\n".join([x.data for x in l])]
+
+        
+if __name__ == '__main__':
+    from wsgiutils import wsgiServer
+    server = wsgiServer.WSGIServer (('localhost', port), {'/': serve})
+    print "Server listening on port %d" % port
+    server.serve_forever()
+
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.