Source

cffi / testing / callback_in_thread.py

import time
from cffi import FFI

def _run_callback_in_thread():
    ffi = FFI()
    ffi.cdef("""
        typedef int (*mycallback_func_t)(int, int);
        int threaded_ballback_test(mycallback_func_t mycb);
    """)
    lib = ffi.verify("""
        #include <pthread.h>
        typedef int (*mycallback_func_t)(int, int);
        void *my_wait_function(void *ptr) {
            mycallback_func_t cbfunc = (mycallback_func_t)ptr;
            cbfunc(10, 10);
            cbfunc(12, 15);
            return NULL;
        }
        int threaded_ballback_test(mycallback_func_t mycb) {
            pthread_t thread;
            pthread_create(&thread, NULL, my_wait_function, (void*)mycb);
            return 0;
        }
    """, extra_compile_args=['-pthread'])
    seen = []
    @ffi.callback('int(*)(int,int)')
    def mycallback(x, y):
        seen.append((x, y))
        return 0
    lib.threaded_ballback_test(mycallback)
    count = 300
    while len(seen) != 2:
        time.sleep(0.01)
        count -= 1
        assert count > 0, "timeout"
    assert seen == [(10, 10), (12, 15)]

print('STARTING')
_run_callback_in_thread()
print('DONE')
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.