Commits

Denis Bilenko committed 51785e4

gevent has a built-in thread pool since 1.0b1: https://bitbucket.org/denis/gevent/src/tip/gevent/threadpool.py#cl-1

You can get gevent 1.0b1 from http://code.google.com/p/gevent/downloads/list

Comments (0)

Files changed (5)

+Since 1.0b1, gevent has a thread pool built-in:
+
+https://bitbucket.org/denis/gevent/src/tip/gevent/threadpool.py#cl-1
+
+Download gevent 1.0 from http://code.google.com/p/gevent/downloads/list

examples/database_example.py

-#!/usr/bin/env python
-import psycopg2
-from geventutil import threadpool
-import threading
-
-def my_db_setup(*args):
-    curthread = threading.current_thread()
-    curthread.db = psycopg2.connect(*args)
-
-def runSqlFetchAll(sql, *sqlargs):
-    curthread = threading.current_thread()
-    cursor = curthread.db.cursor()
-    cursor.execute(sql, *sqlargs)
-    return cursor.fetchall()
-
-p = threadpool.ThreadPool(5, initializer=my_db_setup, initargs=("dbname=mydb user=myuser",))
-a = p.apply(runSqlFetchAll, "select * from users where id>%(id)s", {"id": 1800})
-print a

examples/multiping_example.py

-#!/usr/bin/env python
-from geventutil import threadpool
-import subprocess
-import sys
-from copy import copy
-
-def run_proc(cmd, shell=False, stdin=None):
-    p = subprocess.Popen(cmd, shell=shell, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    sout, serr = p.communicate(stdin)
-    return p.returncode, sout
-
-pool = threadpool.ThreadPool(5)
-cmdList = []
-for host in sys.argv[1:]:
-    # This is solaris ping - which doesn't need arguments :)
-    cmdList.append((["/usr/sbin/ping", host],))
-
-async_lst = pool.map_async(run_proc, cmdList)
-
-while async_lst != []:
-    threadpool.waitany(async_lst)
-    for async in copy(async_lst):
-        if async.ready():
-            exitcode, output = async.get()
-            print output,
-            async_lst.remove(async)
-
-sys.stdout.flush()
-

examples/process_example.py

-#!/usr/bin/env python
-
-from geventutil import threadpool
-import commands
-
-p = threadpool.ThreadPool(5)
-exitcode, output = p.apply(commands.getstatusoutput, "finger")
-print "Exitcode:", exitcode
-print "Output:"
-print output
-

geventutil/threadpool.py

-import os
-import copy
-import fcntl
-import Queue
-import gevent
-import gevent.event
-import gevent.monkey
-import threading
-gevent.monkey.patch_thread()
+# Since 1.0b1, gevent has a thread pool built-in:
 
-# Simple wrapper to os.pipe() - but sets to non-block
-def _pipe():
-    r, w = os.pipe()
-    fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
-    fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
-    return r, w
+# https://bitbucket.org/denis/gevent/src/tip/gevent/threadpool.py#cl-1
 
-_core_pipe_read, _core_pipe_write = _pipe()
-
-def _core_pipe_read_callback(event, evtype):
-    try:
-        os.read(event.fd, 1)
-    except EnvironmentError:
-        pass
-
-gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST, \
-    _core_pipe_read, _core_pipe_read_callback).add()
-
-class Cancelled(Exception): pass
-
-# A multi-threaded working version of AsyncResult
-# Main difference is we need to write down a pipe
-# when set/set_exception is called - and in the
-# gevent loop, a greenlet is run to read from the
-# pipe, and set a "local" event to wake up any
-# get/wait calls.
-class MTAsyncResult(gevent.event.AsyncResult):
-    """A one-time event that stores a value or an exception from a job run in a threadpool.
-
-    Identical to the end user as :class:`event.AsyncResult` - but has a few extra attributes.
-    _func stores a pointer to the function that has been run.
-    _args and _kwargs stores the arguments that were passed to _func.
-    """
-
-    def __init__(self, func, *args, **kwargs):
-        self._func = func
-        self._args = args
-        self._kwargs = kwargs
-        self.started = False
-        gevent.event.AsyncResult.__init__(self)
-
-    def get(self, *args, **kwargs):
-        """Return the stored value or raise the exception. 
-
-        If the job was cancelled before it was executed, it will raise :class:`Cancelled`
-        """
-        return gevent.event.AsyncResult.get(self, *args, **kwargs)
-
-    def cancel(self):
-        self.set_exception(Cancelled())
-        os.write(_core_pipe_write, '\0')
-
-    def set_exception(self, exception):
-        gevent.event.AsyncResult.set_exception(self, exception)
-        os.write(_core_pipe_write, '\0')
-
-    def set(self, value=None):
-        gevent.event.AsyncResult.set(self, value)
-        os.write(_core_pipe_write, '\0')
-
-def waitany(event_list, timeout=None):
-    """Takes a list of events. (Event, AsyncEvent, or MTAsyncEvent).
-    Returns any event that ready() returns True. Will return None if
-    timeout is hit.
-    """
-    for event in event_list:
-        if event.ready():
-            return event
-
-    queue = gevent.queue.Queue()
-
-    try:
-        for event in event_list:
-            event.rawlink(queue.put)
-        event = queue.get(timeout=timeout)
-
-    except gevent.queue.Empty:
-        return None
-
-    finally:
-        for event in event_list:
-            event.unlink(queue.put)
-
-    for event in event_list:
-        if event.ready():
-            return event
-
-    return None
-
-
-
-class CancelQueue(Queue.Queue):
-    """ A standard Threading Queue.Queue() class, but with
-    2 extra methods. This is not intended for end-user use, but
-    used for the ThreadPool.
-    """
-    def remove(self, item):
-        """Takes a single item, and if it is on the queue, remove it.
-        Returns True if successful, otherwise returns False.
-        """
-        self.mutex.acquire()
-        try:
-            return self._remove(item)
-        finally:
-            self.mutex.release()
-
-    # DOES NOT DO LOCKING - DO NOT USE DIRECTLY
-    def _remove(self, item):
-        if item in self.queue:
-            self.queue.remove(item)
-            return True
-        return False
-
-    def remove_many(self, item_list):
-        """Takes a list of items to remove from the queue, and returns
-        a list of the succesfully removed items.
-        """
-        removed = []
-        self.mutex.acquire()
-        try:
-            for item in item_list:
-                if self._remove(item):
-                    removed.append(item)
-            return removed
-        finally:
-            self.mutex.release()
-
-    def priority_put(self, item):
-        """Puts an item to the front of the queue. Does not obey 
-        maximum queue size."""
-        self.mutex.acquire()
-        self.queue.appendleft(item)
-        self.not_empty.notify()
-        self.mutex.release()
-
-
-# Simple thread that takes a MTAsyncResult from the queue
-# and executes the requested function, and puts the result
-# back in the MTAsyncResult object.
-class ThreadJob(threading.Thread):
-    def __init__ (self, queue, initializer=None, initargs=None):
-        threading.Thread.__init__(self)
-        self.queue = queue
-        self.initializer = initializer
-        self.initargs = initargs
-
-    def run(self):
-        if self.initializer:
-            self.initializer(*self.initargs)
-        while True:
-            thread_async_res = self.queue.get()
-            if thread_async_res is None:
-                break
-
-            try:
-                func = thread_async_res._func
-                args = thread_async_res._args
-                kwargs = thread_async_res._kwargs
-                thread_async_res.started = True
-                res = func(*args, **kwargs)
-                thread_async_res.set(res)
-            except Exception, e:
-                thread_async_res.set_exception(e)
-
-class ThreadPool(object):
-    """A threadpool for running jobs that have to block, and are not gevent friendly.
-    """
-    def __init__(self, poolsize=5, initializer=None, initargs=None, \
-                 daemon_threads=True):
-        """Takes 4 keyword arguments.
-        poolsize 
-        The number of threads to create for the threadpool.
-
-        initializer
-        A function to execute in every thread before the thread takes jobs.
-
-        initargs
-        An arguments tuple to be passed to the initializer function.
-
-        daemon_threads
-        Sets if the threads should be daemon threads or not.
-        """
-        self.poolsize = poolsize
-        self.thread_list = []
-        self.queue = CancelQueue()
-        self.initializer = initializer
-        self.initargs = initargs
-        self.daemon_threads = daemon_threads
-        self.running = True
-
-        self._spawn_threads(self.poolsize)
-
-    def _spawn_threads(self, num):
-        for x in range(0, num):
-            t = ThreadJob(self.queue, self.initializer, self.initargs)
-            t.setDaemon(self.daemon_threads)
-            t.start()
-            self.thread_list.append(t)
-
-    def _check_threads(self):
-        for x in copy.copy(self.thread_list):
-            if not x.is_alive():
-                self.thread_list.remove(x)
-        self.poolsize = len(self.thread_list)
-
-    def resize(self, newsize):
-        """ Convert the threadpool to newsize."""
-        if newsize > self.poolsize:
-            self._spawn_threads(newsize - self.poolsize)
-        elif newsize < self.poolsize:
-            for x in range(0, self.poolsize-newsize):
-                self.queue.priority_put(None)
-        self.poolsize = newsize
-        self._check_threads()
-
-    def apply_async(self, func, *args, **kwargs):
-        """Run func, with the given args and kwargs in a thread from the pool.
-        Returns :class:`MTAsyncResult`
-        """
-        assert self.running, "Pool has been closed"
-        async_res = MTAsyncResult(func, *args, **kwargs)
-        self.queue.put(async_res)
-        return async_res
-
-    def apply(self, func, *args, **kwargs):
-        """Run func, with the given args and kwargs in a thread from the pool.
-        Blocks the current greenlet until the result has been completed. Returns
-        the result from the function.
-        """
-        result = self.apply_async(func, *args, **kwargs)
-        return result.get()
-
-    def close(self):
-        """Shut down the threadpool. This will only happen once the queue has been
-        drained of jobs. Does not block, and returning from this function does not
-        mean the threadpool has yet finished.
-        """
-        # TODO a way to empty the queue entirely for a "rapid" shutdown.
-        for x in range(0, self.poolsize):
-            self.queue.put(None)
-        self.running = False
-        # TODO Add a way of waiting for all threads to have exited
-
-    # TODO - add a timeout?
-    def map(self, func, args_list, abort=False):
-        """Takes a function, and a iterable of argument tuples.
-        Runs the function against every element in the iterable, and returns
-        a list of :class:`MTAsyncResult` objects.
-        Function will only return once all the jobs have been completed.
-        """
-        async_jobs = self.map_async(func, args_list)
-        if abort:
-            running_jobs = copy.copy(async_jobs)
-            aborted = False
-            while running_jobs != []:
-                g = waitany(running_jobs)
-                running_jobs.remove(g)
-                if not g.successful() and not aborted:
-                    self.queue_remove_many(async_jobs)
-                    aborted = True
-            return async_jobs
-        else:
-            gevent.event.waitall(async_jobs)
-            return async_jobs
-
-    def map_async(self, func, args_list):
-        """Takes a function, and a iterable of argument tuples.
-        Runs the function against every element in the iterable, and returns
-        a list of :class:`MTAsyncResult` objects.
-        """
-        async_jobs = []
-        for x in args_list:
-            async_jobs.append(self.apply_async(func, *x))
-        return async_jobs
-
-    def queue_remove(self, async_res):
-        """ Takes a :class:`MTAsyncResult` object, and if the job has not yet
-        been started, cancels the job, and removes it form the queue.
-        """
-        if self.queue.remove(async_res):
-            async_res.cancel()
-            return True
-        return False
-
-    def queue_remove_many(self, async_res_list):
-        """ Takes a list of :class:`MTAsyncResult` objects, and iterate over
-        each one, and if the job has not yet been started, cancels the job, and 
-        removes it form the queue.
-        """
-        removed = self.queue.remove_many(async_res_list)
-        for x in removed:
-            x.cancel()
-        return removed
-
+# Download gevent 1.0 from http://code.google.com/p/gevent/downloads/list