Commits

Cameron Simpson  committed 6102b91

cs.threads: runTree_inner: queue functions directly, collect via LateFunctions (avoiding deadlock if a function throws an exception), some result delivery streamlining

  • Participants
  • Parent commits 777b693

Comments (0)

Files changed (1)

File lib/python/cs/threads.py

 from collections import namedtuple
 from copy import copy
 from functools import partial
+from itertools import chain
 import sys
 import time
 from threading import Lock
       Return a LateFunction to collect the final result.
       `func` is a many-to-many function.
   '''
+  from cs.later import report
   debug("runTree_inner(input=%s, ops=%s, state=%s, funcQ=%s, retq=%s)...",
     input, ops, state, funcQ, retq)
   if retq is None:
   try:
     op = ops.next()
   except StopIteration:
-    funcQ.defer(lambda: retq.put(input))
+    retq.put(input)
     return retq
 
   func, fork_input, fork_state = op.func, op.fork_input, op.fork_state
     # 1-iterable of all-items iterables
     inputs = (input,)
 
-  # submit function calls that put results onto a Queue
-  Q = Queue()
-  def func_put(Q, input, state):
-    ''' Call `func` with `input` and `state`.
-        Put results onto the Queue for collection.
-    '''
-    debug("func_put: call func...")
-    results = func(input, state)
-    debug("func_put: called func, got: %s", results)
-    Q.put(list(results))
-  nfuncs = 0
+  LFs = []
   for input in inputs:
     substate = copy(state) if fork_state else state
-    funcQ.defer(func_put, Q, input, substate)
-    nfuncs += 1
+    LF = funcQ.defer(func, input, substate)
+    LFs.append(LF)
 
   # now submit a function to collect the results
   # if there are no more ops, put the outputs onto the retq
-  def func_get(Q, nfuncs):
-    debug("func_get(Q, nfuncs=%d)...", nfuncs)
-    outputs = []
-    while nfuncs > 0:
-      debug("func_get: %d results to collect, Q.get()...", nfuncs)
-      results = Q.get()
-      debug("func_get: Q.get() got: %s", results)
-      outputs.extend(results)
-      nfuncs -= 1
-    Q = None
+  def collate_and_requeue(LFs):
+    debug("collate_and_requeue(LFs=%s)...", LFs)
+    results = []
+    for LF in report(LFs):
+      result, exc_info = LF.wait()
+      if exc_info:
+        exception("exception: %s", exc_info[1])
+      else:
+        results.append(result)
     # resubmit with new state etc
     debug("func_get: queue another call to runTree_inner")
-    funcQ.defer(runTree_inner, outputs, ops, state, funcQ, retq)
-  funcQ.defer(func_get, Q, nfuncs)
-  Q = None
+    funcQ.defer(runTree_inner, chain(*results), ops, state, funcQ, retq)
+  funcQ.defer(collate_and_requeue, LFs)
 
   return retq