asyncio.ProcessPoolExecutor tracing not working correctly

Issue #481 closed
Alexander Mohr created an issue

First, I had to monkey-patch concurrent.futures.process._process_worker (the worker function used by concurrent.futures.ProcessPoolExecutor), because the worker processes don't call the atexit handlers:

def _concurrent_futures_process_worker(*args, **kwargs):
    """Run the original pool worker, then invoke the registered exit handlers.

    This is installed in place of ``concurrent.futures.process._process_worker``
    (via ``functools.partial``) because the pool's worker processes do not run
    ``atexit`` handlers on their own.

    Args:
        *args: Positional arguments forwarded unchanged to the original worker.
        **kwargs: Keyword arguments forwarded to the original worker.  Must
            contain ``_orig_call``, the original worker callable; it is removed
            before forwarding.

    Returns:
        Whatever the original worker returns.

    Raises:
        KeyError: If ``_orig_call`` was not supplied.
        Exception: Anything raised by the original worker is propagated after
            the exit handlers have run.
    """
    orig_call = kwargs.pop('_orig_call')
    try:
        return orig_call(*args, **kwargs)
    finally:
        # Run the handlers even when the worker raises; otherwise whatever they
        # flush (e.g. collected coverage data) is lost exactly when a task fails.
        for handler in _concurrent_futures_process_worker._atexit_handlers:
            handler()


# Zero-argument callables invoked, in registration order, when the worker ends.
_concurrent_futures_process_worker._atexit_handlers = []

def _init_coverage_monkey_patch():
    """Wrap the process-pool worker so registered exit handlers get called.

    Does nothing unless the ``COVERAGE_PROCESS_START`` environment variable is
    set and the ``coverage`` package can be imported.  Otherwise replaces
    ``concurrent.futures.process._process_worker`` with a ``functools.partial``
    of ``_concurrent_futures_process_worker`` bound to the original worker.
    Calling this more than once is safe: an already-installed wrapper is left
    alone.

    Returns:
        None.
    """
    if not os.environ.get("COVERAGE_PROCESS_START"):
        # No request for coverage, nothing to patch.
        return

    try:
        import coverage  # noqa: F401 -- only probing that coverage is installed
    except ImportError:
        return

    # Import the submodule explicitly: on newer Pythons ``import
    # concurrent.futures`` loads it lazily, so the attribute lookup below could
    # otherwise fail and the patch would silently never be applied.
    import concurrent.futures.process

    current = concurrent.futures.process._process_worker
    if getattr(current, 'func', None) is _concurrent_futures_process_worker:
        # Already patched; wrapping again would run every handler twice.
        return

    # process pool executors don't call atexit handlers :(
    concurrent.futures.process._process_worker = functools.partial(
        _concurrent_futures_process_worker, _orig_call=current)

def start_coverage():
    """Start coverage measurement in this process if it has been requested.

    Coverage is requested by setting the ``COVERAGE_PROCESS_START`` environment
    variable to the path of a coverage configuration file.  When it is set, a
    ``coverage.Coverage`` object is created from that file and started, and its
    ``_atexit`` method is registered with
    ``_concurrent_futures_process_worker._atexit_handlers`` so it is called
    when the patched pool worker finishes.

    This is deliberately best-effort: if ``coverage`` is not installed, or
    starting it fails, the function returns quietly so the code under test
    still runs.

    Returns:
        None.
    """
    cps = os.environ.get("COVERAGE_PROCESS_START")
    if not cps:
        # No request for coverage, nothing to do.
        return

    try:
        import coverage

        cov = coverage.Coverage(config_file=cps, auto_data=True)
        cov.start()
        # Private coverage attributes; judging by their names they silence the
        # "no data" / "unimported source" warnings -- confirm against the
        # installed coverage version.
        cov._warn_no_data = False
        cov._warn_unimported_source = False

        _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)
    except Exception:
        # Best-effort by design (see docstring).  Unlike a bare ``except:``,
        # KeyboardInterrupt and SystemExit still propagate.
        pass

My first note is that this monkey patch would be a lot simpler if coverage.process_startup() returned the Coverage instance.

With the above patching, and running with "-p" for parallel, I do get coverage data, but the results are bizarre. For example, within a function the report will say that the first line executed but the next line did not; the loop.run_until_complete line is reported as not executed, yet most of the lines in the coroutine it runs are.

Comments (8)

  1. Ned Batchelder repo owner

    Can you provide a reproducible description of the problem before we jump into the solution?

  2. Alexander Mohr reporter

    ok I have a reproducible testcase!

    cov_tester.py:

    import asyncio
    import unittest
    import concurrent.futures
    import cov_helper
    
    
    def async_test(f):
        """Decorator that runs the test function *f* to completion on the event loop.

        *f* is wrapped with ``asyncio.coroutine`` so that a generator-based
        test (one using ``yield from``) can be driven by the loop.  The
        returned wrapper is synchronous and returns ``None``; the coroutine's
        result is discarded.
        """
        def wrapper(*args, **kwargs):
            coro = asyncio.coroutine(f)
            future = coro(*args, **kwargs)
            # Uses whatever loop is current for the calling thread.
            loop = asyncio.get_event_loop()
            # Blocks until the test coroutine has finished.
            loop.run_until_complete(future)
        return wrapper
    
    
    def get_executor_props(method, **kwargs):
        """Start coverage and return an event loop stored on *method*.

        Creates a new event loop, makes it the current one, caches it as
        ``method._loop`` and returns it.  ``**kwargs`` is accepted but unused.
        """
        # NOTE(review): nothing in this testcase ever sets `_logger` on the
        # method, so this condition looks always true and the `else` branch
        # below unreachable; a new loop is created on every call.
        if getattr(method, '_logger', None) is None:
            cov_helper.init_coverage()
    
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            method._loop = loop
    
            return loop
        else:
            return method._loop
    
    
    def executor_task():
        """Task submitted to the ProcessPoolExecutor by ``test_cov``.

        Obtains an event loop (which also starts coverage via
        ``get_executor_props``), runs the small ``doit`` coroutine on it and
        returns ``None``.
        """
        # The function object itself is passed in so the loop can be cached on it.
        loop = get_executor_props(executor_task)
    
        async def doit():
            # Minimal coroutine body: one line in `try`, one in `finally`.
            try:
                print("hello")
            finally:
                print("Done")
    
        loop.run_until_complete(doit())
        return None
    
    
    class TransformerUnitTest(unittest.TestCase):
        """Single-test case that runs ``executor_task`` in a one-worker process pool."""

        def __init__(self, *args, **kwargs):
            # Plain delegation to the base class; adds no state of its own.
            unittest.TestCase.__init__(self, *args, **kwargs)
    
        # Generator-based coroutine; `async_test` drives it on the event loop.
        @async_test
        def test_cov(self):
            loop = asyncio.get_event_loop()
    
            # Patch the pool worker before the executor is created.
            cov_helper.init_coverage_monkey_patch()
    
            executor = concurrent.futures.ProcessPoolExecutor(1)
    
            # Run the task in the pool and wait for it to finish.
            yield from loop.run_in_executor(executor, executor_task)
            executor.shutdown()
    
    
    # Run the test case when this file is executed as a script.
    if __name__ == '__main__':
        unittest.main()
    

    cov_helper.py

    import os
    import concurrent.futures
    import functools
    
    
    def _concurrent_futures_process_worker(*args, **kwargs):
        """Call the original pool worker, then run the registered exit handlers.

        The original worker must be supplied as the ``_orig_call`` keyword
        argument; it is removed from ``kwargs`` and called with the remaining
        arguments.  Returns the original worker's result.
        """
        try:
            orig_call = kwargs.pop('_orig_call')
            result = orig_call(*args, **kwargs)
        finally:
            # Handlers run whether or not the worker raised.
            for aeh in _concurrent_futures_process_worker._atexit_handlers:
                aeh()
    
        return result
    
    # Callables invoked, in order, when the wrapped worker finishes.
    _concurrent_futures_process_worker._atexit_handlers = []
    
    
    def init_coverage_monkey_patch():
        """Replace the process-pool worker with the handler-running wrapper.

        Only patches when ``coverage`` can be imported and the
        ``COVERAGE_PROCESS_START`` environment variable is set.  The original
        worker is bound into the wrapper as ``_orig_call``.
        """
        # NOTE(review): the bare `except` below hides every failure, including
        # an AttributeError if `concurrent.futures.process` has not been
        # imported yet; calling this twice also wraps the wrapper again.
        try:
            import coverage
            cps = os.environ.get("COVERAGE_PROCESS_START")
            if not cps:
                return
    
            # process pool executors don't call atexit handlers :(
            concurrent.futures.process._process_worker = functools.partial(_concurrent_futures_process_worker, _orig_call=concurrent.futures.process._process_worker)
        except:
            pass
    
    
    def init_coverage():
        """Start coverage in this process once, if it has been requested.

        The started ``Coverage`` object is cached as ``init_coverage._cov`` so
        later calls return immediately.  If ``COVERAGE_PROCESS_START`` is not
        set, nothing is cached and the check is repeated on the next call.
        """
        # Already initialised (or a previous attempt failed) -- nothing to do.
        if getattr(init_coverage, '_cov', None) is not None:
            return
    
        try:
            # unfortunately the ProcessPoolExecutor uses os._exit so it won't call the atexit handlers and thus use the
            # coverage.process_startup trick
            import coverage
    
            cps = os.environ.get("COVERAGE_PROCESS_START")
            if not cps:
                # No request for coverage, nothing to do.
                return
    
            cov = coverage.Coverage(config_file=cps, auto_data=True)
            cov.start()
            # Private coverage attributes; presumably these silence the
            # corresponding warnings -- confirm against the coverage version.
            cov._warn_no_data = False
            cov._warn_unimported_source = False
    
            # Have the patched pool worker call cov._atexit when it finishes.
            _concurrent_futures_process_worker._atexit_handlers.append(cov._atexit)
    
            init_coverage._cov = cov
        except:
            # not available
            # Any failure is recorded as `True` so that it is not retried.
            init_coverage._cov = True
    

    .coveragerc

    [run]
    parallel = True
    

    test run:

    export COVERAGE_PROCESS_START=.coveragerc
    python3.5 /usr/local/bin/coverage run -p cov_tester.py
    coverage combine
    coverage html
    open htmlcov/index.html
    

    Now look at the results. Note that nothing in get_executor_props is marked as run. Note also that in executor_task, loop.run_until_complete is not marked as run, and yet the function that it runs is marked as run :) Also, "async def doit()" is not marked as run (I would expect everything after init_coverage to be).

    It seems like it's only marking lines as hit inside the async task, but not outside it. Perhaps there's some other initialization that's missing?

  3. Loic Dachary

    @ned before starting with this issue I'd like to make sure I'm not missing a draft or notes you made when looking into this a few months ago. Just to avoid duplicating your effort ;-)

  4. Log in to comment