Commits

Robert Brewer  committed 6eaa905

Revamp to store data in RAM. PG is now just for durability.

  • Participants
  • Parent commits e965310

Comments (0)

Files changed (7)

File flowrate/__init__.py

 import datetime
 import decimal
 import heapq
+from operator import itemgetter
 import os
 os.umask(000)
 thisdir = os.path.abspath(os.path.dirname(__file__))
 
 from sqlalchemy import create_engine
 
-db = None
-
-from flowrate import csvutil, flows
+from flowrate import csvutil, ledger, flows, cmpdates
 from flowrate.variables import ReferenceFinder, environment as env
 
 
+db = None
+
 def set_db(connstring, **kwargs):
     """Replace the global db with a new SQLAlchemy Engine."""
     global db
                         i.append(int(r[0].strip()))
     return list(set(i))
 
+def parse_date(date_string):
+    """Parse the given date string and return a datetime.date."""
+    return datetime.date(*map(int, date_string.split('-')))
 
-def transactions(accounts=None, credits=None, debits=None,
-                 years=None, months=None, days=None, description=None):
-    """Return Transaction objects (dicts) matching the given criteria."""
-    whereclause, args = [], {}
-    if accounts:
-        whereclause.append("(ARRAY[credit_account] <@ %(accounts)s"
-                           " OR ARRAY[debit_account] <@ %(accounts)s)")
+def ordered(iterable, *keys):
+    """Return the iterator, ordered by keys (descending if '-key')."""
+    for k in reversed(keys):
+        if k.startswith('-'):
+            iterable = sorted(iterable, key=itemgetter(k[1:]), reverse=True)
+        else:
+            iterable = sorted(iterable, key=itemgetter(k), reverse=False)
+    return iterable
 
-        # Grab all subaccounts of the given accounts.
-        all_accts = [row.id for row in db.execute(
-                     "SELECT id FROM accounts;").fetchall()]
-        def isSubAccount(child, parents):
-            for p in parents:
-                for scale in (1000, 100, 10, 1):
-                    if p % scale == 0 and p <= child < (p + scale):
-                        return True
-            return False
-        accounts = [a for a in all_accts if isSubAccount(a, accounts)]
+_subaccountmap = {}
+def isSubAccount(child, parents):
+    if isinstance(parents, tuple):
+        pass
+    elif isinstance(parents, list):
+        parents = tuple(parents)
+    else:
+        parents = (parents,)
 
-        args['accounts'] = accounts
-    if credits:
-        whereclause.append("ARRAY[credit_account] <@ %(credits)s")
-        args['credits'] = credits
-    if debits:
-        whereclause.append("ARRAY[debit_account] <@ %(debits)s")
-        args['debits'] = debits
-    if description:
-        whereclause.append("description ILIKE %(desc)s")
-        args['desc'] = '%' + description + '%';
-    if years:
-        whereclause.append("ARRAY[EXTRACT(year FROM postdate)::integer] <@ %(years)s")
-        args['years'] = years
-    if months:
-        whereclause.append("ARRAY[EXTRACT(month FROM postdate)::integer] <@ %(months)s")
-        args['months'] = months
-    if days:
-        whereclause.append("ARRAY[EXTRACT(day FROM postdate)::integer] <@ %(days)s")
-        args['days'] = days
-
-    if not whereclause:
-        # If there are no filters, return nothing instead of everything
-        return
-
-    # Yield all real transactions
-    whereclause = " AND ".join(whereclause)
-    sql = ("SELECT * FROM transactions WHERE " + whereclause +
-           " ORDER BY postdate DESC, credit_account, debit_account;")
-    rows = db.execute(sql, args).fetchall()
-    for row in rows:
-        yield {"id": row.id, "postdate": row.postdate,
-               "amount": row.amount,
-               "credit": row.credit_account, "debit": row.debit_account,
-               "description": row.description,
-               }
+    try:
+        return _subaccountmap[(child, parents)]
+    except KeyError:
+        for p in parents:
+            for scale in (1000, 100, 10, 1):
+                if p % scale == 0 and p <= child < (p + scale):
+                    _subaccountmap[(child, parents)] = True
+                    return True
+        _subaccountmap[(child, parents)] = False
+        return False
 
 
 class Transaction(object):
-    
+
     exposed = True
     description = (
 """A Flowrate transaction. PUT a Transaction representation to replace it
 (but note that the server will ignore any members other than those
 in the 'body' object). GET to return it. DELETE to destroy it.""")
-    
+
     @cherrypy.tools.json_out(handler=json_handler)
     def GET(self):
         """Return the transaction."""
         req = cherrypy.serving.request
 
-        row = db.execute(
-            "SELECT * FROM transactions WHERE id = %s;", (req.txid,)).fetchone()
-        if row is None:
+        try:
+            tx = ledger.transactions[req.txid]
+        except KeyError:
             raise cherrypy.NotFound()
 
         return {
             'self': cherrypy.url(),
             'description': self.description % vars(),
             'body': {
-                'amount': row['amount'],
-                'credit': cherrypy.url("/accounts/%s" % row['credit_account']),
-                'debit': cherrypy.url("/accounts/%s" % row['debit_account']),
-                'postdate': row['postdate'].isoformat(),
-                'description': row['description'],
+                'amount': tx['amount'],
+                'credit': cherrypy.url("/accounts/%s" % tx['credit']),
+                'debit': cherrypy.url("/accounts/%s" % tx['debit']),
+                'postdate': tx['postdate'].isoformat(),
+                'description': tx['description'],
                 },
             }
 
         """Update the given transaction entity."""
         req = cherrypy.serving.request
 
-        row = db.execute(
-            "SELECT * FROM transactions WHERE id = %s;", (req.txid,)).fetchone()
-        if row is None:
+        try:
+            tx = ledger.transactions[req.txid]
+        except KeyError:
             raise cherrypy.NotFound()
 
         if "body" not in req.json:
 
         # Accept the new transaction definition.
         vals = req.json["body"]
-        credit, debit = popint(vals['credit']), popint(vals['debit'])
-        credit_type = db.execute(
-            "SELECT type FROM accounts WHERE id = %s", (credit,)).fetchone()
-        debit_type = db.execute(
-            "SELECT type FROM accounts WHERE id = %s", (debit,)).fetchone()
-        amount = vals['amount']
-        txrow = db.execute(
-            "UPDATE transactions SET amount = %s, "
-            "credit_account = %s, debit_account = %s, postdate = %s, "
-            "description = %s, credit_mult = %s, debit_mult = %s "
-            "WHERE id = %s RETURNING *;",
-            (amount, credit, debit,
-             vals['postdate'], vals['description'],
-             -1 if credit_type['type'] in ('asset', 'expense') else 1,
-             1 if debit_type['type'] in ('asset', 'expense') else -1,
-             req.txid)).fetchone()
+        tx['credit'] = popint(vals['credit'])
+        tx['debit'] = popint(vals['debit'])
+        credit_type = ledger.accounts[tx['credit']]['type']
+        tx['credit_mult'] = -1 if credit_type in ('asset', 'expense') else 1
+        debit_type = ledger.accounts[tx['debit']]['type']
+        tx['debit_mult'] = 1 if debit_type in ('asset', 'expense') else -1
 
-        flows.unfulfill(txrow.id)
-        flows.fulfill(txrow)
+        tx['amount'] = vals['amount']
+        tx['postdate'] = parse_date(vals['postdate'])
+        tx['description'] = vals['description']
+        tx.save()
+
+        flows.unfulfill(tx)
+        flows.fulfill(tx)
 
         cherrypy.response.status = 204
 
     def DELETE(self):
         """Destroy the definition of this transaction."""
         req = cherrypy.serving.request
-        flows.unfulfill(req.txid)
-        db.execute("DELETE FROM transactions WHERE id = %s;", (req.txid,))
+
+        try:
+            tx = ledger.transactions[req.txid]
+        except KeyError:
+            raise cherrypy.NotFound()
+
+        flows.unfulfill(tx)
+        tx.delete()
+
         cherrypy.response.status = 204
 
 
 txmanager.GET = txmanager
 
 
+def filters(accts=None, credits=None, debits=None,
+            years=None, months=None, days=None, description=None):
+    """Return a list of boolean functions to check the given criteria."""
+    filters = []
+    if accts:
+        # Grab all subaccounts of the given accounts.
+        accts = [a for a in ledger.accounts if isSubAccount(a, accts)]
+        filters.append(
+            lambda tx: tx['credit'] in accts or tx['debit'] in accts)
+    if credits:
+        filters.append(lambda tx: tx['credit'] in credits)
+    if debits:
+        filters.append(lambda tx: tx['debit'] in debits)
+    if description:
+        filters.append(lambda tx: description.lower() in tx['description'].lower())
+    if years:
+        filters.append(lambda tx: tx['postdate'].year in years)
+    if months:
+        filters.append(lambda tx: tx['postdate'].month in months)
+    if days:
+        filters.append(lambda tx: tx['postdate'].day in days)
+    return filters
+
+
 class Transactions(object):
     
     exposed = True
         months = toInts(months)
         days = toInts(days)
 
-        realdata = transactions(accounts, credits, debits,
-                                years, months, days, description)
-        flowdata = flows.transactions(accounts, credits, debits,
-                                      years, months, days, description)
-        for tx in heapq.merge(realdata, flowdata):
-            if tx.get('id', None):
+        fs = filters(accounts, credits, debits,
+                     years, months, days, description)
+        # Decorate with the negative postdate ordinal so heapq.merge can sort.
+        realdata = ((-tx['postdate'].toordinal(), tx)
+                    for tx in ledger.find_transactions(fs))
+        flowdata = ((-tx['postdate'].toordinal(), tx)
+                    for tx in flows.find_transactions(fs))
+        for postdate, tx in heapq.merge(realdata, flowdata):
+            if tx['id'] is None:
+                del tx['id']
+            else:
                 tx['id'] = cherrypy.url("/transactions/%s" % tx['id'])
             tx['credit'] = cherrypy.url("/accounts/%s" % tx['credit'])
             tx['debit'] = cherrypy.url("/accounts/%s" % tx['debit'])
 
         # Accept the new transaction definition.
         vals = req.json["body"]
-        credit, debit = popint(vals['credit']), popint(vals['debit'])
-        credit_type = db.execute(
-            "SELECT type FROM accounts WHERE id = %s", (credit,)).fetchone()
-        debit_type = db.execute(
-            "SELECT type FROM accounts WHERE id = %s", (debit,)).fetchone()
-        amount = vals['amount']
-        newrow = db.execute(
-            "INSERT INTO transactions"
-            " (amount, credit_account, debit_account, postdate, description, "
-            "credit_mult, debit_mult) "
-            "VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING *",
-            (amount, credit, debit,
-             vals["postdate"], vals['description'],
-             -1 if credit_type['type'] in ('asset', 'expense') else 1,
-             1 if debit_type['type'] in ('asset', 'expense') else -1,
-             )).fetchone()
+        vals['credit'] = popint(vals['credit'])
+        vals['debit'] = popint(vals['debit'])
+        credit_type = ledger.accounts[vals['credit']]['type']
+        vals['credit_mult'] = -1 if credit_type in ('asset', 'expense') else 1
+        debit_type = ledger.accounts[vals['debit']]['type']
+        vals['debit_mult'] = 1 if debit_type in ('asset', 'expense') else -1
+        vals['postdate'] = parse_date(vals['postdate'])
 
-        flows.fulfill(newrow)
+        tx = ledger.Transaction(id=None, **vals)
+        tx.save()
+
+        flows.fulfill(tx)
 
         cherrypy.response.status = 201
-        cherrypy.response.headers['Location'] = cherrypy.url("%s" % newrow.id)
+        cherrypy.response.headers['Location'] = cherrypy.url("%s" % tx.id)
 
     def _cp_dispatch(self, vpath):
         segment = vpath.pop(0)
         # Determine the full range of buckets; even if our ledger has no
         # transactions within a given bucket, we still want to output it.
         if not years:
-            years = [row.year for row in db.execute(
-                "SELECT DISTINCT EXTRACT(year FROM postdate) AS year "
-                "FROM transactions;").fetchall()]
+            years = [tx['postdate'].year
+                     for tx in ledger.transactions.itervalues()]
             if not years:
                 years = [datetime.date.today().year]
         if not months:
             months = range(1, 13)
 
         if dategroup == 'year':
+            coerce_date = cmpdates.strkeys['years']
             dategroups = [str(y) for y in years]
-            pgdatefmt = 'YYYY'
         elif dategroup == 'day':
+            coerce_date = cmpdates.strkeys['days']
             dategroups = []
             for y in years:
                 for m in months:
                     for d in days or xrange(1, calendar.monthrange(y, m)[1] + 1):
-                        dategroups.append(datetime.date(y, m, d).isoformat())
-            pgdatefmt = 'YYYY-MM-DD'
+                        dategroups.append('%04d-%02d-%02d' % (y, m, d))
         else:
             # dategroup == 'month' or other
+            coerce_date = cmpdates.strkeys['months']
             dategroups = ['%04d-%02d' % (y, m) for y in years for m in months]
-            pgdatefmt = 'YYYY-MM'
 
         # Calculate the budget of each requested account for each
         # dategroup in the output.
         budgets = dict((a, dict((dg, [0, 0]) for dg in dategroups))
                        for a in accounts)
-        whereclause, args = [], {}
-        if accounts:
-            whereclause.append("ARRAY[account] <@ %(accounts)s")
-            args['accounts'] = accounts
-        for dg in dategroups:
-            wc = whereclause + [
-                "to_char(postdate, %(df)s) = %(dg)s"]
-            args['df'] = pgdatefmt
-            args['dg'] = dg
-            sql = ("SELECT account, SUM(total) AS budget "
-                   "FROM flowledger WHERE " +  " AND ".join(wc) +
-                   " GROUP BY 1 ORDER BY 1;")
-            for row in db.execute(sql, args).fetchall():
-                budgets[row.account][dg][0] = row.budget
-            sql = ("SELECT account, SUM(amount) AS spent "
-                   "FROM ledger WHERE " +  " AND ".join(wc) +
-                   " GROUP BY 1 ORDER BY 1;")
-            for row in db.execute(sql, args).fetchall():
-                budgets[row.account][dg][1] = row.spent
 
-        b['data'] = budgets
+        for tx in ledger.transactions.itervalues():
+            txdate = coerce_date(tx['postdate'])
+            if txdate not in dategroups:
+                continue
+
+            if (not accounts) or tx['credit'] in accounts:
+                credit_amount = tx['amount'] * tx['credit_mult']
+                budgets[tx['credit']][txdate][1] += credit_amount
+
+            if (not accounts) or tx['debit'] in accounts:
+                debit_amount = tx['amount'] * tx['debit_mult']
+                budgets[tx['debit']][txdate][1] += debit_amount
+
+        for flowid, flow in flows.flows.iteritems():
+            credit = (not accounts) or flow['credit'] in accounts
+            debit = (not accounts) or flow['debit'] in accounts
+            if not (credit or debit):
+                continue
+
+            if credit:
+                credit_bucket = budgets[flow['credit']]
+            if debit:
+                debit_bucket = budgets[flow['debit']]
+
+            for ob in flow.obligations:
+                obdate = coerce_date(ob['postdate'])
+                if obdate not in dategroups:
+                    continue
+
+                if credit:
+                    credit_bucket[obdate][0] += ob['amount'] * ob['credit_mult']
+
+                if debit:
+                    debit_bucket[obdate][0] += ob['amount'] * ob['debit_mult']
+
+
+        # Touch up the data for JSON, which doesn't allow non-string keys
+        b['data'] = dict(
+            (str(accountid), bucket)
+            for accountid, bucket in budgets.iteritems()
+            )
 
         return b
 
 
 class Flow(object):
-    
+
     exposed = True
     description = (
 """A Flowrate Flow. PUT a Flow representation to replace it
         """Return the flow."""
         req = cherrypy.serving.request
 
-        flow = flows.Flow(req.flowid)
-        flow.find()
-        if flow.row is None:
+        try:
+            flow = flows.flows[req.flowid]
+        except KeyError:
             raise cherrypy.NotFound()
 
         return {'self': cherrypy.url(),
                 'description': self.description % vars(),
                 'body': {
-                    'credit': cherrypy.url("/accounts/%s" % flow.credit_account),
-                    'debit': cherrypy.url("/accounts/%s" % flow.debit_account),
-                    'amount': flow.amount,
-                    'start': flow.range_start,
-                    'end': flow.range_end,
-                    'period': flow.period,
-                    'unit': flow.unit,
-                    'days': flow.days,
-                    'description': flow.description,
+                    'credit': cherrypy.url("/accounts/%s" % flow['credit']),
+                    'debit': cherrypy.url("/accounts/%s" % flow['debit']),
+                    'amount': flow['amount'],
+                    'start': flow['start'],
+                    'end': flow['end'],
+                    'period': flow['period'],
+                    'unit': flow['unit'],
+                    'days': flow['days'],
+                    'description': flow['description'],
                     },
                 }
 
         """Update the given flow entity."""
         req = cherrypy.serving.request
 
-        flow = flows.Flow(req.flowid)
-        flow.find()
-        if flow.row is None:
+        try:
+            flow = flows.flows[req.flowid]
+        except KeyError:
             raise cherrypy.NotFound()
 
         if "body" not in req.json:
 
         # Accept the new flow definition.
         vals = req.json["body"]
-        credit, debit = popint(vals['credit']), popint(vals['debit'])
+        vals['credit'] = popint(vals['credit'])
+        vals['debit'] = popint(vals['debit'])
+        flow.row.update(vals)
 
-        refs = set()
-        finder = ReferenceFinder()
-        for key in ('start', 'end', 'amount'):
-            val = vals[key].strip()
-            if val.startswith("="):
-                newrefs = finder.find(val[1:])
-                refs.update(newrefs)
-
-        flow.row = db.execute(
-            "UPDATE flows SET amount = %s, "
-            "credit_account = %s, debit_account = %s, "
-            "range_start = %s, range_end = %s, "
-            "period = %s, unit = %s, days = %s, "
-            "description = %s, variables = %s "
-            "WHERE id = %s RETURNING *;",
-            (vals['amount'], credit, debit, vals['start'], vals['end'],
-            vals['period'], vals['unit'], vals['days'], vals['description'],
-            list(refs), flow.id)).fetchone()
-
+        flow.save()
         flow.clear_obligations()
         flow.obligate()
 
         """Destroy the definition of this flow."""
         req = cherrypy.serving.request
 
-        f = flows.Flow(req.flowid)
-        f.clear_obligations()
-        f.delete()
+        flow = flows.flows[req.flowid]
+        flow.clear_obligations()
+        flow.delete()
         cherrypy.response.status = 204
 
 flowsmanager = cherrypy.tools.staticfile.handler(
                                  }},
             }
 
-        whereclause, args = [], {}
-        if accounts:
-            whereclause.append("(ARRAY[credit_account] <@ %(accounts)s"
-                               " OR ARRAY[debit_account] <@ %(accounts)s)")
-            args['accounts'] = toInts(accounts)
-        if credits:
-            whereclause.append("ARRAY[credit_account] <@ %(credit)s")
-            args['credits'] = toInts(credits)
-        if debits:
-            whereclause.append("ARRAY[debit_account] <@ %(debits)s")
-            args['debits'] = toInts(debits)
-        if description:
-            whereclause.append("description ILIKE %(desc)s")
-            args['desc'] = '%' + description + '%';
+        fs = filters(accts=toInts(accounts), credits=toInts(credits),
+                     debits=toInts(debits),
+                     # TODO: try to filter by years/months/days?
+                     years=None, months=None, days=None,
+                     description=description)
 
-        # TODO: try to filter by years/months/days?
-
-        sql = "SELECT * FROM flows"
-        if whereclause:
-            sql += " WHERE " + " AND ".join(whereclause)
-        sql += " ORDER BY debit_account, credit_account, range_start DESC;"
-        rows = db.execute(sql, args).fetchall()
+        flowobs = [flow for flow in flows.flows.itervalues()
+                   if all(f(flow) for f in fs)]
         t['data'] = [
-            {"id": cherrypy.url("/flows/%s" % row.id),
-             "start": row.range_start,
-             "end": row.range_end,
-             "period": row.period,
-             "unit": row.unit,
-             "days": row.days,
-             "amount": row.amount,
-             "credit": cherrypy.url("/accounts/%s" % row.credit_account),
-             "debit": cherrypy.url("/accounts/%s" % row.debit_account),
-             "description": row.description,
-             } for row in rows]
+            {"id": cherrypy.url("/flows/%s" % flow.id),
+             "start": flow['start'],
+             "end": flow['end'],
+             "period": flow['period'],
+             "unit": flow['unit'],
+             "days": flow['days'],
+             "amount": flow['amount'],
+             "credit": cherrypy.url("/accounts/%s" % flow['credit']),
+             "debit": cherrypy.url("/accounts/%s" % flow['debit']),
+             "description": flow['description'],
+             } for flow in ordered(flowobs, 'debit', 'credit', '-start')]
 
         return t
 
     
         # Accept the new flow definition.
         vals = req.json["body"]
-        vals["credit"] = popint(vals["credit"])
-        vals["debit"] = popint(vals["debit"])
-        flow = flows.Flow.insert(**vals)
+        vals["credit"] = popint(vals.pop("credit"))
+        vals["debit"] = popint(vals.pop("debit"))
+        flow = flows.Flow(None, **vals)
+        flow.save()
         flow.obligate()
 
         cherrypy.response.status = 201
-        cherrypy.response.headers['Location'] = cherrypy.url("%s" % flow.row.id)
+        cherrypy.response.headers['Location'] = cherrypy.url("%s" % flow.id)
 
     def _cp_dispatch(self, vpath):
         cherrypy.serving.request.flowid = int(vpath.pop(0))
         env.bind(name, vals['source'])
 
         # Calculate this variable and all its dependents.
-        dependents = list(env.calc(name))
-        dependents.append(name)
+        dependents = set(env.calc(name))
+        dependents.add(name)
 
         # Recalc any flows which depend on these variables.
-        for row in db.execute(
-                "SELECT * FROM flows WHERE variables && %(names)s;",
-                {"names": dependents or '{}'}).fetchall():
-            flow = flows.Flow(row.id, row)
-            flow.clear_obligations()
-            flow.obligate()
+        for flow in flows.flows.itervalues():
+            if set(flow['variables']).intersection(dependents):
+                flow.clear_obligations()
+                flow.obligate()
 
         if exists:
             db.execute("UPDATE variables SET source = %s WHERE name = %s;",
             "views": {"filtered": "{?accounts+,years+,months+,dategroup}"},
             }
 
-        assets = [row.id for row in db.execute(
-                  "SELECT id FROM accounts WHERE type = 'asset';").fetchall()]
+        assets = [id for id, acct in ledger.accounts.iteritems()
+                  if acct['type'] == 'asset']
         accounts = toInts(accounts)
         if accounts:
             accounts = [x for x in accounts if x in assets]
         # Determine the full range of buckets; even if our ledger has no
         # transactions within a given bucket, we still want to output it.
         if not years:
-            years = [row.year for row in db.execute(
-                "SELECT DISTINCT EXTRACT(year FROM postdate) AS year "
-                "FROM transactions;").fetchall()]
+            years = [tx['postdate'].year
+                     for tx in ledger.transactions.itervalues()]
             if not years:
                 years = [datetime.date.today().year]
         if not months:
             months = range(1, 13)
 
         if dategroup == 'year':
+            coerce_date = cmpdates.units['years']
             balance_dates = [datetime.date(y, 12, 31) for y in years]
             dategroups = [str(y) for y in years]
         elif dategroup == 'day':
+            coerce_date = cmpdates.units['days']
             balance_dates = []
             for y in years:
                 for m in months:
             dategroups = [d.isoformat() for d in balance_dates]
         else:
             # dategroup == 'month' or other
+            coerce_date = cmpdates.units['months']
             balance_dates = [datetime.date(y, m, calendar.monthrange(y, m)[1])
                              for y in years for m in months]
-            dategroups = ['%04d-%02d' % (y, m)
-                          for y in years for m in months]
+            dategroups = ['%04d-%02d' % (y, m) for y in years for m in months]
 
         # Calculate the balance of each requested account for each
         # dategroup in the output.
         balances = dict((a, dict((dg, 0) for dg in dategroups))
                        for a in accounts)
-        whereclause, args = [], {}
-        if accounts:
-            whereclause.append("ARRAY[account] <@ %(accounts)s")
-            args['accounts'] = accounts
-        for bd, dg in zip(balance_dates, dategroups):
-            # Grab ledger from transactions and obligations together
-            wc = whereclause + ["postdate <= %(balance_date)s"]
-            args['balance_date'] = bd
-            sql = ("SELECT account, SUM(amount) AS balance "
-                   "FROM fullledger WHERE " +  " AND ".join(wc) +
-                   " GROUP BY 1;")
-            for row in db.execute(sql, args).fetchall():
-                balances[row.account][dg] = row.balance
 
-        b['data'] = balances
+        dates_and_groups = zip(balance_dates, dategroups)
+
+        # Grab ledger entries from transactions
+        for tx in ledger.transactions.itervalues():
+            if (not accounts) or (tx['credit'] in accounts):
+                credit_amount = tx['amount'] * tx['credit_mult']
+                credit_bucket = balances[tx['credit']]
+            else:
+                credit_amount = None
+
+            if (not accounts) or (tx['debit'] in accounts):
+                debit_amount = tx['amount'] * tx['debit_mult']
+                debit_bucket = balances[tx['debit']]
+            else:
+                debit_amount = None
+
+            if credit_amount is None and debit_amount is None:
+                # This transaction doesn't bear on the selected accounts.
+                continue
+
+            for bd, dg in dates_and_groups:
+                if tx["postdate"] <= bd:
+                    if credit_amount is not None:
+                        credit_bucket[dg] += credit_amount
+                    if debit_amount is not None:
+                        debit_bucket[dg] += debit_amount
+
+        # Grab obligations between 30 days ago and the balance date
+        age = datetime.date.today() - datetime.timedelta(days=30)
+        for flow in flows.flows.itervalues():
+            if (not accounts) or (flow['credit'] in accounts):
+                credit_bucket = balances[flow['credit']]
+            else:
+                credit_bucket = None
+
+            if (not accounts) or (flow['debit'] in accounts):
+                debit_bucket = balances[flow['debit']]
+            else:
+                debit_bucket = None
+
+            if credit_bucket is None and debit_bucket is None:
+                # This obligation doesn't bear on the selected accounts.
+                continue
+
+            for ob in flow.obligations:
+                if ob['postdate'] < age:
+                    continue
+
+                credit_amount = ob['remaining'] * ob['credit_mult']
+                debit_amount = ob['remaining'] * ob['debit_mult']
+
+                for bd, dg in dates_and_groups:
+                    if ob["postdate"] <= bd:
+                        if credit_bucket is not None:
+                            credit_bucket[dg] += credit_amount
+
+                        if debit_bucket is not None:
+                            debit_bucket[dg] += debit_amount
+
+        # Touch up the data for JSON, which doesn't allow non-string keys
+        b['data'] = dict(
+            (str(accountid), bucket)
+            for accountid, bucket in balances.iteritems()
+            )
 
         return b
 
         
         # Return the definition for this account.
         # Look up the name in our config DB.
-        row = db.execute(
-            "SELECT * FROM accounts WHERE id = %s;", (req.account,)).fetchone()
-        if row is None:
+        try:
+            acct = ledger.accounts[req.account]
+        except KeyError:
             raise cherrypy.NotFound()
         
         return {
             "self": cherrypy.url(),
             "description": self.description % vars(),
-            "views": {"balances": "/balances?account=%s" % req.account},
+            "views": {"balances": "/balances?account=%s" % acct.id},
             "body": {
-                "name": row.name,
-                "type": row.type,
+                "name": acct['name'],
+                "type": acct['type'],
                 },
             }
     
         
         # Accept the new account definition.
         # Look up the name in our config DB.
-        row = db.execute("SELECT * FROM accounts WHERE id = %s;",
-                         (req.account,)).fetchone()
         vals = req.json["body"]
-        if row is None:
-            db.execute("INSERT INTO accounts (id, name, type) "
-                       "VALUES (%s, %s, %s)",
-                       (req.account, vals["name"], vals["type"]))
+        try:
+            acct = ledger.accounts[req.account]
+        except KeyError:
+            acct = ledger.Account(
+                id=req.account, name=vals['name'], type=vals['type'])
+            acct.create()
+            cherrypy.response.status = 201
             cherrypy.response.headers['Location'] = cherrypy.url()
         else:
-            db.execute("UPDATE accounts SET name = %s, type = %s "
-                       "WHERE id = %s;",
-                       (vals["name"], vals["type"], req.account))
-        cherrypy.response.status = 201 if row is None else 204
+            acct['name'] = vals["name"]
+            acct['vals'] = vals["type"]
+            acct.update()
+            cherrypy.response.status = 204
 
     def DELETE(self):
         req = cherrypy.serving.request
         # TODO: require ?force=true if any flows or transactions exist
-        db.execute("DELETE FROM accounts WHERE id = %s;", (req.account,))
+        try:
+            acct = ledger.accounts[req.account]
+        except KeyError:
+            raise cherrypy.NotFound()
+        acct.delete()
         cherrypy.response.status = 204
 
 account = Account()
 
     @cherrypy.tools.json_out(handler=json_handler)
     def GET(self):
-        rows = db.execute(
-            "SELECT id, name, type FROM accounts ORDER BY id;").fetchall()
         return {
             "self": cherrypy.url(),
             "description": self.description % vars(),
             "manager": cherrypy.url('/accounts/manager'),
             "example": {"body": {"name": "Gas & Electric", "type": "asset"}},
-            "data": [{"id": cherrypy.url("/accounts/%s" % row.id),
-                      "name": row.name,
-                      "type": row.type,
+            "data": [{"id": cherrypy.url("/accounts/%s" % id),
+                      "name": acct['name'],
+                      "type": acct['type'],
                       }
-                     for row in rows],
+                     for id, acct in sorted(ledger.accounts.items())],
             }
     
     def _cp_dispatch(self, vpath):

File flowrate/cmpdates.py

+import datetime
+
+
+class month(object):
+    """A given month in a given year. Like datetime.date without a .day field."""
+
+    __slots__ = ('year', 'month')
+
+    def __init__(self, year, month):
+        self.year = year
+        self.month = month
+
+    @classmethod
+    def fromdate(cls, dt):
+        return cls(dt.year, dt.month)
+
+    def todate(self, day=1):
+        return datetime.date(self.year, self.month, day)
+
+    def __add__(self, other):
+        # Add an integer number of months to self.
+        y, m = divmod(self.month + other, 12)
+        return month(self.year + y, m)
+
+    def __sub__(self, other):
+        if isinstance(other, month):
+            # Subtract a month() instance to return an integer diff of months
+            y, m = divmod(self.month - other.month, 12)
+            return (y * 12) + m
+        else:
+            # Subtract an integer number of months from self.
+            y, m = divmod(self.month - other, 12)
+            return month(self.year + y, m)
+
+    def __cmp__(self, other):
+        return cmp((self.year, self.month), (other.year, other.month))
+
+
+class week(object):
+    """A given week in a given year. Like datetime.date with a .week field."""
+
+    __slots__ = ('year', 'week')
+
+    def __init__(self, year, week):
+        self.year = year
+        self.week = week
+
+    @classmethod
+    def fromdate(cls, dt):
+        y, w, d =  dt.isocalendar()
+        return cls(y, w)
+
+    def todate(self, day=1):
+        jan1 = datetime.date(self.year, 1, 1)
+        day1ofweek1 = jan1 + datetime.timedelta(days=8 - jan1.isocalendar()[2])
+        return (day1ofweek1 +
+                datetime.timedelta(
+                    days=((self.week - 1) * 7) + (day - 1)
+                ))
+
+    def __add__(self, other):
+        # Add an integer number of weeks to self.
+        f = self.todate() + datetime.timedelta(days=other * 7)
+        y, w, d = f.isocalendar()
+        return week(y, w)
+
+    def __sub__(self, other):
+        if isinstance(other, week):
+            # Subtract a week() instance to return an integer diff of weeks
+            return ((self.todate() - other.todate()).days / 7)
+        else:
+            # Subtract an integer number of weeks from self.
+            f = self.todate() - datetime.timedelta(days=other * 7)
+            y, w, d = f.isocalendar()
+            return week(y, w)
+
+    def __cmp__(self, other):
+        return cmp((self.year, self.week), (other.year, other.week))
+
+
+# A map from 'units' (time periods) to coercion_funcs.
+# The coercion_func is used to coerce two dates to a common representation
+# at the granularity for the units. For example, if 'units' is 'months',
+# we want date(2012, 5, 1) to compare equal with date(2012, 5, 15).
+units = {
+    'days': lambda d: d,
+    'weeks': week.fromdate,
+    'months': month.fromdate,
+    'years': lambda d: d.year,
+    }
+# The day_func is used to determine if a date matches our 'days' value,
+# which, depending on the 'units', can mean "day of the month",
+# "day of the week", or "day of the year".
+days = {
+    'days': lambda d: 1,
+    'weeks': lambda d: d.weekday(),
+    'months': lambda d: d.day,
+    'years': lambda d: d.timetuple().tm_yday,
+    }
+
+# A map from units to coercion funcs which take dates and return strings.
+# These can then be compared to each and also used as dict keys for fast
+# lookups (but cannot be added and subtracted).
+strkeys = {
+    'days': lambda d: d.isoformat(),
+    'weeks': lambda d: '%04d,%02d' % d.isocalendar()[:2],
+    'months': lambda d: '%04d-%02d' % (d.year, d.month),
+    'years': lambda d: '%04d' % d.year,
+    }
+

File flowrate/flows.py

 import calendar
 import datetime
 import decimal
+import sys
 
 import flowrate
 from flowrate.variables import ReferenceFinder, environment as env
+from flowrate import cmpdates, ledger
 
 
-class month(object):
-    """A given month in a given year. Like datetime.date without a .day field."""
-
-    __slots__ = ('year', 'month')
-
-    def __init__(self, year, month):
-        self.year = year
-        self.month = month
-
-    @classmethod
-    def fromdate(cls, dt):
-        return cls(dt.year, dt.month)
-
-    def todate(self, day=1):
-        return datetime.date(self.year, self.month, day)
-
-    def __add__(self, other):
-        # Add an integer number of months to self.
-        y, m = divmod(self.month + other, 12)
-        return month(self.year + y, m)
-
-    def __sub__(self, other):
-        if isinstance(other, month):
-            # Subtract a month() instance to return an integer diff of months
-            y, m = divmod(self.month - other.month, 12)
-            return (y * 12) + m
-        else:
-            # Subtract an integer number of months from self.
-            y, m = divmod(self.month - other, 12)
-            return month(self.year + y, m)
-
-    def __cmp__(self, other):
-        return cmp((self.year, self.month), (other.year, other.month))
-
-
-class week(object):
-    """A given week in a given year. Like datetime.date with a .week field."""
-
-    __slots__ = ('year', 'week')
-
-    def __init__(self, year, week):
-        self.year = year
-        self.week = week
-
-    @classmethod
-    def fromdate(cls, dt):
-        y, w, d =  dt.isocalendar()
-        return cls(y, w)
-
-    def todate(self, day=1):
-        jan1 = datetime.date(self.year, 1, 1)
-        day1ofweek1 = jan1 + datetime.timedelta(days=8 - jan1.isocalendar()[2])
-        return (day1ofweek1 +
-                datetime.timedelta(
-                    days=((self.week - 1) * 7) + (day - 1)
-                ))
-
-    def __add__(self, other):
-        # Add an integer number of weeks to self.
-        f = self.todate() + datetime.timedelta(days=other * 7)
-        y, w, d = f.isocalendar()
-        return week(y, w)
-
-    def __sub__(self, other):
-        if isinstance(other, week):
-            # Subtract a week() instance to return an integer diff of weeks
-            return ((self.todate() - other.todate()).days / 7)
-        else:
-            # Subtract an integer number of weeks from self.
-            f = self.todate() - datetime.timedelta(days=other * 7)
-            y, w, d = f.isocalendar()
-            return week(y, w)
-
-    def __cmp__(self, other):
-        return cmp((self.year, self.week), (other.year, other.week))
+# A table of (flow.id, Flow()) pairs.
+flows = {}
 
 
 class Flow(object):
 
-    def __init__(self, id, row=None):
+    fields = ('credit', 'debit', 'description',
+              'period', 'unit', 'days', 'variables', 'amount',
+              'start', 'end')
+
+    def __init__(self, id, **kwargs):
         self.id = id
-        self.row = row
+        self.row = dict((k, kwargs[k]) for k in self.fields if k != 'variables')
 
-    def find(self):
-        """Set self.row to a DB row matching the given ID, or None."""
-        self.row = flowrate.db.execute(
-            "SELECT * FROM flows WHERE id = %s;", (self.id,)).fetchone()
+        if 'variables' in kwargs:
+            self.row['variables'] = kwargs['variables']
+        else:
+            self.row['variables'] = list(self.get_refs())
+
+        self.obligations = []
 
     @classmethod
-    def insert(cls, **vals):
-        flow = cls(None)
+    def load_all(cls):
+        """Populate the flows dictionary from the database."""
+        for row in flowrate.db.execute("SELECT * FROM flows;").fetchall():
+            data = {}
+            for k in cls.fields:
+                f = k
+                if f in ('start', 'end'):
+                    f = 'range_' + f
+                elif f in ('credit', 'debit'):
+                    f = f + '_account'
+                data[k] = row[f]
+            flow = cls(row.id, **data)
+            flows[row.id] = flow
 
+    def __getitem__(self, key):
+        return self.row[key]
+
+    def __setitem__(self, key, value):
+        self.row[key] = value
+
+    def get_refs(self):
+        """Return the set of all variable names referenced by this flow."""
         refs = set()
         finder = ReferenceFinder()
         for key in ('start', 'end', 'amount'):
-            val = vals[key]
-            if not isinstance(val, basestring):
+            val = self.row[key]
+            if isinstance(val, basestring):
                 # Allow 'amount' entries to be numbers
-                val = str(val)
-            val = val.strip()
-            if val.startswith("="):
-                newrefs = finder.find(val[1:])
-                refs.update(newrefs)
+                val = val.strip()
+                if val.startswith("="):
+                    newrefs = finder.find(val[1:])
+                    refs.update(newrefs)
+        return refs
 
-        flow.row = flowrate.db.execute("INSERT INTO flows"
-                   " (amount, credit_account, debit_account, "
-                   "range_start, range_end, period, unit, days, description, "
-                   "variables) "
-                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
-                   "RETURNING *",
-                   (vals["amount"],
-                    vals["credit"], vals["debit"],
-                    vals["start"], vals["end"],
-                    vals["period"], vals['unit'], vals["days"],
-                    vals["description"], list(refs),
-                    )).fetchone()
-        flow.id = flow.row.id
-        return flow
+    def save(self):
+        self.row['variables'] = list(self.get_refs())
+
+        if self.id is None:
+            row = flowrate.db.execute(
+                "INSERT INTO flows"
+                " (amount, credit_account, debit_account, range_start, range_end,"
+                " period, unit, days, description, variables) "
+                "VALUES (%(amount)s, %(credit)s, %(debit)s,"
+                " %(start)s, %(end)s, %(period)s, %(unit)s, %(days)s,"
+                " %(description)s, %(variables)s) "
+                "RETURNING *", self.row).fetchone()
+            self.id = row.id
+            flows[self.id] = self
+        else:
+            row = {'id': self.id}
+            row.update(self.row)
+            flowrate.db.execute(
+                "UPDATE flows SET amount = %(amount)s, "
+                "credit_account = %(credit)s, "
+                "debit_account = %(debit)s, "
+                "range_start = %(start)s, range_end = %(end)s, "
+                "period = %(period)s, unit = %(unit)s, days = %(days)s, "
+                "description = %(description)s, variables = %(variables)s "
+                "WHERE id = %(id)s;", row)
 
     def delete(self):
         flowrate.db.execute("DELETE FROM flows WHERE id = %s;", (self.id,))
+        flows.pop(self.id, None)
 
-    def __getattr__(self, key):
-        return getattr(self.row, key)
+    def get_obligations(self):
+        """Yield obligation rows for the given flow."""
+        credit_type = ledger.accounts[self['credit']]['type']
+        credit_mult = -1 if credit_type in ('asset', 'expense') else 1
+        debit_type = ledger.accounts[self['debit']]['type']
+        debit_mult = 1 if debit_type in ('asset', 'expense') else -1
 
-    def obligations(self):
-        """Yield obligation rows for the given flow."""
-        if self.unit == 'months':
-            unit, day = month.fromdate, lambda d: d.day
-        elif self.unit == 'weeks':
-            unit, day = week.fromdate, lambda d: d.weekday()
-        elif self.unit == 'years':
-            unit, day = lambda d: d.year, lambda d: d.timetuple().tm_yday
+        unit = cmpdates.units[self['unit']]
+        day = cmpdates.days[self['unit']]
 
-        start = self.range_start.strip()
+        start = self['start'].strip()
         if start.startswith("="):
             # It's a Python expression
             start = env.eval(start[1:], locals())
             # Assume it's an ISO date YYYY-MM-DD
             start = datetime.date(*map(int, start.split("-")))
 
-        end = self.range_end.strip()
+        end = self['end'].strip()
         if end.startswith("="):
             # It's a Python expression
             end = env.eval(end[1:], locals())
 
         for d in range(0, (end - start).days + 1):
             postdate = start + datetime.timedelta(days=d)
-            if day(postdate) not in self.days:
+            if day(postdate) not in self['days']:
                 continue
 
             p = unit(postdate)
             # This will be an integer number of units between post and start
             diff = p - unit(start)
-            # But range_start might be after one or more of our self.day(s)
-            for fd in sorted(self.days):
+            # But start might be after one or more of our self.day(s)
+            for fd in sorted(self['days']):
                 if day(start) > fd:
                     diff -= 1
 
-            if diff % self.period == 0:
+            if diff % self['period'] == 0:
                 # Eval the amount inside this loop to allow it to refer
                 # to the postdate or the iteration variable "d"
-                amount = self.amount.strip()
+                amount = self['amount'].strip()
                 if amount.startswith("="):
                     # It's a Python expression
                     amount = env.eval(amount[1:], locals())
                     # Assume it's a number
                     amount = decimal.Decimal(amount)
 
-                # Yield one transaction for the year/month/week on the day.
+                # Yield one obligation for the year/month/week on the day.
                 yield {
-                    'id': None,
+                    'flowid': self.id,
                     'postdate': postdate,
-                    'credit': self.credit_account,
-                    'debit': self.debit_account,
-                    'description': self.description,
+                    'credit': self['credit'],
+                    'credit_mult': credit_mult,
+                    'debit': self['debit'],
+                    'debit_mult': debit_mult,
+                    'description': self['description'],
                     'amount': amount,
+                    'remaining': amount
                     }
 
     def obligate(self):
-        """Insert obligation rows for the given flow; fulfill as possible."""
-        if self.unit == 'years':
-            dategroupformat = 'YYYY'
-        elif self.unit == 'days':
-            dategroupformat = 'YYYY-MM-DD'
-        else:
-            dategroupformat = 'YYYY-MM'
+        """Insert obligations for self; fulfill as possible."""
+        coerce_date = cmpdates.strkeys[self['unit']]
 
-        credit_type = flowrate.db.execute(
-            "SELECT type FROM accounts WHERE id = %s",
-            (self.credit_account,)).fetchone()
-        debit_type = flowrate.db.execute(
-            "SELECT type FROM accounts WHERE id = %s",
-            (self.debit_account,)).fetchone()
+        obs = list(self.get_obligations())
 
-        obs = {}
-        for ob in self.obligations():
-            # TODO: this is slow. Can we change it to an "INSERT INTO ... FROM"?
-            row = flowrate.db.execute(
-                "INSERT INTO obligations "
-                "(flowid, postdate, credit_account, debit_account,"
-                " description, amount, remaining, dategroupformat,"
-                " credit_mult, debit_mult) "
-                "VALUES (%s, %s, %s, %s, %s, %s, 0.0, %s, %s, %s) "
-                "RETURNING id;",
-                (self.id, ob['postdate'], ob['credit'], ob['debit'],
-                 ob['description'], ob['amount'], dategroupformat,
-                 -1 if credit_type['type'] in ('asset', 'expense') else 1,
-                 1 if debit_type['type'] in ('asset', 'expense') else -1,
-                 )).fetchone()
-            obs[row.id] = ob
+        # Now, fulfill the new obligations.
+        # Start by finding the subset of transactions which match accounts;
+        # this will be the same for each obligation since they all have the
+        # same flow.
+        txtable = {}
+        for tx in ledger.transactions.itervalues():
+            if (flowrate.isSubAccount(tx['credit'], self['credit']) and
+                flowrate.isSubAccount(tx['debit'], self['debit'])):
+                txtable.setdefault(coerce_date(tx['postdate']), []).append(tx)
+        # Order by debit account descending so that
+        # more specific accounts match before more general ones.
+        for d, txs in txtable.iteritems():
+            txs.sort(key=lambda tx: (0 - tx['debit'], tx['postdate']))
 
-        # Now, fulfill the new obligations
-        for obid, ob in obs.iteritems():
-            obrem = ob['amount']
-            for tx in flowrate.db.execute(
-                "SELECT t.*, "
-                "(SELECT COALESCE(SUM(f.amount), 0) FROM fulfillments f"
-                " WHERE f.transactionid = t.id) AS fulfilled "
-                "FROM transactions t "
-                "WHERE isSubAccount(t.credit_account, %s) "
-                "AND isSubAccount(t.debit_account, %s) "
-                "AND (to_char(t.postdate, %s) = to_char(%s, %s)) "
-                "ORDER BY t.debit_account DESC, t.postdate ASC;",
-                (ob['credit'], ob['debit'],
-                 dategroupformat, ob['postdate'], dategroupformat)).fetchall(
-                ):
-                txrem = tx.amount - tx.fulfilled
+        for ob in obs:
+            sys.stdout.write(".")
+            sys.stdout.flush()
+            obrem = ob['remaining']
+            obdate = coerce_date(ob['postdate'])
+
+            for tx in txtable.get(obdate, []):
+                used = sum(f['amount'] for f in tx.fulfillments)
+                txrem = tx['amount'] - used
                 if obrem > 0 and txrem > 0:
                     f_amt = min(obrem, txrem)
-                    flowrate.db.execute(
-                        "INSERT INTO fulfillments "
-                        "(transactionid, obligationid, amount) "
-                        "VALUES (%s, %s, %s);", (tx.id, obid, f_amt))
+                    tx.fulfillments.append({'obligation': ob, 'amount': f_amt})
                     obrem -= f_amt
                     if obrem <= 0:
-                        # Don't allow obligations.remaining to be < 0, below
+                        # Don't allow obligations.remaining to be < 0
                         obrem = 0
                         break
 
-            flowrate.db.execute(
-                "UPDATE obligations SET remaining = %s "
-                "WHERE id = %s;", (obrem, obid))
+            ob['remaining'] = obrem
+
+        # Only swap in the new obligations once all the
+        # fulfillment calculations are done.
+        self.obligations = obs
 
     def clear_obligations(self):
         """Remove all existing obligations (and their fulfillments) for self."""
         # TODO: This isn't quite right; it only deletes existing fulfillments
         # for this flow. It might want to delete all fulfillments which
-        # *might* apply to this flow, so it can "take over" other existing
-        # fulfillments. Tough nut.
-        flowrate.db.execute(
-            "DELETE FROM fulfillments "
-            "WHERE obligationid"
-            " IN (SELECT id FROM obligations WHERE flowid = %s); "
+        # *might* apply to this flow, so self.obligate() can "take over"
+        # other existing fulfillments. Tough nut.
+        for tx in ledger.transactions.itervalues():
+            tx.fulfillments = [f for f in tx.fulfillments
+                               if f['obligation'] not in self.obligations]
+        self.obligations = []
 
-            "DELETE FROM obligations WHERE flowid = %s;",
-            (self.id, self.id))
 
-
-def isSubAccount(child, parents):
-    for p in parents:
-        for scale in (1000, 100, 10, 1):
-            if p % scale == 0 and p <= child < (p + scale):
-                return True
-    return False
-
-def fulfill(txrow):
+def fulfill(tx):
     """Use the given transaction to fulfill an obligation, if possible.
 
     Any existing fulfillments for the given transaction will be deleted.
     than for income. They are then ordered by date, ascending, within each
     debit account.
     """
-    if txrow.amount <= 0:
+    if tx['amount'] <= 0:
         return
 
-    obs = [(row.id, row.amount, row.remaining)
-           for row in flowrate.db.execute(
-                "SELECT * FROM obligations "
-                "WHERE isSubAccount(%s, credit_account) "
-                "AND isSubAccount(%s, debit_account) "
-                "AND (to_char(%s, dategroupformat) = "
-                     "to_char(postdate, dategroupformat)) "
-                "ORDER BY debit_account DESC, postdate ASC",
-                (txrow.credit_account, txrow.debit_account,
-                 txrow.postdate)).fetchall()]
+    allobs = []
+    for flowid, flow in flows.iteritems():
+        unit = cmpdates.units[flow['unit']]
 
-    txrem = txrow.amount
-    for obid, obamount, obrem in obs:
-        f_amt = min(obrem, txrem)
+        for ob in flow.obligations:
+            if (flowrate.isSubAccount(tx['credit'], ob['credit']) and
+                flowrate.isSubAccount(tx['debit'], ob['debit']) and
+                unit(tx['postdate']) == unit(ob['postdate'])
+                ):
+                allobs.append(ob)
+
+    txrem = tx['amount']
+    for ob in flowrate.ordered(allobs, '-debit', 'postdate'):
+        f_amt = min(ob['remaining'], txrem)
         if f_amt > 0:
-            flowrate.db.execute(
-                "INSERT INTO fulfillments "
-                "(transactionid, obligationid, amount) VALUES (%s, %s, %s);",
-                (txrow.id, obid, f_amt))
-            flowrate.db.execute(
-                "UPDATE obligations SET remaining = remaining - %s "
-                "WHERE id = %s;", (f_amt, obid))
+            tx.fulfillments.append({'obligation': ob, 'amount': f_amt})
+            ob['remaining'] -= f_amt
             txrem -= f_amt
             if txrem <= 0:
                 break
 
-def unfulfill(txid):
+def unfulfill(tx):
     """Delete any fulfillments for the given transaction. Update obligations."""
-    obids = [row.obligationid for row in flowrate.db.execute(
-        "DELETE FROM fulfillments "
-        "WHERE transactionid = %s RETURNING obligationid;",
-        (txid,)).fetchall()]
-    for obid in obids:
-        flowrate.db.execute(
-            "UPDATE obligations ob SET ob.remaining = ob.amount - "
-            "(SELECT COALESCE(SUM(f.amount), 0) FROM fulfillments f"
-            " WHERE f.obligationid = ob.id) "
-            "WHERE ob.id = %s;", (obid,))
+    while tx.fulfillments:
+        f = tx.fulfillments.pop()
+        f['obligation']['remaining'] += f['amount']
 
 
-def transactions(accounts=None, credits=None, debits=None,
-                 years=None, months=None, days=None,
-                 description=None):
-    """Yield flow transactions matching the given criteria (by postdate desc)."""
-    whereclause, args = [], {}
-    if accounts:
-        whereclause.append("(ARRAY[credit_account] <@ %(accounts)s"
-                           " OR ARRAY[debit_account] <@ %(accounts)s)")
-        all_accts = [row.id for row in flowrate.db.execute(
-                     "SELECT id FROM accounts;").fetchall()]
-        args['accounts'] = [a for a in all_accts if isSubAccount(a, accounts)]
-    if credits:
-        whereclause.append("ARRAY[credit_account] <@ %(credits)s")
-        args['credits'] = credits
-    if debits:
-        whereclause.append("ARRAY[debit_account] <@ %(debits)s")
-        args['debits'] = debits
-    if description:
-        whereclause.append("description ILIKE %(desc)s")
-        args['desc'] = '%' + description + '%';
-    if years:
-        whereclause.append("ARRAY[EXTRACT(year FROM postdate)::integer] <@ %(years)s")
-        args['years'] = years
-    if months:
-        whereclause.append("ARRAY[EXTRACT(month FROM postdate)::integer] <@ %(months)s")
-        args['months'] = months
-    if days:
-        whereclause.append("ARRAY[EXTRACT(day FROM postdate)::integer] <@ %(days)s")
-        args['days'] = days
+def find_transactions(filters):
+    """Return flow transactions matching the given criteria (by postdate desc)."""
+    if not filters:
+        # If there are no filters, return nothing instead of everything
+        return []
 
-    if not whereclause:
-        # If there are no filters, return nothing instead of everything
-        return
+    # Don't let old unfulfilled obligations screw up balances
+    txs = []
+    age = datetime.date.today() - datetime.timedelta(days=30)
+    for flowid, flow in flows.iteritems():
+        for ob in flow.obligations:
+            if ob['remaining'] > 0 and ob['postdate'] > age:
+                if all(f(ob) for f in filters):
+                    txs.append({
+                        'id': None,
+                        'postdate': ob['postdate'],
+                        'credit': ob['credit'],
+                        'debit': ob['debit'],
+                        'description': ob['description'],
+                        'amount': ob['remaining'],
+                        })
+    return flowrate.ordered(txs, '-postdate')
 
-    for ob in flowrate.db.execute(
-        "SELECT * FROM obligations "
-        "WHERE " + " AND ".join(whereclause) +
-        # Don't let old unfulfilled obligations screw up balances
-        " AND postdate >= (CURRENT_DATE - '1 month'::interval)"
-        " AND remaining > 0 "
-        "ORDER BY postdate DESC;", args).fetchall(
-        ):
-        yield {
-            'postdate': ob.postdate,
-            'credit': ob.credit_account,
-            'debit': ob.debit_account,
-            'description': ob.description,
-            'amount': ob.remaining,
-            }
-

File flowrate/ledger.py

+import flowrate
+
+
+accounts = {}
+
+class Account(object):
+
+    fields = ('name', 'type')
+
+    def __init__(self, id, **kwargs):
+        self.id = id
+        self.row = dict((k, kwargs[k]) for k in self.fields)
+
+    @classmethod
+    def load_all(cls):
+        """Populate the accounts dictionary from the database."""
+        for row in flowrate.db.execute("SELECT * FROM accounts;").fetchall():
+            acct = cls(row.id, **dict((k, row[k]) for k in cls.fields))
+            accounts[row.id] = acct
+
+    def __getitem__(self, key):
+        return self.row[key]
+
+    def __setitem__(self, key, value):
+        self.row[key] = value
+
+    def create(self):
+        flowrate.db.execute(
+            "INSERT INTO accounts (id, name, type) "
+            "VALUES (%(id)s, %(name)s, %(type)s);",
+            {'id': self.id, 'name': self['name'], 'type': self['type']}
+            )
+        accounts[self.id] = self
+
+    def update(self):
+        row = {'id': self.id}
+        row.update(self.row)
+        flowrate.db.execute(
+            "UPDATE accounts SET name = %(name)s, type = %(type)s "
+            "WHERE id = %(id)s;", row)
+
+    def delete(self):
+        flowrate.db.execute(
+            "DELETE FROM accounts WHERE id = %s;", (self.id,))
+        accounts.pop(self.id, None)
+
+
+transactions = {}
+
+class Transaction(object):
+
+    fields = ('postdate', 'credit', 'debit', 'description',
+              'amount', 'credit_mult', 'debit_mult')
+
+    def __init__(self, id, **kwargs):
+        self.id = id
+        self.row = dict((k, kwargs[k]) for k in self.fields)
+        self.fulfillments = []
+
+    @classmethod
+    def load_all(cls):
+        """Populate the transactions dictionary from the database."""
+        for row in flowrate.db.execute("SELECT * FROM transactions;").fetchall():
+            data = {}
+            for k in cls.fields:
+                f = k
+                if f in ('credit', 'debit'):
+                    f = f + '_account'
+                data[k] = row[f]
+            tx = cls(row.id, **data)
+            transactions[row.id] = tx
+
+    def __getitem__(self, key):
+        return self.row[key]
+
+    def __setitem__(self, key, value):
+        self.row[key] = value
+
+    def save(self):
+        if self.id is None:
+            row = flowrate.db.execute(
+                "INSERT INTO transactions"
+                " (postdate, credit_account, debit_account, description,"
+                " amount, credit_mult, debit_mult) "
+                "VALUES (%(postdate)s, %(credit)s, %(debit)s,"
+                " %(description)s, %(amount)s, %(credit_mult)s, %(debit_mult)s) "
+                "RETURNING *", self.row).fetchone()
+            self.id = row.id
+            transactions[self.id] = self
+        else:
+            row = {'id': self.id}
+            row.update(self.row)
+            flowrate.db.execute(
+                "UPDATE transactions SET postdate = %(postdate)s, "
+                "credit_account = %(credit)s, "
+                "debit_account = %(debit)s, "
+                "description = %(description)s, amount = %(amount)s "
+                "credit_mult = %(credit_mult)s, debit_mult = %(debit_mult)s, "
+                "WHERE id = %(id)s;", row)
+
+    def delete(self):
+        flowrate.db.execute(
+            "DELETE FROM transactions WHERE id = %s;", (self.id,))
+        transactions.pop(self.id, None)
+
+
+def find_transactions(filters):
+    """Return Transaction objects (dicts) matching the given criteria."""
+    if not filters:
+        # If there are no filters, return nothing instead of everything
+        return []
+
+    txs = [{'id': tx.id, 'postdate': tx['postdate'],
+            'credit': tx['credit'], 'debit': tx['debit'],
+            'description': tx['description'], 'amount': tx['amount']}
+           for tx in transactions.itervalues()
+           if all(f(tx) for f in filters)]
+    return flowrate.ordered(txs, '-postdate', 'credit', 'debit')
+

File flowrate/run.py

 import simplejson
 
 import flowrate
-from flowrate import dbutil
+from flowrate import dbutil, ledger, flows
 from flowrate.variables import environment as env
 
 
     start.priority = 70
 
 
-class EnvLoader(plugins.SimplePlugin):
+class DataLoader(plugins.SimplePlugin):
 
     def __init__(self, bus):
         self.bus = bus
 
     def start(self):
+        # Do this before flows since they need variables to obligate/fulfill.
         self.bus.log("Loading Flowrate variables...")
 
         for row in flowrate.db.execute("SELECT * FROM variables;"
                                        ).fetchall():
             env.bind(row['name'], row['source'])
-
         env.calc_all()
 
         #print "Environment:"
         #for name, source in env.variables.iteritems():
         #    print name, source, '=>', env.locals[name]
 
+        self.bus.log("Loading Flowrate accounts...")
+        ledger.Account.load_all()
+
+        # Do this first because Flow.load_all tries to obligate
+        self.bus.log("Loading Flowrate transactions...")
+        ledger.Transaction.load_all()
+
+        self.bus.log("Loading Flowrate flows...")
+        flows.Flow.load_all()
+
+        self.bus.log("Calculating Flowrate obligations...")
+        for flow in flows.flows.itervalues():
+            self.bus.log(flow['description'])
+            flow.obligate()
+
     start.priority = Postgres.start.priority + 5
 
 
 def run(config):
     cherrypy.engine.timeout_monitor.unsubscribe()
     Postgres(cherrypy.engine, config['db_info']).subscribe()
-    EnvLoader(cherrypy.engine).subscribe()
+    DataLoader(cherrypy.engine).subscribe()
     FlowrateApp(cherrypy.engine, config).subscribe()
     cherrypy.engine.start()
     cherrypy.engine.block()

File flowrate/ui/flows.html

     var flow = {};
     flow.start = $('edit_start').value;
     flow.end = $('edit_end').value;
-    flow.period = $('edit_period').value;
+    flow.period = parseInt($('edit_period').value);
     flow.unit = $('edit_unit').value;
     flow.days = [];
     var d = $('edit_days').value.split(",");

File flowrate/variables.py

         try:
             return eval(source, self.globals, l)
         except Exception, e:
-            e.args += (source, extra_locals)
+            e.args += (source, self.globals, l)
             raise
 
     def bind(self, name, source):