Commits

Buck Evan committed f329a35

fix pre-existing flaws in the multiprocessing doctest
main fix was to use multiprocess.Manager in place of the "raw" objects
all changes in the plugin are simply reactions to differences in
API between the raw and managed object types

Comments (0)

Files changed (2)

functional_tests/doc_tests/test_multiprocess/multiprocess.rst

 
     >>> run(argv=['nosetests', '-v', '--processes=2', test_can_split],
     ...     plugins=[MultiProcess()]) #doctest: +ELLIPSIS
+    setup called
+    teardown called
     test_can_split....
     ...
     FAILED (failures=...)

nose/plugins/multiprocess.py

 def _import_mp():
     global Process, Queue, Pool, Event, Value, Array
     try:
-        from multiprocessing import (Process as Process_, Queue as Queue_,
-                                     Pool as Pool_, Event as Event_,
-                                     Value as Value_, Array as Array_)
-        Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_,
-                                                     Event_, Value_, Array_)
+        from multiprocessing import Manager, Process
+        m = Manager()
+        Queue, Pool, Event, Value, Array = (
+                m.Queue, m.Pool, m.Event, m.Value, m.Array
+        )
     except ImportError:
         warn("multiprocessing module is not available, multiprocess plugin "
              "cannot be used", RuntimeWarning)
 
         log.debug("Starting %s workers", self.config.multiprocess_workers)
         for i in range(self.config.multiprocess_workers):
-            currentaddr = Array('c',1000)
-            currentaddr.value = bytes_('')
-            currentstart = Value('d')
+            currentaddr = Value('c',bytes_(''))
+            currentstart = Value('d',0.0)
             keyboardCaught = Event()
             p = Process(target=runner, args=(i, testQueue, resultQueue,
                                              currentaddr, currentstart,
                     workers[iworker].join(timeout=1)
                     if not shouldStop.is_set() and not testQueue.empty():
                         log.debug('starting new process on worker %s',iworker)
-                        currentaddr = Array('c',1000)
-                        currentaddr.value = bytes_('')
-                        currentstart = Value('d')
-                        currentstart.value = time.time()
+                        currentaddr = Value('c',bytes_(''))
+                        currentstart = Value('d',time.time())
                         keyboardCaught = Event()
                         workers[iworker] = Process(target=runner,
                                                    args=(iworker, testQueue,
                 for iworker, w in enumerate(workers):
                     if w.is_alive():
                         worker_addr = bytes_(w.currentaddr.value,'ascii')
-                        timeprocessing = time.time()-w.currentstart.value
-                        if (len(worker_addr) == 0
-                            and timeprocessing > self.config.multiprocess_timeout-0.1):
+                        timeprocessing = time.time() - w.currentstart.value
+                        if ( len(worker_addr) == 0
+                                and timeprocessing > self.config.multiprocess_timeout-0.1):
                             log.debug('worker %d has finished its work item, '
                                       'but is not exiting? do we wait for it?',
                                       iworker)
                                     # have to terminate...
                                     log.error("terminating worker %s",iworker)
                                     w.terminate()
-                                    currentaddr = Array('c',1000)
-                                    currentaddr.value = bytes_('')
-                                    currentstart = Value('d')
-                                    currentstart.value = time.time()
+                                    currentaddr = Value('c',bytes_(''))
+                                    currentstart = Value('d',time.time())
                                     keyboardCaught = Event()
                                     workers[iworker] = Process(target=runner,
                                         args=(iworker, testQueue, resultQueue,
            keyboardCaught, shouldStop, loaderClass, resultClass, config):
     try:
         try:
-            try:
-                return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
-                        keyboardCaught, shouldStop, loaderClass, resultClass, config)
-            except KeyboardInterrupt:
-                keyboardCaught.set()
-                log.debug('Worker %s keyboard interrupt, stopping',ix)
-        except Empty:
-            log.debug("Worker %s timed out waiting for tasks", ix)
-    finally:
-        testQueue.close()
-        resultQueue.close()
+            return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+                    keyboardCaught, shouldStop, loaderClass, resultClass, config)
+        except KeyboardInterrupt:
+            log.debug('Worker %s keyboard interrupt, stopping',ix)
+    except Empty:
+        log.debug("Worker %s timed out waiting for tasks", ix)
 
 def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
            keyboardCaught, shouldStop, loaderClass, resultClass, config):