Commits

Cameron Simpson committed c3beb75

cs.threads: runTree(): sanity check RunTreeOp constructor calls, correct calls in "FORK" implementation, adjust unit tests

Comments (0)

Files changed (2)

lib/python/cs/threads.py

       return func(*a, **kw)
   return f
 
-RunTreeOp = namedtuple('RunTreeOp', 'func mode copystate branch')
+_RunTreeOp = namedtuple('RunTreeOp', 'func mode copystate branch')
+
+def RunTreeOp(func, mode, copy, branch):
+  ok = True
+  if func is not None and not callable(func):
+    error("RunTreeOp: bad func: %r", func)
+    ok = False
+  if mode is not None and mode not in ('PARALLEL', 'FORK'):
+    error("RunTreeOp: bad mode: %r", mode)
+    ok = False
+  if branch is not None and not callable(branch):
+    error("RunTreeOp: bad branch: %r", branch)
+    ok = False
+  if not ok:
+    raise ValueError("invalid RunTreeOp() call")
+  return _RunTreeOp(func, mode, copy, branch)
 
 def runTree(input, operators, state, funcQ):
   ''' Descend an operation tree expressed as:
       This is the core algorithm underneath the cs.app.pilfer operation.
   '''
   from cs.later import report
+  input0 = input
   operators = list(operators)
   bg = []
   while operators:
         # push the function back on without a fork
         # then queue a call per current item
         # using a copy of the state
-        suboperators = tuple([ RunTreeOp(op.func, False, False, op.branch) ] + operators)
+        suboperators = tuple([ RunTreeOp(op.func, None, False, op.branch) ] + operators)
         qops = []
         for item in input:
           substate = copy(state) if op.copystate else state
-          qops.append(funcQ.bg(runTree, (item,), suboperators, substate, funcQ))
+          qops.append(funcQ.bg(runTree, item, suboperators, substate, funcQ))
         outputs = []
         for qop in qops:
           output, exc_info = qop.wait()

lib/python/cs/threads_tests.py

 
 class TestRuntree(unittest.TestCase):
 
+  # A many to many identity function.
   @staticmethod
-  def f_same(items, state):
-    return items
+  def f_same(input, state):
+    return input
+  # A one to (one,) identity function.
+  @staticmethod
+  def f_same_one2many(input, state):
+    return (input,)
   @staticmethod
   def f_incr(items, state):
     return [ n+1 for n in items ]
 
   def test__01_same(self):
     L = Later(1)
-    self.assertEquals(runTree( [1,2,3], [ RunTreeOp(self.f_same, False, False, None) ], None, L), [1,2,3])
+    self.assertEquals(runTree( [1,2,3], [ RunTreeOp(self.f_same, None, False, None) ], None, L), [1,2,3])
     L.close()
 
   def test__01_same_fork(self):
     L = Later(1)
-    self.assertEquals(list(runTree( [1,2,3], [ RunTreeOp(self.f_same, True, True, None) ], None, L)), [1,2,3])
+    self.assertEquals(list(runTree( [1,2,3], [ RunTreeOp(self.f_same_one2many, 'FORK', True, None) ], None, L)), [1,2,3])
     L.close()
 
 def selftest(argv):