1. windwiny
  2. winpexpect

Commits

geertj  committed af7db14

I had to completely change the architecture as Windows does not allow you to
close all file descriptions *except* the redirected stdin/stdout/stderr of the
child. This means file descriptors were leaking into children, which with long
lived children in a web application gives problems. Now the child is executed
indirectly via a stub program.

  • Participants
  • Parent commits 57a9c1b
  • Branches default

Comments (0)

Files changed (1)

File lib/winpexpect.py

View file
 # file "AUTHORS" for a complete overview.
 
 import os
+import sys
 import pywintypes
 import itertools
+import random
 
 from Queue import Queue, Empty
 from threading import Thread
 
-from pexpect import (spawn, which, ExceptionPexpect, EOF, TIMEOUT)
+from pexpect import spawn, which, ExceptionPexpect, EOF, TIMEOUT
 from subprocess import list2cmdline
 
 from msvcrt import open_osfhandle
-from win32api import SetHandleInformation, GetCurrentProcess
+from win32api import (SetHandleInformation, GetCurrentProcess, OpenProcess,
+                      CloseHandle)
+from win32pipe import CreateNamedPipe, ConnectNamedPipe
+from win32process import (STARTUPINFO, CreateProcess, CreateProcessAsUser,
+			  GetExitCodeProcess, TerminateProcess, ExitProcess)
+from win32event import WaitForSingleObject, INFINITE
+from win32security import LogonUser
+from win32file import CreateFile, ReadFile, WriteFile
+
 from win32con import (HANDLE_FLAG_INHERIT, STARTF_USESTDHANDLES,
-                      STARTF_USESHOWWINDOW, CREATE_NEW_CONSOLE, SW_HIDE)
-from win32pipe import CreatePipe
-from win32process import (STARTUPINFO, CreateProcess, CreateProcessAsUser,
-			  GetExitCodeProcess, TerminateProcess)
-from win32event import (WaitForSingleObject, INFINITE, WAIT_OBJECT_0,
-                        WAIT_TIMEOUT)
-from win32security import (LogonUser, OpenProcessToken, LOGON32_LOGON_BATCH,
-                           LOGON32_PROVIDER_DEFAULT, TOKEN_ALL_ACCESS)
+                      STARTF_USESHOWWINDOW, CREATE_NEW_CONSOLE, SW_HIDE,
+                      PIPE_ACCESS_DUPLEX, WAIT_OBJECT_0,
+                      WAIT_TIMEOUT, LOGON32_PROVIDER_DEFAULT,
+                      LOGON32_LOGON_BATCH, TOKEN_ALL_ACCESS, GENERIC_READ,
+                      GENERIC_WRITE, OPEN_EXISTING, PROCESS_ALL_ACCESS)
+from winerror import ERROR_PIPE_BUSY, ERROR_HANDLE_EOF, ERROR_BROKEN_PIPE
 from pywintypes import error as WindowsError
 
 
 join_command_line = list2cmdline
 
 
+def _read_header(handle, bufsize=4096):
+    """INTERNAL: read a stub header from a handle."""
+    header = ''
+    while '\n\n' not in header:
+        err, data = ReadFile(handle, bufsize)
+        header += data
+    return header
+
+
+def _parse_header(header):
+    """INTERNAL: pass the stub header format."""
+    parsed = {}
+    lines = header.split('\n')
+    for line in lines:
+        if not line:
+            break
+        p1 = line.find('=')
+        if p1 == -1:
+            if line.startswith(' '):  # Continuation
+                if key is None:
+                    raise ValueError, 'Continuation on first line.'
+                input[key] += '\n' + line[1:]
+            else:
+                raise ValueError, 'Expecting key=value format'
+        key = line[:p1]
+        parsed[key] = line[p1+1:]
+    return parsed
+
+
+def _quote_header(s):
+    """INTENAL: quote a string to be used in a stub header."""
+    return s.replace('\n', '\n ')
+
+
+def _stub(cmd_name, stdin_name, stdout_name, stderr_name):
+    """INTERNAL: Stub process that will start up the child process."""
+    # Open the 4 pipes (command, stdin, stdout, stderr)
+    cmd_pipe = CreateFile(cmd_name, GENERIC_READ|GENERIC_WRITE, 0, None,
+                          OPEN_EXISTING, 0, None)
+    SetHandleInformation(cmd_pipe, HANDLE_FLAG_INHERIT, 1)
+    stdin_pipe = CreateFile(stdin_name, GENERIC_READ, 0, None,
+                            OPEN_EXISTING, 0, None)
+    SetHandleInformation(stdin_pipe, HANDLE_FLAG_INHERIT, 1)
+    stdout_pipe = CreateFile(stdout_name, GENERIC_WRITE, 0, None,
+                             OPEN_EXISTING, 0, None)
+    SetHandleInformation(stdout_pipe, HANDLE_FLAG_INHERIT, 1)
+    stderr_pipe = CreateFile(stderr_name, GENERIC_WRITE, 0, None,
+                             OPEN_EXISTING, 0, None)
+    SetHandleInformation(stderr_pipe, HANDLE_FLAG_INHERIT, 1)
+
+    # Learn what we need to do..
+    header = _read_header(cmd_pipe)
+    input = _parse_header(header)
+    if 'command' not in input or 'args' not in input:
+        ExitProcess(2)
+
+    # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
+    startupinfo = STARTUPINFO()
+    startupinfo.dwFlags |= STARTF_USESTDHANDLES
+    startupinfo.hStdInput = stdin_pipe
+    startupinfo.hStdOutput = stdout_pipe
+    startupinfo.hStdError = stderr_pipe
+
+    try:
+        res = CreateProcess(input['command'], input['args'], None, None,
+                            True, 0, os.environ, os.getcwd(), startupinfo)
+    except WindowsError, e:
+        message = _quote_header(str(e))
+        WriteFile(cmd_pipe, 'status=error\nmessage=%s\n\n' % message)
+        ExitProcess(3)
+    else:
+        pid = res[2]
+
+    # Pass back results and exit
+    err, nbytes = WriteFile(cmd_pipe, 'status=ok\npid=%s\n\n' % pid)
+    ExitProcess(0)
+
+
 class ChunkBuffer(object):
     """A buffer that allows a chunk of data to be read in smaller reads."""
 
 
 
 class winspawn(spawn):
-    """A version of pexpect.spawn that works on Windows.
+    """A version of pexpect.spawn for the Windows platform. """
 
-    I/O works completely different in this version when compared to the Posix
-    version of spawn. The difference is caused by the fact that it's not
-    possible on Windows to select() on a file descriptor that corresponds to a
-    file or a pipe. Therefore, to do non-blocking I/O, we need to use threads.
-    One thread is used to read the child's standard output, and another thread
-    reads standard error. Those results are passed down to the main thread via
-    a Queue.Queue.
+    # I/O works completely different in this version when compared to the
+    # Posix version of spawn.
+    #
+    # The first difference is that it's not possible on Windows to select()
+    # on a file descriptor that corresponds to a file or a pipe. Therefore,
+    # to do non-blocking I/O, we need to use threads. We use one thread to
+    # read the output of our child.
+    #
+    # The second difference is that there is no fork()/exec() on Windows
+    # but just a CreateProcess(). There is no way to close file all file
+    # descriptors /except/ the redirected stdin/out/err. So we indirectly
+    # execute our child via a stub for which we pass a flag to
+    # CreateProcess() to close all file descriptors. The stub communicates
+    # back to us via a named pipe.
+    # 
+    # Finally, Windows does not have ptys. It does have the concept of a
+    # "Console" though but it's much less sophisticated. This code runs the
+    # child in a new console by passing the flag CREATE_NEW_CONSOLE to
+    # CreateProcess(). We create a new console for our child because this
+    # way it cannot interfere with the current console, and it is also
+    # possible to run the main program without a console (e.g. a Windows
+    # service).
 
-    Windows also does not have ptys. It does have the concept of a "Console"
-    though but it's much less sophisticated. This code runs the child in a new
-    console by passing the flag CREATE_NEW_CONSOLE to CreateProcess(). The
-    advantage of this is the child not interfere with the current console, and
-    it is also possible to run the main program without a console (e.g. a
-    Windows service).
-    """
+    pipe_buffer = 4096
+    pipe_template = r'\\.\pipe\winpexpect-%06d'
 
     def __init__(self, command, args=[], timeout=30, maxread=2000,
                  searchwindowsize=None, logfile=None, cwd=None, env=None,
         self.child_handle = None
         self.child_output = Queue()
         self.chunk_buffer = ChunkBuffer()
-        self.stdout_fd = None
+        self.stdout_handle = None
         self.stdout_eof = False
         self.stdout_reader = None
-        self.stderr_fd = None
+        self.stderr_handle = None
         self.stderr_eof = False
         self.stderr_reader = None
         super(winspawn, self).__init__(command, args, timeout=timeout,
                 logfile=logfile, cwd=cwd, env=env)
 
     def __del__(self):
-        self.terminate()
+        try:
+            self.terminate()
+        except WindowsError:
+            pass
+
+    def _create_pipe(self):
+        """INTERNAL: create a named pipe."""
+        for i in range(100):
+            name = self.pipe_template % random.randint(0, 999999)
+            try:
+                pipe = CreateNamedPipe(name, PIPE_ACCESS_DUPLEX,
+                                       0, 1, 1, 1, 100000, None)
+                SetHandleInformation(pipe, HANDLE_FLAG_INHERIT, 0)
+            except WindowsError, e:
+                if e.winerror != ERROR_PIPE_BUSY:
+                    raise
+            else:
+                return pipe, name
+        raise ExceptionPexpect, 'Could not create pipe after 100 attempts.'
 
     def _spawn(self, command, args=None):
         """Start the child process. If args is empty, command will be parsed
-        (split on spaces) and args will be set to the parsed args."""
-        # The approach is documented here:
-        # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
+        according to the rules of the MS C runtime, and args will be set to
+        the parsed args."""
         if args:
-            self.args = args[:]  # copy
-            self.args.insert(0, command)
-            self.command = command
+            args = args[:]  # copy
+            args.insert(0, command)
         else:
-            self.args = split_command_line(command)
-            self.command = self.args[0]
-        executable = which(self.command)
-        if executable is None:
+            args = split_command_line(command)
+            command = args[0]
+
+        self.command = command
+        self.args = args
+        command = which(self.command)
+        if command is None:
             raise ExceptionPexpect, 'Command not found: %s' % self.command
         args = join_command_line(self.args)
 
         # Create the pipes
-        stdin_read, stdin_write = CreatePipe(None, 0)
-        SetHandleInformation(stdin_read, HANDLE_FLAG_INHERIT, 1)
-        stdout_read, stdout_write = CreatePipe(None, 0)
-        SetHandleInformation(stdout_write, HANDLE_FLAG_INHERIT, 1)
-        stderr_read, stderr_write = CreatePipe(None, 0)
-        SetHandleInformation(stderr_write, HANDLE_FLAG_INHERIT, 1)
+        cmd_pipe, cmd_name = self._create_pipe()
+        stdin_pipe, stdin_name = self._create_pipe()
+        stdout_pipe, stdout_name = self._create_pipe()
+        stderr_pipe, stderr_name = self._create_pipe()
 
-        # The standard io streams are redirected by passing a STARTUPINFO
-        # structure to CreateProcess. We also create a new console and hide
-        # it. This allows us to run e.g. as a service.
         startupinfo = STARTUPINFO()
-        startupinfo.dwFlags |= STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW
-        startupinfo.hStdInput = stdin_read
-        startupinfo.hStdOutput = stdout_write
-        startupinfo.hStdError = stderr_write
+        startupinfo.dwFlags |= STARTF_USESHOWWINDOW
         startupinfo.wShowWindow = SW_HIDE
 
+        python = os.path.join(sys.exec_prefix, 'python.exe')
+        pycmd = 'import winpexpect; winpexpect._stub(r"%s", r"%s", r"%s", r"%s")' \
+                    % (cmd_name, stdin_name, stdout_name, stderr_name)
+        pyargs = join_command_line([python, '-c', pycmd])
+
         # Create a new token or run as the current process.
         if self.username and self.password:
             token = LogonUser(self.username, self.domain, self.password,
                               LOGON32_LOGON_BATCH, LOGON32_PROVIDER_DEFAULT)
-            hp, ht, pid, tid = CreateProcessAsUser(token, executable, args,
-                                    None, None, True, CREATE_NEW_CONSOLE,
-                                    self.env, self.cwd, startupinfo)
+            res = CreateProcessAsUser(token, python, pyargs, None, None,
+                                      False, CREATE_NEW_CONSOLE, self.env,
+                                      self.cwd, startupinfo)
         else:
-            hp, ht, pid, tid = CreateProcess(executable, args, None, None,
-				    True, CREATE_NEW_CONSOLE, self.env,
-				    self.cwd, startupinfo)
+            res = CreateProcess(python, pyargs, None, None, False,
+                                0, self.env, self.cwd,
+                                startupinfo)
+        child_handle = res[0]
+        res[1].Close()  # don't need thread handle
 
-        self.child_handle = hp
-        self.pid = pid
-        ht.Close()
+        ConnectNamedPipe(cmd_pipe)
+        ConnectNamedPipe(stdin_pipe)
+        ConnectNamedPipe(stdout_pipe)
+        ConnectNamedPipe(stderr_pipe)
 
-        # Close parent copy of child handles
-        stdin_read.Close()
-        stdout_write.Close()
-        stderr_write.Close()
+        # Tell the stub what to do and wait for it to exit
+        WriteFile(cmd_pipe, 'command=%s\nargs=%s\n\n' % (command, args))
+        header = _read_header(cmd_pipe)
+        output = _parse_header(header)
+        if output['status'] != 'ok':
+            m = 'Child did not start up correctly. '
+            m += output.get('message', '')
+            raise ExceptionPexpect, m
+        self.pid = int(output['pid'])
+        self.child_handle = OpenProcess(PROCESS_ALL_ACCESS, False, self.pid)
+        WaitForSingleObject(child_handle, INFINITE)
 
-        # Create file descriptors and start up reader threads
-        self.child_fd = open_osfhandle(stdin_write.Detach(), 0)
-        self.stdout_fd = open_osfhandle(stdout_read.Detach(), 0)
+        # Start up the I/O threads
+        self.child_fd = open_osfhandle(stdin_pipe.Detach(), 0)  # for pexpect
+        self.stdout_handle = stdout_pipe
         self.stdout_reader = Thread(target=self._child_reader,
-                                    args=(self.stdout_fd,))
+                                    args=(self.stdout_handle,))
         self.stdout_reader.start()
-        self.stderr_fd = open_osfhandle(stderr_read.Detach(), 0)
+        self.stderr_handle = stderr_pipe
         self.stderr_reader = Thread(target=self._child_reader,
-                                    args=(self.stderr_fd,))
+                                    args=(self.stderr_handle,))
         self.stderr_reader.start()
         self.terminated = False
         self.closed = False
         if self.closed:
             return
         os.close(self.child_fd)
-        os.close(self.stdout_fd)
-        os.close(self.stderr_fd)
+        CloseHandle(self.stdout_handle)
+        CloseHandle(self.stderr_handle)
         # File descriptors are closed, nothing can be added to the queue
         # anymore. Empty it in case a thread was blocked on put().
         while self.child_output.qsize():
         self.closed = True
 
     def wait(self, timeout=None):
-        """Wait until the child exits. If timeout is not specified this blocks
-        indefinately. Otherwise, timeout specifies the number of seconds to wait."""
+        """Wait until the child exits. If timeout is not specified this
+        blocks indefinately. Otherwise, timeout specifies the number of
+        seconds to wait."""
         if self.exitstatus is not None:
             return
         if timeout is None:
         """Send a signal to the child (not available on Windows)."""
         raise ExceptionPexpect, 'Signals are not availalbe on Windows'
 
-    def _child_reader(self, fd):
-        """INTERNAL: Reader thread that reads stdout/stderr of the child process."""
+    def _child_reader(self, handle):
+        """INTERNAL: Reader thread that reads stdout/stderr of the child
+        process."""
         status = 'data'
         while True:
             try:
-                data = os.read(fd, self.maxread)
-                if data == '':
+                err, data = ReadFile(handle, self.maxread)
+                assert err == 0  # not expecting error w/o overlapped io
+            except WindowsError, e:
+                if e.winerror == ERROR_BROKEN_PIPE:
                     status = 'eof'
-            except OSError, e:
-                data = e.errno
-                status = 'error'
-            self.child_output.put((fd, status, data))
+                    data = ''
+                else:
+                    status = 'error'
+                    data = e.winerror
+            self.child_output.put((handle, status, data))
             if status != 'data':
                 break
 
-    def _set_eof(self, fd):
-        """INTERNAL: mark a file descriptor as end-of-file."""
-        if fd == self.stdout_fd:
+    def _set_eof(self, handle):
+        """INTERNAL: mark a file handle as end-of-file."""
+        if fd == self.stdout_handle:
             self.stdout_eof = True
-        elif fd == self.stderr_fd:
+        elif fd == self.stderr_handle:
             self.stderr_eof = True
 
     def read_nonblocking(self, size=1, timeout=-1):
-        """Non blocking read."""
+        """INTERNAL: Non blocking read."""
         if len(self.chunk_buffer):
             return self.chunk_buffer.read(size)
         if self.stdout_eof and self.stderr_eof:
         if timeout == -1:
             timeout = self.timeout
         try:    
-            fd, status, data = self.child_output.get(timeout=timeout)
+            handle, status, data = self.child_output.get(timeout=timeout)
         except Empty:
             raise TIMEOUT, 'Timeout exceeded in read_nonblocking().'
         if status == 'data':
             self.chunk_buffer.add(data)
         elif status == 'eof':
-            self._set_eof(fd)
+            self._set_eof(handle)
             raise EOF, 'End of file in read_nonblocking().'
         elif status == 'error':
-            self._set_eof(fd)
+            self._set_eof(handle)
             raise OSError, data
         return self.chunk_buffer.read(size)