Source

gevent / greentest / test__pool.py

from time import time
import gevent
from gevent import pool
from gevent.event import Event
import greentest


class TestCoroutinePool(greentest.TestCase):
    klass = pool.Pool

    def test_apply_async(self):
        done = Event()
        def some_work(x):
            done.set()
        pool = self.klass(2)
        pool.apply_async(some_work, ('x', ))
        done.wait()

    def test_apply(self):
        value = 'return value'
        def some_work():
            return value
        pool = self.klass(2)
        result = pool.apply(some_work)
        self.assertEqual(value, result)

    def test_multiple_coros(self):
        evt = Event()
        results = []
        def producer():
            results.append('prod')
            evt.set()

        def consumer():
            results.append('cons1')
            evt.wait()
            results.append('cons2')

        pool = self.klass(2)
        done = pool.spawn(consumer)
        pool.apply_async(producer)
        done.get()
        self.assertEquals(['cons1', 'prod', 'cons2'], results)

    def dont_test_timer_cancel(self):
        timer_fired = []
        def fire_timer():
            timer_fired.append(True)
        def some_work():
            gevent.timer(0, fire_timer)
        pool = self.klass(2)
        pool.apply(some_work)
        gevent.sleep(0)
        self.assertEquals(timer_fired, [])

    def test_reentrant(self):
        pool = self.klass(1)
        def reenter():
            result = pool.apply(lambda a: a, ('reenter', ))
            self.assertEqual('reenter', result)

        pool.apply(reenter)

        evt = Event()
        def reenter_async():
            pool.apply_async(lambda a: a, ('reenter', ))
            evt.set()

        pool.apply_async(reenter_async)
        evt.wait()

    def test_stderr_raising(self):
        # testing that really egregious errors in the error handling code
        # (that prints tracebacks to stderr) don't cause the pool to lose
        # any members
        import sys
        pool = self.klass(size=1)
        def crash(*args, **kw):
            raise RuntimeError("Whoa")
        class FakeFile(object):
            write = crash

        # we're going to do this by causing the traceback.print_exc in
        # safe_apply to raise an exception and thus exit _main_loop
        normal_err = sys.stderr
        try:
            sys.stderr = FakeFile()
            waiter = pool.spawn(crash)
            self.assertRaises(RuntimeError, waiter.get)
            # the pool should have something free at this point since the
            # waiter returned
            # pool.Pool change: if an exception is raised during execution of a link,
            # the rest of the links are scheduled to be executed on the next hub iteration
            # this introduces a delay in updating pool.sem which makes pool.free_count() report 0
            # therefore, sleep:
            gevent.sleep(0)
            self.assertEqual(pool.free_count(), 1)
            # shouldn't block when trying to get
            t = gevent.Timeout.start_new(0.1)
            try:
                pool.apply(gevent.sleep, (0, ))
            finally:
                t.cancel()
        finally:
            sys.stderr = normal_err
            pool.join()


class PoolBasicTests(greentest.TestCase):
    klass = pool.Pool

    def test_execute_async(self):
        p = self.klass(size=2)
        self.assertEqual(p.free_count(), 2)
        r = []
        def foo(a):
            r.append(a)
        first = p.spawn(foo, 1)
        self.assertEqual(p.free_count(), 1)
        first.get()
        self.assertEqual(r, [1])
        gevent.sleep(0)
        self.assertEqual(p.free_count(), 2)

        #Once the pool is exhausted, calling an execute forces a yield.

        p.apply_async(foo, (2, ))
        self.assertEqual(1, p.free_count())
        self.assertEqual(r, [1])

        p.apply_async(foo, (3, ))
        self.assertEqual(0, p.free_count())
        self.assertEqual(r, [1])

        p.apply_async(foo, (4, ))
        self.assertEqual(r, [1])
        gevent.sleep(0.01)
        self.assertEqual(r, [1,2,3,4])

    def test_execute(self):
        p = self.klass()
        result = p.apply(lambda a: ('foo', a), (1, ))
        self.assertEqual(result, ('foo', 1))

#
# tests from standard library test/test_multiprocessing.py

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time() - t


def sqr(x, wait=0.0):
    gevent.sleep(wait)
    return x*x

TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14

class TestPool(greentest.TestCase):
    size = 1

    def setUp(self):
        self.pool = pool.Pool(self.size)

    def tearDown(self):
        self.pool.join()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100)),  map(sqr, range(100)))

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertAlmostEqual(get.elapsed, TIMEOUT1, 1)

    def test_async_callback(self):
        result = []
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=lambda x: result.append(x))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertAlmostEqual(get.elapsed, TIMEOUT1, 1)
        gevent.sleep(0) # let's the callback run
        assert result == [49], result

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(gevent.Timeout, get, timeout=TIMEOUT2)
        self.assertAlmostEqual(get.elapsed, TIMEOUT2, 1)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000))
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_terminate(self):
        result = self.pool.map_async(gevent.sleep, [0.1] * 1000)
        kill = TimingWrapper(self.pool.kill)
        kill(block=True)
        assert kill.elapsed < 0.5, kill.elapsed


class TestPool2(TestPool):
    size = 2

class TestPool3(TestPool):
    size = 3

class TestPool10(TestPool):
    size = 10

class TestPoolUnlimit(TestPool):
    size = None


class TestJoinSleep(greentest.GenericWaitTestCase):

    def wait(self, timeout):
        p = pool.Pool()
        p.spawn(gevent.sleep, 10)
        p.join(timeout=timeout)


class TestJoinEmpty(greentest.TestCase):
    switch_expected = False

    def test(self):
        p = pool.Pool()
        p.join()


class TestSpawn(greentest.TestCase):
    switch_expected = True

    def test(self):
        p = pool.Pool(1)
        self.assertEqual(len(p), 0)
        p.spawn(gevent.sleep, 0.1)
        self.assertEqual(len(p), 1)
        p.spawn(gevent.sleep, 0.1) # this spawn blocks until the old one finishes
        self.assertEqual(len(p), 1)
        gevent.sleep(0.19)
        self.assertEqual(len(p), 0)


if __name__=='__main__':
    greentest.main()