Commits

Chris Mutel committed a9866ce

Split matrix building off into separate classes

Comments (0)

Files changed (6)

 from __future__ import division
 from brightway2 import config as base_config
 from brightway2 import databases, methods, mapping
-from bw2data.proxies import OneDimensionalArrayProxy, \
-    CompressedSparseMatrixProxy
-from bw2data.utils import MAX_INT_32, TYPE_DICTIONARY
-from fallbacks import dicter
 from scipy.sparse.linalg import factorized, spsolve
 from scipy import sparse
 import numpy as np
 import os
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-try:
-    from bw2speedups import indexer
-except ImportError:
-    from fallbacks import indexer
+from .matrices import MatrixBuilder
+from .matrices import TechnosphereBiosphereMatrixBuilder as TBMBuilder
 
 
 class LCA(object):
     def __init__(self, demand, method=None, config=None):
-        self.config = config or base_config
+        # The matrix builders only need the data directory, not the whole
+        # config object, so just the path is kept.
+        self.dirpath = (config or base_config).dir
         if isinstance(demand, (basestring, tuple, list)):
             raise ValueError("Demand must be a dictionary")
         self.demand = demand
         self.method = method
+        # Databases needed for this functional unit (including dependencies).
-        self.databases = self.get_databases()
+        self.databases = self.get_databases(demand)
 
-    def get_databases(self):
+    def get_databases(self, demand):
         """Get list of databases for functional unit"""
-        return set.union(*[set(databases[key[0]]["depends"] + [key[0]]) for key \
-            in self.demand])
-
-    def load_databases(self):
-        params = np.hstack([pickle.load(open(os.path.join(
-            self.config.dir, "processed", "%s.pickle" % name), "rb")
-            ) for name in self.databases])
-        # Technosphere
-        self.tech_params = params[
-            np.hstack((
-                np.where(params['type'] == TYPE_DICTIONARY["technosphere"])[0],
-                np.where(params['type'] == TYPE_DICTIONARY["production"])[0]
-                ))
-            ]
-        self.bio_params = params[np.where(params['type'] == TYPE_DICTIONARY["biosphere"])]
-        self.technosphere_dict = self.build_dictionary(np.hstack((
-            self.tech_params['input'],
-            self.tech_params['output'],
-            self.bio_params['output']
-            )))
-        self.add_matrix_indices(self.tech_params['input'], self.tech_params['row'],
-            self.technosphere_dict)
-        self.add_matrix_indices(self.tech_params['output'], self.tech_params['col'],
-            self.technosphere_dict)
-        # Biosphere
-        self.biosphere_dict = self.build_dictionary(self.bio_params['input'])
-        self.add_matrix_indices(self.bio_params['input'], self.bio_params['row'],
-            self.biosphere_dict)
-        self.add_matrix_indices(self.bio_params['output'], self.bio_params['col'],
-            self.technosphere_dict)
-
-    def load_method(self):
-        params = pickle.load(open(os.path.join(self.config.dir, "processed",
-            "%s.pickle" % methods[self.method]['abbreviation']), "rb"))
-        self.add_matrix_indices(params['flow'], params['index'],
-            self.biosphere_dict)
-        # Eliminate references to biosphere flows that don't appear in this
-        # assessment; they are masked with MAX_INT_32 values
-        self.cf_params = params[np.where(params['index'] != MAX_INT_32)]
-
-    def build_technosphere_matrix(self, vector=None):
-        vector = self.tech_params['amount'].copy() \
-            if vector is None else vector
-        count = len(self.technosphere_dict)
-        technosphere_mask = np.where(self.tech_params["type"] == \
-            TYPE_DICTIONARY["technosphere"])
-        # Inputs are consumed, so are negative
-        vector[technosphere_mask] = -1 * vector[technosphere_mask]
-        # coo_matrix construction is coo_matrix((values, (rows, cols)),
-        # (row_count, col_count))
-        self.technosphere_matrix = CompressedSparseMatrixProxy(
-            sparse.coo_matrix((vector.astype(np.float64),
-            (self.tech_params['row'], self.tech_params['col'])),
-            (count, count)).tocsr(),
-            self.technosphere_dict, self.technosphere_dict)
-
-    def build_biosphere_matrix(self, vector=None):
-        vector = self.bio_params['amount'] if vector is None else vector
-        row_count = len(self.biosphere_dict)
-        col_count = len(self.technosphere_dict)
-        # coo_matrix construction is coo_matrix((values, (rows, cols)),
-        # (row_count, col_count))
-        self.biosphere_matrix = CompressedSparseMatrixProxy(
-            sparse.coo_matrix((vector.astype(np.float64),
-            (self.bio_params['row'], self.bio_params['col'])),
-            (row_count, col_count)).tocsr(),
-            self.biosphere_dict, self.technosphere_dict)
+        return set.union(*[set(databases[key[0]]["depends"] + [key[0]]
+            ) for key in demand])
 
     def decompose_technosphere(self):
+        # Factorize the technosphere matrix once (LU via umfpack/superlu),
+        # so repeated demand vectors can be solved much faster.
-        self.solver = factorized(self.technosphere_matrix.data.tocsc())
+        self.solver = factorized(self.technosphere_matrix.tocsc())
 
     def build_demand_array(self, demand=None):
         demand = demand or self.demand
-        self.demand_array = OneDimensionalArrayProxy(
-            np.zeros(len(self.technosphere_dict)),
-            self.technosphere_dict)
+        self.demand_array = np.zeros(len(self.technosphere_dict))
         for key in demand:
-            self.demand_array[mapping[key]] = demand[key]
-
-    def build_characterization_matrix(self, vector=None):
-        vector = self.cf_params['amount'] if vector is None else vector
-        count = len(self.biosphere_dict)
-        self.characterization_matrix = CompressedSparseMatrixProxy(
-            sparse.coo_matrix((vector.astype(np.float64),
-            (self.cf_params['index'], self.cf_params['index'])),
-            (count, count)).tocsr(),
-            self.biosphere_dict, self.biosphere_dict)
-
-    def build_dictionary(self, array):
-        """Build a dictionary from the sorted, unique elements of an array"""
-        return dicter(array)
-
-    def add_matrix_indices(self, array_from, array_to, mapping):
-        """Map ``array_from`` keys to ``array_to`` values using ``mapping``"""
-        indexer(array_from, array_to, mapping)
+            self.demand_array[self.technosphere_dict[mapping[key]]] = demand[key]
 
     def solve_linear_system(self):
         """
Higham, Accuracy and Stability of Numerical Algorithms, 2002, p. 260.
         """
         if hasattr(self, "solver"):
+            # Reuse the LU factorization created by decompose_technosphere.
-            return self.solver(self.demand_array.data)
+            return self.solver(self.demand_array)
         else:
+            # One-off direct sparse solve of A x = demand.
             return spsolve(
-                self.technosphere_matrix.data,
-                self.demand_array.data)
+                self.technosphere_matrix,
+                self.demand_array)
 
-    def lci(self, factorize=False):
+    def load_lci_data(self, builder=TBMBuilder):
+        """Load processed database arrays and build the LCI matrices.
+
+        Sets the parameter arrays, row/column dictionaries, and the
+        biosphere and technosphere sparse matrices as attributes."""
+        self.bio_params, self.tech_params, \
+            self.biosphere_dict, self.technosphere_dict, \
+            self.biosphere_matrix, self.technosphere_matrix = \
+            builder.build(self.dirpath, self.databases)
+
+    def load_lcia_data(self, builder=MatrixBuilder):
+        """Build the diagonal characterization matrix for ``self.method``.
+
+        Rows are pinned to the existing ``biosphere_dict``, so
+        ``load_lci_data`` must have run first."""
+        self.cf_params, dummy, dummy, self.characterization_matrix = \
+            builder.build(self.dirpath, [methods[self.method]['abbreviation']
+                ], "amount", "flow", "index", row_dict=self.biosphere_dict,
+                one_d=True)
+
+    def rebuild_technosphere_matrix(self, vector):
+        self.technosphere_matrix = MatrixBuilder.build_matrix(self.tech_params, self.technosphere_dict, self.technosphere_dict, "row", "col", new_data=vector)
+
+    def rebuild_biosphere_matrix(self, vector):
+        """Build a new biosphere matrix using ``vector`` for the data.
+
+        Indices and dictionaries are reused; only the values change."""
+        self.biosphere_matrix = MatrixBuilder.build_matrix(self.bio_params, self.biosphere_dict, self.technosphere_dict, "row", "col", new_data=vector)
+
+    def rebuild_characterization_matrix(self, vector):
+        """Build a new characterization matrix using ``vector`` for the data.
+
+        NOTE(review): relies on ``build_diagonal_matrix`` forwarding
+        ``new_data`` to ``build_matrix`` — verify it does."""
+        self.characterization_matrix = MatrixBuilder.build_diagonal_matrix(self.cf_params, self.biosphere_dict, "index", new_data=vector)
+
+    def lci(self, factorize=False,
+            builder=TBMBuilder):
         """Life cycle inventory"""
-        self.load_databases()
-        self.build_technosphere_matrix()
-        self.build_biosphere_matrix()
+        self.load_lci_data(builder)
         self.build_demand_array()
         if factorize:
+            # Only worthwhile when many demands will be solved (redo_lci).
             self.decompose_technosphere()
         self.lci_calculation()
 
+    def fix_dictionaries(self):
+        # The builder dictionaries map ``mapping`` integer ids to matrix
+        # row/column indices. Invert ``mapping`` and re-key both dicts by
+        # (database, code) tuples, so matrix indices can be looked up
+        # directly by process key. Call at most once — TODO confirm.
+        rev_mapping = {v: k for k, v in mapping.iteritems()}
+        self.technosphere_dict = {rev_mapping[k]: v for k, v in self.technosphere_dict.iteritems()}
+        self.biosphere_dict = {rev_mapping[k]: v for k, v in self.biosphere_dict.iteritems()}
+
     def lci_calculation(self):
+        # Supply s solves A s = demand; inventory = B * diag(s).
-        self.supply_array = OneDimensionalArrayProxy(
-            self.solve_linear_system(),
-            self.technosphere_dict)
+        self.supply_array = self.solve_linear_system()
         count = len(self.technosphere_dict)
-        self.inventory = CompressedSparseMatrixProxy(
-            self.biosphere_matrix.data * \
-            sparse.spdiags([self.supply_array.data], [0], count, count),
-            self.biosphere_dict, self.technosphere_dict)
+        self.inventory = self.biosphere_matrix * \
+            sparse.spdiags([self.supply_array], [0], count, count)
 
     def redo_lci(self, demand):
         """Redo LCI with same databases but different demand"""
+        # Matrices are unchanged; only the demand vector is rebuilt.
         self.build_demand_array(demand)
         self.lci_calculation()
 
-    def lcia(self):
+    def lcia(self, builder=MatrixBuilder):
         """Life cycle impact assessment"""
         assert hasattr(self, "inventory"), "Must do lci first"
         assert self.method, "Must specify a method to perform LCIA"
-        self.load_method()
-        self.build_characterization_matrix()
+        self.load_lcia_data(builder)
         self.lcia_calculation()
 
     def lcia_calculation(self):
+        # Characterized inventory = C * inventory (both sparse).
-        self.characterized_inventory = CompressedSparseMatrixProxy(
-            self.characterization_matrix.data * self.inventory.data,
-            self.biosphere_dict, self.technosphere_dict)
+        self.characterized_inventory = \
+            self.characterization_matrix * self.inventory
 
     def redo_lcia(self, demand=None):
+        # NOTE(review): only the precondition is checked here — nothing is
+        # recalculated and ``demand`` is unused; confirm this is intentional.
         assert hasattr(self, "characterized_inventory"), "Must do LCIA first"
 
     def reverse_dict(self):
         """Construct reverse dicts from row and col indices to processes"""
+        # NOTE(review): assumes ``fix_dictionaries`` already re-keyed the
+        # dicts by process key; otherwise the reverse values are the raw
+        # ``mapping`` ids — confirm the intended call order.
-        rev_mapping = dict([(v, k) for k, v in mapping.iteritems()])
-        rev_tech = dict([(v, rev_mapping[k]) for k, v in \
-            self.technosphere_dict.iteritems()])
-        rev_bio = dict([(v, rev_mapping[k]) for k, v in \
-            self.biosphere_dict.iteritems()])
+        rev_tech = {v: k for k, v in self.technosphere_dict.iteritems()}
+        rev_bio = {v: k for k, v in self.biosphere_dict.iteritems()}
         return rev_tech, rev_bio

bw2calc/matrices.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from .fallbacks import dicter
+from .utils import load_arrays
+from bw2data.utils import MAX_INT_32, TYPE_DICTIONARY
+from scipy import sparse
+import numpy as np
+try:
+    from bw2speedups import indexer
+except ImportError:
+    from .fallbacks import indexer
+
+
+class MatrixBuilder(object):
+    """Load a structured array from a file, manipulate it. Generates a sparse matrix."""
+
+    # NOTE(review): the classmethods below name their first parameter
+    # ``self``; convention is ``cls``. Behavior is unaffected.
+    @classmethod
+    def load(self, dirpath, names):
+        """Load a structured array from a file.
+
+        Thin wrapper around ``load_arrays``: concatenates the named
+        processed arrays found under ``dirpath``."""
+        return load_arrays(dirpath, names)
+
+    @classmethod
+    def add_matrix_indices(self, array_from, array_to, mapping):
+        """Map ``array_from`` keys to ``array_to`` values using ``mapping``.
+
+        Operates in place. Doesn't return anything."""
+        indexer(array_from, array_to, mapping)
+
+    @classmethod
+    def build_dictionary(self, array):
+        """Build a dictionary from the sorted, unique elements of an array"""
+        return dicter(array)
+
+    @classmethod
+    def build(cls, dirpath, names, data_label,
+            row_id_label, row_index_label,
+            col_id_label=None, col_index_label=None,
+            row_dict=None, col_dict=None, one_d=False):
+        """Build a sparse matrix from NumPy structured array(s).
+
+        This method does the following:
+
+        #. Load and concatenate some structured arrays.
+        #. Using the ``row_id_label``, and the ``row_dict`` if available, add matrix indices to the ``row_index_label`` column.
+        #. If not ``one_d``, do the same to ``col_index_label`` using ``col_id_label`` and ``col_dict``.
+        #. If not ``one_d``, build a sparse matrix using ``data_label`` for the matrix data, and ``row_index_label`` and ``col_index_label`` as indices.
+        #. Else if ``one_d``, build a diagonal matrix using only ``data_label`` for values and ``row_index_label`` as indices.
+        #. Return the loaded parameter arrays, row and column dicts, and matrix."""
+        assert isinstance(names, (tuple, list, set)), "names must be a list"
+        array = load_arrays(dirpath, names)
+        if not row_dict:
+            row_dict = cls.build_dictionary(array[row_id_label])
+        cls.add_matrix_indices(array[row_id_label], array[row_index_label],
+            row_dict)
+        if one_d:
+            # Eliminate references to row data which isn't used;
+            # Unused data remains MAX_INT_32 values because it isn't mapped
+            # by ``add_matrix_indices``.
+            array = array[np.where(array[row_index_label] != MAX_INT_32)]
+            matrix = cls.build_diagonal_matrix(array, row_dict, row_index_label, data_label)
+        else:
+            if not col_dict:
+                col_dict = cls.build_dictionary(array[col_id_label])
+            cls.add_matrix_indices(array[col_id_label],
+                array[col_index_label], col_dict)
+            matrix = cls.build_matrix(array, row_dict, col_dict,
+                row_index_label, col_index_label, data_label)
+        # NOTE: col_dict is None in the one_d case.
+        return array, row_dict, col_dict, matrix
+
+    @classmethod
+    def build_matrix(cls, array, row_dict, col_dict, row_index_label,
+            col_index_label, data_label=None, new_data=None):
+        """Build sparse matrix.
+
+        Values come from ``new_data`` when given, otherwise from
+        ``array[data_label]``. Returns a CSR matrix shaped
+        ``(len(row_dict), len(col_dict))``."""
+        vector = array[data_label] if new_data is None else new_data
+        assert vector.shape[0] == array.shape[0], "Incompatible data & indices"
+        # coo_matrix construction is coo_matrix((values, (rows, cols)),
+        # (row_count, col_count))
+        return sparse.coo_matrix((
+            vector.astype(np.float64),
+            (array[row_index_label], array[col_index_label])),
+            (len(row_dict), len(col_dict))).tocsr()
+
+    @classmethod
+    def build_diagonal_matrix(cls, array, row_dict, index_label,
+            data_label=None, new_data=None):
+        """Build diagonal sparse matrix."""
+        return cls.build_matrix(array, row_dict, row_dict, index_label, index_label, data_label)
+
+
+class TechnosphereBiosphereMatrixBuilder(MatrixBuilder):
+    """Subclass of ``MatrixBuilder`` that separates technosphere and biosphere parameters."""
+    @classmethod
+    def build(cls, dirpath, names):
+        """Build the technosphere and biosphere sparse matrices.
+
+        Returns ``(bio_params, tech_params, bio_dict, tech_dict,
+        biosphere_matrix, technosphere_matrix)`` — biosphere items come
+        first in the tuple."""
+        assert isinstance(names, (tuple, list, set)), "names must be a list"
+        array = load_arrays(dirpath, names)
+        # Technosphere rows include both inputs (type technosphere) and
+        # outputs (type production).
+        tech_array = array[
+            np.hstack((
+                np.where(array['type'] == TYPE_DICTIONARY["technosphere"])[0],
+                np.where(array['type'] == TYPE_DICTIONARY["production"])[0]
+                ))
+            ]
+        bio_array = array[np.where(array['type'] == TYPE_DICTIONARY["biosphere"])]
+        # Biosphere flows are emitted by activities, so their 'output'
+        # column indexes into the technosphere (column) dictionary.
+        tech_dict = cls.build_dictionary(np.hstack((
+            tech_array['input'],
+            tech_array['output'],
+            bio_array['output']
+            )))
+        bio_dict = cls.build_dictionary(bio_array["input"])
+        cls.add_matrix_indices(tech_array['input'], tech_array['row'],
+            tech_dict)
+        cls.add_matrix_indices(tech_array['output'], tech_array['col'],
+            tech_dict)
+        cls.add_matrix_indices(bio_array['input'], bio_array['row'], bio_dict)
+        cls.add_matrix_indices(bio_array['output'], bio_array['col'], tech_dict)
+        technosphere = cls.build_technosphere_matrix(tech_array, tech_dict)
+        biosphere = cls.build_matrix(bio_array, bio_dict, tech_dict, "row", "col", "amount")
+        return bio_array, tech_array, bio_dict, tech_dict, biosphere, \
+            technosphere
+
+    @classmethod
+    def get_technosphere_inputs_mask(cls, array):
+        """Get mask of technosphere inputs from ``array``"""
+        return np.where(array["type"] == \
+            TYPE_DICTIONARY["technosphere"])
+
+    @classmethod
+    def fix_supply_use(cls, array, vector):
+        """Make technosphere inputs negative.
+
+        ``array`` supplies the type mask; ``vector`` is mutated."""
+        # Inputs are consumed, so are negative
+        # Operates in place
+        mask = cls.get_technosphere_inputs_mask(array)
+        vector[mask] = -1 * vector[mask]
+
+    @classmethod
+    def build_technosphere_matrix(cls, array, tech_dict, new_data=None):
+        vector = array["amount"] if new_data is None else new_data
+        cls.fix_supply_use(array, vector)
+        return cls.build_matrix(array, tech_dict, tech_dict, "row", "col", "amount", vector)

bw2calc/monte_carlo.py

 # -*- coding: utf-8 -*-
-import itertools
-from lca import LCA
+from __future__ import division
+from .lca import LCA
+from .matrices import MatrixBuilder
+from .matrices import TechnosphereBiosphereMatrixBuilder as TBMBuilder
 from scipy.sparse.linalg import iterative, spsolve
 from stats_toolkit.random import MCRandomNumberGenerator
+import itertools
 import multiprocessing
 import numpy as np
 
         self.seed = seed
         self.iter_solver = iter_solver
         self.guess = None
-        self.load_databases()
-        self.load_method()
+        self.load_lci_data()
+        self.load_lcia_data()
         self.tech_rng = MCRandomNumberGenerator(self.tech_params, seed=seed)
         self.bio_rng = MCRandomNumberGenerator(self.bio_params, seed=seed)
         self.cf_rng = MCRandomNumberGenerator(self.cf_params, seed=seed)
 
     def next(self):
-        self.build_technosphere_matrix(self.tech_rng.next())
-        self.build_biosphere_matrix(self.bio_rng.next())
-        self.build_characterization_matrix(self.cf_rng.next())
+        self.rebuild_technosphere_matrix(self.tech_rng.next())
+        self.rebuild_biosphere_matrix(self.bio_rng.next())
+        self.rebuild_characterization_matrix(self.cf_rng.next())
 
         if not hasattr(self, "demand_array"):
             self.build_demand_array()
     def solve_linear_system(self):
         if not self.iter_solver or self.guess == None:
             self.guess = spsolve(
-                self.technosphere_matrix.data,
-                self.demand_array.data)
+                self.technosphere_matrix,
+                self.demand_array)
             return self.guess
         else:
             solution, status = self.iter_solver(
-                self.technosphere_matrix.data,
-                self.demand_array.data,
+                self.technosphere_matrix,
+                self.demand_array,
                 x0=self.guess)
             if status != 0:
                 raise
         self.seed = seed
         self.iter_solver = iter_solver
         self.guess = None
-        self.load_databases()
-        self.load_method()
+        self.load_lci_data()
+        self.load_lcia_data()
         self.tech_rng = MCRandomNumberGenerator(self.tech_params, seed=seed)
         self.bio_rng = MCRandomNumberGenerator(self.bio_params, seed=seed)
         self.cf_rng = MCRandomNumberGenerator(self.cf_params, seed=seed)
 
     def next(self):
-        self.build_technosphere_matrix(self.tech_rng.next())
-        self.build_biosphere_matrix(self.bio_rng.next())
-        self.build_characterization_matrix(self.cf_rng.next())
+        self.rebuild_technosphere_matrix(self.tech_rng.next())
+        self.rebuild_biosphere_matrix(self.bio_rng.next())
+        self.rebuild_characterization_matrix(self.cf_rng.next())
+
         results = []
         for demand in self.demands:
             self.build_demand_array(demand)
     def solve_linear_system(self):
         if not self.iter_solver or self.guess == None:
             self.guess = spsolve(
-                self.technosphere_matrix.data,
-                self.demand_array.data)
+                self.technosphere_matrix,
+                self.demand_array)
             return self.guess
         else:
             solution, status = self.iter_solver(
-                self.technosphere_matrix.data,
-                self.demand_array.data,
+                self.technosphere_matrix,
+                self.demand_array,
                 x0=self.guess)
             if status != 0:
                 raise
 
 def multi_worker(demands, method):
+    # Worker for multiprocessing: one LCA instance per process, with
+    # Monte Carlo-sampled matrices.
     lca = LCA(demands[0], method)
-    lca.load_databases()
-    lca.load_method()
+    lca.load_lci_data()
+    lca.load_lcia_data()
     # Create new matrices
-    lca.build_technosphere_matrix(MCRandomNumberGenerator(lca.tech_params).next())
-    lca.build_biosphere_matrix(MCRandomNumberGenerator(lca.bio_params).next())
-    lca.build_characterization_matrix(MCRandomNumberGenerator(lca.cf_params).next())
+    lca.rebuild_technosphere_matrix(
+        MCRandomNumberGenerator(lca.tech_params).next())
+    lca.rebuild_biosphere_matrix(
+        MCRandomNumberGenerator(lca.bio_params).next())
+    lca.rebuild_characterization_matrix(
+        MCRandomNumberGenerator(lca.cf_params).next())
     lca.decompose_technosphere()
+    # NOTE(review): lca.lci() calls load_lci_data again, which rebuilds the
+    # matrices from the stored (unsampled) params and overwrites the sampled
+    # ones above — confirm this is intended.
     lca.lci()
     lca.lcia()

bw2calc/tests/lca.py

         answer = np.zeros((2,))
         answer[lca.technosphere_dict[mapping[("t", 1)]]] = 1
         answer[lca.technosphere_dict[mapping[("t", 2)]]] = 0.5
-        self.assertTrue(np.allclose(answer, lca.supply_array.data))
+        self.assertTrue(np.allclose(answer, lca.supply_array))
 
     def test_production_values(self):
         test_data = {
         answer = np.zeros((2,))
         answer[lca.technosphere_dict[mapping[("t", 1)]]] = 0.5
         answer[lca.technosphere_dict[mapping[("t", 2)]]] = 0.25
-        self.assertTrue(np.allclose(answer, lca.supply_array.data))
+        self.assertTrue(np.allclose(answer, lca.supply_array))
 
     def test_substitution(self):
         test_data = {
         answer = np.zeros((2,))
         answer[lca.technosphere_dict[mapping[("t", 1)]]] = 1
         answer[lca.technosphere_dict[mapping[("t", 2)]]] = -1
-        self.assertTrue(np.allclose(answer, lca.supply_array.data))
+        self.assertTrue(np.allclose(answer, lca.supply_array))
 
     def test_circular_chains(self):
         test_data = {
         answer = np.zeros((2,))
         answer[lca.technosphere_dict[mapping[("t", 1)]]] = 20 / 19.
         answer[lca.technosphere_dict[mapping[("t", 2)]]] = 10 / 19.
-        self.assertTrue(np.allclose(answer, lca.supply_array.data))
+        self.assertTrue(np.allclose(answer, lca.supply_array))
 
     def test_dependent_databases(self):
         pass
+# -*- coding: utf-8 -*-
+import numpy as np
+import os
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
+def load_arrays(dirpath, names):
+    return np.hstack([pickle.load(open(os.path.join(
+        dirpath, "processed", "%s.pickle" % name), "rb")
+        ) for name in names])
 
 setup(
   name='bw2calc',
-  version="0.9.0-alpha",
+  version="0.9.0-alpha2",
   packages=["bw2calc", "bw2calc.tests"],
   author="Chris Mutel",
   author_email="cmutel@gmail.com",