# wdmmg/lib/aggregator.py

from datetime import datetime
from StringIO import StringIO

import wdmmg.model as model

# This is the deflator used in PESA. It is normalised to 2006-07 using a
# measure of inflation suitable for measuring GDP. A value of 'x' in year 'y'
# means that 1 pound from year 2006 was worth 'x' pounds from year 'y'.
# The figures were reverse-engineered from the PESA 2008-09 report, by dividing
# table 1.2 by table 1.1.
# Raw (year, factor) pairs as published; the deflator exposed below is the
# reciprocal of each factor.
_PESA_DEFLATOR_FACTORS = [
    ('2002', 1.1098),
    ('2003', 1.0786),
    ('2004', 1.0496),
    ('2005', 1.0274),
    ('2006', 1.0000),
    ('2007', 0.9690),
    ('2008', 0.9399),
    ('2009', 0.9150),
    ('2010', 0.8911),
]
gdp_deflator2006 = dict(
    (year, 1.0 / factor) for year, factor in _PESA_DEFLATOR_FACTORS)

# Time-series data is hard-wired for now.
time_series = {
    'gdp_deflator2006': gdp_deflator2006,
}

class Results:
    '''
    Represents the result of a call to `aggregate()`. This class has the
    following fields:
    
    dates - a list of the distinct entry dates. These are unicode strings,
        and they are returned in sorted order.
    
    axes - a list of key names. Does not include 'time', which is treated
        specially.
    
    matrix - a sparse matrix, stored as a dict mapping "coordinates" to
        "time series". The coordinates are a tuple giving the values of
        the Keys in the same order as they appear in `axes`. If no KeyValue
        exists for a given Key, the value `None` is supplied. The "time
        series" is a list of spending totals, one for each date in `dates`.
        If there was no spending on a given date, then `0.0` is supplied;
        after one of the `divide_by_...` methods has run, entries with no
        available divisor are `None`.
    '''
    
    def __init__(self, dates, axes, matrix=None):
        '''
        Do not construct a Results directly - call `aggregate()` instead.
        
        dates - a sorted list of unicode strings, representing all the distinct
            entry dates needed.
        
        axes - a list of unicode strings, representing Key names.
        
        matrix - optionally, a pre-computed matrix (see the class docstring);
            defaults to an empty one.
        '''
        self.dates = dates
        # Position of each date within `dates`, for O(1) lookup when
        # accumulating spending.
        self.date_index = dict((date, index)
            for index, date in enumerate(self.dates))
        self.axes = axes
        # Position of each axis name within `axes`.
        self.axis_index = dict((axis, i) for i, axis in enumerate(axes))
        self.matrix = matrix if matrix is not None else {}
    
    def _add_spending(self, coordinates, amount, timestamp):
        '''
        For use by `aggregate()`. Adds `amount` to the total for cell
        `coordinates` on date `timestamp`, creating an all-zero time series
        for previously unseen coordinates.
        '''
        assert amount is not None, (coordinates, amount, timestamp)
        if coordinates not in self.matrix:
            self.matrix[coordinates] = [0.0] * len(self.dates)
        self.matrix[coordinates][self.date_index[timestamp]] += amount
    
    def _divide_row(self, amounts, divisors):
        '''
        For use by the `divide_by_...` methods. Divides `amounts` in place,
        entry by entry, by `divisors` (one divisor per date). An entry
        becomes `None` (meaning "no data") when its divisor is missing or
        zero, or when the amount is already `None`.
        '''
        for i in range(len(self.dates)):
            if divisors[i] and amounts[i] is not None:
                amounts[i] /= divisors[i]
            else:
                amounts[i] = None
    
    def divide_by_statistic(self, axis, statistic):
        '''
        Divides spending by a property of a coordinate. This is useful for
        computing statistics such as per-capita spending.
        
        axis - a Key, representing the coordinate to use, e.g. region.
        
        statistic - a Key, representing the statistic to use, e.g. population.
        
        The Key `axis` selects a property of spending (e.g. geographical
        region). The value of that property (e.g. 'NORTHERN IRELAND') is
        retrieved for each Entry. Then, the Key `statistic` selects a
        property of those values (e.g. population). Finally, the aggregated
        spending is divided by the aggregated statistic. There are two cases,
        depending on whether `axis` is in `self.axes`:
        
         - If it is, then each spending item is divided by e.g. the population
        of its own region.
        
         - If it is not, then each spending item is divided by e.g. the total
        population of all regions.
        '''
        # Build a lookup from each distinct axis value to its statistic,
        # dropping values for which no statistic is recorded.
        axis_values = model.Entry.c.distinct(axis)
        index = [(av, model.Entry.find_one({axis: av}, {statistic: 1}).get(statistic))
                for av in axis_values]
        index = dict((k, v) for k, v in index if v is not None)
        if axis in self.axes:
            # Per-coordinate divisor: each row is divided by the statistic of
            # its own axis value (e.g. its own region's population).
            n = self.axis_index[axis] # Which coordinate?
            for coordinates, amounts in self.matrix.items():
                divisor = index.get(coordinates[n])
                self._divide_row(amounts, [divisor] * len(self.dates))
        else:
            # Single global divisor: the statistic summed over all axis
            # values (e.g. total population).
            # FIXME: Does not work for hierarchical keys.
            divisor = sum(index.values())
            divisors = [divisor] * len(self.dates)
            for coordinates, amounts in self.matrix.items():
                self._divide_row(amounts, divisors)
    
    def divide_by_time_statistic(self, statistic_name):
        '''
        Divides spending by a time-dependent statistic. This is useful for
        computing statistics such as spending in real terms.
        
        statistic_name - the name of a time-dependent statistic. The supported
            statistics are: 'gdp_deflator2006'.
        
        Dates for which the statistic has no value yield `None` entries.
        '''
        statistic = time_series.get(statistic_name, {})
        divisors = [statistic.get(date) for date in self.dates]
        # Note: previously a 0.0 amount was wrongly turned into None here
        # (truthiness test); _divide_row keeps it as 0.0, consistent with
        # divide_by_statistic.
        for coordinates, amounts in self.matrix.items():
            self._divide_row(amounts, divisors)

    def __str__(self):
        return repr(self)
    
    def __repr__(self):
        return (
            'Results(\n\t%r,\n\t%r,\n\tmatrix=%r)' %
            (self.dates, self.axes, self.matrix)
        )

def aggregate(dataset_, include=None, axes=None):
    '''
    Returns the dataset `dataset_`, converted to a pivot table. Entries are
    filtered and then classified according to their properties, and summarised
    by summing over all variables apart from those of interest.
    
    Arguments:
    
    dataset_ - a Dataset object representing the dataset of interest.
    
    include - rules for including postings: (key, value) pairs (dict or
        list). An entry will be excluded unless the value matches the
        entry's value for the key. Defaults to no filtering.
    
    axes - a list of Key objects representing the desired axes of the pivot
        table. Do not include 'time', which is treated specially. Defaults
        to no axes.
    
    Returns a Results object.
    '''
    # Normalise the former mutable default arguments ({} and []) to fresh
    # containers; behaviour for existing callers is unchanged.
    if include is None:
        include = {}
    if axes is None:
        axes = []
    rows = list(_make_aggregate_iterator(dataset_, include, axes))
    # The Results time series covers every distinct date seen, in order.
    dates = sorted(set(row['time'] for row in rows))
    ans = Results(dates, axes)
    for row in rows:
        ans._add_spending(
            tuple(row[axis] for axis in axes),
            row['amount'],
            row['time'],
        )
    return ans

def _make_aggregate_iterator(dataset, include, axes):
    '''
    Queries MongoDB to get all desired fields.
    Parameters: same as `aggregate()`.
    
    Returns a generator yielding one flat dict per matching Entry,
    containing the requested axes plus 'time' and 'amount'.
    '''
    # Filter query: the caller's inclusion rules, restricted to this dataset.
    filter_query = dict(include)
    filter_query['dataset.name'] = dataset.name
    
    # Field selector: fetch only the axes we aggregate over, plus the two
    # specially-treated fields. (Debug prints of the queries were removed.)
    # TODO: Do I want to handle axes at this level?
    fields = dict((field, 1) for field in set(axes + ['time', 'amount']))
    return (e.to_flat_dict() for e in model.Entry.find(filter_query, fields))
    