Source

execnet-patches / io-wait-kill

Full commit
# HG changeset patch
# Parent da6994ca8402d9d29a367ed312cce0717ea46dad
unify killing/shutdown polling for all io's

diff --git a/execnet/gateway.py b/execnet/gateway.py
--- a/execnet/gateway.py
+++ b/execnet/gateway.py
@@ -6,7 +6,8 @@ gateway code for initiating popen, socke
 import sys, os, inspect, types, linecache
 import textwrap
 import execnet
-from execnet.gateway_base import Message, Popen2IO, serialize
+from execnet.gateway_base import Message, serialize
+from execnet.gateway_io import Popen2IOMaster
 from execnet import gateway_base
 importdir = os.path.dirname(os.path.dirname(execnet.__file__))
 
@@ -221,12 +222,11 @@ class PopenCmdGateway(Gateway):
     _remotesetup = "io = init_popen_io()"
     def __init__(self, args, id):
         from subprocess import Popen, PIPE
-        self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
-        io = Popen2IO(p.stdin, p.stdout)
+        io = Popen2IOMaster(args)
         super(PopenCmdGateway, self).__init__(io=io, id=id)
         # fix for jython 2.5.1
-        if p.pid is None:
-            p.pid = self.remote_exec(
+        if io.popen.pid is None:
+            io.popen.pid = self.remote_exec(
                 "import os; channel.send(os.getpid())").receive()
 
 popen_bootstrapline = "import sys;exec(eval(sys.stdin.readline()))"
@@ -291,6 +291,6 @@ class SshGateway(PopenCmdGateway):
         try:
             super(SshGateway, self)._remote_bootstrap_gateway(io)
         except EOFError:
-            ret = self._popen.wait()
+            ret = self._io.wait()
             if ret == 255:
                 raise HostNotFound(self.remoteaddress)
diff --git a/execnet/gateway_io.py b/execnet/gateway_io.py
new file mode 100644
--- /dev/null
+++ b/execnet/gateway_io.py
@@ -0,0 +1,60 @@
+"""
+execnet io initialization code
+
+creates io instances used for gateway io
+"""
+import os
+import sys
+from execnet.gateway_base import Popen2IO
+from subprocess import Popen, PIPE
+
+class Popen2IOMaster(Popen2IO):
+    def __init__(self, args):
+        self.popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
+        Popen2IO.__init__(self, p.stdin, p.stdout)
+
+    def wait(self):
+        return self.popen.wait()
+
+    def kill(self):
+        killpopen(self.popen)
+
+def killpopen(popen):
+    try:
+        if hasattr(popen, 'kill'):
+            popen.kill()
+        else:
+            killpid(popen.pid)
+    except EnvironmentError:
+        sys.stderr.write("ERROR killing: %s\n" %(sys.exc_info()[1]))
+        sys.stderr.flush()
+
+def killpid(pid):
+    if hasattr(os, 'kill'):
+        os.kill(pid, 15)
+    elif sys.platform == "win32" or getattr(os, '_name', None) == 'nt':
+        try:
+            import ctypes
+        except ImportError:
+            import subprocess
+            # T: treekill, F: Force
+            cmd = ("taskkill /T /F /PID %d" %(pid)).split()
+            ret = subprocess.call(cmd)
+            if ret != 0:
+                raise EnvironmentError("taskkill returned %r" %(ret,))
+        else:
+            PROCESS_TERMINATE = 1
+            handle = ctypes.windll.kernel32.OpenProcess(
+                        PROCESS_TERMINATE, False, pid)
+            ctypes.windll.kernel32.TerminateProcess(handle, -1)
+            ctypes.windll.kernel32.CloseHandle(handle)
+    else:
+        raise EnvironmentError("no method to kill %s" %(pid,))
+
+
+
+
+
+
+
+
diff --git a/execnet/gateway_socket.py b/execnet/gateway_socket.py
--- a/execnet/gateway_socket.py
+++ b/execnet/gateway_socket.py
@@ -39,6 +39,12 @@ class SocketIO:
         except socket.error:
             pass
 
+    def wait(self):
+        pass
+
+    def kill(self):
+        pass
+
 class SocketGateway(Gateway):
     """ This Gateway provides interaction with a remote process
         by connecting to a specified socket.  On the remote
diff --git a/execnet/multi.py b/execnet/multi.py
--- a/execnet/multi.py
+++ b/execnet/multi.py
@@ -151,8 +151,7 @@ class Group:
                 gw.join()
             while self._gateways_to_join:
                 gw = self._gateways_to_join[0]
-                if hasattr(gw, '_popen'):
-                    gw._popen.wait()
+                gw._io.wait()
                 del self._gateways_to_join[0]
         from execnet.threadpool import WorkerPool
         pool = WorkerPool(1)
@@ -164,9 +163,7 @@ class Group:
                   %(self._gateways_to_join))
             while self._gateways_to_join:
                 gw = self._gateways_to_join.pop(0)
-                popen = getattr(gw, '_popen', None)
-                if popen:
-                    killpopen(popen)
+                gw._io.kill()
 
     def remote_exec(self, source, **kwargs):
         """ remote_exec source on all member gateways and return
@@ -234,38 +231,6 @@ class MultiChannel:
         if first:
             reraise(*first)
 
-def killpopen(popen):
-    try:
-        if hasattr(popen, 'kill'):
-            popen.kill()
-        else:
-            killpid(popen.pid)
-    except EnvironmentError:
-        sys.stderr.write("ERROR killing: %s\n" %(sys.exc_info()[1]))
-        sys.stderr.flush()
-
-def killpid(pid):
-    if hasattr(os, 'kill'):
-        os.kill(pid, 15)
-    elif sys.platform == "win32" or getattr(os, '_name', None) == 'nt':
-        try:
-            import ctypes
-        except ImportError:
-            import subprocess
-            # T: treekill, F: Force
-            cmd = ("taskkill /T /F /PID %d" %(pid)).split()
-            ret = subprocess.call(cmd)
-            if ret != 0:
-                raise EnvironmentError("taskkill returned %r" %(ret,))
-        else:
-            PROCESS_TERMINATE = 1
-            handle = ctypes.windll.kernel32.OpenProcess(
-                        PROCESS_TERMINATE, False, pid)
-            ctypes.windll.kernel32.TerminateProcess(handle, -1)
-            ctypes.windll.kernel32.CloseHandle(handle)
-    else:
-        raise EnvironmentError("no method to kill %s" %(pid,))
-
 default_group = Group()
 makegateway = default_group.makegateway
 
diff --git a/testing/test_gateway.py b/testing/test_gateway.py
--- a/testing/test_gateway.py
+++ b/testing/test_gateway.py
@@ -4,7 +4,7 @@ mostly functional tests of gateways.
 import os, sys, time
 import py
 import execnet
-from execnet import gateway_base, gateway
+from execnet import gateway_base, gateway, gateway_io
 from testing.test_serializer import _find_version
 TESTTIMEOUT = 10.0 # seconds
 needs_osdup = py.test.mark.skipif("not hasattr(os, 'dup')")
@@ -267,9 +267,8 @@ class TestSshPopenGateway:
     gwtype = "ssh"
 
     def test_sshconfig_config_parsing(self, monkeypatch):
-        import subprocess
         l = []
-        monkeypatch.setattr(subprocess, 'Popen',
+        monkeypatch.setattr(gateway_io, 'Popen',
             lambda *args, **kwargs: l.append(args[0]))
         py.test.raises(AttributeError,
             """execnet.makegateway("ssh=xyz//ssh_config=qwe")""")
diff --git a/testing/test_multi.py b/testing/test_multi.py
--- a/testing/test_multi.py
+++ b/testing/test_multi.py
@@ -79,8 +79,14 @@ class TestGroup:
         assert atexitlist == [group._cleanup_atexit]
         exitlist = []
         joinlist = []
+
+        class PseudoIO:
+            def wait(self):
+                pass
+
         class PseudoGW:
             id = "9999"
+            _io = PseudoIO()
             def exit(self):
                 exitlist.append(self)
                 group._unregister(self)