.listen() on Engine-level events fired from Connection does not work for in-progress Sessions if it's the first listener

Issue #2978 resolved
Alan Shreve created an issue

Here's the succinct code explanation:

sess = Session()
sess.query(A).all()
sqlalchemy.event.listen(engine, "before_execute", before_execute_cb)
sess.query(A).all() # <--- USE THE SAME SESSION AS BEFORE

The second query does not trigger before_execute_cb, although I expect it to. However, if you do this:

# Register dummy listener to force Connection's dispatch descriptor to join with Engine's
sqlalchemy.event.listen(engine, "before_execute", noop_fn)
sess = Session()
sess.query(A).all()
sqlalchemy.event.listen(engine, "before_execute", before_execute_cb)
sess.query(A).all() # <--- USE THE SAME SESSION AS BEFORE

This time, the second query does trigger before_execute_cb.

A full set of cases: https://gist.github.com/inconshreveable/9300571

This is a very subtle problem in the initialization of the dispatcher on Connection objects. The bug is here:

https://bitbucket.org/zzzeek/sqlalchemy/src/fa23570a338b53c813b6342fba0cacd4f5e61384/lib/sqlalchemy/engine/base.py?at=master#cl-66
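To make the mechanism easier to follow, here is a minimal, stdlib-only model of the behaviour described above. It is a sketch, not SQLAlchemy's code: `ToyEngine`, `ToyConnection` and `run` are hypothetical names, and a plain list stands in for the dispatch object. The only assumption carried over from the report is that the connection decides once, at construction time, whether to link to the engine's listeners.

```python
class ToyEngine:
    """Hypothetical stand-in for Engine: a listener list plus a fast-path flag."""

    def __init__(self):
        self.listeners = []
        self._has_events = False

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True


class ToyConnection:
    """Hypothetical stand-in for Connection, modelling the reported behaviour."""

    def __init__(self, engine):
        self.engine = engine
        # The link to the engine's listeners and the flag are both
        # decided once, here, and never revisited.
        self.listeners = engine.listeners if engine._has_events else []
        self._has_events = engine._has_events

    def execute(self, statement):
        # Only the connection's own snapshot of the flag is consulted.
        if self._has_events:
            for fn in self.listeners:
                fn(self, statement)
        return statement


def run(pre_register_noop):
    """Replay the two scenarios from the report; return what the callback saw."""
    seen = []
    engine = ToyEngine()
    if pre_register_noop:
        engine.listen(lambda conn, stmt: None)  # the "dummy listener" workaround
    conn = ToyConnection(engine)  # the in-progress session holds on to this
    conn.execute("SELECT 1")
    engine.listen(lambda conn, stmt: seen.append(stmt))
    conn.execute("SELECT 2")  # same connection as before
    return seen
```

In this model `run(False)` returns an empty list, because the connection was created before any listener existed, while `run(True)` returns `["SELECT 2"]`, because the dummy listener caused the connection to share the engine's listener list from the start.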

I'm not well-versed enough in SQLAlchemy's design to decide how to solve this, but my suggestion would be:

Connection objects should always join with the Engine's dispatch object. When a new event listener is registered on the Engine's dispatch object, the Engine should iterate over each open connection and set _has_events to True.

Comments (8)

  1. Mike Bayer repo owner

    This would be the patch. It adds six function calls when we first acquire a new Connection object, but does not add execute overhead.

    diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
    index 888a15f..411d41f 100644
    --- a/lib/sqlalchemy/engine/base.py
    +++ b/lib/sqlalchemy/engine/base.py
    @@ -65,7 +65,7 @@ class Connection(Connectable):
             self.__can_reconnect = True
             if _dispatch:
                 self.dispatch = _dispatch
    -        elif engine._has_events:
    +        else:
                 self.dispatch = self.dispatch._join(engine.dispatch)
             self._has_events = _has_events or (
                                     _has_events is None and engine._has_events)
    @@ -77,7 +77,7 @@ class Connection(Connectable):
             else:
                 self._execution_options = engine._execution_options
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.engine_connect(self, _branch)
    
         def _branch(self):
    @@ -204,7 +204,7 @@ class Connection(Connectable):
             """
             c = self._clone()
             c._execution_options = c._execution_options.union(opt)
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.set_connection_execution_options(c, opt)
             self.dialect.set_connection_execution_options(c, opt)
             return c
    @@ -482,7 +482,7 @@ class Connection(Connectable):
             if self._echo:
                 self.engine.logger.info("BEGIN (implicit)")
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.begin(self)
    
             try:
    @@ -493,7 +493,7 @@ class Connection(Connectable):
                 self._handle_dbapi_exception(e, None, None, None, None)
    
         def _rollback_impl(self):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.rollback(self)
    
             if self._still_open_and_connection_is_valid:
    @@ -511,7 +511,7 @@ class Connection(Connectable):
                 self.__transaction = None
    
         def _commit_impl(self, autocommit=False):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.commit(self)
    
             if self._echo:
    @@ -526,7 +526,7 @@ class Connection(Connectable):
                 self.__transaction = None
    
         def _savepoint_impl(self, name=None):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.savepoint(self, name)
    
             if name is None:
    @@ -537,7 +537,7 @@ class Connection(Connectable):
                 return name
    
         def _rollback_to_savepoint_impl(self, name, context):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.rollback_savepoint(self, name, context)
    
             if self._still_open_and_connection_is_valid:
    @@ -545,7 +545,7 @@ class Connection(Connectable):
             self.__transaction = context
    
         def _release_savepoint_impl(self, name, context):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.release_savepoint(self, name, context)
    
             if self._still_open_and_connection_is_valid:
    @@ -555,7 +555,7 @@ class Connection(Connectable):
         def _begin_twophase_impl(self, transaction):
             if self._echo:
                 self.engine.logger.info("BEGIN TWOPHASE (implicit)")
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.begin_twophase(self, transaction.xid)
    
             if self._still_open_and_connection_is_valid:
    @@ -565,7 +565,7 @@ class Connection(Connectable):
                     self.connection._reset_agent = transaction
    
         def _prepare_twophase_impl(self, xid):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.prepare_twophase(self, xid)
    
             if self._still_open_and_connection_is_valid:
    @@ -573,7 +573,7 @@ class Connection(Connectable):
                 self.engine.dialect.do_prepare_twophase(self, xid)
    
         def _rollback_twophase_impl(self, xid, is_prepared):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.rollback_twophase(self, xid, is_prepared)
    
             if self._still_open_and_connection_is_valid:
    @@ -588,7 +588,7 @@ class Connection(Connectable):
                 self.__transaction = None
    
         def _commit_twophase_impl(self, xid, is_prepared):
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.commit_twophase(self, xid, is_prepared)
    
             if self._still_open_and_connection_is_valid:
    @@ -725,7 +725,7 @@ class Connection(Connectable):
         def _execute_default(self, default, multiparams, params):
             """Execute a schema.ColumnDefault object."""
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_execute:
                     default, multiparams, params = \
                         fn(self, default, multiparams, params)
    @@ -746,7 +746,7 @@ class Connection(Connectable):
             if self.should_close_with_result:
                 self.close()
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_execute(self,
                     default, multiparams, params, ret)
    
    @@ -755,7 +755,7 @@ class Connection(Connectable):
         def _execute_ddl(self, ddl, multiparams, params):
             """Execute a schema.DDL object."""
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_execute:
                     ddl, multiparams, params = \
                         fn(self, ddl, multiparams, params)
    @@ -770,7 +770,7 @@ class Connection(Connectable):
                 None,
                 compiled
             )
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_execute(self,
                     ddl, multiparams, params, ret)
             return ret
    @@ -778,7 +778,7 @@ class Connection(Connectable):
         def _execute_clauseelement(self, elem, multiparams, params):
             """Execute a sql.ClauseElement object."""
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_execute:
                     elem, multiparams, params = \
                         fn(self, elem, multiparams, params)
    @@ -813,7 +813,7 @@ class Connection(Connectable):
                 distilled_params,
                 compiled_sql, distilled_params
             )
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_execute(self,
                     elem, multiparams, params, ret)
             return ret
    @@ -821,7 +821,7 @@ class Connection(Connectable):
         def _execute_compiled(self, compiled, multiparams, params):
             """Execute a sql.Compiled object."""
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_execute:
                     compiled, multiparams, params = \
                         fn(self, compiled, multiparams, params)
    @@ -835,7 +835,7 @@ class Connection(Connectable):
                 parameters,
                 compiled, parameters
             )
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_execute(self,
                     compiled, multiparams, params, ret)
             return ret
    @@ -843,7 +843,7 @@ class Connection(Connectable):
         def _execute_text(self, statement, multiparams, params):
             """Execute a string SQL statement."""
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_execute:
                     statement, multiparams, params = \
                         fn(self, statement, multiparams, params)
    @@ -857,7 +857,7 @@ class Connection(Connectable):
                 parameters,
                 statement, parameters
             )
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_execute(self,
                     statement, multiparams, params, ret)
             return ret
    @@ -890,7 +890,7 @@ class Connection(Connectable):
             if not context.executemany:
                 parameters = parameters[0]
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_cursor_execute:
                     statement, parameters = \
                                 fn(self, cursor, statement, parameters,
    @@ -926,7 +926,7 @@ class Connection(Connectable):
                                     cursor,
                                     context)
    
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 self.dispatch.after_cursor_execute(self, cursor,
                                                     statement,
                                                     parameters,
    @@ -981,7 +981,7 @@ class Connection(Connectable):
             terminates at _execute_context().
    
             """
    -        if self._has_events:
    +        if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_cursor_execute:
                     statement, parameters = \
                                 fn(self, cursor, statement, parameters,
    @@ -1051,7 +1051,7 @@ class Connection(Connectable):
                     (statement is not None and context is None)
    
                 if should_wrap and context:
    -                if self._has_events:
    +                if self._has_events or self.engine._has_events:
                         self.dispatch.dbapi_error(self,
                                                         cursor,
                                                         statement,
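The two ideas in the diff (always join the connection's dispatch with the engine's, and test either `_has_events` flag before dispatching) can be illustrated with a small stdlib-only model. This is a sketch under stated assumptions, not the patched SQLAlchemy code: `ModelEngine` and `PatchedConnection` are hypothetical names, and a shared list stands in for the joined dispatch object.

```python
class ModelEngine:
    """Hypothetical stand-in for Engine: a listener list plus a fast-path flag."""

    def __init__(self):
        self.listeners = []
        self._has_events = False

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True


class PatchedConnection:
    """Hypothetical stand-in for Connection, following the shape of the patch."""

    def __init__(self, engine):
        self.engine = engine
        # Always share the engine's listener list (the unconditional join),
        # so listeners added later are visible through this connection.
        self.listeners = engine.listeners
        self._has_events = engine._has_events

    def execute(self, statement):
        # Check the engine's current flag as well as the connection's snapshot.
        if self._has_events or self.engine._has_events:
            for fn in self.listeners:
                fn(self, statement)
        return statement


def replay():
    """Register a listener after the connection exists; return what it saw."""
    seen = []
    engine = ModelEngine()
    conn = PatchedConnection(engine)
    conn.execute("SELECT 1")  # no listeners yet, fast path is skipped
    engine.listen(lambda c, stmt: seen.append(stmt))
    conn.execute("SELECT 2")  # same connection, listener now fires
    return seen
```

In this model `replay()` returns `["SELECT 2"]`: the listener registered after the connection was created still fires, and when no listener is registered the only added cost per execute is one extra attribute check.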
    
  2. Alan Shreve reporter

    Ahh. I expected it to be for performance reasons, but I just missed the documentation in the green box. Sorry!

    That seems like an acceptable tradeoff to me. The only reason I noticed is because an artificial test case caught it, not because of any real-world code that depended on this behavior. Basically, I hit this because I'm about to re-release an old library for profiling SQLAlchemy applications that could be used with a pattern like:

    profiler = sqltap.start(engine)
    
    # do stuff
    sess.query(Customer).get(id)
    
    # look for performance problems
    with profiler:
        # do more performance intensive stuff
        sess.query(Account).join().filter().etc()
    
    # more stuff that you don't want to profile
    

    Is there a reason Engine couldn't iterate over checked-out connections at registration time to set their _has_events to True? It seems like this would defer all of the overhead time to registration time. I can see potential threading issues, but I think storing/loading _has_events is atomic (at least on CPython).

  3. Mike Bayer repo owner

    "Is there a reason Engine couldn't iterate over checked-out connections at registration time to set their _has_events to True?"

    Only because it starts getting out of hand (e.g. lots of mechanics to suit an almost-never kind of use case). I think I'll likely just commit the patch I have above; it is simple enough, and the overhead of chaining the listeners is not too different from creating a weakref when a Connection is checked out, which is what we'd need in order to iterate over them like that (e.g. a WeakSet).
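For comparison, the alternative discussed here (tracking checked-out connections weakly and flipping their flag at registration time) might look roughly like the following. This is only an illustrative sketch of the idea, not something SQLAlchemy implements: `TrackingEngine`, `TrackedConnection` and `_open_connections` are hypothetical names, and thread safety is not addressed.

```python
import weakref


class TrackingEngine:
    """Hypothetical engine that remembers its live connections weakly."""

    def __init__(self):
        self.listeners = []
        self._has_events = False
        # Weak references only, so tracking never keeps a connection alive.
        self._open_connections = weakref.WeakSet()

    def connect(self):
        conn = TrackedConnection(self)
        self._open_connections.add(conn)  # the per-checkout weakref cost
        return conn

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True
        # All of the extra work happens at registration time: update
        # every connection that is still alive.
        for conn in list(self._open_connections):
            conn._has_events = True


class TrackedConnection:
    """Hypothetical connection whose flag is pushed to it by the engine."""

    def __init__(self, engine):
        self.engine = engine
        self.listeners = engine.listeners  # always share the engine's list
        self._has_events = engine._has_events

    def execute(self, statement):
        if self._has_events:  # single check, as before the patch
            for fn in self.listeners:
                fn(self, statement)
        return statement
```

Here the execute path keeps a single flag check, at the price of one `WeakSet.add` per connection and a loop over live connections in `listen`, which is the extra machinery the comment above argues is not worth it for a rare use case.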

  4. Mike Bayer repo owner
    • An event listener can now be associated with a :class:`.Engine`, after one or more :class:`.Connection` objects have been created (such as by an orm :class:`.Session` or via explicit connect) and the listener will pick up events from those connections. Previously, performance concerns pushed the event transfer from :class:`.Engine` to :class:`.Connection` at init-time only, but we've inlined a bunch of conditional checks to make this possible without any additional function calls. fixes #2978

    → <<cset b00e15b50f83>>
