# bubble-economy / warehouse.py

#!/usr/bin/env python
# encoding: utf-8
"""
metadata.py

seachable metadata-happy storage for trial outputs

depends on my fsdict implementation:
https://bitbucket.org/howthebodyworks/fsdict

Created by dan mackinlay on 2010-12-16.
Copyright (c) 2010 __MyCompanyName__. All rights reserved.
"""

from path import path
from fsdict import FSDict
from utils import Bunch
import collections
import hashlib
from sys import maxint
from pprint import pformat

try:
    import cPickle as pickle
except ImportError:
    import pickle

class WarehouseException(Exception):
    """Base class for all errors raised by this module."""

class WarehouseIOError(WarehouseException):
    """Raised when a box (or its files) cannot be found on disk."""

def digest(obj):
    """Return a hex SHA-224 digest of a picklable object.

    Since pickle's error messages are awful (they omit the offending
    object), any failure during pickling is re-raised as the same
    exception class with the object's repr appended, and with the
    object itself attached as the ``obj`` attribute of the new
    exception.

    :param obj: any picklable object
    :returns: 56-character hex digest string
    :raises: whatever pickling raised, re-wrapped with extra context
    """
    try:
        return hashlib.sha224(pickle.dumps(obj)).hexdigest()
    # `except ... as` replaces the deprecated comma form; it works on
    # python 2.6+ as well as python 3.
    except Exception as exc:
        our_exc = exc.__class__(
          'Got "%s" when pickling %s' % (repr(exc), repr(obj)))
        our_exc.obj = obj
        raise our_exc
    
class Warehouse(object):
    """A (for now) FS-backed store for trial output.

    Each Box is persisted as a group of files inside one FSDict work
    directory: ``<basename>.<field>`` per data field plus a
    ``<basename>._metadata`` file holding the metadata dict.
    """
    def __init__(self, path, *args, **kwargs):
        # NOTE(review): the parameter shadows the module-level `path`
        # import; kept as-is since callers pass it positionally.
        self.path = unicode(path)
        self.fs = FSDict(work_dir=path)

    def __eq__(self, other):
        """Two warehouses are equal iff they point at the same path."""
        return unicode(self.path)==unicode(other.path)

    def __ne__(self, other):
        # python 2 does not derive __ne__ from __eq__; define it for
        # consistency with Box, which supplies both.
        return not self==other

    def __repr__(self):
        return "%s:%s" % (self.__class__.__name__, unicode(self.path))

    def store(self, box, return_stored=True):
        """serialise the box fields and metadata on the fs, returning a stored
        box that may be used instead of the original so that it may be kept
        around with light RAM requirements"""
        metadata = box._metadata
        basename = box._basename
        self.fs[basename + '._metadata'] = metadata
        for field in box._attr_cache:
            self.fs['.'.join([basename, field])] = getattr(box, field)
        #to allow us to move boxes between warehouses:
        #(Maintaining consistency in that case is your problem)
        box._warehouse = self
        # you might want to use this handy dynamically loaded, disk-backed
        # version of your data in place of the original:
        if return_stored:
            return self.fetch(box._basename)

    def trash(self, box):
        """remove a box, given either as a box or as a string basename, from
        the FS

        :raises ValueError: if `box` is neither a Box nor a 56-char hash
        :raises WarehouseIOError: if no matching files exist on disk
        """
        if isinstance(box, Box):
            basename = box._basename
        elif isinstance(box, basestring) and len(box)==56: #don't delete things that don't look like hashes
            basename = box
        else:
            raise ValueError(
              "%s is not a valid box or box identifier" % str(box))
        files_to_kill = self.fs.work_dir.glob(basename+'.*')
        if not files_to_kill:
            raise WarehouseIOError("%s not found in %s" % (box, str(self)))
        for f in files_to_kill:
            f.unlink()

    def Box(self, **kwargs):
        """factory for boxes, with warehouse set correctly.
        Loads if already exists."""
        box = Box(_warehouse=self, **kwargs)
        try:
            return self.fetch(box)
        except WarehouseIOError:
            return box

    def fetch(self, basename, cache=True):
        """return a Box with the given name, or idempotently return a Box.
        TODO: move data repopulation logic from warehouse into Box."""
        if hasattr(basename, '_basename'):
            #wait, this isn't a basename - this is an actual box
            basename = basename._basename
        metadata_file = basename + '._metadata'
        # +1 accounts for the '.' separating basename from field name
        prefix_len = len(basename) + 1
        if metadata_file not in self.fs:
            raise WarehouseIOError('no matching file "%s"' % metadata_file)
        data_attr_files = [
          fn.basename()[prefix_len:]
          for
          fn in self.fs.work_dir.glob(basename+'.*')
        ]
        # the metadata file is not a data field; strip it from the list
        data_attr_files.remove('_metadata')
        metadata = self.fs[metadata_file]
        return Box(
          _metadata=metadata,
          _warehouse=self,
          _basename=basename,
          _stored_fields=data_attr_files,
          _cache = cache)

    def load_field(self, basename, field_name):
        """return the field from a given box."""
        return self.fs[basename+'.'+field_name]

    def get_all_metadata(self):
        """iterate over tuples of (basename, metadata) pairs"""
        prefix_len = len(self.path) + 1

        for metadata_file_fullpath in self.fs.work_dir.glob('*._metadata'):

            # Convert from basename in the filesystem sense to basename in
            # our sense- i.e. with the attribute suffix stripped, and filename
            # in the sense with no basepath.
            # [:-10] strips the 10-char '._metadata' suffix.
            yield (metadata_file_fullpath.basename()[:-10],
               self.fs[metadata_file_fullpath[prefix_len:]],)

    def find(self, filt=None, limit=maxint, cache=True):
        """iterate over all records matching a given criterion

        :param filt: predicate applied to each box's metadata; None matches all
        :param limit: maximum number of boxes to yield
        :param cache: passed through to fetch()
        """
        if filt is None:
            filt = lambda x: True
        i = 0
        for basename, metadata in self.get_all_metadata():
            try:
                if not filt(metadata): continue
            except (IndexError, KeyError, ValueError, AttributeError):
                # ignore common lookup failures to ease scheme changes
                continue
            yield self.fetch(basename, cache=cache)
            i += 1
            if i>=limit:
                # plain `return` ends the generator; `raise StopIteration`
                # here becomes RuntimeError under PEP 479 (python 3.7+)
                return

    def count(self, filt=None):
        """count all records matching a given criterion"""
        return len(list(self.find(filt=filt)))

    def __getstate__(self):
        state = self.__dict__.copy()
        # path objects seem to occasionally resurrect gracelessly
        del(state['fs'])
        return state

    def __setstate__(self, state):
        self.__dict__ = state
        # rebuild the FSDict dropped by __getstate__
        self.fs = FSDict(work_dir=self.path)

class MissingField(object):
    """Sentinel marking a field that is stored on disk but deliberately
    not loaded into RAM (a planned cache-miss)."""
    
class Box(object):
    """A bundle of data fields plus a metadata dict, addressed by a
    content-hash of the metadata.  Fields live in an attribute cache and
    may be lazily re-loaded from an attached Warehouse via __getattr__.
    """
    # class-level fallbacks so the attribute magic below works even
    # before/while __init__ runs
    _attr_cache = {} #so that __init__ works
    _basename = ""

    def __init__(self, _metadata=None,
            _warehouse=None, #when fetching from warehouse only
            _stored_fields=None, #when fetching from warehouse only
            _basename='', #when fetching from warehouse only
            _cache=True, #when fetching from warehouse only
            *args, **extra_metadata):
        metadata = {}
        if _metadata is None: _metadata = {}
        metadata.update(_metadata)
        metadata.update(extra_metadata)
        # if loaded from disk, we set this to something:
        self._warehouse = _warehouse
        metadata = Bunch(**metadata)
        #we set a digest from the metadata if this box is created freshly
        # but allow the warehouse to create the box with predefined metadata
        # to avoid the digest changing during deserialisation (which happens)
        if _basename:
            self._basename = _basename
        else:
            self._basename = self._calc_basename(metadata)
        self._metadata = metadata
        self.set_load_cache(_cache)
        #re-clear the data_attrs, or initialise if loaded from disk.
        self._attr_cache = {}
        if _stored_fields is not None:
            for field in _stored_fields:
                self._attr_cache[field] = MissingField

    def set_load_cache(self, status):
        # when True, fields loaded from disk are kept in _attr_cache
        self._load_cache = status

    def store(self, return_stored=True):
        """boxes that have warehouses know how to store themselves"""
        return self._warehouse.store(self, return_stored=return_stored)

    def clear_cache(self):
        """Be disk-backed, not memory_backed."""
        if not getattr(self, '_warehouse', False):
            raise WarehouseException(
              'attempting to delete in-memory attributes before '
              'serialising to disk')
        # replace every cached value with the MissingField sentinel so
        # subsequent access re-loads from disk
        for field in self._attr_cache:
            self._attr_cache[field] = MissingField
        return self

    def __len__(self):
        return len(self._attr_cache)

    #equality magic
    def __eq__(self, other):
        """magic comparison operator. Two boxes are equal if they have the
        same basename and reside in the same warehouse (if they have
        warehouses)"""
        return (self._warehouse, unicode(self._basename
          )) == (other._warehouse, unicode(other._basename))
    def __ne__(self, other):
        return not self==other

    #sorting magic.
    def __gt__(self, other):
        "sorting should be in basename lexical order"
        return self._basename>other._basename
    def __lt__(self, other):
        return self._basename<other._basename
    def __ge__(self, other):
        return self._basename>=other._basename
    def __le__(self, other):
        return self._basename<=other._basename

    #display magic
    def __unicode__(self):
        # BUG FIX: the format string has four placeholders but only
        # three arguments were supplied, so unicode(box) raised
        # TypeError; supply the metadata like __repr__ does.
        return "%s(_basename=%s, _warehouse=%s, **%s)" % (
          self.__class__.__name__,
          unicode(self._basename),
          unicode(self._warehouse),
          unicode(self._metadata)
        )

    def __repr__(self):
        return "%s(_basename=%s, _warehouse=%s, **%s)" % (
          self.__class__.__name__,
          pformat(self._basename),
          pformat(getattr(self, '_warehouse', None)),
          pformat(getattr(self, '_metadata', None)))

    def set_own_attr(self, name, value):
        """set a real instance attribute, bypassing __setattr__ magic"""
        super(Box, self).__setattr__(name, value)

    def __setattr__(self, name, value):
        """records non-underscore attributes added after init, which must be
        data members"""
        # TODO: implement as descriptors
        if name.startswith('_'):
            self.set_own_attr(name, value)
        else:
            self._attr_cache[name] = value

    def del_own_attr(self, name):
        """delete a real instance attribute, bypassing __delattr__ magic"""
        super(Box, self).__delattr__(name)

    def __delattr__(self, name):
        """recalls if we delete those attributes."""
        if name in self._attr_cache:
            del(self._attr_cache[name])
        else:
            self.del_own_attr(name)

    def has_own_attr(self, name):
        """For consistency and clarity, we make an unpatched hasattr"""
        return name in self.__dir__()

    def get_own_attr(self, name):
        """unpatched attribute access, bypassing the __getattr__ fallback.
        BUG FIX: `object` has no __getattr__ (only __getattribute__), so
        the old call always raised AttributeError, and the result was
        never returned."""
        return super(Box, self).__getattribute__(name)

    def __getattr__(self, name):
        """if somone tries to access a missing attribute, fall back to
        unserializing from disk, if we have a warehouse attribute to use."""
        if name in self._attr_cache and self._attr_cache[name] is not MissingField:
            #already cached
            return self._attr_cache[name]
        # BUG FIX: __init__ always sets _warehouse (possibly to None),
        # so the old hasattr() guard never fired; check for None too.
        if getattr(self, '_warehouse', None) is None:
            raise AttributeError(
                'no field "%s" known and no warehouse to check for it' % name)
        if name in self._attr_cache:
            #exists but not loaded
            att = self._warehouse.load_field(self._basename, name)
            if self._load_cache:
                self._attr_cache[name] = att
            return att
        raise AttributeError('no field "%s" known' % name)

    def __dir__(self):
        """hackish attempt to make dir() work"""

        # is there a better way than this? examples sparse
        l = dir(self.__class__)
        l.extend(['_attr_cache', '_metadata', '_basename', '_warehouse'])
        l.extend(self._attr_cache)
        return l

    def __getstate__(self):
        # the warehouse itself is not pickled with the box; stash its
        # path so __setstate__ can reattach an equivalent one
        state = self.__dict__.copy()
        if state['_warehouse'] is not None:
            state['__warehouse_path'] = state['_warehouse'].path
        del(state['_warehouse'])
        return state

    def __setstate__(self, state):
        if '__warehouse_path' in state:
            state['_warehouse'] = Warehouse(state['__warehouse_path'])
            del(state['__warehouse_path'])
        else:
            state['_warehouse'] = None
        self.__dict__ = state

    @staticmethod
    def _calc_basename(metadata):
        """return a unique filename prefix based on the contents of metadata
        TODO: make once-only, or otherwise handle repeated attempts to write
          to disk."""
        return digest(metadata)