Commits

sbt committed 5c48263 Draft

Factor out different start methods

Comments (0)

Files changed (6)

Lib/multiprocessing/forking.py

 from pickle import load, HIGHEST_PROTOCOL
 from multiprocessing import util, process
 
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
+__all__ = ['Popen', 'assert_spawning', 'ForkingPickler']
 
 #
 # Choose method to use
 #
 #
 
+if sys.platform == 'win32':
+    from .popen_spawn_win32 import PopenSpawn
+else:
+    from .popen_fork import PopenFork
+    from .popen_spawn_posix import PopenSpawn
+    from .popen_forkserver import PopenForkServer
+
 def Popen(process_obj):
     if _start_method == 'fork':
         return PopenFork(process_obj)
 def dump(obj, file, protocol=None):
     ForkingPickler(file, protocol).dump(obj)
 
-#
-# Unix
-#
-
-if sys.platform != 'win32':
-    import _posixsubprocess
-    WINEXE = False
-    WINSERVICE = False
-
-    duplicate = os.dup
-    close = os.close
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class PopenFork(object):
-
-        def __init__(self, process_obj):
-            sys.stdout.flush()
-            sys.stderr.flush()
-            self.returncode = None
-            self._launch(process_obj)
-
-        def duplicate_for_child(self, fd):
-            self._fds.append(fd)
-            return fd
-
-        def poll(self, flag=os.WNOHANG):
-            if self.returncode is None:
-                try:
-                    pid, sts = os.waitpid(self.pid, flag)
-                except os.error:
-                    # Child process not yet created. See #1731717
-                    # e.errno == errno.ECHILD == 10
-                    return None
-                if pid == self.pid:
-                    if os.WIFSIGNALED(sts):
-                        self.returncode = -os.WTERMSIG(sts)
-                    else:
-                        assert os.WIFEXITED(sts)
-                        self.returncode = os.WEXITSTATUS(sts)
-            return self.returncode
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is not None:
-                    from .connection import wait
-                    if not wait([self.sentinel], timeout):
-                        return None
-                # This shouldn't block if wait() returned successfully.
-                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
-            return self.returncode
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    os.kill(self.pid, signal.SIGTERM)
-                except ProcessLookupError:
-                    pass
-                except OSError:
-                    if self.wait(timeout=0.1) is None:
-                        raise
-
-        def duplicate_for_child(self, fd):
-            return fd
-
-        def _launch(self, process_obj):
-            code = 1
-            parent_r, child_w = os.pipe()
-            self.pid = os.fork()
-            if self.pid == 0:
-                try:
-                    os.close(parent_r)
-                    if 'random' in sys.modules:
-                        import random
-                        random.seed()
-                    code = process_obj._bootstrap()
-                finally:
-                    os._exit(code)
-            else:
-                os.close(child_w)
-                util.Finalize(self, os.close, (parent_r,))
-                self.sentinel = parent_r
-
-    def spawn(cmd, executable, fds):
-        # Start a process with only specified fds kept open
-        fds = sorted(fds)
-        errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
-        try:
-            return _posixsubprocess.fork_exec(
-                cmd, [os.fsencode(executable)], True, fds, None, None,
-                -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
-                False, False, None)
-        finally:
-            os.close(errpipe_read)
-            os.close(errpipe_write)
-
-    class PopenSpawn(PopenFork):
-        def __init__(self, process_obj):
-            self._fds = []
-            PopenFork.__init__(self, process_obj)
-
-        def duplicate_for_child(self, fd):
-            self._fds.append(fd)
-            return fd
-
-        def _launch(self, process_obj):
-            from . import helper
-            self._fds.append(helper.cleanup_helper.getfd())
-
-            prep_data = get_preparation_data(process_obj._name, False)
-            fp = io.BytesIO()
-            _tls.spawning_popen = self
-            try:
-                dump(prep_data, fp, HIGHEST_PROTOCOL)
-                dump(process_obj, fp, HIGHEST_PROTOCOL)
-            finally:
-                _tls.spawning_popen = None
-
-            parent_r = child_w = child_r = parent_w = None
-            try:
-                parent_r, child_w = os.pipe()
-                child_r, parent_w = os.pipe()
-                cmd = get_command_line() + [str(child_r)]
-                self._fds.extend([child_r, child_w])
-                self.pid = spawn(cmd, sys.executable, self._fds)
-                self.sentinel = parent_r
-                with open(parent_w, 'wb', closefd=False) as f:
-                    f.write(fp.getbuffer())
-            finally:
-                if parent_r is not None:
-                    util.Finalize(self, os.close, (parent_r,))
-                for fd in (child_r, child_w, parent_w):
-                    if fd is not None:
-                        os.close(fd)
-
-    class PopenForkServer(PopenFork):
-
-        def __init__(self, process_obj):
-            self._fds = []
-            PopenFork.__init__(self, process_obj)
-
-        def duplicate_for_child(self, fd):
-            self._fds.append(fd)
-            return len(self._fds) - 1
-
-        def _launch(self, process_obj):
-            from .helper import forkserver_helper
-            prep_data = get_preparation_data(process_obj._name, True)
-
-            buf = io.BytesIO()
-            _tls.spawning_popen = self
-            try:
-                dump(prep_data, buf, HIGHEST_PROTOCOL)
-                dump(process_obj, buf, HIGHEST_PROTOCOL)
-            finally:
-                del _tls.spawning_popen
-
-            self.sentinel, w = forkserver_helper.prepare_new_process(self._fds)
-            util.Finalize(self, os.close, (self.sentinel,))
-            with open(w, 'wb', True) as f:
-                f.write(buf.getbuffer())
-            self.pid = forkserver_helper.read_ulong(self.sentinel)
-
-        def poll(self, flag=os.WNOHANG):
-            if self.returncode is None:
-                from .connection import wait
-                from .helper import forkserver_helper
-                timeout = 0 if flag == os.WNOHANG else None
-                if not wait([self.sentinel], timeout):
-                    return None
-                try:
-                    self.returncode = forkserver_helper.read_ulong(
-                        self.sentinel)
-                except (OSError, ValueError):
-                    # The process ended abnormally perhaps because of a signal
-                    self.returncode = 255
-            return self.returncode
-
-#
-# Windows
-#
-
-else:
-    import msvcrt
-    import _winapi
-
-    #
-    #
-    #
-
-    TERMINATE = 0x10000
-    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
-    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
-    close = _winapi.CloseHandle
-
-    #
-    #
-    #
-
-    def duplicate(handle, target_process=None, inheritable=False):
-        if target_process is None:
-            target_process = _winapi.GetCurrentProcess()
-        return _winapi.DuplicateHandle(
-            _winapi.GetCurrentProcess(), handle, target_process,
-            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
-            )
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class PopenSpawn(object):
-        '''
-        Start a subprocess to run the code of a process object
-        '''
-        def __init__(self, process_obj):
-            cmd = ' '.join('"%s"' % x for x in get_command_line())
-            prep_data = get_preparation_data(process_obj._name, False)
-
-            # create pipe for communication with child
-            rfd, wfd = os.pipe()
-
-            # get handle for read end of the pipe and make it inheritable
-            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
-            os.close(rfd)
-
-            with open(wfd, 'wb', closefd=True) as to_child:
-                # start process
-                try:
-                    hp, ht, pid, tid = _winapi.CreateProcess(
-                        _python_exe, cmd + (' %s' % rhandle),
-                        None, None, 1, 0, None, None, None
-                        )
-                    _winapi.CloseHandle(ht)
-                finally:
-                    close(rhandle)
-
-                # set attributes of self
-                self.pid = pid
-                self.returncode = None
-                self._handle = hp
-                self.sentinel = int(hp)
-
-                # send information to child
-                _tls.spawning_popen = self
-                try:
-                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
-                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
-                finally:
-                    _tls.spawning_popen = None
-
-        def duplicate_for_child(self, handle):
-            return duplicate(handle, _tls.spawning_popen.sentinel)
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is None:
-                    msecs = _winapi.INFINITE
-                else:
-                    msecs = max(0, int(timeout * 1000 + 0.5))
-
-                res = _winapi.WaitForSingleObject(int(self._handle), msecs)
-                if res == _winapi.WAIT_OBJECT_0:
-                    code = _winapi.GetExitCodeProcess(self._handle)
-                    if code == TERMINATE:
-                        code = -signal.SIGTERM
-                    self.returncode = code
-
-            return self.returncode
-
-        def poll(self):
-            return self.wait(timeout=0)
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    _winapi.TerminateProcess(int(self._handle), TERMINATE)
-                except OSError:
-                    if self.wait(timeout=1.0) is None:
-                        raise
 
 #
 # _python_exe is the assumed path to the python executable.
 # People embedding Python want to modify it.
 #
 
+if sys.platform != 'win32':
+    WINEXE = False
+    WINSERVICE = False
+else:
+    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
 if WINSERVICE:
     _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
 else:

Lib/multiprocessing/popen_fork.py

+import os
+import sys
+import signal
+from . import util
+
+__all__ = ['PopenFork']
+
+
+class PopenFork(object):
+
+    def __init__(self, process_obj):
+        sys.stdout.flush()
+        sys.stderr.flush()
+        self.returncode = None
+        self._launch(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return fd
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            try:
+                pid, sts = os.waitpid(self.pid, flag)
+            except os.error:
+                # Child process not yet created. See #1731717
+                # e.errno == errno.ECHILD == 10
+                return None
+            if pid == self.pid:
+                if os.WIFSIGNALED(sts):
+                    self.returncode = -os.WTERMSIG(sts)
+                else:
+                    assert os.WIFEXITED(sts)
+                    self.returncode = os.WEXITSTATUS(sts)
+        return self.returncode
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is not None:
+                from .connection import wait
+                if not wait([self.sentinel], timeout):
+                    return None
+            # This shouldn't block if wait() returned successfully.
+            return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+        return self.returncode
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                os.kill(self.pid, signal.SIGTERM)
+            except ProcessLookupError:
+                pass
+            except OSError:
+                if self.wait(timeout=0.1) is None:
+                    raise
+
+    def duplicate_for_child(self, fd):
+        return fd
+
+    def _launch(self, process_obj):
+        code = 1
+        parent_r, child_w = os.pipe()
+        self.pid = os.fork()
+        if self.pid == 0:
+            try:
+                os.close(parent_r)
+                if 'random' in sys.modules:
+                    import random
+                    random.seed()
+                code = process_obj._bootstrap()
+            finally:
+                os._exit(code)
+        else:
+            os.close(child_w)
+            util.Finalize(self, os.close, (parent_r,))
+            self.sentinel = parent_r

Lib/multiprocessing/popen_forkserver.py

+import os
+import io
+
+from .popen_fork import PopenFork
+
+__all__ = ['PopenForkServer']
+
+
+class PopenForkServer(PopenFork):
+
+    def __init__(self, process_obj):
+        self._fds = []
+        PopenFork.__init__(self, process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return len(self._fds) - 1
+
+    def _launch(self, process_obj):
+        from . import helper, util, forking
+        prep_data = forking.get_preparation_data(process_obj._name, True)
+
+        buf = io.BytesIO()
+        forking._tls.spawning_popen = self
+        try:
+            forking.dump(prep_data, buf, -1)
+            forking.dump(process_obj, buf, -1)
+        finally:
+            del forking._tls.spawning_popen
+
+        self.sentinel, w = helper.forkserver_helper.prepare_new_process(
+            self._fds)
+        util.Finalize(self, os.close, (self.sentinel,))
+        with open(w, 'wb', closefd=True) as f:
+            f.write(buf.getbuffer())
+        self.pid = helper.forkserver_helper.read_ulong(self.sentinel)
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            from .connection import wait
+            from .helper import forkserver_helper
+            timeout = 0 if flag == os.WNOHANG else None
+            if not wait([self.sentinel], timeout):
+                return None
+            try:
+                self.returncode = forkserver_helper.read_ulong(
+                    self.sentinel)
+            except (OSError, ValueError):
+                # The process ended abnormally perhaps because of a signal
+                self.returncode = 255
+        return self.returncode

Lib/multiprocessing/popen_spawn_posix.py

+import io
+import os
+import sys
+import _posixsubprocess
+
+from .popen_fork import PopenFork
+
+__all__ = ['PopenSpawn']
+
+
+def spawn(cmd, executable, fds):
+    # Start a process with only specified fds kept open
+    fds = sorted(fds)
+    errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+    try:
+        return _posixsubprocess.fork_exec(
+            cmd, [os.fsencode(executable)], True, fds, None, None,
+            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+            False, False, None)
+    finally:
+        os.close(errpipe_read)
+        os.close(errpipe_write)
+
+
+class PopenSpawn(PopenFork):
+    def __init__(self, process_obj):
+        self._fds = []
+        PopenFork.__init__(self, process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return fd
+
+    def _launch(self, process_obj):
+        from . import helper, util, forking
+        self._fds.append(helper.cleanup_helper.getfd())
+        prep_data = forking.get_preparation_data(process_obj._name, False)
+        fp = io.BytesIO()
+        forking._tls.spawning_popen = self
+        try:
+            forking.dump(prep_data, fp, -1)
+            forking.dump(process_obj, fp, -1)
+        finally:
+            forking._tls.spawning_popen = None
+
+        parent_r = child_w = child_r = parent_w = None
+        try:
+            parent_r, child_w = os.pipe()
+            child_r, parent_w = os.pipe()
+            cmd = forking.get_command_line() + [str(child_r)]
+            self._fds.extend([child_r, child_w])
+            self.pid = spawn(cmd, sys.executable, self._fds)
+            self.sentinel = parent_r
+            with open(parent_w, 'wb', closefd=False) as f:
+                f.write(fp.getbuffer())
+        finally:
+            if parent_r is not None:
+                util.Finalize(self, os.close, (parent_r,))
+            for fd in (child_r, child_w, parent_w):
+                if fd is not None:
+                    os.close(fd)

Lib/multiprocessing/popen_spawn_win32.py

+import os
+import sys
+import msvcrt
+import _winapi
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+close = _winapi.CloseHandle
+
+#
+#
+#
+
+def duplicate(handle, target_process=None, inheritable=False):
+    if target_process is None:
+        target_process = _winapi.GetCurrentProcess()
+    return _winapi.DuplicateHandle(
+        _winapi.GetCurrentProcess(), handle, target_process,
+        0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
+        )
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class PopenSpawn(object):
+    '''
+    Start a subprocess to run the code of a process object
+    '''
+    def __init__(self, process_obj):
+        cmd = ' '.join('"%s"' % x for x in get_command_line())
+        prep_data = get_preparation_data(process_obj._name, False)
+
+        # create pipe for communication with child
+        rfd, wfd = os.pipe()
+
+        # get handle for read end of the pipe and make it inheritable
+        rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
+        os.close(rfd)
+
+        with open(wfd, 'wb', closefd=True) as to_child:
+            # start process
+            try:
+                hp, ht, pid, tid = _winapi.CreateProcess(
+                    _python_exe, cmd + (' %s' % rhandle),
+                    None, None, 1, 0, None, None, None
+                    )
+                _winapi.CloseHandle(ht)
+            finally:
+                close(rhandle)
+
+            # set attributes of self
+            self.pid = pid
+            self.returncode = None
+            self._handle = hp
+            self.sentinel = int(hp)
+
+            # send information to child
+            _tls.spawning_popen = self
+            try:
+                dump(prep_data, to_child, HIGHEST_PROTOCOL)
+                dump(process_obj, to_child, HIGHEST_PROTOCOL)
+            finally:
+                _tls.spawning_popen = None
+
+    def duplicate_for_child(self, handle):
+        return duplicate(handle, _tls.spawning_popen.sentinel)
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is None:
+                msecs = _winapi.INFINITE
+            else:
+                msecs = max(0, int(timeout * 1000 + 0.5))
+
+            res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+            if res == _winapi.WAIT_OBJECT_0:
+                code = _winapi.GetExitCodeProcess(self._handle)
+                if code == TERMINATE:
+                    code = -signal.SIGTERM
+                self.returncode = code
+
+        return self.returncode
+
+    def poll(self):
+        return self.wait(timeout=0)
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                _winapi.TerminateProcess(int(self._handle), TERMINATE)
+            except OSError:
+                if self.wait(timeout=1.0) is None:
+                    raise

Lib/test/test_multiprocessing.py

             'multiprocessing', 'multiprocessing.connection',
             'multiprocessing.heap', 'multiprocessing.managers',
             'multiprocessing.pool', 'multiprocessing.process',
-            'multiprocessing.synchronize', 'multiprocessing.util'
+            'multiprocessing.synchronize', 'multiprocessing.util',
+            'multiprocessing.forking'
             ]
 
         if HAS_REDUCTION: