Commits

Cameron Simpson committed ccdf6bd Merge

Automated merge with file:///Users/cameron/hg/css

  • Participants
  • Parent commits 1d17487, 7b9f6ec

Comments (0)

Files changed (5)

File lib/python/cs/app/pilfer.py

   import xml.etree.ElementTree as ElementTree
 from cs.fileutils import file_property
 from cs.later import Later
+from cs.lex import get_identifier
 from cs.logutils import setup_logging, logTo, Pfx, debug, error, warning, exception, pfx_iter, D
 from cs.threads import runTree, RunTreeOp
 from cs.urlutils import URL
 from cs.misc import O
 
+if os.environ.get('DEBUG', ''):
+  def X(tag, *a):
+    D("TRACE: "+tag, *a)
+else:
+  def X(*a):
+    pass
+
 ARCHIVE_SUFFIXES = ( 'tar', 'tgz', 'tar.gz', 'tar.bz2', 'cpio', 'rar', 'zip', 'dmg' )
 IMAGE_SUFFIXES = ( 'png', 'jpg', 'jpeg', 'gif', 'ico', )
 VIDEO_SUFFIXES = ( 'mp2', 'mp4', 'avi', 'wmv', )
           else:
             urls = [ URL(url, None, P.user_agent) ]
           run_ops = [ action_operator(action) for action in argv ]
-          ##D("run_ops = %r", run_ops)
-          with Later(1) as PQ:
+          debug("run_ops = %r", run_ops)
+          with Later(4) as PQ:
+            debug("urls = %s", urls)
             runTree(urls, run_ops, P, PQ)
       else:
         error("unsupported op")
     self._lock = Lock()
     self.flush_print = False
     self._print_to = None
+    self._print_lock = Lock()
     self.user_agent = None
     self.user_vars = {}
     self._urlsfile = None
       if print_to is None:
         print_to = sys.stdout
     kw['file'] = print_to
-    print(*a, **kw)
-    if self.flush_print:
-      print_to.flush()
+    with self._print_lock:
+      print(*a, **kw)
+      if self.flush_print:
+        print_to.flush()
 
   def url_save_dir(self, U):
     ''' Return the current URL save dir.
     else:
       debug("with_exts: discard %s", U)
 
+def substitute(src, regexp, replacement, replace_all):
+  ''' Perform a regexp substitution on `src`.
+      `replacement` is a format string for the replacement text
+      using the str.format method.
+      The matched groups from the regexp take the positional arguments 1..n,
+      with 0 used for the whole matched string.
+      The keyword arguments consist of '_' for the whole matched string
+      and any named groups.
+  '''
+  debug("SUBSTITUTE: src=%r, regexp=%r, replacement=%r, replace_all=%s)...",
+        src, regexp.pattern, replacement, replace_all)
+  strs = []
+  sofar = 0
+  for m in regexp.finditer(src):
+    repl_args = [ m.group(0) ] + list(m.groups())
+    repl_kw = { '_': m.group(0) }
+    repl_kw.update(m.groupdict())
+    strs.append(src[sofar:m.start()])
+    strs.append(replacement.format(*repl_args, **repl_kw))
+    sofar = m.end()
+    if not replace_all:
+      break
+  strs.append(src[sofar:])
+  result = ''.join(strs)
+  debug("SUBSTITUTE: src=%r, result=%r", src, result)
+  return result
+
 def url_delay(U, delay, *a):
   sleep(float(delay))
   yield U
       yield i
 
 def url_hrefs(U, referrer=None):
-  return url_io_iter(URL(U, referrer).hrefs(absolute=True))
+  return list(url_io_iter(URL(U, referrer).hrefs(absolute=True)))
 
 def url_srcs(U, referrer=None):
   return url_io_iter(URL(U, referrer).srcs(absolute=True))
       'hostname':     lambda U, P: URL(U, None).hostname,
       'new_dir':      lambda U, P: (U, P.url_save_dir(U))[0],
       'per':          lambda U, P: (U, P.set_user_vars(save_dir=None))[0],
-      'print':        lambda U, P: (U, print(U))[0],
+      'print':        lambda U, P: (U, P.print(U))[0],
       'query':        lambda U, P, *a: url_query(U, *a),
       'quote':        lambda U, P: quote(U),
       'unquote':      lambda U, P: unquote(U),
       'save':         lambda U, P, *a: url_io(P.url_save, (), U, *a),
       'see':          lambda U, P: (U, P.see(U))[0],
+      'substitute':   lambda U, P, **kw: substitute(U, kw['regexp'], kw['replacement'], kw['all']),
       'title':        lambda U, P: U.title if U.title else U,
       'type':         lambda U, P: url_io(U.content_type, ""),
       'xmlattr':      lambda U, P, attr: [ A for A in (ElementTree.XML(U).get(attr),) if A is not None ],
       'unseen':       lambda U, P: not P.seen(U),
     }
 
-re_ASSIGN = re.compile(r'([a-z]\w*)=')
+re_COMPARE = re.compile(r'([a-z]\w*)==')
+re_ASSIGN  = re.compile(r'([a-z]\w*)=')
 
 def conv_one_to_one(func):
-  ''' Convert a one-to-one function to a one to many.
+  ''' Convert a one-to-one function to a many-to-many function.
   '''
-  def func_one_to_one(U, P):
-    yield func(U, P)
-  return func_one_to_one
+  def converted(items, *a, **kw):
+    results = []
+    for item in items:
+      yield func(item, *a, **kw)
+  return converted
 
 def conv_one_test(func):
   ''' Convert a test-one function to one-to-many.
   '''
-  def func_one_test(U, P):
-    ok = func(U, P)
-    if ok:
-      yield U
-  return func_one_test
+  def converted(items, *a, **kw):
+    for item in items:
+      if func(item, *a, **kw):
+        yield item
+  return converted
+
+def conv_one_to_many(func):
+  def converted(items, *a, **kw):
+    for item in items:
+      for result in func(item, *a, **kw):
+        yield result
+  return converted
 
 def action_operator(action,
                     many_to_many=None,
   ''' Accept a string `action` and return a RunTreeOp for use with
       cs.threads.runTree.
   '''
+  if many_to_many is None:
+    many_to_many = MANY_TO_MANY
+  if one_to_many is None:
+    one_to_many = ONE_TO_MANY
+  if one_to_one is None:
+    one_to_one = ONE_TO_ONE
+  if one_test is None:
+    one_test = ONE_TEST
   with Pfx("%s", action):
-    if many_to_many is None:
-      many_to_many = MANY_TO_MANY
-    if one_to_many is None:
-      one_to_many = ONE_TO_MANY
-    if one_to_one is None:
-      one_to_one = ONE_TO_ONE
-    if one_test is None:
-      one_test = ONE_TEST
     kwargs = {}
     # select URLs matching regexp
     if action.startswith('/'):
         regexp = action[2:]
       kwargs['regexp'] = re.compile(regexp)
       action = 'reject_re'
-    # select URLs ending in particular extensions
+    # parent
     elif action == '..':
       pass
+    # select URLs ending in particular extensions
     elif action.startswith('.'):
       if action.endswith('/i'):
         exts, case = action[1:-2], False
       kwargs['exts'] = exts
       action = 'select_exts'
     else:
-      m = re_ASSIGN.match(action)
+      # varname== comparison
+      m = re_COMPARE.match(action)
       if m:
         var = m.group(1)
         value = action[m.end():]
-        def assign(U, P, var, value):
-          P.user_vars[var] = P.format(value)
-          return U
-      elif ':' in action:
-        action, kws = action.split(':', 1)
-        for kw in kws.split(','):
-          if '=' in kwarg:
-            kw, v = kw.split('=', 1)
-            kwargs[kw] = v
+        k
+      else:
+        # varname= assignment
+        m = re_ASSIGN.match(action)
+        if m:
+          var = m.group(1)
+          value = action[m.end():]
+          def assign(U, P, var, value):
+            P.user_vars[var] = P.format(value)
+            return U
+        else:
+          # regular action: split off parameters if any
+          name, offset = get_identifier(action)
+          if not name:
+            raise ValueError("unparsed action")
+          # s/this/that/
+          if name == 's':
+            if offset == len(action):
+              raise ValueError("missing delimiter")
+            delim = action[offset]
+            delim2pos = action.find(delim, offset+1)
+            if delim2pos < offset+1:
+              raise ValueError("missing second delimiter")
+            regexp = action[offset+1:delim2pos]
+            if not regexp:
+              raise ValueError("empty regexp")
+            delim3pos = action.find(delim, delim2pos+1)
+            if delim3pos < delim2pos+1:
+              raise ValueError("missing third delimiter")
+            repl_format = action[delim2pos+1:delim3pos]
+            offset = delim3pos+1
+            if offset < len(action) and action[offset] == 'g':
+              repl_all = True
+              offset += 1
+            else:
+              repl_all = False
+            if offset < len(action):
+              raise ValueError("unparsed action at: %s" % (action[offset:],))
+            action = 'substitute'
+            debug("s: regexp=%r, replacement=%r, repl_all=%s", regexp, repl_format, repl_all)
+            kwargs['regexp'] = re.compile(regexp)
+            kwargs['replacement'] = repl_format
+            kwargs['all'] = repl_all
           else:
-            kwargs[kwarg] = True
-    op_mode = None
-    do_copystate = False
-    branch_func = None
+            if offset < len(action) and action[offset] == ':':
+              for kw in action[offset+1:].split(','):
+                if '=' in kwarg:
+                  kw, v = kw.split('=', 1)
+                  kwargs[kw] = v
+                else:
+                  kwargs[kwarg] = True
+              offset = len(action)
+            if offset < len(action):
+              raise ValueError("parse error at: %s" % (action[offset:],))
+    fork_input = False
+    fork_state = False
     if action == "per":
+      raise ValueError("per needs fork_ops in addition to fork_state")
       op_mode = 'FORK'
-      do_copystate = True
+      fork_state = True
     if action in many_to_many:
       # many-to-many functions get passed straight in
       func = many_to_many[action]
         func = partial(func, **kwargs)
     elif action in one_to_many:
       # one-to-many is converted into many-to-many
-      op_mode = 'PARALLEL'
+      fork_input = True
       func = one_to_many[action]
       if kwargs:
         func = partial(func, **kwargs)
+      func = conv_one_to_many(func)
     elif action in one_to_one:
-      op_mode = 'PARALLEL'
+      fork_input = True
       func = one_to_one[action]
       if kwargs:
         func = partial(func, **kwargs)
       func = conv_one_to_one(func)
     elif action in one_test:
-      op_mode = 'PARALLEL'
+      fork_input = True
       func = one_test[action]
       if kwargs:
         func = partial(func, **kwargs)
       func = conv_one_test(func)
     else:
       raise ValueError("unknown action")
-    return RunTreeOp(func, op_mode, do_copystate, branch_func)
+    return RunTreeOp(func, fork_input, fork_state)
 
 if __name__ == '__main__':
   import sys

File lib/python/cs/later.py

       func = partial(func, *a, **kw)
     self.func = func
     self.state = STATE_PENDING
-    self.result = None
+    self._result = None
     self._lock = Lock()
     self.join_cond = Condition()
     self.notifiers = []
         If self.state is STATE_PENDING or STATE_CANCELLED, return True.
         Otherwise return False (too late to cancel).
     '''
+    notifiers = ()
     with self._lock:
       state = self.state
       if state == STATE_PENDING:
+        self.func = None
+        self._result = (None, None)
         self.state = STATE_CANCELLED
-        self.func = None
-        self.set_result( (None, None) )
+        notifiers = list(self.notifiers)
+      elif state == STATE_CANCELLED:
+        pass
       elif state == STATE_RUNNING or state == STATE_DONE:
         return False
+      else:
+        raise RuntimeError("<%s>.state not one of (STATE_PENDING, STATE_CANCELLED, STATE_RUNNING, STATE_DONE): %r"
+                           % (self, state))
+    for notifier in notifiers:
+      notifier(self)
+    with self.join_cond:
+      self.join_cond.notify_all()
     return True
 
+  @property
+  def result(self):
+    with self._lock:
+      result = self._result
+    if result is None:
+      raise ValueError("<%s>.result not available, state=%s" % (self, self.state))
+    if len(result) != 2:
+      raise RuntimeError("<%s>.result not a 2-tuple: %r" % (self, result))
+    return result
+
+  @result.setter
+  def result(self, new_result):
+    ''' Set the result unless the function is already cancelled.
+        Alert people to completion.
+    '''
+    if len(new_result) != 2:
+      raise ValueError("<%s>.result.setter: expected a 2-tuple but got: %r"
+                       % (self, new_result))
+    with self._lock:
+      state = self.state
+      if state == STATE_CANCELLED:
+        # silently discard result
+        pass
+      elif state == STATE_RUNNING:
+        if self._result is not None:
+          raise ValueError("<%s>.result.setter: tried to set .result to %r but .result is already: %r"
+                           % (self, new_result, self._result))
+        self._result = new_result
+        self.state = STATE_DONE
+        notifiers = list(self.notifiers)
+      else:
+        raise RuntimeError("<%s>.state is one of (STATE_CANCELLED, STATE_RUNNING): %r"
+                           % (self, state))
+    for notifier in notifiers:
+      notifier(self)
+    with self.join_cond:
+      self.join_cond.notify_all()
+
   def set_result(self, result):
     ''' set_result() is called by a worker thread to report completion of the
         function.
           None, exc_info
         where exc_info is (exc_type, exc_value, exc_traceback).
     '''
-    # collect the result and release the capacity
-    with self._lock:
-      if self.state != STATE_CANCELLED:
-        self.state = STATE_DONE
-        self.result = result
-      notifiers = list(self.notifiers)
-    for notifier in notifiers:
-      notifier(self)
-    with self.join_cond:
-      self.join_cond.notify_all()
+    self.result = result
 
   def wait(self):
     ''' Calling the .wait() method waits for the function to run to
         On completion the sequence:
           func_result, None
         is returned.
+        If the function was cancelled the sequence:
+          None, None
+        is returned.
         On an exception the sequence:
           None, exc_info
         is returned where exc_info is a tuple of (exc_type, exc_value, exc_traceback).
     result, exc_info = self.wait()
     if exc_info:
       exc_type, exc_value, exc_traceback = exc_info
-      raise exc_type(exc_value).with_traceback(exc_traceback)
+      raise exc_type, exc_value, exc_traceback
+      ##raise exc_type(exc_value).with_traceback(exc_traceback)
     return result
 
   def set_result(self, result):
     self._dispatchThread.start()
 
   def __repr__(self):
-    return '<%s "%s" running=%d pending=%d delayed=%d closed=%s>' \
+    return '<%s "%s" capacity=%s running=%d pending=%d delayed=%d closed=%s>' \
            % ( self.__class__, self.name,
+               self.capacity,
                len(self.running),
                len(self.pending),
                len(self.delayed),
              )
 
   def __str__(self):
-    return "<%s pending=%d running=%d delayed=%d>" \
-           % (self.name,
+    return "<%s[%s] pending=%d running=%d delayed=%d>" \
+           % (self.name, self.capacity,
               len(self.pending), len(self.running), len(self.delayed))
 
   def __enter__(self):
 	It can be useful for transient control functions that
 	themselves queue things through the Later queuing system
 	but do not want to consume capacity themselves, thus avoiding
-	deadlock at the cost of ransient overthreading.
+	deadlock at the cost of transient overthreading.
     '''
     if self.closed:
       raise RunTimError("%s.bg(...) after close()")
         If the parameter `pfx` is not None, submit pfx.func(func);
           see cs.logutils.Pfx's .func method for details.
     '''
+    ##D("%s.submit()...", self)
     if self.closed:
       raise RunTimError("%s.bg(...) after close()")
     if delay is not None and when is not None:

File lib/python/cs/later_tests.py

   def _delay(n):
     time.sleep(n)
     return n
-  class _Bang(BaseException):
+  class _Bang(Exception):
     pass
   @staticmethod
   def _bang():

File lib/python/cs/threads.py

       func = pfx.func(func)
     idle = self.idle
     with self._lock:
-      if idle:
+      debug("dispatch: idle = %s", idle)
+      if len(idle):
         # use an idle thread
         Hdesc = idle.pop()
+        debug("dispatch: reuse %s", Hdesc)
       else:
+        debug("dispatch: need new thread")
         # no available threads - make one
         args = []
         H = Thread(target=self._handler, args=args)
         result = func(), None
         debug("%s: worker thread: ran task: result = %s", self, result)
       except:
-        debug("%s: worker thread: ran task: exception!", self)
+        func = None     # release func+args
+        debug("%s: worker thread: ran task: exception! %r", self, sys.exc_info())
         # don't let exceptions go unhandled
         # if nobody is watching, raise the exception and don't return
         # this handler to the pool
         if retq is None and deliver is None:
           t, v, tb = sys.exc_info()
-          raise t(v).with_traceback(tb)
+          debug("%s: worker thread: reraise exception", self)
+          raise t, v, tb
+          ##raise t(v).with_traceback(tb)
+        debug("%s: worker thread: set result = (None, exc_info)", self)
         result = (None, sys.exc_info())
       finally:
         func = None     # release func+args
+      with self._lock:
+        self.idle.append( Hdesc )
+        ##D("_handler released thread: idle = %s", self.idle)
       if retq is not None:
+        debug("%s: worker thread: %r.put(result)...", self, retq)
         retq.put(result)
+        debug("%s: worker thread: %r.put(result) done", self, retq)
         retq = None
       if deliver is not None:
+        debug("%s: worker thread: deliver result...", self)
         deliver(result)
+        debug("%s: worker thread: delivery done", self)
         deliver = None
       result = None
-      with self._lock:
-        self.idle.append( Hdesc )
+      debug("%s: worker thread: proceed to next function...", self)
 
 class AdjustableSemaphore(object):
   ''' A semaphore whose value may be tuned after instantiation.
   '''
 
   def __init__(self, value=1, name="AdjustableSemaphore"):
+    self.limit0 = value
     self.__sem = Semaphore(value)
     self.__value = value
     self.__name = name
     self.__lock = Lock()
 
+  def __str__(self):
+    return "%s[%d]" % (self.__name, self.limit0)
+
   def __enter__(self):
     with LogTime("%s(%d).__enter__: acquire", self.__name, self.__value):
       self.acquire()
       return func(*a, **kw)
   return f
 
-_RunTreeOp = namedtuple('RunTreeOp', 'func mode copystate branch')
+_RunTreeOp = namedtuple('RunTreeOp', 'func fork_input fork_state')
 
-def RunTreeOp(func, mode, copy, branch):
+def RunTreeOp(func, fork_input, fork_state):
   ok = True
   if func is not None and not callable(func):
     error("RunTreeOp: bad func: %r", func)
     ok = False
-  if mode is not None and mode not in ('PARALLEL', 'FORK'):
-    error("RunTreeOp: bad mode: %r", mode)
-    ok = False
-  if branch is not None and not callable(branch):
-    error("RunTreeOp: bad branch: %r", branch)
-    ok = False
   if not ok:
     raise ValueError("invalid RunTreeOp() call")
-  return _RunTreeOp(func, mode, copy, branch)
+  return _RunTreeOp(func, fork_input, fork_state)
 
 def runTree(input, operators, state, funcQ):
   ''' Descend an operation tree expressed as:
         `input`: an input object
         `operators`: an iterable of RunTreeOp instances
 	  NB: if an item of the iterator is callable, presume it
-              to be a bare function and convert it to RunTreeOp(op, False,
-              False, None).
-          Each operator `op` has the following attributes:
-            op.func     A callable function accepting the input and state
-                        objects, and returning an output result to be passed
-                        as the input object to subsequent operators.  op.func
-                        may be None, in which case the function will not be
-                        called; this may be typical for op.branch operators.
-            op.mode     The function mode.
-                          If None, compute:
-                            output = op.func(input, state)
-                          If 'PARALLEL', iterate over the input object
-                            and for each item call:
-                              op.func(item, state)
-                            Each return value should be iterable, and the iterables
-                            are concatenated together to produce the output object.
-                          If 'FORK', iterate over the input object and for each item
-                            dispatch an asynchronous instance of
-                            runTree() whose `operators` iterable
-                            consists of a new RunTreeOp of the current
-                            function with op.mode=None and all the
-                            remaining operators. As with 'PARALLEL',
-                            each return value should be iterable, and
-                            the iterables are concatenated together to
-                            produce the output object. The remaining
-                            operators are discarded because they have
-                            been performed by the forks.
-            op.copystate Copy the state object for this and subsequent operations
-                         instead of using the original.
-            op.branch   If op.branch is not None it should be a callable
-                        returning an iterable of RunTreeOps. An asynchronous
-                        runTree will be dispatched to process these operators
-                        with the current item list.
-        `state`: a state object for use by op.func
-        `funcQ`: a cs.later.Later function queue to dispatch functions
+              to be a bare function and convert it to
+                RunTreeOp(func, False, False).
+        `state`: a state object to assist `func`.
+        `funcQ`: a cs.later.Later function queue to dispatch functions,
+                 or equivalent
       Returns the final output.
       This is the core algorithm underneath the cs.app.pilfer operation.
+
+      Each operator `op` has the following attributes:
+            op.func     A many-to-many function taking an iterable of input
+			items and returning an iterable of output
+			items with the signature:
+                          func(inputs, state)
+            op.fork_input
+			Submit distinct calls to func( (item,), ...) for each
+                        item in input instead of passing input to a single call.
+                        `func` is still a many-to-many in signature but is
+                        handed 1-tuples of the input items to allow parallel
+                        execution.
+            op.fork_state
+                        Make a copy of the state object for this and
+                        subsequent operations instead of using the original.
   '''
-  from cs.later import report
-  input0 = input
-  operators = list(operators)
-  bg = []
-  while operators:
-    op = operators.pop(0)
-    if callable(op):
-      op = RunTreeOp(func, None, False, None)
-    if op.branch:
-      # dispatch another runTree to follow the branch with the current item list
-      bg.append( funcQ.bg(runTree, input, op.branch(), state, funcQ) )
-    if op.func:
-      if op.mode is None:
-        substate = copy(state) if op.copystate else state
-        qop = funcQ.defer(op.func, input, substate)
-        output, exc_info = qop.wait()
-        if exc_info:
-          exc_type, exc_value, exc_traceback = exc_info
-          try:
-            raise exc_type, exc_value, exc_traceback
-          except:
-            exception("runTree()")
-      elif op.mode == 'PARALLEL':
-        qops = []
-        for item in input:
-          substate = copy(state) if op.copystate else state
-          qops.append(funcQ.defer(op.func, item, substate))
-        outputs = []
-        for qop in qops:
-          output, exc_info = qop.wait()
-          if exc_info:
-            exc_type, exc_value, exc_traceback = exc_info
-            try:
-              raise exc_type, exc_value, exc_traceback
-            except:
-              exception("runTree()")
-          else:
-            outputs.extend(output)
-        output = outputs
-      elif op.mode == 'FORK':
-        # push the function back on without a fork
-        # then queue a call per current item
-        # using a copy of the state
-        suboperators = tuple([ RunTreeOp(op.func, None, False, op.branch) ] + operators)
-        qops = []
-        for item in input:
-          substate = copy(state) if op.copystate else state
-          qops.append(funcQ.bg(runTree, item, suboperators, substate, funcQ))
-        outputs = []
-        for qop in qops:
-          output, exc_info = qop.wait()
-          if exc_info:
-            exc_type, exc_value, exc_traceback = exc_info
-            try:
-              raise exc_type, exc_value, exc_traceback
-            except:
-              exception("runTree()")
-          else:
-            outputs.extend(output)
-        output = outputs
-        operators = []
-      else:
-        raise ValueError("invalid RuntreeOp.mode: %r" % (op.mode,))
-      input = output
+  return runTree_inner(input, iter(operators), state, funcQ).get()
 
-  # wait for any asynchronous runs to complete
-  # also report exceptions raised
-  for bgf in bg:
-    bg_output, exc_info = bgf.wait()
-    if exc_info:
-      exc_type, exc_value, exc_traceback = exc_info
-      try:
-        raise exc_type, exc_value, exc_traceback
-      except:
-        exception("runTree()")
+def runTree_inner(input, ops, state, funcQ, retq=None):
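+  ''' Submit function calls to evaluate the next operator from `ops`
+      against `input`, then resubmit for the remaining operators.
+      Return the channel `retq` onto which the final result will be put;
+      collect it with `retq.get()`.
+      Each operator's `func` is a many-to-many function.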
+  '''
+  debug("runTree_inner(input=%s, ops=%s, state=%s, funcQ=%s, retq=%s)...",
+    input, ops, state, funcQ, retq)
+  if retq is None:
+    retq = Channel()
 
-  return input
+  try:
+    op = ops.next()
+  except StopIteration:
+    funcQ.defer(lambda: retq.put(input))
+    return retq
+
+  func, fork_input, fork_state = op.func, op.fork_input, op.fork_state
+  if fork_input:
+    # iterable of single item iterables
+    inputs = ( (item,) for item in input )
+  else:
+    # 1-iterable of all-items iterables
+    inputs = (input,)
+
+  # submit function calls that put results onto a Queue
+  Q = Queue()
+  def func_put(Q, input, state):
+    ''' Call `func` with `input` and `state`.
+        Put results onto the Queue for collection.
+    '''
+    debug("func_put: call func...")
+    results = func(input, state)
+    debug("func_put: called func, got: %s", results)
+    Q.put(list(results))
+  nfuncs = 0
+  for input in inputs:
+    substate = copy(state) if fork_state else state
+    funcQ.defer(func_put, Q, input, substate)
+    nfuncs += 1
+
+  # now submit a function to collect the results and resubmit them
+  # to runTree_inner, which puts the final outputs onto the retq
+  # once there are no more ops
+  def func_get(Q, nfuncs):
+    debug("func_get(Q, nfuncs=%d)...", nfuncs)
+    outputs = []
+    while nfuncs > 0:
+      debug("func_get: %d results to collect, Q.get()...", nfuncs)
+      results = Q.get()
+      debug("func_get: Q.get() got: %s", results)
+      outputs.extend(results)
+      nfuncs -= 1
+    Q = None
+    # resubmit with new state etc
+    debug("func_get: queue another call to runTree_inner")
+    funcQ.defer(runTree_inner, outputs, ops, state, funcQ, retq)
+  funcQ.defer(func_get, Q, nfuncs)
+  Q = None
+
+  return retq
 
 if __name__ == '__main__':
   import cs.threads_tests

File lib/python/cs/urlutils.py

       rq = Request(url, None, hdrs)
       auth_handler = HTTPBasicAuthHandler(NetrcHTTPPasswordMgr())
       opener = build_opener(auth_handler)
+      debug("open URL...")
       rsp = opener.open(rq)
       H = rsp.info()
       self._content_type = H.gettype()
       self._content = rsp.read()
+      debug("URL: content-type=%s, length=%d", self._content_type, len(self._content))
       self._parsed = None
 
   def get_content(self, onerror=None):