Clone wiki

sqlalchemy / UsageRecipes / SessionModifiedSQL

AlterSQLPerSession

This recipe allows modification of SQL keyed to a particular Session. Examples include application of SQL comments or alternate table names within statements at execution time.

The ORM caches the compiled form of many of the SQL statements used during a flush. Many recipes that alter SQL on the fly use the compiler extension, but a compile-time hook would not run again for a statement whose compiled form is cached. Here we need access to the SQL string on every execution, so we use the before_cursor_execute event to intercept the SQL right before it's passed to the DBAPI cursor.

The recipe illustrates the usage of both ORM and Core events, along with the Connection.info dictionary and the new Session.info dictionary added in 0.9 (a workaround for prior versions is illustrated).
Other use cases that involve coordination between Session and Connection might build off of similar features.

from contextlib import contextmanager
from sqlalchemy import event
from sqlalchemy.orm import Session
from sqlalchemy.engine import Engine

from sqlalchemy import __version__

if not hasattr(Session, "info"):
    # Make use of the .info dictionary of Session.
    #
    # This is a new feature as of 0.9 which works similarly to the
    # .info dictionary of Connection and MapperProperty.
    #
    # Here, we apply a simple .info dictionary to the Session class
    # for versions of SQLAlchemy that do not provide one.
    #
    # The test is made against the feature itself rather than by
    # comparing version strings: string comparison is lexicographic,
    # so e.g. "0.10.0" < "0.9.0" is true even though 0.10 is the
    # later release.
    from sqlalchemy import util

    @util.memoized_property
    def info(session):
        """Return the dictionary used as this Session's .info.

        Wrapped in memoized_property so that repeated access is
        expected to give back the same dictionary for a given
        Session rather than a new one each time.

        """
        return {}

    Session.info = info

@event.listens_for(Session, "after_begin")
def _connection_for_session(session, trans, connection):
    """Publish the Session's 'info' dictionary on its Connection.

    Invoked for the "after_begin" event, i.e. as a new Connection
    object becomes associated with the Session.  The same dictionary
    object (not a copy) is stored under the 'session_info' key of the
    Connection's own .info dictionary, which is local to the DBAPI
    connection, so later changes to session.info are visible through
    the Connection as well.

    """
    shared_info = session.info
    connection_info = connection.info
    connection_info['session_info'] = shared_info

@contextmanager
def session_comment(session, comment):
    """Apply the given comment to all SQL emitted by the given Session.

    The comment is stored under the "comment" key of ``session.info``
    for the duration of the ``with`` block.  On exit, including exit
    by an exception, the key is put back the way it was found: removed
    if it was absent beforehand, otherwise reset to its prior value,
    which allows these blocks to be nested.

    """
    info = session.info
    missing = object()
    previous = info.get("comment", missing)
    info["comment"] = comment
    try:
        yield
    finally:
        # runs on error too, so a failed block can't leave its
        # comment attached to later, unrelated statements
        if previous is missing:
            info.pop("comment", None)
        else:
            info["comment"] = previous

@contextmanager
def session_shardid(session, shardid):
    """Apply the "shard" id to all SQL emitted by the given Session.

    The id is stored under the "shardid" key of ``session.info`` for
    the duration of the ``with`` block.  On exit, including exit by an
    exception, the key is put back the way it was found: removed if it
    was absent beforehand, otherwise reset to its prior value, which
    allows these blocks to be nested.

    """
    info = session.info
    missing = object()
    previous = info.get("shardid", missing)
    info["shardid"] = shardid
    try:
        yield
    finally:
        # runs on error too, so a failed block can't leave later
        # statements pointed at this shard
        if previous is missing:
            info.pop("shardid", None)
        else:
            info["shardid"] = previous


@event.listens_for(Engine, "before_cursor_execute", retval=True)
def _apply_comment(connection, cursor, statement, parameters,
                                        context, executemany):
    """Apply comments to statements.

    We intercept all statement executions at the cursor level, where the
    before_cursor_execute() event gives us the final string SQL statement
    in all cases and also gives us a chance to modify the string.

    If the 'session_info' dictionary found in connection.info has a
    "comment" entry, it is appended to the statement as a trailing
    " -- <comment>".  Returns the (statement, parameters) tuple, as
    the listener is registered with retval=True; parameters are passed
    through unchanged.

    """
    session_info = connection.info.get('session_info', {})
    if "comment" in session_info:
        # format through a 1-tuple so that a tuple-valued comment is
        # rendered as text instead of being unpacked by "%"
        comment = "%s" % (session_info["comment"],)
        # a "--" comment ends at the first line break; fold any line
        # breaks into spaces so that no part of the comment is sent
        # to the database as SQL
        comment = " ".join(comment.splitlines())
        statement = "%s -- %s" % (statement, comment)
    return statement, parameters

@event.listens_for(Engine, "before_cursor_execute", retval=True)
def _apply_shard_id(connection, cursor, statement, parameters,
                                        context, executemany):
    """Apply a "shard id" to statements.

    Similar to the comment listener, we alter the statement on the
    fly replacing occurrences of "_shardid_" with the current "shard id".

    The shard id is read from the "shardid" entry of the 'session_info'
    dictionary found in connection.info; when there is no such entry
    the statement is returned as is.  Returns the
    (statement, parameters) tuple, as the listener is registered with
    retval=True; parameters are passed through unchanged.

    """
    session_info = connection.info.get('session_info', {})
    if "shardid" in session_info:
        # str.replace() accepts only strings; render the id as text so
        # that e.g. an integer shard id works too
        shardid = "%s" % (session_info["shardid"],)
        statement = statement.replace("_shardid_", shardid)
    return statement, parameters


# Demonstration script: runs the comment and shard-id helpers above against
# a SQLite engine with statement logging enabled.
if __name__ == '__main__':
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session
    from sqlalchemy.ext.declarative import declarative_base
    import logging
    # log the "sqlalchemy.engine" logger at INFO, message text only
    logging.basicConfig(format="%(message)s")
    logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

    Base = declarative_base()

    # plain table; its name contains no shard placeholder
    class A(Base):
        __tablename__ = "a"

        id = Column(Integer, primary_key=True)
        data = Column(String)

    # table whose name ends with the "_shardid_" placeholder, which the
    # _apply_shard_id listener replaces with the current shard id
    class B(Base):
        __tablename__ = "b__shardid_"

        # or, apply "_shardid_" to the "schema", so you get
        # "select * from myshard.b"
        # __tablename__ = "b"
        # __table_args__ = {"schema": "_shardid_"}

        id = Column(Integer, primary_key=True)
        data = Column(String)

    e = create_engine("sqlite://")

    # "a" is created directly on the engine, with no Session involved
    Base.metadata.create_all(e, tables=[A.__table__])

    s = Session(e)

    # create B once per shard id; the DDL is run on s.connection() rather
    # than on the engine, so that it goes through the Session's connection
    with session_shardid(s, "s1"):
        Base.metadata.create_all(s.connection(), tables=[B.__table__])

    with session_shardid(s, "s2"):
        Base.metadata.create_all(s.connection(), tables=[B.__table__])

    # no comment or shard id is set here
    s.add(A(data='d1'))
    s.commit()

    # everything emitted inside this block runs with the comment set
    with session_comment(s, "comment one"):
        s.add(A(data='d2'))
        d1 = s.query(A).filter_by(data='d1').one()
        d1.data = 'd1modified'
        s.commit()

    # insert a B row with the shard id set to "s1"
    with session_shardid(s, "s1"):
        s.add(B(data='d2'))
        s.commit()

    # both helpers at once: comment and shard id are set for the query
    with session_comment(s, "using shard s1"):
        with session_shardid(s, "s1"):
            s.query(A).join(B, A.data == B.data).all()

Output:

PRAGMA table_info("a")
()

CREATE TABLE a (
    id INTEGER NOT NULL,
    data VARCHAR,
    PRIMARY KEY (id)
)


()
COMMIT
BEGIN (implicit)
PRAGMA table_info("b_s1")
()

CREATE TABLE b_s1 (
    id INTEGER NOT NULL,
    data VARCHAR,
    PRIMARY KEY (id)
)


()
PRAGMA table_info("b_s2")
()

CREATE TABLE b_s2 (
    id INTEGER NOT NULL,
    data VARCHAR,
    PRIMARY KEY (id)
)


()
INSERT INTO a (data) VALUES (?)
('d1',)
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) -- comment one
('d2',)
SELECT a.id AS a_id, a.data AS a_data
FROM a
WHERE a.data = ? -- comment one
('d1',)
UPDATE a SET data=? WHERE a.id = ? -- comment one
('d1modified', 1)
COMMIT
BEGIN (implicit)
INSERT INTO b_s1 (data) VALUES (?)
('d2',)
COMMIT
BEGIN (implicit)
SELECT a.id AS a_id, a.data AS a_data
FROM a JOIN b_s1 ON a.data = b_s1.data -- using shard s1
()

Updated