Commits

holger krekel  committed 7d7649d

refine termination to only have one place where finished_receiving, terminate_execution()
and closing of io is called.

  • Participants
  • Parent commits 87dcd87

Comments (0)

Files changed (4)

File execnet/__init__.py

 
 (c) 2012, Holger Krekel and others
 """
-__version__ = '1.2.0.dev1'
+__version__ = '1.2.0.dev2'
 
 from . import apipkg
 

File execnet/gateway.py

             self._send(Message.GATEWAY_TERMINATE)
             self._trace("--> io.close_write")
             self._io.close_write()
-        except IOError:
+        except (ValueError, EOFError, IOError):
             v = sys.exc_info()[1]
             self._trace("io-error: could not send termination sequence")
             self._trace(" exception: %r" % v)

File execnet/gateway_base.py

             return "<Message.%s channelid=%d %s>" %(name,
                         self.channelid, r)
 
+class GatewayReceivedTerminate(Exception):
+    """ Receiverthread got termination message. """
+
 def _setupmessages():
     def status(message, gateway):
         # we use the channelid to send back information
         gateway._channelfactory._local_close(message.channelid, sendonly=True)
 
     def gateway_terminate(message, gateway):
-        # wake up and terminate any execution waiting to receive something
-        gateway._channelfactory._finished_receiving()
-        # then try harder to terminate execution
-        gateway._terminate_execution()
+        raise GatewayReceivedTerminate(gateway)
 
     def reconfigure(message, gateway):
         if message.channelid == 0:
         # globals may be NONE at process-termination
         self.__trace = trace
         self._geterrortext = geterrortext
-        self._workerpool = self.execmodel.WorkerPool(1)
+        self._receivepool = self.execmodel.WorkerPool(1)
 
     def _trace(self, *msg):
         self.__trace(self.id, *msg)
 
     def _initreceive(self):
-        self._receiverthread = self._workerpool.spawn(self._thread_receiver)
+        self._receiverthread = self._receivepool.spawn(self._thread_receiver)
 
     def _thread_receiver(self):
         def log(*msg):
             self._trace("[receiver-thread]", *msg)
+
         log("RECEIVERTHREAD: starting to run")
-        eof = False
         io = self._io
         try:
             try:
                     with self._receivelock:
                         msg.received(self)
                         del msg
-            except self._sysex:
-                log("io.close_read()")
-                self._io.close_read()
+            except (KeyboardInterrupt, GatewayReceivedTerminate):
+                pass
             except EOFError:
-                log("got EOF")
-                #log("traceback was: ", self._geterrortext(self.exc_info()))
+                log("EOF without prior gateway termination message")
                 self._error = self.exc_info()[1]
-                eof = True
-            except:
+            except Exception:
                 log(self._geterrortext(self.exc_info()))
         finally:
             try:
                 log('entering finalization')
-                # wake up any execution waiting to receive something
+                # wake up and terminate any execution waiting to receive
                 self._channelfactory._finished_receiving()
-                if eof:
-                    self._terminate_execution()
+                log('terminating execution')
+                self._terminate_execution()
+                log('closing read')
+                self._io.close_read()
+                log('closing write')
+                self._io.close_write()
                 log('leaving finalization')
-            except:
-                pass # XXX be silent at interp-shutdown
+            except:  # be silent at shutdown
+                pass
 
     def _terminate_execution(self):
         pass
         except (IOError, ValueError):
             e = sys.exc_info()[1]
             self._trace('failed to send', message, e)
-            raise
-
+            # ValueError might be because the IO is already closed
+            raise IOError("cannot send (already closed?)")
 
     def _local_schedulexec(self, channel, sourcetask):
         channel.close("execution disallowed")
         # called from receiverthread
         self._trace("shutting down execution pool")
         self._execpool.shutdown()
-        if not self._execpool.waitall(1.0):
-            self._trace("execution ongoing, trying interrupt_main")
+        if not self._execpool.waitall(5.0):
+            self._trace("execution ongoing after 5 secs, trying interrupt_main")
             # We try hard to terminate execution based on the assumption
             # that there is only one gateway object running per-process.
             if sys.platform != "win32":
                     trace("waiting for execution to finish")
                     self._execpool.wait_for_shutdown()
             finally:
-                trace("execution finished, closing io write stream")
-                self._io.close_write()
+                trace("execution finished")
 
             trace("joining receiver thread")
             self.join()
         name='execnet',
         description='execnet: rapid multi-Python deployment',
         long_description = __doc__,
-        version='1.2.0.dev1',
+        version='1.2.0.dev2',
         url='http://codespeak.net/execnet',
         license='MIT',
         platforms=['unix', 'linux', 'osx', 'cygwin', 'win32'],