Commits

Chris Mutel committed fc16e37

Added parallel and multiple parallel Monte Carlo calculations

Comments (0)

Files changed (4)

 README.txt
 setup.py
 bw2calc/__init__.py
-bw2calc/errors.py
 bw2calc/fallbacks.py
 bw2calc/lca.py
 bw2calc/merge_sparse_blocks.py
 bw2calc/monte_carlo.py
+bw2calc/parallel_monte_carlo.py

bw2calc/__init__.py

+# -*- coding: utf-8 -*
 from lca import LCA
-from monte_carlo import MonteCarloLCA
+from monte_carlo import MonteCarloLCA, ParallelMonteCarlo, MultiMonteCarlo
             np.zeros(len(self.technosphere_dict)),
             self.technosphere_dict)
         for key in demand:
-            self.demand_array[mapping[key]] = self.demand[key]
+            self.demand_array[mapping[key]] = demand[key]
 
     def build_characterization_matrix(self, vector=None):
         vector = self.cf_params['amount'] if vector is None else vector
     @property
     def score(self):
         assert hasattr(self, "characterized_inventory"), "Must do LCIA first"
-        return self.characterized_inventory.sum()
+        return float(self.characterized_inventory.sum())

bw2calc/monte_carlo.py

+# -*- coding: utf-8 -*
+import itertools
 from lca import LCA
 from scipy.sparse.linalg import iterative, spsolve
 from stats_toolkit.random import MCRandomNumberGenerator
+import multiprocessing
 
 
 class MonteCarloLCA(LCA):
-    def __init__(self, activity, method=None, iter_solver=iterative.cgs,
+    def __init__(self, demand, method=None, iter_solver=iterative.cgs,
             seed=None, *args, **kwargs):
-        super(MonteCarloLCA, self).__init__(activity, method=method, *args,
+        super(MonteCarloLCA, self).__init__(demand, method=method, *args,
             **kwargs)
         self.seed = seed
         self.iter_solver = iter_solver
                 raise
             return solution
 
-    def __iter__(self):
+    def iterate(self):
+        # TODO: use yield or __iter__? What is most pythonic?
         self.next()
         return self.score
+
+
+class ParallelMonteCarlo(object):
+    """Split a Monte Carlo calculation into parallel jobs"""
+    def __init__(self, demand, method, iterations=1000, chunk_size=100):
+        self.demand = demand
+        self.method = method
+        self.chunk_size = chunk_size
+        self.num_jobs = iterations // chunk_size
+        if iterations % self.chunk_size:
+            self.num_jobs += 1
+
+    def calculate(self):
+        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1)
+        results = [pool.apply_async(single_worker, (self.demand, self.method,
+            self.chunk_size)) for x in xrange(self.num_jobs)]
+        pool.close()
+        pool.join()  # Blocks until calculation is finished
+        return list(itertools.chain(*[x.get() for x in results]))
+
+
+class MultiMonteCarlo(object):
+    """
+This is a class for the efficient calculation of multiple demand vectors from
+each Monte Carlo iteration.
+    """
+    def __init__(self, demands, method, iterations):
+        self.demands = demands
+        self.method = method
+        self.iterations = iterations
+
+    def merge_dictionaries(self, *dicts):
+        r = {}
+        for dic in dicts:
+            for k, v in dic.iteritems():
+                r.setdefault(k, []).append(v)
+        return r
+
+    def calculate(self):
+        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1)
+        results = [pool.apply_async(multi_worker, (self.demands, self.method)
+            ) for x in xrange(self.iterations)]
+        pool.close()
+        pool.join()  # Blocks until calculation is finished
+        return self.merge_dictionaries(*[x.get() for x in results])
+
+
+def single_worker(demand, method, iterations):
+    # demand, method, iterations = args
+    mc = MonteCarloLCA(demand=demand, method=method)
+    return [mc.iterate() for x in range(iterations)]
+
+
+def multi_worker(demands, method):
+    lca = LCA(demands[0], method)
+    lca.load_databases()
+    lca.load_method()
+    # Create new matrices
+    lca.build_technosphere_matrix(MCRandomNumberGenerator(lca.tech_params).next())
+    lca.build_biosphere_matrix(MCRandomNumberGenerator(lca.bio_params).next())
+    lca.build_characterization_matrix(MCRandomNumberGenerator(lca.cf_params).next())
+    lca.decompose_technosphere()
+    lca.lci()
+    lca.lcia()
+    results = {}
+    for demand in demands:
+        lca.redo_lcia(demand)
+        results[str(demand)] = lca.score
+    return results