Source

gevent-playground / tests / test_threadpool.py

Full commit
Denis Bilenko 2a7d4b3 




Denis Bilenko 61bb12c 
Denis Bilenko 2a7d4b3 














pajs 186ec1c 








Denis Bilenko 2a7d4b3 



























pajs 186ec1c 






















Denis Bilenko 2a7d4b3 
















































































































#!/usr/bin/env python
import sys
import os
import time
import greentest
from geventutil.threadpool import *

# Very simple call that will block the running thread
def blockSleep(x):
    time.sleep(x)
    return x

def myException(async_result):
    try:
        raise Exception("bob")
    except Exception, e:
        async_result.set_exception(e)

def my_exception():
    raise Exception("bob")

def sleepOrDie(x):
    if x == 5:
        raise DummyException("I died")
    time.sleep(x)
    return x

class DummyException(Exception): pass


# Test that setting the result in a different thread
# correctly sets the result, and wakes up, the current
# greenlet.
# Also tests a blocking wait() correctly works.
class TestMTAsyncResultiViaThread(greentest.TestCase):
    __timeout__ = 1
    def testwait(self):
        e = MTAsyncResult(None, None, None)
        t = threading.Thread(target=e.set, args=(5,))
        t.start()
        gevent.sleep(0)
        e.wait()
        self.assertEqual(e.value, 5)

    def test_exception(self):
        e = MTAsyncResult(None, None, None)
        t = threading.Thread(target=myException, args=(e, ))
        t.start()
        gevent.sleep(0)
        e.wait()
        self.assertTrue(not e.successful())

class ThreadPoolTestFunctions(greentest.TestCase):
    __timeout__ = 20
    def setUp(self):
        greentest.TestCase.setUp(self)
        self.pool = ThreadPool(2)

    def test_map_cancel(self):
        arglst = []
        for x in range(3,20):
            arglst.append((x,))
        gevent.sleep(0)
        asynclst = self.pool.map(sleepOrDie, arglst, abort=True)
        success = 0
        cancel = 0
        failed = 0
        for x in asynclst:
            orig_val = x._args[0]
            try:
                x.get()
                success = success + 1
            except DummyException:
                failed = failed + 1
            except Cancelled:
                cancel = cancel + 1
        self.assertEqual(failed, 1)
        self.assert_(cancel>12)
        self.assert_(success>1)
        gevent.sleep(0)

    def test_async_exception(self):
        async = self.pool.apply_async(my_exception)
        async.wait()
        self.assertTrue(not async.successful())
        try:
            async.get()
            self.assertTrue(False)
        except:
            self.assertTrue(True)

    def test_async_invalid_func(self):
        async = self.pool.apply_async(blockSleep)
        async.wait()
        self.assertTrue(not async.successful())
        try:
            waitany([async])
            self.assertTrue(False)
        except Exception, e:
            self.assertTrue(True)

    # Test that get, and wait both exit correctly when a timeout
    # is given.
    def test_async_timeout(self):
        async = self.pool.apply_async(blockSleep, 1)
        try:
            async.get(timeout=0.1)
        except Timeout, e:
            self.assertTrue(True)

        self.assertTrue(not async.ready())

        async.wait(timeout=0.1)
        self.assertTrue(not async.ready())

        async.wait()
        self.assertTrue(async.ready())

    # Test that a blocking get() works
    def test_run_block_get(self):
        sleepWait = 0.1
        async = self.pool.apply_async(blockSleep, sleepWait)
        async.get()
        self.assertTrue(async.ready())
        self.assertEqual(async.value, sleepWait)

    # Test the map() function - which should block until
    # all the results are ready.
    def test_map(self):
        l = [(0.1,), (1.2,), (1.3,), (1.4,), (1.5,), (1.6,), (1.7,), (1.8,)]
        results = self.pool.map(blockSleep, l)
        gevent.sleep(0)
        for x in range(0, len(l)):
            test = l[x]
            result = results[x]
            self.assertTrue(result.ready())
            self.assertEqual(result.value, test[0])

    # tests the "waitany" function, that will return when
    # one of any number of events are ready.
    # Test requires pool size to be at least 2
    def test_waitany(self):
        wait3 = self.pool.apply_async(blockSleep, 3)
        wait1 = self.pool.apply_async(blockSleep, 1)
        waitany([wait3, wait1])

        self.assertTrue(wait1.ready())
        self.assert_(not wait3.ready())
        self.assertEqual(wait1.value, 1)

        waitany([wait3])
        self.assertTrue(wait3.ready())
        self.assertEqual(wait3.value, 3)

    # Test that the cancelling of any existing, unstarted
    # jobs works correctly.
    # Test requires pool size to be quite small, e.g. 2
    def test_remove(self):
        l = [(0.1,), (1.2,), (1.3,), (1.4,), (1.5,), (1.6,), (1.7,), (1.8,)]
        results = self.pool.map_async(blockSleep, l)
        gevent.sleep(0)
        waitany(results)
        self.assertTrue(results[0].ready())
        removed = self.pool.queue_remove_many(results)
        for x in results[5:]:
            self.assertTrue(x in removed)

    def tearDown(self):
        self.pool.close()
        greentest.TestCase.tearDown(self)

class ThreadPoolAdminFunctions(greentest.TestCase):
    __timeout__ = 20

    def setUp(self):
        greentest.TestCase.setUp(self)
        self.pool = ThreadPool(2)

    def test_resize(self):
        oldsize = self.pool.poolsize
        self.pool.resize(oldsize*2)
        gevent.sleep(0.1)
        self.assertEqual(len(self.pool.thread_list), oldsize*2)
        self.pool.resize(oldsize)
        gevent.sleep(1)
        self.pool._check_threads()
        self.assertEqual(len(self.pool.thread_list), oldsize)

    def tearDown(self):
        self.pool.close()
        greentest.TestCase.tearDown(self)

if __name__ == "__main__":
    greentest.main()