Child process stdout seems to be getting closed unexpectedly
I wanted to use sarge to read process output line by line. As a small test case I created the attached files, yada.py and yadatest.py. The yada.py script writes its argument repeatedly, with a user-specified delay between each write. Standard output is flushed after each write. Used at the command line, this works as expected.
The yadatest script exercises yada.py via sarge, capturing stdout and displaying each line as it is received. Again, stdout is flushed after each write.
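The attachments themselves aren't reproduced in the thread. As a rough guide, here is a hypothetical sketch of what yada.py plausibly looks like, inferred from the description and the traceback; the function name, parameters, and the `count` cap (added so the sketch terminates) are guesses, not the original code:

```python
import time

def yada(text, delay, count=None):
    """Write `text` repeatedly, flushing stdout after each write, with
    `delay` seconds between writes. `count` caps the iterations so this
    sketch terminates; the real script presumably loops until killed."""
    i = 0
    while count is None or i < count:
        print(text, flush=True)
        i += 1
        if count is None or i < count:
            time.sleep(delay)

# Roughly what `python yada.py -t 5 foo` does, capped at 3 writes:
yada('foo', 0.0, count=3)
```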
The yadatest script runs for a while (about 5 seconds) with no output, and then fails with the following error:
```
>python .\yadatest.py
Traceback (most recent call last):
  File "yada.py", line 11, in <module>
    print(args.text, flush=True)
OSError: [Errno 22] Invalid argument
Exception OSError: OSError(22, 'Invalid argument') in <_io.TextIOWrapper name='<stdout>' mode='w' encoding='cp1252'> ignored
0 foo
```
Did something close my child process' stdout? Note that there is no "Error encountered" message, implying that the failure was in the child, not in the parent.
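The broken-pipe situation suspected below can be reproduced in isolation: writing to a pipe whose read end has gone away raises an OSError. On POSIX the errno is EPIPE; on Windows the same condition is typically reported as EINVAL, which would be consistent with the traceback above. A minimal demonstration:

```python
import errno
import os

# Simulate the child's situation: the reading (parent) end of the pipe is
# closed while the writer still holds the write end.
read_fd, write_fd = os.pipe()
os.close(read_fd)
try:
    os.write(write_fd, b'data\n')
except OSError as e:
    # BrokenPipeError (EPIPE) on POSIX; Windows tends to report EINVAL.
    print('write failed:', errno.errorcode[e.errno])
finally:
    os.close(write_fd)
```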
Comments (6)
repo owner The iterator doesn't give you enough control over e.g. timeouts - I'll probably need to rethink this part of the API. I'm guessing that the child process terminated because of a broken pipe error - you could confirm this by adding logging to it ;-) The following slightly modified yadatest.py, annotated to show why I made changes, behaves as expected with an unchanged yada.py:

```python
import sys
from sarge import Capture, run
from io import TextIOWrapper

# If we don't indicate a buffer size for capturing, it'll default to 4096.
# Using -1 means line-buffering in the client, in the background thread
# which reads the child's output.
c = Capture(buffer_size=-1)

# capture_stdout() is the same as run() with a capture passed in stdout.
# You need to specify async=True so that run doesn't wait for the
# command to finish.
p = run('%s yada.py -t 5 foo' % sys.executable, stdout=c, async=True)

try:
    i = 0
    while True:
        # We don't use the iterator because we want to control timeouts. The
        # timeout is set higher than what yada outputs at - if you set it
        # smaller, you have to handle empty reads (because there was nothing
        # more from the child process to read, before the timeout expired).
        line = c.readline(timeout=10)
        print(i, line)
        i += 1
except:
    print(">>> Error encountered")
    raise
```
When this is run, it outputs

```
0 b'foo\n'
1 b'foo\n'
2 b'foo\n'
3 b'foo\n'
...
```

at 5-second intervals.
By the way, when looking into this I encountered a type error in an assignment (which is fixed in 92eed70, but not related to this issue). With that version of sarge, and a timeout of 2 instead of 10 in yadatest.py, you get output like

```
0 b'foo\n'
1 b''
2 b'foo\n'
3 b''
4 b'foo\n'
5 b''
6 b'foo\n'
7 b''
8 b'foo\n'
```

with a line being produced every 2 seconds.
BTW the distribution contains a file, lister.py, which sort of does what yada.py does (but can also print successive lines from a file if specified) - this is used in the test for the expect functionality.
reporter Thanks for the clarification. I guess the key thing to take from this is that sarge doesn't (currently) really support this scenario "out of the box". That's fine, I was mainly evaluating its suitability for something I'm looking at and "not suitable yet" is a perfectly good answer :-)
I'm not 100% sure I understand the need for timeouts, though. The corresponding test harness using subprocess works fine:
```python
from subprocess import Popen, PIPE
from io import TextIOWrapper

p = Popen(['py', 'yada.py', '-t', '5', 'foo'], stdout=PIPE)
try:
    for i, line in enumerate(TextIOWrapper(p.stdout)):
        print(i, line, flush=True)
except:
    print(">>> Error encountered")
    raise
```
Maybe I'm missing something about why a sarge process has to be a bit more complex than this.
repo owner

> Maybe I'm missing something about why a sarge process has to be a bit more complex than this.

In the example you give, you're happy to wait, perhaps indefinitely, until the child process produces some output, and that's fine in this simple scenario. There are scenarios where you may not be happy to wait forever (perhaps the child process has hung and needs restarting, or you need to get an answer from somewhere else), and that level of control is missing from subprocess - you have no non-blocking access to what the child process is producing. While you get more control with sarge in some ways, you have to do more work in other ways - e.g. dealing with cases when there's no output from the child. This is of course not a true async solution - we'll have to wait for Tulip for that - but it does provide better control than subprocess in some scenarios (another example - you might need to look at both stdout and stderr from the child concurrently and take action based on what you find).
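For reference, the timed, non-blocking access described above can be approximated on top of bare subprocess with a background reader thread feeding a queue (essentially the background-thread arrangement the owner describes Capture using internally). A sketch, with a trivial stand-in for the real child command:

```python
import queue
import subprocess
import sys
import threading

def pump(pipe, q):
    # Push each line of the child's stdout into the queue until EOF.
    for line in iter(pipe.readline, b''):
        q.put(line)
    pipe.close()

# Stand-in child process that prints a single line.
p = subprocess.Popen([sys.executable, '-c', "print('foo')"],
                     stdout=subprocess.PIPE)
q = queue.Queue()
threading.Thread(target=pump, args=(p.stdout, q), daemon=True).start()

try:
    # Unlike a blocking readline() on p.stdout, this gives up after the
    # timeout, so the parent can restart a hung child or do other work.
    line = q.get(timeout=5)
    print(line)
except queue.Empty:
    print('no output from the child within the timeout')
p.wait()
```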
repo owner I fixed a bug in the Capture code. The following script:
```python
from io import TextIOWrapper
import sys
from sarge import Capture, run

cap = Capture(buffer_size=-1, timeout=10)
p = run('%s yada.py -t 5 foo' % sys.executable, async=True, stdout=cap)
try:
    for i, line in enumerate(TextIOWrapper(p.stdout)):
        print(i, line.strip(), flush=True)
except:
    print(">>> Error encountered")
    raise
```
now seems to work as expected (with commit 0b0055a). Can you confirm that it works for you, and if so, can we close this issue?
reporter - changed status to resolved
Yes, that seems to work for me. Thanks.