Source

tutagx / tutagx / assets.py

Full commit
"""
This module is responsible for finding and providing access to assets.
To abstract away platform differences, and catch code undermining this by
juggling with actual paths, this module supports a way to specify
assets that's entirely unlike the all supported file systems' paths.
Specifically, folder separators are replaced by dots (.) and dots are replaced
by hash signs (#).
A few examples:

- ``img/geoscape.png`` becomes ``img.geoscape#png``.
- ``config.xml`` becomes ``config#xml``.
- ``scripts/ai/sneaky/bastard.py`` becomes ``scripts.ai.sneaky.bastard#py``.

Outside this module, all assets should be referred to by that format.
For now, any / and \\\\ in identifiers is considered an error.
Likewise, multiple # (and thus, dots in folder names and multiple dots in the
file name) are not supported either.
"""
import os
import logging
import weakref
import pyglet
from tutagx.meta import from_yaml, model, validate
from tutagx.util.script import ScriptResource

log = logging.getLogger(__name__)


def asset_ident_error(ident):
    """
    Check whether an asset identifier is syntactically valid.
    Return a non-empty error message if there is an error.
    Return None if the identifier is valid.
    """
    if '/' in ident:
        return "Appears to be a path (contains /)"
    if '\\' in ident:
        return "Appears to be a path (contains \\)"
    if '#' not in ident:
        return "Has no file extension (no #)"
    if ident.count('#') > 1:
        return "Has multiple #"
    return None


def _ident_to_relpath(ident):
    # Convert an asset identifier into an absolute path.
    # Raise ValueError if given an invalid identifier.
    error = asset_ident_error(ident)
    if error:
        raise RuntimeError(error)
    parts = ident.split('.')
    base, ext = parts[-1].split('#', 1)
    base_with_ext = base + '.' + ext
    parts[-1] = base_with_ext
    relpath = os.path.join(*parts)
    assert not os.path.isabs(relpath)
    return relpath


def _relpath_to_ident(relpath):
    # Convert a relative path into an asset identifier.
    # Raise ValueError if given a path that can't be represented as
    # identifier. No directory may have a dot in its name, and the file
    # itself must have exactly one dot in its name.
    assert not os.path.isabs(relpath), relpath
    parts = relpath.split(os.sep)
    base, ext = parts[-1].split('.', 1)
    base_with_ext = base + '#' + ext
    parts[-1] = base_with_ext
    if any('.' in part for part in parts):
        raise RuntimeError('Cannot handle dot (.)')
    return '.'.join(parts)


class AssetTypeError(TypeError):
    """
    Raised if an asset is of an unexpected file type.
    """
    def __init__(self, expected, actual):
        self.expected = expected
        self.actual = actual

    def __str__(self):
        MSG = "Invalid file type {!r}, expected one of {!r}"
        return MSG.format(self.actual, self.expected)


class AssetListing(model.Model):
    is_value_type = True
    include_dirs = model.List(model.String())
    # Keys are files in this directory, values are MIME types.
    #TODO: Enum() types would be neat, for MIME type checking
    assets_here = model.Dict(model.String(), model.String())

_listing_reader = from_yaml.YAMLReader(AssetListing)


class AssetCollection:
    """
    Maintains information about assets below a given directory,
    dubbed the asset root.
    """

    IMAGE_FILE_TYPES = ("image/png", "image/gif")
    _missing_image_replacement = None

    def __init__(self, asset_root):
        if not os.path.isabs(asset_root):
            raise IOError("Need absolute path")
        self.paths = [asset_root]
        self._root = asset_root
        self._index = {}
        self._img_cache = weakref.WeakValueDictionary()
        self.reindex()

    def _abspath(self, path):
        # Turn a relative (to the asset root) path into an absolute path.
        assert os.path.isabs(self._root), self._root
        if os.path.isabs(path):
            raise ValueError('Path must be relative')
        return os.path.normpath(os.path.join(self._root, path))

    def _relpath(self, path):
        # Turn an absolute path into a path relative to the asset root.
        assert os.path.isabs(self._root), self._root
        if not os.path.isabs(path):
            raise ValueError('Path must be absolute')
        return os.path.relpath(path, self._root)

    def reindex(self):
        """
        Discard assets index and re-fill it by crawling the asset root.
        Called on __init__, but must be called manually after file system
        changes.
        """
        self._index.clear()
        del self.paths[:]
        log.debug('Indexing assets from %r', self._root)
        for dirpath, dirnames, filenames in os.walk(self._root):
            log.debug('Traversing %r searching for assets', dirpath)
            subindex = self._process_asset_dir(dirpath, dirnames)
            self._index.update(subindex)

    def _process_asset_dir(self, dirpath, dirnames):
        # Process a single asset directory: Load the listing,
        # check presence of all files and include directories, and modify the
        # list dirnames to include only the subdirs to be recursed into.
        # Return an index for the assets in this directory only (i.e. ignoring
        # include directoies).

        index = {}
        assert os.path.isabs(dirpath), dirpath
        listing_path = os.path.join(dirpath, 'assets.yaml')
        listing_rel_path = os.path.relpath(listing_path, self._root)
        listing = self._load_listing(listing_path)
        if listing is None:
            log.error('Failed to open listing file: %r', listing_rel_path)
            return {}
        for filename, mime_type in listing.assets_here.items():
            asset_path = os.path.join(dirpath, filename)
            asset_rel_path = self._relpath(asset_path)
            asset_ident = _relpath_to_ident(asset_rel_path)
            exists = self._file_exists(asset_rel_path)
            if exists:
                index[asset_ident] = (filename, mime_type)
            elif mime_type in self.IMAGE_FILE_TYPES:
                # For images, there's an easier way to make the error apparent:
                # Show them an image that's obviously a placeholder.
                log.error(
                    'Failed to open image (using placeholder instead): %s',
                    asset_ident
                )
                index[asset_ident] = (filename, mime_type)
            else:
                log.error('Failed to open asset: %r', asset_ident)
        self.paths.append(dirpath)
        dirnames[:] = self._filter_dirs(
            listing, set(dirnames), listing_rel_path
        )
        return index

    def _filter_dirs(self, listing, dirs_available, listing_name):
        # Filtering one listing's include_directories.
        # Check if they exist and gives an error messages otherwise.
        # Return a new list, containing the names of the existing directories.

        # Copy to avoid thrashing assets.yaml (in case we save it later).
        dirnames = listing.include_dirs[:]
        bad_dirnames = set()
        # We filter bad dirnames in a seperate step because removal during
        # iteration is neither possible directly (we'd have to work solely
        # with indices) nor straightfoward even with indices (we can't iterate
        # from 0 to len(dirnames), we'd have to go the other way because
        # removal moves "later" items = changes their indices).
        for i, dirname in enumerate(dirnames):
            if dirname in dirs_available:
                continue
            log.error('Include directory from listing %r not found: %r',
                    listing_name, dirname)
            if dirs_available:
                log.debug('Subdirectories available: %r',
                        ', '.join(dirs_available))
            else:
                log.debug('No subdirectories known')
            bad_dirnames.add(i)
        return [dirname for i, dirname in enumerate(dirnames)
                if i not in bad_dirnames]

    def _load_listing(self, path):
        # Try to open the asset listing at the given path.
        # Return the AssetListing objects, or None if an IOError occured.
        try:
            with open(path) as f:
                obj, errors = _listing_reader.load(f)
            if errors:
                log.error('Asset listing %r is invalid:', path)
                for err_line in validate.format_errors(errors):
                    log.error('%s', err_line)
                return None
            return obj
        except IOError:
            return None

    def _file_exists(self, relpath):
        try:
            f = open(self._abspath(relpath))
        except IOError:
            return False
        else:
            f.close()
            return True

    def asset_exists(self, ident):
        """
        Return whether the file with the given identifier is known.
        """
        return ident in self._index

    def asset_file(self, ident, mode='r'):
        """
        Open and return the asset file of the given identifier using the
        given mode. Note that this returns a plain file object, it does not
        differentiate files based on the kind of data they contain.
        Raise IOError if the file is not in the asset listings, even if the
        file exists in the underlying file system.
        May also raise IOError if the file has been removed since the last
        call to reindex(), or if permissions changed.
        """
        if not self.asset_exists(ident):
            raise IOError(
                "{!r} does not refer to a known asset".format(ident)
            )
        relpath = _ident_to_relpath(ident)
        return open(self._abspath(relpath), mode)

    def _check_typed_asset_exists(self, ident, *expected_types):
        # Helper for functions that need to:
        # * verify that ident actually refers to an asset in the index
        # * verify that the asset's file type is among a given few
        # Raise IOError if the file does not exist.
        # Raise AssetTypeError if an unexpected MIME type is found.
        if not self.asset_exists(ident):
            raise IOError(
                "{!r} does not refer to a known asset".format(ident)
            )
        filename, mime_type = self._index[ident]
        if mime_type not in expected_types:
            raise AssetTypeError(expected_types, mime_type)

    def image(self, ident):
        """
        Return a pyglet Texture for the asset with the given identifier.
        Does not raise IOError, instead returns a placeholder texture to
        make the missing image apparent to humans not looking at the log.
        Raise AssetTypeError if the file is not of a MIME type belonging
        to a supported image file format (currently: PNG, GIF).
        """
        try:
            self._check_typed_asset_exists(ident, *self.IMAGE_FILE_TYPES)
        except (IOError, AssetTypeError):
            log.debug(
                "Asset does not exist or has non-image MIME type "
                "(using placeholder): %r", ident
            )
            return self.missing_image_replacement
        relpath = _ident_to_relpath(ident)
        abspath = self._abspath(relpath)
        if abspath in self._img_cache:
            return self._img_cache[abspath]
        try:
            img = pyglet.image.load(abspath)
        except IOError:
            log.debug(
                "Failed to open image file (using placeholder): %r", abspath
            )
            return self.missing_image_replacement
        else:
            self._img_cache[abspath] = img
            return img

    def script(self, ident):
        """
        Return a :class:`tutagx.util.script.ScriptResource` for the script
        at the given relative path.
        Raises IOError if asset_file would raise.
        Also raises AssetTypeError if the file is not of a MIME type belonging
        to Python source code (text/x-python).
        """
        self._check_typed_asset_exists(ident, 'text/x-python')
        with self.asset_file(ident) as f:
            src = f.read()
        return ScriptResource(ident, src)

    @property
    def missing_image_replacement(self):
        # It's preferable to do this lazily, to avoid touching the OpenGL
        # context before the window is created.
        # This is, in turn, preferable because it allows us sidestepping
        # problems with some crappy/old drivers that do not allow sharing of
        # OpenGL contexts.
        # (Also see context-sharing-workaround.bat)
        if self._missing_image_replacement is None:
            #TODO: find out how to get descriptive text into the image, e.g.
            # "Image .../foo.png not found"
            pattern = pyglet.image.SolidColorImagePattern((255, 0, 255, 255))
            # yes this is quite large - intentionally, to be more obvious
            img = pattern.create_image(250, 250)
            self._missing_image_replacement = img.texture
        return self._missing_image_replacement