1. Rufus Pollock
  2. wdmmg

Source

wdmmg / wdmmg / tests / test_aggregator.py

from datetime import date
import os, sys, csv
import pkg_resources

import wdmmg.model as model
import wdmmg.lib.aggregator as aggregator
from wdmmg.tests import Fixtures

class TestAggregator(object):
    """Tests for ``aggregator.aggregate`` run against ``Fixtures.dataset_``.

    ``Fixtures.setup()`` is called once before the tests of this class and
    ``Fixtures.teardown()`` once afterwards (class-level hooks).
    """

    @classmethod
    def setup_class(cls):
        """Class-level setup hook: delegate to ``Fixtures.setup()``."""
        Fixtures.setup()

    @classmethod
    def teardown_class(cls):
        """Class-level teardown hook: delegate to ``Fixtures.teardown()``."""
        Fixtures.teardown()

    @staticmethod
    def _assert_amounts(index, expected, tolerance):
        """Assert each expected cell is present in ``index`` and close enough.

        index: dict mapping a coordinate tuple to a number.
        expected: iterable of ``(amount, coords)`` pairs.
        tolerance: strict upper bound on ``abs(index[coords] - amount)``.
        """
        for amount, coords in expected:
            assert coords in index, coords
            # Tolerate rounding errors.
            assert abs(index[coords] - amount) < tolerance, (coords, amount)

    @staticmethod
    def _dump(index):
        """Print every ``key value`` pair of ``index`` to aid debugging."""
        for key, value in index.items():
            # Single-argument form prints identically on Python 2 and 3.
            print('%s %s' % (key, value))

    def test_aggregate(self):
        """Aggregate with an ``include`` on region and two breakdown axes.

        Checks the reported dates and axes, then compares the sum of each
        cell's series with the expected totals (to within 1).
        """
        ans = aggregator.aggregate(
            Fixtures.dataset_,
            # Presumably restricts the result to this region -- the expected
            # cells below are consistent with that; confirm in aggregator.
            include=[('region', u'ENGLAND_South West')],
            axes=[
                'cap_or_cur', 'cofog.1',
                # Omit Fixtures.pog, Fixtures.region,
            ])
        print(ans)
        assert ans.dates == [u'2003', u'2004', u'2005',
            u'2006', u'2007', u'2008', u'2009',
            u'2010'], ans.dates
        assert ans.axes == [u'cap_or_cur', u'cofog.1'], ans.axes
        # Collapse each cell's series into a single total.
        index = dict(
            (coords, sum(amount)) for (coords, amount) in ans.matrix.items())
        self._dump(index)
        assert len(index) == 3, index
        self._assert_amounts(index, [
            (70700000.0, (u'CUR', u'10')),
            (500000.0, (u'CAP', u'03')),
            (-608900000.0, (u'CAP', u'06')),
        ], 1)

    def test_aggregate_per(self):
        """Aggregate by function and region, then divide by a region statistic.

        After ``divide_by_statistic('region', 'population2006')`` the test
        checks the second-to-last entry of every cell's series against the
        expected values (to within 1e-3).
        """
        ans = aggregator.aggregate(
            Fixtures.dataset_,
            axes=['cofog.1.name', 'region'],
        )
        print(ans)
        ans.divide_by_statistic('region', 'population2006')
        print(ans)
        assert ans.axes == [u'cofog.1.name', u'region'], ans.axes
        # Only the second-to-last entry of each series is compared.
        index = dict(
            (coords, amount[-2]) for (coords, amount) in ans.matrix.items())
        self._dump(index)
        assert len(index) == 7, index
        self._assert_amounts(index, [
            (2.365, (u'04', u'SCOTLAND')),
            (5.795, (u'10', u'ENGLAND_West Midlands')),
            (0.106, (u'04', u'ENGLAND_London')),
            (3.083, (u'10', u'ENGLAND_South West')),
            (0.020, (u'03', u'ENGLAND_South West')),
            (4.356, (u'04', u'ENGLAND_Yorkshire and The Humber')),
            (-4.879, (u'06', u'ENGLAND_South West')),
        ], 1e-3)

    def test_aggregate_per_time(self):
        """Aggregate with no axes, then divide by a time statistic.

        With ``axes=[]`` the only cell is keyed by the empty tuple. After
        ``divide_by_time_statistic('gdp_deflator2006')`` its eight entries
        are compared with expected values given in millions (to within 1e5).
        """
        ans = aggregator.aggregate(
            Fixtures.dataset_,
            axes=[],
        )
        print(ans)
        ans.divide_by_time_statistic('gdp_deflator2006')
        print(ans)
        data = ans.matrix[()]
        assert len(data) == 8, data
        expected_millions = [
            -180.8, -122.5, -88.0, -82.4, -19.6, 21.9, 52.4, 18.4]
        for i, amount in enumerate(expected_millions):
            assert abs(data[i] - amount*1e6) < 1e5, (i, amount)

# TODO: Test filtering on dataset.
# TODO: Test with some breakdown KeyValues missing (i.e. coordinate is NULL).
# TODO: Test per without breakdown.