Commits

Mike Bayer  committed 77ce1bc

- [feature] Connection event listeners can
now be associated with individual
Connection objects, not just Engine
objects. [ticket:2511]

  • Participants
  • Parent commits 6961cde

Comments (0)

Files changed (8)

       (use sqlalchemy.ext.horizontal_shard)
 
 - engine
+  - [feature] Connection event listeners can
+    now be associated with individual
+    Connection objects, not just Engine
+    objects.  [ticket:2511]
+
   - [bug] Fixed bug whereby if a database restart
     affected multiple connections, each
     connection would individually invoke a new

File doc/build/core/events.rst

 .. autoclass:: sqlalchemy.events.PoolEvents
    :members:
 
-Connection Events
------------------------
+SQL Execution and Connection Events
+------------------------------------
 
 .. autoclass:: sqlalchemy.events.ConnectionEvents
     :members:

File lib/sqlalchemy/engine/base.py

     'connection_memoize']
 
 import inspect, StringIO, sys, operator
-from itertools import izip
+from itertools import izip, chain
 from sqlalchemy import exc, schema, util, types, log, interfaces, \
     event, events
 from sqlalchemy.sql import expression, util as sql_util
       Postgresql.
 
     implicit_returning
-      use RETURNING or equivalent during INSERT execution in order to load 
+      use RETURNING or equivalent during INSERT execution in order to load
       newly generated primary keys and other column defaults in one execution,
       which are then available via inserted_primary_key.
-      If an insert statement has returning() specified explicitly, 
+      If an insert statement has returning() specified explicitly,
       the "implicit" functionality is not used and inserted_primary_key
       will not be available.
 
         Allows dialects to configure options based on server version info or
         other properties.
 
-        The connection passed here is a SQLAlchemy Connection object, 
+        The connection passed here is a SQLAlchemy Connection object,
         with full capabilities.
 
         The initalize() method of the base dialect should be called via
         set) is specified, limit the autoload to the given column
         names.
 
-        The default implementation uses the 
-        :class:`~sqlalchemy.engine.reflection.Inspector` interface to 
+        The default implementation uses the
+        :class:`~sqlalchemy.engine.reflection.Inspector` interface to
         provide the output, building upon the granular table/column/
         constraint etc. methods of :class:`.Dialect`.
 
 
     def get_primary_keys(self, connection, table_name, schema=None, **kw):
         """Return information about primary keys in `table_name`.
-        
-        
-        Deprecated.  This method is only called by the default 
+
+
+        Deprecated.  This method is only called by the default
         implementation of :meth:`get_pk_constraint()`.  Dialects should
         instead implement this method directly.
-        
+
         """
 
         raise NotImplementedError()
         table_name`.
 
         Given a :class:`.Connection`, a string
-        `table_name`, and an optional string `schema`, return primary 
+        `table_name`, and an optional string `schema`, return primary
         key information as a dictionary with these keys:
 
         constrained_columns
         raise NotImplementedError()
 
     def normalize_name(self, name):
-        """convert the given name to lowercase if it is detected as 
+        """convert the given name to lowercase if it is detected as
         case insensitive.
 
         this method is only used if the dialect defines
         raise NotImplementedError()
 
     def _get_default_schema_name(self, connection):
-        """Return the string name of the currently selected schema from 
+        """Return the string name of the currently selected schema from
         the given connection.
 
         This is used by the default implementation to populate the
         raise NotImplementedError()
 
     def do_begin(self, connection):
-        """Provide an implementation of *connection.begin()*, given a 
+        """Provide an implementation of *connection.begin()*, given a
         DB-API connection."""
 
         raise NotImplementedError()
 
     def do_rollback(self, connection):
-        """Provide an implementation of *connection.rollback()*, given 
+        """Provide an implementation of *connection.rollback()*, given
         a DB-API connection."""
 
         raise NotImplementedError()
         raise NotImplementedError()
 
     def do_commit(self, connection):
-        """Provide an implementation of *connection.commit()*, given a 
+        """Provide an implementation of *connection.commit()*, given a
         DB-API connection."""
 
         raise NotImplementedError()
     def connect(self):
         """return a callable which sets up a newly created DBAPI connection.
 
-        The callable accepts a single argument "conn" which is the 
+        The callable accepts a single argument "conn" which is the
         DBAPI connection itself.  It has no return value.
 
         This is used to set dialect-wide per-connection options such as
       True if the statement is a "committable" statement.
 
     prefetch_cols
-      a list of Column objects for which a client-side default 
+      a list of Column objects for which a client-side default
       was fired off.  Applies to inserts and updates.
 
     postfetch_cols
         raise NotImplementedError()
 
     def handle_dbapi_exception(self, e):
-        """Receive a DBAPI exception which occurred upon execute, result 
+        """Receive a DBAPI exception which occurred upon execute, result
         fetch, etc."""
 
         raise NotImplementedError()
 
     def should_autocommit_text(self, statement):
-        """Parse the given textual statement and return True if it refers to 
+        """Parse the given textual statement and return True if it refers to
         a "committable" statement"""
 
         raise NotImplementedError()
     def get_rowcount(self):
         """Return the DBAPI ``cursor.rowcount`` value, or in some
         cases an interpreted value.
-        
+
         See :attr:`.ResultProxy.rowcount` for details on this.
 
         """
 
         :param statement: ``ClauseElement`` to be compiled.
 
-        :param bind: Optional Engine or Connection to compile this 
+        :param bind: Optional Engine or Connection to compile this
           statement against.
         """
 
         return e._execute_compiled(self, multiparams, params)
 
     def scalar(self, *multiparams, **params):
-        """Execute this compiled object and return the result's 
+        """Execute this compiled object and return the result's
         scalar value."""
 
         return self.execute(*multiparams, **params).scalar()
 
     """
 
+    dispatch = event.dispatcher(events.ConnectionEvents)
+
+
     def connect(self, **kwargs):
         """Return a :class:`.Connection` object.
 
         Depending on context, this may be ``self`` if this object
-        is already an instance of :class:`.Connection`, or a newly 
+        is already an instance of :class:`.Connection`, or a newly
         procured :class:`.Connection` if this object is an instance
         of :class:`.Engine`.
 
         context.
 
         Depending on context, this may be ``self`` if this object
-        is already an instance of :class:`.Connection`, or a newly 
+        is already an instance of :class:`.Connection`, or a newly
         procured :class:`.Connection` if this object is an instance
         of :class:`.Engine`.
 
         """
         raise NotImplementedError()
 
-    def _run_visitor(self, visitorcallable, element, 
+    def _run_visitor(self, visitorcallable, element,
                                     **kwargs):
         raise NotImplementedError()
 
     """
 
     def __init__(self, engine, connection=None, close_with_result=False,
-                 _branch=False, _execution_options=None):
+                 _branch=False, _execution_options=None,
+                 _dispatch=None,
+                 _has_events=False):
         """Construct a new Connection.
 
         The constructor here is not public and is only called only by an
         self.__savepoint_seq = 0
         self.__branch = _branch
         self.__invalid = False
-        self._has_events = engine._has_events
+        if _dispatch:
+            self.dispatch = _dispatch
+        self._has_events = _has_events or engine._has_events
         self._echo = self.engine._should_log_info()
         if _execution_options:
             self._execution_options =\
         """
 
         return self.engine._connection_cls(
-                                self.engine, 
-                                self.__connection, _branch=True)
+                                self.engine,
+                                self.__connection,
+                                _branch=True,
+                                _has_events=self._has_events,
+                                _dispatch=self.dispatch)
 
     def _clone(self):
         """Create a shallow copy of this Connection.
         self.close()
 
     def execution_options(self, **opt):
-        """ Set non-SQL options for the connection which take effect 
+        """ Set non-SQL options for the connection which take effect
         during execution.
 
         The method returns a copy of this :class:`.Connection` which references
 
         :meth:`.Connection.execution_options` accepts all options as those
         accepted by :meth:`.Executable.execution_options`.  Additionally,
-        it includes options that are applicable only to 
+        it includes options that are applicable only to
         :class:`.Connection`.
 
         :param autocommit: Available on: Connection, statement.
-          When True, a COMMIT will be invoked after execution 
+          When True, a COMMIT will be invoked after execution
           when executed in 'autocommit' mode, i.e. when an explicit
           transaction is not begun on the connection. Note that DBAPI
           connections by default are always in a transaction - SQLAlchemy uses
 
         :param compiled_cache: Available on: Connection.
           A dictionary where :class:`.Compiled` objects
-          will be cached when the :class:`.Connection` compiles a clause 
+          will be cached when the :class:`.Connection` compiles a clause
           expression into a :class:`.Compiled` object.
           It is the user's responsibility to
           manage the size of this dictionary, which will have keys
           corresponding to the dialect, clause element, the column
-          names within the VALUES or SET clause of an INSERT or UPDATE, 
+          names within the VALUES or SET clause of an INSERT or UPDATE,
           as well as the "batch" mode for an INSERT or UPDATE statement.
           The format of this dictionary is not guaranteed to stay the
           same in future releases.
 
-          Note that the ORM makes use of its own "compiled" caches for 
+          Note that the ORM makes use of its own "compiled" caches for
           some operations, including flush operations.  The caching
           used by the ORM internally supersedes a cache dictionary
           specified here.
           the lifespan of this connection.   Valid values include
           those string values accepted by the ``isolation_level``
           parameter passed to :func:`.create_engine`, and are
-          database specific, including those for :ref:`sqlite_toplevel`, 
+          database specific, including those for :ref:`sqlite_toplevel`,
           :ref:`postgresql_toplevel` - see those dialect's documentation
           for further info.
 
-          Note that this option necessarily affects the underlying 
-          DBAPI connection for the lifespan of the originating 
-          :class:`.Connection`, and is not per-execution. This 
-          setting is not removed until the underlying DBAPI connection 
+          Note that this option necessarily affects the underlying
+          DBAPI connection for the lifespan of the originating
+          :class:`.Connection`, and is not per-execution. This
+          setting is not removed until the underlying DBAPI connection
           is returned to the connection pool, i.e.
           the :meth:`.Connection.close` method is called.
 
-        :param no_parameters: When ``True``, if the final parameter 
-          list or dictionary is totally empty, will invoke the 
+        :param no_parameters: When ``True``, if the final parameter
+          list or dictionary is totally empty, will invoke the
           statement on the cursor as ``cursor.execute(statement)``,
           not passing the parameter collection at all.
           Some DBAPIs such as psycopg2 and mysql-python consider
-          percent signs as significant only when parameters are 
+          percent signs as significant only when parameters are
           present; this option allows code to generate SQL
           containing percent signs (and possibly other characters)
           that is neutral regarding whether it's executed by the DBAPI
-          or piped into a script that's later invoked by 
+          or piped into a script that's later invoked by
           command line tools.
 
           .. versionadded:: 0.7.6
 
         :param stream_results: Available on: Connection, statement.
-          Indicate to the dialect that results should be 
+          Indicate to the dialect that results should be
           "streamed" and not pre-buffered, if possible.  This is a limitation
           of many DBAPIs.  The flag is currently understood only by the
           psycopg2 dialect.
         return c
 
     def _set_isolation_level(self):
-        self.dialect.set_isolation_level(self.connection, 
+        self.dialect.set_isolation_level(self.connection,
                                 self._execution_options['isolation_level'])
         self.connection._connection_record.finalize_callback = \
                     self.dialect.reset_isolation_level
         return self
 
     def invalidate(self, exception=None):
-        """Invalidate the underlying DBAPI connection associated with 
+        """Invalidate the underlying DBAPI connection associated with
         this Connection.
 
         The underlying DB-API connection is literally closed (if
 
         Nested calls to :meth:`.begin` on the same :class:`.Connection`
         will return new :class:`.Transaction` objects that represent
-        an emulated transaction within the scope of the enclosing 
+        an emulated transaction within the scope of the enclosing
         transaction, that is::
-        
+
             trans = conn.begin()   # outermost transaction
-            trans2 = conn.begin()  # "nested" 
+            trans2 = conn.begin()  # "nested"
             trans2.commit()        # does nothing
             trans.commit()         # actually commits
-            
-        Calls to :meth:`.Transaction.commit` only have an effect 
+
+        Calls to :meth:`.Transaction.commit` only have an effect
         when invoked via the outermost :class:`.Transaction` object, though the
         :meth:`.Transaction.rollback` method of any of the
         :class:`.Transaction` objects will roll back the
         transaction.
 
         See also:
-        
+
         :meth:`.Connection.begin_nested` - use a SAVEPOINT
-        
+
         :meth:`.Connection.begin_twophase` - use a two phase /XID transaction
-        
+
         :meth:`.Engine.begin` - context manager available from :class:`.Engine`.
 
         """
         still controls the overall ``commit`` or ``rollback`` of the
         transaction of a whole.
 
-        See also :meth:`.Connection.begin`, 
+        See also :meth:`.Connection.begin`,
         :meth:`.Connection.begin_twophase`.
         """
 
         :class:`.Transaction`, also provides a :meth:`~.TwoPhaseTransaction.prepare`
         method.
 
-        :param xid: the two phase transaction id.  If not supplied, a 
+        :param xid: the two phase transaction id.  If not supplied, a
           random id will be generated.
 
-        See also :meth:`.Connection.begin`, 
+        See also :meth:`.Connection.begin`,
         :meth:`.Connection.begin_twophase`.
 
         """
             self.engine.logger.info("BEGIN (implicit)")
 
         if self._has_events:
+            self.dispatch.begin(self)
             self.engine.dispatch.begin(self)
 
         try:
 
     def _rollback_impl(self):
         if self._has_events:
+            self.dispatch.rollback(self)
             self.engine.dispatch.rollback(self)
 
         if self._still_open_and_connection_is_valid:
 
     def _commit_impl(self):
         if self._has_events:
+            self.dispatch.commit(self)
             self.engine.dispatch.commit(self)
 
         if self._echo:
 
     def _savepoint_impl(self, name=None):
         if self._has_events:
+            self.dispatch.savepoint(self, name)
             self.engine.dispatch.savepoint(self, name)
 
         if name is None:
 
     def _rollback_to_savepoint_impl(self, name, context):
         if self._has_events:
+            self.dispatch.rollback_savepoint(self, name, context)
             self.engine.dispatch.rollback_savepoint(self, name, context)
 
         if self._still_open_and_connection_is_valid:
 
     def _release_savepoint_impl(self, name, context):
         if self._has_events:
+            self.dispatch.release_savepoint(self, name, context)
             self.engine.dispatch.release_savepoint(self, name, context)
 
         if self._still_open_and_connection_is_valid:
 
     def _begin_twophase_impl(self, xid):
         if self._has_events:
+            self.dispatch.begin_twophase(self, xid)
             self.engine.dispatch.begin_twophase(self, xid)
 
         if self._still_open_and_connection_is_valid:
 
     def _prepare_twophase_impl(self, xid):
         if self._has_events:
+            self.dispatch.prepare_twophase(self, xid)
             self.engine.dispatch.prepare_twophase(self, xid)
 
         if self._still_open_and_connection_is_valid:
 
     def _rollback_twophase_impl(self, xid, is_prepared):
         if self._has_events:
+            self.dispatch.rollback_twophase(self, xid, is_prepared)
             self.engine.dispatch.rollback_twophase(self, xid, is_prepared)
 
         if self._still_open_and_connection_is_valid:
 
     def _commit_twophase_impl(self, xid, is_prepared):
         if self._has_events:
+            self.dispatch.commit_twophase(self, xid, is_prepared)
             self.engine.dispatch.commit_twophase(self, xid, is_prepared)
 
         if self._still_open_and_connection_is_valid:
     def execute(self, object, *multiparams, **params):
         """Executes the a SQL statement construct and returns a :class:`.ResultProxy`.
 
-        :param object: The statement to be executed.  May be 
+        :param object: The statement to be executed.  May be
          one of:
 
          * a plain string
          * any :class:`.ClauseElement` construct that is also
-           a subclass of :class:`.Executable`, such as a 
+           a subclass of :class:`.Executable`, such as a
            :func:`~.expression.select` construct
          * a :class:`.FunctionElement`, such as that generated
            by :attr:`.func`, will be automatically wrapped in
          dictionaries passed to \*multiparams::
 
              conn.execute(
-                 table.insert(), 
+                 table.insert(),
                  {"id":1, "value":"v1"},
                  {"id":2, "value":"v2"}
              )
                  table.insert(), id=1, value="v1"
              )
 
-         In the case that a plain SQL string is passed, and the underlying 
+         In the case that a plain SQL string is passed, and the underlying
          DBAPI accepts positional bind parameters, a collection of tuples
          or individual values in \*multiparams may be passed::
- 
+
              conn.execute(
                  "INSERT INTO table (id, value) VALUES (?, ?)",
                  (1, "v1"), (2, "v2")
              )
 
          Note above, the usage of a question mark "?" or other
-         symbol is contingent upon the "paramstyle" accepted by the DBAPI 
+         symbol is contingent upon the "paramstyle" accepted by the DBAPI
          in use, which may be any of "qmark", "named", "pyformat", "format",
-         "numeric".   See `pep-249 <http://www.python.org/dev/peps/pep-0249/>`_ 
+         "numeric".   See `pep-249 <http://www.python.org/dev/peps/pep-0249/>`_
          for details on paramstyle.
 
          To execute a textual SQL statement which uses bound parameters in a
         for c in type(object).__mro__:
             if c in Connection.executors:
                 return Connection.executors[c](
-                                                self, 
+                                                self,
                                                 object,
-                                                multiparams, 
+                                                multiparams,
                                                 params)
         else:
             raise exc.InvalidRequestError(
     def _execute_function(self, func, multiparams, params):
         """Execute a sql.FunctionElement object."""
 
-        return self._execute_clauseelement(func.select(), 
+        return self._execute_clauseelement(func.select(),
                                             multiparams, params)
 
     def _execute_default(self, default, multiparams, params):
         """Execute a schema.ColumnDefault object."""
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_execute:
+            for fn in chain(
+                        self.dispatch.before_execute,
+                        self.engine.dispatch.before_execute
+                    ):
                 default, multiparams, params = \
                     fn(self, default, multiparams, params)
 
             self.close()
 
         if self._has_events:
-            self.engine.dispatch.after_execute(self, 
+            self.dispatch.after_execute(self,
+                default, multiparams, params, ret)
+            self.engine.dispatch.after_execute(self,
                 default, multiparams, params, ret)
 
         return ret
         """Execute a schema.DDL object."""
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_execute:
+            for fn in chain(
+                    self.dispatch.before_execute,
+                    self.engine.dispatch.before_execute
+                    ):
                 ddl, multiparams, params = \
                     fn(self, ddl, multiparams, params)
 
         ret = self._execute_context(
             dialect,
             dialect.execution_ctx_cls._init_ddl,
-            compiled, 
+            compiled,
             None,
             compiled
         )
         if self._has_events:
-            self.engine.dispatch.after_execute(self, 
+            self.engine.dispatch.after_execute(self,
                 ddl, multiparams, params, ret)
         return ret
 
         """Execute a sql.ClauseElement object."""
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_execute:
+            for fn in chain(
+                        self.dispatch.before_execute,
+                        self.engine.dispatch.before_execute
+                    ):
                 elem, multiparams, params = \
                     fn(self, elem, multiparams, params)
 
                 compiled_sql = self._execution_options['compiled_cache'][key]
             else:
                 compiled_sql = elem.compile(
-                                dialect=dialect, column_keys=keys, 
+                                dialect=dialect, column_keys=keys,
                                 inline=len(distilled_params) > 1)
                 self._execution_options['compiled_cache'][key] = compiled_sql
         else:
             compiled_sql = elem.compile(
-                            dialect=dialect, column_keys=keys, 
+                            dialect=dialect, column_keys=keys,
                             inline=len(distilled_params) > 1)
 
 
         ret = self._execute_context(
             dialect,
             dialect.execution_ctx_cls._init_compiled,
-            compiled_sql, 
+            compiled_sql,
             distilled_params,
             compiled_sql, distilled_params
         )
         if self._has_events:
-            self.engine.dispatch.after_execute(self, 
+            self.dispatch.after_execute(self,
+                elem, multiparams, params, ret)
+            self.engine.dispatch.after_execute(self,
                 elem, multiparams, params, ret)
         return ret
 
         """Execute a sql.Compiled object."""
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_execute:
+            for fn in chain(
+                        self.dispatch.before_execute,
+                        self.engine.dispatch.before_execute
+                    ):
                 compiled, multiparams, params = \
                     fn(self, compiled, multiparams, params)
 
         ret = self._execute_context(
             dialect,
             dialect.execution_ctx_cls._init_compiled,
-            compiled, 
+            compiled,
             parameters,
             compiled, parameters
         )
         if self._has_events:
-            self.engine.dispatch.after_execute(self, 
+            self.dispatch.after_execute(self,
+                compiled, multiparams, params, ret)
+            self.engine.dispatch.after_execute(self,
                 compiled, multiparams, params, ret)
         return ret
 
         """Execute a string SQL statement."""
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_execute:
+            for fn in chain(
+                        self.dispatch.before_execute,
+                        self.engine.dispatch.before_execute
+                    ):
                 statement, multiparams, params = \
                     fn(self, statement, multiparams, params)
 
         ret = self._execute_context(
             dialect,
             dialect.execution_ctx_cls._init_statement,
-            statement, 
+            statement,
             parameters,
             statement, parameters
         )
         if self._has_events:
-            self.engine.dispatch.after_execute(self, 
+            self.dispatch.after_execute(self,
+                statement, multiparams, params, ret)
+            self.engine.dispatch.after_execute(self,
                 statement, multiparams, params, ret)
         return ret
 
-    def _execute_context(self, dialect, constructor, 
-                                    statement, parameters, 
+    def _execute_context(self, dialect, constructor,
+                                    statement, parameters,
                                     *args):
         """Create an :class:`.ExecutionContext` and execute, returning
         a :class:`.ResultProxy`."""
 
             context = constructor(dialect, self, conn, *args)
         except Exception, e:
-            self._handle_dbapi_exception(e, 
-                        str(statement), parameters, 
+            self._handle_dbapi_exception(e,
+                        str(statement), parameters,
                         None, None)
             raise
 
             parameters = parameters[0]
 
         if self._has_events:
-            for fn in self.engine.dispatch.before_cursor_execute:
+            for fn in chain(
+                        self.dispatch.before_cursor_execute,
+                        self.engine.dispatch.before_cursor_execute
+                    ):
                 statement, parameters = \
-                            fn(self, cursor, statement, parameters, 
+                            fn(self, cursor, statement, parameters,
                                         context, context.executemany)
 
         if self._echo:
             self.engine.logger.info(statement)
-            self.engine.logger.info("%r", 
+            self.engine.logger.info("%r",
                     sql_util._repr_params(parameters, batches=10))
         try:
             if context.executemany:
                 self.dialect.do_executemany(
-                                    cursor, 
-                                    statement, 
-                                    parameters, 
+                                    cursor,
+                                    statement,
+                                    parameters,
                                     context)
             elif not parameters and context.no_parameters:
                 self.dialect.do_execute_no_params(
-                                    cursor, 
-                                    statement, 
+                                    cursor,
+                                    statement,
                                     context)
             else:
                 self.dialect.do_execute(
-                                    cursor, 
-                                    statement, 
-                                    parameters, 
+                                    cursor,
+                                    statement,
+                                    parameters,
                                     context)
         except Exception, e:
             self._handle_dbapi_exception(
-                                e, 
-                                statement, 
-                                parameters, 
-                                cursor, 
+                                e,
+                                statement,
+                                parameters,
+                                cursor,
                                 context)
             raise
 
 
         if self._has_events:
-            self.engine.dispatch.after_cursor_execute(self, cursor, 
-                                                statement, 
-                                                parameters, 
-                                                context, 
+            self.dispatch.after_cursor_execute(self, cursor,
+                                                statement,
+                                                parameters,
+                                                context,
+                                                context.executemany)
+            self.engine.dispatch.after_cursor_execute(self, cursor,
+                                                statement,
+                                                parameters,
+                                                context,
                                                 context.executemany)
 
         if context.compiled:
             elif not context._is_explicit_returning:
                 result.close(_autoclose_connection=False)
         elif result._metadata is None:
-            # no results, get rowcount 
+            # no results, get rowcount
             # (which requires open cursor on some drivers
             # such as kintersbasdb, mxodbc),
             result.rowcount
 
         This method is used by DefaultDialect for special-case
         executions, such as for sequences and column defaults.
-        The path of statement execution in the majority of cases 
+        The path of statement execution in the majority of cases
         terminates at _execute_context().
 
         """
             self.engine.logger.info("%r", parameters)
         try:
             self.dialect.do_execute(
-                                cursor, 
-                                statement, 
+                                cursor,
+                                statement,
                                 parameters)
         except Exception, e:
             self._handle_dbapi_exception(
-                                e, 
-                                statement, 
-                                parameters, 
+                                e,
+                                statement,
+                                parameters,
                                 cursor,
                                 None)
             raise
             if isinstance(e, (SystemExit, KeyboardInterrupt)):
                 raise
 
-    def _handle_dbapi_exception(self, 
-                                    e, 
-                                    statement, 
-                                    parameters, 
-                                    cursor, 
+    def _handle_dbapi_exception(self,
+                                    e,
+                                    statement,
+                                    parameters,
+                                    cursor,
                                     context):
         if getattr(self, '_reentrant_error', False):
             # Py3K
-            #raise exc.DBAPIError.instance(statement, parameters, e, 
+            #raise exc.DBAPIError.instance(statement, parameters, e,
             #                               self.dialect.dbapi.Error) from e
             # Py2K
-            raise exc.DBAPIError.instance(statement, 
-                                            parameters, 
-                                            e, 
+            raise exc.DBAPIError.instance(statement,
+                                            parameters,
+                                            e,
                                             self.dialect.dbapi.Error), \
                                             None, sys.exc_info()[2]
             # end Py2K
 
             if should_wrap and context:
                 if self._has_events:
-                    self.engine.dispatch.dbapi_error(self, 
-                                                    cursor, 
-                                                    statement, 
-                                                    parameters, 
-                                                    context, 
+                    self.dispatch.dbapi_error(self,
+                                                    cursor,
+                                                    statement,
+                                                    parameters,
+                                                    context,
+                                                    e)
+                    self.engine.dispatch.dbapi_error(self,
+                                                    cursor,
+                                                    statement,
+                                                    parameters,
+                                                    context,
                                                     e)
                 context.handle_dbapi_exception(e)
 
 
             # Py3K
             #raise exc.DBAPIError.instance(
-            #                        statement, 
-            #                        parameters, 
-            #                        e, 
+            #                        statement,
+            #                        parameters,
+            #                        e,
             #                        self.dialect.dbapi.Error,
             #                        connection_invalidated=is_disconnect) \
             #                        from e
             # Py2K
             raise exc.DBAPIError.instance(
-                                    statement, 
-                                    parameters, 
-                                    e, 
+                                    statement,
+                                    parameters,
+                                    e,
                                     self.dialect.dbapi.Error,
                                     connection_invalidated=is_disconnect), \
                                     None, sys.exc_info()[2]
         set) is specified, limit the autoload to the given column
         names.
 
-        The default implementation uses the 
-        :class:`.Inspector` interface to 
+        The default implementation uses the
+        :class:`.Inspector` interface to
         provide the output, building upon the granular table/column/
         constraint etc. methods of :class:`.Dialect`.
 
     def transaction(self, callable_, *args, **kwargs):
         """Execute the given function within a transaction boundary.
 
-        The function is passed this :class:`.Connection` 
+        The function is passed this :class:`.Connection`
         as the first argument, followed by the given \*args and \**kwargs,
         e.g.::
-        
+
             def do_something(conn, x, y):
                 conn.execute("some statement", {'x':x, 'y':y})
 
             conn.transaction(do_something, 5, 10)
 
         The operations inside the function are all invoked within the
-        context of a single :class:`.Transaction`.   
-        Upon success, the transaction is committed.  If an 
+        context of a single :class:`.Transaction`.
+        Upon success, the transaction is committed.  If an
         exception is raised, the transaction is rolled back
         before propagating the exception.
 
            The :meth:`.transaction` method is superseded by
            the usage of the Python ``with:`` statement, which can
            be used with :meth:`.Connection.begin`::
-        
+
                with conn.begin():
                    conn.execute("some statement", {'x':5, 'y':10})
-            
+
            As well as with :meth:`.Engine.begin`::
-           
+
                with engine.begin() as conn:
                    conn.execute("some statement", {'x':5, 'y':10})
-        
+
         See also:
-        
-            :meth:`.Engine.begin` - engine-level transactional 
+
+            :meth:`.Engine.begin` - engine-level transactional
             context
-             
+
             :meth:`.Engine.transaction` - engine-level version of
             :meth:`.Connection.transaction`
 
         The given \*args and \**kwargs are passed subsequent
         to the :class:`.Connection` argument.
 
-        This function, along with :meth:`.Engine.run_callable`, 
+        This function, along with :meth:`.Engine.run_callable`,
         allows a function to be run with a :class:`.Connection`
         or :class:`.Engine` object without the need to know
         which one is being dealt with.
 class Transaction(object):
     """Represent a database transaction in progress.
 
-    The :class:`.Transaction` object is procured by 
+    The :class:`.Transaction` object is procured by
     calling the :meth:`~.Connection.begin` method of
     :class:`.Connection`::
 
         trans.commit()
 
     The object provides :meth:`.rollback` and :meth:`.commit`
-    methods in order to control transaction boundaries.  It 
-    also implements a context manager interface so that 
-    the Python ``with`` statement can be used with the 
+    methods in order to control transaction boundaries.  It
+    also implements a context manager interface so that
+    the Python ``with`` statement can be used with the
     :meth:`.Connection.begin` method::
 
         with connection.begin():
 
 class Engine(Connectable, log.Identified):
     """
-    Connects a :class:`~sqlalchemy.pool.Pool` and 
-    :class:`~sqlalchemy.engine.base.Dialect` together to provide a source 
+    Connects a :class:`~sqlalchemy.pool.Pool` and
+    :class:`~sqlalchemy.engine.base.Dialect` together to provide a source
     of database connectivity and behavior.
 
-    An :class:`.Engine` object is instantiated publicly using the 
+    An :class:`.Engine` object is instantiated publicly using the
     :func:`~sqlalchemy.create_engine` function.
 
     See also:
     _has_events = False
     _connection_cls = Connection
 
-    def __init__(self, pool, dialect, url, 
+    def __init__(self, pool, dialect, url,
                         logging_name=None, echo=None, proxy=None,
                         execution_options=None
                         ):
                 )
             self.update_execution_options(**execution_options)
 
-    dispatch = event.dispatcher(events.ConnectionEvents)
-
     def update_execution_options(self, **opt):
-        """Update the default execution_options dictionary 
+        """Update the default execution_options dictionary
         of this :class:`.Engine`.
 
         The given keys/values in \**opt are added to the
-        default execution options that will be used for 
+        default execution options that will be used for
         all connections.  The initial contents of this dictionary
         can be sent via the ``execution_options`` parameter
         to :func:`.create_engine`.
 
         A new connection pool is created immediately after the old one has
         been disposed.   This new pool, like all SQLAlchemy connection pools,
-        does not make any actual connections to the database until one is 
+        does not make any actual connections to the database until one is
         first requested.
 
         This method has two general use cases:
 
          * When a dropped connection is detected, it is assumed that all
-           connections held by the pool are potentially dropped, and 
+           connections held by the pool are potentially dropped, and
            the entire pool is replaced.
 
-         * An application may want to use :meth:`dispose` within a test 
+         * An application may want to use :meth:`dispose` within a test
            suite that is creating multiple engines.
 
         It is critical to note that :meth:`dispose` does **not** guarantee
         that the application will release all open database connections - only
         those connections that are checked into the pool are closed.
         Connections which remain checked out or have been detached from
-        the engine are not affected. 
+        the engine are not affected.
 
         """
         self.pool = self.pool._replace()
 
         from sqlalchemy.engine import ddl
 
-        self._run_visitor(ddl.SchemaGenerator, entity, 
+        self._run_visitor(ddl.SchemaGenerator, entity,
                                 connection=connection, **kwargs)
 
     @util.deprecated("0.7", "Use the drop() method on the given schema "
 
         from sqlalchemy.engine import ddl
 
-        self._run_visitor(ddl.SchemaDropper, entity, 
+        self._run_visitor(ddl.SchemaDropper, entity,
                                 connection=connection, **kwargs)
 
     def _execute_default(self, default):
             connection.close()
 
     @property
-    @util.deprecated("0.7", 
+    @util.deprecated("0.7",
                 "Use :attr:`~sqlalchemy.sql.expression.func` to create function constructs.")
     def func(self):
         return expression._FunctionGenerator(bind=self)
 
-    @util.deprecated("0.7", 
+    @util.deprecated("0.7",
                 "Use :func:`.expression.text` to create text constructs.")
     def text(self, text, *args, **kwargs):
-        """Return a :func:`~sqlalchemy.sql.expression.text` construct, 
+        """Return a :func:`~sqlalchemy.sql.expression.text` construct,
         bound to this engine.
 
         This is equivalent to::
 
         return expression.text(text, bind=self, *args, **kwargs)
 
-    def _run_visitor(self, visitorcallable, element, 
+    def _run_visitor(self, visitorcallable, element,
                                     connection=None, **kwargs):
         if connection is None:
             conn = self.contextual_connect(close_with_result=False)
         with a :class:`.Transaction` established.
 
         E.g.::
-        
+
             with engine.begin() as conn:
                 conn.execute("insert into table (x, y, z) values (1, 2, 3)")
                 conn.execute("my_special_procedure(5)")
 
-        Upon successful operation, the :class:`.Transaction` 
+        Upon successful operation, the :class:`.Transaction`
         is committed.  If an error is raised, the :class:`.Transaction`
-        is rolled back.  
-        
+        is rolled back.
+
         The ``close_with_result`` flag is normally ``False``, and indicates
         that the :class:`.Connection` will be closed when the operation
         is complete.   When set to ``True``, it indicates the :class:`.Connection`
         has exhausted all result rows.
 
         .. versionadded:: 0.7.6
-        
+
         See also:
-        
+
         :meth:`.Engine.connect` - procure a :class:`.Connection` from
         an :class:`.Engine`.
 
         """Execute the given function within a transaction boundary.
 
         The function is passed a :class:`.Connection` newly procured
-        from :meth:`.Engine.contextual_connect` as the first argument, 
+        from :meth:`.Engine.contextual_connect` as the first argument,
         followed by the given \*args and \**kwargs.
-        
+
         e.g.::
-        
+
             def do_something(conn, x, y):
                 conn.execute("some statement", {'x':x, 'y':y})
 
             engine.transaction(do_something, 5, 10)
-        
+
         The operations inside the function are all invoked within the
-        context of a single :class:`.Transaction`.   
-        Upon success, the transaction is committed.  If an 
+        context of a single :class:`.Transaction`.
+        Upon success, the transaction is committed.  If an
         exception is raised, the transaction is rolled back
         before propagating the exception.
 
            The :meth:`.transaction` method is superseded by
            the usage of the Python ``with:`` statement, which can
            be used with :meth:`.Engine.begin`::
-           
+
                with engine.begin() as conn:
                    conn.execute("some statement", {'x':5, 'y':10})
-        
+
         See also:
-        
-            :meth:`.Engine.begin` - engine-level transactional 
+
+            :meth:`.Engine.begin` - engine-level transactional
             context
-             
+
             :meth:`.Connection.transaction` - connection-level version of
             :meth:`.Engine.transaction`
 
         The given \*args and \**kwargs are passed subsequent
         to the :class:`.Connection` argument.
 
-        This function, along with :meth:`.Connection.run_callable`, 
+        This function, along with :meth:`.Connection.run_callable`,
         allows a function to be run with a :class:`.Connection`
         or :class:`.Engine` object without the need to know
         which one is being dealt with.
 
         """
 
-        return self._connection_cls(self, 
-                                    self.pool.connect(), 
-                                    close_with_result=close_with_result, 
+        return self._connection_cls(self,
+                                    self.pool.connect(),
+                                    close_with_result=close_with_result,
                                     **kwargs)
 
     def table_names(self, schema=None, connection=None):
 
         Uses the given :class:`.Connection`, or if None produces
         its own :class:`.Connection`, and passes the ``table``
-        and ``include_columns`` arguments onto that 
+        and ``include_columns`` arguments onto that
         :class:`.Connection` object's :meth:`.Connection.reflecttable`
         method.  The :class:`.Table` object is then populated
         with new attributes.
     def raw_connection(self):
         """Return a "raw" DBAPI connection from the connection pool.
 
-        The returned object is a proxied version of the DBAPI 
+        The returned object is a proxied version of the DBAPI
         connection object used by the underlying driver in use.
         The object will have all the same behavior as the real DBAPI
         connection, except that its ``close()`` method will result in the
     # __setstate__.
     from sqlalchemy.cresultproxy import safe_rowproxy_reconstructor
 
-    # The extra function embedding is needed so that the 
-    # reconstructor function has the same signature whether or not 
+    # The extra function embedding is needed so that the
+    # reconstructor function has the same signature whether or not
     # the extension is present.
     def rowproxy_reconstructor(cls, state):
         return safe_rowproxy_reconstructor(cls, state)
         return iter(self)
 
 try:
-    # Register RowProxy with Sequence, 
+    # Register RowProxy with Sequence,
     # so sequence protocol is implemented
     from collections import Sequence
     Sequence.register(RowProxy)
 
             if context.result_map:
                 try:
-                    name, obj, type_ = context.result_map[colname 
-                                                    if self.case_sensitive 
+                    name, obj, type_ = context.result_map[colname
+                                                    if self.case_sensitive
                                                     else colname.lower()]
                 except KeyError:
                     name, obj, type_ = \
 
             # populate primary keymap, looking for conflicts.
             if primary_keymap.setdefault(
-                                name if self.case_sensitive 
-                                else name.lower(), 
-                                rec) is not rec: 
+                                name if self.case_sensitive
+                                else name.lower(),
+                                rec) is not rec:
                 # place a record that doesn't have the "index" - this
                 # is interpreted later as an AmbiguousColumnError,
-                # but only when actually accessed.   Columns 
+                # but only when actually accessed.   Columns
                 # colliding by name is not a problem if those names
                 # aren't used; integer and ColumnElement access is always
                 # unambiguous.
-                primary_keymap[name 
-                                if self.case_sensitive 
+                primary_keymap[name
+                                if self.case_sensitive
                                 else name.lower()] = (processor, obj, None)
 
 
     def _set_keymap_synonym(self, name, origname):
         """Set a synonym for the given name.
 
-        Some dialects (SQLite at the moment) may use this to 
+        Some dialects (SQLite at the moment) may use this to
         adjust the column names that are significant within a
         row.
 
         """
-        rec = (processor, obj, i) = self._keymap[origname if 
-                                                self.case_sensitive 
+        rec = (processor, obj, i) = self._keymap[origname if
+                                                self.case_sensitive
                                                 else origname.lower()]
         if self._keymap.setdefault(name, rec) is not rec:
             self._keymap[name] = (processor, obj, None)
             result = map.get(key if self.case_sensitive else key.lower())
         # fallback for targeting a ColumnElement to a textual expression
         # this is a rare use case which only occurs when matching text()
-        # or colummn('name') constructs to ColumnElements, or after a 
+        # or colummn('name') constructs to ColumnElements, or after a
         # pickle/unpickle roundtrip
         elif isinstance(key, expression.ColumnElement):
             if key._label and (
-                            key._label 
-                            if self.case_sensitive 
+                            key._label
+                            if self.case_sensitive
                             else key._label.lower()) in map:
-                result = map[key._label 
-                            if self.case_sensitive 
+                result = map[key._label
+                            if self.case_sensitive
                             else key._label.lower()]
             elif hasattr(key, 'name') and (
-                                    key.name 
-                                    if self.case_sensitive 
+                                    key.name
+                                    if self.case_sensitive
                                     else key.name.lower()) in map:
                 # match is only on name.
-                result = map[key.name 
-                            if self.case_sensitive 
+                result = map[key.name
+                            if self.case_sensitive
                             else key.name.lower()]
-            # search extra hard to make sure this 
+            # search extra hard to make sure this
             # isn't a column/label name overlap.
             # this check isn't currently available if the row
             # was unpickled.
         if result is None:
             if raiseerr:
                 raise exc.NoSuchColumnError(
-                    "Could not locate column in row for column '%s'" % 
+                    "Could not locate column in row for column '%s'" %
                         expression._string_or_unprintable(key))
             else:
                 return None
       col3 = row[mytable.c.mycol] # access via Column object.
 
     ``ResultProxy`` also handles post-processing of result column
-    data using ``TypeEngine`` objects, which are referenced from 
+    data using ``TypeEngine`` objects, which are referenced from
     the originating SQL statement that produced this result set.
 
     """
         """Return the 'rowcount' for this result.
 
         The 'rowcount' reports the number of rows *matched*
-        by the WHERE criterion of an UPDATE or DELETE statement.  
-        
+        by the WHERE criterion of an UPDATE or DELETE statement.
+
         .. note::
-        
+
            Notes regarding :attr:`.ResultProxy.rowcount`:
-           
-           
+
+
            * This attribute returns the number of rows *matched*,
              which is not necessarily the same as the number of rows
              that were actually *modified* - an UPDATE statement, for example,
              may have no net change on a given row if the SET values
              given are the same as those present in the row already.
              Such a row would be matched but not modified.
-             On backends that feature both styles, such as MySQL, 
-             rowcount is configured by default to return the match 
+             On backends that feature both styles, such as MySQL,
+             rowcount is configured by default to return the match
              count in all cases.
 
            * :attr:`.ResultProxy.rowcount` is *only* useful in conjunction
              with an UPDATE or DELETE statement.  Contrary to what the Python
              DBAPI says, it does *not* return the
              number of rows available from the results of a SELECT statement
-             as DBAPIs cannot support this functionality when rows are 
+             as DBAPIs cannot support this functionality when rows are
              unbuffered.
-        
+
            * :attr:`.ResultProxy.rowcount` may not be fully implemented by
              all dialects.  In particular, most DBAPIs do not support an
              aggregate rowcount result from an executemany call.
-             The :meth:`.ResultProxy.supports_sane_rowcount` and 
+             The :meth:`.ResultProxy.supports_sane_rowcount` and
              :meth:`.ResultProxy.supports_sane_multi_rowcount` methods
              will report from the dialect if each usage is known to be
              supported.
-         
+
            * Statements that use RETURNING may not return a correct
              rowcount.
 
 
         This is a DBAPI specific method and is only functional
         for those backends which support it, for statements
-        where it is appropriate.  It's behavior is not 
+        where it is appropriate.  It's behavior is not
         consistent across backends.
 
-        Usage of this method is normally unnecessary when 
+        Usage of this method is normally unnecessary when
         using insert() expression constructs; the
         :attr:`~ResultProxy.inserted_primary_key` attribute provides a
         tuple of primary key values for a newly inserted row,
             return self._saved_cursor.lastrowid
         except Exception, e:
             self.connection._handle_dbapi_exception(
-                                 e, None, None, 
+                                 e, None, None,
                                  self._saved_cursor, self.context)
             raise
 
     def returns_rows(self):
         """True if this :class:`.ResultProxy` returns rows.
 
-        I.e. if it is legal to call the methods 
-        :meth:`~.ResultProxy.fetchone`, 
+        I.e. if it is legal to call the methods
+        :meth:`~.ResultProxy.fetchone`,
         :meth:`~.ResultProxy.fetchmany`
         :meth:`~.ResultProxy.fetchall`.
 
     @property
     def is_insert(self):
         """True if this :class:`.ResultProxy` is the result
-        of a executing an expression language compiled 
+        of a executing an expression language compiled
         :func:`.expression.insert` construct.
 
-        When True, this implies that the 
+        When True, this implies that the
         :attr:`inserted_primary_key` attribute is accessible,
         assuming the statement did not include
         a user defined "returning" construct.
     def inserted_primary_key(self):
         """Return the primary key for the row just inserted.
 
-        The return value is a list of scalar values 
+        The return value is a list of scalar values
         corresponding to the list of primary key columns
         in the target table.
 
-        This only applies to single row :func:`.insert` 
-        constructs which did not explicitly specify 
+        This only applies to single row :func:`.insert`
+        constructs which did not explicitly specify
         :meth:`.Insert.returning`.
 
         Note that primary key columns which specify a
-        server_default clause, 
+        server_default clause,
         or otherwise do not qualify as "autoincrement"
         columns (see the notes at :class:`.Column`), and were
         generated using the database-side default, will
-        appear in this list as ``None`` unless the backend 
+        appear in this list as ``None`` unless the backend
         supports "returning" and the insert statement executed
         with the "implicit returning" enabled.
 
         :class:`.ExecutionContext`.
 
         See :class:`.ExecutionContext` for details.
-        
+
         """
 
         return self.context.lastrow_has_defaults()
         Raises :class:`.InvalidRequestError` if the executed
         statement is not a compiled expression construct
         or is not an insert() or update() construct.
-        
+
         """
 
         if not self.context.compiled:
         Raises :class:`.InvalidRequestError` if the executed
         statement is not a compiled expression construct
         or is not an insert() or update() construct.
-        
+
         """
 
         if not self.context.compiled:
 
     def supports_sane_rowcount(self):
         """Return ``supports_sane_rowcount`` from the dialect.
-        
+
         See :attr:`.ResultProxy.rowcount` for background.
-        
+
         """
 
         return self.dialect.supports_sane_rowcount
         """Return ``supports_sane_multi_rowcount`` from the dialect.
 
         See :attr:`.ResultProxy.rowcount` for background.
-        
+
         """
 
         return self.dialect.supports_sane_multi_rowcount
             return l
         except Exception, e:
             self.connection._handle_dbapi_exception(
-                                    e, None, None, 
+                                    e, None, None,
                                     self.cursor, self.context)
             raise
 
             return l
         except Exception, e:
             self.connection._handle_dbapi_exception(
-                                    e, None, None, 
+                                    e, None, None,
                                     self.cursor, self.context)
             raise
 
                 return None
         except Exception, e:
             self.connection._handle_dbapi_exception(
-                                    e, None, None, 
+                                    e, None, None,
                                     self.cursor, self.context)
             raise
 
             row = self._fetchone_impl()
         except Exception, e:
             self.connection._handle_dbapi_exception(
-                                    e, None, None, 
+                                    e, None, None,
                                     self.cursor, self.context)
             raise
 

File lib/sqlalchemy/engine/default.py

         self.label_length = label_length
 
         if self.description_encoding == 'use_encoding':
-            self._description_decoder = processors.to_unicode_processor_factory(
+            self._description_decoder = \
+                            processors.to_unicode_processor_factory(
                                             encoding
                                     )
         elif self.description_encoding is not None:
-            self._description_decoder = processors.to_unicode_processor_factory(
+            self._description_decoder = \
+                            processors.to_unicode_processor_factory(
                                             self.description_encoding
                                     )
         self._encoder = codecs.getencoder(self.encoding)
         # end Py2K
         # Py3K
         #cast_to = str
+
         def check_unicode(formatstr, type_):
             cursor = connection.connection.cursor()
             try:
                             expression.select(
                             [expression.cast(
                                 expression.literal_column(
-                                        "'test %s returns'" % formatstr), type_)
+                                        "'test %s returns'" % formatstr),
+                                        type_)
                             ]).compile(dialect=self)
                         )
                     )
         """
         return sqltypes.adapt_type(typeobj, self.colspecs)
 
-    def reflecttable(self, connection, table, include_columns, exclude_columns=None):
+    def reflecttable(self, connection, table, include_columns,
+                    exclude_columns=None):
         insp = reflection.Inspector.from_engine(connection)
         return insp.reflecttable(table, include_columns, exclude_columns)
 
         return self
 
     @classmethod
-    def _init_compiled(cls, dialect, connection, dbapi_connection, compiled, parameters):
+    def _init_compiled(cls, dialect, connection, dbapi_connection,
+                    compiled, parameters):
         """Initialize execution context for a Compiled construct."""
 
         self = cls.__new__(cls)
 
         self.unicode_statement = unicode(compiled)
         if not dialect.supports_unicode_statements:
-            self.statement = self.unicode_statement.encode(self.dialect.encoding)
+            self.statement = self.unicode_statement.encode(
+                                        self.dialect.encoding)
         else:
             self.statement = self.unicode_statement
 
         else:
             self.compiled_parameters = \
                         [compiled.construct_params(m, _group_number=grp) for
-                                        grp,m in enumerate(parameters)]
+                                        grp, m in enumerate(parameters)]
 
             self.executemany = len(parameters) > 1
 
                             param[dialect._encoder(key)[0]] = \
                                         processors[key](compiled_params[key])
                         else:
-                            param[dialect._encoder(key)[0]] = compiled_params[key]
+                            param[dialect._encoder(key)[0]] = \
+                                    compiled_params[key]
                 else:
                     for key in compiled_params:
                         if key in processors:
         return self
 
     @classmethod
-    def _init_statement(cls, dialect, connection, dbapi_connection, statement, parameters):
+    def _init_statement(cls, dialect, connection, dbapi_connection,
+                                                    statement, parameters):
         """Initialize execution context for a string SQL statement."""
 
         self = cls.__new__(cls)
             if dialect.supports_unicode_statements:
                 self.parameters = parameters
             else:
-                self.parameters= [
+                self.parameters = [
                             dict((dialect._encoder(k)[0], d[k]) for k in d)
                             for d in parameters
                         ] or [{}]
 
         self.executemany = len(parameters) > 1
 
-        if not dialect.supports_unicode_statements and isinstance(statement, unicode):
+        if not dialect.supports_unicode_statements and \
+            isinstance(statement, unicode):
             self.unicode_statement = statement
             self.statement = dialect._encoder(statement)[0]
         else:
             autoinc_col = table._autoincrement_column
             if autoinc_col is not None:
                 # apply type post processors to the lastrowid
-                proc = autoinc_col.type._cached_result_processor(self.dialect, None)
+                proc = autoinc_col.type._cached_result_processor(
+                                        self.dialect, None)
                 if proc is not None:
                     lastrowid = proc(lastrowid)
 
             inputsizes = []
             for key in self.compiled.positiontup:
                 typeengine = types[key]
-                dbtype = typeengine.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
-                if dbtype is not None and (not exclude_types or dbtype not in exclude_types):
+                dbtype = typeengine.dialect_impl(self.dialect).\
+                                    get_dbapi_type(self.dialect.dbapi)
+                if dbtype is not None and \
+                    (not exclude_types or dbtype not in exclude_types):
                     inputsizes.append(dbtype)
             try:
                 self.cursor.setinputsizes(*inputsizes)
             except Exception, e:
-                self.root_connection._handle_dbapi_exception(e, None, None, None, self)
+                self.root_connection._handle_dbapi_exception(
+                                e, None, None, None, self)
                 raise
         else:
             inputsizes = {}
             for key in self.compiled.bind_names.values():
                 typeengine = types[key]
-                dbtype = typeengine.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
-                if dbtype is not None and (not exclude_types or dbtype not in exclude_types):
+                dbtype = typeengine.dialect_impl(self.dialect).\
+                                get_dbapi_type(self.dialect.dbapi)
+                if dbtype is not None and \
+                        (not exclude_types or dbtype not in exclude_types):
                     if translate:
                         key = translate.get(key, key)
                     inputsizes[self.dialect._encoder(key)[0]] = dbtype
             try:
                 self.cursor.setinputsizes(**inputsizes)
             except Exception, e:
-                self.root_connection._handle_dbapi_exception(e, None, None, None, self)
+                self.root_connection._handle_dbapi_exception(
+                                e, None, None, None, self)
                 raise
 
     def _exec_default(self, default, type_):

File lib/sqlalchemy/events.py

     that is, :class:`.SchemaItem` and :class:`.SchemaEvent`
     subclasses, including :class:`.MetaData`, :class:`.Table`,
     :class:`.Column`.
-    
+
     :class:`.MetaData` and :class:`.Table` support events
     specifically regarding when CREATE and DROP
-    DDL is emitted to the database.  
-    
+    DDL is emitted to the database.
+
     Attachment events are also provided to customize
     behavior whenever a child schema element is associated
     with a parent, such as, when a :class:`.Column` is associated
         some_table = Table('some_table', m, Column('data', Integer))
 
         def after_create(target, connection, **kw):
-            connection.execute("ALTER TABLE %s SET name=foo_%s" % 
+            connection.execute("ALTER TABLE %s SET name=foo_%s" %
                                     (target.name, target.name))
 
         event.listen(some_table, "after_create", after_create)
 
-    DDL events integrate closely with the 
+    DDL events integrate closely with the
     :class:`.DDL` class and the :class:`.DDLElement` hierarchy
-    of DDL clause constructs, which are themselves appropriate 
+    of DDL clause constructs, which are themselves appropriate
     as listener callables::
 
         from sqlalchemy import DDL
          to the event.  The contents of this dictionary
          may vary across releases, and include the
          list of tables being generated for a metadata-level
-         event, the checkfirst flag, and other 
+         event, the checkfirst flag, and other
          elements used by internal events.
 
         """
          to the event.  The contents of this dictionary
          may vary across releases, and include the
          list of tables being generated for a metadata-level
-         event, the checkfirst flag, and other 
+         event, the checkfirst flag, and other
          elements used by internal events.
 
         """
          to the event.  The contents of this dictionary
          may vary across releases, and include the
          list of tables being generated for a metadata-level
-         event, the checkfirst flag, and other 
+         event, the checkfirst flag, and other
          elements used by internal events.
 
         """
          to the event.  The contents of this dictionary
          may vary across releases, and include the
          list of tables being generated for a metadata-level
-         event, the checkfirst flag, and other 
+         event, the checkfirst flag, and other
          elements used by internal events.
 
         """
 
     def before_parent_attach(self, target, parent):
-        """Called before a :class:`.SchemaItem` is associated with 
+        """Called before a :class:`.SchemaItem` is associated with
         a parent :class:`.SchemaItem`.
-        
+
         :param target: the target object
         :param parent: the parent to which the target is being attached.
-        
+
         :func:`.event.listen` also accepts a modifier for this event:
-        
+
         :param propagate=False: When True, the listener function will
          be established for any copies made of the target object,
          i.e. those copies that are generated when
          :meth:`.Table.tometadata` is used.
-        
+
         """
 
     def after_parent_attach(self, target, parent):
-        """Called after a :class:`.SchemaItem` is associated with 
+        """Called after a :class:`.SchemaItem` is associated with
         a parent :class:`.SchemaItem`.
 
         :param target: the target object
         :param parent: the parent to which the target is being attached.
-        
+
         :func:`.event.listen` also accepts a modifier for this event:
-        
+
         :param propagate=False: When True, the listener function will
          be established for any copies made of the target object,
          i.e. those copies that are generated when
          :meth:`.Table.tometadata` is used.
-        
+
         """
 
     def column_reflect(self, inspector, table, column_info):
         """Called for each unit of 'column info' retrieved when
-        a :class:`.Table` is being reflected.   
-        
+        a :class:`.Table` is being reflected.
+
         The dictionary of column information as returned by the
         dialect is passed, and can be modified.  The dictionary
-        is that returned in each element of the list returned 
-        by :meth:`.reflection.Inspector.get_columns`. 
-        
+        is that returned in each element of the list returned
+        by :meth:`.reflection.Inspector.get_columns`.
+
         The event is called before any action is taken against
         this dictionary, and the contents can be modified.
         The :class:`.Column` specific arguments ``info``, ``key``,
         will be passed to the constructor of :class:`.Column`.
 
         Note that this event is only meaningful if either
-        associated with the :class:`.Table` class across the 
+        associated with the :class:`.Table` class across the
         board, e.g.::
-        
+
             from sqlalchemy.schema import Table
             from sqlalchemy import event
 
             def listen_for_reflect(inspector, table, column_info):
                 "receive a column_reflect event"
                 # ...
-                
+
             event.listen(
-                    Table, 
-                    'column_reflect', 
+                    Table,
+                    'column_reflect',
                     listen_for_reflect)
-                
+
         ...or with a specific :class:`.Table` instance using
         the ``listeners`` argument::
-        
+
             def listen_for_reflect(inspector, table, column_info):
                 "receive a column_reflect event"
                 # ...
-                
+
             t = Table(
-                'sometable', 
+                'sometable',
                 autoload=True,
                 listeners=[
                     ('column_reflect', listen_for_reflect)
                 ])
-        
+
         This because the reflection process initiated by ``autoload=True``
         completes within the scope of the constructor for :class:`.Table`.
-        
+
         """
 
 class SchemaEventTarget(object):
-    """Base class for elements that are the targets of :class:`.DDLEvents` events.
-    
+    """Base class for elements that are the targets of :class:`.DDLEvents`
+    events.
+
     This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
-    
+
     """
     dispatch = event.dispatcher(DDLEvents)
 
         raise NotImplementedError()
 
     def _set_parent_with_dispatch(self, parent):
-        self.dispatch.before_parent_attach(self, parent) 
-        self._set_parent(parent) 
-        self.dispatch.after_parent_attach(self, parent) 
+        self.dispatch.before_parent_attach(self, parent)
+        self._set_parent(parent)
+        self.dispatch.after_parent_attach(self, parent)
 
 class PoolEvents(event.Events):
     """Available events for :class:`.Pool`.
 
         event.listen(Pool, 'checkout', my_on_checkout)
 
-    In addition to accepting the :class:`.Pool` class and :class:`.Pool` instances,
-    :class:`.PoolEvents` also accepts :class:`.Engine` objects and
-    the :class:`.Engine` class as targets, which will be resolved
-    to the ``.pool`` attribute of the given engine or the :class:`.Pool`
-    class::
+    In addition to accepting the :class:`.Pool` class and
+    :class:`.Pool` instances, :class:`.PoolEvents` also accepts
+    :class:`.Engine` objects and the :class:`.Engine` class as
+    targets, which will be resolved to the ``.pool`` attribute of the
+    given engine or the :class:`.Pool` class::
 
         engine = create_engine("postgresql://scott:tiger@localhost/test")
 
         """
 
 class ConnectionEvents(event.Events):
-    """Available events for :class:`.Connection`.
+    """Available events for :class:`.Connectable`, which includes
+    :class:`.Connection` and :class:`.Engine`.
 
-    The methods here define the name of an event as well as the names of members that are passed to listener functions.
+    The methods here define the name of an event as well as the names of
+    members that are passed to listener functions.
 
-    e.g.::
+    An event listener can be associated with any :class:`.Connectable`,
+    such as an :class:`.Engine`, e.g.::
 
         from sqlalchemy import event, create_engine
 
         engine = create_engine('postgresql://scott:tiger@localhost/test')
         event.listen(engine, "before_execute", before_execute)
 
-    Some events allow modifiers to the listen() function.
+    Some events allow modifiers to the :func:`.event.listen` function.
 
-    :param retval=False: Applies to the :meth:`.before_execute` and 
+    :param retval=False: Applies to the :meth:`.before_execute` and
       :meth:`.before_cursor_execute` events only.  When True, the
       user-defined event function must have a return value, which
-      is a tuple of parameters that replace the given statement 
+      is a tuple of parameters that replace the given statement
       and parameters.  See those methods for a description of
       specific return arguments.
 
+    .. versionchanged:: 0.8 :class:`.ConnectionEvents` can now be associated
+       with any :class:`.Connectable` including :class:`.Connection`,
+       in addition to the existing support for :class:`.Engine`.
+
     """
 
     @classmethod
         if not retval:
             if identifier == 'before_execute':
                 orig_fn = fn
-                def wrap(conn, clauseelement, multiparams, params):
+
+                def wrap_before_execute(conn, clauseelement,
+                                                multiparams, params):
                     orig_fn(conn, clauseelement, multiparams, params)
                     return clauseelement, multiparams, params