Source

pygame / lib / threads / __init__.py

"""
* Experimental *

Like the map function, but can use a pool of threads.

Really easy to use threads.  eg.  tmap(f, alist)

If you know how to use the map function, you can use threads.
"""

__author__ = "Rene Dudfield"
__version__ = "0.3.0"
__license__ = 'Python license'

import traceback, sys

if (sys.version_info[0] == 2 and sys.version_info[1] < 5):
    from Py25Queue import Queue
    from Py25Queue import Empty
else:
    # use up to date version
    from Queue import Queue
    from Queue import Empty
    
import threading
# Short alias for the thread class used to create the worker threads.
Thread = threading.Thread

# Sentinel placed on a WorkerQueue's queue by stop(); a worker thread that
#   receives it puts it back for the next worker and then exits.
STOP = object()
# Sentinel that is not referenced by any other code visible in this module.
FINISH = object()

# DONE_ONE = object()
# DONE_TWO = object()

# The default (module-wide) worker queue.  Created by init(), cleared by
#   quit(), and used by tmap() when no worker_queue is passed in.
_wq = None

# The number of workers chosen by init().  Zero means no worker threads.
_use_workers = 0

# Upper bound (exclusive) on the worker counts benchmark_workers() will try.
#    Set this to at least the number of cores/CPUs.
#    Note that the benchmark exits early, so in practice it only tests up to
#    the best number of workers + 2.
MAX_WORKERS_TO_TEST = 64



def init(number_of_workers = 0):
    """ Creates the module-wide worker queue that tmap() uses by default.

        number_of_workers - how many worker threads to start.  When it is
                            0 (the default), benchmark_workers() is run to
                            pick the count instead.

        Calling init() is not required, but is generally better to do.
    """
    global _wq, _use_workers

    # an explicit, non-zero request wins; otherwise measure what works best.
    _use_workers = number_of_workers or benchmark_workers()

    # zero workers is a valid outcome: the queue then has no threads at all.
    _wq = WorkerQueue(_use_workers)




def quit():
    """ Cleans up everything that init() set up.

        Stops the global worker queue, which waits for its worker threads
        to finish, and resets the module state.  It is safe to call this
        when init() was never called, and to call it more than once.
    """
    global _wq, _use_workers
    # there is nothing to stop if init() never ran or quit() already did.
    if _wq is not None:
        _wq.stop()
    _wq = None
    # the worker count is an integer everywhere else, so reset it to 0
    #   rather than to False.
    _use_workers = 0


def benchmark_workers(a_bench_func = None, the_data = None):
    """ Does a little test to see if workers are at all faster.
        Returns the number of workers which works best.
        Takes a little bit of time to run, so you should only really call
          it once.
        You can pass in benchmark data, and functions if you want.
        a_bench_func - f(data)
        the_data - data to work on.

        When either argument is left out, a pygame based default is used
          (scaling ten 155x155 surfaces), so pygame is only imported in
          that case.
        One timing line is printed for every worker count that is tried.
    """
    import time

    #TODO: try and make this scale better with slower/faster cpus.
    #  first find some variables so that using 0 workers takes about 1.0 seconds.
    #  then go from there.

    # note, this will only work with pygame 1.8rc3+
    # replace the doit() and the_data with something that releases the GIL
    if not (a_bench_func and the_data):
        import pygame
        import pygame.transform

    if a_bench_func:
        doit = a_bench_func
    else:
        def doit(x):
            return pygame.transform.scale(x, (544, 576))

    if the_data:
        thedata = the_data
    else:
        thedata = []
        for _ in range(10):
            thedata.append(pygame.Surface((155, 155), 0, 32))

    # best is the fastest total time seen so far; None until the first run.
    best = None
    best_number = 0

    for num_workers in range(0, MAX_WORKERS_TO_TEST):

        wq = WorkerQueue(num_workers)
        try:
            # only the tmap calls are timed; nothing is printed in here so
            #   that console output does not distort the measurement.
            t1 = time.time()
            for _ in range(20):
                tmap(doit, thedata, worker_queue = wq)
            t2 = time.time()
        finally:
            # always shut the workers down, even if the benchmark raised.
            wq.stop()

        total_time = t2 - t1
        print("total time num_workers:%s: time:%s:" % (num_workers, total_time))

        if best is None or total_time < best:
            best_number = num_workers
            best = total_time

        if num_workers - best_number > 1:
            # We tried to add more, but it didn't like it.
            #   so we stop with testing at this number.
            break

    return best_number




class WorkerQueue(object):
    """ A pool of daemon worker threads fed from one shared task queue.

        Tasks are added with do(), and wait() blocks until every queued
          task has been processed.  stop() shuts the workers down; the STOP
          sentinel is left on the queue afterwards, so a stopped
          WorkerQueue is not meant to be reused.
    """

    def __init__(self, num_workers = 20):
        """ num_workers - the number of worker threads to start.  With 0
                            workers tasks can be queued, but nothing will
                            run them.
        """
        self.queue = Queue()
        self.pool = []
        self._setup_workers(num_workers)

    def _setup_workers(self, num_workers):
        """ Sets up the worker threads
              NOTE: undefined behaviour if you call this again.
        """
        self.pool = []

        for _ in range(num_workers):
            self.pool.append(Thread(target=self.threadloop))

        for a_thread in self.pool:
            # daemon threads do not keep the interpreter alive at exit.
            a_thread.setDaemon(True)
            a_thread.start()


    def do(self, f, *args, **kwArgs):
        """ puts a function on a queue for running later.
            It will be called as f(*args, **kwArgs) by a worker thread.
        """
        self.queue.put((f, args, kwArgs))


    def stop(self):
        """ Stops the WorkerQueue, waits for all of the threads to finish up.
        """
        self.queue.put(STOP)
        for thread in self.pool:
            thread.join()


    def threadloop(self):
        """ Runs queued tasks until the STOP sentinel is received.

            A task that raises does not end the loop.  Its traceback is
              printed and the worker carries on with the next task.  If
              the worker died instead, a later wait() could block forever.
        """
        while True:
            args = self.queue.get()
            if args is STOP:
                # pass the sentinel on so that the next worker stops too.
                self.queue.put(STOP)
                self.queue.task_done()
                break

            try:
                try:
                    args[0](*args[1], **args[2])
                except Exception:
                    # report the failure, but keep this worker alive.
                    traceback.print_exc()
            finally:
                # always mark the task done, so that wait() can return.
                self.queue.task_done()


    def wait(self):
        """ waits until all tasks are complete.
        """
        self.queue.join()

class FuncResult(object):
    """ Used for wrapping up a function call so that the results are stored
         inside the instances result attribute.

        After the instance has been called:
          result - the return value of f, or None if the call failed.
          exception - the exception that was raised, or None on success.
    """
    def __init__(self, f, callback = None, errback = None):
        """ f - is the function we that we call 
            callback(result) - this is called when the function(f) returns
            errback(exception) - this is called when the function(f) raises
                                   an exception.
        """
        self.f = f
        # set up front, so that reading result after a failed call gives
        #   None instead of raising AttributeError.
        self.result = None
        self.exception = None
        self.callback = callback
        self.errback = errback

    def __call__(self, *args, **kwargs):
        """ Calls f(*args, **kwargs), and stores the outcome on the instance.

            Nothing is returned.  An exception raised by f, or by the
              callback, is stored in the exception attribute and passed to
              the errback rather than being raised here.
        """
        #we try to call the function here.  If it fails we store the exception.
        try:
            self.result = self.f(*args, **kwargs)
            if self.callback:
                self.callback(self.result)
        except Exception:
            # sys.exc_info() is used because it works with both the old
            #   and the new except syntax.
            self.exception = sys.exc_info()[1]
            if self.errback:
                self.errback(self.exception)


def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True):
    """ like map, but uses a thread pool to execute.
        num_workers - the number of worker threads that will be used.  If pool
                        is passed in, then the num_workers arg is ignored.
        worker_queue - you can optionally pass in an existing WorkerQueue.
                       Otherwise the global queue made by init() is used,
                       and if there is none a temporary one is created.
        wait - True means that the results are returned when everything is finished.
               False means that we return the [worker_queue, results] right away instead. 
               results, is returned as a list of FuncResult instances.
        stop_on_error - True means that the first exception stored by a
                          call is raised, once all the calls have finished.
                        False means that a list is always returned, with
                          None in the place of each call that failed.

        Returns a list of the results, in the same order as seq_args.
    """

    # remember whether the queue is ours, so we know to stop it at the end.
    created_queue = False

    if worker_queue:
        wq = worker_queue
    elif _wq:
        # we have a global queue to work with.
        wq = _wq
    else:
        if num_workers == 0:
            return [f(sa) for sa in seq_args]

        wq = WorkerQueue(num_workers)
        created_queue = True

    # we short cut it here if the number of workers is 0.
    # normal map should be faster in this case.
    if len(wq.pool) == 0:
        return [f(sa) for sa in seq_args]

    #TODO: divide the data (seq_args) into even chunks and 
    #       then pass each thread a map(f, equal_part(seq_args))
    #      That way there should be less locking, and overhead.

    results = []
    for sa in seq_args:
        results.append(FuncResult(f))
        wq.do(results[-1], sa)

    if not wait:
        # the caller is now responsible for waiting on (and stopping) wq.
        return [wq, results]

    wq.wait()
    if wq.queue.qsize():
        raise Exception("buggy threadmap")

    # if we created a worker queue, we need to stop it.
    if created_queue:
        wq.stop()
        # stop() leaves the STOP sentinel behind on the queue.
        if wq.queue.qsize():
            um = wq.queue.get()
            if not um is STOP:
                raise Exception("buggy threadmap")

    # see if there were any errors.  If so raise the first one.  This matches map behaviour.
    # TODO: the traceback doesn't show up nicely.
    if stop_on_error:
        for a_result in results:
            if a_result.exception is not None:
                raise a_result.exception

    # getattr is used so that a failed call gives None, even if the wrapper
    #   never had its result attribute set.
    return [getattr(a_result, "result", None) for a_result in results]
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.