Commits

Armin Rigo committed bad40ff

Work in progress: a Pool class with a subset of the interface from
the multithreading module.

  • Participants
  • Parent commits e3e77dd

Comments (0)

Files changed (1)

hack/pypy-hack/stm/stmpool.py

+import thread
+from Queue import Queue
+from __pypy__.thread import atomic, signals_enabled
+
+
+class Pool(object):
+
+    def __init__(self, processes=3):
+        self._taskqueue = Queue()
+        for i in range(processes):
+            thread.start_new_thread(_run_thread, (self._taskqueue,))
+
+    def __del__(self):
+        self._taskqueue.append(None)
+
+    def imap_unordered(self, function, iterable):
+        resultsqueue = Queue()
+        self._taskqueue.put((function, iter(iterable), resultsqueue, [], []))
+        return _fetch_results(resultsqueue)
+
+
+def _run_thread_1(function, iterator, resultsqueue, working, interrupt):
+    try:
+        with signals_enabled:
+            for item in iterator:
+                with atomic:
+                    result = function(item)
+                if interrupt:
+                    break
+                resultsqueue.put((0, result))
+                del result
+    except:
+        resultsqueue.put((1, sys.exc_info()))
+        interrupt.append(1)
+    if working.pop() == 0:
+        resultsqueue.put((1, None))
+
+def _run_thread(taskqueue):
+    while True:
+        queued = taskqueue.get()
+        if queued is None:
+            taskqueue.put(None)
+            return
+        queued[3].append(len(queued[3]))
+        taskqueue.put(queued)
+        _run_thread_1(*queued)
+        del queued
+
+def _fetch_results(resultsqueue):
+    while True:
+        finished, result = resultsqueue.get()
+        if finished:
+            break
+        yield result
+    if result is not None:
+        exc = result
+        while resultsqueue.get() != (1, None):
+            pass
+        raise exc[0], exc[1], exc[2]