brightway2-data / bw2data / database.py

# -*- coding: utf-8 -*-
from . import databases, config, mapping, geomapping
from .errors import MissingIntermediateData
from .query import Query
from .data_store import DataStore
from .units import normalize_units
from .utils import natural_sort, MAX_INT_32, TYPE_DICTIONARY
from .validate import db_validator
from time import time
import datetime
import numpy as np
import os
import random
import warnings
try:
    import cPickle as pickle
except ImportError:
    import pickle


class Database(DataStore):
    """A manager for a database. This class can register or deregister databases, write intermediate data, process data to parameter arrays, query, validate, and copy databases.

    Databases are automatically versioned.

    The Database class never holds intermediate data, but it can load or write intermediate data. The only attribute is *name*, the name of the database being managed.

    Instantiation does not load any data. If this database is not yet registered in the metadata store, a warning is issued.

    Args:
        *name* (str): Name of the database to manage.

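    A short usage sketch (assumes a registered database named "biosphere"):

    .. code-block:: python

        from bw2data import Database
        db = Database("biosphere")
        data = db.load()    # intermediate data, as a dictionary
        db.process()        # write parameter arrays to disk
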
    """
    metadata = databases
    validator = db_validator
    dtype_fields = [
        ('input', np.uint32),
        ('output', np.uint32),
        ('row', np.uint32),
        ('col', np.uint32),
        ('type', np.uint8),
    ]

    dtype_fields_geomapping = [
        ('activity', np.uint32),
        ('geo', np.uint32),
        ('row', np.uint32),
        ('col', np.uint32),
    ]
    def backup(self):
        """Save a backup to ``backups`` folder.

        Returns:
            File path of backup.

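        A minimal usage sketch (assumes a registered database named "example"):

        .. code-block:: python

            filepath = Database("example").backup()
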
        """
        from .io import BW2PackageExporter
        return BW2PackageExporter.export_database(
            self.name,
            folder="backups",
            extra_string="." + str(int(time()))
        )

    def copy(self, name):
        """Make a copy of the database.

        Internal links within the database will be updated to match the new database name, i.e. ``("old name", "some id")`` will be converted to ``("new name", "some id")`` for all exchanges.

        Args:
            * *name* (str): Name of the new database. Must not already exist.

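        A sketch with hypothetical database names:

        .. code-block:: python

            db = Database("example")
            new_db = db.copy("example copy")
            assert "example copy" in databases
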
        """
        if name in databases:
            raise ValueError("This database already exists")
        data = self.relabel_data(self.load(), name)
        new_database = Database(name)
        new_database.register(
            format="Brightway2 copy",
            depends=databases[self.name]["depends"],
            num_processes=len(data)
        )
        new_database.write(data)
        return new_database

    @property
    def filename(self):
        return self.filename_for_version()

    def filename_for_version(self, version=None):
        """Filename for given version; Default is current.

        Returns:
            Filename (not path)

        """
        return "%s.%i" % (
            self.name,
            version or self.version
        )

    def load(self, version=None):
        """Load the intermediate data for this database.

        Can also load previous versions of this database's intermediate data.

        Args:
            * *version* (int): Version of the database to load. Default is *None*, for the latest version.

        Returns:
            The intermediate data, a dictionary.

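        A minimal sketch (hypothetical database name and version number):

        .. code-block:: python

            db = Database("example")
            current = db.load()            # latest version
            previous = db.load(version=1)  # a specific earlier version
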
        """
        self.assert_registered()
        if version is None and config.p.get("use_cache", False) and \
                self.name in config.cache:
            return config.cache[self.name]
        try:
            filepath = os.path.join(
                config.dir,
                u"intermediate",
                self.filename_for_version(version) + u".pickle"
            )
            with open(filepath, "rb") as f:
                data = pickle.load(f)
            if version is None and config.p.get("use_cache", False):
                config.cache[self.name] = data
            return data
        except (OSError, IOError):
            raise MissingIntermediateData("This version (%s) not found" % version)
    def process(self, version=None):
        """
Process intermediate data from a Python dictionary to a `stats_arrays <https://pypi.python.org/pypi/stats_arrays/>`_ array, which is a `NumPy <http://numpy.scipy.org/>`_ `Structured <http://docs.scipy.org/doc/numpy/reference/generated/numpy.recarray.html#numpy.recarray>`_ `Array <http://docs.scipy.org/doc/numpy/user/basics.rec.html>`_. A structured array (also called a record array) is a heterogeneous array in which each column has its own label and data type.

Processed arrays are saved in the ``processed`` directory.

Uses ``pickle`` instead of the native NumPy ``.tofile()``. Although pickle is ~2 times slower, this difference in speed has no practical effect (e.g. one twentieth of a second slower for ecoinvent 2.2), and the numpy ``fromfile`` and ``tofile`` functions don't preserve the datatype of structured arrays.

The structure for processed inventory databases includes additional columns beyond the basic ``stats_arrays`` format:

================ ======== ===================================
Column name      Type     Description
================ ======== ===================================
uncertainty_type uint8    integer type defined in `stats_arrays.uncertainty_choices`
input            uint32   integer value from `Mapping`
output           uint32   integer value from `Mapping`
geo              uint32   integer value from `GeoMapping`
row              uint32   placeholder (``MAX_INT_32``); filled in during matrix construction
col              uint32   placeholder (``MAX_INT_32``); filled in during matrix construction
type             uint8    integer type defined in `bw2data.utils.TYPE_DICTIONARY`
amount           float32  amount without uncertainty
loc              float32  location parameter, e.g. mean
scale            float32  scale parameter, e.g. standard deviation
shape            float32  shape parameter
minimum          float32  minimum bound
maximum          float32  maximum bound
negative         bool     `amount` < 0
================ ======== ===================================

See also `NumPy data types <http://docs.scipy.org/doc/numpy/user/basics.types.html>`_.
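
As an illustration, the processed file can be inspected after calling ``process()``; a minimal sketch (the database name "example" is hypothetical):

.. code-block:: python

    import os
    import pickle
    from bw2data import config
    fp = os.path.join(config.dir, u"processed", u"example.pickle")
    with open(fp, "rb") as f:
        arr = pickle.load(f)
    print arr.dtype.names  # ('input', 'output', 'row', 'col', 'type', ...)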

Args:
    * *version* (int, optional): The version of the database to process

Doesn't return anything, but writes two files to disk: the geomapping array and the main parameter array.

        """
        data = self.load(version)
        num_exchanges = sum([len(obj["exchanges"]) for obj in data.values()])

        gl = config.global_location

        # Create geomapping array
        arr = np.zeros((len(data), ), dtype=self.dtype_fields_geomapping + self.base_uncertainty_fields)
        for index, key in enumerate(sorted(data.keys(), key=lambda x: x[1])):
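            # Field order matches dtype_fields_geomapping + base_uncertainty_fields:
            # (activity, geo, row, col, uncertainty_type, amount,
            #  loc, scale, shape, minimum, maximum, negative)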
            arr[index] = (
                mapping[key],
                geomapping[data[key].get("location", gl) or gl],
                MAX_INT_32, MAX_INT_32,
                0, 1, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, False
            )

        filepath = os.path.join(
            config.dir,
            u"processed",
            self.name + u".geomapping.pickle"
        )
        with open(filepath, "wb") as f:
            pickle.dump(arr, f, protocol=pickle.HIGHEST_PROTOCOL)

        arr = np.zeros((num_exchanges + len(data), ), dtype=self.dtype)
        count = 0
        for key in sorted(data.keys(), key=lambda x: x[1]):
            production_found = False
            for exc in sorted(
                    data[key]["exchanges"],
                    key=lambda x: x["input"][1]):
                if key == exc["input"]:
                    production_found = True
                arr[count] = (
                    mapping[exc["input"]],
                    mapping[key],
                    MAX_INT_32,
                    MAX_INT_32,
                    TYPE_DICTIONARY[exc["type"]],
                    exc.get("uncertainty type", 0),
                    exc["amount"],
                    exc.get("loc", np.NaN),
                    exc.get("scale", np.NaN),
                    exc.get("shape", np.NaN),
                    exc.get("minimum", np.NaN),
                    exc.get("maximum", np.NaN),
                    exc["amount"] < 0
                )
                count += 1
            if not production_found and data[key]["type"] == "process":
                # Add amount produced for each process (default 1)
                arr[count] = (
                    mapping[key], mapping[key],
                    MAX_INT_32, MAX_INT_32, TYPE_DICTIONARY["production"],
                    0, 1, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, False
                )
                count += 1

        # The array was allocated with room for a default production exchange
        # for each activity; trim to the number of rows actually written.
        arr = arr[:count]
        filepath = os.path.join(
            config.dir,
            u"processed",
            self.name + u".pickle"
        )
        with open(filepath, "wb") as f:
            pickle.dump(arr, f, protocol=pickle.HIGHEST_PROTOCOL)
    def query(self, *queries):
        """Search through the database. See :class:`query.Query` for details."""
        return Query(*queries)(self.load())

    def random(self):
        """Return a random activity key.

        Returns a random activity key, or ``None`` (and issues a warning) if the current database is empty."""
        keys = self.load().keys()
        if not keys:
            warnings.warn("This database is empty")
            return None
        else:
            return random.choice(keys)

    def register(self, depends=None, **kwargs):
        """Register a database with the metadata store.

        Databases must be registered before data can be written.

        Args:
            * *format* (str): Format that the database was converted from, e.g. "Ecospold"
            * *depends* (list): Names of the databases that this database references, e.g. "biosphere"
            * *num_processes* (int): Number of processes in this database.

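        A sketch with illustrative values:

        .. code-block:: python

            db = Database("example")
            db.register(
                format="Ecospold",
                depends=["biosphere"],
                num_processes=100
            )
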
        """
        kwargs.update(
            depends=depends or [],
            version=kwargs.get('version', None) or 0
        )
        super(Database, self).register(**kwargs)

    def relabel_data(self, data, new_name):
        """Relabel database keys and exchanges.

        All dataset keys are relabeled to the new database name ``new_name``; exchange inputs are updated only if they refer to a dataset within this database.

        Needed to copy a database completely or cut out a section of a database.

        For example:

        .. code-block:: python

            data = {
                ("old and boring", 1):
                    {"exchanges": [
                        {"input": ("old and boring", 42),
                        "amount": 1.0},
                        ]
                    },
                ("old and boring", 2):
                    {"exchanges": [
                        {"input": ("old and boring", 1),
                        "amount": 4.0}
                        ]
                    }
                }
            print relabel_data(data, "shiny new")
            >> {
                ("shiny new", 1):
                    {"exchanges": [
                        {"input": ("old and boring", 42),
                        "amount": 1.0},
                        ]
                    },
                ("shiny new", 2):
                    {"exchanges": [
                        {"input": ("shiny new", 1),
                        "amount": 4.0}
                        ]
                    }
                }

        In the example, the exchange to ``("old and boring", 42)`` does not change, as this is not part of the updated data.

        Args:
            * *data* (dict): The database data to modify
            * *new_name* (str): The name of the modified database

        Returns:
            The modified database

        """
        def relabel_exchanges(obj, new_name):
            for e in obj['exchanges']:
                if e["input"] in data:
                    e["input"] = (new_name, e["input"][1])
            return obj

        return dict([((new_name, k[1]), relabel_exchanges(v, new_name))
                     for k, v in data.iteritems()])

    def rename(self, name):
        """Rename a database. Modifies exchanges to link to new name. Deregisters old database.

        Args:
            * *name* (str): New name.

        Returns:
            New ``Database`` object.

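        A sketch with hypothetical names:

        .. code-block:: python

            db = Database("old name")
            db = db.rename("new name")
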
        """
        old_name = self.name
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            new_db = Database(name)
            databases[name] = databases[old_name]
        new_data = self.relabel_data(self.load(), name)
        new_db.write(new_data)
        new_db.process()
        del databases[old_name]
        return new_db

    def revert(self, version):
        """Return data to a previous state.

        .. warning:: Reverted changes can be overwritten.

        Args:
            * *version* (int): Number of the version to revert to.

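        A minimal sketch (assumes version 1 of "example" exists):

        .. code-block:: python

            db = Database("example")
            db.revert(1)
            assert db.version == 1
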
        """
        assert version in [x[0] for x in self.versions()], "Version not found"
        self.backup()
        databases[self.name]["version"] = version
        if config.p.get("use_cache", False) and self.name in config.cache:
            config.cache[self.name] = self.load(version)
        self.process(version)

    @property
    def version(self):
        """The current version number (integer) of this database.

        Returns:
            Version number

        """
        return databases.version(self.name)

    def versions(self):
        """Get a list of available versions of this database.

        Returns:
            List of (version, datetime created) tuples.

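        A sketch of the return value (illustrative numbers and dates):

        .. code-block:: python

            Database("example").versions()
            # e.g. [(1, datetime.datetime(2014, 1, 1, 12, 0)),
            #       (2, datetime.datetime(2014, 2, 1, 12, 0))]
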
        """
        directory = os.path.join(config.dir, "intermediate")
        files = natural_sort(filter(
            lambda x: ".".join(x.split(".")[:-2]) == self.name,
            os.listdir(directory)))
        return sorted([
            (int(name.split(".")[-2]),
             datetime.datetime.fromtimestamp(
                 os.stat(os.path.join(directory, name)).st_mtime))
            for name in files
        ])
    def write(self, data):
        """Serialize data to disk.

        Args:
            * *data* (dict): Inventory data

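        A minimal sketch (the key and fields are hypothetical):

        .. code-block:: python

            db = Database("example")
            db.write({
                ("example", "some id"): {
                    "name": "a process",
                    "type": "process",
                    "exchanges": [],
                }
            })
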
        """
        self.assert_registered()
        databases.increment_version(self.name, len(data))
        mapping.add(data.keys())
        for ds in data.values():
            if 'unit' in ds:
                ds["unit"] = normalize_units(ds["unit"])
        geomapping.add({x["location"] for x in data.values() if
                       x.get("location", False)})
        if config.p.get("use_cache", False) and self.name in config.cache:
            config.cache[self.name] = data
        filepath = os.path.join(
            config.dir,
            u"intermediate",
            self.filename + u".pickle"
        )
        with open(filepath, "wb") as f:
            pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)