asyncio.ProcessPoolExecutor tracing not working correctly

Issue #481 closed
Alexander Mohr
created an issue

First, I had to monkey-patch concurrent.futures.process._process_worker (the worker function used by concurrent.futures.ProcessPoolExecutor) because it doesn't call the atexit handlers:

def _concurrent_futures_process_worker(*args, **kwargs):
    """Call the original process-pool worker, then run the exit handlers.

    Stand-in for ``concurrent.futures.process._process_worker``.  The real
    worker must be supplied as the ``_orig_call`` keyword argument (it is
    removed from ``kwargs`` before the call); every other argument is
    forwarded unchanged.

    Returns whatever the original worker returns.  The callables stored in
    ``_concurrent_futures_process_worker._atexit_handlers`` are invoked
    afterwards, also when the worker raises, so they still get a chance to
    run on error.

    Raises:
        KeyError: if ``_orig_call`` was not passed.
    """
    orig_call = kwargs.pop('_orig_call')
    try:
        return orig_call(*args, **kwargs)
    finally:
        # The pool worker never reaches the interpreter's atexit machinery,
        # so the registered handlers are called by hand here.
        for aeh in _concurrent_futures_process_worker._atexit_handlers:
            aeh()


# Zero-argument callables invoked after the wrapped worker finishes.
_concurrent_futures_process_worker._atexit_handlers = []

def _init_coverage_monkey_patch():
    """Wrap the process-pool worker so registered exit handlers get run.

    Does nothing unless the ``coverage`` package is importable and the
    ``COVERAGE_PROCESS_START`` environment variable is set.  Otherwise
    ``concurrent.futures.process._process_worker`` is replaced with a
    ``functools.partial`` of ``_concurrent_futures_process_worker`` that
    receives the original worker as ``_orig_call``.

    Calling this more than once is harmless: an already-patched worker is
    left alone, so handlers are not run once per call.

    Best effort: ordinary errors are swallowed and leave the worker untouched.
    """
    try:
        import coverage  # noqa: F401 -- only checks that coverage is installed
        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            return

        # Import the submodule explicitly: it may be loaded lazily, in which
        # case the attribute does not exist on ``concurrent.futures`` yet.
        import concurrent.futures.process

        current = concurrent.futures.process._process_worker
        if getattr(current, 'func', None) is _concurrent_futures_process_worker:
            # Already patched; wrapping again would run every handler twice.
            return

        # process pool executors don't call atexit handlers :(
        concurrent.futures.process._process_worker = functools.partial(
            _concurrent_futures_process_worker, _orig_call=current)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        pass

def start_coverage():
    """Start coverage measurement in this process when it has been requested.

    Coverage is requested by setting the ``COVERAGE_PROCESS_START``
    environment variable; its value is passed to ``coverage.Coverage`` as
    ``config_file``.  When the variable is unset or empty the function
    returns without doing anything.

    On success the instance's ``_atexit`` method is appended to
    ``_concurrent_futures_process_worker._atexit_handlers`` so the patched
    worker can call it when it finishes.

    Best effort: ordinary errors (including ``coverage`` not being
    installed) are swallowed.
    """
    try:
        import coverage

        cps = os.environ.get("COVERAGE_PROCESS_START")
        if not cps:
            # No request for coverage, nothing to do.
            return

        cov = coverage.Coverage(config_file=cps, auto_data=True)
        cov.start()
        # Private coverage attributes; presumably these silence the
        # "no data" / "unimported source" warnings -- verify against the
        # installed coverage version.
        cov._warn_no_data = False
        cov._warn_unimported_source = False

        _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        pass

My first note is that this monkey patch would be a lot simpler if coverage.process_startup() returned the Coverage instance.

With the above patching, and running with "-p" for parallel, I actually get coverage data; however, it's all bizarre. For example, in a function it will say the first line executed but the next line did not, and that the loop.run_until_complete line did not execute even though most of the lines in the coroutine did.

Comments (8)

  1. Alexander Mohr reporter

    ok I have a reproducible testcase!

    cov_tester.py:

    import asyncio
    import unittest
    import concurrent.futures
    import cov_helper
    
    
    def async_test(f):
        """Decorator that runs test function *f* to completion on the event loop.

        *f* is wrapped with ``asyncio.coroutine``, called with the wrapper's
        arguments, and the resulting object is handed to
        ``loop.run_until_complete`` on the loop returned by
        ``asyncio.get_event_loop()``.  The wrapper itself returns None; the
        coroutine's result is discarded.
        """
        def wrapper(*args, **kwargs):
            coro = asyncio.coroutine(f)
            future = coro(*args, **kwargs)
            loop = asyncio.get_event_loop()
            loop.run_until_complete(future)
        return wrapper
    
    
    def get_executor_props(method, **kwargs):
        """Start coverage and return an event loop stored on *method*.

        When ``method._logger`` is missing or None, this calls
        ``cov_helper.init_coverage()``, creates a new event loop, installs it
        as the current loop, stores it on ``method._loop`` and returns it.
        Otherwise it returns the previously stored ``method._loop``.

        ``kwargs`` is accepted but not used.
        """
        # NOTE(review): nothing visible here ever assigns ``method._logger``,
        # so this branch appears to run on every call -- confirm whether
        # ``_loop`` was the intended attribute to test.
        if getattr(method, '_logger', None) is None:
            cov_helper.init_coverage()
    
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            method._loop = loop
    
            return loop
        else:
            return method._loop
    
    
    def executor_task():
        """Task handed to the process pool: run a trivial coroutine on a loop.

        Obtains the loop from ``get_executor_props(executor_task)``, runs the
        nested ``doit`` coroutine to completion on it and returns None.
        """
        loop = get_executor_props(executor_task)
    
        # Prints "hello", then "Done" from the ``finally`` clause.
        async def doit():
            try:
                print("hello")
            finally:
                print("Done")
    
        loop.run_until_complete(doit())
        return None
    
    
    class TransformerUnitTest(unittest.TestCase):
        """Test case that runs ``executor_task`` in a one-worker process pool."""

        def __init__(self, *args, **kwargs):
            # Pass-through constructor: only forwards to TestCase.__init__.
            unittest.TestCase.__init__(self, *args, **kwargs)
    
        @async_test
        def test_cov(self):
            """Run ``executor_task`` in a ProcessPoolExecutor via the event loop.

            Written as a generator (``yield from``); the ``async_test``
            decorator drives it to completion.
            """
            loop = asyncio.get_event_loop()
    
            # Install the worker patch before the executor is created.
            cov_helper.init_coverage_monkey_patch()
    
            executor = concurrent.futures.ProcessPoolExecutor(1)
    
            yield from loop.run_in_executor(executor, executor_task)
            executor.shutdown()
    
    
    # Run the test case when this file is executed as a script.
    if __name__ == '__main__':
        unittest.main()
    

    cov_helper.py

    import os
    import concurrent.futures
    import functools
    
    
    def _concurrent_futures_process_worker(*args, **kwargs):
        """Delegate to the original pool worker and always fire the exit hooks.

        The real worker is taken from the ``_orig_call`` keyword argument
        (removed from ``kwargs`` before the call); all remaining arguments
        are forwarded as given and the worker's return value is passed back.

        Whether the call succeeds or raises, every callable in
        ``_concurrent_futures_process_worker._atexit_handlers`` is invoked
        before control leaves this function.
        """
        try:
            wrapped = kwargs.pop('_orig_call')
            return wrapped(*args, **kwargs)
        finally:
            for handler in _concurrent_futures_process_worker._atexit_handlers:
                handler()
    
    
    # Zero-argument callables run after each wrapped worker call.
    _concurrent_futures_process_worker._atexit_handlers = []
    
    
    def init_coverage_monkey_patch():
        """Wrap the process-pool worker so registered exit handlers get run.

        Does nothing unless the ``coverage`` package is importable and the
        ``COVERAGE_PROCESS_START`` environment variable is set.  Otherwise
        ``concurrent.futures.process._process_worker`` is replaced with a
        ``functools.partial`` of ``_concurrent_futures_process_worker`` that
        receives the original worker as ``_orig_call``.

        Calling this more than once is harmless: an already-patched worker
        is left alone, so handlers are not run once per call.

        Best effort: ordinary errors are swallowed and leave the worker
        untouched.
        """
        try:
            import coverage  # noqa: F401 -- only checks coverage is installed
            cps = os.environ.get("COVERAGE_PROCESS_START")
            if not cps:
                return
    
            # Import the submodule explicitly: it may be loaded lazily, in
            # which case the attribute does not exist on
            # ``concurrent.futures`` yet.
            import concurrent.futures.process
    
            current = concurrent.futures.process._process_worker
            if getattr(current, 'func', None) is _concurrent_futures_process_worker:
                # Already patched; wrapping again would run handlers twice.
                return
    
            # process pool executors don't call atexit handlers :(
            concurrent.futures.process._process_worker = functools.partial(
                _concurrent_futures_process_worker, _orig_call=current)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still propagate.
            pass
    
    
    def init_coverage():
        """Start coverage once in this process and register its exit hook.

        The outcome is remembered on the function itself as
        ``init_coverage._cov``: the ``Coverage`` instance on success, or
        ``True`` when setup failed (for example ``coverage`` is not
        installed).  Once that attribute is set, later calls return
        immediately.

        Coverage is only started when the ``COVERAGE_PROCESS_START``
        environment variable is set; its value is passed to
        ``coverage.Coverage`` as ``config_file``.  When it is unset the
        function returns without recording anything.

        On success the instance's ``_atexit`` method is appended to
        ``_concurrent_futures_process_worker._atexit_handlers`` so the
        patched worker can call it when it finishes.
        """
        if getattr(init_coverage, '_cov', None) is not None:
            return
    
        try:
            # Unfortunately the ProcessPoolExecutor uses os._exit, so it
            # won't call the atexit handlers and the usual
            # coverage.process_startup trick cannot be used.
            import coverage
    
            cps = os.environ.get("COVERAGE_PROCESS_START")
            if not cps:
                # No request for coverage, nothing to do.
                return
    
            cov = coverage.Coverage(config_file=cps, auto_data=True)
            cov.start()
            # Private coverage attributes; presumably these silence the
            # "no data" / "unimported source" warnings -- verify against the
            # installed coverage version.
            cov._warn_no_data = False
            cov._warn_unimported_source = False
    
            _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)
    
            init_coverage._cov = cov
        except Exception:
            # Not available.  ``Exception`` rather than a bare ``except`` so
            # that KeyboardInterrupt and SystemExit still propagate.
            init_coverage._cov = True
    

    .coveragerc

    [run]
    parallel = True
    

    test run:

    export COVERAGE_PROCESS_START=.coveragerc
    python3.5 /usr/local/bin/coverage run -p cov_tester.py
    coverage combine
    coverage html
    open htmlcov/index.html
    

    Now look at the results: note how nothing in get_executor_props is marked as run. Further note that in executor_task, loop.run_until_complete is not marked as run, and yet the function that it runs is marked as run :) Also, "async def doit()" is not marked as run (I would expect everything after init_coverage to be).

    It seems like it's only marking lines as hit in the async task, but not outside it. Perhaps there's some other initialization that's missing?

  2. Log in to comment