Commits

Anonymous committed 2151824

[controllers,lib] fix API and aggregator

  • Participants
  • Parent commits 4126512
  • Branches mongo

Comments (0)

Files changed (8)

File solr/wdmmg_schema.xml

    <field name="id" type="string" indexed="true" stored="true" required="true" /> 
    <field name="amount" type="tfloat" indexed="true" stored="true" />
    <field name="dataset" type="string" indexed="true" stored="true" />
+   <field name="name" type="string" indexed="true" stored="true" />
    <field name="time" type="string" indexed="true" stored="true" sortMissingLast="true" omitNorms="true" />
    <field name="time_norm" type="date" indexed="true" stored="true" sortMissingLast="true" />
    <field name="location" type="text_ws" indexed="true" stored="true" />
-   <field name="from" type="text_ws" indexed="true" stored="true" />
-   <field name="to" type="text_ws" indexed="true" stored="true" />
-   <field name="notes" type="text" indexed="true" />
+   <field name="from" type="string" indexed="true" stored="true" />
+   <field name="from.title" type="textgen" indexed="true" stored="true" />
+   <field name="to" type="string" indexed="true" stored="true" />
+   <field name="to.title" type="textgen" indexed="true" stored="true" />
+   <field name="description" type="textgen" indexed="true" />
    <dynamicField name="*" type="textgen" indexed="true" stored="true"/>
    <dynamicField name="*_str" type="string" indexed="true" stored="true"/>
    <dynamicField name="*_dt" type="date" indexed="true" stored="true"/>
 
  <!-- standard text field search -->
  <copyField source="dataset"  dest="text" />
+ <copyField source="dataset.title"  dest="text" />
+ <copyField source="name"  dest="text" />
  <copyField source="amount"  dest="text" />
  <copyField source="time"  dest="text" />
  <copyField source="location"  dest="text" />
  <copyField source="location"  dest="location_facet" />
  <copyField source="from"  dest="text" />
- <copyField source="from"  dest="from_facet" />
+ <copyField source="from.title"  dest="text" />
+ <copyField source="from.title"  dest="from_facet" />
  <copyField source="to"  dest="text" />
- <copyField source="to"  dest="to_facet" />
- <copyField source="notes"  dest="text" />
+ <copyField source="to.title"  dest="text" />
+ <copyField source="to.title"  dest="to_facet" />
+ <copyField source="description"  dest="text" />
  <copyField source="*"  dest="text" />
  <copyField source="*_str"  dest="text" />
  <copyField source="*_dt"  dest="text" />

File wdmmg/config/routing.py

 
     map.connect('/api/rest', controller='rest', action='index')
     map.connect('/api/rest/dataset/{name_or_id}', controller='rest', action='dataset')
-    map.connect('/api/rest/entry/{id_}', controller='rest', action='entry')
-    map.connect('/api/rest/key/{name_or_id}', controller='rest', action='key')
-    map.connect('/api/rest/key/{name_or_id}/value/{code}', controller='rest', action='enumeration_value')
-    map.connect('/api/rest/enumeration_value/{id_}', controller='rest', action='enumeration_value_id')
+    map.connect('/api/rest/entry/{name_or_id}', controller='rest', action='entry')
 
     map.redirect('/*(url)/', '/{url}', 
         _redirect_code='301 Moved Permanently') 

File wdmmg/controllers/api.py

         # Construct query strings by hand to keep the parameters in an instructive order.
         c.aggregate_url = url(controller='api', action='aggregate') + \
             '?dataset=%s' % app_globals.default_dataset + \
-            '&include-cofog1=07&breakdown-from=yes&breakdown-region=yes'
+            '&include-cofog.1.name=07&breakdown-from=yes&breakdown-region=yes'
         c.mytax_url = url(controller='api', action='mytax') + \
             '?income=20000&spending=10000&smoker=yes&driver=yes'
         c.jsonp_url = '&'.join(c.mytax_url.split('&')[:-2] +
         if not 'sort' in solrargs:
             solrargs['sort'] = 'score desc,amount desc'
         query = app_globals.solr.raw_query(**solrargs)
+        response.content_type = 'application/json'
         return query
 
     @jsonpify
             name_or_id = aggregator_params.get('dataset')
         else:
             raise Exception, "Dataset not defined"
-        dataset_ = self.get_by_name_or_id(model.Dataset,
-            name_or_id=unicode(name_or_id))
+        dataset = model.Dataset.by_name_or_id(name_or_id)
+        assert dataset, "Dataset %s not found" % name_or_id
         # Retrieve request parameters of the form "verb-key=value"
         include, axes, per, per_time = [], [], [], []
         # Resort the dictionary (since we sorted the keys)
             if param.startswith('exclude-'):
                 pass
             elif param.startswith('include-'):
-                key = (model.Session.query(model.Key)
-                    .filter_by(name=unicode(param[8:]))
-                    ).one() # FIXME: Nicer error message needed.
-                include.append((key, value))
+                include.append((param[8:], value))
             elif param.startswith('breakdown-'):
-                key = (model.Session.query(model.Key)
-                    .filter_by(name=unicode(param[10:]))
-                    ).one() # FIXME: Nicer error message needed.
-                # FIXME: sort keys by parent values, not alphabetically!
-                axes.append(key) # Value ignored (e.g. "yes").
+                axes.append(param[10:]) # Value ignored (e.g. "yes").
                 # keys for breakdown get added to 'axes'
             elif param.startswith('per-'):
-                if value and value!='time':
-                    statistic = (model.Session.query(model.Key)
-                        .filter_by(name=unicode(param[4:]))
-                        ).one() # FIXME: Nicer error message needed.
-                    axis = (model.Session.query(model.Key)
-                        .filter_by(name=unicode(value))
-                        ).one() # FIXME: Nicer error message needed.
-                    per.append((axis, statistic))
+                if value and value != 'time':
+                    per.append((value, param[4:]))
                 else:
                     name = param[4:]
                     assert name in aggregator.time_series, value # FIXME: Nicer error message needed.
                 abort(status_code=400,
                   detail='Unknown request parameter: %s'%param)
         results = aggregator.aggregate(
-            dataset_,
+            dataset,
             include,
             axes,
         )
             results.divide_by_time_statistic(statistic_name)
         ans = {
             'metadata': {
-                'dataset': dataset_.name,
-                'include': [(k.name, v) for (k, v) in include],
+                'dataset': dataset.name,
+                'include': include,
                 'dates': [unicode(d) for d in results.dates],
-                'axes': results.axes,
-                'per': [(a.name, s.name) for a, s in per],
+                'per': per,
                 'per_time': per_time
             },
             'results': results.matrix.items(),

File wdmmg/controllers/rest.py

 class RestController(BaseController):
 
     def index(self):
-        dataset_ = model.Session.query(model.Dataset).first()
-        enumeration_value = model.Session.query(model.EnumerationValue).first()
-        key = enumeration_value.key
+        dataset_ = model.Dataset.find_one()
         c.urls = [
             url(controller='rest', action='dataset', name_or_id=dataset_.name),
             url(controller='rest', action='dataset', name_or_id=dataset_.id),
-            url(controller='rest', action='entry',
-                id_=model.Session.query(model.Entry).first().id),
-            url(controller='rest', action='key', name_or_id=key.name),
-            url(controller='rest', action='key', name_or_id=key.id),
-            url(controller='rest', action='enumeration_value',
-                name_or_id=enumeration_value.key.name, code=enumeration_value.code),
+            #url(controller='rest', action='entry', name_or_id=model.Entry.find_one().name),
+            url(controller='rest', action='entry', name_or_id=model.Entry.find_one().id),
         ]
         return render('home/rest.html')
     
     @jsonpify
     def dataset(self, name_or_id=None):
-        return self._domain_object(self.get_by_name_or_id(model.Dataset, name_or_id))
+        return self._domain_object(model.Dataset.by_name_or_id(name_or_id))
         
     @jsonpify
-    def entry(self, id_=None):
-        return self._domain_object(self.get_by_id(model.Entry, id_))
+    def entry(self, name_or_id=None):
+        return self._domain_object(model.Entry.by_name_or_id(name_or_id))
         
-    @jsonpify
-    def key(self, name_or_id=None):
-        return self._domain_object(self.get_by_name_or_id(model.Key, name_or_id))
-        
-    @jsonpify
-    def enumeration_value_id(self, id_=None):
-        '''Deprecated'''
-        return self._domain_object(self.get_by_id(model.EnumerationValue, id_))
-        
-    @jsonpify
-    def enumeration_value(self, name_or_id=None, code=None):
-        '''
-        name_or_id - a `Key.name or a `Key.id`.
-        code - an `EnumerationValue.code`.
-        '''
-        key = self.get_by_name_or_id(model.Key, name_or_id)
-        ev = (model.Session.query(model.EnumerationValue)
-            .filter_by(key=key)
-            .filter_by(code=code)
-            ).first()
-        return self._domain_object(ev)
-
     def _domain_object(self, domain_object):
         self._check_access(domain_object, READ)
-        return domain_object.to_flat_dict()
+        return domain_object.to_safe_dict()
 
     def _check_access(self, domain_object, action):
         '''

File wdmmg/lib/aggregator.py

 from datetime import datetime
 from StringIO import StringIO
 
-from sqlalchemy.orm import eagerload
-from sqlalchemy.sql.expression import and_
-
 import wdmmg.model as model
 
 # This is the deflator used in PESA. It is normalised to 2006-07 using a
         def to_float(x):
             try: return float(x)
             except ValueError: return None
-        index = dict([
-            (ev.code, to_float(ev.keyvalues[statistic]))
-            for ev in model.Session.query(model.EnumerationValue).filter_by(key=axis)
-            if statistic in ev.keyvalues
-        ])
-        if axis.name in self.axes:
-            n = self.axis_index[axis.name] # Which coordinate?
+        axis_values = model.Entry.c.distinct(axis)
+        index = [(av, model.Entry.find_one({axis: av}, {statistic: 1}).get(statistic)) \
+                for av in axis_values]
+        index = dict([(k, v) for k, v in index if v is not None])
+        if axis in self.axes:
+            n = self.axis_index[axis] # Which coordinate?
             for coordinates, amounts in self.matrix.items():
                 divisor = index.get(coordinates[n])
                 for i in range(len(self.dates)):
             (self.dates, self.axes, self.matrix)
         )
 
-def aggregate(
-    dataset_,
-    include={}, # list((Key, unicode))
-    axes=[], # list(Key)
-):
+def aggregate(dataset_, include={}, axes=[]):
     '''
     Returns the dataset `dataset_`, converted to a pivot table. Entries are
     filtered and then classified according to their properties, and summarised
     
     Returns a Results object.
     '''
-    query, params = _make_aggregate_query(
-        dataset_,
-        include,
-        axes,
-    )
-    results = list(model.Session.execute(query, params))
+    iterator = _make_aggregate_iterator(dataset_, include, axes)
+    results = list(iterator)
     dates = sorted(set([row['time'] for row in results]))
-    ans = Results(dates, [key.name for key in axes])
+    ans = Results(dates, axes)
     for row in results:
         ans._add_spending(
-            tuple([row[i] for i in range(len(axes))]),
+            tuple([row[axis] for axis in axes]),
             row['amount'],
             row['time'],
         )
     return ans
 
-def _make_aggregate_query(
-    dataset_,
-    include,
-    axes,
-):
+def _make_aggregate_iterator(dataset, include, axes,):
     '''
-    Uses string manipulation to construct the SQL query needed by
-    `aggregate()`.
+    Queries MongoDB to get all desired fields. 
+    Parameters: same as `aggregate()`.
+    '''
+    fq = dict(include)
+    fq['dataset.name'] = dataset.name
     
-    Parameters: same as `aggregate()`.
-    Returns: (string, dict) pair representing the query and its params.
-    '''
-    # N.B. Don't attempt to alchemise the raw SQL. Two reasons:
-    #  - The query is not primitive-recursive. The subselect beginning
-    #    "SELECT ev.code FROM" mentions "t.id" but does not bind it.
-    #  - It is likely to be far too slow.
-    
-    # Retrieve the 'time' Key.
-    key_time = model.Session.query(model.Key).filter_by(name=u'time').one()
-    assert key_time not in axes, '''No need to break down by Key 'time'.'''
-
-    # Compute some useful strings for each breakdown key.
-    bds = [{
-        'id': key.id, # The database `id` of the Key.
-        'param': 'ak_%d' % i, # The SQL bind parameter whose value is `id`.
-        'name': 'axis_%d' % i, # The SQL alias used for this coordinate.
-    } for i, key in enumerate(axes)]
-
-    # Compile an SQL query and its bind parameters at the same time.
-    query, params = StringIO(), {}
-
-    # Utility function that computes some useful strings for filter subselects.
-    subselect_count = [0] # Use a singleton list for a mutable up-value.
-    def subselect_params(key, value):
-        # Update counter, and choose unambiguous SQL bind parameter names.
-        n = subselect_count[0]
-        subselect_count[0] += 1
-        kv = {
-            'k_param': 'k_%d' % n, # The SQL bind parameter whose value is `key.id`.
-            'v_param': 'v_%d' % n, # The SQL bind parameter whose value is `value`.
-        }
-        params[kv['k_param']] = key.id
-        params[kv['v_param']] = value
-        return kv
-    # SELECT
-    query.write('''\
-SELECT''')
-    for bd in bds:
-        query.write('''
-    (SELECT ev.code FROM classification_item ci, enumeration_value ev
-        WHERE ev.key_id = :%(param)s
-        AND ci.entry_id = t.id AND ci.value_id = ev.id) AS %(name)s,''' % bd)
-        params[bd['param']] = bd['id']
-    query.write('''
-    SUM(t.amount) as amount,
-    (SELECT ev.code FROM classification_item ci, enumeration_value ev
-        WHERE ev.key_id = :key_time_id
-        AND ci.entry_id = t.id AND ci.value_id = ev.id) AS time''')
-    params['key_time_id'] = key_time.id
-    # FROM
-    query.write('''
-FROM "entry" t''')
-    # WHERE
-    query.write('''
-WHERE t.dataset_id = :dataset_id''')
-    params['dataset_id'] = dataset_.id
-    for key, value in include:
-        query.write('''
-AND t.id IN (SELECT ci.entry_id FROM classification_item ci, enumeration_value ev
-    WHERE ev.key_id = :%(k_param)s
-    AND ev.code = :%(v_param)s AND ci.value_id = ev.id)''' %
-            subselect_params(key, value))
-    # GROUP BY
-    query.write('''
-GROUP BY time''')
-    for bd in bds:
-        query.write(', %(name)s' % bd)
-
-    return (query.getvalue(), params)
-
+    # TODO: Do I want to handle axes at this level? 
+    aq = set(axes + ['time', 'amount'])
+    aq = dict([(a, 1) for a in aq])
+    print "FQ", fq
+    print "AQ", aq
+    return (e.to_flat_dict() for e in model.Entry.find(fq, aq))
+    

File wdmmg/lib/base.py

 
 import wdmmg
 from wdmmg import model
-from wdmmg.model import meta
 
 def render(template_name, extra_vars=None, cache_key=None, 
            cache_type=None, cache_expire=None, method='xhtml'):
         try:
             return WSGIController.__call__(self, environ, start_response)
         finally:
-            meta.Session.remove()
+            pass
     
     def __before__(self, action, **params):
         c.q = ''
         c.items_per_page = int(request.params.get('items_per_page', 20))
 
-    def get_by_id(self, domain_class, id_):
-        ans = model.Session.query(domain_class).get(id_)
-        if not ans:
-            abort(404, 'No record with id %r'%id_)
-        return ans
-    
-    def get_by_name_or_id(self, domain_class, name_or_id):
-        ans = (model.Session.query(domain_class)
-            .filter_by(name=name_or_id)
-            ).first()
-        if not ans:
-            ans = model.Session.query(domain_class).get(name_or_id)
-        if not ans:
-            abort(404, 'No record with name or id %r' % name_or_id)
-        return ans
-

File wdmmg/lib/cli.py

         check_page('/key', 'keys')
         check_page('/key/time', 'time')
         check_page('/api/rest/dataset/cra', 'cra')
-        check_page('/api/rest/key/cofog1', 'Health')
-        check_page('/api/rest/key/cofog1/value/05', 'The breakdown of environmental protection')
-        check_page('/api/rest/key/cofog1/value/05', 'The breakdown of environmental protection')
         check_page('/search?q=cra', 'results')
         check_page('/api/search?q=cra', 'numFound')
                             

File wdmmg/model/mongo.py

             fq['context'] = self.context
         keys = KeyMeta.find(fq)
         return dict([(e.key, e) for e in keys])
-        
+    
+    def to_safe_dict(self):
+        """ Replace any non-standard types. """
+        def _safen(orig):
+            safe = {}
+            for k, v in orig.items():
+                if isinstance(v, DBRef):
+                    v = v.as_doc().to_dict()
+                if isinstance(v, ObjectId):
+                    v = str(v)
+                if isinstance(v, dict):
+                    v = _safen(v)
+                safe[k] = v
+            return safe
+        return _safen(self)
+    
     def to_flat_dict(self, sep='.'):
         """ Flatten down a dictionary with some smartness. """
         def _flatten(orig):
             flat = {}
             for k, v in orig.items():
-                if isinstance(v, DBRef):
-                    v = v.as_doc().to_dict()
                 k = k.replace('$', '')
                 if k == '_id': 
                     k = 'id'
                 else:
                     flat[k] = v
             return flat
-        return _flatten(self)
+        return _flatten(self.to_safe_dict())
     
 
 class KeyMeta(Base):