Child process stdout seems to be getting closed unexpectedly

Issue #6 resolved
Paul Moore created an issue

I wanted to use sarge to read process output line by line. As a small test case I created the attached files, yada.py and yadatest.py. The yada.py script writes its argument repeatedly, with a user-specified delay between each write. Standard output is flushed after each write. Used at the command line, this works as expected.
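The attachments aren't shown inline here, so for context, a minimal reconstruction of yada.py, pieced together from the description above and the traceback below (the real attachment may differ in details, and the line numbers won't match exactly):

```python
# Hypothetical reconstruction of yada.py: write the given text repeatedly,
# with a user-specified delay, flushing stdout after each write.
import argparse
import time


def make_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', type=float, default=1.0, dest='delay',
                        help='delay in seconds between writes')
    parser.add_argument('text', help='text to write on each iteration')
    return parser


def main(argv=None):
    args = make_parser().parse_args(argv)
    while True:
        # Flush explicitly so each line reaches the parent immediately,
        # even when stdout is a pipe (block-buffered by default).
        print(args.text, flush=True)
        time.sleep(args.delay)

# run as e.g.: python yada.py -t 5 foo
```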

The yadatest script exercises yada.py via sarge, capturing stdout and displaying each line as it is received. Again, stdout is flushed after each write.

The yadatest script runs for a while (about 5 seconds) with no output, and then fails with the following error:

>python .\yadatest.py
Traceback (most recent call last):
  File "yada.py", line 11, in <module>
    print(args.text, flush=True)
OSError: [Errno 22] Invalid argument
Exception OSError: OSError(22, 'Invalid argument') in <_io.TextIOWrapper name='<stdout>' mode='w' encoding='cp1252'> ignored
0 foo

Did something close my child process' stdout? Note that there is no "Error encountered" message, implying that the failure was in the child, not in the parent.

Comments (6)

  1. Vinay Sajip repo owner

    The iterator doesn't give you enough control over e.g. timeouts - I'll probably need to rethink this part of the API. I'm guessing that the child process terminated because of a broken pipe error - you could confirm this by adding logging to it ;-) The following slightly modified yadatest.py, annotated to show why I made changes, behaves as expected with an unchanged yada.py:

    import sys
    from sarge import Capture, run
    from io import TextIOWrapper
    # If we don't indicate a buffer size for capturing, it'll default to 4096.
    # Using -1 means line-buffering in the client, in the background thread
    # which reads the child's output.
    c = Capture(buffer_size=-1)
    # capture_stdout() is the same as run() with a Capture passed as stdout.
    # You need to specify async=True so that run doesn't wait for the
    # command to finish.
    p = run('%s yada.py -t 5 foo' % sys.executable, stdout=c, async=True)
    try:
        i = 0
        while True:
            # We don't use the iterator because we want to control timeouts. The
            # timeout is set higher than what yada outputs at - if you set it
            # smaller, you have to handle empty reads (because there was nothing
            # more from the child process to read, before the timeout expired).
            line = c.readline(timeout=10)
            print(i, line)
            i += 1
    except:
        print(">>> Error encountered")
        raise
    

    When this is run, it outputs

    0 b'foo\n'
    1 b'foo\n'
    2 b'foo\n'
    3 b'foo\n'
    ...
    

    at 5-second intervals.
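For the record, the broken-pipe hypothesis is easy to demonstrate with the stdlib alone - a sketch, with an inline child script standing in for yada.py (not the real attachment):

```python
# Stdlib-only demonstration of the broken-pipe hypothesis: once the parent
# closes its end of the pipe, the child's next flushed write raises an
# OSError (EPIPE on POSIX; Errno 22 is what Windows reports).
import subprocess
import sys

CHILD = r"""
import os, time
try:
    for _ in range(100):
        print('foo', flush=True)
        time.sleep(0.05)
except OSError:
    # Writing to the closed pipe failed, as hypothesized.
    # _exit skips interpreter shutdown, avoiding a second failed flush.
    os._exit(42)
os._exit(0)
"""

child = subprocess.Popen([sys.executable, '-c', CHILD],
                         stdout=subprocess.PIPE)
child.stdout.read(4)    # read one line (b'foo\n') ...
child.stdout.close()    # ... then close our end of the pipe
rc = child.wait()
print(rc)               # 42: the child died on an OSError, like yada.py did
```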

    By the way, when looking into this I encountered a type error in an assignment (which is fixed in 92eed70, but not related to this issue). With that version of sarge, and a timeout of 2 instead of 10 in yadatest.py, you get output like

    0 b'foo\n'
    1 b''
    2 b'foo\n'
    3 b''
    4 b'foo\n'
    5 b''
    6 b'foo\n'
    7 b''
    8 b'foo\n'
    

    with a line being produced every 2 seconds.

    BTW the distribution contains a file, lister.py, which sort of does what yada.py does (but can also print successive lines from a file if specified) - this is used in the test for the expect functionality.

  2. Paul Moore reporter

    Thanks for the clarification. I guess the key thing to take from this is that sarge doesn't (currently) really support this scenario "out of the box". That's fine, I was mainly evaluating its suitability for something I'm looking at and "not suitable yet" is a perfectly good answer :-)

    I'm not 100% sure I understand the need for timeouts, though. The corresponding test harness using subprocess works fine:

    from subprocess import Popen, PIPE
    from io import TextIOWrapper
    p = Popen(['py', 'yada.py', '-t', '5', 'foo'], stdout=PIPE)
    try:
        for i, line in enumerate(TextIOWrapper(p.stdout)):
            print(i, line, flush=True)
    except:
        print(">>> Error encountered")
        raise
    

    Maybe I'm missing something about why a sarge process has to be a bit more complex than this.

  3. Vinay Sajip repo owner

    > Maybe I'm missing something about why a sarge process has to be a bit more complex than this.

    In the example you give, you're happy to wait, perhaps indefinitely, until the child process produces some output, and that's fine in this simple scenario. There are scenarios where you may not be happy to wait forever (perhaps the child process has hung, and needs restarting, or you need to get an answer from somewhere else) and that level of control is missing from subprocess - you have no non-blocking access to what the child process is producing. While you get more control with sarge in some ways, you have to do more work in other ways - e.g. dealing with cases when there's no output from the child. This is of course not a true async solution - we'll have to wait for Tulip for that - but it does provide better control than subprocess in some scenarios (another example - you might need to look at both stdout and stderr from the child concurrently and take action based on what you find).
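    The non-blocking access described above, and the empty reads the caller has to handle, can be illustrated with the stdlib alone - a rough sketch of the pattern behind Capture.readline(timeout=...), using a background thread and a queue (which is approximately, not exactly, how Capture works):

```python
# Stdlib sketch of timed, non-blocking reads from a child process: a
# background thread drains the child's pipe into a queue, and
# queue.get(timeout=...) hands control back to the caller even when the
# child has produced nothing yet.
import queue
import subprocess
import sys
import threading

# Inline stand-in for yada.py: three flushed lines, 0.3s apart.
CHILD = ("import time\n"
         "for _ in range(3):\n"
         "    print('foo', flush=True)\n"
         "    time.sleep(0.3)\n")

child = subprocess.Popen([sys.executable, '-c', CHILD],
                         stdout=subprocess.PIPE)
q = queue.Queue()


def drain(pipe, q):
    # Blocking reads are fine here: this runs in its own thread.
    for line in iter(pipe.readline, b''):
        q.put(line)
    pipe.close()


threading.Thread(target=drain, args=(child.stdout, q), daemon=True).start()

lines = []
while lines.count(b'foo\n') < 3:
    try:
        line = q.get(timeout=0.1)   # analogous to c.readline(timeout=...)
    except queue.Empty:
        line = b''    # the "empty read" the caller must be ready for
    lines.append(line)

child.wait()
print(lines)   # b'foo\n' entries interleaved with b'' timeout reads
```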

  4. Vinay Sajip repo owner

    I fixed a bug in the Capture code. The following script:

    from io import TextIOWrapper
    import sys
    
    from sarge import Capture, run
    
    cap = Capture(buffer_size=-1, timeout=10)
    p = run('%s yada.py -t 5 foo' % sys.executable, async=True, stdout=cap)
    try:
        for i, line in enumerate(TextIOWrapper(p.stdout)):
            print(i, line.strip(), flush=True)
    except:
        print(">>> Error encountered")
        raise
    

    now seems to work as expected (with commit 0b0055a). Can you confirm that it works for you, and if so, can we close this issue?
