"""pygame.threads -- experimental thread-pool helpers.

* Experimental *

Like the map function, but can use a pool of threads.

Really easy to use threads.  eg.  tmap(f, alist)

If you know how to use the map function, you can use threads.
"""

__author__ = "Rene Dudfield"
__version__ = "0.3.0"
__license__ = 'Python license'

import traceback
import sys

import threading

if sys.version_info[0] == 2 and sys.version_info[1] < 5:
    # very old Python: use the Queue backport shipped next to this module.
    from Py25Queue import Queue
    from Py25Queue import Empty
else:
    # use up to date version
    try:
        # Python 3 renamed the module to lowercase `queue`.
        from queue import Queue, Empty
    except ImportError:
        from Queue import Queue, Empty

Thread = threading.Thread

STOP = object()
FINISH = object()

# DONE_ONE = object()
# DONE_TWO = object()

# a default worker queue.
_wq = None

# if we are using threads or not.  This is the number of workers.
_use_workers = 0

# Set this to the maximum for the amount of Cores/CPUs
#    Note, that the tests early out.  
#    So it should only test the best number of workers +2

def init(number_of_workers = 0):
    """ Does a little test to see if threading is worth it.
          Sets up a global worker queue if it's worth it.

        Calling init() is not required, but is generally better to do.

        number_of_workers - use exactly this many workers.  If 0 (the
                            default), a benchmark is run to pick the
                            best count (which may itself be 0).
    """
    global _wq, _use_workers

    if number_of_workers:
        _use_workers = number_of_workers
    else:
        # measure, rather than guess, how many workers help on this machine.
        _use_workers = benchmark_workers()

    # if it is best to use zero workers, then use that.
    _wq = WorkerQueue(_use_workers)

def quit():
    """ Cleans up everything: drops the global worker queue and turns the
        worker count back off.
    """
    global _wq, _use_workers
    _wq = None
    _use_workers = False

def benchmark_workers(a_bench_func = None, the_data = None, max_workers_to_test = 64):
    """ does a little test to see if workers are at all faster.
        Returns the number of workers which works best.
        Takes a little bit of time to run, so you should only really call
          it once.
        You can pass in benchmark data, and functions if you want.

        a_bench_func - f(data).  Defaults to a pygame surface scale.
        the_data - list of items to work on.  Defaults to pygame Surfaces.
        max_workers_to_test - upper bound on worker counts tried; the loop
                              early-outs once adding workers stops helping.
    """
    global _use_workers

    #TODO: try and make this scale better with slower/faster cpus.
    #  first find some variables so that using 0 workers takes about 1.0 seconds.
    #  then go from there.

    # note, this will only work with pygame 1.8rc3+
    # replace the doit() and the_data with something that releases the GIL

    import pygame
    import pygame.transform
    import time

    if not a_bench_func:
        # default benchmark: surface scaling, which releases the GIL.
        def doit(x):
            return pygame.transform.scale(x, (544, 576))
    else:
        doit = a_bench_func

    if not the_data:
        thedata = []
        for x in range(10):
            thedata.append(pygame.Surface((155, 155), 0, 32))
    else:
        thedata = the_data

    best = time.time() + 100000000
    best_number = 0
    last_best = -1

    for num_workers in range(0, max_workers_to_test):

        wq = WorkerQueue(num_workers)
        t1 = time.time()
        for xx in range(20):
            print("active count:%s" % threading.active_count())
            results = tmap(doit, thedata, worker_queue = wq)
        t2 = time.time()

        # shut the pool down so its threads don't skew the next measurement.
        wq.stop()

        total_time = t2 - t1
        print("total time num_workers:%s: time:%s:" % (num_workers, total_time))

        if total_time < best:
            last_best = best_number
            best_number = num_workers
            best = total_time

        if num_workers - best_number > 1:
            # We tried to add more, but it didn't like it.
            #   so we stop with testing at this number.
            break

    return best_number

class WorkerQueue(object):
    """ A pool of daemon worker threads pulling callables off a shared Queue.
    """

    def __init__(self, num_workers = 20):
        """ num_workers - how many threads to start.  0 means no threads,
              leaving the pool empty (callers can then fall back to map).
        """
        self.queue = Queue()
        self.pool = []
        self._setup_workers(num_workers)

    def _setup_workers(self, num_workers):
        """ Sets up the worker threads
              NOTE: undefined behaviour if you call this again.
        """
        self.pool = []

        for _ in range(num_workers):
            self.pool.append(Thread(target=self.threadloop))

        for a_thread in self.pool:
            # daemon threads so a forgotten queue never blocks interpreter exit.
            a_thread.daemon = True
            a_thread.start()

    def do(self, f, *args, **kwArgs):
        """ puts a function on a queue for running later.
        """
        self.queue.put((f, args, kwArgs))

    def stop(self):
        """ Stops the WorkerQueue, waits for all of the threads to finish up.
        """
        self.queue.put(STOP)
        for thread in self.pool:
            thread.join()

    def threadloop(self): #, finish = False):
        """ Loops until all of the tasks are finished.
        """
        while True:
            args = self.queue.get()
            if args is STOP:
                # put STOP back so the other workers see it too, then exit.
                self.queue.put(STOP)
                self.queue.task_done()
                break
            else:
                try:
                    args[0](*args[1], **args[2])
                finally:
                    # mark the task done even if the call raised, so wait()
                    # (queue.join) cannot deadlock.
                    # clean up the queue, raise the exception.
                    self.queue.task_done()

    def wait(self):
        """ waits until all tasks are complete.
        """
        self.queue.join()

class FuncResult:
    """ Used for wrapping up a function call so that the results are stored
         inside the instances result attribute.
    """
    def __init__(self, f, callback = None, errback = None):
        """ f - is the function that we call
            callback(result) - this is called when the function(f) returns
            errback(exception) - this is called when the function(f) raises
                                   an exception.
        """
        self.f = f
        # result stays None until a successful call completes.
        self.result = None
        self.exception = None
        self.callback = callback
        self.errback = errback

    def __call__(self, *args, **kwargs):
        """ Calls the wrapped function, storing either its result or the
            exception it raised.
        """
        # we try to call the function here.  If it fails we store the exception.
        try:
            self.result = self.f(*args, **kwargs)
            if self.callback:
                self.callback(self.result)
        except Exception as e:
            self.exception = e
            if self.errback:
                self.errback(self.exception)

def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True):
    """ like map, but uses a thread pool to execute.
        num_workers - the number of worker threads that will be used.  If pool
                        is passed in, then the num_workers arg is ignored.
        worker_queue - you can optionally pass in an existing WorkerQueue.
        wait - True means that the results are returned when everything is finished.
               False means that we return the [worker_queue, results] right away instead.
               results, is returned as a list of FuncResult instances.
        stop_on_error - if True (the default) the first stored exception is
                        re-raised, matching map's behaviour.
    """
    if worker_queue:
        wq = worker_queue
    else:
        # see if we have a global queue to work with.
        if _wq:
            wq = _wq
        else:
            if num_workers == 0:
                return list(map(f, seq_args))

            wq = WorkerQueue(num_workers)

    # we short cut it here if the number of workers is 0.
    # normal map should be faster in this case.
    if len(wq.pool) == 0:
        return list(map(f, seq_args))

    #print "queue size:%s" % wq.queue.qsize()

    #TODO: divide the data (seq_args) into even chunks and
    #       then pass each thread a map(f, equal_part(seq_args))
    #      That way there should be less locking, and overhead.

    # wrap each call so its result/exception is captured, then enqueue it.
    results = []
    for sa in seq_args:
        results.append(FuncResult(f))
        wq.do(results[-1], sa)

    if wait:
        #print "wait"
        wq.wait()
        #print "after wait"
        #print "queue size:%s" % wq.queue.qsize()
        if wq.queue.qsize():
            raise Exception("buggy threadmap")
        # if we created a worker queue, we need to stop it.
        if not worker_queue and not _wq:
            #print "stoping"
            wq.stop()
            if wq.queue.qsize():
                um = wq.queue.get()
                if um is not STOP:
                    raise Exception("buggy threadmap")
        # see if there were any errors.  If so raise the first one.  This matches map behaviour.
        # TODO: the traceback doesn't show up nicely.
        # NOTE: TODO: we might want to return the results anyway?  This should be an option.
        if stop_on_error:
            error_ones = [r for r in results if r.exception]
            if error_ones:
                raise error_ones[0].exception
        return [r.result for r in results]
    else:
        return [wq, results]