Source

marlowe / marlowe / config.py

#===============================================================================
# Copyright 2009 Matt Chaput
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#===============================================================================


import logging, re
from cStringIO import StringIO
from ConfigParser import ConfigParser
from inspect import getargspec


# Lower-case spellings accepted as boolean true / false values.
trues = set(["true", "yes", "1"])
falses = set(["false", "no", "0"])

# Separates list items on commas or newlines, discarding the whitespace
# on either side of the separator.
listsplitter = re.compile(r"\s*[,\n]\s*", re.MULTILINE)

# Short type names usable in configuration files, mapped to the built-in
# type each one stands for.
typemap = {
    "int": int,
    "float": float,
    "list": list,
    "dict": dict,
    "str": str,
    "unicode": unicode,
}

# Default blacklist: name prefixes that must never be imported on behalf
# of a configuration file.
forbidden = ("os.", "sys.", "gc.")


def configparser_from_string(s):
    """Returns a ConfigParser instance based on the string s.

    :param s: the complete text of an INI-style configuration.
    :returns: a new ConfigParser that has parsed ``s``.
    """

    parser = ConfigParser()
    # readfp() is the older spelling and has been removed from newer
    # versions of the parser in favour of read_file(); use whichever this
    # parser provides.
    reader = getattr(parser, "read_file", None)
    if reader is None:
        reader = parser.readfp
    reader(StringIO(s))
    return parser


def find_object(name, blacklist, whitelist):
    """Imports and returns an object given a fully qualified name.

    >>> find_object("whoosh.analysis.StopFilter", None, None)
    <class 'whoosh.analysis.StopFilter'>

    :param name: the dotted path of the object, i.e. a module path followed
        by the name of an attribute of that module.
    :param blacklist: a sequence of prefixes; a name starting with any of
        them is refused. A false value disables the check.
    :param whitelist: a sequence of prefixes; when given, the name must
        start with one of them. A false value disables the check.
    :returns: the attribute named by the last component of ``name``, looked
        up on the imported module.
    :raises TypeError: if the name is blacklisted or not whitelisted.
    :raises AssertionError: if the name does not contain a dot.
    """

    for pre in blacklist or ():
        if name.startswith(pre):
            raise TypeError("%r: can't instantiate names starting with %r" % (name, pre))
    if whitelist and not any(name.startswith(pre) for pre in whitelist):
        raise TypeError("Can't instantiate %r" % name)

    # Values are handed to logging as arguments rather than pre-formatted
    # with "%": formatting a tuple-valued object with a bare "%r" raises
    # TypeError, and lazy arguments are only rendered if the record is
    # actually emitted.
    logging.debug("Attempting to instantiate %r", name)

    modname, dot, clsname = name.rpartition(".")
    if not dot:
        # Raised explicitly (instead of using an assert statement) so the
        # check still runs under "python -O"; the exception type is the one
        # the assert statement used to produce.
        raise AssertionError("Name %r must be fully qualified" % name)
    logging.debug("Module=%r, class=%r", modname, clsname)

    # A non-empty fromlist makes __import__ return the innermost module
    # rather than the top-level package.
    mod = __import__(modname, fromlist=[clsname])
    logging.debug("Module imported: %r", mod)
    obj = getattr(mod, clsname)
    logging.debug("Class imported: %r", obj)
    return obj


class Configuration(object):
    """Base class for objects that build an object tree from a configuration file.
    This class is subclassed for different file formats (e.g. INI, YAML, XML).
    Subclasses must implement _object_from_section().

    Objects returned by object_from_section() are cached by section name, so
    asking for the same section again returns the same object.

    :param blacklist: a list of prefixes that can't be instantiated,
        for example `("os.", "sys.", "gc.")`.
    :param whitelist: a list of prefixes that are allowed to be instantiated.
        Only object names beginning with one of the prefixes in this list may
        be specified in the configuration.
    """

    def __init__(self, blacklist=forbidden, whitelist=None):
        self.blacklist = blacklist
        self.whitelist = whitelist
        self._cache = {}

    def _put(self, section, obj):
        "Stores obj in the cache under the given section name."

        # The cache is created on demand; presumably this guards against
        # subclasses whose __init__ does not call this class's __init__.
        if not hasattr(self, "_cache"):
            self._cache = {}
        self._cache[section] = obj

    def _is_cached(self, section):
        "Returns True if an object has been cached for the section."

        if not hasattr(self, "_cache"):
            return False
        return section in self._cache

    def _cached(self, section):
        "Returns the object cached for the section (KeyError if there is none)."

        return self._cache[section]

    def class_from_name(self, typename):
        """Returns the type or object named by typename.

        Names found in the module's ``typemap`` (e.g. "int", "list") map
        directly to the built-in type. Any other name is imported with
        find_object(), subject to this object's blacklist and whitelist.
        """

        if typename in typemap:
            return typemap[typename]
        return find_object(typename, self.blacklist, self.whitelist)

    def string_to_bool(self, value):
        """Converts a string such as "yes", "true", "0" or "no" to a bool.

        The comparison ignores case and surrounding whitespace.

        :raises ValueError: if the string is not one of the recognized
            spellings. (Previously this case silently returned None.)
        """

        lv = value.strip().lower()
        if lv in trues:
            return True
        if lv in falses:
            return False
        raise ValueError("Can't convert %r to a boolean" % (value,))

    def string_to_unicode(self, value):
        "Decodes a UTF-8 encoded byte string."

        return value.decode("utf8")

    def string_to_list(self, value):
        "Splits a string on commas and/or newlines and returns the pieces."

        return listsplitter.split(value)

    def string_to_dict(self, value):
        """Converts a string of comma/newline separated ``key:value`` items
        into a dictionary. Each item is split on its first colon.

        :raises ValueError: if an item does not contain a colon.
        """

        d = {}
        for item in listsplitter.split(value):
            if ":" not in item:
                raise ValueError("Can't split %r into key/value" % item)
            k, v = item.split(":", 1)
            d[k] = v
        return d

    def string_to_value(self, section, key, value, cls):
        """Converts a string value into a Python object of a given type.
        This method can use heuristics to convert values, for example when
        converting to a bool, values of 'true', 'false', 'yes', 'no', etc.
        are checked.

        >>> Configuration().string_to_value("section", "option", "no", bool)
        False

        Supported target types are bool, list, unicode, dict, tuple, set,
        frozenset, int, float, str and ``type`` (the value is then taken to
        be a type name and resolved with class_from_name()).

        :param section: the "section" in which the value is defined in the
            configuration file (used in error messages).
        :param key: the name of the value in the configuration file (used in
            error messages).
        :param value: the string value to convert.
        :param cls: the type to convert the value to.
        :raises ValueError: if cls is not one of the supported types.
        """

        if cls in (bool, list, unicode, dict):
            # Dispatches to string_to_bool, string_to_list, etc.
            return getattr(self, "string_to_" + cls.__name__)(value)
        elif cls in (tuple, set, frozenset):
            return cls(self.string_to_list(value))
        elif cls is type:
            return self.class_from_name(value)
        elif cls in (int, float):
            return cls(value)
        elif cls is str:
            return value

        raise ValueError("%s.%s: can't convert %r to %r" % (section, key, value, cls))

    def to_value(self, section, key, value, cls):
        """Converts value to an instance of cls. This is the conversion hook
        called by object_from_dict(); this implementation simply delegates
        to string_to_value().
        """

        return self.string_to_value(section, key, value, cls)

    def object_from_dict(self, cls, items, section="", types = None):
        """Instantiates and returns an object based on the arguments contained in
        the given dictionary. Neither ``items`` nor ``types`` is modified.

        :param cls: the type to instantiate.
        :param items: a dictionary containing the arguments to the object's initializer.
        :param section: the "section" of the configuration file from which the dictionary
            is taken, used for debugging.
        :param types: a dictionary mapping initializer parameter names to
            types. The type to be instantiated will also be checked for a
            `__inittypes__` attribute.
        :raises TypeError: if calling cls fails with a TypeError; the message
            is prefixed with the section and type.
        """

        assert callable(cls), "%r is not callable in [%s]" % (cls, section)

        # Work on copies so the caller's dictionaries are left untouched.
        items = dict(items)
        types = {} if types is None else dict(types)
        if hasattr(cls, "__inittypes__"):
            types.update(cls.__inittypes__)

        # If types are specified using string names, convert them to the
        # actual type objects.
        for tkey, t in list(types.items()):
            if isinstance(t, str):
                types[tkey] = self.class_from_name(t)

        # Convert the arguments using the types dictionary
        for key, value in list(items.items()):
            if key not in types:
                continue
            itype = types[key]
            if isinstance(value, itype):
                continue
            # Values are passed to logging as arguments: formatting a
            # converted tuple with a bare "%r" would raise TypeError.
            logging.debug("Converting %r:%r to type %r", key, value, itype)
            # NOTE(review): this tests cls (the type being instantiated),
            # not itype -- confirm that is intended.
            if cls is not str and isinstance(value, str):
                items[key] = self.to_value(section, key, value, itype)
            logging.debug("Coerced value is %r", items[key])

        logging.debug("Calling %s with %r", cls, items)
        try:
            return cls(**items)
        except TypeError as e:
            # Re-raise with the section and type so the failing part of the
            # configuration can be located.
            raise TypeError("[%s] %s: %s" % (section, cls, e))

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        """Builds the object described by a section. Must be implemented by
        subclasses; takes the same arguments as object_from_section().
        """

        raise NotImplementedError("%s must implement _object_from_section()"
                                  % self.__class__.__name__)

    def object_from_section(self, section, defaulttype=None, inittypes=None):
        """Instantiates and returns an object based on the configuration
        options in a given "section" of the configuration file. The result
        is cached, so later calls with the same section name return the
        same object regardless of the other arguments.

        :param section: name of the section containing the object specification.
        :param defaulttype: the type to instantiate, if the section does not
            specify a type explicitly.
        :param inittypes: a dictionary mapping initializer parameter names to
            types. The type to be instantiated will also be checked for a
            `__inittypes__` attribute.
        """

        if self._is_cached(section):
            return self._cached(section)
        obj = self._object_from_section(section, defaulttype=defaulttype, inittypes=inittypes)
        self._put(section, obj)
        return obj
        

class IniConfiguration(Configuration):
    """Builds objects from the sections of a ConfigParser-style object.

    Each section describes one object: one option names the type to
    instantiate and the remaining options become keyword arguments.

    :param config: the parser object; its has_section() and items() methods
        are used.
    :param blacklist: a list of name prefixes that can't be instantiated.
    :param whitelist: a list of name prefixes that may be instantiated.
    :param typeoption: the name of the option holding the type name.
    :param commentoption: the name of an option that is discarded before
        the object is instantiated. A false value disables this.
    :param typesep: the separator in option names of the form
        ``keyname|typename``, which give the type of an individual argument.
    :param objprefix: values starting with this string are treated as
        references to an object defined by another section. A false value
        disables references.
    """

    def __init__(self, config, blacklist=forbidden, whitelist=None,
                 typeoption="type", commentoption="_", typesep="|", objprefix="@"):
        super(IniConfiguration, self).__init__(blacklist, whitelist)
        self.config = config
        self.typeoption = typeoption
        self.commentoption = commentoption
        self.typesep = typesep
        self.objprefix = objprefix

    def to_value(self, section, key, value, cls):
        """Converts value to an instance of cls, resolving references to
        other sections.

        If value is a string starting with the object prefix and cls is
        callable, the rest of the string is taken as a section name and the
        object built from that section is returned (using cls as its
        default type). Everything else is handled by the base class.

        :raises TypeError: if the referenced object is not None and is not
            an instance of cls.
        """

        prefix = self.objprefix
        is_reference = (isinstance(value, str) and prefix and callable(cls)
                        and value.startswith(prefix))
        if not is_reference:
            return super(IniConfiguration, self).to_value(section, key, value, cls)

        inst = self.object_from_section(value[len(prefix):], defaulttype=cls)
        if inst is not None and not isinstance(inst, cls):
            raise TypeError("%s.%s: %r is not %r" % (section, key, inst, cls))
        return inst

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        """Builds the object described by the given section.

        :param section: the name of the section to read.
        :param defaulttype: the type to instantiate if the section has no
            type option.
        :param inittypes: a dictionary mapping argument names to types; it
            is not modified.
        :raises NameError: if the section does not exist.
        :raises Exception: if the section names no type and no default type
            was given.
        """

        if not self.config.has_section(section):
            raise NameError("No section [%s]" % section)

        items = dict(self.config.items(section))
        # Copy so per-section type overrides don't leak into the caller's dict.
        inittypes = {} if inittypes is None else dict(inittypes)

        # Process any options in the form keyname|typename=value
        for key in list(items):
            if self.typesep in key:
                keyname, typename = key.split(self.typesep)
                inittypes[keyname] = typemap[typename]
                items[keyname] = items.pop(key)

        # Get the type/function to instantiate/call.
        if self.typeoption in items:
            cls = self.class_from_name(items.pop(self.typeoption))
        elif defaulttype:
            cls = defaulttype
        else:
            raise Exception("Object specification [%s] does not have %r" % (section, self.typeoption))

        # Remove the "comment" option if present
        if self.commentoption and self.commentoption in items:
            del items[self.commentoption]

        # object_from_dict() names its type-mapping parameter "types".
        return self.object_from_dict(cls, items, section=section, types=inittypes)


class EtreeConfiguration(Configuration):
    """Unfinished configuration backed by a tree object (presumably an
    ElementTree, judging by the name -- TODO confirm).

    Only the constructor is implemented; the object-building methods raise
    NotImplementedError.

    :param tree: the tree object holding the configuration.
    :param blacklist: a list of name prefixes that can't be instantiated.
    :param whitelist: a list of name prefixes that may be instantiated.
    """

    def __init__(self, tree, blacklist=forbidden, whitelist=None):
        super(EtreeConfiguration, self).__init__(blacklist, whitelist)
        self.tree = tree

    def object_from_element(self, element, defaulttype=None, inittypes=None):
        "Not implemented yet."

        raise NotImplementedError("EtreeConfiguration.object_from_element() is not implemented")

    def _object_from_section(self, section, defaulttype=None, inittypes=None):
        "Not implemented yet."

        raise NotImplementedError("EtreeConfiguration._object_from_section() is not implemented")



if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    
    cf = configparser_from_string("""
[test]
_= Test configuration
type=whoosh.analysis.FancyAnalyzer
expression=\s+
gaps=yes
minsize=2
stoplist=apple
    bear
    claws

[stripper]
type=dict
hello=there
this=is
""")
    
    c = IniConfiguration(cf)
    print "test=", c.object_from_section("test")
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.