Commits

Trent Nelson committed 9802727 Draft

Implement wait support.

Comments (0)

Files changed (4)

Lib/async/__init__.py

 def submit_work(func, args=None, kwds=None, callback=None, errback=None):
     _async.submit_work(func, args, kwds, callback, errback)
 
-def submit_wait(obj, func, args=None, kwds=None, callback=None, errback=None):
-    raise NotImplementedError
-    _async.submit_wait(obj, func, args, kwds, callback, errback)
+def submit_wait(obj, func=None, args=None, kwds=None,
+                callback=None, errback=None, timeout=None):
+    _async.submit_wait(obj, timeout, func, args, kwds, callback, errback)
 
 def wait_any(waits):
     raise NotImplementedError

Lib/async/test/test_primitives.py

         async.run()
 
 class TestAsyncProtection(unittest.TestCase):
-    def _test_basic(self):
+    def test_basic(self):
         d = {}
-        o = object()
-        async.protect(o)
+        o = async.protect(object())
+        r = async.protect(object())
+        w = async.protect(object())
 
         @async.call_from_main_thread_and_wait
         def _timestamp(name):
 
         def reader(name):
             async.read_lock(o)
-            async.signal(o)         # start writer callback
-            async.wait(o)           # wait for writer callback
+            async.signal(w)         # start writer callback
+            async.wait(r)           # wait for writer callback
             _timestamp(name)
             async.read_unlock(o)
 
         def writer(name):
-            async.signal(o)         # tell the reader we've entered
+            async.signal(r)         # tell the reader we've entered
             async.write_lock(o)     # will be blocked until reader unlocks
             _timestamp(name)
             async.write_unlock(o)
-            async.signal(o)         # tell the main thread we're done
 
-        async.submit_wait(o, writer, 'w')
-        async.submit_work(reader, 'r')
+        async.submit_wait(r, reader, 'r')
+        async.submit_wait(w, writer, 'w')
+        async.signal(r)
         async.run()
         self.assertGreater(d['w'], d['r'])
 

Python/pyparallel.c

 static PyObject *PyExc_ProtectionError;
 static PyObject *PyExc_NoWaitersError;
 static PyObject *PyExc_WaitError;
+static PyObject *PyExc_WaitTimeoutError;
 
 __inline
 PyThreadState *
     return active;
 }
 
+__inline
+char
+_protected(PyObject *obj)
+{
+    return (obj->px_flags & Py_PXFLAGS_RWLOCK);
+}
+
+PyObject *
+_async_protected(PyObject *self, PyObject *obj)
+{
+    Py_INCREF(obj);
+    Py_RETURN_BOOL(_protected(obj));
+}
+
+__inline
+PyObject *
+_protect(PyObject *obj)
+{
+    if (!_protected(obj)) {
+        InitializeSRWLock((PSRWLOCK)&(obj->srw_lock));
+        obj->px_flags |= Py_PXFLAGS_RWLOCK;
+    }
+    return obj;
+}
+
+PyObject *
+_async_protect(PyObject *self, PyObject *obj)
+{
+    Py_INCREF(obj);
+    if (Py_ISPX(obj)) {
+        PyErr_SetNone(PyExc_ProtectionError);
+        return NULL;
+    }
+    return _protect(obj);
+}
+
+__inline
+PyObject *
+_unprotect(PyObject *obj)
+{
+    if (_protected(obj)) {
+        obj->px_flags &= ~Py_PXFLAGS_RWLOCK;
+        obj->srw_lock = NULL;
+    }
+    return obj;
+}
+
+PyObject *
+_async_unprotect(PyObject *self, PyObject *obj)
+{
+    Py_INCREF(obj);
+    if (Py_ISPX(obj)) {
+        PyErr_SetNone(PyExc_ProtectionError);
+        return NULL;
+    }
+    return _unprotect(obj);
+}
+
+__inline
+PyObject *
+_read_lock(PyObject *obj)
+{
+    AcquireSRWLockShared((PSRWLOCK)&(obj->srw_lock));
+    return obj;
+}
+
+PyObject *
+_async_read_lock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    return _read_lock(obj);
+}
+
+__inline
+PyObject *
+_read_unlock(PyObject *obj)
+{
+    ReleaseSRWLockShared((PSRWLOCK)&(obj->srw_lock));
+    return obj;
+}
+
+PyObject *
+_async_read_unlock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    return _read_unlock(obj);
+}
+
+__inline
+char
+_try_read_lock(PyObject *obj)
+{
+    return TryAcquireSRWLockShared((PSRWLOCK)&(obj->srw_lock));
+}
+
+PyObject *
+_async_try_read_lock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    Py_RETURN_BOOL(_try_read_lock(obj));
+}
+
+__inline
+PyObject *
+_write_lock(PyObject *obj)
+{
+    AcquireSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
+    return obj;
+}
+
+PyObject *
+_async_write_lock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    return _write_lock(obj);
+}
+
+__inline
+PyObject *
+_write_unlock(PyObject *obj)
+{
+    ReleaseSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
+    return obj;
+}
+
+PyObject *
+_async_write_unlock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    return _write_unlock(obj);
+}
+
+__inline
+char
+_try_write_lock(PyObject *obj)
+{
+    return TryAcquireSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
+}
+
+PyObject *
+_async_try_write_lock(PyObject *self, PyObject *obj)
+{
+    Px_PROTECTION_GUARD(obj);
+    Py_INCREF(obj);
+    Py_RETURN_BOOL(_try_write_lock(obj));
+}
+
+__inline
+char
+_PyEvent_TryCreate(PyObject *o)
+{
+    char success = 1;
+    assert(Py_HAS_RWLOCK(o));
+    _write_lock(o);
+    if (!Py_HAS_EVENT(o)) {
+        success = 0;
+        if (Py_ISPX(o))
+            PyErr_SetNone(PyExc_WaitError);
+        else if (!PyEvent_CREATE(o))
+            PyErr_SetFromWindowsErr(0);
+        else {
+            Py_PXFLAGS(o) |= Py_PXFLAGS_EVENT;
+            success = 1;
+        }
+    }
+    _write_unlock(o);
+    return success;
+}
+
+PyObject *
+_async_wait(PyObject *self, PyObject *o)
+{
+    DWORD result;
+    Px_PROTECTION_GUARD(o);
+    Py_INCREF(o);
+    if (!_PyEvent_TryCreate(o))
+        return NULL;
+
+    result = WaitForSingleObject((HANDLE)o->event, INFINITE);
+
+    if (result == WAIT_OBJECT_0)
+        Py_RETURN_NONE;
+
+    else if (result == WAIT_ABANDONED)
+        PyErr_SetString(PyExc_SystemError, "wait abandoned");
+
+    else if (result == WAIT_TIMEOUT)
+        PyErr_SetString(PyExc_SystemError, "infinite wait timed out?");
+
+    else if (result == WAIT_FAILED)
+        PyErr_SetFromWindowsErr(0);
+
+    else
+        PyErr_SetString(PyExc_SystemError, "unexpected result from wait");
+
+    return NULL;
+}
+
+PyObject *
+_async_signal(PyObject *self, PyObject *o)
+{
+    PyObject *result = NULL;
+    Px_PROTECTION_GUARD(o);
+    Py_INCREF(o);
+    _write_lock(o);
+
+    if (!Py_HAS_EVENT(o))
+        PyErr_SetNone(PyExc_NoWaitersError);
+    else if (!PyEvent_SIGNAL(o))
+        PyErr_SetFromWindowsErr(0);
+    else
+        result = Py_None;
+
+    _write_unlock(o);
+    Py_XINCREF(result);
+    return result;
+}
+
+
 #ifdef Py_DEBUG
 int
 _PxPages_LookupHeapPage(PxPages *pages, Px_UINTPTR *value, void *p)
              * as all that stuff is initialized in the next section. */
             bytes_to_copy = _PyObject_VAR_SIZE(tp, Py_SIZE(p));
 
-            /* (We may need to change this to <= down the track.) */
             assert(bytes_to_copy < object_size);
             assert(Py_SIZE(p) < nitems);
 
 
 void
 NTAPI
-_PyParallel_SimpleWorkCallback(PTP_CALLBACK_INSTANCE instance, void *context)
+_PyParallel_WorkCallback(PTP_CALLBACK_INSTANCE instance, void *context)
 {
     Context  *c = (Context *)context;
     Stats    *s;
     PyObject *args;
     PyThreadState *pstate;
 
+    volatile long       *pending;
+    volatile long       *inflight;
+    volatile long long  *done;
+
     assert(c->tstate);
     assert(c->heap_handle);
 
     px = (PxState *)c->tstate->px;
-    InterlockedDecrement(&(px->pending));
-    InterlockedIncrement(&(px->inflight));
+
+    if (c->tp_wait) {
+        pending = &(px->waits_pending);
+        inflight = &(px->waits_inflight);
+        done = &(px->waits_done);
+    } else if (c->tp_io) {
+        pending = &(px->io_pending);
+        inflight = &(px->io_inflight);
+        done = &(px->io_done);
+    } else if (c->tp_timer) {
+        pending = &(px->timers_pending);
+        inflight = &(px->timers_inflight);
+        done = &(px->timers_done);
+    } else {
+        pending = &(px->pending);
+        inflight = &(px->inflight);
+        done = &(px->done);
+    }
+
+    InterlockedDecrement(pending);
+    InterlockedIncrement(inflight);
 
     ctx = c;
 
     s = &(c->stats);
     s->startup_size = s->allocated;
 
+    if (c->tp_wait) {
+        assert(
+            c->wait_result == WAIT_OBJECT_0 ||
+            c->wait_result == WAIT_TIMEOUT  ||
+            c->wait_result == WAIT_ABANDONED_0
+        );
+        if (c->wait_result == WAIT_OBJECT_0)
+            goto start;
+        else if (c->wait_result == WAIT_TIMEOUT) {
+            PyErr_SetNone(PyExc_WaitTimeoutError);
+            goto errback;
+        } else {
+            PyErr_SetFromWindowsErr(0);
+            goto errback;
+        }
+    } else if (c->tp_io) {
+        assert(0);
+    } else if (c->tp_timer) {
+        assert(0);
+    }
+
+start:
     s->start = _Py_rdtsc();
     c->result = PyObject_Call(c->func, c->args, c->kwds);
     s->end = _Py_rdtsc();
         c->callback_completed->from = c;
         PxList_TimestampItem(c->callback_completed);
         InterlockedExchange(&(c->done), 1);
-        InterlockedIncrement64(&(px->done));
-        InterlockedDecrement(&(px->inflight));
+        InterlockedIncrement64(done);
+        InterlockedDecrement(inflight);
         PxList_Push(px->completed_callbacks, c->callback_completed);
         SetEvent(px->wakeup);
         return;
             c->errback_completed->from = c;
             PxList_TimestampItem(c->errback_completed);
             InterlockedExchange(&(c->done), 1);
-            InterlockedIncrement64(&(px->done));
-            InterlockedDecrement(&(px->inflight));
+            InterlockedIncrement64(done);
+            InterlockedDecrement(inflight);
             PxList_Push(px->completed_errbacks, c->errback_completed);
             SetEvent(px->wakeup);
             return;
     c->error->p2 = pstate->curexc_value;
     c->error->p3 = pstate->curexc_traceback;
     InterlockedExchange(&(c->done), 1);
-    InterlockedIncrement64(&(px->done));
-    InterlockedDecrement(&(px->inflight));
+    InterlockedIncrement64(done);
+    InterlockedDecrement(inflight);
     PxList_Push(px->errors, c->error);
     SetEvent(px->wakeup);
     return;
 }
 
 void
+NTAPI
+_PyParallel_WaitCallback(
+    PTP_CALLBACK_INSTANCE instance,
+    void *context,
+    PTP_WAIT wait,
+    TP_WAIT_RESULT wait_result)
+{
+    Context  *c = (Context *)context;
+
+    assert(wait == c->tp_wait);
+    c->wait_result = wait_result;
+
+    _PyParallel_WorkCallback(instance, context);
+}
+
+void
 _PyParallel_Init(void)
 {
     _Py_sfence();
 
 __inline
 void
+incref_waitobj_args(Context *c)
+{
+    Py_INCREF(c->waitobj);
+    Py_INCREF(c->waitobj_timeout);
+    incref_args(c);
+}
+
+
+__inline
+void
 decref_args(Context *c)
 {
     Py_DECREF(c->func);
     Py_XDECREF(c->errback);
 }
 
+__inline
+void
+decref_waitobj_args(Context *c)
+{
+    Py_DECREF(c->waitobj);
+    Py_DECREF(c->waitobj_timeout);
+}
+
 int
 _PxState_PurgeContexts(PxState *px)
 {
         /* xxx todo: check refcnts of func/args/kwds etc? */
         decref_args(c);
 
+        if (c->tp_wait)
+            decref_waitobj_args(c);
+
         h = c->h;
         s = &(c->stats);
         _PyHeap_FastFree(h, s, c->error);
 
     px = (PxState *)tstate->px;
 
-    if (px->submitted == 0 && px->persistent == 0) {
+    if (px->submitted == 0 &&
+        px->waits_submitted == 0 &&
+        px->persistent == 0)
+    {
         PyErr_SetNone(PyExc_AsyncRunCalledWithoutEventsError);
         return NULL;
     }
 
 __inline
 int
+extract_waitobj_args(PyObject *args, Context *c)
+{
+    if (!PyArg_UnpackTuple(
+            args, "", 2, 7,
+            &(c->waitobj),
+            &(c->waitobj_timeout),
+            &(c->func),
+            &(c->args),
+            &(c->kwds),
+            &(c->callback),
+            &(c->errback)))
+        return 0;
+
+    if (c->waitobj_timeout != Py_None) {
+        PyErr_SetString(PyExc_ValueError, "non-None value for timeout");
+        return 0;
+    }
+
+    if (c->callback == Py_None) {
+        Py_DECREF(c->callback);
+        c->callback = NULL;
+    }
+
+    if (c->errback == Py_None) {
+        Py_DECREF(c->errback);
+        c->errback = NULL;
+    }
+
+    if (c->args == Py_None) {
+        Py_DECREF(c->args);
+        c->args = Py_BuildValue("()");
+    }
+
+    if (c->kwds == Py_None) {
+        Py_DECREF(c->kwds);
+        c->kwds = NULL;
+    }
+
+    if (c->args && !PyTuple_Check(c->args)) {
+        PyObject *tmp = c->args;
+        c->args = Py_BuildValue("(O)", c->args);
+        Py_DECREF(tmp);
+    }
+
+    return 1;
+}
+
+__inline
+int
 submit_work(Context *c)
 {
     int retval;
     PTP_SIMPLE_CALLBACK cb;
 
-    cb = _PyParallel_SimpleWorkCallback;
+    cb = _PyParallel_WorkCallback;
     retval = TrySubmitThreadpoolCallback(cb, c, NULL);
     if (!retval)
         PyErr_SetFromWindowsErr(0);
 _async_submit_wait(PyObject *self, PyObject *args)
 {
     PyObject *result = NULL;
-
+    Context  *c;
+    PxState  *px;
+    PTP_WAIT_CALLBACK cb;
+
+    c = new_context();
+    if (!c)
+        return NULL;
+
+    px = c->px;
+
+    if (!extract_waitobj_args(args, c))
+        goto free_context;
+
+    cb = _PyParallel_WaitCallback;
+    c->tp_wait = CreateThreadpoolWait(cb, c, NULL);
+    if (!c->tp_wait) {
+        PyErr_SetFromWindowsErr(0);
+        goto free_context;
+    }
+
+    if (!_PyEvent_TryCreate(c->waitobj))
+        goto free_context;
+
+    SetThreadpoolWait(c->tp_wait, Py_EVENT(c->waitobj), NULL);
+
+    InterlockedIncrement64(&(px->waits_submitted));
+    InterlockedIncrement(&(px->waits_pending));
+    InterlockedIncrement(&(px->active));
+    c->stats.submitted = _Py_rdtsc();
+
+    incref_waitobj_args(c);
+
+    c->px->contexts_created++;
+    c->px->contexts_active++;
+    result = (Py_INCREF(Py_None), Py_None);
+    goto done;
+
+free_context:
+    HeapDestroy(c->heap_handle);
+    free(c);
+
+done:
+    if (!result)
+        assert(PyErr_Occurred());
     return result;
 }
 
     return _call_from_main_thread(self, args, 1);
 }
 
-/* protection */
-
-__inline
-char
-_protected(PyObject *obj)
-{
-    return (obj->px_flags & Py_PXFLAGS_RWLOCK);
-}
-
-PyObject *
-_async_protected(PyObject *self, PyObject *obj)
-{
-    Py_INCREF(obj);
-    Py_RETURN_BOOL(_protected(obj));
-}
-
-__inline
-PyObject *
-_protect(PyObject *obj)
-{
-    if (!_protected(obj)) {
-        InitializeSRWLock((PSRWLOCK)&(obj->srw_lock));
-        obj->px_flags |= Py_PXFLAGS_RWLOCK;
-    }
-    return obj;
-}
-
-PyObject *
-_async_protect(PyObject *self, PyObject *obj)
-{
-    Py_INCREF(obj);
-    if (Py_ISPX(obj)) {
-        PyErr_SetNone(PyExc_ProtectionError);
-        return NULL;
-    }
-    return _protect(obj);
-}
-
-__inline
-PyObject *
-_unprotect(PyObject *obj)
-{
-    if (_protected(obj)) {
-        obj->px_flags &= ~Py_PXFLAGS_RWLOCK;
-        obj->srw_lock = NULL;
-    }
-    return obj;
-}
-
-PyObject *
-_async_unprotect(PyObject *self, PyObject *obj)
-{
-    Py_INCREF(obj);
-    if (Py_ISPX(obj)) {
-        PyErr_SetNone(PyExc_ProtectionError);
-        return NULL;
-    }
-    return _unprotect(obj);
-}
-
-__inline
-PyObject *
-_read_lock(PyObject *obj)
-{
-    AcquireSRWLockShared((PSRWLOCK)&(obj->srw_lock));
-    return obj;
-}
-
-PyObject *
-_async_read_lock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    return _read_lock(obj);
-}
-
-__inline
-PyObject *
-_read_unlock(PyObject *obj)
-{
-    ReleaseSRWLockShared((PSRWLOCK)&(obj->srw_lock));
-    return obj;
-}
-
-PyObject *
-_async_read_unlock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    return _read_unlock(obj);
-}
-
-__inline
-char
-_try_read_lock(PyObject *obj)
-{
-    return TryAcquireSRWLockShared((PSRWLOCK)&(obj->srw_lock));
-}
-
-PyObject *
-_async_try_read_lock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    Py_RETURN_BOOL(_try_read_lock(obj));
-}
-
-__inline
-PyObject *
-_write_lock(PyObject *obj)
-{
-    AcquireSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
-    return obj;
-}
-
-PyObject *
-_async_write_lock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    return _write_lock(obj);
-}
-
-__inline
-PyObject *
-_write_unlock(PyObject *obj)
-{
-    ReleaseSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
-    return obj;
-}
-
-PyObject *
-_async_write_unlock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    return _write_unlock(obj);
-}
-
-__inline
-char
-_try_write_lock(PyObject *obj)
-{
-    return TryAcquireSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
-}
-
-PyObject *
-_async_try_write_lock(PyObject *self, PyObject *obj)
-{
-    Px_PROTECTION_GUARD(obj);
-    Py_INCREF(obj);
-    Py_RETURN_BOOL(_try_write_lock(obj));
-}
-
-__inline
-char
-_PyEvent_TryCreate(PyObject *o)
-{
-    char result = 0;
-    assert(Py_HAS_RWLOCK(o));
-    _write_lock(o);
-    if (!Py_HAS_EVENT(o)) {
-        result = 1;
-        if (Py_ISPX(o))
-            PyErr_SetNone(PyExc_WaitError);
-        if (!PyEvent_CREATE(o))
-            PyErr_SetFromWindowsErr(0);
-        else
-            result = 0;
-    }
-    _write_unlock(o);
-    return result;
-}
-
-PyObject *
-_async_wait(PyObject *self, PyObject *o)
-{
-    DWORD result;
-    Px_PROTECTION_GUARD(o);
-    Py_INCREF(o);
-    if (!_PyEvent_TryCreate(o))
-        return NULL;
-
-    result = WaitForSingleObject((HANDLE)o->event, INFINITE);
-
-    if (result == WAIT_OBJECT_0)
-        Py_RETURN_NONE;
-
-    else if (result == WAIT_ABANDONED)
-        PyErr_SetString(PyExc_SystemError, "wait abandoned");
-
-    else if (result == WAIT_TIMEOUT)
-        PyErr_SetString(PyExc_SystemError, "infinite wait timed out?");
-
-    else if (result == WAIT_FAILED)
-        PyErr_SetFromWindowsErr(0);
-
-    else
-        PyErr_SetString(PyExc_SystemError, "unexpected result from wait");
-
-    return NULL;
-}
-
-PyObject *
-_async_signal(PyObject *self, PyObject *o)
-{
-    PyObject *result = NULL;
-    Px_PROTECTION_GUARD(o);
-    Py_INCREF(o);
-    _write_lock(o);
-
-    if (!Py_HAS_EVENT(o))
-        PyErr_SetNone(PyExc_NoWaitersError);
-    else if (!PyEvent_SIGNAL(o))
-        PyErr_SetFromWindowsErr(0);
-    else
-        result = Py_None;
-
-    _write_unlock(o);
-    Py_XINCREF(result);
-    return result;
-}
-
 PyDoc_STRVAR(_async_doc,
 "_async module.\n\
 \n\
     if (!PyExc_WaitError)
         return NULL;
 
+    PyExc_WaitTimeoutError = \
+        PyErr_NewException("_async.WaitTimeoutError", PyExc_AsyncError, NULL);
+    if (!PyExc_WaitTimeoutError)
+        return NULL;
+
     if (PyModule_AddObject(m, "AsyncError", PyExc_AsyncError))
         return NULL;
 
     if (PyModule_AddObject(m, "WaitError", PyExc_WaitError))
         return NULL;
 
+    if (PyModule_AddObject(m, "WaitTimeoutError", PyExc_WaitTimeoutError))
+        return NULL;
+
     Py_INCREF(PyExc_AsyncError);
     Py_INCREF(PyExc_ProtectionError);
     Py_INCREF(PyExc_NoWaitersError);
     Py_INCREF(PyExc_WaitError);
+    Py_INCREF(PyExc_WaitTimeoutError);
 
     /* Uncomment the following (during development) as needed. */
     /*

Python/pyparallel_private.h

     //volatile long long  succeeded;
 
     //__declspec(align(SYSTEM_CACHE_ALIGNMENT_SIZE))
+    volatile long long  waits_submitted;
+    volatile long       waits_pending;
+    volatile long       waits_inflight;
+    volatile long long  waits_done;
+    //volatile long long  failed;
+    //volatile long long  succeeded;
+
+    //__declspec(align(SYSTEM_CACHE_ALIGNMENT_SIZE))
+    volatile long long  timers_submitted;
+    volatile long       timers_pending;
+    volatile long       timers_inflight;
+    volatile long long  timers_done;
+    //volatile long long  failed;
+    //volatile long long  succeeded;
+
+    //__declspec(align(SYSTEM_CACHE_ALIGNMENT_SIZE))
     volatile long long  io_submitted;
     volatile long       io_pending;
     volatile long       io_inflight;
 } PxState;
 
 typedef struct _PyParallelContext {
+    PyObject *waitobj;
+    PyObject *waitobj_timeout;
     PyObject *func;
     PyObject *args;
     PyObject *kwds;
     PyObject *errback;
     PyObject *result;
 
+    TP_WAIT        *tp_wait;
+    TP_WAIT_RESULT  wait_result;
+    PFILETIME       wait_timeout;
+
+    TP_IO    *tp_io;
+
+    TP_TIMER *tp_timer;
+
     Context *prev;
     Context *next;