Chris Mutel committed ea5ffc3

0.10.1: Add MC vector and iterative MC class, fix compatibility with upstream changes, add sensitivity

Files changed (17)

bw2calc/__init__.py

 from .lca import LCA
 from .simple_regionalized import SimpleRegionalizedLCA
 from .monte_carlo import MonteCarloLCA, ParallelMonteCarlo, MultiMonteCarlo
+from .mc_vector import ParameterVectorLCA
 from .graph_traversal import GraphTraversal, edge_cutter, node_pruner, \
     d3_fd_graph_formatter
 from .matrices import MatrixBuilder, TechnosphereBiosphereMatrixBuilder
 
-__version__ = (0, 9, 3)
+__version__ = (0, 10, 1)

bw2calc/graph_traversal.py

         # Create matrix of LCIA CFs times biosphere flows, as these don't
         # change. This is also the unit score of each activity.
         characterized_biosphere = np.array((
-            lca.characterization_matrix.data *
-            lca.biosphere_matrix.data).sum(axis=0)).ravel()
+            lca.characterization_matrix *
+            lca.biosphere_matrix).sum(axis=0)).ravel()
 
         heap, nodes, edges = self.initialize_heap(
             demand, lca, supply, characterized_biosphere)
         while heap and counter < max_calc:
             parent_score_inverted, parent_index = heappop(heap)
             # parent_score = 1 / parent_score_inverted
-            col = lca.technosphere_matrix.data[:, parent_index].tocoo()
+            col = lca.technosphere_matrix[:, parent_index].tocoo()
             # Multiply by -1 because technosphere values are negative
             # (consumption of inputs)
             children = [(col.row[i], -1 * col.data[i]) for i in xrange(
bw2calc/lca.py

 # -*- coding: utf-8 -*-
 from __future__ import division
 from brightway2 import config as base_config
-from brightway2 import databases, methods, mapping
+from brightway2 import databases, methods, mapping, weightings
 from scipy.sparse.linalg import factorized, spsolve
 from scipy import sparse
 import numpy as np
 import os
 from .matrices import MatrixBuilder
 from .matrices import TechnosphereBiosphereMatrixBuilder as TBMBuilder
+from .utils import load_arrays
 
 
 class LCA(object):
     ### Setup ###
     #############
 
-    def __init__(self, demand, method=None, config=None):
+    def __init__(self, demand, method=None, weighting=None, config=None):
         """Create a new LCA calculation.
 
         Args:
             raise ValueError("Demand must be a dictionary")
         self.demand = demand
         self.method = method
+        self.weighting = weighting
         self.databases = self.get_databases(demand)
 
     def get_databases(self, demand):
 Doesn't require any arguments or return anything, but changes ``self.technosphere_dict`` and ``self.biosphere_dict``.
 
         """
+        if not isinstance(self.technosphere_dict.keys()[0], int):
+            # Already reversed - should be idempotent
+            return
         rev_mapping = {v: k for k, v in mapping.iteritems()}
         self.technosphere_dict = {
             rev_mapping[k]: v for k, v in self.technosphere_dict.iteritems()}
             "amount", "flow", "index", row_dict=self.biosphere_dict,
             one_d=True)
 
+    def load_weighting_data(self):
+        """Load weighting data, a 1-element array."""
+        self.weighting_params = load_arrays(
+            self.dirpath,
+            [weightings[self.weighting]['abbreviation']]
+        )
+        self.weighting_value = self.weighting_params['amount']
+
     ####################
     ### Calculations ###
     ####################
         self.characterized_inventory = \
             self.characterization_matrix * self.inventory
 
+    def weighting_calculation(self):
+        if not hasattr(self, "weighting_value"):
+            self.load_weighting_data()
+        self.weighted_inventory = \
+            self.weighting_value[0] * self.characterized_inventory
+
     @property
     def score(self):
         """
Note that this is a `property <http://docs.python.org/2/library/functions.html#property>`_, so it is ``foo.score``, not ``foo.score()``
         """
         assert hasattr(self, "characterized_inventory"), "Must do LCIA first"
+        if self.weighting:
+            assert hasattr(self, "weighted_inventory"), "Must do weighting first"
+            return float(self.weighted_inventory.sum())
         return float(self.characterized_inventory.sum())
 
     #########################
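
The weighting hooks above slot into the usual calculation sequence. A minimal sketch, assuming the standard ``lci()``/``lcia()`` entry points, with placeholder database, method, and weighting names:

    from bw2calc import LCA

    lca = LCA({("some database", "some process"): 1},
              method=("some method", "some category"),
              weighting="some weighting")  # hypothetical weighting name
    lca.lci()
    lca.lcia()
    lca.weighting_calculation()  # multiplies characterized results by the factor
    print lca.score              # weighted score, because weighting is set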

bw2calc/mc_vector.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from .monte_carlo import IterativeMonteCarlo
+from .utils import extract_uncertainty_fields as euf
+from stats_arrays.random import MCRandomNumberGenerator
+import numpy as np
+
+
+class ParameterVectorLCA(IterativeMonteCarlo):
+    """A Monte Carlo class where all uncertain parameters are stored in a single large array.
+
+    Useful for sensitivity analysis and easy manipulation."""
+    def load_data(self):
+        self.load_lci_data()
+        positions = {
+            "tech": (0, self.tech_params.shape[0]),
+            "bio": (
+                self.tech_params.shape[0],
+                self.tech_params.shape[0] + self.bio_params.shape[0]
+            )
+        }
+        params = (euf(self.tech_params), euf(self.bio_params))
+
+        if self.lcia:
+            self.load_lcia_data()
+            positions["cf"] = (
+                positions["bio"][1],
+                positions["bio"][1] + self.cf_params.shape[0]
+            )
+            params = params + (euf(self.cf_params),)
+
+        if self.weighting:
+            self.load_weighting_data()
+            positions["weighting"] = (
+                positions["bio"][1],
+                positions["bio"][1] + self.cf_params.shape[0]
+            )
+            params = params + (euf(self.weighting_params),)
+
+        self.positions = positions
+        self.params = np.hstack(params)
+        self.rng = MCRandomNumberGenerator(self.params, seed=self.seed)
+
+    def __call__(self, vector=None):
+        return self.next(vector)
+
+    def next(self, vector=None):
+        """Generate a new Monte Carlo iteration."""
+        if vector is not None:
+            self.sample = vector
+        else:
+            self.sample = self.rng.next()
+
+        self.rebuild_technosphere_matrix(self.sample[
+            self.positions["tech"][0]:self.positions["tech"][1]
+            ])
+        self.rebuild_biosphere_matrix(self.sample[
+            self.positions["bio"][0]:self.positions["bio"][1]
+            ])
+        if self.lcia:
+            self.rebuild_characterization_matrix(self.sample[
+                self.positions["cf"][0]:self.positions["cf"][1]
+            ])
+        if self.weighting:
+            self.weighting_value = self.sample[
+                self.positions["weighting"][0]:self.positions["weighting"][1]
+            ]
+
+        if not hasattr(self, "demand_array"):
+            self.build_demand_array()
+
+        self.lci_calculation()
+        if self.lcia:
+            self.lcia_calculation()
+            if self.weighting:
+                self.weighting_calculation()
+            return self.score
+        else:
+            return self.supply_array
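
Because every uncertain value lives in one array, callers can replay or perturb individual entries. A hedged usage sketch (the demand key and method tuple are placeholders):

    from bw2calc import ParameterVectorLCA

    pv = ParameterVectorLCA({("some database", "some process"): 1},
                            method=("some method", "some category"))
    pv.load_data()
    score = pv.next()       # draw a fresh sample of every uncertain parameter
    replay = pv(pv.sample)  # passing a vector re-evaluates exactly those values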

bw2calc/monte_carlo.py

 import itertools
 import multiprocessing
 
-
-class MonteCarloLCA(LCA):
+class IterativeMonteCarlo(LCA):
+    """Base class to use iterative techniques instead of LU factorization in Monte Carlo."""
     def __init__(self, demand, method=None, iter_solver=iterative.cgs,
                  seed=None, *args, **kwargs):
-        super(MonteCarloLCA, self).__init__(demand, method=method, *args,
-                                            **kwargs)
+        super(IterativeMonteCarlo, self).__init__(demand, method=method, *args,
+                                                  **kwargs)
         self.seed = seed
         self.iter_solver = iter_solver
         self.guess = None
-        self.load_lci_data()
-        self.tech_rng = MCRandomNumberGenerator(self.tech_params, seed=seed)
-        self.bio_rng = MCRandomNumberGenerator(self.bio_params, seed=seed)
-        if method is None:
-            self.lcia = False
-        else:
-            self.lcia = True
-            self.load_lcia_data()
-            self.cf_rng = MCRandomNumberGenerator(self.cf_params, seed=seed)
+        self.lcia = method is not None
 
     def __iter__(self):
         return self
 
+    def __call__(self):
+        return self.next()
+
     def next(self):
-        self.rebuild_technosphere_matrix(self.tech_rng.next())
-        self.rebuild_biosphere_matrix(self.bio_rng.next())
-        if self.lcia:
-            self.rebuild_characterization_matrix(self.cf_rng.next())
-
-        if not hasattr(self, "demand_array"):
-            self.build_demand_array()
-
-        self.lci_calculation()
-        if self.lcia:
-            self.lcia_calculation()
-            return self.score
-        else:
-            return self.supply_array
+        raise NotImplementedError
 
     def solve_linear_system(self):
         if not self.iter_solver or self.guess is None:
             solution, status = self.iter_solver(
                 self.technosphere_matrix,
                 self.demand_array,
-                x0=self.guess)
+                x0=self.guess,
+                maxiter=1000)
             if status != 0:
-                raise
+                return spsolve(
+                    self.technosphere_matrix,
+                    self.demand_array
+                )
             return solution
 
 
-class ComparativeMonteCarlo(LCA):
+class MonteCarloLCA(IterativeMonteCarlo):
+    """Monte Carlo uncertainty analysis with separate RNGs for each set of parameters."""
+    def load_data(self):
+        self.load_lci_data()
+        self.tech_rng = MCRandomNumberGenerator(self.tech_params, seed=self.seed)
+        self.bio_rng = MCRandomNumberGenerator(self.bio_params, seed=self.seed)
+        if self.lcia:
+            self.load_lcia_data()
+            self.cf_rng = MCRandomNumberGenerator(self.cf_params, seed=self.seed)
+        if self.weighting:
+            self.load_weighting_data()
+            self.weighting_rng = MCRandomNumberGenerator(self.weighting_params, seed=self.seed)
+
+    def next(self):
+        self.rebuild_technosphere_matrix(self.tech_rng.next())
+        self.rebuild_biosphere_matrix(self.bio_rng.next())
+        if self.lcia:
+            self.rebuild_characterization_matrix(self.cf_rng.next())
+        if self.weighting:
+            self.weighting_value = self.weighting_rng.next()
+
+        if not hasattr(self, "demand_array"):
+            self.build_demand_array()
+
+        self.lci_calculation()
+        if self.lcia:
+            self.lcia_calculation()
+            if self.weighting:
+                self.weighting_calculation()
+            return self.score
+        else:
+            return self.supply_array
+
+
+class ComparativeMonteCarlo(IterativeMonteCarlo):
     """First draft approach at comparative LCA"""
-    def __init__(self, demands, method=None, iter_solver=iterative.cgs,
-                 seed=None, *args, **kwargs):
+    def __init__(self, demands, *args, **kwargs):
         self.demands = demands
         # Get all possibilities for database retrieval
         demand_all = demands[0].copy()
         for other in demands[1:]:
             demand_all.update(other)
-        super(ComparativeMonteCarlo, self).__init__(demand_all, method)
-        self.seed = seed
-        self.iter_solver = iter_solver
-        self.guess = None
+        super(ComparativeMonteCarlo, self).__init__(demand_all, *args, **kwargs)
+
+    def load_data(self):
         self.load_lci_data()
         self.load_lcia_data()
         self.tech_rng = MCRandomNumberGenerator(self.tech_params, seed=self.seed)
         self.bio_rng = MCRandomNumberGenerator(self.bio_params, seed=self.seed)
         self.cf_rng = MCRandomNumberGenerator(self.cf_params, seed=self.seed)
 
-    def __iter__(self):
-        return self
-
     def next(self):
         self.rebuild_technosphere_matrix(self.tech_rng.next())
         self.rebuild_biosphere_matrix(self.bio_rng.next())
             results.append(self.score)
         return results
 
-    def solve_linear_system(self):
-        if not self.iter_solver or self.guess is None:
-            self.guess = spsolve(
-                self.technosphere_matrix,
-                self.demand_array)
-            return self.guess
-        else:
-            solution, status = self.iter_solver(
-                self.technosphere_matrix,
-                self.demand_array,
-                x0=self.guess)
-            if status != 0:
-                raise
-            return solution
-
-    def iterate(self):
-        raise NotImplemented
-
 
 def single_worker(demand, method, iterations):
     # demand, method, iterations = args
     mc = MonteCarloLCA(demand=demand, method=method)
+    mc.load_data()
     return [mc.next() for x in range(iterations)]
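
Construction no longer loads data or builds random number generators; ``load_data()`` must be called before iterating, as ``single_worker`` shows. A sketch with placeholder arguments:

    mc = MonteCarloLCA({("some database", "some process"): 1},
                       method=("some method", "some category"),
                       seed=42)
    mc.load_data()
    scores = [mc.next() for _ in xrange(100)]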
 
 

bw2calc/sensitivity/__init__.py

+from .oat_sensitivity import OATSensitivity
+from .sobol_sensitivity import mp_calculate_sensitivity, SobolSensitivity
+from .ctv import ContributionToVariance

bw2calc/sensitivity/ctv.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from ..mc_vector import ParameterVectorLCA
+from scipy import stats
+import math
+import multiprocessing
+import numpy as np
+
+
+def _ctv_worker(args):
+    kwargs, iterations, mask = args
+    lca = ParameterVectorLCA(**kwargs)
+    lca.load_data()
+    results = np.zeros(iterations)
+    inputs = np.zeros((mask.sum(), iterations))
+    reference_vector = lca.params['amount']
+
+    for x in xrange(iterations):
+        sample = lca.rng.next()
+        vector = reference_vector.copy()
+        vector[mask] = sample[mask]
+        inputs[:, x] = sample[mask]
+        results[x] = lca(vector)
+
+    return inputs, results
+
+
+class ContributionToVariance(object):
+    def __init__(self, kwargs, mask=None, iterations=10000, cpus=None):
+        self.kwargs = kwargs
+        self.iterations = iterations
+        self.cpus = cpus or multiprocessing.cpu_count()
+        self.chunk_size = int(math.ceil(self.iterations / self.cpus))
+        self.lca = ParameterVectorLCA(**kwargs)
+        self.lca.load_data()
+        if mask is not None:
+            self.mask = mask
+        else:
+            self.mask = np.ones(self.lca.params.shape, dtype=bool)
+        self.n_params = self.mask.sum()
+        self.parameter_indices = np.arange(self.mask.shape[0])[self.mask]
+
+    def get_correlation_coefficient(self, x, y):
+        ranks_x = stats.rankdata(x)
+        ranks_y = stats.rankdata(y)
+        return stats.kendalltau(ranks_x, ranks_y)[0]
+
+    def calculate(self):
+        pool = multiprocessing.Pool(self.cpus)
+        args = [(self.kwargs, self.chunk_size, self.mask.copy()) for x in xrange(self.cpus)]
+        results = pool.map(_ctv_worker, args)
+
+        self.inputs = np.hstack([x[0] for x in results])
+        self.results = np.hstack([x[1] for x in results])
+
+        ctv = np.zeros(self.n_params)
+        for x in xrange(ctv.shape[0]):
+            ctv[x] = self.get_correlation_coefficient(
+                self.inputs[x, :],
+                self.results
+            )
+
+        ctv[np.isnan(ctv)] = 0
+        self.ctv = ctv ** 2 / (ctv ** 2).sum()
+        return self.ctv
+
+    def sort_ctv(self):
+        assert hasattr(self, "ctv")
+        self.sorted_results = [
+            (self.ctv[index], self.parameter_indices[index], index)
+            for index in range(self.n_params)]
+        self.sorted_results.sort(reverse=True)
+        return self.sorted_results
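
The contribution to variance for parameter *i* is the normalized squared rank correlation, ``ctv[i] = tau_i ** 2 / sum(tau ** 2)``, so the values sum to one. A usage sketch (the keyword arguments are placeholders passed through to ``ParameterVectorLCA``):

    ctv = ContributionToVariance(
        {"demand": {("some database", "some process"): 1},
         "method": ("some method", "some category")},
        iterations=1000
    )
    ctv.calculate()
    for share, param_index, _ in ctv.sort_ctv()[:10]:
        print param_index, share  # ten largest contributors to output variance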

bw2calc/sensitivity/oat_sensitivity.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from .. import ParameterVectorLCA
+from .percentages import get_percentages
+import math
+import multiprocessing
+import numpy as np
+
+
+def replace_one(old_values, new_values, index):
+    p = old_values.copy()
+    p[index] = new_values[index]
+    return p
+
+
+def _prescreen_worker(args):
+    start, stop, params, percentages, kwargs = args
+    lca = ParameterVectorLCA(**kwargs)
+    lca.load_data()
+    return start, stop, [
+        lca(replace_one(params, percentages, index))
+        for index in range(start, stop)]
+
+
+class OATSensitivity(object):
+    """Evaluate LCA model parameters using a simple one at a time (OAT) test to screen out unimportant parameters."""
+    def __init__(self, kwargs, percentage=0.9, seed=None, cpus=None):
+        self.kwargs = kwargs
+        self.percentage = percentage
+        self.seed = seed
+        self.cpus = cpus or multiprocessing.cpu_count()
+        self.lca = ParameterVectorLCA(**kwargs)
+        self.lca.load_data()
+        self.params = self.lca.params.copy()
+        self.chunk_size = int(math.ceil(self.params.shape[0] / self.cpus))
+        self.ref_score = self.lca(self.params['amount'].copy())
+
+    def screen(self, cutoff=100):
+        self.percentages = get_percentages(self.percentage, self.params,
+                                           seed=self.seed, num_cpus=self.cpus)
+        pool = multiprocessing.Pool(self.cpus)
+        args = [[
+            self.chunk_size * index,
+            self.chunk_size * (index + 1),
+            self.params['amount'].copy(),
+            self.percentages.copy(),
+            self.kwargs] for index in range(self.cpus)]
+        args[-1][1] = self.params.shape[0]
+        results = pool.map(_prescreen_worker, args)
+        self.delta = np.zeros(self.params.shape[0])
+        for start, stop, values in results:
+            self.delta[start:stop] = values
+        self.deltas = np.abs((self.delta - self.ref_score) / self.ref_score)
+        self.top_indices = np.argsort(self.deltas)[:-(cutoff + 1):-1]
+        return self.top_indices
+
+    def get_mask(self):
+        self.mask = np.zeros(self.params.shape, dtype=bool)
+        self.mask[self.top_indices] = True
+        return self.mask
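
Each parameter is moved, one at a time, to the quantile given by ``percentage`` while everything else stays at its reference value; the relative change in the score ranks importance. A hedged sketch (arguments are placeholders):

    oat = OATSensitivity(
        {"demand": {("some database", "some process"): 1},
         "method": ("some method", "some category")},
        percentage=0.9
    )
    top = oat.screen(cutoff=100)  # indices of the 100 most influential parameters
    mask = oat.get_mask()         # boolean mask, e.g. for ContributionToVariance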

bw2calc/sensitivity/parameter_utils.py

+# -*- coding: utf-8 -*-
+from ..mc_vector import ParameterVectorLCA
+from bw2data import Database
+
+
+class ParameterNaming(object):
+    """Translate parameter indices into something meaningful to humans."""
+    def __init__(self, lca):
+        assert isinstance(lca, ParameterVectorLCA)
+        self.databases = {"biosphere": Database("biosphere").load()}
+        self.lca = lca
+        self.positions = self.lca.positions
+        self.lca.fix_dictionaries()
+        self.rt, self.rb = self.lca.reverse_dict()
+
+    def lookup(self, index):
+        kind = self.get_kind(index)
+        if kind == "weighting":
+            return "Weighting"
+        elif kind == "bio":
+            offset = self.positions['tech'][1]
+            row_key = self.rb[self.lca.bio_params[index - offset]['row']]
+            input_ds = self.databases["biosphere"][row_key]
+            col_key = self.rt[self.lca.bio_params[index - offset]['col']]
+            if col_key[0] not in self.databases:
+                self.databases[col_key[0]] = Database(col_key[0]).load()
+            output_ds = self.databases[col_key[0]][col_key]
+            return "Biosphere: %s (%s) to %s" % (
+                input_ds['name'],
+                "-".join(input_ds['categories']),
+                output_ds['name']
+            )
+        elif kind == "tech":
+            row_key = self.rt[self.lca.tech_params[index]['row']]
+            if row_key[0] not in self.databases:
+                self.databases[row_key[0]] = Database(row_key[0]).load()
+            input_ds = self.databases[row_key[0]][row_key]
+            col_key = self.rt[self.lca.tech_params[index]['col']]
+            if col_key[0] not in self.databases:
+                self.databases[col_key[0]] = Database(col_key[0]).load()
+            output_ds = self.databases[col_key[0]][col_key]
+            return "Technosphere: %s (%s) to %s (%s)" % (
+                input_ds['name'],
+                input_ds['location'],
+                output_ds['name'],
+                output_ds['location']
+            )
+        elif kind == "cf":
+            offset = self.positions['bio'][1]
+            row_key = self.rb[self.lca.cf_params[index - offset]['index']]
+            ds = self.databases["biosphere"][row_key]
+            return "CF: %s (%s)" % (ds['name'], "-".join(ds['categories']))
+
+    def get_kind(self, index):
+        for key, (lower, upper) in self.positions.iteritems():
+            if lower <= index < upper:
+                return key
+        raise ValueError("Can't find index %s in ``positions``" % index)
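
A short sketch of translating parameter indices back into names (demand and method are placeholders):

    lca = ParameterVectorLCA({("some database", "some process"): 1},
                             method=("some method", "some category"))
    lca.load_data()
    naming = ParameterNaming(lca)
    print naming.lookup(42)  # e.g. "Technosphere: foo (GLO) to bar (GLO)"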

bw2calc/sensitivity/percentages.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from stats_arrays.random import RandomNumberGenerator
+from stats_arrays import uncertainty_choices
+import numpy as np
+import multiprocessing
+
+
+def get_percentages(percentage, pa, seed=None,
+        brute_force_size=10000,
+        num_cpus=multiprocessing.cpu_count()):
+    """Get PPF (percentage point function) values for a parameter array.
+
+    Fall back to brute force if the PPF is not defined for an uncertainty distribution."""
+    assert 0 <= percentage <= 1
+    num_params = pa.shape[0]
+    if num_params < num_cpus:
+        args = [(percentage, pa.copy(), seed, brute_force_size, 0)]
+    else:
+        step_size = int(num_params / num_cpus) + 1
+        args = [(
+            percentage,
+            pa[i * step_size:(i + 1) * step_size].copy(),
+            seed,
+            brute_force_size,
+            i) for i in xrange(num_cpus)]
+    pool = multiprocessing.Pool(num_cpus)
+    results = pool.map(_percentage_worker, args)
+    results.sort()
+    return np.hstack([x[1] for x in results])
+
+
+def _percentage_worker(args):
+    percentage, pa, seed, size, number = args
+    vector = np.zeros(pa.shape[0])
+    rng = RandomNumberGenerator(
+        uncertainty_choices[pa[0]['uncertainty_type']],
+        pa[0].reshape((-1,)),
+        seed=seed,
+        size=size
+    )
+    for index, row in enumerate(pa):
+        distribution = uncertainty_choices[row['uncertainty_type']]
+        try:
+            value = distribution.ppf(
+                row.reshape((1,)),
+                np.array(percentage, dtype=np.float32).reshape((1,1))
+            )
+        except Exception:
+            data = rng.generate_random_numbers(
+                uncertainty_type=distribution,
+                params=row.reshape((1,))
+            )
+            data.sort()
+            value = data[0, int((size - 1.) * percentage)]
+        vector[index] = value
+    return (number, vector)
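
For each parameter, ``get_percentages`` returns the value below which the given fraction of samples falls. A worked sketch using ``stats_arrays`` (as in the tests below):

    from stats_arrays import UncertaintyBase, NormalUncertainty

    pa = UncertaintyBase.from_dicts(
        {'loc': 2, 'scale': 0.2, 'uncertainty_type': NormalUncertainty.id}
    )
    values = get_percentages(0.9, pa, num_cpus=1)
    # For N(2, 0.2), the 0.9 quantile is about 2 + 1.2816 * 0.2 = 2.256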

bw2calc/sensitivity/sobol_sensitivity.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from ..mc_vector import ParameterVectorLCA
+from stats_arrays.random import MCRandomNumberGenerator
+import multiprocessing
+import numpy as np
+
+
+def _saltelli_worker(args):
+    kwargs, matrix, label = args
+    ss = SobolSensitivity()
+    model = ParameterVectorLCA(**kwargs)
+    model.load_data()
+    return (label, ss.evaluate_matrix(matrix, model))
+
+
+def mp_calculate_sensitivity(params, kwargs=None, mask=None, N=250,
+                             cpus=None, seed=None):
+    """Compute sensitivity indices using *multiprocesing* and all available CPU cores.
+
+    Inputs:
+        * *params*: The ``stats_arrays`` parameter array of stochastic parameters.
+
+    Returns:
+        * mapping of result indices to parameter indices
+        * first-order sensitivity indices
+        * total sensitivity indices
+
+    """
+    if kwargs is None:
+        kwargs = {}
+    ss = SobolSensitivity(seed)
+    pool = multiprocessing.Pool(
+        processes=cpus or multiprocessing.cpu_count()
+    )
+    matrix_a = ss.generate_matrix(params, N)
+    matrix_b = ss.generate_matrix(params, N)
+    jobs = [
+        pool.apply_async(
+            _saltelli_worker,
+            (kwargs, matrix_a.copy(), "a")
+        ),
+        pool.apply_async(
+            _saltelli_worker,
+            (kwargs, matrix_b.copy(), "b")
+        )
+    ]
+    for index in xrange(params.shape[0]):
+        if mask is not None and not mask[index]:
+            continue
+        jobs.append(pool.apply_async(
+            _saltelli_worker,
+            (kwargs, ss.generate_c(matrix_a, matrix_b, index), index)
+        ))
+    pool.close()
+    pool.join()
+    values = [x.get() for x in jobs]
+    values.sort()
+    results = np.empty((len(values) - 2, N))
+    mapping = {}
+    reference_a = filter(lambda x: x[0] == "a", values)[0][1]
+    reference_b = filter(lambda x: x[0] == "b", values)[0][1]
+    for index in xrange(len(values) - 2):
+        assert isinstance(values[index][0], int)
+        mapping[index] = values[index][0]
+        results[index, :] = values[index][1]
+    return (
+        mapping,
+        ss.compute_fo_sensitivity_indices(reference_a, results),
+        ss.compute_total_sensitivity_indices(reference_a, reference_b, results)
+    )
+
+
+class SobolSensitivity(object):
+    """Compute variance-based sensitivity indices using Saltelli's variation of Sobol's method.
+
+    Takes an optional *seed* argument for the random number generation.
+
+    """
+    def __init__(self, seed=None):
+        self.seed = seed
+
+    def generate_matrix(self, params, N):
+        """Generate a matrix of Monte Carlo samples for a parameter array.
+
+        TODO: Consider using space-filling curves or other sequences (e.g. `Low discrepancy sequences <http://en.wikipedia.org/wiki/Constructions_of_low-discrepancy_sequences>`_ or `Sobol sequences <http://en.wikipedia.org/wiki/Sobol_sequence>`_) which can improve random sampling.
+
+        Inputs:
+            * *params*: A ``stats_arrays`` parameter array of *k* stochastic parameters.
+            * *N*: The integer number of Monte Carlo samples.
+
+        Returns a two-dimensional array of sampled values, shape (*k*, *N*).
+
+        """
+        rng = MCRandomNumberGenerator(params, seed=self.seed)
+        return np.hstack([rng.next().reshape((-1, 1)) for x in xrange(N)])
+
+    def generate_c(self, array_a, array_b, index):
+        """Generate the **C** matrix: **B**, but with the values for one parameter taken from **A**.
+
+        Parameters are rows and samples are columns, so this swaps row *index*.
+
+        Inputs:
+            * *array_a*: The **A** matrix, shape (*k*, *N*).
+            * *array_b*: The **B** matrix, shape (*k*, *N*).
+            * *index*: The integer row (parameter) index to replace.
+
+        Returns the **C** matrix.
+
+        """
+        array_c = array_b.copy()
+        array_c[index, :] = array_a[index, :]
+        return array_c
+
+    def evaluate_matrix(self, matrix, model):
+        """Evaluate the ``model`` once for each column (sample) of ``matrix``.
+
+        Inputs:
+            * *matrix*: The array of input values, shape (*k*, *N*).
+
+        Returns a one-dimensional array of *N* model results.
+
+        """
+        return np.array([model(matrix[:, x]) for x in xrange(matrix.shape[1])])
+
+    def compute_fo_sensitivity_indices(self, reference_a, perturbations):
+        """Compute first order sensitivity indices.
+
+        Inputs:
+
+        ``reference_a`` is a one-dimensional array of LCA results generated from the matrix of reference values.
+
+        ``perturbations`` is a two-dimensional array of LCA results for the perturbed parameters.
+
+        ``reference_a`` has size *N*, where *N* is the number of iterations. ``perturbations`` has size (*k*, *N*), where *k* is the number of perturbed parameters.
+
+        Returns:
+            An array of sensitivity indices of shape *k*.
+        """
+        numerator = (perturbations * reference_a).mean(axis=1) - \
+            np.average(reference_a) ** 2
+        denominator = (reference_a ** 2).mean() - np.average(reference_a) ** 2
+        return numerator / denominator
+
+    def compute_total_sensitivity_indices(self, reference_a, reference_b, perturbations):
+        """Compute total sensitivity indices.
+
+        Inputs:
+
+        ``reference_a`` is a one-dimensional array of LCA results generated from the matrix of reference values.
+
+        ``reference_b`` is a one-dimensional array of LCA results generated from the matrix of reference values, where one column was replaced with the perturbed values.
+
+        ``perturbations`` is a two-dimensional array of LCA results for the perturbed parameters.
+
+        ``reference_a`` has size *N*, ``reference_b`` has size *N*, where *N* is the number of iterations. ``perturbations`` has size (*k*, *N*), where *k* is the number of perturbed parameters.
+
+        Returns:
+            An array of sensitivity indices of shape *k*.
+        """
+        numerator = (perturbations * reference_b).mean(axis=1) - \
+            np.average(reference_a) ** 2
+        denominator = (reference_a ** 2).mean() - np.average(reference_a) ** 2
+        return 1 - numerator / denominator
+
+    def calculate_sensitivity(self, params, model, N=250):
+        """Calculate first-order and total sensitivity indices for a set of parameters and a model.
+
+        Normally using the ``mp_calculate_sensitivity`` function is more efficient, as it distributes work among all available cores.
+
+        Inputs:
+            * *params*: a ``stats_arrays`` set of stochastic parameters.
+            * *model*: a model object, i.e. something that supports the ``model(input_vector)`` syntax.
+            * *N*: Number of Monte Carlo iterations to calculate for each parameter. Should be several hundred to several thousand, depending on the linearity of the model.
+
+        Returns two one-dimensional arrays, first-order and total sensitivity indices, each of length (number of parameters).
+
+        """
+        matrix_a = self.generate_matrix(params, N)
+        matrix_b = self.generate_matrix(params, N)
+        reference_a = self.evaluate_matrix(matrix_a, model)
+        reference_b = self.evaluate_matrix(matrix_b, model)
+        k = params.shape[0]
+        results = np.empty((k, N))
+        for index in xrange(k):
+            matrix_c = self.generate_c(matrix_a, matrix_b, index)
+            results[index, :] = self.evaluate_matrix(matrix_c, model)
+        return (
+            self.compute_fo_sensitivity_indices(reference_a, results),
+            self.compute_total_sensitivity_indices(
+                reference_a,
+                reference_b,
+                results
+            )
+        )
+
+
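
A serial usage sketch with a toy additive model (everything here is illustrative; ``ParameterVectorLCA`` would normally stand in for ``model``):

    from stats_arrays import UncertaintyBase, NormalUncertainty

    params = UncertaintyBase.from_dicts(
        {'loc': 2, 'scale': 0.2, 'uncertainty_type': NormalUncertainty.id},
        {'loc': 5, 'scale': 2.0, 'uncertainty_type': NormalUncertainty.id}
    )
    model = lambda vector: vector.sum()
    ss = SobolSensitivity(seed=42)
    fo, total = ss.calculate_sensitivity(params, model, N=500)
    # Parameter variances are 0.04 and 4.0, so the second parameter
    # should receive nearly all of the first-order sensitivity.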

bw2calc/tests/__init__.py

 from .lca import LCACalculationTestCase
+from .sensitivity import SobolSensitivityTestCase

bw2calc/tests/sensitivity.py

+# -*- coding: utf-8 -*-
+from __future__ import division
+from stats_arrays.distributions import *
+from ..sensitivity import SobolSensitivity
+from bw2data import *
+from bw2data.tests import BW2DataTest
+import numpy as np
+
+
+class SobolSensitivityTestCase(BW2DataTest):
+    def test_generate_matrix(self):
+        params = UncertaintyBase.from_dicts(
+            {'loc': 2, 'scale': 0.2, 'uncertainty_type': NormalUncertainty.id},
+            {'loc': 1.5, 'minimum': 0, 'maximum': 10,
+                'uncertainty_type': TriangularUncertainty.id}
+        )
+        ss = SobolSensitivity()
+        matrix = ss.generate_matrix(params, 1000)
+        self.assertEqual(matrix.shape, (2, 1000))
+        self.assertTrue(1.98 < np.median(matrix[0, :]) < 2.02)
+
+    def test_seed(self):
+        params = UncertaintyBase.from_dicts(
+            {'loc': 2, 'scale': 0.2, 'uncertainty_type': NormalUncertainty.id},
+        )
+        ss = SobolSensitivity(seed=17)
+        matrix_1 = ss.generate_matrix(params, 1000)
+        ss = SobolSensitivity(seed=17)
+        matrix_2 = ss.generate_matrix(params, 1000)
+        self.assertTrue(np.allclose(matrix_1, matrix_2))
+
+    def test_c_matrix(self):
+        a = np.random.random(size=20000).reshape((100, 200))
+        b = np.random.random(size=20000).reshape((100, 200))
+        ss = SobolSensitivity()
+        c = ss.generate_c(a, b, 14)
+        self.assertTrue(np.allclose(
+            b[13, :], c[13, :]
+        ))
+        self.assertTrue(np.allclose(
+            a[14, :], c[14, :]
+        ))
+        self.assertTrue(np.allclose(
+            b[15, :], c[15, :]
+        ))
+
+    def test_evaluate_matrix(self):
+        matrix = np.arange(15).reshape((3, 5))
+        model = lambda x: x[1]
+        ss = SobolSensitivity()
+        results = ss.evaluate_matrix(matrix, model)
+        self.assertTrue(np.allclose(results, [5, 6, 7, 8, 9]))

bw2calc/utils.py

     return np.hstack([pickle.load(open(os.path.join(
         dirpath, "processed", "%s.pickle" % name), "rb")
     ) for name in names])
+
+
+def extract_uncertainty_fields(array):
+    """Extract the core set of fields needed for uncertainty analysis from a parameter array"""
+    fields = ["uncertainty_type", "amount", 'loc', 'scale', 'shape', 'minimum', 'maximum', 'negative']
+    return array[fields].copy()
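
A quick check of the helper (``lca.tech_params`` is a placeholder for any ``stats_arrays`` parameter array):

    fields = extract_uncertainty_fields(lca.tech_params)
    print fields.dtype.names
    # ('uncertainty_type', 'amount', 'loc', 'scale', 'shape',
    #  'minimum', 'maximum', 'negative')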

docs/conf.py

 # built documents.
 #
 # The short X.Y version.
-version = '0.9'
+version = '0.10'
 # The full version, including alpha/beta/rc tags.
-release = '0.9.3'
+release = '0.10.1'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.

requirements.txt

 progressbar
 scipy
 voluptuous
-bw2data>=0.9.1
-bw2analyzer>=0.3.1.1
+bw2data>=0.10.1
+bw2analyzer>=0.4
 bw-stats-toolkit>=0.7
 
setup.py

 setup(
     name='bw2calc',
-    version="0.9.3",
-    packages=["bw2calc", "bw2calc.tests"],
+    version="0.10.1",
+    packages=["bw2calc", "bw2calc.tests", "bw2calc.sensitivity"],
     author="Chris Mutel",
     author_email="cmutel@gmail.com",
     license=open('LICENSE.txt').read(),