# Repository: Matt Chaput / marlowe
# File: marlowe/marlowe/config.py

# Copyright 2009 Matt Chaput
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#    http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, re
from cStringIO import StringIO
from ConfigParser import ConfigParser
from inspect import getargspec

# Lower-case spellings accepted as boolean true / false by
# Configuration.string_to_bool().
trues = set(("true", "yes", "on", "1"))
falses = set(("false", "no", "off", "0"))

# Splits a list-like option value on commas or newlines, swallowing the
# whitespace around each separator.
listsplitter = re.compile(r"\s*[,\n]\s*", re.MULTILINE)

# Short type names that may be used in a configuration instead of a fully
# qualified name. Every type that string_to_value() knows how to convert to
# is listed so that it can be requested by name.
typemap = {
    "bool": bool,
    "int": int,
    "float": float,
    "str": str,
    "unicode": unicode,
    "list": list,
    "tuple": tuple,
    "set": set,
    "frozenset": frozenset,
    "dict": dict,
}

# Default blacklist of name prefixes that find_object() refuses to import.
forbidden = ("os.", "sys.", "gc.")

def configparser_from_string(s):
    """Returns a ConfigParser instance based on the string s.

    :param s: the text of an INI-style configuration.
    :returns: a ConfigParser that has parsed the contents of ``s``.
    """
    cp = ConfigParser()
    # The parser must actually be fed the buffer; otherwise an empty parser
    # is returned no matter what ``s`` contains. Newer ConfigParser versions
    # call the method read_file(), older ones only have readfp(), so use
    # whichever one this parser provides.
    load = getattr(cp, "read_file", None) or cp.readfp
    load(StringIO(s))
    return cp

def find_object(name, blacklist=None, whitelist=None):
    """Imports and returns an object given a fully qualified name.

    >>> find_object("whoosh.analysis.StopFilter")
    <class 'whoosh.analysis.StopFilter'>

    :param name: the dotted name of the object, e.g. "package.module.Class".
    :param blacklist: an optional sequence of name prefixes; a name starting
        with any of them is rejected.
    :param whitelist: an optional sequence of name prefixes; if given, a
        name must start with at least one of them.
    :returns: the attribute named by the last component of ``name``, looked
        up on the module named by everything before the last dot.
    :raises TypeError: if the name is rejected by the blacklist or whitelist.
    :raises AssertionError: if the name contains no dot.
    """
    # NOTE(review): both filters are plain prefix checks on the requested
    # name. A blacklist only blocks the prefixes it lists, so when the
    # configuration is not trusted a whitelist is the safer control.
    if blacklist:
        for pre in blacklist:
            if name.startswith(pre):
                raise TypeError("%r: can't instantiate names starting with %r" % (name, pre))
    if whitelist and not any(name.startswith(pre) for pre in whitelist):
        raise TypeError("Can't instantiate %r" % name)

    logging.debug("Attempting to instantiate %r", name)
    modname, dot, clsname = name.rpartition(".")
    if not dot:
        # Raised explicitly instead of with an assert statement so that the
        # check is not stripped under "python -O"; the exception type is
        # kept for existing callers.
        raise AssertionError("Name %r must be fully qualified" % name)
    logging.debug("Module=%r, class=%r", modname, clsname)

    # A non-empty fromlist makes __import__ return the named submodule
    # itself instead of the top-level package.
    mod = __import__(modname, fromlist=[clsname])
    logging.debug("Module imported: %r", mod)
    cls = getattr(mod, clsname)
    logging.debug("Class imported: %r", cls)
    return cls

class Configuration(object):
    """Base class for objects that build an object tree from a configuration file.

    This class is subclassed for different file formats (e.g. INI, YAML, XML).
    Subclasses must implement _object_from_section().

    :param blacklist: a list of prefixes that can't be instantiated,
        for example `("os.", "sys.", "gc.")`.
    :param whitelist: a list of prefixes that are allowed to be instantiated.
        Only object names beginning with one of the prefixes in this list may
        be specified in the configuration.
    """

    def __init__(self, blacklist=forbidden, whitelist=None):
        self.blacklist = blacklist
        self.whitelist = whitelist
        self._cache = {}

    # -- Cache of objects already built, keyed by section name ------------

    def _put(self, section, obj):
        "Stores obj as the object built from the given section."
        # Created lazily so the cache also works for a subclass that does
        # not call this class's __init__.
        if not hasattr(self, "_cache"):
            self._cache = {}
        self._cache[section] = obj

    def _is_cached(self, section):
        "Returns True if an object has already been stored for the section."
        if not hasattr(self, "_cache"):
            return False
        return section in self._cache

    def _cached(self, section):
        "Returns the object previously stored for the section."
        return self._cache[section]

    # -- Name and string conversions --------------------------------------

    def class_from_name(self, typename):
        """Returns the type for a name: either one of the short names in
        ``typemap`` or a fully qualified name imported with find_object()
        (subject to this object's blacklist and whitelist).
        """
        if typename in typemap:
            return typemap[typename]
        return find_object(typename, self.blacklist, self.whitelist)

    def string_to_bool(self, value):
        """Converts a string such as 'true', 'no' or '1' to a bool.

        :raises ValueError: if the string is not a recognized boolean
            spelling (instead of silently returning None).
        """
        lv = value.strip().lower()
        if lv in trues:
            return True
        if lv in falses:
            return False
        raise ValueError("Can't convert %r to a bool" % value)

    def string_to_unicode(self, value):
        "Decodes a UTF-8 encoded byte string."
        return value.decode("utf8")

    def string_to_list(self, value):
        """Splits a string on commas and newlines into a list of strings.

        Surrounding whitespace is ignored and empty items (for example from
        an empty string or a trailing comma) are dropped.
        """
        return [item for item in listsplitter.split(value.strip()) if item]

    def string_to_dict(self, value):
        """Converts a string of comma/newline separated "key:value" items
        to a dictionary of strings. Keys and values are stripped of
        surrounding whitespace.

        :raises ValueError: if an item does not contain a colon.
        """
        d = {}
        for item in self.string_to_list(value):
            if ":" not in item:
                raise ValueError("Can't split %r into key/value" % item)
            # Split on the first colon only, so values may contain colons.
            k, v = item.split(":", 1)
            d[k.strip()] = v.strip()
        return d

    def string_to_value(self, section, key, value, cls):
        """Converts a string value into a Python object of a given type.

        This method can use heuristics to convert values, for example when
        converting to a bool, values of 'true', 'false', 'yes', 'no', etc.
        are checked.

        >>> Configuration().string_to_value("section", "option", "no", bool)
        False

        :param section: the "section" in which the value is defined in the
            configuration file (used in error messages).
        :param key: the name of the value in the configuration file (used
            in error messages).
        :param value: the string value to convert.
        :param cls: the type to convert the value to.
        :raises ValueError: if the value can't be converted to the type.
        """
        if cls in (bool, list, unicode, dict):
            # Dispatch to the matching string_to_<typename>() method.
            return getattr(self, "string_to_" + cls.__name__)(value)
        elif cls in (tuple, set, frozenset):
            return cls(self.string_to_list(value))
        elif cls is type:
            # The value names a type rather than an instance.
            return self.class_from_name(value)
        elif cls in (int, float):
            return cls(value)
        elif cls is str:
            return value
        raise ValueError("%s.%s: can't convert %r to %r" % (section, key, value, cls))

    def to_value(self, section, key, value, cls):
        """Converts a raw configuration value to the given type. This is
        the hook subclasses override to add format-specific behavior; the
        base implementation only performs the string conversion.
        """
        return self.string_to_value(section, key, value, cls)

    # -- Object construction ------------------------------------------------

    def object_from_dict(self, cls, items, section="", types=None):
        """Instantiates and returns an object based on the arguments contained in
        the given dictionary.

        :param cls: the type to instantiate.
        :param items: a dictionary containing the arguments to the object's
            initializer. The dictionary itself is not modified.
        :param section: the "section" of the configuration file from which the dictionary
            is taken, used for debugging.
        :param types: a dictionary mapping initializer parameter names to
            types (or type names). The type to be instantiated will also be
            checked for a `__inittypes__` attribute.
        :raises TypeError: if the initializer rejects the arguments; the
            message is prefixed with the section and type.
        """
        if not callable(cls):
            # Raised explicitly instead of with an assert statement so that
            # the check is not stripped under "python -O"; the exception
            # type is kept for existing callers.
            raise AssertionError("%r is not callable in [%s]" % (cls, section))

        # Work on copies so that neither the caller's dictionaries nor the
        # class's __inittypes__ are modified.
        types = dict(types) if types else {}
        if hasattr(cls, "__inittypes__"):
            # NOTE(review): the statement that merged __inittypes__ was
            # missing from the source. Here the class's declarations fill in
            # parameters the caller did not give a type for -- confirm that
            # this is the intended precedence.
            for tkey, t in cls.__inittypes__.items():
                types.setdefault(tkey, t)

        # If types are specified using string names, convert them to the
        # actual type objects.
        for tkey, t in list(types.items()):
            if isinstance(t, str):
                types[tkey] = self.class_from_name(t)

        # Convert the arguments using the types dictionary
        kwargs = dict(items)
        for key, value in items.items():
            if key not in types:
                continue
            itype = types[key]
            if isinstance(value, itype):
                continue
            logging.debug("Converting %r:%r to type %r", key, value, itype)
            if cls is not str and isinstance(value, str):
                kwargs[key] = self.to_value(section, key, value, itype)
            logging.debug("Coerced value is %r", kwargs[key])

        logging.debug("Calling %s with %r", cls, kwargs)
        try:
            return cls(**kwargs)
        except TypeError as e:
            # Re-raise with the section and type so the offending part of
            # the configuration can be found.
            raise TypeError("[%s] %s: %s" % (section, cls, e))

    def object_from_section(self, section, defaulttype=None, inittypes=None):
        """Instantiates and returns an object based on the configuration
        options in a given "section" of the configuration file.

        The result is cached, so asking for the same section again returns
        the same object.

        :param section: name of the section containing the object specification.
        :param defaulttype: the type to instantiate, if the section does not
            specify a type explicitly.
        :param inittypes: a dictionary mapping initializer parameter names to
            types. The type to be instantiated will also be checked for a
            `__inittypes__` attribute.
        """
        if self._is_cached(section):
            return self._cached(section)
        obj = self._object_from_section(section, defaulttype=defaulttype, inittypes=inittypes)
        self._put(section, obj)
        return obj

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        """Builds (without caching) the object described by a section.
        Subclasses must override this for their file format.
        """
        raise NotImplementedError("%s must implement _object_from_section()"
                                  % self.__class__.__name__)

class IniConfiguration(Configuration):
    """Builds objects from the sections of a ConfigParser-style object.

    :param config: the parser object; has_section() and items() are called
        on it.
    :param blacklist: a list of name prefixes that can't be instantiated.
    :param whitelist: a list of name prefixes that may be instantiated.
    :param typeoption: the name of the option that gives the type to
        instantiate for a section.
    :param commentoption: the name of an option that is discarded before
        the object is built, or a false value to keep all options.
    :param typesep: the separator in options of the form
        ``keyname|typename = value``, which give the type of one argument.
    :param objprefix: values starting with this string refer to the object
        built from another section, e.g. ``@othersection``.
    """

    def __init__(self, config, blacklist=forbidden, whitelist=None,
                 typeoption="type", commentoption="_", typesep="|", objprefix="@"):
        super(IniConfiguration, self).__init__(blacklist, whitelist)
        self.config = config
        self.typeoption = typeoption
        self.commentoption = commentoption
        self.typesep = typesep
        self.objprefix = objprefix
        self._cache = {}

    def to_value(self, section, key, value, cls):
        """Converts a raw value to the given type, resolving references to
        other sections first.

        :raises TypeError: if a referenced section builds an object that is
            not an instance of ``cls``.
        """
        objprefix = self.objprefix
        # Process @references
        if isinstance(value, str) and objprefix and callable(cls) and value.startswith(objprefix):
            inst = self.object_from_section(value[len(objprefix):], defaulttype=cls)
            if inst is not None and not isinstance(inst, cls):
                raise TypeError("%s.%s: %r is not %r" % (section, key, inst, cls))
            return inst
        # Anything that is not a reference gets the ordinary conversion.
        return super(IniConfiguration, self).to_value(section, key, value, cls)

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        """Builds the object described by a section of the parser.

        :raises NameError: if the section does not exist.
        :raises Exception: if the section names no type and no default type
            was given.
        """
        if not self.config.has_section(section):
            raise NameError("No section [%s]" % section)
        items = dict(self.config.items(section))
        # Copy so that the caller's dictionary is not modified below.
        inittypes = dict(inittypes) if inittypes else {}

        # Process any options in the form keyname|typename=value
        if self.typesep:
            # Iterate over a snapshot of the keys because the dictionary is
            # modified inside the loop.
            for key in list(items):
                if self.typesep in key:
                    keyname, typename = key.split(self.typesep, 1)
                    keyname = keyname.strip()
                    # class_from_name() accepts the short names in typemap
                    # as well as fully qualified names.
                    inittypes[keyname] = self.class_from_name(typename.strip())
                    items[keyname] = items.pop(key)

        # Get the type/function to instantiate/call.
        if self.typeoption in items:
            cls = self.class_from_name(items.pop(self.typeoption))
        elif defaulttype:
            cls = defaulttype
        else:
            raise Exception("Object specification [%s] does not have %r" % (section, self.typeoption))

        # Remove the "comment" option if present
        if self.commentoption:
            items.pop(self.commentoption, None)

        return self.object_from_dict(cls, items, section=section, types=inittypes)

class EtreeConfiguration(Configuration):
    """Configuration backed by an element tree.

    NOTE(review): the bodies of both methods below were missing from the
    source, so they are explicit "not implemented" stubs here -- confirm
    against the original file.

    :param tree: the tree object; it is only stored by this class.
    :param blacklist: a list of name prefixes that can't be instantiated.
    :param whitelist: a list of name prefixes that may be instantiated.
    """

    def __init__(self, tree, blacklist=forbidden, whitelist=None):
        super(EtreeConfiguration, self).__init__(blacklist, whitelist)
        self.tree = tree

    def object_from_element(self, element, defaulttype=None, inittypes=None):
        "Not implemented."
        raise NotImplementedError("EtreeConfiguration.object_from_element() is not implemented")

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        "Not implemented."
        raise NotImplementedError("EtreeConfiguration._object_from_section() is not implemented")

if __name__ == "__main__":
    # Small smoke test: build an object from an in-memory configuration.
    # NOTE(review): most of the sample configuration was missing from the
    # source; the [test] section below is a minimal stand-in that builds a
    # dict -- replace it with the original sample if it is available.
    cf = configparser_from_string("""
[test]
_= Test configuration
type = dict
name = example
""")

    c = IniConfiguration(cf)
    # Formatted as a single string so the output is the same whether print
    # is a statement or a function.
    print("test= %s" % (c.object_from_section("test"),))