Clone wiki

sqlalchemy / UsageRecipes / GlobalFilter

Global Query Filter


NOTE: unfortunately this recipe was broken as of version 1.0. It is working again as of SQLAlchemy 1.2.5 with issue #4128, however note you must use .populate_existing() when querying for objects that are already present in the identity map in order for the options to be replaced with new ones.


The problem is to load all objects using some external filter criterion. In this example, every class in the model has a timestamp attribute, and we'd like to load multiple objects whose timestamps are all within a given date range.

Starting with imports and a model like:

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

e = create_engine('sqlite://', echo=True)

Base = declarative_base(e)

class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    timestamp = Column(TIMESTAMP, nullable=False)
    children = relation("Child")

class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey('parent.id'), nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)

Base.metadata.create_all()

We would like to load Parent and Child using a range for timestamp, and apply it both to parent and children. Lazyloading + eagerloading + filtering.

An easy way to get an external value into the join condition of a relation is to use bindparam(), and supply the values at query time using query.params(). We add a relation temporal_children to Parent which accomplishes this:

Parent.temporal_children = relation(Child, 
                            primaryjoin=and_(
                                Parent.id==Child.parent_id,
                                Child.timestamp.between(
                                    bindparam("temporal_lower"),
                                    bindparam("temporal_upper")
                                )
                            ),viewonly=True
                        )

A session and some test data to get off the ground:

import datetime

session = sessionmaker()()

c1, c2, c3, c4, c5 = [
    Child(timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 20, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 12, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
]

p1, p2 = [
Parent(
    timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00),
    children=[c1, c2, c3]
),
Parent(
    timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00),
    children=[c4, c5]
)]

session.add_all([p1, p2])
session.commit()

We can now manually add the value for "temporal_lower" and "temporal_upper", to both Parent and Child, using more bindparam() objects as well as query.params(). Using an eagerload for temporal_children, the bind param value gets applied to both places:

parents = session.query(Parent).\
        filter(Parent.timestamp.between(bindparam("temporal_lower"), bindparam("temporal_upper"))).\
        params(temporal_lower=datetime.datetime(2009, 10, 16, 12, 00, 00)).\
        params(temporal_upper=datetime.datetime(2009, 10, 18, 12, 00, 00)).\
        options(eagerload(Parent.temporal_children)).\
        all()

assert parents[0] == p2
assert parents[0].temporal_children == [c5]

The above pattern will apply the daterange anywhere in the query it's specified, including if filtering is performed on Parent.temporal_children.

This doesn't yet address the "lazyload" case, and we'd like to simplify what we're doing above. For this we'll use MapperOption, and a little bit of underscore access in Query. This option does the same thing that we're doing above, except it will also get applied to subsequent lazyloads:

from sqlalchemy.orm.interfaces import MapperOption

class TemporalOption(MapperOption):
    propagate_to_loaders = True

    def __init__(self, range_lower, range_upper):
        self.range_lower = range_lower
        self.range_upper = range_upper

    def process_query_conditionally(self, query):
        """process query during a lazyload"""
        query._params = query._params.union(dict(
                temporal_lower=self.range_lower,
                temporal_upper=self.range_upper
            ))

    def process_query(self, query):
        """process query during a primary user query"""

        # apply bindparam values
        self.process_query_conditionally(query)

        # requires a query against a single mapper
        parent_cls = query._mapper_zero().class_
        filter_crit = parent_cls.timestamp.between(
            bindparam("temporal_lower"),
            bindparam("temporal_upper")
        )

        if query._criterion is None:
            query._criterion = filter_crit
        else:
            query._criterion = query._criterion & filter_crit

Usage now is through the option; note that in order to ensure the option is overwritten onto an existing Parent object that is already in the identity map, we must use the .populate_existing() method::

# try it with lazyload
parents = session.query(Parent).\
        populate_existing()\.
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 16, 12, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).all()


assert parents[0] == p2
assert parents[0].temporal_children == [c5]

session.expire_all()

# try it with eager load 
parents = session.query(Parent).\
        populate_existing().\
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 16, 12, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).\
        options(eagerload(Parent.temporal_children)).\
        all()


assert parents[0] == p2
assert parents[0].temporal_children == [c5]

session.expire_all()

# try some filtering
parents = session.query(Parent).\
        populate_existing().\
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 15, 11, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).\
        join(Parent.temporal_children).\
        filter(Child.id==2).\
        all()


assert parents[0] == p1
assert parents[0].temporal_children == [c1, c2]

The option we've set up is "sticky" for the lifetime of p1, p2. Expiring them doesn't clear the option from their internal state:

session.expire(parents[0], ['temporal_children'])
assert parents[0].temporal_children == [c1, c2]

session.expire(parents[0])
assert parents[0].temporal_children == [c1, c2]

Currently, the .populate_existing() method is the only public API that can clear these options; although for experimental access you can manipulate instance_state(p1).load_options to get at the MapperOption objects.

Updated