.listen() on Engine-level events fired from Connection does not work for in-progress Sessions if it's the first listener
Here's a succinct reproduction:
sess = Session()
sess.query(A).all()
sqlalchemy.event.listen(engine, "before_execute", before_execute_cb)
sess.query(A).all() # <--- USE THE SAME SESSION AS BEFORE
That second query does not trigger before_execute_cb, but I expect it to. However, if you do this:
# Register dummy listener to force Connection's dispatch descriptor to join with Engine's
sqlalchemy.event.listen(engine, "before_execute", noop_fn)
sess = Session()
sess.query(A).all()
sqlalchemy.event.listen(engine, "before_execute", before_execute_cb)
sess.query(A).all() # <--- USE THE SAME SESSION AS BEFORE
This time, the second query does trigger before_execute_cb.
A full set of cases: https://gist.github.com/inconshreveable/9300571
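To make the mechanism concrete, here is a minimal pure-Python model of the pre-patch behavior. It is not SQLAlchemy code. Engine, Connection, listen(), _joined and _dispatch() are simplified, hypothetical stand-ins. It assumes, as in the reproduction above, that the Session keeps reusing the Connection it acquired for its first query; conn plays that role. The Connection decides only once, at construction, whether to join the engine's listener list and whether its _has_events guard is on. A later listener is therefore seen only if some listener already existed when the Connection was created.

```python
class Engine:
    """Hypothetical stand-in for the engine's listener bookkeeping."""

    def __init__(self):
        self.listeners = []          # engine-level before_execute listeners
        self._has_events = False

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True


class Connection:
    """Pre-patch behavior: engine state is sampled once, in __init__."""

    def __init__(self, engine):
        self.engine = engine
        self.listeners = []          # connection-local listeners
        # Join with the engine's listeners only if it already had some.
        self._joined = engine._has_events
        self._has_events = engine._has_events

    def _dispatch(self):
        # A joined dispatch reads the engine's list live, at call time.
        parent = self.engine.listeners if self._joined else []
        return parent + self.listeners

    def execute(self, stmt):
        if self._has_events:         # cheap per-execute guard
            for fn in self._dispatch():
                fn(self, stmt)
        return stmt


# Case 1: the real listener is the first one registered.
calls = []
engine = Engine()
conn = Connection(engine)            # not joined, _has_events is False
engine.listen(lambda c, s: calls.append(s))
conn.execute("SELECT 1")             # listener is never called

# Case 2: a dummy listener was registered before connecting.
calls2 = []
engine2 = Engine()
engine2.listen(lambda c, s: None)
conn2 = Connection(engine2)          # joined, _has_events is True
engine2.listen(lambda c, s: calls2.append(s))
conn2.execute("SELECT 1")            # late listener is called
```

Case 2 works only because the dummy listener caused the join and the flag to be set before the connection existed. The joined list is then read live, so the later listener is picked up.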
This is a very subtle problem in the initialization of the dispatcher on Connection objects. The bug is here:
I'm not well-versed enough in SQLAlchemy's design to decide how to solve this, but my suggestion would probably be:
Connection objects should always join with the Engine's dispatch object, and when a new event listener is registered on the Engine's dispatch object, the Engine needs to iterate over each open connection and set its _has_events to True.
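The following is a rough sketch of the suggestion above. It is again a toy model and not SQLAlchemy's API. The hypothetical Engine keeps a weakref.WeakSet of the connections it has handed out, and it flips _has_events on each of them when a listener is registered. Every Connection always joins the engine's listener list, so the per-execute check stays a single attribute read. The cost moves to connection checkout (one WeakSet insertion) and to listener registration.

```python
import weakref


class Engine:
    """Hypothetical engine that tracks its live connections."""

    def __init__(self):
        self.listeners = []
        self._has_events = False
        # Weak references, so closed and collected connections drop out.
        self._connections = weakref.WeakSet()

    def connect(self):
        conn = Connection(self)
        self._connections.add(conn)  # extra work on every checkout
        return conn

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True
        # Push the new state to connections that already exist.
        for conn in list(self._connections):
            conn._has_events = True


class Connection:
    def __init__(self, engine):
        self.engine = engine
        self.listeners = []
        self._has_events = engine._has_events

    def execute(self, stmt):
        if self._has_events:         # single attribute check per execute
            # Always "joined": the engine's list is read live.
            for fn in self.engine.listeners + self.listeners:
                fn(self, stmt)
        return stmt


calls = []
engine = Engine()
conn = engine.connect()              # checked out before any listener
engine.listen(lambda c, s: calls.append(s))
conn.execute("SELECT 1")
```

This design pays for a weakref on every checkout. It also makes listen() write to connections that may be in use by other threads, which is the threading concern raised later in the thread.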
-
repo owner -
repo owner - changed status to on hold
this is a documented limitation.
-
repo owner This would be the patch. It adds six function calls when we first acquire a new Connection object, but does not add execute overhead.
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 888a15f..411d41f 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -65,7 +65,7 @@ class Connection(Connectable):
         self.__can_reconnect = True
         if _dispatch:
             self.dispatch = _dispatch
-        elif engine._has_events:
+        else:
             self.dispatch = self.dispatch._join(engine.dispatch)
         self._has_events = _has_events or (
                                 _has_events is None and engine._has_events)
@@ -77,7 +77,7 @@ class Connection(Connectable):
         else:
             self._execution_options = engine._execution_options
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.engine_connect(self, _branch)
 
     def _branch(self):
@@ -204,7 +204,7 @@ class Connection(Connectable):
         """
         c = self._clone()
         c._execution_options = c._execution_options.union(opt)
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.set_connection_execution_options(c, opt)
         self.dialect.set_connection_execution_options(c, opt)
         return c
@@ -482,7 +482,7 @@ class Connection(Connectable):
         if self._echo:
             self.engine.logger.info("BEGIN (implicit)")
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.begin(self)
 
         try:
@@ -493,7 +493,7 @@ class Connection(Connectable):
             self._handle_dbapi_exception(e, None, None, None, None)
 
     def _rollback_impl(self):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.rollback(self)
 
         if self._still_open_and_connection_is_valid:
@@ -511,7 +511,7 @@ class Connection(Connectable):
             self.__transaction = None
 
     def _commit_impl(self, autocommit=False):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.commit(self)
 
         if self._echo:
@@ -526,7 +526,7 @@ class Connection(Connectable):
             self.__transaction = None
 
     def _savepoint_impl(self, name=None):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.savepoint(self, name)
 
         if name is None:
@@ -537,7 +537,7 @@ class Connection(Connectable):
         return name
 
     def _rollback_to_savepoint_impl(self, name, context):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.rollback_savepoint(self, name, context)
 
         if self._still_open_and_connection_is_valid:
@@ -545,7 +545,7 @@ class Connection(Connectable):
         self.__transaction = context
 
     def _release_savepoint_impl(self, name, context):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.release_savepoint(self, name, context)
 
         if self._still_open_and_connection_is_valid:
@@ -555,7 +555,7 @@ class Connection(Connectable):
     def _begin_twophase_impl(self, transaction):
         if self._echo:
             self.engine.logger.info("BEGIN TWOPHASE (implicit)")
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.begin_twophase(self, transaction.xid)
 
         if self._still_open_and_connection_is_valid:
@@ -565,7 +565,7 @@ class Connection(Connectable):
                 self.connection._reset_agent = transaction
 
     def _prepare_twophase_impl(self, xid):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.prepare_twophase(self, xid)
 
         if self._still_open_and_connection_is_valid:
@@ -573,7 +573,7 @@ class Connection(Connectable):
             self.engine.dialect.do_prepare_twophase(self, xid)
 
     def _rollback_twophase_impl(self, xid, is_prepared):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.rollback_twophase(self, xid, is_prepared)
 
         if self._still_open_and_connection_is_valid:
@@ -588,7 +588,7 @@ class Connection(Connectable):
             self.__transaction = None
 
     def _commit_twophase_impl(self, xid, is_prepared):
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.commit_twophase(self, xid, is_prepared)
 
         if self._still_open_and_connection_is_valid:
@@ -725,7 +725,7 @@ class Connection(Connectable):
     def _execute_default(self, default, multiparams, params):
         """Execute a schema.ColumnDefault object."""
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_execute:
                 default, multiparams, params = \
                     fn(self, default, multiparams, params)
@@ -746,7 +746,7 @@ class Connection(Connectable):
         if self.should_close_with_result:
             self.close()
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_execute(self,
                 default, multiparams, params, ret)
 
@@ -755,7 +755,7 @@ class Connection(Connectable):
     def _execute_ddl(self, ddl, multiparams, params):
         """Execute a schema.DDL object."""
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_execute:
                 ddl, multiparams, params = \
                     fn(self, ddl, multiparams, params)
@@ -770,7 +770,7 @@ class Connection(Connectable):
             None, compiled
         )
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_execute(self,
                 ddl, multiparams, params, ret)
         return ret
@@ -778,7 +778,7 @@ class Connection(Connectable):
     def _execute_clauseelement(self, elem, multiparams, params):
         """Execute a sql.ClauseElement object."""
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_execute:
                 elem, multiparams, params = \
                     fn(self, elem, multiparams, params)
@@ -813,7 +813,7 @@ class Connection(Connectable):
             distilled_params,
             compiled_sql, distilled_params
         )
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_execute(self,
                 elem, multiparams, params, ret)
         return ret
@@ -821,7 +821,7 @@ class Connection(Connectable):
     def _execute_compiled(self, compiled, multiparams, params):
         """Execute a sql.Compiled object."""
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_execute:
                 compiled, multiparams, params = \
                     fn(self, compiled, multiparams, params)
@@ -835,7 +835,7 @@ class Connection(Connectable):
             parameters, compiled, parameters
         )
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_execute(self,
                 compiled, multiparams, params, ret)
         return ret
@@ -843,7 +843,7 @@ class Connection(Connectable):
     def _execute_text(self, statement, multiparams, params):
         """Execute a string SQL statement."""
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_execute:
                 statement, multiparams, params = \
                     fn(self, statement, multiparams, params)
@@ -857,7 +857,7 @@ class Connection(Connectable):
             parameters, statement, parameters
         )
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_execute(self,
                 statement, multiparams, params, ret)
         return ret
@@ -890,7 +890,7 @@ class Connection(Connectable):
         if not context.executemany:
             parameters = parameters[0]
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_cursor_execute:
                 statement, parameters = \
                     fn(self, cursor, statement, parameters,
@@ -926,7 +926,7 @@ class Connection(Connectable):
                                     cursor,
                                     context)
 
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             self.dispatch.after_cursor_execute(self, cursor,
                                                statement,
                                                parameters,
@@ -981,7 +981,7 @@ class Connection(Connectable):
         terminates at _execute_context().
 
         """
-        if self._has_events:
+        if self._has_events or self.engine._has_events:
             for fn in self.dispatch.before_cursor_execute:
                 statement, parameters = \
                     fn(self, cursor, statement, parameters,
@@ -1051,7 +1051,7 @@ class Connection(Connectable):
                 (statement is not None and context is None)
 
         if should_wrap and context:
-            if self._has_events:
+            if self._has_events or self.engine._has_events:
                 self.dispatch.dbapi_error(self,
                                             cursor,
                                             statement,
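The core idea of the patch can be reduced to the same kind of toy model. The names are hypothetical and are not SQLAlchemy internals. Every Connection is always joined to the engine's listeners. The guard reads the engine's flag at call time (self._has_events or self.engine._has_events) instead of relying only on a value copied in __init__. The execute path gains an attribute lookup but no function calls.

```python
class Engine:
    """Hypothetical stand-in for the engine's listener bookkeeping."""

    def __init__(self):
        self.listeners = []          # engine-level before_execute listeners
        self._has_events = False

    def listen(self, fn):
        self.listeners.append(fn)
        self._has_events = True


class Connection:
    """Post-patch behavior: always joined, engine flag checked per call."""

    def __init__(self, engine):
        self.engine = engine
        self.listeners = []          # connection-local listeners
        # Still seeded from the engine, as in the patched __init__.
        self._has_events = engine._has_events

    def execute(self, stmt):
        # Same shape as the patched guard: attribute lookups only, no calls.
        if self._has_events or self.engine._has_events:
            # "Joined" dispatch: the engine's list is read live every time.
            for fn in self.engine.listeners + self.listeners:
                fn(self, stmt)
        return stmt


calls = []
engine = Engine()
conn = Connection(engine)            # created before any listener exists
engine.listen(lambda c, s: calls.append(s))
conn.execute("SELECT 1")             # reaches the late listener
```

Compared with tracking connections in a WeakSet, nothing has to be updated at registration time. The cost is one extra lookup on the engine whenever the connection's own flag is False.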
-
repo owner - changed status to open
this bug report is just going to keep happening
-
reporter Ahh. I expected it to be for performance reasons, but I just missed the documentation in the green box. Sorry!
That seems like an acceptable tradeoff to me. The only reason I noticed is that an artificial test case caught it, not any real-world code that depended on this behavior. I hit this because I'm about to re-release an old library for profiling SQLAlchemy applications, which could be used with a pattern like:
profiler = sqltap.start(engine)
# do stuff
sess.query(Customer).get(id)
# look for performance problems
with profiler:
    # do more performance intensive stuff
    sess.query(Account).join().filter().etc()
# more stuff that you don't want to profile
Is there a reason Engine couldn't iterate over checked-out connections at registration time to set their _has_events to True? It seems like this would move all of the overhead to registration time. I can see potential threading issues, but I think storing/loading _has_events is atomic (at least on CPython).
-
repo owner "Is there a reason Engine couldn't iterate over checked-out connections at registration time to set their _has_events to True?"
Only because it starts getting out of hand (lots of mechanics to suit an almost-never kind of use case). I think I'll just commit the patch I have above. It is simple enough, and the overhead of chaining the listeners is not much different from creating a weakref each time a Connection is checked out, which is what we'd need in order to iterate over them like that (e.g. with a WeakSet).
-
reporter That all makes sense. Thanks for your help!
-
repo owner - changed status to resolved
- An event listener can now be associated with an :class:`.Engine` after one or more :class:`.Connection` objects have been created (such as by an ORM :class:`.Session` or via explicit connect), and the listener will pick up events from those connections. Previously, performance concerns pushed the event transfer from :class:`.Engine` to :class:`.Connection` at init-time only, but we've inlined a bunch of conditional checks to make this possible without any additional function calls. fixes #2978
→ changeset b00e15b50f83
-
This behavior is known and is documented in the green box you see here: http://docs.sqlalchemy.org/en/rel_0_9/core/events.html#sql-execution-and-connection-events. It is for performance reasons.
Is there some compelling reason one would need to attach event listeners to an engine after SQL execution is already well underway? If you want to listen to a specific session, you can listen on the Connection returned by Session.connection().