brightway2-data / bw2data / io / import_ecospold.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# -*- coding: utf-8 -*
from __future__ import division
from .. import Database, mapping
from ..logs import get_io_logger
from ..utils import activity_hash
from ..units import normalize_units
from lxml import objectify
from stats_toolkit.distributions import *
import copy
import math
import numpy as np
import os
import pprint
import progressbar
import warnings

# Top-level ecospold category names treated as biosphere (elementary) flows;
# everything else is matched as a technosphere activity.
BIOSPHERE = ("air", "water", "soil", "resource")


class EcospoldDataExtractor(object):
    """Extract activity datasets from ecospold 1 XML files.

    Use ``extract(path, log)`` to parse a single XML file, or every
    ``.xml`` file in a directory, into a list of dataset dictionaries.
    """
    def extract(self, path, log):
        """Extract all datasets found at ``path``.

        Args:
            *path* (str): Filepath of an ecospold XML file, or a directory
                containing such files.
            *log*: Logger; files without an ``ecoSpold`` root element are
                logged as critical and skipped.

        Returns:
            List of dataset dictionaries (see ``process_dataset``).

        """
        data = []
        if os.path.isdir(path):
            files = [os.path.join(path, y) for y in filter(
                lambda x: x[-4:].lower() == ".xml", os.listdir(path))]
        else:
            files = [path]
        widgets = ['Extracting data: ', progressbar.Percentage(), ' ',
            progressbar.Bar(marker=progressbar.RotatingMarker()), ' ',
            progressbar.ETA()]
        pbar = progressbar.ProgressBar(widgets=widgets, maxval=len(files)
            ).start()

        for index, filename in enumerate(files):
            # Use a context manager so the file handle is closed promptly
            # instead of leaking until garbage collection.
            with open(filename) as xml_file:
                root = objectify.parse(xml_file).getroot()

            if root.tag != '{http://www.EcoInvent.org/EcoSpold01}ecoSpold':
                # Unrecognized file type
                log.critical(u"skipping %s - no ecoSpold element" % filename)
                continue

            for dataset in root.iterchildren():
                data.append(self.process_dataset(dataset))

            pbar.update(index)
        pbar.finish()
        return data

    def process_dataset(self, dataset):
        """Convert one ``dataset`` XML element into a dataset dictionary."""
        ref_func = dataset.metaInformation.processInformation.\
            referenceFunction
        data = {
            "name": ref_func.get("name"),
            "type": "process",  # True for all ecospold?
            "categories": [ref_func.get("category"), ref_func.get(
                "subCategory")],
            "location": dataset.metaInformation.processInformation.\
                geography.get("location"),
            "code": int(dataset.get("number")),
            "unit": normalize_units(ref_func.get("unit")),
            "exchanges": self.process_exchanges(dataset)
            }
        # Convert ("foo", "unspecified") to ("foo",)
        while data["categories"] and data["categories"][-1] in (
                "unspecified", None):
            data["categories"] = data["categories"][:-1]
        return data

    def process_exchanges(self, dataset):
        """Process all ``exchange`` and ``allocation`` flow data elements.

        Raises:
            ValueError: If a flow data element has an unrecognized tag.

        """
        data = []
        # Skip definitional exchange - we assume this already
        for exc in dataset.flowData.iterchildren():
            if exc.tag == "{http://www.EcoInvent.org/EcoSpold01}exchange":
                data.append(self.process_exchange(exc, dataset))
            elif exc.tag == "{http://www.EcoInvent.org/EcoSpold01}allocation":
                data.append(self.process_allocation(exc, dataset))
            else:
                raise ValueError("Flow data type %s not understood" % exc.tag)
        return data

    def process_allocation(self, exc, dataset):
        """Convert an ``allocation`` element into a dictionary.

        ``exchanges`` lists the codes of the exchanges to which this
        allocation fraction applies.
        """
        return {
            "reference": int(exc.get("referenceToCoProduct")),
            "fraction": float(exc.get("fraction")),
            "exchanges": [int(c.text) for c in exc.iterchildren()]
        }

    def process_exchange(self, exc, dataset):
        """Convert an ``exchange`` element into a dictionary with matching
        metadata and uncertainty fields."""
        data = {
            "code": int(exc.get("number")),
            "matching": {
                "categories": (exc.get("category"), exc.get("subCategory")),
                "location": exc.get("location"),
                "unit": normalize_units(exc.get("unit")),
                "name": exc.get("name")
                }
            }

        # The first child element, when present, holds the exchange group
        # number. Narrow exception types instead of a bare except: no
        # children (IndexError), empty text (TypeError), or non-integer
        # text (ValueError) all mean "no group".
        try:
            data["group"] = int(exc.getchildren()[0].text)
        except (IndexError, TypeError, ValueError):
            pass

        # Convert ("foo", "unspecified") to ("foo",)
        while data["matching"]["categories"] and \
                data["matching"]["categories"][-1] in ("unspecified", None):
            data["matching"]["categories"] = \
                data["matching"]["categories"][:-1]

        if exc.get("generalComment"):
            data["comment"] = exc.get("generalComment")
        return self.process_uncertainty_fields(exc, data)

    def process_uncertainty_fields(self, exc, data):
        """Add uncertainty distribution fields to ``data``.

        ``uncertaintyType`` selects the distribution: 1 lognormal,
        2 normal, 3 triangular, 4 uniform; anything else is undefined.
        """
        uncertainty = int(exc.get("uncertaintyType", 0))
        mean = exc.get("meanValue")
        min_ = exc.get("minValue")
        max_ = exc.get("maxValue")
        sigma = exc.get("standardDeviation95")

        if uncertainty == 1:
            # Lognormal
            data.update({
                'uncertainty type': LognormalUncertainty.id,
                'amount': float(mean),
                'sigma': math.log(math.sqrt(float(sigma)))
                })
            if data['sigma'] == 0:
                # Bad ecoinvent data
                data['uncertainty type'] = UndefinedUncertainty.id
                del data["sigma"]
        elif uncertainty == 2:
            # Normal; 95% interval is two standard deviations
            data.update({
                'uncertainty type': NormalUncertainty.id,
                'amount': float(mean),
                'sigma': float(sigma) / 2
                })
        elif uncertainty == 3:
            # Triangular
            data.update({
                'uncertainty type': TriangularUncertainty.id,
                'minimum': float(min_),
                'maximum': float(max_)
                })
            # Sometimes this isn't included (though it SHOULD BE)
            if exc.get("mostLikelyValue"):
                data['amount'] = float(exc.get("mostLikelyValue"))
            else:
                data['amount'] = float(mean)
        elif uncertainty == 4:
            # Uniform
            data.update({
                'uncertainty type': UniformUncertainty.id,
                'amount': float(mean),
                'minimum': float(min_),
                'maximum': float(max_)
                })
        else:
            # None
            data.update({
                'uncertainty type': UndefinedUncertainty.id,
                'amount': float(mean)
            })
        return data


class EcospoldImporter(object):
    """Import inventory datasets from ecospold XML format.

    Does not have any arguments; instead, instantiate the class, and then import using the ``importer`` method, i.e. ``EcospoldImporter().importer(filepath)``."""
    def importer(self, path, name, depends=None):
        """Import an inventory dataset, or a directory of inventory datasets.

        .. image:: images/import-method.png
            :align: center

        Args:
            *path* (str): A filepath or directory.
            *name* (str): Name of the database to create.
            *depends* (list): Names of databases that exchanges can link
                against. Defaults to ``["biosphere"]``.

        """
        # Avoid a shared mutable default argument; semantics are unchanged.
        if depends is None:
            depends = ["biosphere"]

        self.log, self.logfile = get_io_logger("lci-import")
        self.new_activities = []
        self.new_biosphere = []

        data = EcospoldDataExtractor().extract(path, self.log)
        data = self.allocate_datasets(data)
        data = self.apply_transforms(data)
        data = self.add_hashes(data)

        widgets = ['Linking exchanges:', progressbar.Percentage(), ' ',
            progressbar.Bar(marker=progressbar.RotatingMarker()), ' ',
            progressbar.ETA()]
        pbar = progressbar.ProgressBar(widgets=widgets, maxval=len(data)
            ).start()

        linked_data = []
        for index, ds in enumerate(data):
            linked_data.append(self.link_exchanges(ds, data, depends, name))
            pbar.update(index)
        pbar.finish()

        data = linked_data + self.new_activities

        if self.new_biosphere:
            self.new_biosphere = dict([((u"biosphere", o.pop("hash")), o) \
                for o in self.new_biosphere])
            biosphere = Database("biosphere")
            biosphere_data = biosphere.load()
            biosphere_data.update(self.new_biosphere)
            biosphere.write(biosphere_data)
            # biosphere.process()

        data = self.set_exchange_types(data)
        data = self.clean_exchanges(data)
        # Dictionary constructor eliminates duplicate activities
        data = dict([((name, o.pop("hash")), o) for o in data])
        self.write_database(name, data, depends)

    def allocate_datasets(self, data):
        """Replace each multi-output dataset with its allocated datasets."""
        activities = []
        for ds in data:
            multi_output = [exc for exc in ds["exchanges"] \
                if "reference" in exc]
            if multi_output:
                for activity in self.allocate_exchanges(ds):
                    activities.append(activity)
            else:
                activities.append(ds)
        return activities

    def allocate_exchanges(self, ds):
        """
Take a dataset, which has multiple outputs, and return a list of allocated datasets.

Two things change in the allocated datasets. First, the name changes to the names of the individual outputs. Second, the list of exchanges is rewritten, and only the allocated exchanges are used.

The list of ``exchanges`` looks like:

.. code-block:: python

    [
        {
        "code": 2,
        "matching": {
            "categories": data,
            "location": data,
            "unit": data,
            "name": name
            },
        "uncertainty type": data
        "amount": reference amount,
        "group": 2
        }, {
        "code": 4,
        "matching": {data},
        "uncertainty type": data
        "amount": 1,
        "group": 5
        }, {
        "reference": 2,
        "fraction": 0.5,
        "exchanges": [4,]
        }
    ]

It should be changed to:

.. code-block:: python

    [
        {
        "code": 2,
        "matching": {
            "categories": data,
            "location": data,
            "unit": data,
            "name": name
            },
        "uncertainty type": data
        "amount": number,
        "group": 2
        }, {
        "code": 4,
        "matching": {data},
        "uncertainty type": data
        "amount": 0.5,
        "group": 5
        }
    ]

Exchanges should also be copied and allocated for any other co-products.

        """
        coproduct_codes = [exc["code"] for exc in ds["exchanges"] if exc.get(
            "group", None) == 2]
        coproducts = dict([(x, copy.deepcopy(ds)) for x in coproduct_codes])
        exchanges = dict([(exc["code"], exc) for exc in ds["exchanges"
            ] if "code" in exc])
        allocations = [a for a in ds["exchanges"] if "fraction" in a]
        # First, get production amounts for each coproduct.
        # these aren't included in the allocations
        # NOTE: iteritems() was Python 2-only; items() works on both 2 and 3.
        for key, product in coproducts.items():
            product["exchanges"] = [exc for exc in product["exchanges"] if exc.get("code", None) == key]
        # Next, correct names, location, and unit
        for key, product in coproducts.items():
            for label in ("unit", "name", "location"):
                if exchanges[key]["matching"].get(label, None):
                    product[label] = exchanges[key]["matching"][label]
        # Finally, add the allocated exchanges
        for allocation in allocations:
            if allocation["fraction"] == 0:
                continue
            product = coproducts[allocation["reference"]]
            for exc_code in allocation["exchanges"]:
                copied = copy.deepcopy(exchanges[exc_code])
                copied["amount"] = copied["amount"] * allocation["fraction"]
                product["exchanges"].append(copied)
        # list() so callers always get a list, not a Py3 dict view
        return list(coproducts.values())

    def apply_transforms(self, data):
        # Reserved for sublcasses, e.g. SimaPro import
        # where some cleaning is necessary...
        return data

    def add_hashes(self, ds):
        """Attach an ``activity_hash`` to each dataset for linking."""
        for o in ds:
            o["hash"] = activity_hash(o)
        return ds

    def link_exchanges(self, ds, data, depends, name):
        """Link all exchanges of ``ds`` to activities or biosphere flows."""
        if self.sequential_exchanges(ds):
            del ds["code"]
        ds["exchanges"] = [self.link_exchange(exc, ds, data, depends, name
            ) for exc in ds["exchanges"]]
        return ds

    def sequential_exchanges(self, ds):
        """Return True if exchange codes form a simple increasing sequence
        (step of one), i.e. they carry no real identifying information."""
        codes = np.array([x["code"] for x in ds["exchanges"]])
        # Compute the difference array once instead of three times.
        differences = np.diff(codes)
        return np.allclose(differences, np.ones(differences.shape))

    def link_exchange(self, exc, ds, data, depends, name):
        """Link one exchange, creating new activities when needed."""
        # Has to happen before others because US LCI doesn't define categories
        # for product definitions...
        if exc.get("group", None) == 0:
            # Activity dataset production
            exc["input"] = (name, activity_hash(ds))
            return exc
        # Hack for US LCI-specific bug - both "Energy recovered"
        # and "Energy, recovered" are present
        elif exc["matching"]["categories"] == () and \
                exc["matching"]["name"] == "Recovered energy":
            exc["matching"].update(
                name="Energy, recovered",
                categories=("resource",),
                )
        elif not exc["matching"]["categories"]:
            # US LCI doesn't list categories, subcategories for
            # technosphere inputs. Try to find based on name. Need to lowercase
            # because US LCI is not consistent within itself (!!!)
            for other_ds in data:
                if other_ds["name"].lower() == \
                        exc["matching"]["name"].lower():
                    exc["input"] = (name, other_ds["hash"])
                    return exc
            # Can't find matching process - but could be a US LCI "dummy"
            # activity
            if exc["matching"]["name"][:5].lower() == "dummy":
                self.log.warning(u"New activity created by:\n%s" % \
                    pprint.pformat(exc))
                exc["input"] = (name, self.create_activity(exc["matching"]))
                return exc
            else:
                raise ValueError("Exchange can't be matched:\n%s" % \
                    pprint.pformat(exc))
        exc["hash"] = activity_hash(exc["matching"])
        if exc["matching"].get("categories", [None])[0] in BIOSPHERE:
            return self.link_biosphere(exc)
        else:
            return self.link_activity(exc, ds, data, depends, name)

    def link_biosphere(self, exc):
        """Link ``exc`` to a biosphere flow, creating the flow if missing."""
        exc["input"] = ("biosphere", exc["hash"])
        if (u"biosphere", exc["hash"]) in Database("biosphere").load():
            return exc
        else:
            new_flow = copy.deepcopy(exc["matching"])
            new_flow.update({
                "hash": activity_hash(exc["matching"]),
                "type": "resource" if new_flow["categories"][0] == "resource" \
                    else "emission",
                "exchanges": []
                })
            # Biosphere flows don't have locations
            del new_flow["location"]
            self.new_biosphere.append(new_flow)
            return exc

    def link_activity(self, exc, ds, data, depends, name):
        """Link ``exc`` to an activity in this import or a dependency."""
        if exc["hash"] in [o["hash"] for o in data]:
            exc["input"] = (name, exc["hash"])
            return exc
        else:
            return self.link_activity_dependent_database(exc, depends, name)

    def link_activity_dependent_database(self, exc, depends, name):
        """Link ``exc`` against dependent databases; create it if unmatched."""
        for database in depends:
            if (database, exc["hash"]) in mapping:
                exc["input"] = (database, exc["hash"])
                return exc
        # Create new activity in this database and log
        self.log.warning(u"New activity created by:\n%s" % pprint.pformat(exc))
        exc["input"] = (name, self.create_activity(exc["matching"]))
        return exc

    def create_activity(self, exc):
        """Create a placeholder activity from matching data; return its hash."""
        exc = copy.deepcopy(exc)
        exc.update({
            "exchanges": [],
            "type": "process",
            "hash": activity_hash(exc),
            })
        self.new_activities.append(exc)
        return exc["hash"]

    def set_exchange_types(self, data):
        """Set the ``type`` attribute for each exchange, one of either (``production``, ``technosphere``, ``biosphere``). ``production`` defines the amount produced by the activity dataset (default is 1)."""
        for ds in data:
            for exc in ds["exchanges"]:
                if exc["input"][0] == "biosphere":
                    exc["type"] = "biosphere"
                elif exc["input"][1] == ds["hash"]:
                    exc["type"] = "production"
                else:
                    exc["type"] = "technosphere"
        return data

    def clean_exchanges(self, data):
        """Strip temporary ``matching`` and ``hash`` keys from exchanges."""
        for ds in data:
            for exc in ds["exchanges"]:
                if "matching" in exc:
                    del exc["matching"]
                if "hash" in exc:
                    del exc["hash"]
        return data

    def write_database(self, name, data, depends):
        """Register, write, and process the new database."""
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            manager = Database(name)
            manager.register(("Ecospold", 1), depends, len(data))
            manager.write(data)
            manager.process()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.