Commits

George Sakkis committed ad5244f

add patch_subprocess() function and 'subprocess' argument to patch_all().
Adapted from gevent-subprocess (https://bitbucket.org/eriks5/gevent-subprocess/)

Comments (0)

Files changed (2)

     httplib.HTTPSConnection = HTTPSConnection
 
 
-def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, httplib=False, aggressive=True):
+def patch_subprocess():
+    from gevent.subprocess import Popen
+    _subprocess = __import__('subprocess')
+    _subprocess.Popen = Popen
+
+
+def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True,
+              ssl=True, httplib=False, aggressive=True, subprocess=True):
     """Do all of the default monkey patching (calls every other function in this module."""
     # order is important
     if os:
         patch_ssl()
     if httplib:
         patch_httplib()
+    if subprocess:
+        patch_subprocess()
 
 
 if __name__ == '__main__':
         import pprint
         import os
         print 'gevent.monkey.patch_all(%s)' % ', '.join('%s=%s' % item for item in args.items())
-        print 'sys.version=%s' % (sys.version.strip().replace('\n', ' '), )
+        print 'sys.version=%s' % (sys.version.strip().replace('\n', ' '),)
         print 'sys.path=%s' % pprint.pformat(sys.path)
         print 'sys.modules=%s' % pprint.pformat(sorted(sys.modules.keys()))
         print 'cwd=%s' % os.getcwd()

gevent/subprocess.py

+import errno, fcntl, os, sys
+
+import gevent
+from gevent.event import Event
+from gevent.queue import Queue
+from gevent.socket import wait_read, wait_write
+
+
+class PipeClosed(Exception):
+    """Exception raised by :func:`Pipe.get` and :func:`Pipe.put` indicating
+       the :class:`Pipe` was closed
+    """
+    pass
+
+
+class Pipe(Queue):
+    """A :class:`Pipe` is a :class:`gevent.queue.Queue` that can be closed."""
+
+    def __init__(self, maxsize=None):
+        Queue.__init__(self, maxsize)
+        self._closed = Event()
+        self._event_cancel = None
+
+    def close(self):
+        """Closes the pipe"""
+        self._closed.set()
+
+        # Raise PipeClosed on all waiting getters and putters
+        if self.getters or self.putters:
+            self._schedule_unlock()
+
+    def closed(self):
+        """Returns ``True`` when the pipe has been closed.
+        (There might still be items available though)
+        """
+        return self._closed.is_set()
+
+    def finished(self):
+        """Returns ``True`` when the pipe has been closed and is empty."""
+        return self.empty() and self.closed()
+
+    def wait(self, timeout=None):
+        """ Wait until the pipe is closed
+        """
+        self._closed.wait(timeout)
+
+    def waiting(self):
+        return len(self.getters) > 0 or len(self.putters) > 0
+
+    def put(self, item, block=True, timeout=None):
+        """Put an item into the pipe.
+
+        If optional arg *block* is true and *timeout* is ``None`` (the default),
+        block if necessary until a free slot is available. If *timeout* is
+        a positive number, it blocks at most *timeout* seconds and raises
+        the :class:`Full` exception if no free slot was available within that time.
+        Otherwise (*block* is false), put an item on the pipe if a free slot
+        is immediately available, else raise the :class:`Full` exception (*timeout*
+        is ignored in that case).
+
+        :raises: :class:`PipeClosed` if the pipe is closed
+        """
+        if self.closed():
+            raise PipeClosed
+
+        Queue.put(self, item, block, timeout)
+
+    def get(self, block=True, timeout=None):
+        """Remove and return an item from the pipe.
+
+        If optional args *block* is true and *timeout* is ``None`` (the default),
+        block if necessary until an item is available. If *timeout* is a positive number,
+        it blocks at most *timeout* seconds and raises the :class:`Empty` exception
+        if no item was available within that time. Otherwise (*block* is false), return
+        an item if one is immediately available, else raise the :class:`Empty` exception
+        (*timeout* is ignored in that case).
+
+        :raises: :class:`PipeClosed` if the pipe is closed
+        """
+        if self.finished():
+            raise PipeClosed
+
+        return Queue.get(self, block, timeout)
+
+    def _unlock(self):
+        #if self.finished():
+        if self.closed():
+            while self.getters:
+                getter = self.getters.pop()
+                if getter:
+                    getter.throw(PipeClosed)
+
+            while self.putters:
+                putter = self.putters.pop()
+                if putter:
+                    putter.throw(PipeClosed)
+
+        Queue._unlock(self)
+
+    def next(self):
+        """Iterate over the items in the pipe, until the pipe is empty and closed."""
+        try:
+            return self.get()
+
+        except PipeClosed:
+            raise StopIteration
+
+def pipe_to_file(pipe, file):
+    """Copy items received from *pipe* to *file*"""
+    if file.closed:
+        return
+
+    fcntl.fcntl(file, fcntl.F_SETFL, os.O_NONBLOCK)
+
+    fno = file.fileno()
+
+    try:
+        wait_write(fno)
+    except IOError, ex:
+        if ex[0] != errno.EPERM:
+            raise
+
+        sys.exc_clear()
+        use_wait = False
+    else:
+        use_wait = True
+
+    for chunk in pipe:
+        while chunk:
+            try:
+                written = os.write(fno, chunk)
+                chunk = chunk[written:]
+
+            except IOError, ex:
+                if ex[0] != errno.EAGAIN:
+                    raise
+
+                sys.exc_clear()
+
+            except OSError, ex:
+                if not file.closed:
+                    raise
+
+                sys.exc_clear()
+                pipe.close()
+                return
+
+            if use_wait:
+                wait_write(fno)
+            else:
+                gevent.sleep(0)
+
+    file.close()
+
+def file_to_pipe(file, pipe, chunksize= -1):
+    """Copy contents of *file* to *pipe*. *chunksize* is passed on to file.read()
+    """
+    if file.closed:
+        pipe.close()
+        return
+
+    fcntl.fcntl(file, fcntl.F_SETFL, os.O_NONBLOCK)
+
+    fno = file.fileno()
+    use_wait = True
+
+    while True:
+        try:
+            chunk = file.read(chunksize)
+            if not chunk:
+                break
+            pipe.put(chunk)
+
+        except IOError, ex:
+            if ex[0] != errno.EAGAIN:
+                raise
+
+            sys.exc_clear()
+
+        try:
+            if use_wait:
+                wait_read(fno)
+        except IOError, ex:
+            if ex[0] != errno.EPERM:
+                raise
+
+            sys.exc_clear()
+            use_wait = False
+
+        if not use_wait:
+            gevent.sleep(0)
+
+    file.close()
+    pipe.close()
+
+
+class Popen(__import__('subprocess').Popen):
+    """Like :class:`subprocess.Popen`, but communicates with child using
+       :class:`gevsubprocess.pipe.Pipe` objects instead of file objects
+    """
+    def __init__(self, args, **kwds):
+        super(Popen, self).__init__(args, **kwds)
+
+        # Replace self.std(in|out|err) with pipes
+        self._greenlets = []
+
+        for name in ('stdin', 'stdout', 'stderr'):
+            file = getattr(self, name)
+            if file:
+                pipe = Pipe()
+                setattr(self, name, pipe)
+
+                if name == 'stdin':
+                    gl = gevent.spawn(pipe_to_file, pipe, file)
+                else:
+                    gl = gevent.spawn(file_to_pipe, file, pipe)
+
+                self._greenlets.append(gl)
+
+    def communicate(self, input=None):
+        if self.stdin:
+            if input:
+                self.stdin.put(input)
+            self.stdin.close()
+
+        if self.stdout:
+            self.stdout.wait()
+            stdout = ''.join(line for line in self.stdout)
+        else:
+            stdout = None
+
+        if self.stderr:
+            self.stderr.wait()
+            stderr = ''.join(line for line in self.stderr)
+        else:
+            stderr = None
+
+        self.wait()
+        return (stdout, stderr)
+
+    def wait(self, check_interval=0.1):
+        # non-blocking, use hub.sleep
+        try:
+            while True:
+                status = self.poll()
+                if status is not None:
+                    return status
+                gevent.hub.sleep(check_interval)
+
+        except OSError, e:
+            if e.errno == errno.ECHILD:
+                # no child process, this happens if the child process
+                # already died and has been cleaned up
+                return -1
+
+        finally:
+            if self._greenlets:
+                if sys.exc_info()[0] is None:
+                    gevent.joinall(self._greenlets)
+                else:
+                    gevent.killall(self._greenlets)
+
+                self._greenlets = None
+