1. Michael Bayer
  2. sqlalchemy

Global Query Filter

The goal is to load every object in a query subject to an externally supplied filter criterion. In this example, every class in the model has a timestamp attribute, and we'd like to load only those objects whose timestamps fall within a given date range.

Starting with imports and a model like:

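# explicit imports, so that each name used in the recipe is traceable
from sqlalchemy import (create_engine, Column, Integer, TIMESTAMP,
                        ForeignKey, and_, bindparam)
from sqlalchemy.orm import relation, sessionmaker, eagerload
from sqlalchemy.ext.declarative import declarative_base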

e = create_engine('sqlite://', echo=True)

Base = declarative_base(e)

class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    timestamp = Column(TIMESTAMP, nullable=False)
    children = relation("Child")

class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey('parent.id'), nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)

Base.metadata.create_all()

We would like to load Parent and Child objects restricted to a timestamp range, applying the range to both the parents and their children. This should work with lazy loading, eager loading, and explicit filtering.

An easy way to get an external value into the join condition of a relation is to use bindparam(), and supply the values at query time using query.params(). We add a relation temporal_children to Parent which accomplishes this:

Parent.temporal_children = relation(Child, 
                            primaryjoin=and_(
                                Parent.id==Child.parent_id,
                                Child.timestamp.between(
                                    bindparam("temporal_lower"),
                                    bindparam("temporal_upper")
                                )
                            ),
                            # read-only: this filtered collection is not used to persist changes
                            viewonly=True
                        )
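
To see why a bind parameter suits this, it may help to look at the kind of SQL a lazy load of temporal_children roughly corresponds to. The sketch below is not SQLAlchemy output: it uses the standard-library sqlite3 module, a hand-written statement, timestamps stored as ISO text, and hypothetical rows and ids. It only illustrates that the statement text stays fixed while the range is supplied at execution time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE child ("
    "id INTEGER PRIMARY KEY, parent_id INTEGER NOT NULL, timestamp TEXT NOT NULL)"
)
# Hypothetical rows: three children of parent 1, timestamps as ISO strings.
conn.executemany(
    "INSERT INTO child (id, parent_id, timestamp) VALUES (?, ?, ?)",
    [
        (1, 1, "2009-10-15 12:00:00"),
        (2, 1, "2009-10-17 12:00:00"),
        (3, 1, "2009-10-20 12:00:00"),
    ],
)

# Hand-written approximation of the join condition: the foreign key
# comparison plus a range whose bounds are named placeholders.
LAZY_LOAD_SQL = """
    SELECT id FROM child
    WHERE parent_id = :parent_id
      AND timestamp BETWEEN :temporal_lower AND :temporal_upper
    ORDER BY id
"""


def load_children(parent_id, lower, upper):
    """Run the fixed statement with a range chosen at call time."""
    params = {
        "parent_id": parent_id,
        "temporal_lower": lower,
        "temporal_upper": upper,
    }
    return [row[0] for row in conn.execute(LAZY_LOAD_SQL, params)]


narrow = load_children(1, "2009-10-16 12:00:00", "2009-10-18 12:00:00")
wide = load_children(1, "2009-10-15 12:00:00", "2009-10-21 12:00:00")
print(narrow, wide)
```

The same statement serves both calls; only the parameter dictionary differs, which is the property the relation above relies on.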

A session and some test data to get off the ground:

import datetime

session = sessionmaker()()

c1, c2, c3, c4, c5 = [
    Child(timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 20, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 12, 12, 00, 00)),
    Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
]

p1, p2 = [
    Parent(
        timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00),
        children=[c1, c2, c3]
    ),
    Parent(
        timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00),
        children=[c4, c5]
    ),
]

session.add_all([p1, p2])
session.commit()

We can now supply the values for "temporal_lower" and "temporal_upper" manually, applying them to both Parent and Child, by placing further bindparam() objects in the filter and passing the values through query.params(). With an eagerload of temporal_children, the same bind parameter values are applied in both places:

parents = session.query(Parent).\
        filter(Parent.timestamp.between(bindparam("temporal_lower"), bindparam("temporal_upper"))).\
        params(temporal_lower=datetime.datetime(2009, 10, 16, 12, 00, 00)).\
        params(temporal_upper=datetime.datetime(2009, 10, 18, 12, 00, 00)).\
        options(eagerload(Parent.temporal_children)).\
        all()

assert parents[0] == p2
assert parents[0].temporal_children == [c5]

The above pattern applies the date range wherever the bind parameters appear in the query, including when filtering is performed on Parent.temporal_children.
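
As a rough illustration of a named parameter being supplied once and used in several places, here is a sketch in plain SQL using the standard-library sqlite3 module. The statement is hand-written rather than generated by SQLAlchemy, the ids are hypothetical, and timestamps are stored as ISO text for simplicity; the rows mirror the test data above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE parent (id INTEGER PRIMARY KEY, timestamp TEXT NOT NULL);
    CREATE TABLE child (
        id INTEGER PRIMARY KEY,
        parent_id INTEGER NOT NULL REFERENCES parent(id),
        timestamp TEXT NOT NULL
    );
""")
# Hypothetical ids; timestamps follow the recipe's test data.
conn.executemany("INSERT INTO parent VALUES (?, ?)", [
    (1, "2009-10-15 12:00:00"),
    (2, "2009-10-17 12:00:00"),
])
conn.executemany("INSERT INTO child VALUES (?, ?, ?)", [
    (1, 1, "2009-10-15 12:00:00"),
    (2, 1, "2009-10-17 12:00:00"),
    (3, 1, "2009-10-20 12:00:00"),
    (4, 2, "2009-10-12 12:00:00"),
    (5, 2, "2009-10-17 12:00:00"),
])

# The same two named parameters appear in the ON clause (children)
# and in the WHERE clause (parents).
sql = """
    SELECT parent.id, child.id
    FROM parent
    LEFT OUTER JOIN child
        ON parent.id = child.parent_id
        AND child.timestamp BETWEEN :temporal_lower AND :temporal_upper
    WHERE parent.timestamp BETWEEN :temporal_lower AND :temporal_upper
    ORDER BY parent.id, child.id
"""
rows = conn.execute(sql, {
    "temporal_lower": "2009-10-16 12:00:00",
    "temporal_upper": "2009-10-18 12:00:00",
}).fetchall()
print(rows)
```

Each bound is passed a single time, yet it restricts both the parent rows and the joined child rows, which is the effect the eagerload example above depends on.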

This doesn't yet address the "lazyload" case, and we'd like to simplify what we're doing above. For this we'll use MapperOption, along with some access to private (underscore-prefixed) attributes of Query. This option does the same thing that we're doing above, except that it is also applied to subsequent lazyloads:

from sqlalchemy.orm.interfaces import MapperOption

class TemporalOption(MapperOption):
    propagate_to_loaders = True

    def __init__(self, range_lower, range_upper):
        self.range_lower = range_lower
        self.range_upper = range_upper

    def process_query_conditionally(self, query):
        """process query during a lazyload"""
        query._params = query._params.union(dict(
                temporal_lower=self.range_lower,
                temporal_upper=self.range_upper
            ))

    def process_query(self, query):
        """process query during a primary user query"""

        # apply bindparam values
        self.process_query_conditionally(query)

        # requires a query against a single mapper
        parent_cls = query._mapper_zero().class_
        filter_crit = parent_cls.timestamp.between(
            bindparam("temporal_lower"),
            bindparam("temporal_upper")
        )

        if query._criterion is None:
            query._criterion = filter_crit
        else:
            query._criterion = query._criterion & filter_crit

Usage now is through the option:

session.expire_all()  # for a clean test

# try it with lazyload
parents = session.query(Parent).\
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 16, 12, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).all()


assert parents[0] == p2
assert parents[0].temporal_children == [c5]

session.expire_all()

# try it with eager load 
parents = session.query(Parent).\
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 16, 12, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).\
        options(eagerload(Parent.temporal_children)).\
        all()


assert parents[0] == p2
assert parents[0].temporal_children == [c5]

session.expire_all()

# try some filtering
parents = session.query(Parent).\
        options(
            TemporalOption(
                datetime.datetime(2009, 10, 15, 11, 00, 00), 
                datetime.datetime(2009, 10, 18, 12, 00, 00))
        ).\
        join(Parent.temporal_children).\
        filter(Child.id==2).\
        all()


assert parents[0] == p1
assert parents[0].temporal_children == [c1, c2]

The option we've set up is "sticky" for the lifetime of the loaded instances p1 and p2. Expiring them doesn't clear the option from their internal state:

session.expire(parents[0], ['temporal_children'])
assert parents[0].temporal_children == [c1, c2]

session.expire(parents[0])
assert parents[0].temporal_children == [c1, c2]

There's no public API at the moment to clear out these options. To apply a new criterion, you need to start again with a freshly loaded Parent instance:

session.expunge(p1)
p1 = session.query(Parent).options(TemporalOption(
        datetime.datetime(2009, 10, 15, 12, 00, 00), 
        datetime.datetime(2009, 10, 21, 12, 00, 00)
)).filter(Parent.id==p1.id).first()
assert p1.temporal_children == [c1, c2, c3]
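
The "sticky" behavior can be pictured with a toy model in plain Python. This is only a conceptual sketch, not SQLAlchemy's implementation: ToyState, RangeOption and lazy_load are hypothetical names, and day-of-month integers stand in for the child timestamps. It assumes that options are attached to an instance when it is first loaded and that expiring discards loaded values but not the options.

```python
class ToyState:
    """Hypothetical stand-in for per-instance loader state."""

    def __init__(self, load_options):
        # Options copied from the query that first loaded the instance.
        self.load_options = tuple(load_options)
        self.loaded = {}

    def expire(self):
        # Expiring discards loaded values only; the options stay attached.
        self.loaded.clear()


class RangeOption:
    """Hypothetical option carrying a range, in the spirit of TemporalOption."""

    def __init__(self, lower, upper):
        self.lower = lower
        self.upper = upper


def lazy_load(state, all_children):
    """Filter children using whatever options the state still carries."""
    if "children" not in state.loaded:
        result = list(all_children)
        for opt in state.load_options:
            result = [c for c in result if opt.lower <= c <= opt.upper]
        state.loaded["children"] = result
    return state.loaded["children"]


days = [15, 17, 20]  # day-of-month stand-ins for the child timestamps

state = ToyState([RangeOption(15, 18)])
first = lazy_load(state, days)
state.expire()
after_expire = lazy_load(state, days)  # same range is applied again

# A different range needs a fresh state, mirroring the re-query above.
fresh = ToyState([RangeOption(15, 21)])
widened = lazy_load(fresh, days)
print(first, after_expire, widened)
```

In this model the only way to change the range is to build a new state, which corresponds to expunging p1 and querying for it again with a different TemporalOption.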
