1. Prometheus Research, LLC
  2. Prometheus
  3. cogs

Commits

Kirill Simonov  committed f4a934c

First draft.

  • Participants
  • Branches default

Comments (0)

Files changed (11)

File .hgignore

View file
+syntax: glob
+*.pyc
+*.pyo
+.*.sw?
+*.egg-info
+build
+dist
+sandbox

File LICENSE

View file
+Copyright (c) 2012, Prometheus Research, LLC
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use,
+copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.

File README

View file
+*****************************************************************
+  DDT -- Development, Deployment and Testing automation toolkit
+*****************************************************************
+
+Overview
+========
+
+DDT is a toolkit for developing command-line utilities in Python.  It
+handles common operations such as parsing command-line parameters,
+dispatching commands, loading configuration files and is targeted
+to developers, sysadmins, testers, or anybody else who needs to script
+their routine tasks.
+
+DDT is a free software licensed under MIT license.  DDT is written by
+Kirill Simonov from Prometheus Research, LLC.
+
+
+Getting Started
+===============
+
+You can install DDT using `PIP package manager`_::
+
+    # pip install DDT
+
+.. _PIP package manager: http://www.pip-installer.org/
+
+This operation installs a command-line utility ``ddt`` and a Python
+package of the same name.  The ``ddt`` utility is a dispatcher which
+which lets you select and execute your scripts (called *tasks*).  Let us
+show how to do it with a simple example.
+
+In the current directory, make a subdirectory ``ddt.local`` and create a
+file ``__init__.py`` with the following content::
+
+    from ddt import task, log
+    import os
+
+    @task
+    def Hello(name=None):
+        """greet the given entity (if not specified, the current user)"""
+        if name is None:
+            name = os.getlogin()
+        log("Hello, %s!" % name.capitalize())
+
+Now run::
+
+    $ ddt hello world
+    Hello, World!
+
+    $ ddt hello
+    Hello, Xi!
+
+    $ ddt help hello
+    HELLO - greet the given entity (if not specified, the current user)
+    Usage: ddt hello [<name>]
+
+DDT converts function ``Hello()`` into a command-line script with
+parameters inferred from the function signature, so then when you
+execute a command ``ddt hello world``, you call a function
+``Hello('world')``.
+
+
+Loading Extensions
+==================
+
+In this section, we describe how DDT finds and loads extensions.
+
+DDT loads extensions from two places:
+
+* ``ddt.local`` subdirectory in the current directory;
+* all Python packages under ``ddt.extensions`` entry point.
+
+The easiest way to add an extension is to create a ``ddt.local``
+subdirectory and add your scripts there.  The subdirectory must contain
+an ``__init__.py`` script, which is executed by DDT on startup.  The
+``ddt.local`` subdirectory must be owned by the same user who runs the
+``ddt`` script, or by ``root``; otherwise it is ignored.
+
+If you need to package and distribute your DDT extensions, using
+``ddt.local`` may be inconvenient.  In this case, you may package your
+DDT extensions as a regular Python distribution.
+
+Suppose we want to pack the ``hello`` task as a separate package.
+Create a directory tree with the following structure::
+
+    DDT-Hello/
+        src/
+            ddt_hello/
+                __init__.py
+        setup.py
+
+The file ``ddt_hello/__init__.py`` contains the definition of the
+``hello`` task and has the same content as ``ddt.local/__init__.py`` in
+our previous example.
+
+The file ``setup.py`` contains the meta-data of the package and may have
+the following content::
+
+    from setuptools import setup
+
+    setup(
+        name='DDT-Hello',
+        version='0.0.1',
+        description="""A DDT task to greet somebody""",
+        packages=['ddt_hello'],
+        package_dir={'': 'src'},
+        install_requires=['DDT'],
+        entry_points={ 'ddt.extensions': ['Hello = ddt_hello'] },
+    )
+
+Note the parameter ``entry_points`` in ``setup()`` invocation; it adds
+an entry point ``ddt.extensions`` named ``Hello`` that refers to package
+``ddt_hello``.  On startup, DDT finds and loads all packages defined for
+the entry point ``ddt.extensions``.
+
+
+Defining Tasks
+==============
+
+A task can be created from a function or a class by augmenting it with
+the ``task`` decorator::
+
+    from ddt import task, argument, log, fail
+
+    @task
+    def Factorial(n):
+        """calculate n!
+
+        This task calculates the value of the factorial of the given
+        positive number `n`.  Factorial of n, also known as n!, is
+        defined by the formula:
+
+            n! = 1*2*...*(n-1)*n
+        """
+        try:
+            n = int(n)
+        except ValueError:
+            raise fail("n must be an integer")
+        if n < 1:
+            raise fail("n must be positive")
+        f = 1
+        for k in range(2, n+1):
+            f *= k
+        log("%s! = `%s`" % (n, f))
+
+    @task
+    class Fibonacci:
+        """calculate the n-th Fibonacci number
+
+        The n-th Fibonacci number `F_n` is defined by:
+
+            F_0 = 0
+            F_1 = 1
+            F_n = F_{n-1}+F_{n-2} (n>1)
+        """
+
+        n = argument(int)
+
+        def __init__(self, n):
+            if n < 0:
+                raise ValueError("n must be non-negative")
+            self.n = n
+
+        def __call__(self):
+            p, q = 0, 1
+            for k in range(self.n):
+                p, q = p+q, p
+            log("F_%s = `%s`" % (self.n, p))
+
+You can now execute the tasks by running::
+
+    $ ddt factorial 10
+    10! = 3628800
+
+    $ ddt fibonacci 10
+    F_10 = 55
+
+DDT uses the name of the function or the class as the task identifier.
+The name is normalized: it is converted to lower case and has all
+underscore characters converted to the dash symbol.
+
+If the task is derived from a function, the task arguments are inferred
+from the function signature.  DDT executes such a task by calling the
+function with the parsed command-line parameters.
+
+If the task is derived from a class, the task arguments and options must
+be defined using ``argument()`` and ``option()`` descriptors.  To
+execute a task, DDT creates an instance of the class passing the task
+parameters as the constructor arguments.  Then DDT invokes the
+``__call__`` method of the instance.  Thus the call of::
+
+    $ ddt factorial 10
+
+is translated to::
+
+    Factorial('10')
+
+and the call of::
+
+    $ ddt fibonacci 10
+
+is translated to::
+
+    t = Fibonacci(10)
+    t()
+
+The docstring of the function or the class becomes the task
+description::
+
+    $ ddt help factorial
+    FACTORIAL - calculate n!
+    Usage: ddt factorial <n>
+
+    This task calculates the value of the factorial of the given
+    positive number n.  Factorial of n, also known as n!, is
+    defined by the formula:
+
+        n! = 1*2*...*(n-1)*n
+
+    $ ddt help fibonacci
+    Usage: ddt fibonacci <n>
+
+    The n-th Fibonacci number F_n is defined by:
+
+        F_0 = 0
+        F_1 = 1
+        F_n = F_{n-1}+F_{n-2} (n>1)
+
+A task derived from a function cannot have options.  To add an option to
+a task derived from a class, use the ``option()`` descriptor.  For
+example::
+
+    from ddt import task, argument, option
+    import sys, os
+
+    @task
+    class Write_Hello:
+
+        name = argument(default=None)
+        output = option(key='o', default=None)
+
+        def __init__(self, name, output):
+            if name is None:
+                name = os.getlogin()
+            self.name = name
+            if output is None:
+                self.file = sys.stdout
+            else:
+                self.file = open(output, 'w')
+
+        def __call__(self):
+            log("Hello, %s!" % self.name.capitalize(),
+                file=self.file)
+
+You can execute this task with option ``--output`` or ``-o`` to redirect
+the output to a file::
+
+    $ ddt write-hello world -o hello.txt
+
+
+Configuration and Environment
+=============================
+
+DDT allows you to define custom configuration parameters.  For example::
+
+    from ddt import env, task, setting, log
+    import os
+
+    @setting
+    def Default_Name(name=None):
+        """the name to use for greetings (if not set: login name)"""
+        if name is None or name == '':
+            name = os.getlogin()
+        if not isinstance(name, str):
+            raise ValueError("a string value is expected")
+        env.add(default_name=name)
+
+    @task
+    def Hello_With_Configuration(name=None):
+        if name is None:
+            name = env.default_name
+        log("Hello, %s!" % name.capitalize())
+
+Now you could specify the name as a configuration parameter
+``default-name``.  One way to do it is to use global option
+``--default-name``::
+
+    $ ddt --default-name=world hello-with-configuration
+
+You could also pass a configuration parameter using an environment
+variable::
+
+    $ DDT_DEFAULT_NAME=world ddt hello-with-configuration
+
+Alternatively, you can put parameters to a configuration file.  In the
+current directory, create a file ``ddt.conf`` with the following
+content::
+
+    default-name: world
+
+Now run::
+
+    $ ddt hello-with-configuration
+
+DDT reads configuration from the following locations:
+
+* ``/etc/ddt.conf``
+* ``$PREFIX/etc/ddt.conf``
+* ``$HOME/.ddt.conf``
+* ``./ddt.conf``
+* program environment
+* command-line options
+
+To create a new configuration parameter, wrap a function named after the
+parameter with the ``@setting`` decorator.  The function must accept
+zero or one argument: the function is called without arguments if the
+parameter is not specified explicitly, and is called with the value of
+the parameter is it was set using one of the methods described above.
+
+DDT does not impose any rules on what to do with the parameter value,
+but we recommend to store the value in the global ``env`` variable.  The
+call of ``env.add(default_name=name)`` adds a new parameter
+``default_name`` which could then be accessed as ``env.default_name``.
+
+
+API Reference
+=============
+
+The following functions and classes are defined in the package ``ddt``:
+
+``@task``
+
+    The ``@task`` decorator converts the wrapped function or class into
+    a task.  Task properties are inferred from the wrapped object as
+    follows:
+
+    *name*
+        Generated from the function or the class name.  The name is
+        converted to lower case and all underscores are replaced with
+        dashes.
+
+    *documentation*
+        Generated from the docstring.  The first line of the docstring
+        produces a one-line *hint* string, the rest of the docstring
+        produces a multi-line *help* string.
+
+    *arguments*
+        When the task is inferred from a function, the arguments are
+        generated from the function signature.  Each function parameter
+        becomes a task argument, those which have default values are
+        optional.
+
+        If the task is inferred from a class, the arguments must be
+        specified using the ``argument()`` descriptor.
+
+    *options*
+        A task inferred from a function has no options.  A task inferred
+        from a class may have options specified using the ``option()``
+        descriptor.
+
+    When a task is executed, the wrapped object is invoked according
+    to the following rules:
+
+    * If the task is inferred from a function, parsed command-line
+      parameters are passed as the function arguments.
+    * If the task is inferred from a class, command-line parameters
+      are passed to the class constructor, then the ``__call__``
+      method is called on the instance.
+
+``@setting``
+
+    The ``@setting`` decorator converts the wrapped function to a
+    configuration parameter, which properties are inferred from the
+    function attributes.
+
+    The setting name is generated from the function name.  The name is
+    converted to lower case and has all underscores replaced with
+    dashes.
+
+    The setting documentation is generated from the function docstring.
+
+    The function must be able to accept zero and one parameter.  The
+    function is called at startup with no parameters if the setting is
+    not explicitly set by the user; otherwise it is called with the
+    value of the setting.  The function is responsible for storing the
+    value in the ``env`` object.
+
+``argument(check=None, default=None, is_optional=False, is_list=False)``
+
+    Describe a task argument.
+
+    ``check``
+        A function which is called to check and/or transform the
+        argument value.  The function must return the transformed value
+        or raise ``ValueError`` exception on error.
+
+    ``default``
+        The default value to be used if the argument is optional and not
+        specified.
+
+    ``is_optional``
+        If set, the argument can be omitted.  An optional argument must
+        not appear before any mandatory argument.
+
+    ``is_list``
+        If set, the argument consumes all the remaining command-line
+        parameters.  Must be the last argument specified.
+
+
+``option(key=None, check=None, default=None, has_value=False, value_name=None, hint=None)``
+
+    Describe a task option.
+
+    ``key``
+        A one-character shorthand.
+
+    ``check``
+        A function called to check and transform the value of the option.
+        The function must return the transformed value or raise ``ValueError``
+        exception on error.
+
+    ``default``
+        The default value used when the option is not specified.
+
+    ``has_value``
+        If set, the option takes a value.  If not set, the option is a
+        toggle with the value of ``False`` if the option is not
+        specified; ``True`` otherwise.
+
+    ``value_name``
+        The preferred name for the option value; used for the task
+        description.
+
+    ``hint``
+        A one-line description of the option; used for the task
+        description.
+
+``env``
+
+    A global object that keeps values of configuration parameters and
+    other properties.
+
+    ``env.add(**keywords)``
+        Add new parameters.
+
+    ``env.set(**keywords)``
+        Set values for existing parameters.
+
+    ``env.push(**keywords)``
+        Save the current state and set new values for existing parameters.
+
+    ``env.pop()``
+        Restore a previously saved state of parameters and values.
+
+    ``env(**keywords)``
+        A context manager for ``with`` statement.  On entering, saves
+        the current state and sets new parameter values.  On exiting,
+        restores the saved state.
+
+``log(*msgs, sep=' ', end='\n', file=sys.stdout)``
+    Print the data to the standard output.
+
+    ``log()`` has the interface and behavior similar to the standard
+    ``print()`` function.
+
+    ``log()`` supports styling: a substring of the form::
+
+        `...`
+
+    or::
+
+        :fmt:`...`
+
+    is colorized when displayed on a color terminal.  The supported
+    formats are: *default* (white), ``footnote`` (dark grey),
+    ``warning`` (red), ``success`` (green).
+
+``debug(*msgs, sep=' ', end='\n', file=sys.stderr)``
+    Print the data when the ``env.debug`` parameter is set.  We
+    recommend to accompany any permanent change to the filesystem or
+    other system state with a respective ``debug()`` call.
+
+    Add command-line parameter ``--debug`` or set environment variable
+    ``DDT_DEBUG=1`` to see debug output.
+
+``warn(*msgs, sep=' ', end='\n', file=sys.stderr)``
+    Display a warning.  ``warn()`` should be used for reporting error
+    conditions which do not prevent the script from continuing the job.
+
+``fail(*msgs, sep=' ', end='\n', file=sys.stderr)``
+    Display an error message and return an exception object.  It should
+    be used in the following manner::
+
+        raise fail("no more beer in the refrigerator")
+
+``cp(src, dst)``
+    Copy a file or a directory tree.
+
+``mv(src, dst)``
+    Move a file or a directory tree.
+
+``rm(path)``
+    Remove a file.
+
+``mktree(path)``
+    Create all directories in the path.
+
+``rmtree()``
+    Remove a directory tree.
+
+``exe(cmd)``
+    Replace the current process with the given shell command.
+
+``sh(cmd, data=None, cd=None)``
+    Execute a shell command with the given input and working directory.
+
+``pipe(cmd, data=None, cd=None)``
+    Execute a shell command with the given input and working directory;
+    return the command output.
+
+
+.. vim: set spell spelllang=en textwidth=72:

File setup.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from setuptools import setup, find_packages
+
+
+NAME = "DDT"
+VERSION = "0.1.1" # FIXME: synchronize with `ddt.__version__`?
+DESCRIPTION = """Development, Deployment and Testing automation toolkit"""
+LONG_DESCRIPTION = open('README', 'r').read()
+AUTHOR = """Kirill Simonov (Prometheus Research, LLC)"""
+LICENSE = "MIT"
+PACKAGES = find_packages()
+PACKAGE_DIR = {'': 'src'}
+INSTALL_REQUIRES = ['setuptools', 'PyYAML']
+ENTRY_POINTS = {
+    'console_scripts': [
+        'ddt = ddt.run:run',
+    ],
+    'ddt.extensions': [],
+}
+
+
+setup(name=NAME,
+      version=VERSION,
+      description=DESCRIPTION,
+      author=AUTHOR,
+      license=LICENSE,
+      packages=PACKAGES,
+      package_dir=PACKAGE_DIR,
+      install_requires=INSTALL_REQUIRES,
+      entry_points=ENTRY_POINTS)
+
+

File src/ddt/__init__.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from .run import main
+from .core import env, task, argument, option, setting
+from .out import debug, log, warn, fail
+
+__import__('ddt.std')
+
+

File src/ddt/core.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+import collections
+import itertools
+import types
+import re
+import sys
+import os
+
+
+class Failure(Exception):
+    pass
+
+
+class Environment(object):
+
+    _state = {}
+    _state_stack = []
+
+    class _context(object):
+
+        def __init__(self, env, **kwds):
+            self.env = env
+            self.kwds = kwds
+
+        def __enter__(self):
+            self.env.push(**self.kwds)
+
+        def __exit__(self, exc_type, exc_value, exc_tb):
+            self.env.pop()
+
+    def __init__(self):
+        self.__dict__ = self._state
+
+    def add(self, **kwds):
+        for key in sorted(kwds):
+            assert key not in self.__dict__, \
+                    "duplicate parameter %r" % key
+            self.__dict__[key] = kwds[key]
+
+    def set(self, **kwds):
+        for key in sorted(kwds):
+            assert key in self.__dict__, \
+                    "unknown parameter %r" % key
+            self.__dict__[key] = kwds[key]
+
+    def push(self, **kwds):
+        self._state_stack.append(self._state)
+        self._state = self._state.copy()
+        self.__dict__ = self._state
+        self.set(**kwds)
+
+    def pop(self):
+        assert self._stack, "unbalanced pop()"
+        self._state = self._state_stack.pop()
+        self.__dict__ = self._state
+
+    def __call__(self, **kwds):
+        return self._context(self, **kwds)
+
+
+env = Environment()
+env.add(_task_by_name={},
+        _setting_by_name={},
+        debug=(os.environ.get('DDT_DEBUG') in ['true', '1']
+               or '--debug' in sys.argv))
+
+
+def _attr_to_name(attr):
+    # Convert an attribute name to a task/setting name.
+    return attr.lower().replace(' ', '-').replace('_', '-')
+
+
+def _doc_to_hint_and_help(doc):
+    # Convert a docstring to a hint line and a description.
+    hint = help = ""
+    if doc:
+        hint = doc.lstrip().splitlines()[0].rstrip()
+        lines = doc.strip().splitlines()[1:]
+        while lines and not lines[0].strip():
+            lines.pop(0)
+        while lines and not lines[-1].strip():
+            lines.pop(-1)
+        indent = None
+        for line in lines:
+            short_line = line.lstrip()
+            if short_line:
+                line_indent = len(line)-len(short_line)
+                if indent is None or line_indent < indent:
+                    indent = line_indent
+        if indent:
+            lines = [line[indent:] for line in lines]
+        help = "\n".join(lines)
+    return hint, help
+
+
+class argument(object):
+
+    COUNTER = itertools.count(1)
+    REQUIRED = object()
+
+    def __init__(self, check=None, default=REQUIRED,
+                 is_optional=False, is_list=False):
+        if default is self.REQUIRED:
+            default = None
+        else:
+            is_optional = True
+        self.check = check
+        self.default = default
+        self.is_optional = is_optional
+        self.is_list = is_list
+        self.order = next(self.COUNTER)
+
+    def __get__(self, instance, owner):
+        if instance is None:
+            return self
+        raise AttributeError("unset argument")
+
+
+class option(object):
+
+    COUNTER = itertools.count(1)
+
+    def __init__(self, key=None, check=None, default=None,
+                 has_value=None, value_name=None, hint=None):
+        assert key is None or (isinstance(key, str) and
+                               re.match(r'^[a-zA-Z]$', key)), \
+                "key must be a letter, got %r" % key
+        if has_value is None:
+            if default is False:
+                has_value = False
+            else:
+                has_value = True
+        self.key = key
+        self.check = check
+        self.default = default
+        self.has_value = has_value
+        self.value_name = value_name
+        self.hint = hint
+        self.order = next(self.COUNTER)
+
+    def __get__(self, instance, owner):
+        if instance is None:
+            return self
+        raise AttributeError("unset option")
+
+
+argspec = collections.namedtuple('argspec',
+        ['name', 'attribute', 'check', 'default',
+         'is_optional', 'is_list', 'order'])
+optspec = collections.namedtuple('optspec',
+        ['name', 'key', 'attribute', 'check', 'default',
+         'has_value', 'value_name', 'hint', 'usage', 'order'])
+
+
+def task(T, is_default=False):
+    assert isinstance(T, (types.ClassType, types.TypeType, types.FunctionType)), \
+            "a task must be either a function or a class"
+    T_orig = T
+    if isinstance(T, types.FunctionType):
+        T_dict = {}
+        T_dict['__module__'] = T_orig.__module__
+        T_dict['__doc__'] = T_orig.__doc__
+        T_dict['_fn'] = staticmethod(T_orig)
+        T_dict['_vararg'] = None
+        def __init__(self, **params):
+            self._params = params
+        T_dict['__init__'] = __init__
+        def __call__(self):
+            args = ()
+            kwds = self._params.copy()
+            if self._vararg:
+                args = tuple(kwds.pop(self._vararg))
+            return self._fn(*args, **kwds)
+        T_dict['__call__'] = __call__
+        T = type(T_orig.__name__, (object,), T_dict)
+        code = T_orig.func_code
+        defaults = T_orig.func_defaults or ()
+        for idx, name in enumerate(code.co_varnames[:code.co_argcount]):
+            if idx < code.co_argcount-len(defaults):
+                arg = argument()
+            else:
+                default = defaults[idx-code.co_argcount+len(defaults)]
+                arg = argument(default=default, is_optional=True)
+            setattr(T, name, arg)
+        if code.co_flags & 0x04: # CO_VARARGS
+            name = code.co_varnames[code.co_argcount]
+            arg = argument(default=(), is_optional=True,
+                           is_list=True)
+            setattr(T, name, arg)
+            T._vararg = name
+    elif isinstance(T, types.ClassType):
+        T_dict = {}
+        T_dict['__module__'] = T_orig.__module__
+        T_dict['__doc__'] = T_orig.__doc__
+        T = type(T_orig.__name__, (T_orig, object), T_dict)
+    attrs = {}
+    for C in reversed(T.__mro__):
+        attrs.update(C.__dict__)
+    T._args = []
+    T._arg_by_name = {}
+    T._opts = []
+    T._opt_by_key = {}
+    T._opt_by_name = {}
+    for attr in sorted(attrs):
+        value = attrs[attr]
+        if isinstance(value, argument):
+            name = _attr_to_name(attr)
+            assert name not in T._arg_by_name, \
+                    "duplicate argument %s" % keyword
+            A = argspec(name, attr, value.check, value.default,
+                        value.is_optional, value.is_list, value.order)
+            T._args.append(A)
+            T._arg_by_name[A.name] = A
+        elif isinstance(value, option):
+            name = _attr_to_name(attr)
+            key = value.key
+            assert name not in T._opt_by_name, \
+                    "duplicate option --%s" % name
+            assert key is None or key not in T._opt_by_key, \
+                    "duplicate option -%s" % key
+            usage = "--%s" % name
+            if key:
+                usage = "-%s/%s" % (key, usage)
+            if value.has_value:
+                value_name = value.value_name or name
+                usage = "%s=%s" % (usage, value_name.upper())
+            O = optspec(name, key, attr, value.check, value.default,
+                        value.has_value, value.value_name, value.hint,
+                        usage, value.order)
+            T._opts.append(O)
+            T._opt_by_name[O.name] = O
+            if O.key:
+                T._opt_by_key[O.key] = O
+    T._args.sort(key=(lambda a: a.order))
+    T._opts.sort(key=(lambda o: o.order))
+    name = _attr_to_name(T.__name__)
+    optionals = 0
+    usage = name
+    for A in T._args:
+        if A.is_list:
+            assert A is T._args[-1], "wildcard argument must be the last"
+        if optionals:
+            assert A.is_optional, "mandatory argument must precede" \
+                    " all optional arguments"
+        if A.is_optional:
+            optionals += 1
+        if A.is_optional:
+            usage = "%s [<%s>" % (usage, A.name)
+        else:
+            usage = "%s <%s>" % (usage, A.name)
+        if A.is_list:
+            usage += "..."
+    if optionals:
+        usage += "]"*optionals
+    if is_default:
+        name = None
+    assert name not in env._task_by_name, \
+            "duplicate task %s" % name
+    T._name = name
+    T._usage = usage
+    T._hint, T._help = _doc_to_hint_and_help(T.__doc__)
+    env._task_by_name[name] = T
+    return T_orig
+
+
+def default_task(T):
+    return task(T, True)
+
+
+def setting(S):
+    assert isinstance(S, types.FunctionType), \
+            "a task must be a function"
+    code = S.func_code
+    varnames = code.co_varnames[:code.co_argcount]
+    defaults = S.func_defaults or ()
+    assert len(varnames) >= 1 and len(defaults) == len(varnames)
+    name = _attr_to_name(S.__name__)
+    S._name = name
+    S._hint, S._help = _doc_to_hint_and_help(S.__doc__)
+    value_name = _attr_to_name(varnames[0])
+    default = defaults[0]
+    has_value = default is not False
+    if has_value:
+        usage = "--%s=%s" % (name, value_name.upper())
+        usage_conf = "%s: %s" % (name, value_name.upper())
+        usage_environ = "DDT_%s=%s" % (name.upper().replace('-', '_'),
+                                       value_name.upper())
+    else:
+        usage = "--%s" % name
+        usage_conf = "%s: true" % name
+        usage_environ = "DDT_%s=1" % name.upper().replace('-', '_')
+    S._has_value = has_value
+    S._value_name = value_name
+    S._usage = usage
+    S._usage_conf = usage_conf
+    S._usage_environ = usage_environ
+    assert name not in env._setting_by_name, "duplicate setting %s" % name
+    env._setting_by_name[name] = S
+    return S
+
+

File src/ddt/fs.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from ddt.core import env
+from ddt.out import debug, fail
+import sys
+import glob
+import os
+import shutil
+import subprocess
+
+
+def ls(pattern='.'):
+    # List files matching the pattern.
+    return sorted(glob.glob(pattern))
+
+
+def cp(src_path, dst_path):
+    # Copy a file or a directory.
+    debug("cp `%s` `%s`" % (src_path, dst_path))
+    shutil.copy(src_path, dst_path)
+
+
+def mv(src_path, dst_path):
+    # Rename a file.
+    debug("mv `%s` `%s`" % (src_path, dst_path))
+    os.rename(src_path, dst_path)
+
+
+def rm(path):
+    # Remove a file.
+    debug("rm `%s`" % path)
+    os.unlink(path)
+
+
+def rmtree(path):
+    # Remove a directory tree.
+    debug("rmtree `%s`" % path)
+    shutil.rmtree(path)
+
+
+def mktree(path):
+    # Create a directory tree.
+    if not os.path.isdir(path):
+        debug("mktree `%s`" % path)
+        os.makedirs(path)
+
+
+def exe(cmd):
+    # Execute the command replacing the current process.
+    log("`%s`" % cmd)
+    line = cmd.split()
+    try:
+        os.execvp(line[0], line)
+    except OSError, exc:
+        raise fail(exc)
+
+
+def sh(cmd, data=None, cd=None):
+    # Execute a command using shell.
+    if cd is not None:
+        cmd = "cd %s && %s" % (cd, cmd)
+    stream = subprocess.PIPE
+    if env.debug:
+        stream = None
+    debug("`sh %s`" % cmd)
+    proc = subprocess.Popen(cmd, shell=True, stdin=stream,
+                            stdout=stream, stderr=stream)
+    proc.communicate(data)
+    if proc.returncode != 0:
+        raise fail("`sh %s`: non-zero exit code" % cmd)
+
+
+def pipe(cmd, data=None, cd=None):
+    # Execute the command, return the output.
+    if cd is not None:
+        cmd = "cd %s && %s" % (cd, cmd)
+    stream = subprocess.PIPE
+    debug("`| %s`" % cmd)
+    proc = subprocess.Popen(cmd, shell=True,
+                            stdout=stream, stderr=stream)
+    out, err = proc.communicate(data)
+    if proc.returncode != 0:
+        if env.debug:
+            if out:
+                sys.stdout.write(out)
+            if err:
+                sys.stderr.write(err)
+        raise fail("`| %s`: non-zero exit code" % cmd)
+    return out
+
+

File src/ddt/local/__init__.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+# This is a dummy package for extensions loaded from `ddt.local`
+# subdirectory.
+
+

File src/ddt/out.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from ddt.core import Failure, env
+import sys
+import os
+import re
+
+
+S_RESET = 0
+S_BRIGHT = 1
+S_DIM = 2
+S_UNDERSCORE = 4
+S_BLINK = 5
+S_REVERSE = 7
+S_HIDDEN = 8
+FG_BLACK = 30
+FG_RED = 31
+FG_GREEN = 32
+FG_YELLOW = 33
+FG_BLUE = 34
+FG_MAGENTA = 35
+FG_CYAN = 36
+FG_WHITE = 37
+BG_BLACK = 40
+BG_RED = 41
+BG_GREEN = 42
+BG_YELLOW = 43
+BG_BLUE = 44
+BG_MAGENTA = 45
+BG_CYAN = 46
+BG_WHITE = 47
+
+
+styles = {
+    None: [S_BRIGHT],
+    'footnote': [S_DIM],
+    'warning': [S_BRIGHT, FG_RED],
+    'success': [S_BRIGHT, FG_GREEN],
+}
+
+
+def colorize(msg, has_colors=True):
+    def replace(match):
+        indicator = match.group('indicator')
+        data = match.group('data')
+        assert indicator in styles, "unknown style %r" % indicator
+        if not has_colors:
+            return data
+        lesc = "\x1b[%sm" % ";".join(str(style)
+                                     for style in styles[indicator])
+        resc = "\x1b[%sm" % S_RESET
+        return lesc+data+resc
+    return re.sub(r"(?::(?P<indicator>[a-zA-Z]+):)?`(?P<data>[^`]*)`",
+                  replace, msg)
+
+
+def log(*msgs, **opts):
+    sep = opts.pop('sep', " ")
+    end = opts.pop('end', "\n")
+    file = opts.pop('file', sys.stdout)
+    assert not opts, opts
+    has_colors = (file.isatty() and os.environ.get('COLORTERM'))
+    data = sep.join(colorize(str(msg), has_colors) for msg in msgs) + end
+    file.write(data)
+    file.flush()
+
+
+def debug(*msgs, **opts):
+    if env.debug:
+        return log(":footnote:`...`", file=sys.stderr, *msgs, **opts)
+
+
+def warn(*msgs, **opts):
+    return log(":warning:`WARNING`:", file=sys.stderr, *msgs, **opts)
+
+
+def fail(*msgs, **opts):
+    log(":warning:`FATAL ERROR`:", file=sys.stderr, *msgs, **opts)
+    return Failure()
+
+
+def prompt(msg):
+    value = ""
+    while not value:
+        value = raw_input(msg+" ").strip()
+    return value
+
+

File src/ddt/run.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from ddt.core import env, Failure, _attr_to_name
+from ddt.out import debug, warn, fail
+import sys
+import os.path
+import collections
+import imputil
+import pkg_resources
+import yaml
+
+
+def main(argv):
+    # Load extensions from entry point `ddt.extensions`.
+    for entry in pkg_resources.iter_entry_points('ddt.extensions'):
+        debug("loading extensions from `%s`" % entry)
+        entry.load()
+    # Load extensions from `./ddt.local/`.
+    cwd = os.getcwd()
+    local_dir = os.path.join(cwd, 'ddt.local')
+    if os.path.isdir(local_dir):
+        local_package = os.path.join(local_dir, '__init__.py')
+        if not os.path.isfile(local_package):
+            warn("cannot load extensions from `%s`:"
+                 " missing `__init__.py`" % local_dir)
+        else:
+            uid = os.stat(local_package).st_uid
+            if not (uid == os.getuid() or uid == 0):
+                warn("cannot load extensions from `%s`:"
+                     " not owned by the user or the root" % local_dir)
+            else:
+                debug("loading extensions from `%s`" % local_dir)
+                from . import local
+                local.__path__.append(local_dir)
+                imputil._os_stat = os.stat
+                code = imputil.py_suffix_importer(local_package,
+                                                  os.stat(local_package),
+                                                  'ddt.local')[1]
+                exec code in local.__dict__
+
+    # Load configuration files.
+    settings = {}
+    conf_paths = []
+    conf_paths.append('/etc/ddt.conf')
+    conf_paths.append(os.path.join(sys.prefix, '/etc/ddt.conf'))
+    conf_paths.append(os.path.expanduser('~/.ddt.conf'))
+    conf_paths.append(os.path.abspath('./ddt.conf'))
+    for conf_path in conf_paths:
+        if not os.path.isfile(conf_path):
+            continue
+        debug("loading configuration from `%s`" % conf_path)
+        try:
+            data = yaml.load(open(conf_path, 'r'))
+        except yaml.YAMLError, exc:
+            warn("failed to load configuration from `%s`: %s"
+                 % (conf_path, exc))
+            continue
+        if data is None:
+            continue
+        if not isinstance(data, dict):
+            warn("ill-formed configuration file `%s`" % conf_path)
+            continue
+        for key in sorted(data):
+            if not isinstance(key, str):
+                warn("invalid setting `%r` in `%s`"
+                     % (key, conf_path))
+                continue
+            name = _attr_to_name(key)
+            if name not in env._setting_by_name:
+                warn("unknown setting `%s` in `%s`"
+                     % (key, conf_path))
+                continue
+            settings[name] = data[key]
+    for key in sorted(os.environ):
+        if key.startswith('DDT_'):
+            name = _attr_to_name(key[4:])
+            if name not in env._setting_by_name:
+                warn("unknown configuration parameter `%s`"
+                     " in the environment" % key)
+                continue
+            settings[name] = os.environ[key]
+
+    # Parse command-line parameters.
+    T = None
+    args = {}
+    opts = {}
+    no_more_opts = False
+    argv = argv[1:]
+    while argv:
+        arg = argv.pop(0)
+        if arg == '--':
+            no_more_opts = True
+        elif arg.startswith('--') and not no_more_opts:
+            if '=' in arg:
+                key, value = arg.split('=', 1)
+                no_value = False
+            else:
+                key = arg
+                value = None
+                no_value = True
+            name = _attr_to_name(key[2:])
+            if name in env._setting_by_name:
+                S = env._setting_by_name[name]
+                if S._has_value and no_value:
+                    if not argv:
+                        raise fail("missing value for setting `%s`" % key)
+                    value = argv.pop(0)
+                    no_value = False
+                if not S._has_value:
+                    if not no_value:
+                        raise fail("unexpected value for a toggle setting `%s`"
+                                   % key)
+                    value = True
+                settings[name] = value
+            else:
+                if T is None:
+                    T = env._task_by_name[None]
+                if name not in T._opt_by_name:
+                    raise fail("unknown option or setting `%s`" % key)
+                O = T._opt_by_name[name]
+                if O.has_value and no_value:
+                    if not argv:
+                        raise fail("missing value for option `%s`" % key)
+                    value = argv.pop(0)
+                    no_value = False
+                if not O.has_value:
+                    if not no_value:
+                        raise fail("unexpected value for a toggle option `%s`"
+                                   % key)
+                    value = True
+                if O.attribute in opts:
+                    raise fail("duplicate option `%s`" % key)
+                opts[O.attribute] = value
+        elif arg.startswith('-') and arg != '-' and not no_more_opts:
+            if T is None:
+                T = env._task_by_name[None]
+            keys = arg[1:]
+            while keys:
+                key = keys[0]
+                keys = keys[1:]
+                if key not in T._opt_by_key:
+                    raise fail("unknown option `-%s`" % key)
+                O = T._opt_by_key[key]
+                if O.has_value:
+                    if keys:
+                        value = keys
+                        keys = ''
+                    else:
+                        if not argv:
+                            raise fail("missing value for option `-%s`" % key)
+                        value = argv.pop(0)
+                else:
+                    value = True
+                opts[O.attribute] = value
+        elif T is None:
+            if arg == '-':
+                T = env._task_by_name[None]
+            else:
+                name = _attr_to_name(arg)
+                if name not in env._task_by_name:
+                    raise fail("unknown task `%s`" % arg)
+                T = env._task_by_name[name]
+        else:
+            if arg == '-':
+                arg = None
+            for A in T._args:
+                if A.attribute not in args or A.is_list:
+                    break
+            else:
+                if T._name:
+                    raise fail("too many arguments for task `%s`"
+                               % T._name)
+                else:
+                    raise fail("too many arguments")
+            if A.is_list:
+                if A.attribute not in args:
+                    args[A.attribute] = []
+                args[A.attribute].append(arg)
+            else:
+                args[A.attribute] = arg
+    if T is None:
+        T = env._task_by_name[None]
+    for O in T._opts:
+        if O.attribute in opts:
+            if O.check is not None:
+                try:
+                    opts[O.attribute] = O.check(opts[O.attribute])
+                except ValueError, exc:
+                    raise fail("invalid option `--%s`: %s" % (O.name, exc))
+        else:
+            opts[O.attribute] = O.default
+    for A in T._args:
+        if A.attribute in args:
+            if A.check is not None:
+                try:
+                    args[A.attribute] = A.check(args[A.attribute])
+                except ValueError, exc:
+                    raise fail("invalid argument <%s>: %s" % (A.name, exc))
+        else:
+            if not A.is_optional:
+                if T._name:
+                    raise fail("not enough arguments for task `%s`"
+                               % T._name)
+                else:
+                    raise fail("not enough arguments")
+            args[A.attribute] = A.default
+
+    # Process settings.
+    for name in sorted(env._setting_by_name):
+        S = env._setting_by_name[name]
+        try:
+            if name in settings:
+                S(settings[name])
+            else:
+                S()
+        except ValueError, exc:
+            raise fail("invalid setting `%s`: %s" % (name, exc))
+
+    # Start the task.
+    kwds = {}
+    kwds.update(opts)
+    kwds.update(args)
+    try:
+        t = T(**kwds)
+    except ValueError, exc:
+        raise fail(str(exc))
+    exit = t()
+
+    return exit
+
+
+def run():
+    try:
+        return main(sys.argv)
+    except (Failure, IOError, KeyboardInterrupt), exc:
+        if env.debug:
+            raise
+        return exc
+
+

File src/ddt/std/__init__.py

View file
+#
+# Copyright (c) 2012, Prometheus Research, LLC
+# Released under MIT license, see `LICENSE` for details.
+#
+
+
+from ddt.core import env, task, default_task, argument, setting
+from ddt.out import log, fail
+
+
+@default_task
+class Usage(object):
+    """explain how to obtain help on DDT"""
+
+    # FIXME: implement common options: `--help`, `--version`, `--license`
+    #help = option()
+    #version = option()
+    #license = option()
+
+    def __init__(self, help=False, version=False, license=False):
+        self.help = help
+        self.version = version
+        self.license = license
+
+    def __call__(self):
+        #if self.help:
+        #    t = Help()
+        #    return t()
+        #if self.version:
+        #    t = Version()
+        #    return t()
+        #if self.license:
+        #    t = License()
+        #    return t()
+        log("DDT - Development, Deployment and Testing automation toolkit")
+        log("Copyright (c) 2012, Prometheus Research, LLC")
+        log()
+        log("Run `ddt help` for general usage and a list of tasks"
+            " and settings.")
+        log("Run `ddt help <task>` for help on a specific task.")
+
+
+@task
+class Help(object):
+    """display help on tasks and options
+
+    When started without arguments, displays a list of available tasks,
+    options and toggles.
+
+    When `<name>` is given, describes the usage of the specified task
+    or option.
+    """
+
+    name = argument(default=None)
+
+    def __init__(self, name):
+        self.name = name
+
+    def __call__(self):
+        if self.name is None:
+            return self.describe_all()
+        if self.name in env._task_by_name:
+            T = env._task_by_name[self.name]
+            return self.describe_task(T)
+        elif self.name in env._setting_by_name:
+            S = env._setting_by_name[self.name]
+            return self.describe_setting(S)
+        else:
+            raise fail("unknown task or setting `%s`" % self.name)
+
+    def describe_all(self):
+        log("DDT - Development, Deployment and Testing automation toolkit")
+        log("Copyright (c) 2012, Prometheus Research, LLC")
+        log("Usage: `ddt [<settings>...] <task> [<arguments>...]`")
+        log()
+        log("Run `ddt help` for general usage and a list of tasks"
+            " and settings.")
+        log("Run `ddt help <task>` for help on a specific task.")
+        log()
+        log("Available tasks:")
+        for name in sorted(env._task_by_name):
+            if not name:
+                continue
+            T = env._task_by_name[name]
+            usage = T._usage
+            hint = T._hint
+            if hint:
+                log("  %-24s : %s" % (usage, hint))
+            else:
+                log("  %s" % usage)
+        log()
+        log("Settings:")
+        for name in sorted(env._setting_by_name):
+            S = env._setting_by_name[name]
+            usage = S._usage
+            hint = S._hint
+            if hint:
+                log("  %-24s : %s" % (usage, hint))
+            else:
+                log("  %s" % usage)
+        log()
+
+    def describe_task(self, T):
+        name = T._name
+        hint = T._hint
+        if hint:
+            log("%s - %s" % (name.upper(), hint))
+        else:
+            log(name.upper())
+        usage = T._usage
+        log("Usage: `ddt %s`" % usage)
+        log()
+        help = T._help
+        if help:
+            log(help)
+            log()
+        options = T._opts
+        if options:
+            log("Options:")
+            for O in options:
+                usage = O.usage
+                hint = O.hint
+                if hint:
+                    log("  %-24s : %s" % (usage, hint))
+                else:
+                    log("  %s" % usage)
+
+    def describe_setting(self, S):
+        name = S._name
+        hint = S._hint
+        if hint:
+            log("%s - %s" % (name.upper(), hint))
+        else:
+            log(name.upper())
+        usage = S._usage
+        usage_conf = S._usage_conf
+        usage_environ = S._usage_environ
+        log("Usage: `ddt %s`" % usage)
+        log("       `%s` (ddt.conf)" % usage_conf)
+        log("       `%s` (environment)" % usage_environ)
+        log()
+        help = S._help
+        if help:
+            log(help)
+            log()
+
+
+@setting
+def Debug(value=False):
+    """print debug information"""
+    if value is None or value in ['false', '', '0', 0]:
+        value = False
+    if value in ['true', '1', 1]:
+        value = True
+    if not isinstance(value, bool):
+        raise ValueError("debug: expected a Boolean value; got %r" % value)
+    env.set(debug=value)
+
+