Commits

Kirill Simonov  committed ac29c16

Refactored and renamed DDT -> Cogs.

  • Participants
  • Parent commits f4a934c

Comments (0)

Files changed (14)

-*****************************************************************
-  DDT -- Development, Deployment and Testing automation toolkit
-*****************************************************************
+********************************************************************
+  Cogs - A toolkit for developing command-line utilities in Python
+********************************************************************
 
 Overview
 ========
 
-DDT is a toolkit for developing command-line utilities in Python.  It
+Cogs is a toolkit for developing command-line utilities in Python.  It
 handles common operations such as parsing command-line parameters,
 dispatching commands, loading configuration files and is targeted
 to developers, sysadmins, testers, or anybody else who needs to script
 their routine tasks.
 
-DDT is a free software licensed under MIT license.  DDT is written by
+Cogs is a free software licensed under MIT license.  Cogs is written by
 Kirill Simonov from Prometheus Research, LLC.
 
 
 Getting Started
 ===============
 
-You can install DDT using `PIP package manager`_::
+You can install Cogs using `PIP package manager`_::
 
-    # pip install DDT
+    # pip install Cogs
 
 .. _PIP package manager: http://www.pip-installer.org/
 
-This operation installs a command-line utility ``ddt`` and a Python
-package of the same name.  The ``ddt`` utility is a dispatcher which
+This operation installs a command-line utility ``cogs`` and a Python
+package of the same name.  The ``cogs`` utility is a dispatcher which
 which lets you select and execute your scripts (called *tasks*).  Let us
 show how to do it with a simple example.
 
-In the current directory, make a subdirectory ``ddt.local`` and create a
+In the current directory, make a subdirectory ``cogs.local`` and create a
 file ``__init__.py`` with the following content::
 
-    from ddt import task, log
+    from cogs import task, log
     import os
 
     @task
 
 Now run::
 
-    $ ddt hello world
+    $ cogs hello world
     Hello, World!
 
-    $ ddt hello
+    $ cogs hello
     Hello, Xi!
 
-    $ ddt help hello
+    $ cogs help hello
     HELLO - greet the given entity (if not specified, the current user)
-    Usage: ddt hello [<name>]
+    Usage: cogs hello [<name>]
 
-DDT converts function ``Hello()`` into a command-line script with
+Cogs converts function ``Hello()`` into a command-line script with
 parameters inferred from the function signature, so then when you
-execute a command ``ddt hello world``, you call a function
+execute a command ``cogs hello world``, you call a function
 ``Hello('world')``.
 
 
 Loading Extensions
 ==================
 
-In this section, we describe how DDT finds and loads extensions.
+In this section, we describe how Cogs finds and loads extensions.
 
-DDT loads extensions from two places:
+Cogs loads extensions from two places:
 
-* ``ddt.local`` subdirectory in the current directory;
-* all Python packages under ``ddt.extensions`` entry point.
+* ``cogs.local`` subdirectory in the current directory;
+* all Python packages under ``cogs.extensions`` entry point.
 
-The easiest way to add an extension is to create a ``ddt.local``
+The easiest way to add an extension is to create a ``cogs.local``
 subdirectory and add your scripts there.  The subdirectory must contain
-an ``__init__.py`` script, which is executed by DDT on startup.  The
-``ddt.local`` subdirectory must be owned by the same user who runs the
-``ddt`` script, or by ``root``; otherwise it is ignored.
+an ``__init__.py`` script, which is executed by Cogs on startup.  The
+``cogs.local`` subdirectory must be owned by the same user who runs the
+``cogs`` script, or by ``root``; otherwise it is ignored.
 
-If you need to package and distribute your DDT extensions, using
-``ddt.local`` may be inconvenient.  In this case, you may package your
-DDT extensions as a regular Python distribution.
+If you need to package and distribute your Cogs extensions, using
+``cogs.local`` may be inconvenient.  In this case, you may package your
+Cogs extensions as a regular Python distribution.
 
 Suppose we want to pack the ``hello`` task as a separate package.
 Create a directory tree with the following structure::
 
-    DDT-Hello/
+    Cogs-Hello/
         src/
-            ddt_hello/
+            cogs_hello/
                 __init__.py
         setup.py
 
-The file ``ddt_hello/__init__.py`` contains the definition of the
-``hello`` task and has the same content as ``ddt.local/__init__.py`` in
+The file ``cogs_hello/__init__.py`` contains the definition of the
+``hello`` task and has the same content as ``cogs.local/__init__.py`` in
 our previous example.
 
 The file ``setup.py`` contains the meta-data of the package and may have
     from setuptools import setup
 
     setup(
-        name='DDT-Hello',
+        name='Cogs-Hello',
         version='0.0.1',
-        description="""A DDT task to greet somebody""",
-        packages=['ddt_hello'],
+        description="""A Cogs task to greet somebody""",
+        packages=['cogs_hello'],
         package_dir={'': 'src'},
-        install_requires=['DDT'],
-        entry_points={ 'ddt.extensions': ['Hello = ddt_hello'] },
+        install_requires=['Cogs'],
+        entry_points={ 'cogs.extensions': ['Hello = cogs_hello'] },
     )
 
 Note the parameter ``entry_points`` in ``setup()`` invocation; it adds
-an entry point ``ddt.extensions`` named ``Hello`` that refers to package
-``ddt_hello``.  On startup, DDT finds and loads all packages defined for
-the entry point ``ddt.extensions``.
+an entry point ``cogs.extensions`` named ``Hello`` that refers to package
+``cogs_hello``.  On startup, Cogs finds and loads all packages defined for
+the entry point ``cogs.extensions``.
 
 
 Defining Tasks
 A task can be created from a function or a class by augmenting it with
 the ``task`` decorator::
 
-    from ddt import task, argument, log, fail
+    from cogs import task, argument, log, fail
 
     @task
     def Factorial(n):
 
 You can now execute the tasks by running::
 
-    $ ddt factorial 10
+    $ cogs factorial 10
     10! = 3628800
 
-    $ ddt fibonacci 10
+    $ cogs fibonacci 10
     F_10 = 55
 
-DDT uses the name of the function or the class as the task identifier.
+Cogs uses the name of the function or the class as the task identifier.
 The name is normalized: it is converted to lower case and has all
 underscore characters converted to the dash symbol.
 
 If the task is derived from a function, the task arguments are inferred
-from the function signature.  DDT executes such a task by calling the
+from the function signature.  Cogs executes such a task by calling the
 function with the parsed command-line parameters.
 
 If the task is derived from a class, the task arguments and options must
 be defined using ``argument()`` and ``option()`` descriptors.  To
-execute a task, DDT creates an instance of the class passing the task
-parameters as the constructor arguments.  Then DDT invokes the
+execute a task, Cogs creates an instance of the class passing the task
+parameters as the constructor arguments.  Then Cogs invokes the
 ``__call__`` method of the instance.  Thus the call of::
 
-    $ ddt factorial 10
+    $ cogs factorial 10
 
 is translated to::
 
 
 and the call of::
 
-    $ ddt fibonacci 10
+    $ cogs fibonacci 10
 
 is translated to::
 
 The docstring of the function or the class becomes the task
 description::
 
-    $ ddt help factorial
+    $ cogs help factorial
     FACTORIAL - calculate n!
-    Usage: ddt factorial <n>
+    Usage: cogs factorial <n>
 
     This task calculates the value of the factorial of the given
     positive number n.  Factorial of n, also known as n!, is
 
         n! = 1*2*...*(n-1)*n
 
-    $ ddt help fibonacci
-    Usage: ddt fibonacci <n>
+    $ cogs help fibonacci
+    Usage: cogs fibonacci <n>
 
     The n-th Fibonacci number F_n is defined by:
 
 a task derived from a class, use the ``option()`` descriptor.  For
 example::
 
-    from ddt import task, argument, option
+    from cogs import task, argument, option
     import sys, os
 
     @task
 You can execute this task with option ``--output`` or ``-o`` to redirect
 the output to a file::
 
-    $ ddt write-hello world -o hello.txt
+    $ cogs write-hello world -o hello.txt
 
 
 Configuration and Environment
 =============================
 
-DDT allows you to define custom configuration parameters.  For example::
+Cogs allows you to define custom configuration parameters.  For example::
 
-    from ddt import env, task, setting, log
+    from cogs import env, task, setting, log
     import os
 
     @setting
 ``default-name``.  One way to do it is to use global option
 ``--default-name``::
 
-    $ ddt --default-name=world hello-with-configuration
+    $ cogs --default-name=world hello-with-configuration
 
 You could also pass a configuration parameter using an environment
 variable::
 
-    $ DDT_DEFAULT_NAME=world ddt hello-with-configuration
+    $ COGS_DEFAULT_NAME=world cogs hello-with-configuration
 
 Alternatively, you can put parameters to a configuration file.  In the
-current directory, create a file ``ddt.conf`` with the following
+current directory, create a file ``cogs.conf`` with the following
 content::
 
     default-name: world
 
 Now run::
 
-    $ ddt hello-with-configuration
+    $ cogs hello-with-configuration
 
-DDT reads configuration from the following locations:
+Cogs reads configuration from the following locations:
 
-* ``/etc/ddt.conf``
-* ``$PREFIX/etc/ddt.conf``
-* ``$HOME/.ddt.conf``
-* ``./ddt.conf``
+* ``/etc/cogs.conf``
+* ``$PREFIX/etc/cogs.conf``
+* ``$HOME/.cogs/cogs.conf``
+* ``./cogs.conf``
 * program environment
-* command-line options
+* command-line parameters
 
 To create a new configuration parameter, wrap a function named after the
 parameter with the ``@setting`` decorator.  The function must accept
 parameter is not specified explicitly, and is called with the value of
 the parameter is it was set using one of the methods described above.
 
-DDT does not impose any rules on what to do with the parameter value,
+Cogs does not impose any rules on what to do with the parameter value,
 but we recommend to store the value in the global ``env`` variable.  The
 call of ``env.add(default_name=name)`` adds a new parameter
 ``default_name`` which could then be accessed as ``env.default_name``.
 API Reference
 =============
 
-The following functions and classes are defined in the package ``ddt``:
+The following functions and classes are defined in the package ``cogs``:
 
 ``@task``
 
     value of the setting.  The function is responsible for storing the
     value in the ``env`` object.
 
-``argument(check=None, default=None, is_optional=False, is_list=False)``
+``argument(check, default, plural=False)``
 
     Describe a task argument.
 
 
     ``default``
         The default value to be used if the argument is optional and not
-        specified.
+        specified.  If this parameter is not set, the argument is mandatory.
 
-    ``is_optional``
-        If set, the argument can be omitted.  An optional argument must
-        not appear before any mandatory argument.
-
-    ``is_list``
+    ``plural``
         If set, the argument consumes all the remaining command-line
         parameters.  Must be the last argument specified.
 
 
-``option(key=None, check=None, default=None, has_value=False, value_name=None, hint=None)``
+``option(key, check, default, plural=False, value_name=None, hint=None)``
 
     Describe a task option.
 
         exception on error.
 
     ``default``
-        The default value used when the option is not specified.
+        The default value used when the option is not specified.  If this
+        parameter is not set, the option does not accept a value.  Such an
+        option is treated is a toggle and takes a value ``True`` if set
+        and ``False`` if not set.
 
-    ``has_value``
-        If set, the option takes a value.  If not set, the option is a
-        toggle with the value of ``False`` if the option is not
-        specified; ``True`` otherwise.
+    ``plural``
+        If set, indicates that the option could be specified more than once.
 
     ``value_name``
         The preferred name for the option value; used for the task
     other system state with a respective ``debug()`` call.
 
     Add command-line parameter ``--debug`` or set environment variable
-    ``DDT_DEBUG=1`` to see debug output.
+    ``COGS_DEBUG=1`` to see debug output.
 
 ``warn(*msgs, sep=' ', end='\n', file=sys.stderr)``
     Display a warning.  ``warn()`` should be used for reporting error
 from setuptools import setup, find_packages
 
 
-NAME = "DDT"
-VERSION = "0.1.1" # FIXME: synchronize with `ddt.__version__`?
-DESCRIPTION = """Development, Deployment and Testing automation toolkit"""
+NAME = "Cogs"
+VERSION = "0.1.1"
+DESCRIPTION = """A toolkit for developing command-line utilities in Python"""
 LONG_DESCRIPTION = open('README', 'r').read()
 AUTHOR = """Kirill Simonov (Prometheus Research, LLC)"""
 LICENSE = "MIT"
 INSTALL_REQUIRES = ['setuptools', 'PyYAML']
 ENTRY_POINTS = {
     'console_scripts': [
-        'ddt = ddt.run:run',
+        'cogs = cogs:run',
     ],
-    'ddt.extensions': [],
+    'cogs.extensions': [],
 }
 
 

File src/cogs/__init__.py

+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from .shell import Shell, ArgDsc, OptDsc, Failure
+from .sysutils import SysUtils
+from .logutils import LogUtils
+from .std import register_std
+import sys
+import os.path
+
+
+cogs = Shell(name="Cogs",
+             description="""A task dispatching utility""",
+             local_package='cogs.local',
+             entry_point='cogs.extensions',
+             config_name='cogs.conf',
+             config_dirs=['/etc',
+                          os.path.join(sys.prefix, '/etc'),
+                          os.path.expanduser('~/.cogs'),
+                          os.path.abspath('.')])
+env = cogs.environment
+task = cogs.register_task
+default_task = cogs.register_default_task
+setting = cogs.register_setting
+argument = ArgDsc
+option = OptDsc
+
+
+sys_utils = SysUtils(cogs)
+cp = sys_utils.cp
+mv = sys_utils.mv
+rm = sys_utils.rm
+mktree = sys_utils.mktree
+rmtree = sys_utils.rmtree
+exe = sys_utils.exe
+sh = sys_utils.sh
+pipe = sys_utils.pipe
+
+
+log_utils = LogUtils(cogs)
+log = log_utils.log
+debug = log_utils.debug
+warn = log_utils.warn
+fail = log_utils.fail
+prompt = log_utils.prompt
+
+
+register_std(cogs)
+
+
+run = cogs.run
+
+

File src/cogs/logutils.py

+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from .shell import Failure
+import sys
+import os
+import re
+
+
+class LogUtils(object):
+
+    S_RESET = 0
+    S_BRIGHT = 1
+    S_DIM = 2
+    S_UNDERSCORE = 4
+    S_BLINK = 5
+    S_REVERSE = 7
+    S_HIDDEN = 8
+    FG_BLACK = 30
+    FG_RED = 31
+    FG_GREEN = 32
+    FG_YELLOW = 33
+    FG_BLUE = 34
+    FG_MAGENTA = 35
+    FG_CYAN = 36
+    FG_WHITE = 37
+    BG_BLACK = 40
+    BG_RED = 41
+    BG_GREEN = 42
+    BG_YELLOW = 43
+    BG_BLUE = 44
+    BG_MAGENTA = 45
+    BG_CYAN = 46
+    BG_WHITE = 47
+
+    styles = {
+        None: [S_BRIGHT],
+        'footnote': [S_DIM],
+        'warning': [S_BRIGHT, FG_RED],
+        'success': [S_BRIGHT, FG_GREEN],
+    }
+
+    def __init__(self, shell):
+        self.shell = shell
+        self.env = shell.environment
+
+    def _colorize(self, msg, has_colors=True):
+        def replace(match):
+            indicator = match.group('indicator')
+            data = match.group('data')
+            assert indicator in self.styles, "unknown style %r" % indicator
+            if not has_colors:
+                return data
+            lesc = "\x1b[%sm" % ";".join(str(style)
+                                         for style in self.styles[indicator])
+            resc = "\x1b[%sm" % self.S_RESET
+            return lesc+data+resc
+        return re.sub(r"(?::(?P<indicator>[a-zA-Z]+):)?`(?P<data>[^`]*)`",
+                      replace, msg)
+
+    def log(self, *msgs, **opts):
+        sep = opts.pop('sep', " ")
+        end = opts.pop('end', "\n")
+        file = opts.pop('file', sys.stdout)
+        assert not opts, opts
+        has_colors = (file.isatty() and os.environ.get('COLORTERM'))
+        data = sep.join(self._colorize(str(msg), has_colors)
+                        for msg in msgs) + end
+        file.write(data)
+        file.flush()
+
+    def debug(self, *msgs, **opts):
+        if self.env.debug:
+            return self.log(":footnote:`...`", file=sys.stderr, *msgs, **opts)
+
+    def warn(self, *msgs, **opts):
+        return self.log(":warning:`WARNING`:", file=sys.stderr, *msgs, **opts)
+
+    def fail(self, *msgs, **opts):
+        self.log(":warning:`FATAL ERROR`:", file=sys.stderr, *msgs, **opts)
+        return Failure()
+
+    def prompt(self, msg):
+        value = ""
+        while not value:
+            value = raw_input(msg+" ").strip()
+        return value
+
+

File src/cogs/shell.py

+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+import sys
+import re
+import os, os.path
+import itertools
+import types
+import imputil; imputil._os_stat = os.stat
+import pkg_resources
+import yaml
+
+
+class Failure(Exception):
+    pass
+
+
+class Environment(object):
+
+    __slots__ = ('_states', '__dict__')
+
+    class _context(object):
+
+        def __init__(self, owner, **updates):
+            self.owner = owner
+            self.updates = updates
+
+        def __enter__(self):
+            self.owner.push(**self.updates)
+
+        def __exit__(self, exc_type, exc_value, exc_tb):
+            self.owner.pop()
+
+    def __init__(self):
+        self._states = []
+
+    def clear(self):
+        self.__dict__.clear()
+
+    def add(self, **updates):
+        for key in sorted(updates):
+            assert not key.startswith('_'), \
+                    "parameter should not start with '_': %r" % key
+            assert key not in self.__dict__, \
+                    "duplicate parameter %r" % key
+            self.__dict__[key] = updates[key]
+
+    def set(self, **updates):
+        for key in sorted(updates):
+            assert key in self.__dict__, \
+                    "unknown parameter %r" % key
+            self.__dict__[key] = updates[key]
+
+    def push(self, **updates):
+        self._states.append(self.__dict__)
+        self.__dict__ = self.__dict__.copy()
+        self.set(**updates)
+
+    def pop(self):
+        assert self._states, "unbalanced pop()"
+        self.__dict__ = self._states.pop()
+
+    def __call__(self, **updates):
+        return self._context(self, **updates)
+
+
+class ArgDsc(object):
+
+    CTR = itertools.count(1)
+    REQ = object()
+
+    def __init__(self, check=None, default=REQ, plural=False):
+        assert isinstance(plural, bool)
+        self.check = check
+        self.default = default
+        self.plural = plural
+        self.order = next(self.CTR)
+
+    def __get__(self, instance, owner):
+        if instance is None:
+            return self
+        raise AttributeError("unset argument")
+
+
+class OptDsc(object):
+
+    CTR = itertools.count(1)
+    NOVAL = object()
+
+    def __init__(self, key=None, check=None, default=NOVAL, plural=False,
+                 value_name=None, hint=None):
+        assert key is None or (isinstance(key, str) and
+                               re.match(r'^[a-zA-Z]$', key)), \
+                "key must be a letter, got %r" % key
+        assert isinstance(plural, bool)
+        assert value_name is None or isinstance(value_name, str)
+        assert hint is None or isinstance(hint, str)
+        self.key = key
+        self.check = check
+        self.default = default
+        self.plural = plural
+        self.value_name = value_name
+        self.hint = hint
+        self.order = next(self.CTR)
+
+
+class TaskSpec(object):
+
+    def __init__(self, name, code, args, opts,
+                 usage=None, hint=None, help=None):
+        self.name = name
+        self.code = code
+        self.args = args
+        self.opts = opts
+        self.usage = usage
+        self.hint = hint
+        self.help = help
+        self.opt_by_name = {}
+        self.opt_by_key = {}
+        for opt in self.opts:
+            self.opt_by_name[opt.name] = opt
+            if opt.key is not None:
+                self.opt_by_key[opt.key] = opt
+
+
+class SettingSpec(object):
+
+    def __init__(self, name, code, has_value=False, value_name=None,
+                 usage=None, usage_conf=None, usage_environ=None,
+                 hint=None, help=None):
+        self.name = name
+        self.code = code
+        self.has_value = has_value
+        self.value_name = value_name
+        self.usage = usage
+        self.usage_conf = usage_conf
+        self.usage_environ = usage_environ
+        self.hint = hint
+        self.help = help
+
+
+class ArgSpec(object):
+
+    def __init__(self, attr, name, check, default,
+                 is_optional=False, is_plural=False):
+        self.attr = attr
+        self.name = name
+        self.check = check
+        self.default = default
+        self.is_optional = is_optional
+        self.is_plural = is_plural
+
+
+class OptSpec(object):
+
+    def __init__(self, attr, name, key, check, default,
+                 is_plural=False,  has_value=False, usage=None, hint=None):
+        self.attr = attr
+        self.name = name
+        self.key = key
+        self.check = check
+        self.default = default
+        self.is_plural = is_plural
+        self.has_value = has_value
+        self.usage = usage
+        self.hint = hint
+
+
+class Shell(object):
+
+    def __init__(self,
+                 name,
+                 description=None,
+                 local_package=None,
+                 entry_point=None,
+                 config_name=None,
+                 config_dirs=None):
+        self.name = name
+        self.description = description
+        self.local_package = local_package
+        self.entry_point = entry_point
+        self.config_name = config_name
+        self.config_dirs = config_dirs
+        self.environment = Environment()
+        self.task_by_name = {}
+        self.setting_by_name = {}
+
+    def register_task(self, T, is_default=False):
+        norm_T = self._normalize_task(T)
+        name = self._to_name(norm_T.__name__)
+        if is_default:
+            name = None
+        args, opts = self._extract_parameters(norm_T)
+        hint, help = self._describe(norm_T)
+        optionals = 0
+        usage = name
+        for arg in args:
+            if arg.is_plural:
+                assert arg is args[-1], "a plural argument must be the last" \
+                        " in the argument list: %s" % arg.name
+            if optionals:
+                assert arg.is_optional, "a mandatory argument must not follow" \
+                        " an optional argument: %s" % arg.name
+            if arg.is_optional:
+                optionals += 1
+                usage = "%s [<%s>" % (usage, arg.name)
+            else:
+                usage = "%s <%s>" % (usage, arg.name)
+            if arg.is_plural:
+                usage += "..."
+        if optionals:
+            usage += "]"*optionals
+        task = TaskSpec(name, norm_T, args, opts,
+                        usage=usage,
+                        hint=hint,
+                        help=help)
+        self.task_by_name[name] = task
+        return T
+
+    def register_default_task(self, T):
+        return self.register_task(T, True)
+
+    def register_setting(self, S):
+        assert isinstance(S, types.FunctionType), \
+                "a setting must be a function"
+        params, varargs, varkeywords = self._introspect(S)
+        assert (len(params) >= 1 and len(params[0]) == 2) or varargs, \
+                "a setting must accept zero or one parameter"
+        hint, help = self._describe(S)
+        name = self._to_name(S.__name__)
+        has_value = (not params or params[0][1] is not False)
+        value_name = (params and params[0][0] or varargs)
+        if has_value:
+            usage = "--%s=%s" % (name, value_name.upper())
+            usage_conf = "%s: %s" % (name, value_name.upper())
+            usage_environ = "%s_%s=%s" % (self.name.upper(),
+                                          name.upper().replace('-', '_'),
+                                          value_name.upper())
+        else:
+            usage = "--%s" % name
+            usage_conf = "%s: true" % name
+            usage_environ = "%s_%s=1" % (self.name.upper(),
+                                         name.upper().replace('-', '_'))
+        setting = SettingSpec(name, S,
+                              has_value=has_value,
+                              value_name=value_name,
+                              usage=usage,
+                              usage_conf=usage_conf,
+                              usage_environ=usage_environ,
+                              hint=hint,
+                              help=help)
+        self.setting_by_name[name] = setting
+        return S
+
+    def main(self, argv):
+        self._load_extensions()
+        config_settings = self._parse_config()
+        environ_settings = self._parse_environ()
+        task, attrs, argv_settings = self._parse_argv(argv)
+        self._init_settings(config_settings, environ_settings, argv_settings)
+        exit = self._execute_task(task, attrs)
+        return exit
+
+    def run(self):
+        debug_var = '%s_DEBUG' % self.name.upper()
+        debug = (os.environ.get(debug_var) in ['true', '1'] or
+                 '--debug' in sys.argv)
+        with self.environment():
+            self.environment.clear()
+            self.environment.add(debug=debug)
+            try:
+                return self.main(sys.argv)
+            except (Failure, IOError, KeyboardInterrupt), exc:
+                if self.environment.debug:
+                    raise
+                return exc
+
+    def _to_name(self, keyword):
+        return keyword.lower().replace(' ', '-').replace('_', '-')
+
+    def _introspect(self, fn):
+        # Find function parameters and default values.
+        params = []
+        varargs = None
+        varkeywords = None
+        code = fn.func_code
+        defaults = fn.func_defaults or ()
+        idx = 0
+        while idx < code.co_argcount:
+            name = code.co_varnames[idx]
+            if idx < code.co_argcount-len(defaults):
+                params.append(name)
+            else:
+                default = defaults[idx-code.co_argcount+len(defaults)]
+                params.append((name, default))
+            idx += 1
+        if code.co_flags & 0x04: # CO_VARARGS
+            varargs = code.co_varnames[idx]
+            idx += 1
+        if code.co_flags & 0x08: # CO_VARKEYWORDS
+            varkeywords = code.co_varnames[idx]
+            idx += 1
+        return params, varargs, varkeywords
+
+    def _describe(self, fn):
+        # Convert a docstring to a hint line and a description.
+        hint = ""
+        help = ""
+        doc = fn.__doc__
+        if doc:
+            hint = doc.lstrip().splitlines()[0].rstrip()
+            lines = doc.strip().splitlines()[1:]
+            while lines and not lines[0].strip():
+                lines.pop(0)
+            while lines and not lines[-1].strip():
+                lines.pop(-1)
+            indent = None
+            for line in lines:
+                short_line = line.lstrip()
+                if short_line:
+                    line_indent = len(line)-len(short_line)
+                    if indent is None or line_indent < indent:
+                        indent = line_indent
+            if indent:
+                lines = [line[indent:] for line in lines]
+            help = "\n".join(lines)
+        return hint, help
+
+    def _normalize_task(self, T):
+        assert isinstance(T, (types.ClassType,
+                              types.TypeType,
+                              types.FunctionType)), \
+                "a task must be either a function or a class"
+        if isinstance(T, types.FunctionType):
+            T_dict = {}
+            T_dict['__module__'] = T.__module__
+            T_dict['__doc__'] = T.__doc__
+            T_dict['_fn'] = staticmethod(T)
+            T_dict['_vararg'] = None
+            def __init__(self, **params):
+                self._params = params
+            T_dict['__init__'] = __init__
+            def __call__(self):
+                args = ()
+                kwds = self._params.copy()
+                if self._vararg:
+                    args = tuple(kwds.pop(self._vararg))
+                return self._fn(*args, **kwds)
+            T_dict['__call__'] = __call__
+            params, varargs, varkeywords = self._introspect(T)
+            for param in params:
+                if isinstance(param, str):
+                    T_dict[param] = ArgDsc()
+                else:
+                    param, default = param
+                    T_dict[param] = ArgDsc(default=default)
+            if varargs:
+                T_dict[param] = ArgDsc(default=(), plural=True)
+                T_dict['_vararg'] = varargs
+            T = type(T.__name__, (object,), T_dict)
+        elif isinstance(T, types.ClassType):
+            T_dict = {}
+            T_dict['__module__'] = T.__module__
+            T_dict['__doc__'] = T.__doc__
+            T = type(T.__name__, (T, object), T_dict)
+        return T
+
+    def _extract_parameters(self, T):
+        args = []
+        opts = []
+        attrs = {}
+        for C in reversed(T.__mro__):
+            attrs.update(C.__dict__)
+        arg_attrs = []
+        opt_attrs = []
+        for attr in sorted(attrs):
+            value = attrs[attr]
+            if isinstance(value, ArgDsc):
+                arg_attrs.append((value.order, attr, value))
+            if isinstance(value, OptDsc):
+                opt_attrs.append((value.order, attr, value))
+        arg_attrs.sort()
+        opt_attrs.sort()
+        names = set()
+        keys = set()
+        for order, attr, dsc in arg_attrs:
+            name = self._to_name(attr)
+            assert name not in names, \
+                    "duplicate argument name <%s>" % name
+            names.add(name)
+            check = dsc.check
+            default = dsc.default
+            is_plural = dsc.plural
+            is_optional = True
+            if default is dsc.REQ:
+                is_optional = False
+                default = None
+            arg = ArgSpec(attr, name, check,
+                          default=default,
+                          is_optional=is_optional,
+                          is_plural=is_plural)
+            args.append(arg)
+        for order, attr, dsc in opt_attrs:
+            name = self._to_name(attr)
+            assert name not in names, \
+                    "duplicate option name --%s" % name
+            names.add(name)
+            key = dsc.key
+            if key is not None:
+                assert key not in keys, \
+                        "duplicate option name -%s" % key
+                keys.add(key)
+            check = dsc.check
+            default = dsc.default
+            is_plural = dsc.plural
+            has_value = True
+            if default is dsc.NOVAL:
+                has_value = False
+                default = False
+            usage = "--%s" % name
+            if key is not None:
+                usage = "-%s/%s" % (key, usage)
+            if has_value:
+                value_name = (dsc.value_name or name).upper()
+                usage = "%s=%s" % (usage, value_name)
+            hint = dsc.hint
+            opt = OptSpec(attr, name, key, check, default,
+                          is_plural=is_plural,
+                          has_value=has_value,
+                          usage=usage,
+                          hint=hint)
+            opts.append(opt)
+        return args, opts
+
+    def _load_extensions(self):
+        # Load extensions registered using the entry point.
+        if self.entry_point:
+            for entry in pkg_resources.iter_entry_points(self.entry_point):
+                self._debug("loading extensions from %s" % entry)
+                entry.load()
+        # Load extensions from the current directory.
+        if self.local_package:
+            local_dir = os.path.join(os.getcwd(), self.local_package)
+            if os.path.isdir(local_dir):
+                local_init = os.path.join(local_dir, '__init__.py')
+                if not os.path.isfile(local_init):
+                    self._warn("cannot load extensions from %s:"
+                               " missing __init__.py" % local_dir)
+                else:
+                    uid = os.stat(local_init).st_uid
+                    if not (uid == os.getuid() or uid == 0):
+                        self._warn("cannot load extensions from %s:"
+                                   " not owned by the user or the root"
+                                   % local_dir)
+                    else:
+                        self._debug("loading extensions from %s" % local_dir)
+                        local = types.ModuleType(self.local_package)
+                        sys.modules[self.local_package] = local
+                        local.__package__ = self.local_package
+                        local.__path__ = [local_dir]
+                        code = imputil.py_suffix_importer(local_init,
+                                os.stat(local_init), self.local_package)[1]
+                        exec code in local.__dict__
+
+    def _parse_config(self):
+        settings = {}
+        # Load settings from configuration files.
+        if self.config_name and self.config_dirs:
+            for config_dir in self.config_dirs:
+                config_path = os.path.join(config_dir, self.config_name)
+                if not os.path.isfile(config_path):
+                    continue
+                self._debug("loading configuration from %s" % config_path)
+                try:
+                    data = yaml.load(open(config_path, 'r'))
+                except yaml.YAMLError, exc:
+                    self._warn("failed to load configuration from %s: %s"
+                               % (config_path, exc))
+                    continue
+                if data is None:
+                    continue
+                if not isinstance(data, dict):
+                    self._warn("ill-formed configuration file %s"
+                               % config_path)
+                    continue
+                for key in sorted(data):
+                    if not isinstance(key, str):
+                        self._warn("invalid setting %r"
+                                   " in configuration file %s"
+                                   % (key, config_path))
+                        continue
+                    name = self._to_name(key)
+                    if name not in self.setting_by_name:
+                        self._warn("unknown setting %s"
+                                   " in configuration file %s"
+                                   % (key, config_path))
+                        continue
+                    settings[name] = data[key]
+        return settings
+
+    def _parse_environ(self):
+        settings = {}
+        # Load settings from configuration files.
+        prefix = "%s_" % self.name.upper()
+        for key in sorted(os.environ):
+            if not key.startswith(prefix):
+                continue
+            name = self._to_name(key[len(prefix):])
+            if name not in self.setting_by_name:
+                self._warn("unknown setting %s in the environment" % key)
+                continue
+            settings[name] = os.environ[key]
+        return settings
+
+    def _parse_argv(self, argv):
+        # Parse command-line parameters.
+        settings = {}
+        task = None
+        attrs = {}
+        no_more_opts = False
+        params = argv[1:]
+        while params:
+            param = params.pop(0)
+            if param == '--' and not no_more_opts:
+                no_more_opts = True
+            elif param.startswith('--') and not no_more_opts:
+                if '=' in param:
+                    key, value = param.split('=', 1)
+                    no_value = False
+                else:
+                    key = param
+                    value = None
+                    no_value = True
+                name = self._to_name(key[2:])
+                if name in self.setting_by_name:
+                    setting = self.setting_by_name[name]
+                    if setting.has_value and no_value:
+                        if not params:
+                            raise self._fail("missing value for setting %s"
+                                             % key)
+                        value = params.pop(0)
+                        no_value = False
+                    if not setting.has_value:
+                        if not no_value:
+                            raise self._fail("unexpected value for a toggle"
+                                             " setting %s" % key)
+                        value = True
+                    settings[name] = value
+                else:
+                    if task is None:
+                        task = self.task_by_name[None]
+                    if name not in task.opt_by_name:
+                        raise self._fail("unknown option or setting %s" % key)
+                    opt = task.opt_by_name[name]
+                    if opt.has_value and no_value:
+                        if not params:
+                            raise self._fail("missing value for option %s"
+                                             % key)
+                        value = params.pop(0)
+                        no_value = False
+                    if not opt.has_value:
+                        if not no_value:
+                            raise self._fail("unexpected value for a toggle"
+                                             " option %s" % key)
+                        value = True
+                    if not opt.is_plural:
+                        if opt.attr in attrs:
+                            raise self._fail("duplicate option %s" % key)
+                        attrs[opt.attr] = value
+                    else:
+                        if opt.attr not in attrs:
+                            attrs[opt.attr] = []
+                        attrs[opt.attr].append(value)
+            elif param.startswith('-') and param != '-' and not no_more_opts:
+                if task is None:
+                    task = self.task_by_name[None]
+                keys = param[1:]
+                while keys:
+                    key = keys[0]
+                    keys = keys[1:]
+                    if key not in task.opt_by_key:
+                        raise self._fail("unknown option -%s" % key)
+                    opt = task.opt_by_key[key]
+                    if opt.has_value:
+                        if keys:
+                            value = keys
+                            keys = ''
+                        else:
+                            if not params:
+                                raise self._fail("missing value for option -%s"
+                                                 % key)
+                            value = params.pop(0)
+                    else:
+                        value = True
+                    if not opt.is_plural:
+                        if opt.attr in attrs:
+                            raise self._fail("duplicate option -%s" % key)
+                        attrs[opt.attr] = value
+                    else:
+                        if opt.attr not in attrs:
+                            attrs[opt.attr] = ()
+                        attrs[opt.attr] += (value,)
+            elif task is None:
+                if param == '-' and not no_more_opts:
+                    task = self.task_by_name[None]
+                else:
+                    name = self._to_name(param)
+                    if name not in self.task_by_name:
+                        raise self._fail("unknown task %s" % param)
+                    task = self.task_by_name[name]
+            else:
+                if param == '-' and not no_more_opts:
+                    param = None
+                for arg in task.args:
+                    if arg.attr not in attrs or arg.is_plural:
+                        break
+                else:
+                    if task.name:
+                        raise self._fail("too many arguments for task %s"
+                                         % task.name)
+                    else:
+                        raise self._fail("too many arguments")
+                if arg.is_plural:
+                    if arg.attr not in attrs:
+                        attrs[arg.attr] = ()
+                    attrs[arg.attr] += (param,)
+                else:
+                    attrs[arg.attr] = param
+        if task is None:
+            task = self.task_by_name[None]
+        for opt in task.opts:
+            if opt.attr in attrs:
+                if opt.check is not None:
+                    try:
+                        if opt.is_plural:
+                            attrs[opt.attr] = tuple(opt.check(value)
+                                                for value in attrs[opt.attr])
+                        else:
+                            attrs[opt.attr] = opt.check(attrs[opt.attr])
+                    except ValueError, exc:
+                        raise self._fail("invalid value for option --%s: %s"
+                                         % (opt.name, exc))
+            else:
+                attrs[opt.attr] = opt.default
+        for arg in task.args:
+            if arg.attr in attrs:
+                if arg.check is not None:
+                    try:
+                        if arg.is_plural:
+                            attrs[arg.attr] = tuple(arg.check(value)
+                                                for value in attrs[arg.attr])
+                        else:
+                            attrs[arg.attr] = arg.check(attrs[arg.attr])
+                    except ValueError, exc:
+                        raise self._fail("invalid value for argument <%s>: %s"
+                                         % (arg.name, exc))
+            else:
+                if not arg.is_optional:
+                    raise self._fail("missing argument <%s>" % arg.name)
+                attrs[arg.attr] = arg.default
+        return task, attrs, settings
+
+    def _init_settings(self, *values_list):
+        # Validate and initialize settings.
+        values = {}
+        for item in values_list:
+            values.update(item)
+        for name in sorted(self.setting_by_name):
+            setting = self.setting_by_name[name]
+            try:
+                if name in values:
+                    setting.code(values[name])
+                else:
+                    setting.code()
+            except ValueError, exc:
+                raise self._fail("invalid value for setting --%s: %s"
+                                 % (name, exc))
+
+    def _execute_task(self, task, attrs):
+        try:
+            instance = task.code(**attrs)
+        except ValueError, exc:
+            raise self._fail(str(exc))
+        return instance()
+
+    def _debug(self, *msgs, **opts):
+        from .logutils import LogUtils
+        log_utils = LogUtils(self)
+        return log_utils.debug(*msgs, **opts)
+
+    def _log(self, *msgs, **opts):
+        from .logutils import LogUtils
+        log_utils = LogUtils(self)
+        return log_utils.log(*msgs, **opts)
+
+    def _warn(self, *msgs, **opts):
+        from .logutils import LogUtils
+        log_utils = LogUtils(self)
+        return log_utils.warn(*msgs, **opts)
+
+    def _fail(self, *msgs, **opts):
+        from .logutils import LogUtils
+        log_utils = LogUtils(self)
+        return log_utils.fail(*msgs, **opts)
+
+

File src/cogs/std.py

+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from .logutils import LogUtils
+from .shell import ArgDsc
+import os.path
+import sys
+
+
+def register_std(shell):
+    log_utils = LogUtils(shell)
+    log = log_utils.log
+    fail = log_utils.fail
+
+
+    @shell.register_default_task
+    class Usage(object):
+        """explain how to obtain help on DDT"""
+
+        # FIXME: implement common options: `--help`, `--version`, `--license`
+        #help = option()
+        #version = option()
+        #license = option()
+
+        def __init__(self, help=False, version=False, license=False):
+            self.help = help
+            self.version = version
+            self.license = license
+
+        def __call__(self):
+            #if self.help:
+            #    t = Help()
+            #    return t()
+            #if self.version:
+            #    t = Version()
+            #    return t()
+            #if self.license:
+            #    t = License()
+            #    return t()
+            if shell.description:
+                log("%s - %s" % (shell.name, shell.description))
+            else:
+                log(shell.name)
+            executable = os.path.basename(sys.argv[0])
+            log("Usage: %s [<settings>...] <task> [<arguments>...]"
+                % executable)
+            log()
+            log("Run `%s help` for general usage and a list of tasks"
+                " and settings." % executable)
+            log("Run `%s help <task>` for help on a specific task."
+                % executable)
+
+
+    @shell.register_task
+    class Help(object):
+        """display help on tasks and options
+
+        When started without arguments, displays a list of available tasks,
+        options and toggles.
+
+        When `<name>` is given, describes the usage of the specified task
+        or option.
+        """
+
+        name = ArgDsc(default=None)
+
+        def __init__(self, name):
+            self.name = name
+
+        def __call__(self):
+            if self.name is None:
+                return self.describe_all()
+            if self.name in shell.task_by_name:
+                task = shell.task_by_name[self.name]
+                return self.describe_task(task)
+            elif self.name in shell.setting_by_name:
+                setting = shell.setting_by_name[self.name]
+                return self.describe_setting(setting)
+            else:
+                raise fail("unknown task or setting `%s`" % self.name)
+
+        def describe_all(self):
+            if shell.description:
+                log("%s - %s" % (shell.name, shell.description))
+            else:
+                log(shell.name)
+            executable = os.path.basename(sys.argv[0])
+            log("Usage: %s [<settings>...] <task> [<arguments>...]"
+                % executable)
+            log()
+            log("Run `%s help` for general usage and a list of tasks"
+                " and settings." % executable)
+            log("Run `%s help <task>` for help on a specific task."
+                % executable)
+            log()
+            log("Available tasks:")
+            for name in sorted(shell.task_by_name):
+                if not name:
+                    continue
+                task = shell.task_by_name[name]
+                usage = task.usage
+                hint = task.hint
+                if hint:
+                    log("  %-24s : %s" % (usage, hint))
+                else:
+                    log("  %s" % usage)
+            log()
+            log("Settings:")
+            for name in sorted(shell.setting_by_name):
+                setting = shell.setting_by_name[name]
+                usage = setting.usage
+                hint = setting.hint
+                if hint:
+                    log("  %-24s : %s" % (usage, hint))
+                else:
+                    log("  %s" % usage)
+            log()
+
+        def describe_task(self, task):
+            name = task.name
+            hint = task.hint
+            if hint:
+                log("%s - %s" % (name.upper(), hint))
+            else:
+                log(name.upper())
+            usage = task.usage
+            executable = os.path.basename(sys.argv[0])
+            log("Usage: `%s %s`" % (executable, usage))
+            log()
+            help = task.help
+            if help:
+                log(help)
+                log()
+            if task.opts:
+                log("Options:")
+                for opt in task.opts:
+                    usage = opt.usage
+                    hint = opt.hint
+                    if hint:
+                        log("  %-24s : %s" % (usage, hint))
+                    else:
+                        log("  %s" % usage)
+
+        def describe_setting(self, setting):
+            name = setting.name
+            hint = setting.hint
+            if hint:
+                log("%s - %s" % (name.upper(), hint))
+            else:
+                log(name.upper())
+            executable = os.path.basename(sys.argv[0])
+            usage = setting.usage
+            usage_conf = setting.usage_conf
+            usage_environ = setting.usage_environ
+            log("Usage: `%s %s`" % (executable, usage))
+            if shell.config_name:
+                log("       `%s` (%s)" % (usage_conf, shell.config_name))
+            log("       `%s` (environment)" % usage_environ)
+            log()
+            help = setting.help
+            if help:
+                log(help)
+                log()
+
+
+    @shell.register_setting
+    def Debug(value=False):
+        """print debug information"""
+        if value is None or value in ['false', '', '0', 0]:
+            value = False
+        if value in ['true', '1', 1]:
+            value = True
+        if not isinstance(value, bool):
+            raise ValueError("debug: expected a Boolean value; got %r" % value)
+        shell.environment.set(debug=value)
+
+

File src/cogs/sysutils.py

+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from .logutils import LogUtils
+import sys
+import glob
+import os
+import shutil
+import subprocess
+
+
+class SysUtils(object):
+
+    def __init__(self, shell):
+        self.shell = shell
+        self.env = shell.environment
+        log_utils = LogUtils(shell)
+        self._debug = log_utils.debug
+        self._fail = log_utils.fail
+
+    def cp(self, src_path, dst_path):
+        # Copy a file or a directory.
+        self._debug("cp `%s` `%s`" % (src_path, dst_path))
+        shutil.copy(src_path, dst_path)
+
+    def mv(self, src_path, dst_path):
+        # Rename a file.
+        self._debug("mv `%s` `%s`" % (src_path, dst_path))
+        os.rename(src_path, dst_path)
+
+    def rm(self, path):
+        # Remove a file.
+        self._debug("rm `%s`" % path)
+        os.unlink(path)
+
+    def rmtree(self, path):
+        # Remove a directory tree.
+        self._debug("rmtree `%s`" % path)
+        shutil.rmtree(path)
+
+    def mktree(self, path):
+        # Create a directory tree.
+        if not os.path.isdir(path):
+            self._debug("mktree `%s`" % path)
+            os.makedirs(path)
+
+    def exe(self, cmd):
+        # Execute the command replacing the current process.
+        self._debug("`%s`" % cmd)
+        line = cmd.split()
+        try:
+            os.execvp(line[0], line)
+        except OSError, exc:
+            raise self._fail(exc)
+
+    def sh(self, cmd, data=None, cd=None):
+        # Execute a command using shell.
+        if cd is not None:
+            cmd = "cd %s && %s" % (cd, cmd)
+        stream = subprocess.PIPE
+        if self.env.debug:
+            stream = None
+        self._debug("`sh %s`" % cmd)
+        proc = subprocess.Popen(cmd, shell=True, stdin=stream,
+                                stdout=stream, stderr=stream)
+        proc.communicate(data)
+        if proc.returncode != 0:
+            raise self._fail("`sh %s`: non-zero exit code" % cmd)
+
+    def pipe(self, cmd, data=None, cd=None):
+        # Execute the command, return the output.
+        if cd is not None:
+            cmd = "cd %s && %s" % (cd, cmd)
+        stream = subprocess.PIPE
+        self._debug("`| %s`" % cmd)
+        proc = subprocess.Popen(cmd, shell=True,
+                                stdout=stream, stderr=stream)
+        out, err = proc.communicate(data)
+        if proc.returncode != 0:
+            if self.env.debug:
+                if out:
+                    sys.stdout.write(out)
+                if err:
+                    sys.stderr.write(err)
+            raise self._fail("`| %s`: non-zero exit code" % cmd)
+        return out
+
+

File src/ddt/__init__.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-from .run import main
-from .core import env, task, argument, option, setting
-from .out import debug, log, warn, fail
-
-__import__('ddt.std')
-
-

File src/ddt/core.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-import collections
-import itertools
-import types
-import re
-import sys
-import os
-
-
-class Failure(Exception):
-    pass
-
-
-class Environment(object):
-
-    _state = {}
-    _state_stack = []
-
-    class _context(object):
-
-        def __init__(self, env, **kwds):
-            self.env = env
-            self.kwds = kwds
-
-        def __enter__(self):
-            self.env.push(**self.kwds)
-
-        def __exit__(self, exc_type, exc_value, exc_tb):
-            self.env.pop()
-
-    def __init__(self):
-        self.__dict__ = self._state
-
-    def add(self, **kwds):
-        for key in sorted(kwds):
-            assert key not in self.__dict__, \
-                    "duplicate parameter %r" % key
-            self.__dict__[key] = kwds[key]
-
-    def set(self, **kwds):
-        for key in sorted(kwds):
-            assert key in self.__dict__, \
-                    "unknown parameter %r" % key
-            self.__dict__[key] = kwds[key]
-
-    def push(self, **kwds):
-        self._state_stack.append(self._state)
-        self._state = self._state.copy()
-        self.__dict__ = self._state
-        self.set(**kwds)
-
-    def pop(self):
-        assert self._stack, "unbalanced pop()"
-        self._state = self._state_stack.pop()
-        self.__dict__ = self._state
-
-    def __call__(self, **kwds):
-        return self._context(self, **kwds)
-
-
-env = Environment()
-env.add(_task_by_name={},
-        _setting_by_name={},
-        debug=(os.environ.get('DDT_DEBUG') in ['true', '1']
-               or '--debug' in sys.argv))
-
-
-def _attr_to_name(attr):
-    # Convert an attribute name to a task/setting name.
-    return attr.lower().replace(' ', '-').replace('_', '-')
-
-
-def _doc_to_hint_and_help(doc):
-    # Convert a docstring to a hint line and a description.
-    hint = help = ""
-    if doc:
-        hint = doc.lstrip().splitlines()[0].rstrip()
-        lines = doc.strip().splitlines()[1:]
-        while lines and not lines[0].strip():
-            lines.pop(0)
-        while lines and not lines[-1].strip():
-            lines.pop(-1)
-        indent = None
-        for line in lines:
-            short_line = line.lstrip()
-            if short_line:
-                line_indent = len(line)-len(short_line)
-                if indent is None or line_indent < indent:
-                    indent = line_indent
-        if indent:
-            lines = [line[indent:] for line in lines]
-        help = "\n".join(lines)
-    return hint, help
-
-
-class argument(object):
-
-    COUNTER = itertools.count(1)
-    REQUIRED = object()
-
-    def __init__(self, check=None, default=REQUIRED,
-                 is_optional=False, is_list=False):
-        if default is self.REQUIRED:
-            default = None
-        else:
-            is_optional = True
-        self.check = check
-        self.default = default
-        self.is_optional = is_optional
-        self.is_list = is_list
-        self.order = next(self.COUNTER)
-
-    def __get__(self, instance, owner):
-        if instance is None:
-            return self
-        raise AttributeError("unset argument")
-
-
-class option(object):
-
-    COUNTER = itertools.count(1)
-
-    def __init__(self, key=None, check=None, default=None,
-                 has_value=None, value_name=None, hint=None):
-        assert key is None or (isinstance(key, str) and
-                               re.match(r'^[a-zA-Z]$', key)), \
-                "key must be a letter, got %r" % key
-        if has_value is None:
-            if default is False:
-                has_value = False
-            else:
-                has_value = True
-        self.key = key
-        self.check = check
-        self.default = default
-        self.has_value = has_value
-        self.value_name = value_name
-        self.hint = hint
-        self.order = next(self.COUNTER)
-
-    def __get__(self, instance, owner):
-        if instance is None:
-            return self
-        raise AttributeError("unset option")
-
-
-argspec = collections.namedtuple('argspec',
-        ['name', 'attribute', 'check', 'default',
-         'is_optional', 'is_list', 'order'])
-optspec = collections.namedtuple('optspec',
-        ['name', 'key', 'attribute', 'check', 'default',
-         'has_value', 'value_name', 'hint', 'usage', 'order'])
-
-
-def task(T, is_default=False):
-    assert isinstance(T, (types.ClassType, types.TypeType, types.FunctionType)), \
-            "a task must be either a function or a class"
-    T_orig = T
-    if isinstance(T, types.FunctionType):
-        T_dict = {}
-        T_dict['__module__'] = T_orig.__module__
-        T_dict['__doc__'] = T_orig.__doc__
-        T_dict['_fn'] = staticmethod(T_orig)
-        T_dict['_vararg'] = None
-        def __init__(self, **params):
-            self._params = params
-        T_dict['__init__'] = __init__
-        def __call__(self):
-            args = ()
-            kwds = self._params.copy()
-            if self._vararg:
-                args = tuple(kwds.pop(self._vararg))
-            return self._fn(*args, **kwds)
-        T_dict['__call__'] = __call__
-        T = type(T_orig.__name__, (object,), T_dict)
-        code = T_orig.func_code
-        defaults = T_orig.func_defaults or ()
-        for idx, name in enumerate(code.co_varnames[:code.co_argcount]):
-            if idx < code.co_argcount-len(defaults):
-                arg = argument()
-            else:
-                default = defaults[idx-code.co_argcount+len(defaults)]
-                arg = argument(default=default, is_optional=True)
-            setattr(T, name, arg)
-        if code.co_flags & 0x04: # CO_VARARGS
-            name = code.co_varnames[code.co_argcount]
-            arg = argument(default=(), is_optional=True,
-                           is_list=True)
-            setattr(T, name, arg)
-            T._vararg = name
-    elif isinstance(T, types.ClassType):
-        T_dict = {}
-        T_dict['__module__'] = T_orig.__module__
-        T_dict['__doc__'] = T_orig.__doc__
-        T = type(T_orig.__name__, (T_orig, object), T_dict)
-    attrs = {}
-    for C in reversed(T.__mro__):
-        attrs.update(C.__dict__)
-    T._args = []
-    T._arg_by_name = {}
-    T._opts = []
-    T._opt_by_key = {}
-    T._opt_by_name = {}
-    for attr in sorted(attrs):
-        value = attrs[attr]
-        if isinstance(value, argument):
-            name = _attr_to_name(attr)
-            assert name not in T._arg_by_name, \
-                    "duplicate argument %s" % keyword
-            A = argspec(name, attr, value.check, value.default,
-                        value.is_optional, value.is_list, value.order)
-            T._args.append(A)
-            T._arg_by_name[A.name] = A
-        elif isinstance(value, option):
-            name = _attr_to_name(attr)
-            key = value.key
-            assert name not in T._opt_by_name, \
-                    "duplicate option --%s" % name
-            assert key is None or key not in T._opt_by_key, \
-                    "duplicate option -%s" % key
-            usage = "--%s" % name
-            if key:
-                usage = "-%s/%s" % (key, usage)
-            if value.has_value:
-                value_name = value.value_name or name
-                usage = "%s=%s" % (usage, value_name.upper())
-            O = optspec(name, key, attr, value.check, value.default,
-                        value.has_value, value.value_name, value.hint,
-                        usage, value.order)
-            T._opts.append(O)
-            T._opt_by_name[O.name] = O
-            if O.key:
-                T._opt_by_key[O.key] = O
-    T._args.sort(key=(lambda a: a.order))
-    T._opts.sort(key=(lambda o: o.order))
-    name = _attr_to_name(T.__name__)
-    optionals = 0
-    usage = name
-    for A in T._args:
-        if A.is_list:
-            assert A is T._args[-1], "wildcard argument must be the last"
-        if optionals:
-            assert A.is_optional, "mandatory argument must precede" \
-                    " all optional arguments"
-        if A.is_optional:
-            optionals += 1
-        if A.is_optional:
-            usage = "%s [<%s>" % (usage, A.name)
-        else:
-            usage = "%s <%s>" % (usage, A.name)
-        if A.is_list:
-            usage += "..."
-    if optionals:
-        usage += "]"*optionals
-    if is_default:
-        name = None
-    assert name not in env._task_by_name, \
-            "duplicate task %s" % name
-    T._name = name
-    T._usage = usage
-    T._hint, T._help = _doc_to_hint_and_help(T.__doc__)
-    env._task_by_name[name] = T
-    return T_orig
-
-
-def default_task(T):
-    return task(T, True)
-
-
-def setting(S):
-    assert isinstance(S, types.FunctionType), \
-            "a task must be a function"
-    code = S.func_code
-    varnames = code.co_varnames[:code.co_argcount]
-    defaults = S.func_defaults or ()
-    assert len(varnames) >= 1 and len(defaults) == len(varnames)
-    name = _attr_to_name(S.__name__)
-    S._name = name
-    S._hint, S._help = _doc_to_hint_and_help(S.__doc__)
-    value_name = _attr_to_name(varnames[0])
-    default = defaults[0]
-    has_value = default is not False
-    if has_value:
-        usage = "--%s=%s" % (name, value_name.upper())
-        usage_conf = "%s: %s" % (name, value_name.upper())
-        usage_environ = "DDT_%s=%s" % (name.upper().replace('-', '_'),
-                                       value_name.upper())
-    else:
-        usage = "--%s" % name
-        usage_conf = "%s: true" % name
-        usage_environ = "DDT_%s=1" % name.upper().replace('-', '_')
-    S._has_value = has_value
-    S._value_name = value_name
-    S._usage = usage
-    S._usage_conf = usage_conf
-    S._usage_environ = usage_environ
-    assert name not in env._setting_by_name, "duplicate setting %s" % name
-    env._setting_by_name[name] = S
-    return S
-
-

File src/ddt/fs.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-from ddt.core import env
-from ddt.out import debug, fail
-import sys
-import glob
-import os
-import shutil
-import subprocess
-
-
-def ls(pattern='.'):
-    # List files matching the pattern.
-    return sorted(glob.glob(pattern))
-
-
-def cp(src_path, dst_path):
-    # Copy a file or a directory.
-    debug("cp `%s` `%s`" % (src_path, dst_path))
-    shutil.copy(src_path, dst_path)
-
-
-def mv(src_path, dst_path):
-    # Rename a file.
-    debug("mv `%s` `%s`" % (src_path, dst_path))
-    os.rename(src_path, dst_path)
-
-
-def rm(path):
-    # Remove a file.
-    debug("rm `%s`" % path)
-    os.unlink(path)
-
-
-def rmtree(path):
-    # Remove a directory tree.
-    debug("rmtree `%s`" % path)
-    shutil.rmtree(path)
-
-
-def mktree(path):
-    # Create a directory tree.
-    if not os.path.isdir(path):
-        debug("mktree `%s`" % path)
-        os.makedirs(path)
-
-
-def exe(cmd):
-    # Execute the command replacing the current process.
-    log("`%s`" % cmd)
-    line = cmd.split()
-    try:
-        os.execvp(line[0], line)
-    except OSError, exc:
-        raise fail(exc)
-
-
-def sh(cmd, data=None, cd=None):
-    # Execute a command using shell.
-    if cd is not None:
-        cmd = "cd %s && %s" % (cd, cmd)
-    stream = subprocess.PIPE
-    if env.debug:
-        stream = None
-    debug("`sh %s`" % cmd)
-    proc = subprocess.Popen(cmd, shell=True, stdin=stream,
-                            stdout=stream, stderr=stream)
-    proc.communicate(data)
-    if proc.returncode != 0:
-        raise fail("`sh %s`: non-zero exit code" % cmd)
-
-
-def pipe(cmd, data=None, cd=None):
-    # Execute the command, return the output.
-    if cd is not None:
-        cmd = "cd %s && %s" % (cd, cmd)
-    stream = subprocess.PIPE
-    debug("`| %s`" % cmd)
-    proc = subprocess.Popen(cmd, shell=True,
-                            stdout=stream, stderr=stream)
-    out, err = proc.communicate(data)
-    if proc.returncode != 0:
-        if env.debug:
-            if out:
-                sys.stdout.write(out)
-            if err:
-                sys.stderr.write(err)
-        raise fail("`| %s`: non-zero exit code" % cmd)
-    return out
-
-

File src/ddt/local/__init__.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-# This is a dummy package for extensions loaded from `ddt.local`
-# subdirectory.
-
-

File src/ddt/out.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-from ddt.core import Failure, env
-import sys
-import os
-import re
-
-
-S_RESET = 0
-S_BRIGHT = 1
-S_DIM = 2
-S_UNDERSCORE = 4
-S_BLINK = 5
-S_REVERSE = 7
-S_HIDDEN = 8
-FG_BLACK = 30
-FG_RED = 31
-FG_GREEN = 32
-FG_YELLOW = 33
-FG_BLUE = 34
-FG_MAGENTA = 35
-FG_CYAN = 36
-FG_WHITE = 37
-BG_BLACK = 40
-BG_RED = 41
-BG_GREEN = 42
-BG_YELLOW = 43
-BG_BLUE = 44
-BG_MAGENTA = 45
-BG_CYAN = 46
-BG_WHITE = 47
-
-
-styles = {
-    None: [S_BRIGHT],
-    'footnote': [S_DIM],
-    'warning': [S_BRIGHT, FG_RED],
-    'success': [S_BRIGHT, FG_GREEN],
-}
-
-
-def colorize(msg, has_colors=True):
-    def replace(match):
-        indicator = match.group('indicator')
-        data = match.group('data')
-        assert indicator in styles, "unknown style %r" % indicator
-        if not has_colors:
-            return data
-        lesc = "\x1b[%sm" % ";".join(str(style)
-                                     for style in styles[indicator])
-        resc = "\x1b[%sm" % S_RESET
-        return lesc+data+resc
-    return re.sub(r"(?::(?P<indicator>[a-zA-Z]+):)?`(?P<data>[^`]*)`",
-                  replace, msg)
-
-
-def log(*msgs, **opts):
-    sep = opts.pop('sep', " ")
-    end = opts.pop('end', "\n")
-    file = opts.pop('file', sys.stdout)
-    assert not opts, opts
-    has_colors = (file.isatty() and os.environ.get('COLORTERM'))
-    data = sep.join(colorize(str(msg), has_colors) for msg in msgs) + end
-    file.write(data)
-    file.flush()
-
-
-def debug(*msgs, **opts):
-    if env.debug:
-        return log(":footnote:`...`", file=sys.stderr, *msgs, **opts)
-
-
-def warn(*msgs, **opts):
-    return log(":warning:`WARNING`:", file=sys.stderr, *msgs, **opts)
-
-
-def fail(*msgs, **opts):
-    log(":warning:`FATAL ERROR`:", file=sys.stderr, *msgs, **opts)
-    return Failure()
-
-
-def prompt(msg):
-    value = ""
-    while not value:
-        value = raw_input(msg+" ").strip()
-    return value
-
-

File src/ddt/run.py

-#
-# Copyright (c) 2012, Prometheus Research, LLC
-# Released under MIT license, see `LICENSE` for details.
-#
-
-
-from ddt.core import env, Failure, _attr_to_name
-from ddt.out import debug, warn, fail
-import sys
-import os.path
-import collections
-import imputil
-import pkg_resources
-import yaml
-
-
-def main(argv):
-    # Load extensions from entry point `ddt.extensions`.
-    for entry in pkg_resources.iter_entry_points('ddt.extensions'):
-        debug("loading extensions from `%s`" % entry)
-        entry.load()
-    # Load extensions from `./ddt.local/`.
-    cwd = os.getcwd()
-    local_dir = os.path.join(cwd, 'ddt.local')
-    if os.path.isdir(local_dir):
-        local_package = os.path.join(local_dir, '__init__.py')
-        if not os.path.isfile(local_package):
-            warn("cannot load extensions from `%s`:"
-                 " missing `__init__.py`" % local_dir)
-        else:
-            uid = os.stat(local_package).st_uid
-            if not (uid == os.getuid() or uid == 0):
-                warn("cannot load extensions from `%s`:"
-                     " not owned by the user or the root" % local_dir)
-            else:
-                debug("loading extensions from `%s`" % local_dir)
-                from . import local
-                local.__path__.append(local_dir)
-                imputil._os_stat = os.stat
-                code = imputil.py_suffix_importer(local_package,
-                                                  os.stat(local_package),
-                                                  'ddt.local')[1]
-                exec code in local.__dict__
-
-    # Load configuration files.
-    settings = {}
-    conf_paths = []
-    conf_paths.append('/etc/ddt.conf')
-    conf_paths.append(os.path.join(sys.prefix, '/etc/ddt.conf'))
-    conf_paths.append(os.path.expanduser('~/.ddt.conf'))
-    conf_paths.append(os.path.abspath('./ddt.conf'))
-    for conf_path in conf_paths:
-        if not os.path.isfile(conf_path):
-            continue
-        debug("loading configuration from `%s`" % conf_path)
-        try:
-            data = yaml.load(open(conf_path, 'r'))
-        except yaml.YAMLError, exc:
-            warn("failed to load configuration from `%s`: %s"
-                 % (conf_path, exc))
-            continue
-        if data is None:
-            continue
-        if not isinstance(data, dict):
-            warn("ill-formed configuration file `%s`" % conf_path)
-            continue
-        for key in sorted(data):
-            if not isinstance(key, str):
-                warn("invalid setting `%r` in `%s`"
-                     % (key, conf_path))
-                continue
-            name = _attr_to_name(key)
-            if name not in env._setting_by_name:
-                warn("unknown setting `%s` in `%s`"
-                     % (key, conf_path))
-                continue
-            settings[name] = data[key]
-    for key in sorted(os.environ):
-        if key.startswith('DDT_'):
-            name = _attr_to_name(key[4:])
-            if name not in env._setting_by_name:
-                warn("unknown configuration parameter `%s`"
-                     " in the environment" % key)
-                continue
-            settings[name] = os.environ[key]
-
-    # Parse command-line parameters.
-    T = None
-    args = {}
-    opts = {}
-    no_more_opts = False
-    argv = argv[1:]
-    while argv:
-        arg = argv.pop(0)
-        if arg == '--':
-            no_more_opts = True
-        elif arg.startswith('--') and not no_more_opts:
-            if '=' in arg:
-                key, value = arg.split('=', 1)
-                no_value = False
-            else:
-                key = arg
-                value = None
-                no_value = True
-            name = _attr_to_name(key[2:])
-            if name in env._setting_by_name:
-                S = env._setting_by_name[name]
-                if S._has_value and no_value:
-                    if not argv:
-                        raise fail("missing value for setting `%s`" % key)
-                    value = argv.pop(0)
-                    no_value = False
-                if not S._has_value:
-                    if not no_value:
-                        raise fail("unexpected value for a toggle setting `%s`"