+"""Tests for coroutining."""
+from nose.plugins.skip import SkipTest
+from tests.coveragetest import CoverageTest
+# These libraries aren't always available, we'll skip tests if they aren't.
+ import eventlet # pylint: disable=import-error
+ import gevent # pylint: disable=import-error
+ """How many non-blank lines are in `s`?"""
+ return sum(1 for l in s.splitlines() if l.strip())
+ """Tests of the coroutine support in coverage.py."""
+ # The code common to all the concurrency models. Don't use any comments,
+ # we're counting non-blank lines to see they are all covered.
+ class Producer(threading.Thread):
+ class Consumer(threading.Thread):
+ # Import the things to use threads.
+ # Import the things to use eventlet.
+ import eventlet.green.threading as threading
+ import eventlet.queue as Queue
+ # Import the things to use gevent.
+ from gevent import monkey
+ import gevent.queue as Queue
+ def try_some_code(self, code, args):
+ """Run some coroutine testing code and see that it was all covered."""
+ raise SkipTest("Need to put this on a back burner for a while...")
+ self.make_file("try_it.py", code)
+ out = self.run_command("coverage run %s try_it.py" % args)
+ expected_out = "%d\n" % (sum(range(1000)))
+ self.assertEqual(out, expected_out)
+ # Read the coverage file and see that try_it.py has all its lines
+ data = coverage.CoverageData()
+ lines = line_count(code)
+ self.assertEqual(data.summary()['try_it.py'], lines)
+ def test_threads(self):
+ self.try_some_code(self.THREAD, "")
+ def test_eventlet(self):
+ raise SkipTest("No eventlet available")
+ self.try_some_code(self.EVENTLET, "--coroutine=eventlet")
+ raise SkipTest("No gevent available")
+ self.try_some_code(self.GEVENT, "--coroutine=gevent")