Commits

Sebastian Wiesner committed 7e0958f

Handle unexpected return codes more gracefully

Instead of raises CalledProcessError, simply emit a Sphinx warning, if a
command returned a non-zero code.

Comments (0)

Files changed (5)

programoutput/CHANGES.rst

 0.5 (in development)
 ====================
 
+- Warn on unexpected return codes instead of raising
+  :exc:`~subprocess.CalledProcessError`
+
 
 0.4.1 (Mar 11, 2011)
 ====================

programoutput/sphinxcontrib/programoutput.py

 
 import sys
 import shlex
-from subprocess import Popen, CalledProcessError, PIPE, STDOUT
-from collections import defaultdict
+from subprocess import Popen, PIPE, STDOUT
+from collections import defaultdict, namedtuple
 
 from docutils import nodes
 from docutils.parsers import rst
         return [node]
 
 
+_Command = namedtuple('Command', 'command shell hide_standard_error')
+
+
+class Command(_Command): #pylint: disable=W0232
+    """
+    A command to be executed.
+    """
+
+    def __new__(cls, command, shell=False, hide_standard_error=False):
+        if isinstance(command, list):
+            command = tuple(command)
+        return _Command.__new__(cls, command, shell, hide_standard_error)
+
+    @classmethod
+    def from_program_output_node(cls, node):
+        """
+        Create a command from a :class:`program_output` node.
+        """
+        extraargs = node.get('extraargs', '')
+        command = (node['command'] + ' ' + extraargs).strip()
+        return cls(command, node['use_shell'], node['hide_standard_error'])
+
+    def execute(self):
+        """
+        Execute this command.
+
+        Return the :class:`~subprocess.Popen` object representing the running
+        command.
+        """
+        # pylint: disable=E1101
+        if isinstance(self.command, unicode):
+            command = self.command.encode(sys.getfilesystemencoding())
+        else:
+            command = self.command
+        if isinstance(command, basestring) and not self.shell:
+            command = shlex.split(command)
+        return Popen(command, shell=self.shell, stdout=PIPE,
+                     stderr=PIPE if self.hide_standard_error else STDOUT)
+
+    def get_output(self):
+        """
+        Get the output of this command.
+
+        Return a tuple ``(returncode, output)``.  ``returncode`` is the
+        integral return code of the process, ``output`` is the output as
+        unicode string, with final trailing spaces and new lines stripped.
+        """
+        process = self.execute()
+        output = process.communicate()[0].decode(
+            sys.getfilesystemencoding()).rstrip()
+        return process.returncode, output
+
+    def __str__(self):
+        # pylint: disable=E1101
+        if isinstance(self.command, tuple):
+            return repr(list(self.command))
+        return repr(self.command)
+
+
 class ProgramOutputCache(defaultdict): # pylint: disable=W0232
     """
-    :class:`collections.defaultdict` sub-class, which caches program output.
+    Execute command and cache their output.
 
-    If a program's output is not contained in this cache, the program is
-    executed, and its output is placed in the cache.
+    This class is a mapping.  Its keys are :class:`Command` objects represeting
+    command invocations.  Its values are tuples of the form ``(returncode,
+    output)``, where ``returncode`` is the integral return code of the command,
+    and ``output`` is the output as unicode string.
+
+    The first time, a key is retrieved from this object, the command is
+    invoked, and its result is cached.  Subsequent access to the same key
+    returns the cached value.
     """
 
-    def __missing__(self, key):
+    def __missing__(self, command):
         """
         Called, if a command was not found in the cache.
 
-        ``key`` is a triple of ``(cmd, shell, hide_stderr)``.  ``cmd`` is
-        the command tuple.  If ``shell`` is ``True``, the command is
-        executed in the shell, otherwise it is executed directly.  If
-        ``hide_stderr`` is ``True``, the standard error of the program is
-        discarded, otherwise it is included in the output.
+        ``command`` is an instance of :class:`Command`.
         """
-        cmd, shell, hide_stderr = key
-        proc = Popen(cmd, shell=shell, stdout=PIPE,
-                     stderr=PIPE if hide_stderr else STDOUT)
-        stdout = proc.communicate()[0].decode(
-            sys.getfilesystemencoding()).rstrip()
-        if proc.returncode != 0:
-            raise CalledProcessError(proc.returncode, cmd)
-        self[key] = stdout
-        return stdout
+        result = command.get_output()
+        self[command] = result
+        return result
 
 
 def run_programs(app, doctree):
     cache = app.env.programoutput_cache
 
     for node in doctree.traverse(program_output):
-        command = node['command']
-        cmd_bytes = command.encode(sys.getfilesystemencoding())
+        command = Command.from_program_output_node(node)
+        returncode, output = cache[command]
 
-        extra_args = node.get('extraargs', '').encode(
-            sys.getfilesystemencoding())
-        if node['use_shell']:
-            cmd = cmd_bytes
-            if extra_args:
-                cmd += ' ' + extra_args
-        else:
-            cmd = shlex.split(cmd_bytes)
-            if extra_args:
-                cmd.extend(shlex.split(extra_args))
-            cmd = tuple(cmd)
-
-        cache_key = (cmd, node['use_shell'], node['hide_standard_error'])
-        output = cache[cache_key]
+        if returncode != 0:
+            app.warn('Command {0} failed with return code {1}'.format(
+                command, returncode))
 
         # replace lines with ..., if ellipsis is specified
         if 'strip_lines' in node:
 
         if node['show_prompt']:
             tmpl = app.config.programoutput_prompt_template
-            output = tmpl % dict(command=command, output=output)
+            output = tmpl % dict(command=node['command'], output=output,
+                                 returncode=returncode)
 
         new_node = node_class(output, output)
         new_node['language'] = 'text'

programoutput/tests/test_cache.py

 from __future__ import (print_function, division, unicode_literals,
                         absolute_import)
 
-from subprocess import CalledProcessError
-
 import pytest
 
-from sphinxcontrib.programoutput import ProgramOutputCache
+from sphinxcontrib.programoutput import ProgramOutputCache, Command
 
 
 def pytest_funcarg__cache(request): # pylint: disable=W0613
     return ProgramOutputCache()
 
 
+def assert_cache(cache, cmd, output, returncode=0):
+    result = (returncode, output)
+    assert not cache
+    assert cache[cmd] == result
+    assert cache == {cmd: result}
+
+
 def test_simple(cache):
-    key = (('echo', 'spam'), False, False)
-    assert not cache
-    assert cache[key] == 'spam'
-    assert cache == {key: 'spam'}
+    assert_cache(cache, Command(['echo', 'spam']), 'spam')
 
 
 def test_shell(cache):
-    key = ('echo spam', True, False)
-    assert not cache
-    assert cache[key] == 'spam'
-    assert cache == {key: 'spam'}
+    assert_cache(cache, Command('echo spam', shell=True), 'spam')
 
 
 def test_hidden_standard_error(cache):
-    key = (('python', '-c', 'import sys; sys.stderr.write("spam")'),
-           False, True)
-    assert not cache
-    assert cache[key] == ''
-    assert cache == {key: ''}
+    cmd = ['python', '-c', 'import sys; sys.stderr.write("spam")']
+    assert_cache(cache, Command(cmd, hide_standard_error=True), '')
 
 
 def test_nonzero_return_code(cache):
-    key = (('python', '-c', 'import sys; sys.exit(1)'), False, False)
-    assert not cache
-    with pytest.raises(CalledProcessError) as excinfo:
-        cache[key] # pylint: disable=W0104
-    assert not cache
-    exc = excinfo.value
-    assert exc.cmd == key[0]
-    assert exc.returncode == 1
+    cmd = ['python', '-c', 'import sys; sys.exit(1)']
+    assert_cache(cache, Command(cmd), '', returncode=1)
+
+
+def test_nonzero_return_code_shell(cache):
+    cmd = "python -c 'import sys; sys.exit(1)'"
+    assert_cache(cache, Command(cmd, shell=True), '', returncode=1)
 
 
 @pytest.mark.with_content('dummy content')
 def test_cache_pickled(app, doctreedir):
-    key = (('echo', 'spam'), False, False)
-    assert app.env.programoutput_cache[key] == 'spam'
+    cmd = Command(['echo', 'spam'])
+    result = (0, 'spam')
+    assert app.env.programoutput_cache[cmd] == result
     app.build()
     pickled_env = doctreedir.join('environment.pickle').load()
-    assert pickled_env.programoutput_cache == {key: 'spam'}
+    assert pickled_env.programoutput_cache == {cmd: result}

programoutput/tests/test_command.py

+# -*- coding: utf-8 -*-
+# Copyright (c) 2011, Sebastian Wiesner <lunaryorn@googlemail.com>
+# All rights reserved.
+
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+from __future__ import (print_function, division, unicode_literals,
+                        absolute_import)
+
+from sphinxcontrib.programoutput import Command, program_output
+
+
+def test_new_with_string_command():
+    cmd = 'echo "spam with eggs"'
+    assert Command(cmd).command == cmd
+    assert Command(cmd, shell=True).command == cmd
+
+
+def test_new_with_list():
+    cmd = Command(['echo', 'spam'])
+    assert cmd.command == ('echo', 'spam')
+
+
+def test_new_with_list_hashable():
+    """
+    Test that Command objects are hashable even when passed a non-hashable
+    list.  Important for caching!
+    """
+    hash(Command(['echo', 'spam']))
+
+
+def test_from_programoutput_node():
+    node = program_output()
+    node['command'] = 'echo spam'
+    node['use_shell'] = False
+    node['hide_standard_error'] = False
+    command = Command.from_program_output_node(node)
+    assert command.command == 'echo spam'
+    assert not command.shell
+    assert not command.hide_standard_error
+    node['use_shell'] = True
+    assert Command.from_program_output_node(node).shell
+    assert not Command.from_program_output_node(node).hide_standard_error
+    node['hide_standard_error'] = True
+    assert Command.from_program_output_node(node).hide_standard_error
+
+
+def test_from_programoutput_node_extraargs():
+    node = program_output()
+    node['command'] = 'echo spam'
+    node['use_shell'] = False
+    node['hide_standard_error'] = False
+    node['extraargs'] = 'with eggs'
+    command = Command.from_program_output_node(node)
+    assert command.command == 'echo spam with eggs'
+
+
+def test_execute():
+    process = Command('echo spam').execute()
+    assert process.stderr is None
+    assert not process.stdout.closed
+    assert process.wait() == 0
+
+
+def test_execute_with_shell():
+    process = Command('echo spam', shell=True).execute()
+    assert process.stderr is None
+    assert not process.stdout.closed
+    assert process.wait() == 0
+
+
+def test_execute_with_hidden_standard_error():
+    process = Command('echo spam', hide_standard_error=True).execute()
+    assert not process.stderr.closed
+    assert process.wait() == 0
+
+
+def test_get_output():
+    returncode, output = Command('echo spam').get_output()
+    assert returncode == 0
+    assert output == 'spam'
+
+
+def test_get_output_non_zero():
+    returncode, output = Command(
+        'python -c "import sys; print(\'spam\'); sys.exit(1)"').get_output()
+    assert returncode == 1
+    assert output == 'spam'
+
+
+def test_get_output_with_hidden_standard_error():
+    returncode, output = Command(
+        'python -c "import sys; sys.stderr.write(\'spam\')"',
+        hide_standard_error=True).get_output()
+    assert returncode == 0
+    assert output == ''

programoutput/tests/test_directive.py

 
 import os
 import sys
-from subprocess import CalledProcessError
 
 import pytest
+from sphinx.errors import SphinxWarning
 from docutils.nodes import literal_block
 
+from sphinxcontrib.programoutput import Command
+
 
 def assert_output(doctree, output):
     __tracebackhide__ = True
 
 
 def assert_cache(cache, cmd, output, use_shell=False,
-                 hide_standard_error=False):
-    if isinstance(cmd, list):
-        cmd = tuple(cmd)
-    cache_key = (cmd, use_shell, hide_standard_error)
-    assert cache[cache_key] == output
+                 hide_standard_error=False, returncode=0):
+    cache_key = Command(cmd, use_shell, hide_standard_error)
+    assert cache[cache_key] == (returncode, output)
 
 
 @pytest.mark.with_content('.. program-output:: echo eggs')
 @pytest.mark.with_content("""\
 .. program-output:: python -c 'import sys; sys.exit(1)'""")
 def test_non_zero_return_code(app):
-    with pytest.raises(CalledProcessError) as excinfo:
+    with pytest.raises(SphinxWarning) as excinfo:
         app.build()
-    exc = excinfo.value
-    assert exc.cmd == ('python', '-c', 'import sys; sys.exit(1)')
-    assert exc.returncode == 1
+    exc_message = 'WARNING: Command {0!r} failed with return code {1}\n'.format(
+        "python -c 'import sys; sys.exit(1)'", 1)
+    assert str(excinfo.value) == exc_message
 
 
 @pytest.mark.with_content("""\
 .. program-output:: python -c 'import sys; sys.exit(1)'
    :shell:""")
 def test_shell_non_zero_return_code(app):
-    with pytest.raises(CalledProcessError) as excinfo:
+    with pytest.raises(SphinxWarning) as excinfo:
         app.build()
-    exc = excinfo.value
-    assert exc.cmd == "python -c 'import sys; sys.exit(1)'"
-    assert exc.returncode == 1
+    exc_message = 'WARNING: Command {0!r} failed with return code {1}\n'.format(
+        "python -c 'import sys; sys.exit(1)'", 1)
+    assert str(excinfo.value) == exc_message