Commits

Ronny Pfannschmidt  committed 2a0f875

topological shutdown

  • Participants
  • Parent commits ec324fa

Comments (0)

Files changed (1)

File topologic-shutdown

 # Parent 0b929d167a8c9d84955c193832f8bf3ebb898423
 ensure gateways shut down before their io proxy is shut down
 
+diff --git a/execnet/multi.py b/execnet/multi.py
+--- a/execnet/multi.py
++++ b/execnet/multi.py
+@@ -142,26 +142,34 @@ class Group:
+         and ssh-gateways.  Timeout defaults to None meaning
+         open-ended waiting and no kill attempts.
+         """
+-        for gw in reversed(self):
+-            gw.exit()
+-        def join_receiver_and_wait_for_subprocesses():
+-            for gw in self._gateways_to_join:
+-                gw.join()
+-            while self._gateways_to_join:
+-                gw = self._gateways_to_join[0]
+-                gw._io.wait()
+-                del self._gateways_to_join[0]
+-        from execnet.threadpool import WorkerPool
+-        pool = WorkerPool(1)
+-        reply = pool.dispatch(join_receiver_and_wait_for_subprocesses)
+-        try:
+-            reply.get(timeout=timeout)
+-        except IOError:
+-            trace("Gateways did not come down after timeout: %r"
+-                  %(self._gateways_to_join))
+-            while self._gateways_to_join:
+-                gw = self._gateways_to_join.pop(0)
+-                gw._io.kill()
++
++        while self:
++            #XXX multiplies the maximal timeout by the levels of indirection
++            vias = {}
++            for gw in self:
++                if gw.spec.via:
++                    vias[gw.spec.via] = True
++            for gw in self:
++                if gw.id not in vias:
++                    gw.exit()
++            def join_receiver_and_wait_for_subprocesses():
++                for gw in self._gateways_to_join:
++                    gw.join()
++                while self._gateways_to_join:
++                    gw = self._gateways_to_join[0]
++                    gw._io.wait()
++                    del self._gateways_to_join[0]
++            from execnet.threadpool import WorkerPool
++            pool = WorkerPool(1)
++            reply = pool.dispatch(join_receiver_and_wait_for_subprocesses)
++            try:
++                reply.get(timeout=timeout)
++            except IOError:
++                trace("Gateways did not come down after timeout: %r"
++                      %(self._gateways_to_join))
++                while self._gateways_to_join:
++                    gw = self._gateways_to_join.pop(0)
++                    gw._io.kill()
+ 
+     def remote_exec(self, source, **kwargs):
+         """ remote_exec source on all member gateways and return
+diff --git a/testing/test_multi.py b/testing/test_multi.py
+--- a/testing/test_multi.py
++++ b/testing/test_multi.py
+@@ -84,9 +84,13 @@ class TestGroup:
+             def wait(self):
+                 pass
+ 
++        class PseudoSpec:
++            via = None
++
+         class PseudoGW:
+             id = "9999"
+             _io = PseudoIO()
++            spec = PseudoSpec()
+             def exit(self):
+                 exitlist.append(self)
+                 group._unregister(self)
+@@ -176,3 +180,11 @@ class TestGroup:
+         mch = group.remote_exec(fun, arg=1)
+         result = mch.receive_each()
+         assert result == [1]
++
++    def test_terminate_with_proxying(self):
++        group = Group()
++        master = group.makegateway('popen//id=master')
++        slave = group.makegateway('popen//via=master//id=slave')
++        group.terminate(1.0)
++
++