argh / argh / helpers.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# -*- coding: utf-8 -*-
#
#  Copyright (c) 2010—2012 Andrey Mikhailenko and contributors
#
#  This file is part of Argh.
#
#  Argh is free software under terms of the GNU Lesser
#  General Public License version 3 (LGPLv3) as published by the Free
#  Software Foundation. See the file README for copying conditions.
#
"""
Helpers
=======
"""
import argparse
import locale
import sys
from types import GeneratorType

from argh.six import b, u, string_types, text_type, BytesIO, PY3
from argh.exceptions import CommandError
from argh.utils import get_subparsers
from argh.completion import autocomplete
from argh.constants import (
    ATTR_ALIAS, ATTR_ARGS, ATTR_NO_NAMESPACE, ATTR_WRAPPED_EXCEPTIONS
)


if PY3:
    # Python 3 has no built-in ``raw_input``; define a stand-in so the rest of
    # this module can call ``raw_input`` on either major version.
    def raw_input(text):
        """Decodes the prompt `text` and passes it to the built-in ``input``.

        `text` must provide ``decode()`` (e.g. a byte string);
        :func:`confirm` in this module encodes its prompt to UTF-8 before
        calling this function.
        """
        return input(text.decode())


# Names exported by ``from argh.helpers import *``. Note that `autocomplete`
# is not defined here; it is imported from `argh.completion` and re-exported.
__all__ = [
    'ArghParser', 'add_commands', 'autocomplete', 'confirm', 'dispatch',
    'dispatch_command', 'set_default_command', 'wrap_errors'
]


def set_default_command(parser, function):
    """ Makes given function the default command of given parser.

    Every argument declaration attached to the function (if any) is passed
    to the parser's `add_argument` method; then the function itself is
    stored in the parser defaults under the name `function`.

    :param parser:

        the parser that should get the default command.

    :param function:

        the function to be called when the parser is dispatched.

    .. note::

       A parser that already has subparsers (e.g. added with
       :func:`~argh.helpers.add_commands`) cannot get a default command;
       a `RuntimeError` is raised in this case.

    """
    if parser._subparsers:
        raise RuntimeError('Cannot set default command to a parser with '
                           'existing subparsers')

    # each declaration is a pair: (positional args, keyword args)
    declarations = getattr(function, ATTR_ARGS, [])
    for declaration in declarations:
        positional, named = declaration
        parser.add_argument(*positional, **named)

    parser.set_defaults(function=function)


def add_commands(parser, functions, namespace=None, title=None,
                 description=None, help=None):
    """Adds given functions as commands to given parser.

    :param parser:

        an :class:`argparse.ArgumentParser` instance.

    :param functions:

        a list of functions. A subparser is created for each of them. If the
        function is decorated with :func:`arg`, the arguments are passed to
        the :class:`~argparse.ArgumentParser.add_argument` method of the
        parser. See also :func:`dispatch` for requirements concerning function
        signatures. The command name is inferred from the function name. Note
        that the underscores in the name are replaced with hyphens, i.e.
        function name "foo_bar" becomes command name "foo-bar".

    :param namespace:

        an optional string representing the group of commands. For example, if
        a command named "hello" is added without the namespace, it will be
        available as "prog.py hello"; if the namespace is specified as "greet",
        then the command will be accessible as "prog.py greet hello". The
        namespace itself is not callable, so "prog.py greet" will fail and only
        display a help message.

    Help message for a namespace can be also tuned with these params (provided
    that you specify the `namespace`):

    :param title:

        passed to :meth:`argparse.ArgumentParser.add_subparsers` as `title`.
        It is also used as the `help` text of the namespace placeholder.

    :param description:

        passed to :meth:`argparse.ArgumentParser.add_subparsers` as
        `description`.

    :param help:

        passed to :meth:`argparse.ArgumentParser.add_subparsers` as `help`.

    .. note::

        This function modifies the parser object. Generally side effects are
        bad practice but we don't seem to have any choice as ArgumentParser is
        pretty opaque. You may prefer :class:`ArghParser.add_commands` for a
        bit more predictable API.

    .. admonition:: Design flaw

        This function peeks into the parser object using its internal API.
        Unfortunately the public API does not allow to *get* the subparsers, it
        only lets you *add* them, and do that *once*. So you would have to toss
        the subparsers object around to add something later. That said, I doubt
        that argparse will change a lot in the future as it's already pretty
        stable. If some implementation details would change and break `argh`,
        we'll simply add a workaround and keep it compatible.

    .. note::

       An attempt to add commands to a parser which already has a default
       function (e.g. added with :func:`~argh.helpers.set_default_command`)
       results in a `RuntimeError`.

    """
    if 'function' in parser._defaults:
        raise RuntimeError('Cannot add commands to a single-command parser')

    subparsers = get_subparsers(parser, create=True)

    if namespace:
        # make a namespace placeholder and register the commands within it
        assert isinstance(namespace, string_types)
        subsubparser = subparsers.add_parser(namespace, help=title)
        subparsers = subsubparser.add_subparsers(title=title,
                                                 description=description,
                                                 help=help)
    else:
        # the message must name the actual parameters of this function
        assert not any([title, description, help]), (
            'Arguments "title", "description" or "help" only make sense '
            'if provided along with a namespace.')

    for func in functions:
        # XXX we could add multiple aliases here but it's a bit of a hack
        # an explicit alias wins over the name derived from the function name
        cmd_name = getattr(func, ATTR_ALIAS, func.__name__.replace('_', '-'))
        cmd_help = func.__doc__
        command_parser = subparsers.add_parser(cmd_name, help=cmd_help)
        set_default_command(command_parser, func)


def dispatch_command(function, *args, **kwargs):
    """ A wrapper for :func:`dispatch` that creates a one-command parser.

    This::

        @command
        def foo():
            return 1

        dispatch_command(foo)

    ...is a shortcut for::

        @command
        def foo():
            return 1

        parser = ArgumentParser()
        set_default_command(parser, foo)
        dispatch(parser)

    This function can also be used as a decorator. Here's a more or less
    sensible example::

        from argh import *

        @dispatch_command
        @arg('name')
        def main(args):
            return args.name

    Note that the command is dispatched right away, i.e. at the moment of
    decoration when used as a decorator.

    :param function:

        the function to be set as the default command of a fresh parser.

    All other positional and named arguments are passed to :func:`dispatch`
    as is.

    Returns whatever :func:`dispatch` returns: the collected output if
    `output_file` is `None`, otherwise `None`.
    """
    parser = argparse.ArgumentParser()
    set_default_command(parser, function)
    # hand the result back so that ``output_file=None`` is usable here, too
    return dispatch(parser, *args, **kwargs)


def dispatch(parser, argv=None, add_help_command=True, encoding=None,
             completion=True, pre_call=None, output_file=sys.stdout,
             raw_output=False, namespace=None):
    """Parses given list of arguments using given parser, calls the relevant
    function and prints the result.

    The target function should expect one positional argument: the
    :class:`argparse.Namespace` object. However, if the function is decorated with
    :func:`plain_signature`, the positional and named arguments from the
    namespace object are passed to the function instead of the object itself.

    :param parser:

        the ArgumentParser instance.

    :param argv:

        a list of strings representing the arguments. If `None`, ``sys.argv``
        is used instead. Default is `None`. The given list is copied and
        never modified.

    :param add_help_command:

        if `True`, converts first positional argument "help" to a keyword
        argument so that ``help foo`` becomes ``foo --help`` and displays usage
        information for "foo". Default is `True`.

    :param encoding:

        Encoding for results. If `None`, it is determined automatically.
        Default is `None`.

    :param completion:

        If `True`, shell tab completion is enabled. Default is `True`. (You
        will also need to install it.)

    :param pre_call:

        an optional callable. If the parsed arguments contain a function to
        call, `pre_call` is called with the parsed arguments right before
        the command is executed. Default is `None`.

    :param output_file:

        A file-like object for output. If `None`, the resulting lines are
        collected and returned as a string. Default is ``sys.stdout``.

    :param raw_output:

        If `True`, results are written to the output file raw, without adding
        whitespaces or newlines between yielded strings. Default is `False`.

    :param namespace:

        passed to the parser's `parse_args` method as `namespace`. Default is
        `None`.

    By default the exceptions are not wrapped and will propagate. The only
    exception that is always wrapped is :class:`CommandError` which is
    interpreted as an expected event so the traceback is hidden. You can also
    mark arbitrary exceptions as "wrappable" by using the :func:`wrap_errors`
    decorator.

    Returns the collected output if `output_file` is `None`; otherwise
    nothing is returned.
    """
    if completion:
        autocomplete(parser)

    if argv is None:
        argv = sys.argv[1:]
    else:
        # work on a copy: the "help" rewrite below modifies the list in place
        # and must not affect the object owned by the caller
        argv = list(argv)

    if add_help_command:
        if argv and argv[0] == 'help':
            argv.pop(0)
            argv.append('--help')

    # this will raise SystemExit if parsing fails
    args = parser.parse_args(argv, namespace=namespace)

    if hasattr(args, 'function'):
        if pre_call:  # XXX undocumented because I'm unsure if it's OK
            pre_call(args)
        lines = _execute_command(args)
    else:
        # no commands declared, can't dispatch; display help message
        lines = [parser.format_usage()]

    if output_file is None:
        # user wants a string; we create an internal temporary file-like object
        # and will return its contents as a string
        f = BytesIO()
    else:
        # normally this is stdout; can be any file
        f = output_file

    for line in lines:
        # print the line as soon as it is generated to ensure that it is
        # displayed to the user before anything else happens, e.g.
        # raw_input() is called
        output = _encode(line, f, encoding)
        output = '' if output is None else output
        f.write(output)
        if not raw_output:
            # in most cases user wants one message per line
            f.write(b('\n'))

    if output_file is None:
        # user wanted a string; return contents of our temporary file-like obj
        f.seek(0)
        return f.read()


def _encode(line, output_file, encoding=None):
    """Returns given line encoded for writing to given output file.

    :param line:

        the value to encode. Anything that is not a Unicode string is
        converted to one first.

    :param output_file:

        the file-like object the line is meant for. It is only inspected
        (``isatty()`` and the ``encoding`` attribute), never written to.

    :param encoding:

        the output encoding. If not specified, the encoding of the output
        file is used provided that the file is a terminal and declares one;
        otherwise the preferred encoding of the locale is used.
    """
    # Step 1: make sure we are dealing with a Unicode string
    if not isinstance(line, text_type):
        try:
            line = text_type(line)
        except UnicodeDecodeError:
            line = b(line).decode('utf-8')

    # Step 2: explicit encoding wins, no detection needed
    if encoding:
        return line.encode(encoding)

    # Step 3: terminal's encoding if available, system's preferred otherwise
    detected = None
    if output_file.isatty():
        detected = getattr(output_file, 'encoding', None)
    return line.encode(detected or locale.getpreferredencoding())


def _execute_command(args):
    """Calls ``args.function`` and yields its results line by line.

    The function is called either with the `argparse.Namespace` object
    itself or, if it is marked as not expecting the namespace, with the
    namespace contents filtered down to the arguments the function declares.

    A generator, list or tuple returned by the function is yielded item by
    item; any other result is yielded as a single line unless it is `None`.

    If :class:`CommandError` (or an exception marked as wrappable by
    :func:`wrap_errors`) is raised, its message is yielded as a line. All
    other exceptions propagate.

    Asserts that ``args.function`` is present and callable.
    """
    assert hasattr(args, 'function') and hasattr(args.function, '__call__')

    # nested generator: lets the code below catch exceptions raised both by
    # the call itself and while the results are being iterated
    def _call():
        func = args.function

        if getattr(func, ATTR_NO_NAMESPACE, False):
            # the function does not want the namespace object; pass only the
            # namespace variables that it actually declares
            # (code object: `func_code` on Python 2, `__code__` on Python 3)
            code = func.func_code if hasattr(func, 'func_code') else func.__code__
            expected_args = code.co_varnames[:code.co_argcount]
            positional = [value for value in args._get_args()
                          if value in expected_args]
            named = dict((name, value) for name, value in args._get_kwargs()
                         if name in expected_args)
            outcome = func(*positional, **named)
        else:
            outcome = func(args)

        if isinstance(outcome, (GeneratorType, list, tuple)):
            # pass each line on as soon as it is available
            for item in outcome:
                yield item
        elif outcome is not None:
            # a single non-empty, non-iterable result makes a single line
            yield outcome

    # CommandError is always handled; the function may declare more
    handled = [CommandError]
    handled.extend(getattr(args.function, ATTR_WRAPPED_EXCEPTIONS, []))
    handled = tuple(handled)

    try:
        for line in _call():
            yield line
    except handled as e:
        yield text_type(e)


class ArghParser(argparse.ArgumentParser):
    """An :class:`ArgumentParser` subclass which adds a couple of convenience
    methods.

    There is actually no need to subclass the parser. The methods are but
    wrappers for stand-alone functions :func:`set_default_command`,
    :func:`add_commands`, :func:`autocomplete` and :func:`dispatch`. Each
    method passes the parser itself as the first argument of the respective
    function and returns the result.
    """
    def set_default_command(self, *args, **kwargs):
        "Wrapper for :func:`set_default_command`."
        return set_default_command(self, *args, **kwargs)

    def add_commands(self, *args, **kwargs):
        "Wrapper for :func:`add_commands`."
        return add_commands(self, *args, **kwargs)

    def autocomplete(self):
        "Wrapper for :func:`autocomplete`."
        return autocomplete(self)

    def dispatch(self, *args, **kwargs):
        "Wrapper for :func:`dispatch`."
        return dispatch(self, *args, **kwargs)


def confirm(action, default=None, skip=False):
    """Asks the user to confirm an action and returns the answer.

    :param action:

        a string describing the action, e.g. "Apply changes". A question mark
        and the available choices will be appended.

    :param default:

        `bool` or `None`. The answer assumed when the user hits :kbd:`Enter`
        without typing in a choice: "yes" if `True`, "no" if `False`. If
        `None`, there is no default answer and the prompt is displayed again
        on empty input until the iteration limit is reached. Default is
        `None`.

    :param skip:

        `bool`; if `True`, the user is not asked at all and the default
        choice is returned (useful for batch mode). Default is `False`.

    Usage::

        @arg('key')
        @arg('--silent', help='do not prompt, always give default answers')
        def delete(args):
            item = db.get(Item, args.key)
            if confirm('Delete '+item.title, default=True, skip=args.silent):
                item.delete()
                print('Item deleted.')
            else:
                print('Operation cancelled.')

    Returns `True` for "yes", `False` for "no" and the default choice for any
    other input. Returns `None` on `KeyboardInterrupt` event.
    """
    MAX_ITERATIONS = 3

    # batch mode: do not bother the user
    if skip:
        return default

    # the default choice is displayed in upper case
    hints = {
        None: ('y', 'n'),
        True: ('Y', 'n'),
        False: ('y', 'N'),
    }
    yes_hint, no_hint = hints[default]
    prompt = u('{action}? ({y}/{n})').format(
        action=action, y=yes_hint, n=no_hint).encode('utf-8')

    # With a default choice the question is asked exactly once. Without it
    # the question is repeated on empty input; the iteration counter starts
    # at one, so the prompt appears at most MAX_ITERATIONS - 1 times.
    if default is None:
        attempts_left = MAX_ITERATIONS - 1
    else:
        attempts_left = 1

    choice = None
    try:
        while not choice and attempts_left > 0:
            choice = raw_input(prompt)
            attempts_left -= 1
    except KeyboardInterrupt:
        return None

    if choice in ('yes', 'y', 'Y'):
        return True
    if choice in ('no', 'n', 'N'):
        return False
    # empty or unrecognized input: fall back to the default (may be `None`)
    return default


def wrap_errors(*exceptions):
    """Decorator. Marks given exceptions as "wrappable" for the decorated
    function, i.e. to be handled the same way as :class:`CommandError`.
    Usage::

        @arg('-x')
        @arg('-y')
        @wrap_errors(AssertionError)
        def foo(args):
            assert args.x or args.y, 'x or y must be specified'

    If the assertion fails, its message will be correctly printed and the
    stack hidden. This helps to avoid boilerplate code.

    The function itself is not wrapped: the exception classes are merely
    attached to it as an attribute and the very same function is returned.
    """
    def mark(function):
        # remember the exception classes on the function object
        setattr(function, ATTR_WRAPPED_EXCEPTIONS, exceptions)
        return function

    return mark
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.