1. Rufus Pollock
  2. wdmmg

Commits

pudo  committed a6be703

[tests,controllers,templates,lib] get a reasonable part of the tests to pass again

  • Participants
  • Parent commits af56315
  • Branches mongo

Comments (0)

Files changed (23)

File wdmmg/config/routing.py

View file
     map.connect('/aggregate', controller='aggregate', action='view')
 
     map.connect('/dataset', controller='dataset', action='index')
-    map.connect('/dataset/{name_or_id}', controller='dataset', action='view')
-    map.connect('/dataset/{name_or_id}/{action}', controller='dataset')
+    map.connect('/dataset/{id}', controller='dataset', action='view')
+    map.connect('/dataset/{id}/{action}', controller='dataset')
     
     map.connect('/entity', controller='entity', action='index')
-    map.connect('/entity/{name_or_id}', controller='entity', action='view')
-    map.connect('/entity/{name_or_id}/{action}', controller='entity')
+    map.connect('/entity/{id}', controller='entity', action='view')
+    map.connect('/entity/{id}/{action}', controller='entity')
     
     map.connect('/classifier', controller='classifier', action='index')
-    map.connect('/classifier/{name_or_id}', controller='classifier', action='view')
-    map.connect('/classifier/{name_or_id}/{action}', controller='classifier')
+    map.connect('/classifier/{id}', controller='classifier', action='view')
+    map.connect('/classifier/{id}/{action}', controller='classifier')
     
     map.connect('/entry', controller='entry', action='index')
-    map.connect('/entry/{id_}', controller='entry', action='view')
-    map.connect('/entry/{id_}/{action}', controller='entry')
+    map.connect('/entry/{id}', controller='entry', action='view')
+    map.connect('/entry/{id}/{action}', controller='entry')
 
     map.connect('/api', controller='api', action='index')
     map.connect('/api/search', controller='api', action='search')
     map.connect('/api/mytax', controller='api', action='mytax')
 
     map.connect('/api/rest', controller='rest', action='index')
-    map.connect('/api/rest/dataset/{name_or_id}', controller='rest', action='dataset')
-    map.connect('/api/rest/entry/{name_or_id}', controller='rest', action='entry')
+    map.connect('/api/rest/dataset/{id}', controller='rest', action='dataset')
+    map.connect('/api/rest/entry/{id}', controller='rest', action='entry')
 
     map.redirect('/*(url)/', '/{url}', 
         _redirect_code='301 Moved Permanently') 

File wdmmg/controllers/aggregate.py

View file
     def view(self):
         abort(404)
         # Read request parameters.
-        dataset_ = self.get_by_name_or_id(model.Dataset,
-            name_or_id=unicode(request.params.get('dataset', g.default_dataset)))
+        dataset_ = self.get_by_id(model.Dataset,
+            id=unicode(request.params.get('dataset', g.default_dataset)))
         c.filters = {}
         for param, value in request.params.items():
             if param.startswith('include-'):
-                key = self.get_by_name_or_id(model.Key, name_or_id=unicode(param[8:]))
+                key = self.get_by_id(model.Key, id=unicode(param[8:]))
                 c.filters[key] = value
         c.axis = request.params.get('breakdown')
         if c.axis:
-            c.axis = self.get_by_name_or_id(model.Key, name_or_id=c.axis)
+            c.axis = self.get_by_id(model.Key, id=c.axis)
         # Do the aggregation.
         c.results = aggregator.aggregate(
             dataset_=dataset_,

File wdmmg/controllers/api.py

View file
     def _aggregate(self, keys, values):
         aggregator_params = dict(zip(keys,values))
         if aggregator_params.get('slice'):
-            name_or_id = aggregator_params.get('slice')
+            id = aggregator_params.get('slice')
         elif aggregator_params.get('dataset'):
-            name_or_id = aggregator_params.get('dataset')
+            id = aggregator_params.get('dataset')
         else:
             raise Exception, "Dataset not defined"
-        dataset = model.Dataset.by_name_or_id(name_or_id)
-        assert dataset, "Dataset %s not found" % name_or_id
+        dataset = model.Dataset.by_id(id)
+        assert dataset, "Dataset %s not found" % id
         # Retrieve request parameters of the form "verb-key=value"
         include, axes, per, per_time = [], [], [], []
         # Resort the dictionary (since we sorted the keys)
                 'dataset': dataset.name,
                 'include': include,
                 'dates': [unicode(d) for d in results.dates],
+                'axes': results.axes,
                 'per': per,
                 'per_time': per_time
             },

File wdmmg/controllers/classifier.py

View file
     def index(self):
         return 'Not implemented'
 
-    def view(self, name_or_id):
+    def view(self, id):
         return 'Not implemented'

File wdmmg/controllers/dataset.py

View file
         c.results = model.Dataset.find()[:c.limit]
         return render('dataset/index.html')
 
-    def view(self, name_or_id=None):
-        c.row = model.Dataset.by_name_or_id(name_or_id)
+    def view(self, id=None):
+        c.row = model.Dataset.by_id(id)
         c.num_entries = model.Entry.find({'dataset.name': c.row.name}).count()
         return render('dataset/view.html')
 
-    def entries(self, name_or_id=None):
-        c.dataset_ = self.get_by_name_or_id(model.Dataset, name_or_id)
+    def entries(self, id=None):
+        c.dataset_ = self.get_by_id(model.Dataset, id)
         query = model.Session.query(model.Entry).filter_by(dataset_=c.dataset_)
         c.page = Page(
             collection=query,

File wdmmg/controllers/entity.py

View file
     def index(self):
         return 'Not implemented'
 
-    def view(self, name_or_id):
+    def view(self, id):
         return 'Not implemented'

File wdmmg/controllers/entry.py

View file
         )
         return render('entry/index.html')
 
-    def view(self, id_=None):
-        c.row = model.Entry.by_name_or_id(id_)
+    def view(self, id=None):
+        c.row = model.Entry.by_id(id)
         if not c.row:
-            abort(404, 'Sorry, there is no entry with code %r'% id_)
+            abort(404, 'Sorry, there is no entry with code %r'% id)
     
         c.desc = c.row.explain()
         c.id = c.row.get('_id')

File wdmmg/controllers/rest.py

View file
     def index(self):
         dataset_ = model.Dataset.find_one()
         c.urls = [
-            url(controller='rest', action='dataset', name_or_id=dataset_.name),
-            url(controller='rest', action='dataset', name_or_id=dataset_.id),
-            #url(controller='rest', action='entry', name_or_id=model.Entry.find_one().name),
-            url(controller='rest', action='entry', name_or_id=model.Entry.find_one().id),
+            url(controller='rest', action='dataset', id=dataset_.name),
+            url(controller='rest', action='dataset', id=dataset_.id),
+            #url(controller='rest', action='entry', id=model.Entry.find_one().name),
+            url(controller='rest', action='entry', id=model.Entry.find_one().id),
         ]
         return render('home/rest.html')
     
     @jsonpify
-    def dataset(self, name_or_id=None):
-        return self._domain_object(model.Dataset.by_name_or_id(name_or_id))
+    def dataset(self, id=None):
+        return self._domain_object(model.Dataset.by_id(id))
         
     @jsonpify
-    def entry(self, name_or_id=None):
-        return self._domain_object(model.Entry.by_name_or_id(name_or_id))
+    def entry(self, id=None):
+        return self._domain_object(model.Entry.by_id(id))
         
     def _domain_object(self, domain_object):
         self._check_access(domain_object, READ)

File wdmmg/lib/solrhelp.py

View file
 'to', 'notes' ]
 
 
+def extend_entry(entry):
+    entry = entry.to_flat_dict()
+    if 'year' in entry and 'time' not in entry:
+        entry['time'] = entry['year']
+    if 'time' in entry:
+        try: # ignore bad dates ... (ugly)
+            ourdt = swiss.date.parse(entry['time']).as_datetime()
+            # add Z as solr requires this
+            entry['time_norm'] = ourdt.isoformat() + 'Z'
+        except: pass
+    return entry
+
+
 def build_index(dataset_name=None, solr=None):
     dataset_name = dataset_name or unicode(config.get('default_dataset', u'cra'))
     solr = solr or solr_connection()
     total = 0
     increment = 500
     for entry in cur:
-        ourdata = entry.to_flat_dict()
+        ourdata = extend_entry(entry)
         buf.append(ourdata)
         if len(buf) == increment:
             print 'Writing %d records...' % len(buf)

File wdmmg/model/mongo.py

View file
         return cls.c.find_one(*a, **kw)
     
     @classmethod
-    def by_name_or_id(cls, name_or_id):
-        fl = [{'name': name_or_id}]
+    def by_id(cls, id):
+        fl = [{'name': id}, {'_id': id}]
         try:
-            fl.append({'_id': ObjectId(name_or_id)})
+            fl.append({'_id': ObjectId(id)})
         except: pass
         return cls.find_one({'$or': fl})
     
                 if isinstance(v, dict):
                     if 'name' in v.keys():
                         flat[k] = v['name']
-                        del v['name']
+                        #del v['name']
                     for sk, sv in _flatten(v).items():
                         flat[k + sep + sk] = sv
                 else:

File wdmmg/templates/_util.html

View file
         templating issues -->
 
   <div py:def="render_key(key)" class="key-desc" style="display: inline;">
-    <a href="${url(controller='key', action='view', name_or_id=key.name)}">${key.name}</a>
+    <a href="${url(controller='key', action='view', id=key.name)}">${key.name}</a>
     <py:if test="key.notes">
     &ndash; 
     ${' '.join(key.notes.split()[:20]) + ' ...'}
   <div py:def="render_entries(entries)">
     <ul class="entry-list">
       <li py:for="row in entries">
-        <a href="${url(controller='entry', action='view', id_=row.get('id'))}">
+        <a href="${url(controller='entry', action='view', id=row.get('id'))}">
           <span class="time">${row.get('time')}</span>: <span class="from">${row.get('from.label')}</span> spent
           <span class="amount">${h.format_number(row.get('amount'))}</span>
           with
         <div class="summary">
           ${row.get('description', '')}
         </div>
-        <a href="${url(controller='entry', action='view', id_=row.get('id'))}">More info &raquo;</a>
+        <a href="${url(controller='entry', action='view', id=row.get('id'))}">More info &raquo;</a>
       </li>
     </ul>
   </div>
     <ul class="entry-list">
       <li py:for="row in entries" py:with="from_=row.get('from.label', ''); from_code=row.get('from.id', ''); to_code=row.get('to.id', '');  time=row.get('time', ''); amount=row.get('amount', 0); to_=row.get('to.label', 'General Population');">
           <span class="time">${time}</span>: 
-          <a href="	${url(controller='entity', action='view', name_or_id=from_code)}">
+          <a href="	${url(controller='entity', action='view', id=from_code)}">
             <span class="from">${from_}</span>
           </a> spent
           <span class="amount">${h.format_number(amount)}</span>
           with
-          <a href="	${url(controller='entity', action='view', name_or_id=to_code)}">
+          <a href="	${url(controller='entity', action='view', id=to_code)}">
               <span class="to">${to_}</span></a>.
           <br/>
-          <a href="${url(controller='entry', action='view', id_=row.get('id', 1))}">See full entry &raquo;</a>
+          <a href="${url(controller='entry', action='view', id=row.get('id', 1))}">See full entry &raquo;</a>
       </li>
     </ul>
   </div>

File wdmmg/templates/dataset/index.html

View file
     <p>The database contains the following datasets:</p>
     <ul>
       <li py:for="row in c.results">
-        <a href="${url(controller='dataset', action='view', name_or_id=row.name)}">${row.label if row.label else row.name}</a>: ${h.markdown(row.description)}
+        <a href="${url(controller='dataset', action='view', id=row.name)}">${row.label if row.label else row.name}</a>: ${h.markdown(row.description)}
       </li>
     </ul>
     <p py:if="c.results.count()>=c.limit">Only the first ${c.limit} results are shown.</p>

File wdmmg/templates/entry/view.html

View file
 		</py:choose>
       </a></dd>
       <dt>To:</dt>
-      <dd><a href="${url(controller='entity', action='view', name_or_id=c.to.get('_id'))}">${c.to.get('label')}</a></dd> 
+      <dd><a href="${url(controller='entity', action='view', id=c.to.get('_id'))}">${c.to.get('label')}</a></dd> 
       <dt>From:</dt>
-      <dd><a href="${url(controller='entity', action='view', name_or_id=c.from_.get('_id'))}">
+      <dd><a href="${url(controller='entity', action='view', id=c.from_.get('_id'))}">
         ${c.from_.get('label')}
       </a></dd> 
       

File wdmmg/tests/functional/test_api.py

View file
         out = json.loads(str(response.body))['response']
         assert out['numFound'] == 7, out['numFound']
         exp_entity = 'Department for Children, Schools and Families'
-        assert out['docs'][0]['from'] == exp_entity, out['docs'][0]
+        assert out['docs'][0]['from.label'] == exp_entity, out['docs'][0]
 
     def test_search_03_jsonpify(self):
         callback = 'mycallback'

File wdmmg/tests/functional/test_dataset.py

View file
         assert 'cra' in response
 
     def test_view(self):
-        response = self.app.get(url(controller='dataset', action='view', name_or_id='cra'))
-        assert '''Properties of dataset 'cra':''' in response
+        response = self.app.get(url(controller='dataset', action='view', id='cra'))
+        assert '''Country Regional Analysis''' in response
         assert '''Number of entries:''' in response
 

File wdmmg/tests/functional/test_entry.py

View file
         assert '36 entries' in response, response
 
     def test_view(self):
-        t = model.Session.query(model.Entry).first()
-        response = self.app.get(url(controller='entry', action='view', id_=t.id))
+        t = model.Entry.find_one()
+        response = self.app.get(url(controller='entry', action='view', 
+            id=str(t.id)))
         assert '''cra''' in response
 
     def test_foi(self):
-        t = model.Session.query(model.Entry).first()
-        response = self.app.get(url(controller='entry', action='view', id_=t.id))
+        t = model.Entry.find_one()
+        response = self.app.get(url(controller='entry', action='view', 
+            id=str(t.id)))
         # For now, just check we AREN'T showing the FOI screen on CRA pages.
         # TODO: more detailed testing.
         assert not '''Make an FOI request''' in response

File wdmmg/tests/functional/test_rest.py

View file
     
     def test_index(self):
         response = self.app.get(url(controller='rest', action='index'))
-        for word in ['dataset', 'entry', 'key', 'value']:
+        for word in ['dataset', 'entry']:
             assert word in response, response
     
     def test_dataset(self):
         response = self.app.get(url(controller='rest', action='dataset',
-            name_or_id=Fixtures.dataset_.name))
-        assert '"id":' in response, response
+            id=Fixtures.dataset_.name))
+        assert '"_id":' in response, response
         assert '"name": "cra"' in response, response
     
     def test_entry(self):
-        example = (model.Session.query(model.Entry)
-            .filter_by(dataset_=Fixtures.dataset_)
-            ).first()
+        example = model.Entry.find_one({'dataset.name': Fixtures.dataset_.name})
         response = self.app.get(url(controller='rest', action='entry',
-            id_=example.id))
-        assert '"id":' in response, response
-        assert '"cofog1":' in response, response
-        assert '"from_code":' in response, response
+            id=str(example.id)))
+        assert '"_id":' in response, response
+        assert '"cofog":' in response, response
+        assert '"from":' in response, response
         assert '"Dept032"' in response, response
 
-    def test_key(self):
-        response = self.app.get(url(controller='rest', action='key',
-            name_or_id=Fixtures.cofog2.name))
-        assert '"id":' in response, response
-        assert '"enumeration_values":' in response, response
-        assert '"04.5":' in response, response
-        assert '"parent":' in response, response
-
-    def test_enumeration_value(self):
-        example = (model.Session.query(model.EnumerationValue)
-            .filter_by(key=Fixtures.region)
-            ).first()
-        response = self.app.get(url(controller='rest', action='enumeration_value',
-            name_or_id=Fixtures.region.name, code=example.code))
-        assert '"id":' in response, response
-        assert '"code":' in response, response
-        assert '"population2006":' in response, response
-

File wdmmg/tests/test_aggregator.py

View file
     def test_aggregate(self):
         ans = aggregator.aggregate(
             Fixtures.dataset_,
-            include=[(Fixtures.region, u'ENGLAND_South West')],
+            include=[('region', u'ENGLAND_South West')],
             axes=[
-                Fixtures.cap_or_cur, Fixtures.cofog1,
+                'cap_or_cur', 'cofog.1',
                 # Omit Fixtures.pog, Fixtures.region,
             ])
         print ans
         assert ans.dates == [u'2003', u'2004', u'2005',
             u'2006', u'2007', u'2008', u'2009',
             u'2010'], ans.dates
-        assert ans.axes == [u'cap_or_cur', u'cofog1'], ans.axes
+        assert ans.axes == [u'cap_or_cur', u'cofog.1'], ans.axes
         index = dict([(coords, sum(amount)) for (coords, amount) in ans.matrix.items()])
         for k, v in index.items():
             print k, v
             # Tolerate rounding errors.
             assert abs(index[coords] - amount) < 1, (coords, amount)
 
-    def test_make_aggregate_query(self):
-        query, params = aggregator._make_aggregate_query(
-            Fixtures.dataset_,
-            include=[(Fixtures.region, u'ENGLAND_South West')],
-            axes=[Fixtures.cap_or_cur, Fixtures.cofog1],
-        )
-        print query
-        print params
-        assert query == '''\
-SELECT
-    (SELECT ev.code FROM classification_item ci, enumeration_value ev
-        WHERE ev.key_id = :ak_0
-        AND ci.entry_id = t.id AND ci.value_id = ev.id) AS axis_0,
-    (SELECT ev.code FROM classification_item ci, enumeration_value ev
-        WHERE ev.key_id = :ak_1
-        AND ci.entry_id = t.id AND ci.value_id = ev.id) AS axis_1,
-    SUM(t.amount) as amount,
-    (SELECT ev.code FROM classification_item ci, enumeration_value ev
-        WHERE ev.key_id = :key_time_id
-        AND ci.entry_id = t.id AND ci.value_id = ev.id) AS time
-FROM "entry" t
-WHERE t.dataset_id = :dataset_id
-AND t.id IN (SELECT ci.entry_id FROM classification_item ci, enumeration_value ev
-    WHERE ev.key_id = :k_0
-    AND ev.code = :v_0 AND ci.value_id = ev.id)
-GROUP BY time, axis_0, axis_1'''
-
     def test_aggregate_per(self):
         ans = aggregator.aggregate(
             Fixtures.dataset_,
-            axes=[Fixtures.cofog1, Fixtures.region],
+            axes=['cofog.1.name', 'region'],
         )
         print ans
         ans.divide_by_statistic('region', 'population2006')

File wdmmg/tests/test_cofog.py

View file
 
 class TestCOFOGLoader(object):
     def test_cofog(self):
-        model.repo.delete_all()
-        model.Session.remove()
+        model.mongo.drop_db()
         cofog.load_file(pkg_resources.resource_stream('wdmmg', 'tests/COFOG_english_structure_short.txt'))
-        codes = model.Session.query(model.EnumerationValue.code).all()
-        assert len(codes)==12, codes
-        model.Session.rollback()
+        codes = model.Classifier.find()
+        assert codes.count()==12, codes
 

File wdmmg/tests/test_cra.py

View file
 import wdmmg.model as model
 from wdmmgext.load import cofog
 from wdmmgext.load import cra
+from wdmmgext.load import util
 
 class TestCRA(object):
     @classmethod
     def setup_class(self):
-        model.repo.delete_all()
-        model.Session.remove()
+        model.mongo.drop_db()
         cofog.load_file(pkg_resources.resource_stream('wdmmg', 'tests/COFOG_english_structure_short.txt'))
-        cofog_mapper = cra.CofogMapper(json.load(pkg_resources.resource_stream('wdmmg', 'tests/cofog_map_short.json')))
+        cofog_mapper = util.CofogMapper(json.load(pkg_resources.resource_stream('wdmmg', 'tests/cofog_map_short.json')))
         fileobj = pkg_resources.resource_stream('wdmmg', 'tests/cra_2009_db_short.csv')
         cra.load_file(fileobj, cofog_mapper, commit_every=10)
-        model.Session.commit()
-        model.Session.remove()
-
+        
     @classmethod
     def teardown_class(self):
-        model.repo.delete_all()
-        model.Session.commit()
-        model.Session.remove()
+        model.mongo.drop_db()
 
 	# check dataset exists
     def test_01_dataset(self):
-        out = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).first()
+        out = model.Dataset.find_one({'name': 'cra'})
         assert out, out
 
 	# get all our keys, and check values exist for them
     # APS: why don't we check for the key 'to' at this point?   
     def test_02_classification(self):
+        entry = model.Entry.find_one({'dataset.name': 'cra'})
         for key_name in [u'from', u'time', u'pog', u'cap_or_cur', u'region',
-                u'cofog1', u'cofog2', u'cofog3']:
-            key = model.Session.query(model.Key).filter_by(name=key_name).first()
-            assert key, key_name
-            count = (model.Session.query(model.ClassificationItem)
-                .join(model.EnumerationValue)
-                .filter_by(key=key)
-                ).count()
-            assert count, (key_name, count)
+                u'cofog']:
+            assert key_name in entry.keys(), "Key %s is missing" % key_name
 
     # check there are some entries and none of them are null
     def test_03_entry(self):
-        dataset_ = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).one()
-        count = (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            ).count()
+        count = model.Entry.find({'dataset.name': 'cra'}).count()
         assert count, 'There are no Entries'
-        assert not (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            .filter_by(amount=None)
-            ).first(), 'Some Entries have NULL amounts'
+        assert not model.Entry.find({'dataset.name': 'cra', 
+                                'amount': None}).count(), 'Some Entries have NULL amounts'
     
     # look for a 'to' field on the first entry in the dataset
     def test_04_entry_to(self):
-        dataset_ = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).one()
-        txn = (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            ).first()
-        classif = txn.classification_as_dict()
-        assert classif['to'] == u'society'
+        entry = model.Entry.find_one({'dataset.name': 'cra'})
+        assert entry['to']['name'] == u'society'
 

File wdmmg/tests/test_cra_2010.py

View file
 from wdmmgext.load import cofog
 from wdmmgext.load import cra2010
 import os
+from wdmmgext.load import util
 
 class TestCRA(object):
     @classmethod
     def setup_class(self):
-        model.repo.delete_all()
-        model.Session.remove()
+        model.mongo.drop_db()
         cofog.load_file(pkg_resources.resource_stream('wdmmg', 'tests/COFOG_english_structure_short_2010.txt'))
-        cofog_mapper = cra2010.CofogMapper(json.load(pkg_resources.resource_stream('wdmmg', 'tests/cofog_map_short_2010.json')))
+        cofog_mapper = util.CofogMapper(json.load(pkg_resources.resource_stream('wdmmg', 'tests/cofog_map_short_2010.json')))
         filename = os.path.abspath('wdmmg/tests/cra_2010_db_short.csv')
-        cra2010.load_file(filename, cofog_mapper, commit_every=10)
-        model.Session.commit()
-        model.Session.remove()
-
+        cra2010.load_file(filename, cofog_mapper)
+        
     @classmethod
     def teardown_class(self):
-        model.repo.delete_all()
-        model.Session.commit()
-        model.Session.remove()
+        model.mongo.drop_db()
 
 	# check dataset exists
     def test_01_dataset(self):
-        out = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).first()
+        out = model.Dataset.find_one({'name': 'cra2010'})
         assert out, out
 
 	# get all our keys, and check values exist for them
     # APS: why don't we check for the key 'to' at this point?   
     def test_02_classification(self):
+        entry = model.Entry.find_one({'dataset.name': 'cra2010'})
         for key_name in [u'from', u'time', u'pog', u'cap_or_cur', u'region',
-                u'cofog1', u'cofog2']:
-            key = model.Session.query(model.Key).filter_by(name=key_name).first()
-            assert key, key_name
-            count = (model.Session.query(model.ClassificationItem)
-                .join(model.EnumerationValue)
-                .filter_by(key=key)
-                ).count()
-            assert count, (key_name, count)
+                u'cofog', u'cg_lg_or_pc']:
+            assert key_name in entry.keys(), "Key %s is missing" % key_name
 
     # check there are some entries and none of them are null
     def test_03_entry(self):
-        dataset_ = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).one()
-        count = (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            ).count()
+        count = model.Entry.find({'dataset.name': 'cra2010'}).count()
         assert count, 'There are no Entries'
-        assert not (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            .filter_by(amount=None)
-            ).first(), 'Some Entries have NULL amounts'
+        assert not model.Entry.find({'dataset.name': 'cra2010', 
+                                'amount': None}).count(), 'Some Entries have NULL amounts'
     
     # look for a 'to' field on the first entry in the dataset
     def test_04_entry_to(self):
-        dataset_ = (model.Session.query(model.Dataset)
-            .filter_by(name=u'cra')
-            ).one()
-        txn = (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            ).first()
-        classif = txn.classification_as_dict()
-        assert classif['to'] == u'society'
+        entry = model.Entry.find_one({'dataset.name': 'cra2010'})
+        assert entry['to']['name'] == u'society'
 
+

File wdmmg/tests/test_loader.py

View file
     (u'green', u'pungent'): 30.0,
 }
 
-class TestValueCache(object):
-    @classmethod
-    def setup_class(self):
-        pass
-
-    @classmethod
-    def teardown_class(self):
-        # Clean up.
-        model.repo.delete_all()
-        model.Session.remove()
-
-    def test_value_cache(self):
-        print 'test_value_cache'
-        key_colour = model.Key(name=u'colour')
-        model.Session.add(key_colour)
-        model.Session.commit()
-        cache_colour = loader.ValueCache(key_colour) 
-        model.Session.commit()
-        model.Session.remove()
-        ev_id = cache_colour.get_value_id(u'red') 
-        print ev_id
-        ev = model.Session.query(model.EnumerationValue).get(ev_id)
-        print ev, ev.id, ev.key_id, ev.key
-        model.Session.commit()
-        model.Session.remove()
-        assert ev_id == cache_colour.get_value_id(u'red')
-        assert ev_id != cache_colour.get_value_id(u'green')
-        model.Session.commit()
-        model.Session.remove()
-
 class TestLoader(object):
     @classmethod
     def setup_class(self):
         print 'setting up TestLoader'
         # Load an example data set.
         my_loader = loader.Loader(u'test', 'Test Dataset')
+        for (col, smell), a in test_data.items():
+            my_loader.create_entry(a, colour=col, smell=smell)
 
     @classmethod
     def teardown_class(self):
     def test_loader(self):
         print 'test_loader'
         # Read it back and see what we've got.
-        entries = (model.Session.query(model.Entry)
-#            .filter(dataset_=u'test')
-            ).all()
-        assert len(entries) == 5, len(entries)
+        entries = model.Entry.find()
+        assert entries.count() == 5, entries.count()
         pairs = set()
         for t in entries:
-            cis = (model.Session.query(model.ClassificationItem)
-                .filter_by(entry=t)
-                ).all()
-            cis_as_dict = dict([(ci.value.key.name, ci.value.code) for ci in cis])
-            pairs.add((cis_as_dict.get('colour'), cis_as_dict.get('smell')))
-        assert pairs == set(test_data.keys())
+            pairs.add((t.get('colour'), t.get('smell')))
+        assert pairs == set(test_data.keys()), pairs
 

File wdmmg/tests/test_models.py

-from datetime import datetime
-
-import wdmmg.model as model
-
-class TestORM(object):
-    @classmethod
-    def setup_class(self):
-        self.dataset_ = u'test'
-        self.accsrc = u'acc1'
-        self.accdst = u'acc2'
-        self.amount = 47865
-        dataset_ = model.Dataset(name=self.dataset_)
-        key_account = model.Key(name=u'account')
-        acc_src = model.EnumerationValue(key=key_account, code=self.accsrc)
-        acc_dst = model.EnumerationValue(key=key_account, code=self.accdst)
-        txn = model.Entry(
-            dataset_=dataset_,
-            amount=self.amount)
-        # TODO: Create some ClassificationItems.
-        model.Session.add_all([dataset_, acc_src, acc_dst, txn])
-        model.Session.commit()
-        model.Session.remove()
-
-    @classmethod
-    def teardown_class(self):
-        model.repo.delete_all()
-
-    def test_01(self):
-        dataset_ = model.Session.query(model.Dataset).filter_by(name=self.dataset_).one()
-        txn = (model.Session.query(model.Entry)
-            .filter_by(dataset_=dataset_)
-            .filter_by(amount=self.amount)).one()
-        assert txn
-        # TODO: Test ClassificationItems can be retrieved.
-
-    # TODO: Factor this into multiple tests?
-    def test_02(self):
-        dataset_ = model.Session.query(model.Dataset).filter_by(name=self.dataset_).one()
-        acc_src = (model.Session.query(model.EnumerationValue)
-            .filter_by(code=self.accsrc)).one()
-        acc_dst = (model.Session.query(model.EnumerationValue)
-            .filter_by(code=self.accdst)).one()
-        region = model.Key(name=u'region', notes=u'Area for which money was spent')
-        pog = model.Key(name=u'pog', notes=u'Programme Object Group')
-        randomkey = model.Key(name=u'randomkey')
-
-        northwest = model.EnumerationValue(code=u'Northwest', name=u'North west', key=region)
-        northeast = model.EnumerationValue(code=u'Northeast', name=u'North east', key=region)
-        pog1 = model.EnumerationValue(code=u'surestart', name=u'Sure start', key=pog)
-        pog2 = model.EnumerationValue(code=u'surestart2', name=u'Another start', key=pog)
-
-        kv1 = model.KeyValue(ns=u'enumeration_value', ns_enumerationvalue=acc_src, key=region,
-                value=northwest.code)
-        acc_src.keyvalues[region] = northeast.code # This should overwrite the KeyValue explicitly constructed above.
-        acc_src.keyvalues[randomkey]= u'annakarenina' # This should create a new KeyValue.
-
-        kv2 = model.KeyValue(ns=u'enumeration_value', ns_enumerationvalue=acc_dst, key=randomkey,
-                value=u'orangesarenottheonlyfruit') # This one should not get overwritten.
-        acc_dst.keyvalues[region] = northwest.code # This should create a new KeyValue.
-
-        model.Session.add_all([region, pog, randomkey, kv1, kv2])
-        model.Session.commit()
-        model.Session.remove()
-        del acc_src, acc_dst, region, pog, randomkey, kv1, kv2
-        
-        # Read it all back again.
-
-        pog = model.Session.query(model.Key).filter_by(name=u'pog').one()
-        assert pog.notes.startswith(u'Programme')
-        assert len(pog.enumeration_values) == 2, pog
-
-        region = model.Session.query(model.Key).filter_by(name=u'region').one()
-        assert region
-        region_kvs = model.Session.query(model.KeyValue).filter_by(key=region).all()
-        assert len(region_kvs) == 2, region_kvs
-        
-        randomkey = model.Session.query(model.Key).filter_by(name=u'randomkey').one()
-        assert randomkey
-
-        acc_src = model.Session.query(model.EnumerationValue).filter_by(code=self.accsrc).one()
-        assert acc_src
-        acc_dst = model.Session.query(model.EnumerationValue).filter_by(code=self.accdst).one()
-        assert acc_dst
-        
-        northeast = model.Session.query(model.EnumerationValue).filter_by(key=region).filter_by(code=u'Northeast').one()
-        assert northeast.key == region, northest.key
-        assert northeast.code == u'Northeast', northeast.code
-        
-        acc_src_region_kv = (model.Session.query(model.KeyValue)
-            .filter_by(ns_enumerationvalue=acc_src)
-            .filter_by(key=region)
-            ).one()
-        assert acc_src_region_kv.value == u'Northeast', acc_src_region_kv.value
-        assert acc_src_region_kv.enumeration_value == northeast, acc_src_region_kv.enumeration_value
-        assert acc_src._keyvalues[region] == acc_src_region_kv
-        assert acc_src.keyvalues[region] == u'Northeast'
-        assert acc_src.keyvalues[randomkey] == u'annakarenina'
-        
-        assert acc_dst.keyvalues[region] == u'Northwest'
-        assert acc_dst.keyvalues[randomkey] == u'orangesarenottheonlyfruit'
-        
-        # Test cascading deletion of KeyValues when removed from objects.
-        print [str(x) for x in model.Session.query(model.KeyValue).all()]
-        before_count = model.Session.query(model.KeyValue).count()
-        acc_src.keyvalues.clear()
-        model.Session.commit()
-        model.Session.remove()
-        
-        acc_src = (model.Session.query(model.EnumerationValue)
-            .filter_by(code=self.accsrc)
-            ).one()
-        assert not acc_src.keyvalues
-        print [str(x) for x in model.Session.query(model.KeyValue).all()]
-        after_count = model.Session.query(model.KeyValue).count()
-        assert before_count > after_count, (before_count, after_count)
-        
-        # Test cascading deletion of EnumerationValues when Keys are deleted.
-        print [str(x) for x in model.Session.query(model.EnumerationValue).all()]
-        before_count = model.Session.query(model.EnumerationValue).count()
-        region = model.Session.query(model.Key).filter_by(name=u'region').one()
-        print 'region =', region
-        model.Session.delete(region)
-        model.Session.commit()
-        model.Session.remove()
-
-        print [str(x) for x in model.Session.query(model.EnumerationValue).all()]
-        after_count = model.Session.query(model.EnumerationValue).count()
-        assert before_count > after_count, (before_count, after_count)
-        
-        # Test cascading deletion of KeyValues when Keys are deleted.
-        print [str(x) for x in model.Session.query(model.KeyValue).all()]
-        before_count = model.Session.query(model.KeyValue).count()
-        randomkey = model.Session.query(model.Key).filter_by(name=u'randomkey').one()
-        print 'randomkey =', randomkey
-        model.Session.delete(randomkey)
-        model.Session.commit()
-        model.Session.remove()
-
-        print [str(x) for x in model.Session.query(model.KeyValue).all()]
-        after_count = model.Session.query(model.KeyValue).count()
-        assert before_count > after_count, (before_count, after_count)
-