1. Robert Brewer
  2. flowrate

Commits

Robert Brewer  committed 4681778

Added a Flow model class, and started a test suite

  • Participants
  • Parent commits 027ceee
  • Branches default

Comments (0)

Files changed (11)

File flowrate/__init__.py

View file
 """Flowrate, a web-based cash flow planner.
 
-Flowrate's main task is to record and report cash flows.
+Flowrate's main task is to record and report cash flows and their fulfillment.
 
 """
 
 
     manager = txmanager
 
+    def __init__(self):
+        # This triggers tools.trailing_slash to force a redirect if missing
+        self.index = self
+
     @cherrypy.tools.json_out()
     def GET(self, accounts='', credits='', debits='',
             years='', months='', days='', description='',
         """Return the flow."""
         req = cherrypy.serving.request
 
-        row = db.execute(
-            "SELECT * FROM flows WHERE id = %s;", (req.flowid,)).fetchone()
-        if row is None:
+        flow = flows.Flow()
+        flow.find(req.flowid)
+        if flow.row is None:
             raise cherrypy.NotFound()
 
         return {'self': cherrypy.url(),
                 'description': self.description % vars(),
                 'body': {
-                    'amount': row['amount'],
-                    'credit': cherrypy.url("/accounts/%s" % row['credit_account']),
-                    'debit': cherrypy.url("/accounts/%s" % row['debit_account']),
-                    'start': row['range_start'],
-                    'end': row['range_end'],
-                    'period': row['period'],
-                    'unit': row['unit'],
-                    'days': row['days'],
-                    'description': row['description'],
+                    'amount': flow.amount,
+                    'credit': cherrypy.url("/accounts/%s" % flow.credit_account),
+                    'debit': cherrypy.url("/accounts/%s" % flow.debit_account),
+                    'start': flow.range_start.isoformat(),
+                    'end': flow.range_end.isoformat(),
+                    'period': flow.period,
+                    'unit': flow.unit,
+                    'days': flow.days,
+                    'description': flow.description,
                     },
                 }
 
         """Update the given flow entity."""
         req = cherrypy.serving.request
 
-        row = db.execute(
-            "SELECT * FROM flows WHERE id = %s;", (req.flowid,)).fetchone()
-        if row is None:
+        flow = flows.Flow()
+        flow.find(req.flowid)
+        if flow.row is None:
             raise cherrypy.NotFound()
 
         if "body" not in req.json:
         # Accept the new flow definition.
         vals = req.json["body"]
         credit, debit = popint(vals['credit']), popint(vals['debit'])
-        flowrow = db.execute(
+        flow.row = db.execute(
             "UPDATE flows SET amount = %s, "
             "credit_account = %s, debit_account = %s, "
             "range_start = %s, range_end = %s, "
             vals['period'], vals['unit'], vals['days'], vals['description'],
             req.flowid)).fetchone()
 
-        # Remove all existing obligations
-        # (and their fulfillments) for this flow
-        # TODO: This isn't quite right; it only deletes existing fulfillments
-        # for this flow. It might want to delete all fulfillments which
-        # *might* apply to this flow, so it can "take over" other existing
-        # fulfillments. Tough nut.
-        db.execute(
-            "DELETE FROM fulfillments "
-            "WHERE obligationid"
-            " IN (SELECT id FROM obligations WHERE flowid = %s); "
-
-            "DELETE FROM obligations WHERE flowid = %s;",
-            (req.flowid, req.flowid))
-
-        flows.obligate(flowrow)
+        flow.clear_obligations()
+        flow.obligate()
 
         cherrypy.response.status = 204
 
 
     manager = flowsmanager
 
+    def __init__(self):
+        # This triggers tools.trailing_slash to force a redirect if missing
+        self.index = self
+
     @cherrypy.tools.json_out()
     def GET(self, accounts='', credits='', debits='',
             years='', months='', days='', description='',
     
         # Accept the new flow definition.
         vals = req.json["body"]
-        newrow = db.execute("INSERT INTO flows"
+        flow = flows.Flow()
+        flow.row = db.execute("INSERT INTO flows"
                    " (amount, credit_account, debit_account, "
                    "range_start, range_end, period, unit, days, description) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING *",
                     vals["description"],
                     )).fetchone()
 
-        flows.obligate(newrow)
+        flow.obligate()
 
         cherrypy.response.status = 201
-        cherrypy.response.headers['Location'] = cherrypy.url("%s" % newrow.id)
+        cherrypy.response.headers['Location'] = cherrypy.url("%s" % flow.row.id)
 
     def _cp_dispatch(self, vpath):
         cherrypy.serving.request.flowid = int(vpath.pop(0))
 
     manager = acctmanager
 
+    def __init__(self):
+        # This triggers tools.trailing_slash to force a redirect if missing
+        self.index = self
+
     @cherrypy.tools.json_out()
     def GET(self):
         rows = db.execute(
     transactions = Transactions()
     json2_js = json2_js
     common_js = common_js
-    
+
+    def __init__(self):
+        # This triggers tools.trailing_slash to force a redirect if missing
+        self.index = self
+
     @cherrypy.tools.json_out()
     def GET(self):
         return {

File flowrate/db.py

-import os
-thisdir = os.path.abspath(os.path.dirname(__file__))
-
-
-class PgDatabase(object):
-    """A Postgres database.
-    
-    Usage for, say, a test run that starts from scratch:
-        db = PgDatabase(db=db_info['database'], **cluster_info)
-        db.drop_if_exists()
-        db.create('cube.sql')
-        db.execute('flowrate_master.sql')
-        db.execute('%s/testing/db/fixture.sql' % base)
-    """
-    
-    def __init__(self, dbname, user='postgres', host='localhost', port=5432,
-                 superuser='postgres', template='template1'):
-        self.dbname = dbname
-        self.user = user
-        self.host = host
-        self.port = port
-        self.superuser = superuser
-        self.template = template
-    
-    def psql(self, cmd):
-        """Invoke psql with the given cmd.  Connargs are taken from self."""
-        system('psql -q -U %s -h %s -p %s %s %s' % 
-               (self.user, self.host, self.port, cmd, self.dbname))
-    
-    def execute(self, script):
-        """Execute the given script (using "psql -f").
-        
-        The 'script' arg must be a SQL filename which can be executed
-        via "psql -f". If given and not absolute, it will be join'ed
-        with the directory of this package.
-        """
-        if not os.path.isabs(script):
-            script = os.path.abspath(os.path.join(thisdir, script))
-        self.psql('-f %s' % script)
-    
-    def super_psql(self, cmd):
-        """Invoke psql as the postgres supersuer with the given cmd.  Connargs are taken from self."""
-        system('psql -q -U %s -h %s -p %s %s %s' % 
-               (self.superuser, self.host, self.port, cmd, self.template))
-    
-    def create(self, script=None):
-        """CREATE this DATABASE.
-        
-        If 'script' is given, it must be a SQL file which can be executed
-        via "psql -f". If given and not absolute, it will be join'ed with
-        the directory of this module.
-        """
-        self.super_psql('-c "CREATE DATABASE %s WITH OWNER %s"' % (self.dbname, self.user))
-        if script:
-            if not os.path.isabs(script):
-                script = os.path.abspath(os.path.join(thisdir, script))
-            self.psql('-f %s' % script)
-    
-    def drop(self):
-        """DROP this DATABASE."""
-        self.super_psql('-c "DROP DATABASE %s"' % self.dbname)
-    
-    def drop_if_exists(self):
-        """DROP this DATABASE, if it exists."""
-        self.super_psql('-c "DROP DATABASE IF EXISTS %s"' % self.dbname)
-    
-    def create_user(self):
-        """CREATE this USER."""
-        self.super_psql('-c "CREATE USER %s"' % self.user)
-    
-    def drop_user(self):
-        """DROP this USER."""
-        self.super_psql('-c "DROP USER %s"' % self.user)
-
-
-def system(command_string, verbose=False):
-    """Like os.system(), except that we raise RuntimeError in those
-    cases where os.system() returns nonzero.
-    
-    This helps catch errors that can otherwise slip by undetected.
-    
-    This is really just a stopgap; the subprocess module is better.
-    """
-    if verbose:
-        print 'Running:', command_string
-    status = os.system(command_string)
-    if status != 0:
-        raise RuntimeError, \
-              'command "%s" terminated with status %d' % (command_string, status)
-    return status  # Yeah, it's guaranteed to be 0 at this point
-

File flowrate/dbutil.py

View file
+import os
+thisdir = os.path.abspath(os.path.dirname(__file__))
+
+
+class PgDatabase(object):
+    """A Postgres database.
+    
+    Usage for, say, a test run that starts from scratch:
+        db = PgDatabase(db=db_info['database'], **cluster_info)
+        db.drop_if_exists()
+        db.create('cube.sql')
+        db.execute('flowrate_master.sql')
+        db.execute('%s/testing/db/fixture.sql' % base)
+    """
+    
+    def __init__(self, database, user='postgres', host='localhost', port=5432,
+                 superuser='postgres', template='template1', **kwargs):
+        self.database = database
+        self.user = user
+        self.host = host
+        self.port = port
+        self.superuser = superuser
+        self.template = template
+    
+    def psql(self, cmd):
+        """Invoke psql with the given cmd.  Connargs are taken from self."""
+        system('psql -q -U %s -h %s -p %s %s %s' % 
+               (self.user, self.host, self.port, cmd, self.database))
+    
+    def execute(self, script):
+        """Execute the given script (using "psql -f").
+        
+        The 'script' arg must be a SQL filename which can be executed
+        via "psql -f". If given and not absolute, it will be join'ed
+        with the directory of this package.
+        """
+        if not os.path.isabs(script):
+            script = os.path.abspath(os.path.join(thisdir, script))
+        self.psql('-f %s' % script)
+    
+    def super_psql(self, cmd):
+        """Invoke psql as the postgres supersuer with the given cmd.  Connargs are taken from self."""
+        system('psql -q -U %s -h %s -p %s %s %s' % 
+               (self.superuser, self.host, self.port, cmd, self.template))
+    
+    def create(self, script=None):
+        """CREATE this DATABASE.
+        
+        If 'script' is given, it must be a SQL file which can be executed
+        via "psql -f". If given and not absolute, it will be join'ed with
+        the directory of this module.
+        """
+        self.super_psql('-c "CREATE DATABASE %s WITH OWNER %s"' % (self.database, self.user))
+        if script:
+            if not os.path.isabs(script):
+                script = os.path.abspath(os.path.join(thisdir, script))
+            self.psql('-f %s' % script)
+    
+    def drop(self):
+        """DROP this DATABASE."""
+        self.super_psql('-c "DROP DATABASE %s"' % self.database)
+    
+    def drop_if_exists(self):
+        """DROP this DATABASE, if it exists."""
+        self.super_psql('-c "DROP DATABASE IF EXISTS %s"' % self.database)
+    
+    def create_user(self):
+        """CREATE this USER."""
+        self.super_psql('-c "CREATE USER %s"' % self.user)
+    
+    def drop_user(self):
+        """DROP this USER."""
+        self.super_psql('-c "DROP USER %s"' % self.user)
+
+
+def system(command_string, verbose=False):
+    """Like os.system(), except that we raise RuntimeError in those
+    cases where os.system() returns nonzero.
+    
+    This helps catch errors that can otherwise slip by undetected.
+    
+    This is really just a stopgap; the subprocess module is better.
+    """
+    if verbose:
+        print 'Running:', command_string
+    status = os.system(command_string)
+    if status != 0:
+        raise RuntimeError, \
+              'command "%s" terminated with status %d' % (command_string, status)
+    return status  # Yeah, it's guaranteed to be 0 at this point
+

File flowrate/flowrate.sql

View file
     range_end date NOT NULL,
     period integer,
     unit text,
-    days integer[],
+    days integer[]
 );
 
 
 
 
 --
+-- Name: fki_obligations_flowid; Type: INDEX; Schema: public; Owner: postgres; Tablespace: 
+--
+
+CREATE INDEX fki_obligations_flowid ON obligations USING btree (flowid);
+
+
+--
 -- Name: obligations_flowid_fkey; Type: FK CONSTRAINT; Schema: public; Owner: postgres
 --
 

File flowrate/flows.py

View file
         return cmp((self.year, self.week), (other.year, other.week))
 
 
-def obligations(flow):
-    """Yield obligation rows for the given flow row."""
-    # Add dummy transactions for this flow based on its unit
-    if flow.unit == 'months':
-        unit, day = month.fromdate, lambda d: d.day
-        pgfmt = 'YYYY-MM'
-    elif flow.unit == 'weeks':
-        unit, day = week.fromdate, lambda d: d.weekday()
-        pgfmt = 'IYYY-IW'
-    elif flow.unit == 'years':
-        unit, day = lambda d: d.year, lambda d: d.timetuple().tm_yday
-        pgfmt = 'YYYY'
+class Flow(object):
 
-    for d in range(0, (flow.range_end - flow.range_start).days + 1):
-        postdate = flow.range_start + datetime.timedelta(days=d)
-        if day(postdate) not in flow.days:
-            continue
+    def __init__(self):
+        self.row = None
 
-        p = unit(postdate)
-        # This will be an integer number of units between post and start
-        diff = p - unit(flow.range_start)
-        # But range_start might be after one or more of our flow.day(s)
-        for fd in sorted(flow.days):
-            if day(flow.range_start) > fd:
-                diff -= 1
+    def find(self, flowid):
+        """Set self.row to a DB row matching the given ID, or None."""
+        self.row = flowrate.db.execute(
+            "SELECT * FROM flows WHERE id = %s;", (flowid,)).fetchone()
 
-        if diff % flow.period == 0:
-            # Yield one transaction for the year/month/week on the day.
-            yield {
-                'id': None, 'postdate': postdate,
-                'credit': flow.credit_account,
-                'debit': flow.debit_account,
-                'description': flow.description,
-                'amount': flow.amount,
-                }
+    def __getattr__(self, key):
+        return getattr(self.row, key)
 
-def obligate(flowrow):
-    """Insert obligation rows for the given flow; fulfill as possible."""
-    if flowrow.unit == 'years':
-        dategroupformat = 'YYYY'
-    elif flowrow.unit == 'days':
-        dategroupformat = 'YYYY-MM-DD'
-    else:
-        dategroupformat = 'YYYY-MM'
+    def obligations(self):
+        """Yield obligation rows for the given flow."""
+        # Add dummy transactions for this flow based on its unit
+        if self.unit == 'months':
+            unit, day = month.fromdate, lambda d: d.day
+        elif self.unit == 'weeks':
+            unit, day = week.fromdate, lambda d: d.weekday()
+        elif self.unit == 'years':
+            unit, day = lambda d: d.year, lambda d: d.timetuple().tm_yday
 
-    obs = {}
-    for ob in obligations(flowrow):
-        # TODO: this is slow. Can we change it to an "INSERT INTO ... FROM"?
-        row = flowrate.db.execute(
-            "INSERT INTO obligations "
-            "(flowid, postdate, credit_account, debit_account,"
-            " description, amount, dategroupformat) "
-            "VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id;",
-            (flowrow.id, ob['postdate'], ob['credit'], ob['debit'],
-             ob['description'], ob['amount'], dategroupformat)).fetchone()
-        obs[row.id] = ob
+        for d in range(0, (self.range_end - self.range_start).days + 1):
+            postdate = self.range_start + datetime.timedelta(days=d)
+            if day(postdate) not in self.days:
+                continue
 
-    # Now, fulfill the new obligations
-    for obid, ob in obs.iteritems():
-        obrem = ob['amount']
-        for tx in flowrate.db.execute(
-            "SELECT t.*, "
-            "(SELECT COALESCE(SUM(f.amount), 0) FROM fulfillments f"
-            " WHERE f.transactionid = t.id) AS fulfilled "
-            "FROM transactions t "
-            "WHERE isSubAccount(t.credit_account, %s) "
-            "AND isSubAccount(t.debit_account, %s) "
-            "AND (to_char(t.postdate, %s) = to_char(%s, %s)) "
-            "ORDER BY t.postdate ASC;",
-            (ob['credit'], ob['debit'],
-             dategroupformat, ob['postdate'], dategroupformat)).fetchall(
-            ):
-            txrem = tx.amount - tx.fulfilled
-            if obrem > 0 and txrem > 0:
-                f_amt = min(obrem, txrem)
-                flowrate.db.execute(
-                    "INSERT INTO fulfillments "
-                    "(transactionid, obligationid, amount) "
-                    "VALUES (%s, %s, %s);", (tx.id, obid, f_amt))
-                obrem -= f_amt
-                if obrem <= 0:
-                    break
+            p = unit(postdate)
+            # This will be an integer number of units between post and start
+            diff = p - unit(self.range_start)
+            # But range_start might be after one or more of our self.day(s)
+            for fd in sorted(self.days):
+                if day(self.range_start) > fd:
+                    diff -= 1
+
+            if diff % self.period == 0:
+                # Yield one transaction for the year/month/week on the day.
+                yield {
+                    'id': None, 'postdate': postdate,
+                    'credit': self.credit_account,
+                    'debit': self.debit_account,
+                    'description': self.description,
+                    'amount': self.amount,
+                    }
+
+    def obligate(self):
+        """Insert obligation rows for the given flow; fulfill as possible."""
+        if self.unit == 'years':
+            dategroupformat = 'YYYY'
+        elif self.unit == 'days':
+            dategroupformat = 'YYYY-MM-DD'
+        else:
+            dategroupformat = 'YYYY-MM'
+
+        obs = {}
+        for ob in self.obligations():
+            # TODO: this is slow. Can we change it to an "INSERT INTO ... FROM"?
+            row = flowrate.db.execute(
+                "INSERT INTO obligations "
+                "(flowid, postdate, credit_account, debit_account,"
+                " description, amount, dategroupformat) "
+                "VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id;",
+                (self.id, ob['postdate'], ob['credit'], ob['debit'],
+                 ob['description'], ob['amount'], dategroupformat)).fetchone()
+            obs[row.id] = ob
+
+        # Now, fulfill the new obligations
+        for obid, ob in obs.iteritems():
+            obrem = ob['amount']
+            for tx in flowrate.db.execute(
+                "SELECT t.*, "
+                "(SELECT COALESCE(SUM(f.amount), 0) FROM fulfillments f"
+                " WHERE f.transactionid = t.id) AS fulfilled "
+                "FROM transactions t "
+                "WHERE isSubAccount(t.credit_account, %s) "
+                "AND isSubAccount(t.debit_account, %s) "
+                "AND (to_char(t.postdate, %s) = to_char(%s, %s)) "
+                "ORDER BY t.debit_account DESC, t.postdate ASC;",
+                (ob['credit'], ob['debit'],
+                 dategroupformat, ob['postdate'], dategroupformat)).fetchall(
+                ):
+                txrem = tx.amount - tx.fulfilled
+                if obrem > 0 and txrem > 0:
+                    f_amt = min(obrem, txrem)
+                    flowrate.db.execute(
+                        "INSERT INTO fulfillments "
+                        "(transactionid, obligationid, amount) "
+                        "VALUES (%s, %s, %s);", (tx.id, obid, f_amt))
+                    obrem -= f_amt
+                    if obrem <= 0:
+                        break
+
+    def clear_obligations(self):
+        """Remove all existing obligations (and their fulfillments) for self."""
+        # TODO: This isn't quite right; it only deletes existing fulfillments
+        # for this flow. It might want to delete all fulfillments which
+        # *might* apply to this flow, so it can "take over" other existing
+        # fulfillments. Tough nut.
+        flowrate.db.execute(
+            "DELETE FROM fulfillments "
+            "WHERE obligationid"
+            " IN (SELECT id FROM obligations WHERE flowid = %s); "
+
+            "DELETE FROM obligations WHERE flowid = %s;",
+            (self.id, self.id))
+
 
 def isSubAccount(child, parents):
     for p in parents:

File flowrate/run.py

View file
 from cherrypy.process import plugins
 
 import flowrate
+from flowrate import dbutil
+
+
+class Postgres(plugins.SimplePlugin):
+
+    def __init__(self, bus, config):
+        self.bus = bus
+        self.config = config
+
+    def start(self):
+        self.bus.log("Connecting to Postgres database...")
+        flowrate.set_db(
+            'postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s' %
+            self.config, echo=False, max_overflow=10,
+            strategy='threadlocal')
+        self.bus.log("Connected to Postgres database.")
+    start.priority = 30
+
+    def recreate(self):
+        d = dbutil.PgDatabase(**self.config)
+        self.bus.log("Dropping Postgres database %s..." %
+                     self.config['database'])
+        d.drop_if_exists()
+        self.bus.log("Creating Postgres database %s..." %
+                     self.config['database'])
+        d.create('flowrate.sql')
+        self.bus.log("Postgres database created.")
+
+
+class FlowrateApp(plugins.SimplePlugin):
+
+    def __init__(self, bus, config):
+        self.bus = bus
+        self.config = config
+
+    def start(self):
+        self.bus.log("Mounting Flowrate application...")
+        
+        cherrypy.config.update({
+            'server.socket_port': self.config['port'],
+            'server.socket_host': self.config['host'],
+            })
+        appconf = {
+            '/': {
+                'tools.cpstats.on': True,
+                'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
+                'tools.trailing_slash.status': 307,
+            },
+            "/okapi": {
+                "tools.staticdir.on": True,
+                "tools.staticdir.dir": os.path.join(thisdir, 'okapi'),
+            },
+            "/cpstats": {
+                'request.dispatch': cherrypy.dispatch.Dispatcher(),
+            },
+            "/manager": {
+                'request.dispatch': cherrypy.dispatch.Dispatcher(),
+            },
+        }
+        cpenv = self.config.get('environment', 'development')
+        if cpenv != 'development':
+            appconf['global']['environment'] = cpenv
+
+        cherrypy.server.statistics = True
+
+        self.app = cherrypy.tree.mount(flowrate.root, '/', appconf)
+    start.priority = 70
 
 
 def run(config):
-    flowrate.set_db(
-        'postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s' %
-        config['db_info'], echo=False, max_overflow=10, strategy='threadlocal')
-    
     cherrypy.engine.timeout_monitor.unsubscribe()
-    
-    appconf = {
-        'global': {
-            'server.socket_port': config.get('port', 8090),
-            'server.socket_host': config.get('host', '127.0.0.1'),
-        },
-        '/': {
-            'tools.cpstats.on': True,
-            'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
-            'tools.trailing_slash.status': 307,
-        },
-        "/okapi": {
-            "tools.staticdir.on": True,
-            "tools.staticdir.dir": os.path.join(thisdir, 'okapi'),
-        },
-        "/cpstats": {
-            'request.dispatch': cherrypy.dispatch.Dispatcher(),
-        },
-        "/manager": {
-            'request.dispatch': cherrypy.dispatch.Dispatcher(),
-        },
-    }
-    cpenv = config.get('environment', 'development')
-    if cpenv != 'development':
-        appconf['global']['environment'] = cpenv
-
-    cherrypy.server.statistics = True
-    cherrypy.quickstart(flowrate.root, '/', appconf)
+    Postgres(cherrypy.engine, config['db_info']).subscribe()
+    FlowrateApp(cherrypy.engine, config).subscribe()
+    cherrypy.engine.start()
+    cherrypy.engine.block()
 
 
 usage = 'flowrate/run.py <config>'

File flowrate/testing/__init__.py

View file
+import os
+thisdir = os.path.abspath(os.path.dirname(__file__))
+import traceback
+
+import cherrypy
+from cherrypy.process.plugins import SimplePlugin
+from cherrypy.test import webtest
+import simplejson
+
+# Nose hooks. We would just use setup() instead of start() and let nose run
+# it as a package setup, but unfortunately nose imports the test module
+# before running package setup methods. That means we can't import modules
+# from panoptic that depend on having a configured and/or running server.
+def start():
+    try:
+        _start()
+    except:
+        # If errors occur during start(), we don't want nose to move on to
+        # the next test and call start() again.
+        traceback.print_exc()
+        raise SystemExit
+
+def _start():
+    if cherrypy.engine.state == cherrypy.engine.states.STARTED:
+        return
+
+    fname = os.path.join(thisdir, 'test.cfg')
+    config = simplejson.loads(open(fname, 'rb').read())
+
+    webtest.WebCase.PORT = config['port']
+
+    host = config['host']
+    if host == '0.0.0.0':
+        host = '127.0.0.1'
+    elif host == '::':
+        host = '::1'
+    webtest.WebCase.HOST = host
+
+    from flowrate import run
+    cherrypy.engine.timeout_monitor.unsubscribe()
+    pg = run.Postgres(cherrypy.engine, config['db_info'])
+    pg.subscribe()
+    pg.recreate()
+    run.FlowrateApp(cherrypy.engine, config).subscribe()
+    cherrypy.engine.start()
+    cherrypy.engine.wait(cherrypy.engine.states.STARTED, interval=1)
+
+
+def teardown():
+    cherrypy.engine.exit()
+
+
+class FlowrateTest(webtest.WebCase):
+
+    def assert_(self, expr, msg=None):
+        if not expr:
+            raise AssertionError(msg if msg is not None else expr)
+
+    def assertEqual(self, x, y, msg=None):
+        self.assert_(x == y, msg or "%r != %r" % (x, y))
+
+    def base(self):
+        if ((self.scheme == "http" and self.PORT == 80) or
+            (self.scheme == "https" and self.PORT == 443)):
+            port = ""
+        else:
+            port = ":%s" % self.PORT
+        
+        return "%s://%s%s" % (self.scheme, self.HOST, port)
+
+    @property
+    def json(self):
+        return simplejson.loads(self.body)
+
+    def define_account(self, id, **kwargs):
+        """Add the given account to the system."""
+        loc = '/accounts/%s' % id
+        b = simplejson.dumps({'body': kwargs})
+        h = [('Content-Type', 'application/json'),
+             ('Content-Length', str(len(b)))]
+        self.getPage(loc, method="PUT", headers=h, body=b)
+        self.assertStatus((201, 204))
+        self.getPage(loc)
+        self.assertStatus(200)
+        self.assertEqual(self.json['body'], kwargs)
+
+    def define_flow(self, **kwargs):
+        """Add the given flow to the system."""
+        b = simplejson.dumps({'body': kwargs})
+        h = [('Content-Type', 'application/json'),
+             ('Content-Length', str(len(b)))]
+        self.getPage('/flows/', method="POST", headers=h, body=b)
+        self.assertStatus(201)
+        loc = self.assertHeader('Location')
+        print loc
+        self.getPage(loc)
+        self.assertStatus(200)
+        self.assertEqual(self.json['body'], kwargs)
+

File flowrate/testing/config.py

-{"cluster_info": {
-    "host": "localhost",
-    "superuser": "postgres",
-    "template": "template1",
-    "port": 5434
-    },
- "db_info": {
-    "host": "localhost",
-    "database": "flowrate",
-    "user": "postgres",
-    "password": "",
-    "port": 5434
-    },
- "port": 8090
-}
-

File flowrate/testing/db.py

View file
-import sys
-
-from flowrate.db import PgDatabase
-from flowrate.testing import config
-
-import flowrate
-
-flowrate.set_db(
-    'postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s' %
-    config.db_info, echo=False, max_overflow=10, strategy="threadlocal")
-#flowrate.handle_error = traceback.print_exc
-
-def create_databases():
-    flowratedb = PgDatabase(config.db_info['database'], **config.cluster_info)
-    flowratedb.create()
-    flowratedb.psql('-c "CREATE LANGUAGE plpgsql"')
-    flowratedb.execute('flowrate.sql')
-    #flowratedb.execute('testing/flowrate_master_fixture.sql')
-
-def drop_databases():
-    PgDatabase(config.db_info['database'], **config.cluster_info).drop_if_exists()
-
-started = False
-def start():
-    global started
-    if not started:
-        drop_databases()
-        create_databases()
-        started = True
-
-
-if __name__ == '__main__':
-    if len(sys.argv) >= 1:
-        try:
-            # Call any function in this module named by the 1st arg.
-            # Pass the remaining args.
-            func = vars()[sys.argv[1]]
-        except KeyError:
-            pass
-        else:
-            func(*tuple(sys.argv[2:]))
-

File flowrate/testing/test.cfg

View file
+{"db_info": {
+    "host": "localhost",
+    "superuser": "postgres",
+    "template": "template1",
+    "database": "flowrate",
+    "user": "postgres",
+    "password": "",
+    "port": 5434
+    },
+ "host": "localhost",
+ "port": 8090
+}
+

File flowrate/testing/test_flowrate.py

View file
+from flowrate.testing import start, FlowrateTest
+start()
+
+
+class TestWorkflow(FlowrateTest):
+
+    def test_basic_workflow(self):
+        self.define_account(id=1012, type="asset", name="Checking")
+        self.define_account(id=3011, type="income", name="Paycheck")
+        self.define_flow(start='2012-01-01', end='2012-12-31',
+                         credit=self.base() + '/accounts/3011',
+                         debit=self.base() + '/accounts/1012',
+                         period=1, unit='months', days=[15, 28],
+                         description='External Deposit MyCompany',
+                         amount=1500)
+