Source

execnet-patches / io-on-remote

Full commit
# HG changeset patch
# Parent 8e59db1f886eb16bcc28c382cb3916b8f4ae86f0
introduce creating a io on a gateway and proxying to it

diff --git a/conftest.py b/conftest.py
--- a/conftest.py
+++ b/conftest.py
@@ -82,7 +82,7 @@ def pytest_generate_tests(metafunc):
         elif hasattr(metafunc.cls, 'gwtype'):
             gwtypes = [metafunc.cls.gwtype]
         else:
-            gwtypes = ['popen', 'socket', 'ssh']
+            gwtypes = ['popen', 'socket', 'ssh', 'proxy']
         metafunc.parametrize("gw", gwtypes, indirect=True)
     elif 'anypython' in metafunc.funcargnames:
         metafunc.parametrize("anypython", indirect=True, argvalues=
@@ -144,4 +144,7 @@ def pytest_funcarg__gw(request):
         elif request.param == "ssh":
             sshhost = request.getfuncargvalue('specssh').ssh
             gw = group.makegateway("ssh=%s//id=ssh" %(sshhost,))
+        elif request.param == 'proxy':
+            master = group.makegateway('popen//id=proxy-transport')
+            gw = group.makegateway('popen//via=proxy-transport//id=proxy')
         return gw
diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py
--- a/execnet/gateway_base.py
+++ b/execnet/gateway_base.py
@@ -147,7 +147,7 @@ class Message:
         except LoadError:
             data = self.data
         r = repr(data)
-        if len(r) > 50:
+        if len(r) > 90:
             return "<Message.%s channelid=%d len=%d>" %(name,
                         self.channelid, len(r))
         else:
diff --git a/execnet/gateway_io.py b/execnet/gateway_io.py
--- a/execnet/gateway_io.py
+++ b/execnet/gateway_io.py
@@ -5,9 +5,13 @@ creates io instances used for gateway io
 """
 import os
 import sys
-from execnet.gateway_base import Popen2IO
 from subprocess import Popen, PIPE
 
+try:
+    from execnet.gateway_base import Popen2IO, Message
+except ImportError:
+    from __main__ import Popen2IO, Message
+
 class Popen2IOMaster(Popen2IO):
     def __init__(self, args):
         self.popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
@@ -87,3 +91,79 @@ def create_io(spec):
         io.remoteaddress = spec.ssh
         return io
 
+RIO_KILL = 1
+RIO_WAIT = 2
+RIO_REMOTEADDRESS = 3
+RIO_CLOSE_WRITE = 4
+
+class RemoteIO(object):
+    def __init__(self, master_channel):
+        self.iochan = master_channel.gateway.newchannel()
+        self.controlchan = master_channel.gateway.newchannel()
+        master_channel.send((self.iochan, self.controlchan))
+        self.io = self.iochan.makefile('r')
+
+
+    def read(self, nbytes):
+        return self.io.read(nbytes)
+
+    def write(self, data):
+        return self.iochan.send(data)
+
+    def _controll(self, event):
+        self.controlchan.send(event)
+        return self.controlchan.receive()
+
+    def close_write(self):
+        self._controll(RIO_CLOSE_WRITE)
+
+    def kill(self):
+        self._controll(RIO_KILL)
+
+    def wait(self):
+        return self._controll(RIO_WAIT)
+
+
+
+def serve_remote_io(channel):
+    class PseudoSpec(object):
+        def __getattr__(self, name):
+            return None
+    spec = PseudoSpec()
+    spec.__dict__.update(channel.receive())
+    io = create_io(spec)
+    io_chan, control_chan = channel.receive()
+    io_target = io_chan.makefile()
+
+    def iothread():
+        initial = io.read(1)
+        assert initial == '1'.encode('ascii')
+        channel.gateway._trace('initializing transfer io for', spec.id)
+        io_target.write(initial)
+        while True:
+            message = Message.from_io(io)
+            message.to_io(io_target)
+    import threading
+    thread = threading.Thread(name='io-forward-'+spec.id,
+                              target=iothread)
+    thread.setDaemon(True)
+    thread.start()
+
+    def iocallback(data):
+        io.write(data)
+    io_chan.setcallback(iocallback)
+
+
+    def controll(data):
+        if data==RIO_WAIT:
+            control_chan.send(io.wait())
+        elif data==RIO_KILL:
+            control_chan.send(io.kill())
+        elif data==RIO_REMOTEADDRESS:
+            control_chan.send(io.remoteaddress)
+        elif data==RIO_CLOSE_WRITE:
+            control_chan.send(io.close_write())
+    control_chan.setcallback(controll)
+
+if __name__ == "__channelexec__":
+    serve_remote_io(channel)
diff --git a/execnet/multi.py b/execnet/multi.py
--- a/execnet/multi.py
+++ b/execnet/multi.py
@@ -74,7 +74,14 @@ class Group:
         if not isinstance(spec, XSpec):
             spec = XSpec(spec)
         self.allocate_id(spec)
-        if spec.popen or spec.ssh:
+        if spec.via:
+            assert not spec.socket
+            master = self[spec.via]
+            channel = master.remote_exec(gateway_io)
+            channel.send(vars(spec))
+            io = gateway_io.RemoteIO(channel)
+            gw = gateway_bootstrap.bootstrap(io, spec)
+        elif spec.popen or spec.ssh:
             io = gateway_io.create_io(spec)
             gw = gateway_bootstrap.bootstrap(io, spec)
         elif spec.socket:
@@ -135,7 +142,7 @@ class Group:
         and ssh-gateways.  Timeout defaults to None meaning
         open-ended waiting and no kill attempts.
         """
-        for gw in self:
+        for gw in reversed(self):
             gw.exit()
         def join_receiver_and_wait_for_subprocesses():
             for gw in self._gateways_to_join: