Commits

Anonymous committed dcf81e5

Executor map() and map_async() methods take an optional chunksize parameter to limit concurrency. This lets the caller avoid using too many threads when processing many items.

Comments (0)

Files changed (6)

asyncthreads/executor.py

 __maintainer__ = "Andrew Gillis"
 __email__ = "gillis.andrewj@gmail.com"
 
+try:
+    import queue
+except ImportError:
+    import Queue as queue
 from asyncthreads import completion
 
 
             completion._Task(future, function, args, callback, result_q))
         return future
 
-    def map(self, func, iterable, callback=None):
+    def map(self, func, iterable, callback=None, chunksize=None):
         """Parallel equivalent to map() built-in function.
 
-        Runs the given function and callback, for every item of iterable, in as
-        many separate threads as are available.
+        Runs the given function and callback, for every item of iterable, in a
+        separate thread.  If chunksize is specified, then do not use more than
+        that number of concurrent threads.  Otherwise, use up to the number of
+        threads allowed by pool.  Providing a chunksize is useful to avoid
+        using too many threads from the pool.
 
         Blocks until all results are ready.  Results are returned in the same
         order that their corresponding requests were submitted.
 
         Arguments:
-        func     -- Function to execute.
-        iterable -- Sequence of args, each item passed in separate call.
-        callback -- Optional callback that takes return from func as
+        func      -- Function to execute.
+        iterable  -- Sequence of args, each item passed in separate call.
+        callback  -- Optional callback that takes return from func as
+                     argument.
+        chunksize -- Optional number of calls to process concurrently.  If None
+                     then use as many threads as pool allows.
 
         Return:
         List of results from each call to func.
 
         """
-        futures = [self.submit(func, i, callback) for i in iterable]
+        if chunksize:
+            futures = []
+            rq = queue.Queue()
+            for i in iterable:
+                futures.append(self.submit(func, i, callback, rq))
+                if len(futures) < chunksize:
+                    # Keep submitting until chunksize submitted.
+                    continue
+                # Wait for any call to complete before submitting next.
+                rq.get()
+        else:
+            futures = [self.submit(func, i, callback) for i in iterable]
+
         return [f.result() for f in futures]
 
-    def map_async(self, func, iterable, callback=None):
+    def map_async(self, func, iterable, callback=None, chunksize=None):
         """Variant of map() which returns a Completion object immediately.
 
         This method consumes an additional thread to collect the result from
         each call and return the array of results on a single Completion.
 
         Arguments:
-        func     -- Function to execute.
-        iterable -- Sequence of args, each item passed in separate call.
-        callback -- Optional callback that takes return from func as
+        See map()
 
         Return:
         Completion object to wait on and retrieve result.  Completion result

asyncthreads/reactor.py

              action_time, True))
         return future
 
-    def map_in_thread(self, func, iterable, callback=None):
+    def map_in_thread(self, func, iterable, callback=None, chunksize=None):
         """Parallel equivalent to map() built-in function.
 
-        Runs the given function and callback, for every item of iterable, in as
-        many separate threads as are available.
+        Calls the map() method of the reactor's thread pool.  Blocks until all
+        results are ready.
 
-        Blocks until all results are ready.
-
-        Arguments:
-        func     -- Function to execute.
-        iterable -- Sequence of args, each item passed in separate call.
-        callback -- Optional callback that takes return from func as
+        See Executor.map() for description.
 
         Return:
         List of results from each call to func.
         """
         return self._thread_pool.map(func, iterable, callback)
 
-    def map_in_thread_async(self, func, iterable, callback=None):
+    def map_in_thread_async(self, func, iterable, callback=None,
+                            chunksize=None):
         """Variant of map_in_thread() which returns a Completion object.
 
+        Calls the map_async() method of the reactor's thread pool.
+        See Executor.map() for description.
+
         This method consumes an additional thread to collect the result from
         each call and return the array of results on a single Completion.
 
-        Arguments:
-        func     -- Function to execute.
-        iterable -- Sequence of args, each item passed in separate call.
-        callback -- Optional callback that takes return from func as
-
         Return:
         Completion object to wait on and retrieve result.  Completion result
         is a list of results from each call to func.
 
         """
-        return self._thread_pool.map_async(func, iterable, callback)
+        return self._thread_pool.map_async(func, iterable, callback, chunksize)
 
     def cancel_scheduled(self, future):
         """Cancel a call scheduled to be called at a later time.
 def main():
     setup(
         name='asyncthreads',
-        version= '1.5.3',
+        version= '1.5.4',
         author='Andrew Gillis',
         author_email='gillis.andrewj@gmail.com',
         url='http://bitbucket.org/agillis/asyncthreads',

test/test_reactor.py

     import Queue as queue
 
 # Uncomment to import from repo instead of site-packages.
-#import os
-#import sys
-#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-#sys.path.insert(0, parentdir)
+import os
+import sys
+parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.insert(0, parentdir)
 
 from asyncthreads.reactor import Reactor
 from asyncthreads.threadpool import ThreadPool

test/test_threadpool.py

 from random import randrange
 
 # Uncomment to import from repo instead of site-packages.
-#import os
-#import sys
-#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-#sys.path.insert(0, parentdir)
+import os
+import sys
+parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.insert(0, parentdir)
 
 from asyncthreads import threadpool
 
 
         while True:
             self.pool.submit(range, (0,1))
-            time.sleep(0.25)
+            time.sleep(0.15)
             threads = self.pool.threads()
             idle = self.pool.idle_threads()
             print('\n---> queued items: %s' % self.pool.qsize())
             print("%s * %s = %s" % (x,y,r))
             assert r == x * y
 
+    def test_map_chunksize(self):
+        """Test that concurrency of map() is limited by chunksize."""
+        times = [3,3,2,2,1,1,3,1,2,2,1,3,1,2,3,3,2,1,3,1,2]
+        pool = threadpool.ThreadPool(100, 100)
+        chunksize = 5
+        pool.map(wait_task, times, None, chunksize)
+        print 'executed %s tasks with chunksize=%s, max pool size: %s' % (
+            len(times), chunksize, pool.threads())
+        # It is OK for there to be a few more threads than chunksize.  This
+        # can happen if a task has completed but its thread has not yet been
+        # marked idle before the next task arrives.
+        assert pool.threads() <= chunksize + 3
+
     def test_shutdown(self):
         # When all tasks are finished, allow the threads to terminate
         print('Shutting down thread pool')

test/test_threadrunner.py

 from random import randrange
 
 # Uncomment to import from repo instead of site-packages.
-#import os
-#import sys
-#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-#sys.path.insert(0, parentdir)
+import os
+import sys
+parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.insert(0, parentdir)
 
 from asyncthreads import threadrunner