Source

wdmmgext / wdmmgext / load / cra.py

import os, sys, csv, re
from datetime import date
try: import json
except ImportError: import simplejson as json

import datapkg
from pylons import config
import wdmmg.model as model
from wdmmg.lib import loader
import util

class CofogMapper(object):
    '''
    In the published data, the "function" and "subfunction" columns are used
    inconsistently. This is partly because some departments continue to use a
    previous coding system, and partly because only two columns have been
    allowed for the three levels of the COFOG hierarchy.
    
    This class uses a mapping provided by William Waites to work out the
    correct COFOG code, given the published data.
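
    A minimal usage sketch (the mapping triple below is invented for
    illustration; real mappings come from the package's cofog_map.json):

    >>> mapper = CofogMapper([[u'01.1.1', u'PUBLIC SERVICES', u'EXECUTIVE']])
    >>> mapper.fix(u'PUBLIC SERVICES', u'EXECUTIVE')
    [u'01', u'01.1', u'01.1.1']
    >>> mapper.fix(u'UNKNOWN', u'UNKNOWN')
    []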
    '''
    def __init__(self, mappings):
        '''
        Constructs a COFOG mapper from a mappings object (which is
        usually loaded from a JSON file).
        
        mappings - a list of triples. In each
            triple, the first element is the good code, and the second and
            third elements give the published values. If the first element
            (the good code) has a non-numerical suffix, the suffix is removed.
        '''
        self.mappings = {}
        for good, bad1, bad2 in mappings:
            good = re.match(r'([0-9]+(\.[0-9])*)', good).group(1)
            self.mappings[bad1, bad2] = good
    
    def fix(self, function, subfunction):
        '''
        Looks up the fixed COFOG code given the published values.
        
        Returns a list giving all available COFOG levels, e.g.
        `[u'01', u'01.1', u'01.1.1']`

        Returns an empty list if no COFOG mapping has been defined for the
        given values.
        '''
        ans = self.mappings.get((function, subfunction))
        if ans is None:
            return []
        parts = ans.split('.')
        return ['.'.join(parts[:i+1]) for i, _ in enumerate(parts)]


def load_file(fileobj, cofog_mapper, commit_every=None):
    '''
    Loads a file from `fileobj` into a dataset with name 'cra'.
    The file should be CSV formatted, with the same structure as the 
    Country Regional Analysis data.
    
    fileobj - an open, readable file-like object.
    
    commit_every - if not None, call model.Session.commit() at the 
        specified frequency, expressed as a number of Entry records.
    '''
    # Semaphore to ensure COFOG is loaded first.
    assert (model.Session.query(model.Key)
        .filter_by(name=u'cofog1')
        ).first(), 'COFOG must be loaded first'
    # Retrieve or create the required Keys.
    key_from = util.get_or_create_key(name=u'from', notes=u'''\
The entity that the money was paid from.''')
    key_to = util.get_or_create_key(name=u'to', notes=u'''\
The entity that the money was paid to.''')
    key_time = util.get_or_create_key(name=u'time', notes=u'''\
The accounting period in which the spending happened.''')
    key_pog = util.get_or_create_key(name=u'pog', notes=u'''\
Programme Object Group.''')
    key_cap_or_cur = util.get_or_create_key(name=u'cap_or_cur', notes=u'''\
Capital (one-off investment) or Current (ongoing running costs).''')
    key_region = util.get_or_create_key(name=u'region', notes=u'''\
Geographical (NUTS) area for which money was spent.''')
    # We also use 'cofog1', 'cofog2' and 'cofog3' from the COFOG package.
    key_cofog1 = model.Session.query(model.Key).filter_by(name=u'cofog1').one()
    key_cofog2 = model.Session.query(model.Key).filter_by(name=u'cofog2').one()
    key_cofog3 = model.Session.query(model.Key).filter_by(name=u'cofog3').one()
    # Make corresponding ValueCaches.
    cache_from = loader.ValueCache(key_from)
    cache_to = loader.ValueCache(key_to)
    cache_time = loader.ValueCache(key_time)
    cache_pog = loader.ValueCache(key_pog)
    cache_cap_or_cur = loader.ValueCache(key_cap_or_cur)
    cache_region = loader.ValueCache(key_region)
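    # create=False: presumably these caches only look up COFOG values loaded
    # by the COFOG package (hence the assertion at the top of this function),
    # rather than creating new ones.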
    cache_cofog1 = loader.ValueCache(key_cofog1, create=False)
    cache_cofog2 = loader.ValueCache(key_cofog2, create=False)
    cache_cofog3 = loader.ValueCache(key_cofog3, create=False)
    # Make a suitably configured Loader (this also creates the Dataset).
    cra = loader.Loader(
        u'cra',
        [key_from, key_to, key_time, key_pog, key_cap_or_cur, key_region,
            key_cofog1, key_cofog2, key_cofog3],
        notes=u'''The Country Regional Analysis published by HM Treasury (2009 version).

Source data can be found in the CKAN data package at:
<http://ckan.net/package/ukgov-finances-cra/>''',
        commit_every=commit_every
    )
    society = util.get_or_create_value(
        key=key_to, code=u'society',
        name=u'Society (the General Public)',
        notes=u'''A dummy entity to be the recipient of final government
spending'''
        )
    model.Session.commit()
    society_id = society.id
    # Utility function for parsing numbers.
    def to_float(s):
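        # e.g. to_float(u'1,234.5') -> 1234.5 (commas stripped); an empty
        # string counts as 0.0.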
        if not s: return 0.0
        return float(s.replace(',', ''))
    # Utility function for formatting tax years.
    def to_year(s):
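        # e.g. u'2008-09' (the assumed tax-year header format) -> u'2008'.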
        y = int(s[:4])
        return unicode(y)
    def to_region(region_name):
        return region_name.replace('_', ' ')
    # For each line of the file...
    reader = csv.reader(fileobj)
    header = reader.next()
    year_col_start = 10
    years = [to_year(x) for x in header[year_col_start:]]
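    # Assumed column layout, based on the indices used below: 0 dept_code,
    # 1 dept_name, 2 function, 3 subfunction, 4 pog_code, 5 pog_name,
    # 7 cap_or_cur, 9 region, then one expenditure column per year
    # (figures apparently in millions, hence the 1e6 scaling).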
    for row in reader:
        if not row[0]:
            # Skip blank row.
            continue
        # Parse row.
        row = [unicode(x.strip()) for x in row]
        dept_code = row[0]
        dept_name = row[1]
        function = row[2]
        subfunction = row[3]
        pog_code = row[4]
        pog_name = row[5]
        cap_or_cur = row[7]
        # TODO: enabling to_region here seems to affect a lot of rows, so it
        # requires more work.
        # region = to_region(row[9])
        region = row[9]
        expenditures = [round(1e6*to_float(x)) for x in row[year_col_start:]]
        # Skip rows whose expenditures are all zero (possibly redundant).
        if not any(expenditures):
            continue
        # Map 'function' and 'subfunction' to three levels of COFOG.
        cofog_parts = cofog_mapper.fix(function, subfunction)
        assert cofog_parts, 'COFOG code is missing for (%s, %s)' % (function, subfunction)
        assert len(cofog_parts) <= 3, 'COFOG code %r has too many levels' % cofog_parts
        cofog_parts = (cofog_parts + [None]*3)[:3]
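        # e.g. [u'01', u'01.1'] is padded to [u'01', u'01.1', None].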
        # Make an Entry for each non-zero expenditure.
        for year, exp in zip(years, expenditures):
            if exp:
                cra.create_entry(exp, [
                    cache_from.get_value_id(dept_code, dept_name),
                    society_id,
                    cache_time.get_value_id(year),
                    cache_pog.get_value_id(pog_code, pog_name),
                    cache_cap_or_cur.get_value_id(cap_or_cur),
                    cache_region.get_value_id(region),
                    cache_cofog1.get_value_id(cofog_parts[0]),
                    cache_cofog2.get_value_id(cofog_parts[1]),
                    cache_cofog3.get_value_id(cofog_parts[2]),
                ])
    # Finish off.
    cra.compute_aggregates()

def load_population(fileobj):
    '''
    Adds population data to the EnumerationValues that represent regions.
    The annotations are added in the form of KeyValues, using a Key named
    `population2006`.
    '''
    key_region = model.Session.query(model.Key).filter_by(name=u'region').first()
    assert key_region, "CRA must be loaded first"
    key_population = model.Key(
        name=u'population2006',
        notes=u'''\
This data comes from the "All ages" column of the document "Table 8 Mid-2006 Population Estimates: Selected age groups for local authorities in the United Kingdom; estimated resident population" which is available here:

    http://www.statistics.gov.uk/statbase/Expodata/Spreadsheets/D9664.xls
'''
    )
    reader = csv.reader(fileobj)
    header = reader.next()
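    # Each remaining row is assumed to be a (region code, population) pair,
    # matching the unpacking below.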
    for row in reader:
        if row:
            code, population = row
            ev = (model.Session.query(model.EnumerationValue)
                .filter_by(key=key_region)
                .filter_by(code=unicode(code))
                ).first()
            if ev:
                assert str(int(population)) == population, population
                ev.keyvalues[key_population] = unicode(population)
            else:
                print 'Population data for region %r ignored.' % code

def drop():
    '''
    Drops from the database all records associated with dataset 'cra'.
    '''
    # Delete only the keys we created ourselves.
    # TODO: Move as many as possible of these into separate data packages.
    for name in [u'dept', u'pog', u'cap_or_cur', u'region', u'population2006']:
        key = (model.Session.query(model.Key)
            .filter_by(name=name)
            ).first()
        if key:
            key.keyvalues.clear()
            model.Session.delete(key)
    # Delete ATP structure.
    # TODO: ORM-ise this code.
    dataset_ = (model.Session.query(model.Dataset)
        .filter_by(name=u'cra')
        ).one()
    assert dataset_
    (model.Session.query(model.KeyValue)
        .join(model.Entry)
        .filter_by(dataset_=dataset_)
        ).delete()
    (model.Session.query(model.Entry)
        .filter_by(dataset_=dataset_)
        ).delete()
    model.Session.delete(dataset_)

def load():
    '''
    Downloads the CRA, and loads it into the database with dataset name 'cra'.

    Usually accessed via the Paste Script command "paster load cra"; see
    ../lib/cli.py.
    '''
    # Get the CRA data package.
    pkgspec = 'file://%s' % os.path.join(config['getdata_cache'], 'ukgov-finances-cra')
    pkg = datapkg.load_package(pkgspec)
    # Make a CofogMapper.
    cofog_mapper = CofogMapper(json.load(pkg.stream('cofog_map.json')))
    # Load the data.
    # TODO: Move NUTS codes into a separate data package.
    # TODO: Move POG codes into a separate data package.
    load_file(pkg.stream('cra_2009_db.csv'), cofog_mapper, commit_every=1000)
    # Add population data.
    # TODO: Move population data into a separate data package. After NUTS.
    load_population(pkg.stream('nuts1_population_2006.csv'))
    model.Session.commit()
    model.Session.remove()