Clone wiki

sqlalchemy / UsageRecipes / WindowedRangeQuery

WindowedRangeQuery

The goal is to iterate through a number of rows that is too large to fetch all at once. Many DBAPIs pre-buffer result sets fully, and even where server-side cursors are available (such as psycopg2's), it can be difficult to keep a cursor active for the whole iteration. The usual alternative, paging through the results with LIMIT/OFFSET, has the downside that OFFSET scans through all the previous rows each time in order to reach the requested row. The recipe below instead provides a generic way to break a table into "windows" based on ranges of a particular column. The collection of ranges is then applied to a Query to produce a fairly seamless iteration of rows.

import sqlalchemy
from sqlalchemy import and_, func

def column_windows(session, column, windowsize):
    """Return a series of WHERE clauses against
    a given column that break it into windows.

    Result is a generator of WHERE clauses, one per window.  Each
    clause is ``column >= start AND column < end`` for consecutive
    boundary values ``start`` / ``end``; the final window is
    open-ended (``column >= start``).  Boundaries are the values of
    ``column`` at row numbers 1, windowsize + 1, 2 * windowsize + 1, ...
    in sorted order, so each window holds roughly ``windowsize`` rows
    (window sizes drift when ``column`` contains duplicate values).

    Rows where ``column`` is NULL are excluded from the boundary query
    and never satisfy a range comparison, so no window covers them.

    Requires a database that supports window functions,
    i.e. Postgresql, SQL Server, Oracle.

    Enhance this yourself !  Add a "where" argument
    so that windows of just a subset of rows can
    be computed.

    """
    def int_for_range(start_id, end_id):
        # Compare against None explicitly: a falsy boundary such as 0 must
        # still close the window, otherwise this window would become
        # open-ended and overlap (duplicate) every later window.
        if end_id is not None:
            return and_(
                column >= start_id,
                column < end_id
            )
        else:
            return column >= start_id

    # Number the non-NULL values of ``column`` in sorted order.  NULLs are
    # excluded because a NULL boundary would require ``column >= None``,
    # which SQLAlchemy rejects, and because depending on the database's
    # NULL ordering they could otherwise occupy the first row numbers.
    rownum = func.row_number().over(order_by=column).label('rownum')
    q = (
        session.query(column, rownum)
        .filter(column != None)  # noqa: E711 -- renders IS NOT NULL
        .from_self(column)
    )
    if windowsize > 1:
        # keep rows 1, windowsize+1, 2*windowsize+1, ...; "%%" is Python
        # %-format escaping and produces the SQL modulo operator "%"
        q = q.filter(sqlalchemy.text("rownum %% %d=1" % windowsize))

    boundaries = [value for value, in q]

    # Pair each boundary with its successor; the last one pairs with None
    # and becomes the open-ended final window.
    for start, end in zip(boundaries, boundaries[1:] + [None]):
        yield int_for_range(start, end)

def windowed_query(q, column, windowsize):
    """Break a Query into windows on a given column.

    Yields the rows of ``q`` one window at a time.  For each WHERE clause
    produced by ``column_windows(q.session, column, windowsize)`` it runs
    ``q.filter(whereclause).order_by(column)`` and yields that query's
    rows.  The window boundaries are computed from the values of
    ``column`` alone (``column_windows`` does not see ``q``'s own
    criteria), so when ``q`` is filtered some windows may return few or
    no rows.

    :param q: the ORM Query to iterate.
    :param column: column / mapped attribute to window on,
        e.g. ``Widget.data``.
    :param windowsize: approximate number of rows per window.
    """
    session = q.session
    for whereclause in column_windows(session, column, windowsize):
        window = q.filter(whereclause).order_by(column)
        for row in window:
            yield row

if __name__ == '__main__':
    import random

    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session

    Base = declarative_base()

    class Widget(Base):
        __tablename__ = 'widget'
        id = Column(Integer, primary_key=True)
        data = Column(Integer)

    e = create_engine('postgresql://scott:tiger@localhost/test', echo='debug')

    Base.metadata.drop_all(e)
    Base.metadata.create_all(e)

    # up to 10000 unique random values (duplicates collapse in the set)
    data = {random.randint(1, 1000000) for _ in range(10000)}

    s = Session(e)
    s.add_all([Widget(id=i, data=j) for i, j in enumerate(data)])
    s.commit()

    q = s.query(Widget)

    # walk every widget in windows of ~1000 rows, ordered by Widget.data
    for widget in windowed_query(q, Widget.data, 1000):
        print("data:", widget.data)

Here's an example of the kind of SQL this emits:

-- first, it gets the list of window boundaries: the value of widget.data at every 1000th row in sorted order
SELECT anon_1.widget_data AS anon_1_widget_data 
FROM (SELECT widget.data AS widget_data, row_number() OVER (ORDER BY widget.data) AS rownum 
FROM widget) AS anon_1 
WHERE rownum %% 1000=1

Col ('anon_1_widget_data',)
Row (4,)
Row (100107,)
Row (200004,)
Row (299526,)
Row (397664,)
Row (502373,)
Row (597853,)
Row (695306,)
Row (798335,)
Row (899000,)

-- then, the original query is run for each window, adding in 
-- the extra range criterion

SELECT widget.id AS widget_id, widget.data AS widget_data 
FROM widget 
WHERE widget.data >= %(data_1)s AND widget.data < %(data_2)s ORDER BY widget.data
-- values: {'data_2': 100107, 'data_1': 4}
Col ('widget_id', 'widget_data')
Row (1, 4)
Row (64, 211)
Row (5415, 554)
Row (168, 568)
Row (203, 672)
Row (275, 914)
Row (343, 1124)
Row (344, 1132)
...


SELECT widget.id AS widget_id, widget.data AS widget_data 
FROM widget 
WHERE widget.data >= %(data_1)s AND widget.data < %(data_2)s ORDER BY widget.data
-- values: {'data_2': 200004, 'data_1': 100107}
Col ('widget_id', 'widget_data')
Row (544, 100107)
Row (549, 100120)
Row (583, 100225)
Row (3564, 100235)
Row (588, 100241)
Row (594, 100258)
Row (599, 100274)

...

Non Window Function Version

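Don't have window functions on the target database? Here's an approach that uses LIMIT in conjunction with tracking the last fetched primary key. Each page is selected with "WHERE pk > <last pk seen> ORDER BY pk LIMIT n", so no OFFSET scan is needed: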

def _yield_limit(qry, pk_attr, maxrq=100):
    """specialized windowed query generator (using LIMIT/OFFSET)

    This recipe is to select through a large number of rows thats too
    large to fetch at once. The technique depends on the primary key
    of the FROM clause being an integer value, and selects items
    using LIMIT."""

    firstid = None
    while True:
        q = qry
        if firstid is not None:
            q = qry.filter(pk_attr > firstid)
        rec = None
        for rec in q.order_by(pk_attr).limit(maxrq):
            yield rec
        if rec is None:
            break
        firstid = pk_attr.__get__(rec, pk_attr) if rec else None

if __name__ == '__main__':
    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session

    Base = declarative_base()

    class A(Base):
        """Minimal mapped class: table 'a' with only an integer key."""
        __tablename__ = 'a'

        id = Column(Integer, primary_key=True)

    # in-memory SQLite with SQL echo, so each LIMIT query is visible
    engine = create_engine("sqlite://", echo=True)
    Base.metadata.create_all(engine)

    sess = Session(engine)

    # no explicit commit: presumably relies on the Session's autoflush to
    # emit the INSERTs before the first page is queried
    sess.add_all([A() for _ in range(2000)])

    paged = _yield_limit(sess.query(A), A.id)
    for record in paged:
        print(record.id, record)

Updated