Commits

sbt  committed c64ff42

Rip out pollsters and replace with proactors.

Note that the IOCP proactor does not support SSL sockets or (currently)
IPv6. Also, there is no kqueue proactor.

To use IOCP, first compile by running:

python setup.py build

  • Participants
  • Parent commits de0af7e

Comments (0)

Files changed (8)

 
 
 def doit():
-    TIMEOUT = 2
+    TIMEOUT = 5
     tasks = set()
 
     # This references NDB's default test service.

File overlapped.c

+/*
+ * Support for overlapped IO
+ *
+ * Some code borrowed from Modules/_winapi.c of CPython
+ */
+
+/* XXX check overflow and DWORD <-> Py_ssize_t conversions
+   Check itemsize */
+
+#include "Python.h"
+#include "structmember.h"
+
+#define WINDOWS_LEAN_AND_MEAN
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <mswsock.h>
+
+#if defined(MS_WIN32) && !defined(MS_WIN64)  /* 32-bit Windows build */
+#  define F_POINTER "k"  /* argument-parsing / Py_BuildValue format code for pointer-sized values */
+#  define T_POINTER T_ULONG  /* PyMemberDef type code for pointer-sized members */
+#else  /* any other build: pointer-sized values are handled as unsigned long long */
+#  define F_POINTER "K"
+#  define T_POINTER T_ULONGLONG
+#endif
+
+#define F_HANDLE F_POINTER  /* HANDLE and SOCKET arguments travel as pointer-sized integers */
+#define F_ULONG_PTR F_POINTER  /* same representation for ULONG_PTR (completion keys) */
+#define F_DWORD "k"  /* format code used for DWORD values */
+#define F_BOOL "i"  /* format code used for BOOL values */
+#define F_UINT "I"  /* format code for unsigned int; not used in the visible part of this file */
+
+#define T_HANDLE T_POINTER  /* member type code for HANDLE-valued struct members */
+
+enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT,
+      TYPE_CONNECT, TYPE_DISCONNECT};  /* values stored in OverlappedObject.type:
+                                          TYPE_NONE means no operation has been attempted yet,
+                                          TYPE_NOT_STARTED means the call that should have started
+                                          the operation failed, and the remaining values name the
+                                          kind of operation that was started */
+
+/*
+ * Some functions should be loaded at runtime
+ *
+ * The Py_* pointers below start out NULL and are filled in by
+ * initialize_function_pointers().  Py_CancelIoEx may legitimately stay
+ * NULL (see the WinXP note in that function), so callers test it before
+ * use.
+ *
+ * GET_WSA_POINTER(s, x) asks WSAIoctl() on socket `s` for the extension
+ * function identified by the local GUID variable Guid<x> and stores the
+ * result in Py_<x>.  It evaluates to true on success.  The expansion
+ * relies on a DWORD variable named `dwBytes` being in scope at the point
+ * of use.
+ */
+
+static LPFN_ACCEPTEX Py_AcceptEx = NULL;
+static LPFN_CONNECTEX Py_ConnectEx = NULL;
+static LPFN_DISCONNECTEX Py_DisconnectEx = NULL;
+static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED) = NULL;
+
+#define GET_WSA_POINTER(s, x)                                           \
+    (SOCKET_ERROR != WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,    \
+                              &Guid##x, sizeof(Guid##x), &Py_##x,       \
+                              sizeof(Py_##x), &dwBytes, NULL, NULL))
+
+/*
+ * Resolve the Winsock extension functions (AcceptEx, ConnectEx,
+ * DisconnectEx) and the optional kernel32 CancelIoEx entry point.
+ *
+ * A temporary TCP socket is created only so that WSAIoctl() has a socket
+ * to query; it is closed again on every path, including failure.
+ *
+ * Returns 0 on success.  On failure returns -1 with a Python exception
+ * set from the Winsock error code.
+ */
+static int
+initialize_function_pointers(void)
+{
+    GUID GuidAcceptEx = WSAID_ACCEPTEX;
+    GUID GuidConnectEx = WSAID_CONNECTEX;
+    GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
+    HINSTANCE hKernel32;
+    SOCKET s;
+    DWORD dwBytes;      /* referenced by name inside GET_WSA_POINTER */
+    int err;
+
+    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (s == INVALID_SOCKET) {
+        PyErr_SetFromWindowsErr(WSAGetLastError());
+        return -1;
+    }
+
+    if (!GET_WSA_POINTER(s, AcceptEx) ||
+        !GET_WSA_POINTER(s, ConnectEx) ||
+        !GET_WSA_POINTER(s, DisconnectEx))
+    {
+        /* Capture the error before closesocket() can overwrite it, and
+           do not leak the probe socket on the failure path. */
+        err = WSAGetLastError();
+        closesocket(s);
+        PyErr_SetFromWindowsErr(err);
+        return -1;
+    }
+
+    closesocket(s);
+
+    /* On WinXP we will have Py_CancelIoEx == NULL */
+    hKernel32 = GetModuleHandle("KERNEL32");
+    if (hKernel32 != NULL)
+        *(FARPROC *)&Py_CancelIoEx = GetProcAddress(hKernel32, "CancelIoEx");
+    return 0;
+}
+
+/*
+ * Completion port stuff
+ */
+
+PyDoc_STRVAR(
+    CreateIoCompletionPort_doc,
+    "CreateIoCompletionPort(handle, port, key, concurrency) -> port\n\n"
+    "Create a completion port or register a handle with a port.");
+
+/*
+ * Python wrapper around CreateIoCompletionPort().
+ *
+ * Arguments (positional): file handle, existing port handle, completion
+ * key, number of concurrent threads.  The GIL is released for the
+ * duration of the system call.  Returns the port handle as an integer,
+ * or raises from the Windows last-error value if the call returns NULL.
+ */
+static PyObject *
+overlapped_CreateIoCompletionPort(PyObject *self, PyObject *args)
+{
+    HANDLE file_handle;
+    HANDLE existing_port;
+    HANDLE port;
+    ULONG_PTR key;
+    DWORD concurrency;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE F_ULONG_PTR F_DWORD,
+                          &file_handle, &existing_port, &key, &concurrency))
+    {
+        return NULL;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    port = CreateIoCompletionPort(file_handle, existing_port, key,
+                                  concurrency);
+    Py_END_ALLOW_THREADS
+
+    if (port != NULL) {
+        return Py_BuildValue(F_HANDLE, port);
+    }
+    return PyErr_SetFromWindowsErr(0);
+}
+
+PyDoc_STRVAR(
+    GetQueuedCompletionStatus_doc,
+    "GetQueuedCompletionStatus(port, msecs) -> (err, bytes, key, address)\n\n"
+    "Get a message from completion port.  Wait for up to msecs milliseconds.");
+
+/*
+ * Python wrapper around GetQueuedCompletionStatus().
+ *
+ * The GIL is released while waiting.  Outcomes:
+ *   - a packet with a non-NULL OVERLAPPED pointer was dequeued: return
+ *     the tuple (err, bytes, key, address), where err is ERROR_SUCCESS
+ *     or the last-error value of a failed operation;
+ *   - no packet and the error is WAIT_TIMEOUT: return None;
+ *   - no OVERLAPPED pointer for any other reason: raise from the error
+ *     code (note this also covers a successful call that yields a NULL
+ *     OVERLAPPED pointer).
+ */
+static PyObject *
+overlapped_GetQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+    HANDLE port;
+    DWORD msecs;
+    DWORD nbytes;
+    ULONG_PTR key;
+    OVERLAPPED *ov = NULL;
+    DWORD status = ERROR_SUCCESS;
+    BOOL ok;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &port, &msecs)) {
+        return NULL;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    ok = GetQueuedCompletionStatus(port, &nbytes, &key, &ov, msecs);
+    Py_END_ALLOW_THREADS
+
+    if (!ok) {
+        status = GetLastError();
+    }
+
+    if (ov != NULL) {
+        return Py_BuildValue(F_DWORD F_DWORD F_ULONG_PTR F_POINTER,
+                             status, nbytes, key, ov);
+    }
+
+    /* Nothing was dequeued: either a plain timeout or a real failure. */
+    if (status == WAIT_TIMEOUT) {
+        Py_RETURN_NONE;
+    }
+    return PyErr_SetFromWindowsErr(status);
+}
+
+PyDoc_STRVAR(
+    PostQueuedCompletionStatus_doc,
+    "PostQueuedCompletionStatus(port, bytes, key, address) -> None\n\n"
+    "Post a message to completion port.");
+
+/*
+ * Python wrapper around PostQueuedCompletionStatus().
+ *
+ * Takes the port handle, a byte count, a completion key and the address
+ * of an OVERLAPPED structure (as an integer), and forwards them
+ * unchanged with the GIL released.  Returns None, or raises from the
+ * Windows last-error value when the call fails.
+ */
+static PyObject *
+overlapped_PostQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+    HANDLE port;
+    DWORD nbytes;
+    ULONG_PTR key;
+    OVERLAPPED *ov;
+    BOOL posted;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_ULONG_PTR F_POINTER,
+                          &port, &nbytes, &key, &ov))
+    {
+        return NULL;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    posted = PostQueuedCompletionStatus(port, nbytes, key, ov);
+    Py_END_ALLOW_THREADS
+
+    if (posted) {
+        Py_RETURN_NONE;
+    }
+    return PyErr_SetFromWindowsErr(0);
+}
+
+/*
+ * Bind socket handle to local port without doing slow getaddrinfo()
+ */
+
+PyDoc_STRVAR(
+    BindLocal_doc,
+    "BindLocal(handle) -> None\n\n"
+    "Bind a socket handle to arbitrary local port");
+
+/*
+ * Bind the given socket handle to INADDR_ANY with port 0 (AF_INET only),
+ * so the system picks the local port.
+ *
+ * This is a plain module-level function: it returns None on success (it
+ * does not create an Overlapped object) and raises IOError built from
+ * WSAGetLastError() when bind() fails.  The GIL is released around the
+ * bind() call.
+ */
+static PyObject *
+overlapped_BindLocal(PyObject *self, PyObject *args)
+{
+    SOCKET Socket;
+    struct sockaddr_in addr;
+    BOOL ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE, &Socket))
+        return NULL;
+
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    addr.sin_port = 0;
+    addr.sin_addr.S_un.S_addr = INADDR_ANY;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR;
+    Py_END_ALLOW_THREADS
+
+    if (!ret)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+    Py_RETURN_NONE;
+}
+
+/*
+ * A Python object wrapping an OVERLAPPED structure and other useful data
+ * for overlapped I/O
+ */
+
+PyDoc_STRVAR(
+    Overlapped_doc,
+    "Overlapped object");  /* NOTE(review): not referenced in the visible source; the type's tp_doc uses its own literal */
+
+typedef struct {
+    PyObject_HEAD
+    OVERLAPPED overlapped;  /* the structure handed to the Win32/Winsock calls; its address is exposed as the `address` attribute */
+    /* For convenience, we store the file handle too
+       (set by whichever method starts the operation and reused by
+       cancel(), getresult() and the deallocator) */
+    HANDLE handle;
+    /* Error returned by last method call */
+    DWORD error;
+    /* Type of operation (one of the TYPE_* enum values) */
+    DWORD type;
+    /* Buffer used for reading (optional)
+       A bytes object allocated by the read/receive/accept methods; NULL
+       when unused.  Released in the deallocator. */
+    PyObject *read_buffer;
+    /* Buffer used for writing (optional)
+       Filled by the write/send methods; write_buffer.obj is non-NULL
+       only while a buffer is held, which is what the deallocator tests
+       before releasing it. */
+    Py_buffer write_buffer;
+} OverlappedObject;
+
+
+/*
+ * tp_new for Overlapped: Overlapped(event=INVALID_HANDLE_VALUE).
+ *
+ * When `event` is left at its default a fresh manual-reset, initially
+ * non-signalled event is created.  Passing NULL (0) yields an object
+ * with no event at all; any other value is stored as given.  All other
+ * fields start zeroed with type TYPE_NONE.
+ *
+ * If allocating the object fails, a non-NULL event handle is closed
+ * before returning NULL.
+ */
+static PyObject *
+Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+    static char *keywords[] = {"event", NULL};
+    HANDLE hevent = INVALID_HANDLE_VALUE;
+    OverlappedObject *ov;
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|" F_HANDLE, keywords,
+                                     &hevent))
+    {
+        return NULL;
+    }
+
+    if (hevent == INVALID_HANDLE_VALUE) {
+        hevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+        if (hevent == NULL) {
+            return PyErr_SetExcFromWindowsErr(PyExc_OSError, 0);
+        }
+    }
+
+    ov = PyObject_New(OverlappedObject, type);
+    if (ov == NULL) {
+        if (hevent != NULL) {
+            CloseHandle(hevent);
+        }
+        return NULL;
+    }
+
+    /* Zero the embedded structures first, then fill in the scalars. */
+    memset(&ov->overlapped, 0, sizeof(OVERLAPPED));
+    memset(&ov->write_buffer, 0, sizeof(Py_buffer));
+    ov->read_buffer = NULL;
+    ov->type = TYPE_NONE;
+    ov->error = 0;
+    ov->handle = NULL;
+
+    /* hEvent is already NULL after the memset, so a NULL event needs no
+       special casing here. */
+    ov->overlapped.hEvent = hevent;
+    return (PyObject *)ov;
+}
+
+/*
+ * tp_dealloc for Overlapped.
+ *
+ * If an operation was started and has not completed, try to cancel it
+ * (only possible when CancelIoEx is available) and collect its result;
+ * the wait is blocking only when the cancel request succeeded.  If the
+ * operation can be neither completed nor aborted, a RuntimeError is
+ * reported as unraisable, because the kernel may still write into
+ * memory that is about to be freed.
+ *
+ * The Windows last-error value is preserved across the call, and so is
+ * any Python exception that was already pending when the deallocator
+ * ran (a deallocator can be invoked while an exception is being
+ * propagated).
+ */
+static void
+Overlapped_dealloc(OverlappedObject *self)
+{
+    DWORD bytes;
+    DWORD olderr = GetLastError();
+    BOOL wait = FALSE;
+    BOOL ret;
+    PyObject *exc_type, *exc_value, *exc_tb;
+
+    if (!HasOverlappedIoCompleted(&self->overlapped) &&
+        self->type != TYPE_NOT_STARTED)
+    {
+        if (Py_CancelIoEx && Py_CancelIoEx(self->handle, &self->overlapped))
+            wait = TRUE;
+
+        Py_BEGIN_ALLOW_THREADS
+        ret = GetOverlappedResult(self->handle, &self->overlapped,
+                                  &bytes, wait);
+        Py_END_ALLOW_THREADS
+
+        switch (ret ? ERROR_SUCCESS : GetLastError()) {
+            case ERROR_SUCCESS:
+            case ERROR_NOT_FOUND:
+            case ERROR_OPERATION_ABORTED:
+                break;
+            default:
+                /* Stash any pending exception so that reporting the
+                   warning neither overwrites nor clears it. */
+                PyErr_Fetch(&exc_type, &exc_value, &exc_tb);
+                PyErr_SetString(
+                    PyExc_RuntimeError,
+                    "I/O operations still in flight while destroying "
+                    "Overlapped object, the process may crash");
+                PyErr_WriteUnraisable(NULL);
+                PyErr_Restore(exc_type, exc_value, exc_tb);
+                break;
+        }
+    }
+
+    if (self->overlapped.hEvent != NULL)
+        CloseHandle(self->overlapped.hEvent);
+
+    if (self->write_buffer.obj)
+        PyBuffer_Release(&self->write_buffer);
+
+    Py_CLEAR(self->read_buffer);
+    PyObject_Del(self);
+    SetLastError(olderr);
+}
+
+PyDoc_STRVAR(
+    Overlapped_cancel_doc,
+    "cancel() -> None\n\n"
+    "Cancel overlapped operation");
+
+/*
+ * Overlapped.cancel(): request cancellation of the pending operation.
+ *
+ * Nothing is done when the operation failed to start or has already
+ * completed.  Otherwise CancelIoEx() is used when available, falling
+ * back to CancelIo() on the stored handle.  A failure with
+ * ERROR_NOT_FOUND is treated as success; any other failure raises
+ * IOError.
+ */
+static PyObject *
+Overlapped_cancel(OverlappedObject *self)
+{
+    BOOL cancelled;
+
+    if (self->type == TYPE_NOT_STARTED ||
+        HasOverlappedIoCompleted(&self->overlapped))
+    {
+        Py_RETURN_NONE;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    cancelled = Py_CancelIoEx
+        ? Py_CancelIoEx(self->handle, &self->overlapped)
+        : CancelIo(self->handle);
+    Py_END_ALLOW_THREADS
+
+    /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */
+    if (cancelled || GetLastError() == ERROR_NOT_FOUND) {
+        Py_RETURN_NONE;
+    }
+    return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+}
+
+PyDoc_STRVAR(
+    Overlapped_getresult_doc,
+    "getresult(wait=False) -> result\n\n"
+    "Retrieve result of operation.  If wait is true then it blocks\n"
+    "until the operation is finished.  If wait is false and the\n"
+    "operation is still pending then an error is raised.");
+
+/*
+ * Overlapped.getresult(wait=False).
+ *
+ * Raises ValueError if no operation was attempted or if it failed to
+ * start.  Otherwise calls GetOverlappedResult() with the GIL released
+ * and records the outcome in self->error.
+ *
+ * ERROR_SUCCESS and ERROR_MORE_DATA count as success, and so does
+ * ERROR_BROKEN_PIPE when a read buffer is attached; everything else
+ * raises IOError.
+ *
+ * Result by operation type:
+ *   read                       -> the bytes buffer, shrunk to the number
+ *                                 of bytes transferred
+ *   accept/connect/disconnect  -> None
+ *   anything else (writes)     -> number of bytes transferred
+ */
+static PyObject *
+Overlapped_getresult(OverlappedObject *self, PyObject *args)
+{
+    BOOL wait = FALSE;
+    DWORD nbytes = 0;
+    BOOL ok;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait)) {
+        return NULL;
+    }
+
+    if (self->type == TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation not yet attempted");
+        return NULL;
+    }
+    if (self->type == TYPE_NOT_STARTED) {
+        PyErr_SetString(PyExc_ValueError, "operation failed to start");
+        return NULL;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    ok = GetOverlappedResult(self->handle, &self->overlapped, &nbytes, wait);
+    Py_END_ALLOW_THREADS
+
+    err = ok ? ERROR_SUCCESS : GetLastError();
+    self->error = err;
+
+    if (err != ERROR_SUCCESS && err != ERROR_MORE_DATA) {
+        /* A broken pipe is tolerated only when a read buffer exists. */
+        if (err != ERROR_BROKEN_PIPE || self->read_buffer == NULL) {
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+        }
+    }
+
+    if (self->type == TYPE_READ) {
+        assert(PyBytes_CheckExact(self->read_buffer));
+        if (nbytes != PyBytes_GET_SIZE(self->read_buffer)) {
+            if (_PyBytes_Resize(&self->read_buffer, nbytes)) {
+                return NULL;
+            }
+        }
+        Py_INCREF(self->read_buffer);
+        return self->read_buffer;
+    }
+
+    if (self->type == TYPE_ACCEPT ||
+        self->type == TYPE_CONNECT ||
+        self->type == TYPE_DISCONNECT)
+    {
+        Py_RETURN_NONE;
+    }
+
+    return PyLong_FromUnsignedLong((unsigned long) nbytes);
+}
+
+PyDoc_STRVAR(
+    Overlapped_ReadFile_doc,
+    "ReadFile(handle, size) -> Overlapped[message]\n\n"
+    "Start overlapped read");
+
+/*
+ * Overlapped.ReadFile(handle, size): start an overlapped ReadFile().
+ *
+ * Only one operation may ever be attempted per object; a second attempt
+ * raises ValueError.  A bytes object of max(size, 1) bytes is allocated
+ * as the destination and kept in self->read_buffer.
+ *
+ * Returns None when the call succeeds, is pending, or reports
+ * ERROR_MORE_DATA / ERROR_BROKEN_PIPE.  Any other error marks the object
+ * TYPE_NOT_STARTED and raises IOError.  The error code is stored in
+ * self->error either way.
+ */
+static PyObject *
+Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
+{
+    HANDLE handle;
+    DWORD size;
+    DWORD nread;
+    PyObject *buffer;
+    BOOL ok;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size)) {
+        return NULL;
+    }
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+    size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+    buffer = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+    if (buffer == NULL) {
+        return NULL;
+    }
+
+    self->read_buffer = buffer;
+    self->handle = handle;
+    self->type = TYPE_READ;
+
+    Py_BEGIN_ALLOW_THREADS
+    ok = ReadFile(handle, PyBytes_AS_STRING(buffer), size, &nread,
+                  &self->overlapped);
+    Py_END_ALLOW_THREADS
+
+    err = ok ? ERROR_SUCCESS : GetLastError();
+    self->error = err;
+
+    if (err == ERROR_SUCCESS || err == ERROR_MORE_DATA ||
+        err == ERROR_IO_PENDING || err == ERROR_BROKEN_PIPE)
+    {
+        Py_RETURN_NONE;
+    }
+
+    self->type = TYPE_NOT_STARTED;
+    return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+}
+
+PyDoc_STRVAR(
+    Overlapped_WSARecv_doc,
+    "WSARecv(handle, size, flags) -> Overlapped[message]\n\n"
+    "Start overlapped receive");
+
+/*
+ * Overlapped.WSARecv(handle, size, flags): start an overlapped receive.
+ *
+ * Only one operation may ever be attempted per object; a second attempt
+ * raises ValueError.  A bytes object of max(size, 1) bytes is allocated
+ * as the destination and kept in self->read_buffer.
+ *
+ * WSARecv() returns 0 on success and SOCKET_ERROR on failure (it is not
+ * a BOOL), so the error code is fetched only in the SOCKET_ERROR case.
+ * Returns None when the call succeeded or is pending; any other error
+ * marks the object TYPE_NOT_STARTED and raises IOError.  The error code
+ * is stored in self->error either way.
+ */
+static PyObject *
+Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
+{
+    HANDLE handle;
+    DWORD size;
+    DWORD flags;
+    DWORD nread;
+    PyObject *buf;
+    WSABUF wsabuf;
+    int ret;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_DWORD,
+                          &handle, &size, &flags))
+        return NULL;
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+    size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+    buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+    if (buf == NULL)
+        return NULL;
+
+    self->type = TYPE_READ;
+    self->handle = handle;
+    self->read_buffer = buf;
+    wsabuf.len = size;
+    wsabuf.buf = PyBytes_AS_STRING(buf);
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
+                  &self->overlapped, NULL);
+    Py_END_ALLOW_THREADS
+
+    self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError()
+                                             : ERROR_SUCCESS);
+    switch (err) {
+        case ERROR_SUCCESS:
+        case ERROR_MORE_DATA:
+        case ERROR_IO_PENDING:
+        case ERROR_BROKEN_PIPE:
+            Py_RETURN_NONE;
+        default:
+            self->type = TYPE_NOT_STARTED;
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+    }
+}
+
+PyDoc_STRVAR(
+    Overlapped_WriteFile_doc,
+    "WriteFile(handle, buf) -> Overlapped[bytes_transferred]\n\n"
+    "Start overlapped write");
+
+/*
+ * Overlapped.WriteFile(handle, buf): start an overlapped WriteFile().
+ *
+ * Only one operation may ever be attempted per object; a second attempt
+ * raises ValueError.  `buf` must support the buffer protocol; the
+ * acquired view is kept in self->write_buffer so the memory stays valid
+ * while the operation is in flight (it is released in the deallocator).
+ * Buffers longer than an unsigned long can express raise ValueError.
+ *
+ * Returns None when the call succeeds, is pending, or reports
+ * ERROR_MORE_DATA.  Any other error marks the object TYPE_NOT_STARTED
+ * and raises IOError.  The error code is stored in self->error.
+ */
+static PyObject *
+Overlapped_WriteFile(OverlappedObject *self, PyObject *args)
+{
+    HANDLE handle;
+    PyObject *bufobj;
+    DWORD written;
+    BOOL ret;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj))
+        return NULL;
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+    if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+        return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+    if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+        PyBuffer_Release(&self->write_buffer);
+        PyErr_SetString(PyExc_ValueError, "buffer too large");
+        return NULL;
+    }
+#endif
+
+    self->type = TYPE_WRITE;
+    self->handle = handle;
+
+    Py_BEGIN_ALLOW_THREADS
+    /* The length was range-checked above, so the narrowing is safe. */
+    ret = WriteFile(handle, self->write_buffer.buf,
+                    (DWORD)self->write_buffer.len,
+                    &written, &self->overlapped);
+    Py_END_ALLOW_THREADS
+
+    self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+    switch (err) {
+        case ERROR_SUCCESS:
+        case ERROR_MORE_DATA:
+        case ERROR_IO_PENDING:
+            Py_RETURN_NONE;
+        default:
+            self->type = TYPE_NOT_STARTED;
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+    }
+}
+
+PyDoc_STRVAR(
+    Overlapped_WSASend_doc,
+    "WSASend(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n"
+    "Start overlapped send");
+
+/*
+ * Overlapped.WSASend(handle, buf, flags): start an overlapped send.
+ *
+ * Only one operation may ever be attempted per object; a second attempt
+ * raises ValueError.  `buf` must support the buffer protocol; the
+ * acquired view is kept in self->write_buffer so the memory stays valid
+ * while the operation is in flight (it is released in the deallocator).
+ * Buffers longer than an unsigned long can express raise ValueError.
+ *
+ * WSASend() returns 0 on success and SOCKET_ERROR on failure (it is not
+ * a BOOL), so the error code is fetched only in the SOCKET_ERROR case.
+ * Returns None when the call succeeded or is pending; any other error
+ * marks the object TYPE_NOT_STARTED and raises IOError.  The error code
+ * is stored in self->error either way.
+ */
+static PyObject *
+Overlapped_WSASend(OverlappedObject *self, PyObject *args)
+{
+    HANDLE handle;
+    PyObject *bufobj;
+    DWORD flags;
+    DWORD written;
+    WSABUF wsabuf;
+    int ret;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD,
+                          &handle, &bufobj, &flags))
+        return NULL;
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+    if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+        return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+    if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+        PyBuffer_Release(&self->write_buffer);
+        PyErr_SetString(PyExc_ValueError, "buffer too large");
+        return NULL;
+    }
+#endif
+
+    self->type = TYPE_WRITE;
+    self->handle = handle;
+    wsabuf.len = (DWORD)self->write_buffer.len;
+    wsabuf.buf = self->write_buffer.buf;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags,
+                  &self->overlapped, NULL);
+    Py_END_ALLOW_THREADS
+
+    self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError()
+                                             : ERROR_SUCCESS);
+    switch (err) {
+        case ERROR_SUCCESS:
+        case ERROR_MORE_DATA:
+        case ERROR_IO_PENDING:
+            Py_RETURN_NONE;
+        default:
+            self->type = TYPE_NOT_STARTED;
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+    }
+}
+
+PyDoc_STRVAR(
+    Overlapped_AcceptEx_doc,
+    "AcceptEx(listen_handle, accept_handle) -> Overlapped[address_as_bytes]\n\n"
+    "Start overlapped wait for client to connect");
+
+/*
+ * Overlapped.AcceptEx(listen_handle, accept_handle): start an
+ * overlapped accept through the runtime-loaded AcceptEx pointer.
+ *
+ * Only one operation may ever be attempted per object; a second attempt
+ * raises ValueError.  The address buffer handed to AcceptEx holds two
+ * slots (local and remote), each sizeof(struct sockaddr_in) + 16 bytes;
+ * no payload bytes are requested (receive length 0).  The buffer is
+ * kept in self->read_buffer.
+ *
+ * Returns None when the call succeeded or is pending; any other error
+ * marks the object TYPE_NOT_STARTED and raises IOError.
+ */
+static PyObject *
+Overlapped_AcceptEx(OverlappedObject *self, PyObject *args)
+{
+    SOCKET listen_sock;
+    SOCKET accept_sock;
+    DWORD nreceived;
+    DWORD addr_len;
+    PyObject *addr_buf;
+    BOOL ok;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE,
+                          &listen_sock, &accept_sock))
+    {
+        return NULL;
+    }
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+    addr_len = sizeof(struct sockaddr_in) + 16;
+    addr_buf = PyBytes_FromStringAndSize(NULL, addr_len*2);
+    if (addr_buf == NULL) {
+        return NULL;
+    }
+
+    self->read_buffer = addr_buf;
+    self->handle = (HANDLE)listen_sock;
+    self->type = TYPE_ACCEPT;
+
+    Py_BEGIN_ALLOW_THREADS
+    ok = Py_AcceptEx(listen_sock, accept_sock, PyBytes_AS_STRING(addr_buf),
+                     0, addr_len, addr_len, &nreceived, &self->overlapped);
+    Py_END_ALLOW_THREADS
+
+    err = ok ? ERROR_SUCCESS : WSAGetLastError();
+    self->error = err;
+
+    if (err == ERROR_SUCCESS || err == ERROR_IO_PENDING) {
+        Py_RETURN_NONE;
+    }
+
+    self->type = TYPE_NOT_STARTED;
+    return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+}
+
+PyDoc_STRVAR(
+    Overlapped_ConnectEx_doc,
+    "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n"
+    "Start overlapped connect.  client_handle should be unbound.");
+
+/*
+ * Overlapped.ConnectEx(handle, (host, port)): start an overlapped
+ * connect through the runtime-loaded ConnectEx pointer.
+ *
+ * The address is parsed as a (string, unsigned short) tuple; the host
+ * string is converted with inet_addr(), so it must be a dotted IPv4
+ * address, and a value of INADDR_NONE raises ValueError.  Only one
+ * operation may ever be attempted per object.
+ *
+ * Returns None when the call succeeded or is pending; any other error
+ * marks the object TYPE_NOT_STARTED and raises IOError.
+ */
+static PyObject *
+Overlapped_ConnectEx(OverlappedObject *self, PyObject *args)
+{
+    SOCKET sock;
+    struct sockaddr_in addr;
+    char *host_str;
+    unsigned long host;
+    unsigned short port;
+    BOOL ok;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "(sH)", &sock, &host_str, &port)) {
+        return NULL;
+    }
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+    host = inet_addr(host_str);
+    if (host == INADDR_NONE) {
+        PyErr_SetString(PyExc_ValueError, "invalid host address");
+        return NULL;
+    }
+
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(port);
+    addr.sin_addr.s_addr = host;
+
+    self->handle = (HANDLE)sock;
+    self->type = TYPE_CONNECT;
+
+    Py_BEGIN_ALLOW_THREADS
+    ok = Py_ConnectEx(sock, (SOCKADDR*)&addr, sizeof(addr),
+                      NULL, 0, NULL, &self->overlapped);
+    Py_END_ALLOW_THREADS
+
+    err = ok ? ERROR_SUCCESS : WSAGetLastError();
+    self->error = err;
+
+    if (err == ERROR_SUCCESS || err == ERROR_IO_PENDING) {
+        Py_RETURN_NONE;
+    }
+
+    self->type = TYPE_NOT_STARTED;
+    return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+}
+
+PyDoc_STRVAR(
+    Overlapped_DisconnectEx_doc,
+    "DisconnectEx(handle, flags) -> Overlapped[None]\n\n"
+    "Start overlapped disconnect of a connected socket.");
+
+/*
+ * Overlapped.DisconnectEx(handle, flags): start an overlapped
+ * disconnect through the runtime-loaded DisconnectEx pointer.
+ *
+ * `flags` is passed straight through to DisconnectEx (the module
+ * exports TF_REUSE_SOCKET for this purpose).  Only one operation may
+ * ever be attempted per object; a second attempt raises ValueError.
+ *
+ * Returns None when the call succeeded or is pending; any other error
+ * marks the object TYPE_NOT_STARTED and raises IOError.  The error code
+ * is stored in self->error either way.
+ */
+static PyObject *
+Overlapped_DisconnectEx(OverlappedObject *self, PyObject *args)
+{
+    SOCKET Socket;
+    DWORD flags;
+    BOOL ret;
+    DWORD err;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &Socket, &flags))
+        return NULL;
+
+    if (self->type != TYPE_NONE) {
+        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+        return NULL;
+    }
+
+    self->type = TYPE_DISCONNECT;
+    self->handle = (HANDLE)Socket;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = Py_DisconnectEx(Socket, &self->overlapped, flags, 0);
+    Py_END_ALLOW_THREADS
+
+    self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+    switch (err) {
+        case ERROR_SUCCESS:
+        case ERROR_IO_PENDING:
+            Py_RETURN_NONE;
+        default:
+            self->type = TYPE_NOT_STARTED;
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+    }
+}
+
+/* Getter for `address`: the address of the embedded OVERLAPPED
+   structure, returned as a Python integer. */
+static PyObject*
+Overlapped_getaddress(OverlappedObject *self)
+{
+    void *ov_address = &self->overlapped;
+
+    return PyLong_FromVoidPtr(ov_address);
+}
+
+/* Getter for `pending`: True while the operation has not completed,
+   unless it failed to start (TYPE_NOT_STARTED), in which case False. */
+static PyObject*
+Overlapped_getpending(OverlappedObject *self)
+{
+    int still_pending = 0;
+
+    if (self->type != TYPE_NOT_STARTED &&
+        !HasOverlappedIoCompleted(&self->overlapped))
+    {
+        still_pending = 1;
+    }
+    return PyBool_FromLong(still_pending);
+}
+
+static PyMethodDef Overlapped_methods[] = {  /* methods of Overlapped instances (installed as tp_methods) */
+    {"getresult", (PyCFunction) Overlapped_getresult,
+     METH_VARARGS, Overlapped_getresult_doc},
+    {"cancel", (PyCFunction) Overlapped_cancel,
+     METH_NOARGS, Overlapped_cancel_doc},
+    {"ReadFile", (PyCFunction) Overlapped_ReadFile,  /* from here on: methods that start an operation */
+     METH_VARARGS, Overlapped_ReadFile_doc},
+    {"WSARecv", (PyCFunction) Overlapped_WSARecv,
+     METH_VARARGS, Overlapped_WSARecv_doc},
+    {"WriteFile", (PyCFunction) Overlapped_WriteFile,
+     METH_VARARGS, Overlapped_WriteFile_doc},
+    {"WSASend", (PyCFunction) Overlapped_WSASend,
+     METH_VARARGS, Overlapped_WSASend_doc},
+    {"AcceptEx", (PyCFunction) Overlapped_AcceptEx,
+     METH_VARARGS, Overlapped_AcceptEx_doc},
+    {"ConnectEx", (PyCFunction) Overlapped_ConnectEx,
+     METH_VARARGS, Overlapped_ConnectEx_doc},
+    {"DisconnectEx", (PyCFunction) Overlapped_DisconnectEx,
+     METH_VARARGS, Overlapped_DisconnectEx_doc},
+    {NULL}  /* sentinel */
+};
+
+static PyMemberDef Overlapped_members[] = {  /* read-only attributes mapped directly onto struct fields */
+    {"error", T_ULONG,
+     offsetof(OverlappedObject, error),
+     READONLY, "Error from last operation"},
+    {"event", T_HANDLE,
+     offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent),  /* hEvent inside the embedded OVERLAPPED */
+     READONLY, "Overlapped event handle"},
+    {NULL}  /* sentinel */
+};
+
+static PyGetSetDef Overlapped_getsets[] = {  /* computed attributes; no setters are provided */
+    {"address", (getter)Overlapped_getaddress, NULL,
+     "Address of overlapped structure"},
+    {"pending", (getter)Overlapped_getpending, NULL,
+     "Whether the operation is pending"},
+    {NULL},  /* sentinel */
+};
+
+PyTypeObject OverlappedType = {  /* static type object for _overlapped.Overlapped; readied and registered in PyInit__overlapped */
+    PyVarObject_HEAD_INIT(NULL, 0)
+    /* tp_name           */ "_overlapped.Overlapped",
+    /* tp_basicsize      */ sizeof(OverlappedObject),
+    /* tp_itemsize       */ 0,
+    /* tp_dealloc        */ (destructor) Overlapped_dealloc,
+    /* tp_print          */ 0,
+    /* tp_getattr        */ 0,
+    /* tp_setattr        */ 0,
+    /* tp_reserved       */ 0,
+    /* tp_repr           */ 0,
+    /* tp_as_number      */ 0,
+    /* tp_as_sequence    */ 0,
+    /* tp_as_mapping     */ 0,
+    /* tp_hash           */ 0,
+    /* tp_call           */ 0,
+    /* tp_str            */ 0,
+    /* tp_getattro       */ 0,
+    /* tp_setattro       */ 0,
+    /* tp_as_buffer      */ 0,
+    /* tp_flags          */ Py_TPFLAGS_DEFAULT,
+    /* tp_doc            */ "OVERLAPPED structure wrapper",
+    /* tp_traverse       */ 0,
+    /* tp_clear          */ 0,
+    /* tp_richcompare    */ 0,
+    /* tp_weaklistoffset */ 0,
+    /* tp_iter           */ 0,
+    /* tp_iternext       */ 0,
+    /* tp_methods        */ Overlapped_methods,
+    /* tp_members        */ Overlapped_members,
+    /* tp_getset         */ Overlapped_getsets,
+    /* tp_base           */ 0,
+    /* tp_dict           */ 0,
+    /* tp_descr_get      */ 0,
+    /* tp_descr_set      */ 0,
+    /* tp_dictoffset     */ 0,
+    /* tp_init           */ 0,
+    /* tp_alloc          */ 0,
+    /* tp_new            */ Overlapped_new,  /* all construction work happens here; there is no tp_init */
+};
+
+static PyMethodDef overlapped_functions[] = {  /* module-level functions of _overlapped */
+    {"CreateIoCompletionPort", overlapped_CreateIoCompletionPort,
+     METH_VARARGS, CreateIoCompletionPort_doc},
+    {"GetQueuedCompletionStatus", overlapped_GetQueuedCompletionStatus,
+     METH_VARARGS, GetQueuedCompletionStatus_doc},
+    {"PostQueuedCompletionStatus", overlapped_PostQueuedCompletionStatus,
+     METH_VARARGS, PostQueuedCompletionStatus_doc},
+    {"BindLocal", overlapped_BindLocal,
+     METH_VARARGS, BindLocal_doc},
+    {NULL}  /* sentinel */
+};
+
+static struct PyModuleDef overlapped_module = {  /* module definition passed to PyModule_Create() */
+    PyModuleDef_HEAD_INIT,
+    "_overlapped",  /* m_name */
+    NULL,  /* m_doc: the module has no docstring */
+    -1,  /* m_size */
+    overlapped_functions,  /* m_methods */
+    NULL,
+    NULL,
+    NULL,
+    NULL
+};
+
+#define WINAPI_CONSTANT(fmt, con) \
+    PyDict_SetItemString(d, #con, Py_BuildValue(fmt, con))
+
+PyMODINIT_FUNC
+PyInit__overlapped(void)
+{
+    PyObject *m, *d;
+
+    /* Ensure WSAStartup() called before initializing function pointers */
+    m = PyImport_ImportModule("_socket");
+    if (!m)
+        return NULL;
+    Py_DECREF(m);
+
+    if (initialize_function_pointers() < 0)
+        return NULL;
+
+    if (PyType_Ready(&OverlappedType) < 0)
+        return NULL;
+
+    m = PyModule_Create(&overlapped_module);
+    if (PyModule_AddObject(m, "Overlapped", (PyObject *)&OverlappedType) < 0)
+        return NULL;
+
+    d = PyModule_GetDict(m);
+    WINAPI_CONSTANT(F_DWORD,  INFINITE);
+    WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
+    WINAPI_CONSTANT(F_HANDLE, NULL);
+    WINAPI_CONSTANT(F_DWORD,  SO_UPDATE_ACCEPT_CONTEXT);
+    WINAPI_CONSTANT(F_DWORD,  SO_UPDATE_CONNECT_CONTEXT);
+    WINAPI_CONSTANT(F_DWORD,  TF_REUSE_SOCKET);
+
+    return m;
+}
 import os
 import select
 import time
+import socket
 
-
-class PollsterBase:
-    """Base class for all polling implementations.
-
-    This defines an interface to register and unregister readers and
-    writers for specific file descriptors, and an interface to get a
-    list of events.  There's also an interface to check whether any
-    readers or writers are currently registered.
-    """
-
-    def __init__(self):
-        super().__init__()
-        self.readers = {}  # {fd: token, ...}.
-        self.writers = {}  # {fd: token, ...}.
-
-    def pollable(self):
-        """Return True if any readers or writers are currently registered."""
-        return bool(self.readers or self.writers)
-
-    # Subclasses are expected to extend the add/remove methods.
-
-    def register_reader(self, fd, token):
-        """Add or update a reader for a file descriptor."""
-        self.readers[fd] = token
-
-    def register_writer(self, fd, token):
-        """Add or update a writer for a file descriptor."""
-        self.writers[fd] = token
-
-    def unregister_reader(self, fd):
-        """Remove the reader for a file descriptor."""
-        del self.readers[fd]
-
-    def unregister_writer(self, fd):
-        """Remove the writer for a file descriptor."""
-        del self.writers[fd]
-
-    def poll(self, timeout=None):
-        """Poll for events.  A subclass must implement this.
-
-        If timeout is omitted or None, this blocks until at least one
-        event is ready.  Otherwise, timeout gives a maximum time to
-        wait (an int of float in seconds) -- the method returns as
-        soon as at least one event is ready or when the timeout is
-        expired.  For a non-blocking poll, pass 0.
-
-        The return value is a list of events; it is empty when the
-        timeout expired before any events were ready.  Each event
-        is a token previously passed to register_reader/writer().
-        """
-        raise NotImplementedError
-
-
-class SelectPollster(PollsterBase):
-    """Pollster implementation using select."""
-
-    def poll(self, timeout=None):
-        readable, writable, _ = select.select(self.readers, self.writers,
-                                              [], timeout)
-        events = []
-        events += (self.readers[fd] for fd in readable)
-        events += (self.writers[fd] for fd in writable)
-        return events
-
-
-class PollPollster(PollsterBase):
-    """Pollster implementation using poll."""
-
-    def __init__(self):
-        super().__init__()
-        self._poll = select.poll()
-
-    def _update(self, fd):
-        assert isinstance(fd, int), fd
-        flags = 0
-        if fd in self.readers:
-            flags |= select.POLLIN
-        if fd in self.writers:
-            flags |= select.POLLOUT
-        if flags:
-            self._poll.register(fd, flags)
-        else:
-            self._poll.unregister(fd)
-
-    def register_reader(self, fd, callback, *args):
-        super().register_reader(fd, callback, *args)
-        self._update(fd)
-
-    def register_writer(self, fd, callback, *args):
-        super().register_writer(fd, callback, *args)
-        self._update(fd)
-
-    def unregister_reader(self, fd):
-        super().unregister_reader(fd)
-        self._update(fd)
-
-    def unregister_writer(self, fd):
-        super().unregister_writer(fd)
-        self._update(fd)
-
-    def poll(self, timeout=None):
-        # Timeout is in seconds, but poll() takes milliseconds.
-        msecs = None if timeout is None else int(round(1000 * timeout))
-        events = []
-        for fd, flags in self._poll.poll(msecs):
-            if flags & (select.POLLIN | select.POLLHUP):
-                if fd in self.readers:
-                    events.append(self.readers[fd])
-            if flags & (select.POLLOUT | select.POLLHUP):
-                if fd in self.writers:
-                    events.append(self.writers[fd])
-        return events
-
-
-class EPollPollster(PollsterBase):
-    """Pollster implementation using epoll."""
-
-    def __init__(self):
-        super().__init__()
-        self._epoll = select.epoll()
-
-    def _update(self, fd):
-        assert isinstance(fd, int), fd
-        eventmask = 0
-        if fd in self.readers:
-            eventmask |= select.EPOLLIN
-        if fd in self.writers:
-            eventmask |= select.EPOLLOUT
-        if eventmask:
-            try:
-                self._epoll.register(fd, eventmask)
-            except IOError:
-                self._epoll.modify(fd, eventmask)
-        else:
-            self._epoll.unregister(fd)
-
-    def register_reader(self, fd, callback, *args):
-        super().register_reader(fd, callback, *args)
-        self._update(fd)
-
-    def register_writer(self, fd, callback, *args):
-        super().register_writer(fd, callback, *args)
-        self._update(fd)
-
-    def unregister_reader(self, fd):
-        super().unregister_reader(fd)
-        self._update(fd)
-
-    def unregister_writer(self, fd):
-        super().unregister_writer(fd)
-        self._update(fd)
-
-    def poll(self, timeout=None):
-        if timeout is None:
-            timeout = -1  # epoll.poll() uses -1 to mean "wait forever".
-        events = []
-        for fd, eventmask in self._epoll.poll(timeout):
-            if eventmask & select.EPOLLIN:
-                if fd in self.readers:
-                    events.append(self.readers[fd])
-            if eventmask & select.EPOLLOUT:
-                if fd in self.writers:
-                    events.append(self.writers[fd])
-        return events
-
-
-class KqueuePollster(PollsterBase):
-    """Pollster implementation using kqueue."""
-
-    def __init__(self):
-        super().__init__()
-        self._kqueue = select.kqueue()
-
-    def register_reader(self, fd, callback, *args):
-        if fd not in self.readers:
-            kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
-            self._kqueue.control([kev], 0, 0)
-        return super().register_reader(fd, callback, *args)
-
-    def register_writer(self, fd, callback, *args):
-        if fd not in self.readers:
-            kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
-            self._kqueue.control([kev], 0, 0)
-        return super().register_writer(fd, callback, *args)
-
-    def unregister_reader(self, fd):
-        super().unregister_reader(fd)
-        kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
-        self._kqueue.control([kev], 0, 0)
-
-    def unregister_writer(self, fd):
-        super().unregister_writer(fd)
-        kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
-        self._kqueue.control([kev], 0, 0)
-
-    def poll(self, timeout=None):
-        events = []
-        max_ev = len(self.readers) + len(self.writers)
-        for kev in self._kqueue.control(None, max_ev, timeout):
-            fd = kev.ident
-            flag = kev.filter
-            if flag == select.KQ_FILTER_READ and fd in self.readers:
-                events.append(self.readers[fd])
-            elif flag == select.KQ_FILTER_WRITE and fd in self.writers:
-                events.append(self.writers[fd])
-        return events
-
-
-# Pick the best pollster class for the platform.
-if hasattr(select, 'kqueue'):
-    best_pollster = KqueuePollster
-elif hasattr(select, 'epoll'):
-    best_pollster = EPollPollster
-elif hasattr(select, 'poll'):
-    best_pollster = PollPollster
-else:
-    best_pollster = SelectPollster
+# local imports
+from proactor import Proactor
 
 
 class DelayedCall:
     This class's instance variables are not part of its API.
     """
 
-    def __init__(self, pollster=None):
+    def __init__(self, proactor=None):
         super().__init__()
-        if pollster is None:
-            logging.info('Using pollster: %s', best_pollster.__name__)
-            pollster = best_pollster()
-        self.pollster = pollster
+        if proactor is None:
+            logging.info('Using proactor: %s', Proactor.__name__)
+            proactor = Proactor()
+        self.proactor = proactor
         self.ready = collections.deque()  # [(callback, args), ...]
         self.scheduled = []  # [(when, callback, args), ...]
 
-    def add_reader(self, fd, callback, *args):
-        """Add a reader callback.  Return a DelayedCall instance."""
-        dcall = DelayedCall(None, callback, args)
-        self.pollster.register_reader(fd, dcall)
-        return dcall
-
-    def remove_reader(self, fd):
-        """Remove a reader callback."""
-        self.pollster.unregister_reader(fd)
-
-    def add_writer(self, fd, callback, *args):
-        """Add a writer callback.  Return a DelayedCall instance."""
-        dcall = DelayedCall(None, callback, args)
-        self.pollster.register_writer(fd, dcall)
-        return dcall
-
-    def remove_writer(self, fd):
-        """Remove a writer callback."""
-        self.pollster.unregister_writer(fd)
-
     def add_callback(self, dcall):
         """Add a DelayedCall to ready or scheduled."""
         if dcall.cancelled:
             heapq.heappop(self.scheduled)
 
         # Inspect the poll queue.
-        if self.pollster.pollable():
+        if self.proactor.pollable():
             if self.scheduled:
                 when = self.scheduled[0].when
                 timeout = max(0, when - time.time())
             else:
                 timeout = None
             t0 = time.time()
-            events = self.pollster.poll(timeout)
+            # done callbacks added to ready futures get run by poll()
+            self.proactor.poll(timeout)
             t1 = time.time()
             argstr = '' if timeout is None else ' %.3f' % timeout
             if t1-t0 >= 1:
             else:
                 level = logging.DEBUG
             logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
-            for dcall in events:
-                self.add_callback(dcall)
 
         # Handle 'later' callbacks that are ready.
         while self.scheduled:
         writable file descriptors, or scheduled callbacks (of either
         variety).
         """
-        while self.ready or self.scheduled or self.pollster.pollable():
+        while self.ready or self.scheduled or self.proactor.pollable():
             self.run_once()
 
 
+
 MAX_WORKERS = 5  # Default max workers when creating an executor.
 
+try:
+    from socket import socketpair
+except ImportError:
+    def socketpair():
+        with socket.socket() as l:
+            l.bind(('127.0.0.1', 0))
+            l.listen(1)
+            c = socket.socket()
+            c.connect(l.getsockname())
+            a, _ = l.accept()
+            return a, c
 
 class ThreadRunner:
     """Helper to submit work to a thread pool and wait for it.
     def __init__(self, eventloop, executor=None):
         self.eventloop = eventloop
         self.executor = executor  # Will be constructed lazily.
-        self.pipe_read_fd, self.pipe_write_fd = os.pipe()
+        self.rsock, self.wsock = socketpair()
+        self.rsock.settimeout(0)
         self.active_count = 0
+        self.read_future = None
 
-    def read_callback(self):
-        """Semi-permanent callback while at least one future is active."""
+    def read_callback(self, read_future):
         assert self.active_count > 0, self.active_count
-        data = os.read(self.pipe_read_fd, 8192)  # Traditional buffer size.
-        self.active_count -= len(data)
-        if self.active_count == 0:
-            self.eventloop.remove_reader(self.pipe_read_fd)
+        try:
+            self.active_count -= len(read_future.result())
+        except OSError:
+            pass                # We will just restart read_future
         assert self.active_count >= 0, self.active_count
+        if self.active_count > 0:
+            self.read_future = self.eventloop.proactor.recv(self.rsock, 128)
+            self.read_future.add_done_callback(self.read_callback)
 
-    def submit(self, func, *args, executor=None):
+    def submit(self, func, *args, executor=None, insert_callback=None):
         """Submit a function to the thread pool.
 
         This returns a concurrent.futures.Future instance.  The caller
                 self.executor = executor
         assert self.active_count >= 0, self.active_count
         future = executor.submit(func, *args)
-        if self.active_count == 0:
-            self.eventloop.add_reader(self.pipe_read_fd, self.read_callback)
+        if self.read_future is None or self.read_future.done():
+            self.read_future = self.eventloop.proactor.recv(self.rsock, 128)
+            self.read_future.add_done_callback(self.read_callback)
         self.active_count += 1
+        if insert_callback is not None:
+            future.add_done_callback(insert_callback)
         def done_callback(future):
-            os.write(self.pipe_write_fd, b'x')
+            self.wsock.send(b'x')
         future.add_done_callback(done_callback)
         return future
+#
+# Module implementing the Proactor pattern
+#
+# A proactor is used to initiate asynchronous I/O, and to wait for
+# completion of previously initiated operations.
+#
+
+import os
+import sys
+import errno
+import socket
+import select
+import time
+
+
+__all__ = ['SelectProactor']
+
+#
+# Future class
+#
+
+class Future:
+    """Container for the outcome of an asynchronous operation.
+
+    A future is "done" once set_result() or set_exception() has been
+    called; callbacks registered before that point run at completion.
+    """
+
+    def __init__(self):
+        self._callbacks = []
+
+    def result(self):
+        """Return the stored value, or raise the stored exception.
+
+        Never blocks: the future must already be done.
+        """
+        assert self.done()
+        if not self.success:
+            raise self.value
+        return self.value
+
+    def set_result(self, value):
+        """Mark the future as succeeded with value and run callbacks."""
+        assert not self.done()
+        self.success, self.value = True, value
+        self._invoke_callbacks()
+
+    def set_exception(self, value):
+        """Mark the future as failed with exception value; run callbacks."""
+        assert not self.done()
+        self.success, self.value = False, value
+        self._invoke_callbacks()
+
+    def done(self):
+        """Return True once a result or an exception has been set."""
+        # 'success' only exists after set_result()/set_exception().
+        return hasattr(self, 'success')
+
+    def add_done_callback(self, func):
+        """Arrange for func(self) to be called when the future is done.
+
+        If the future is already done, func is called immediately.
+        """
+        if not self.done():
+            self._callbacks.append(func)
+            return
+        func(self)
+
+    def _invoke_callbacks(self):
+        # Exceptions raised by a callback are reported via sys.excepthook
+        # so that the remaining callbacks still run.
+        for callback in self._callbacks:
+            try:
+                callback(self)
+            except Exception:
+                sys.excepthook(*sys.exc_info())
+        del self._callbacks
+
+#
+# Base class for all proactors
+#
+
+class BaseProactor:
+    """Common machinery shared by all proactors.
+
+    Subclasses provide _poll(timeout), which waits for pending operations
+    and appends each completed future to self._results.
+    """
+    _Future = Future
+
+    def __init__(self):
+        self._results = []
+
+    def poll(self, timeout=None):
+        """Return a list of completed futures.
+
+        _poll() is only called when no completed futures are already
+        buffered.  timeout is in seconds; None means wait indefinitely.
+        """
+        if not self._results:
+            self._poll(timeout)
+        tmp, self._results = self._results, []
+        return tmp
+
+    def filteredpoll(self, penders, timeout=None):
+        """Wait for at least one of the futures in penders to complete.
+
+        Returns the list of completed futures that belong to penders and
+        leaves all other buffered results in place for a later poll().
+        Returns an empty list if timeout (in seconds) expires first.
+        Raises ValueError for a negative timeout.
+        """
+        if timeout is None:
+            deadline = None
+        elif timeout < 0:
+            raise ValueError('negative timeout')
+        else:
+            deadline = time.monotonic() + timeout
+        S = set(penders)
+        expired = False
+        while True:
+            # self._results holds future objects, so membership is tested
+            # on the future itself.
+            filtered = [x for x in self._results if x in S]
+            if filtered:
+                self._results = [x for x in self._results if x not in S]
+                return filtered
+            if expired:
+                return []
+            self._poll(timeout)
+            if deadline is not None:
+                timeout = deadline - time.monotonic()
+                if timeout <= 0:
+                    # Scan once more: the final _poll() may have delivered
+                    # a matching result just as the deadline passed.
+                    expired = True
+
+    def close(self):
+        """Release resources held by the proactor (nothing by default)."""
+        pass
+
+#
+# Initiator methods for proactors based on select()/poll()/epoll()/kqueue()
+#
+
+READABLE = 0
+WRITABLE = 1
+
+class ReadyBaseProactor(BaseProactor):
+    """Initiator methods for proactors built on readiness notification.
+
+    Each operation is handed to self._register() (supplied by subclasses)
+    together with the file descriptor, the kind of readiness it needs
+    (READABLE or WRITABLE) and the callable that performs the operation.
+    """
+    # Passed as the first argument of _register() by recv/send/accept.
+    eager = True
+
+    def __init__(self):
+        super().__init__()
+        # Indexed by READABLE/WRITABLE; one dict of pending operations each.
+        self._queue = [{}, {}]
+
+    def pollable(self):
+        """Return True if any operation is still pending."""
+        return any(self._queue)
+
+    def recv(self, sock, nbytes, flags=0):
+        """Start sock.recv(nbytes, flags); return a future for the data."""
+        return self._register(self.eager, sock.fileno(), READABLE,
+                              sock.recv, nbytes, flags)
+
+    def send(self, sock, buf, flags=0):
+        """Start sock.send(buf, flags); return a future for the result."""
+        return self._register(self.eager, sock.fileno(), WRITABLE,
+                              sock.send, buf, flags)
+
+    def accept(self, sock):
+        """Start an accept on sock; the future yields (conn, addr).
+
+        The accepted connection is given a timeout of 0.
+        """
+        def _accept():
+            conn, addr = sock.accept()
+            conn.settimeout(0)
+            return conn, addr
+        return self._register(self.eager, sock.fileno(), READABLE, _accept)
+
+    def connect(self, sock, addr):
+        """Start connecting sock (timeout must be 0) to addr.
+
+        Raises OSError immediately if connect_ex() reports an error that is
+        not in self._connection_errors.  Otherwise returns a future whose
+        callback checks SO_ERROR once the socket is reported writable.
+        """
+        assert sock.gettimeout() == 0
+        err = sock.connect_ex(addr)
+        if err not in self._connection_errors:
+            raise OSError(err, os.strerror(err))
+        def _connect():
+            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+            if err != 0:
+                raise OSError(err, os.strerror(err))
+        # Never eager: the outcome is only read after writability.
+        return self._register(False, sock.fileno(), WRITABLE, _connect)
+
+    # hacks to support SSL
+    def _readable(self, sock):
+        # Future that completes (with None) once sock is reported readable.
+        return self._register(False, sock.fileno(), READABLE, lambda:None)
+
+    def _writable(self, sock):
+        # Future that completes (with None) once sock is reported writable.
+        return self._register(False, sock.fileno(), WRITABLE, lambda:None)
+
+#
+# Proactor using select()
+#
+
+class SelectProactor(ReadyBaseProactor):
+    """Proactor that waits for readiness with select.select()."""
+    # connect_ex() results that do not count as an immediate failure.
+    _connection_errors = {0, errno.EINPROGRESS}
+    _select = select.select
+
+    def _poll(self, timeout=None):
+        # Wait on every fd that has a pending operation, then run one
+        # queued operation per ready fd.
+        rfds, wfds, xfds = self._select(self._queue[READABLE].keys(),
+                                        self._queue[WRITABLE].keys(),
+                                        (), timeout)
+        for fd in rfds:
+            self._handle(fd, READABLE)
+        for fd in wfds:
+            self._handle(fd, WRITABLE)
+
+    def _handle(self, fd, kind):
+        # Run the oldest operation queued for (fd, kind), store its outcome
+        # in its future and append the future to self._results.
+        Q = self._queue[kind][fd]
+        f, callback, args = Q.pop(0)
+        try:
+            f.set_result(callback(*args))
+        except OSError as e:
+            f.set_exception(e)
+        self._results.append(f)
+        if not Q:
+            # Nothing left for this fd: stop selecting on it.
+            del self._queue[kind][fd]
+
+    def _register(self, eager, fd, kind, callback, *args):
+        # Return a future for callback(*args).  With eager set the call is
+        # attempted at once and only queued if it raises BlockingIOError.
+        f = self._Future()
+        if eager:
+            try:
+                f.set_result(callback(*args))
+                return f
+            except BlockingIOError:
+                pass
+        queue = self._queue[kind]
+        if fd not in queue:
+            queue[fd] = []
+        queue[fd].append((f, callback, args))
+        return f
+
+    if sys.platform == 'win32':
+        # Windows insists on being awkward...
+        _connection_errors = {0, errno.WSAEWOULDBLOCK}
+
+        def _select(self, rfds, wfds, _, timeout=None):
+            # With nothing to wait on, sleep instead of calling select();
+            # otherwise also pass the write fds as the exceptional set and
+            # report fds returned there as writable.
+            # NOTE(review): time.sleep(None) raises TypeError, so this path
+            # appears to assume timeout is a number -- confirm with callers.
+            if not (rfds or wfds):
+                time.sleep(timeout)
+                return [], [], []
+            else:
+                rfds, wfds, xfds = select.select(rfds, wfds, wfds, timeout)
+                return rfds, wfds + xfds, []
+
+
+#
+# Proactor using poll()
+#
+
+if hasattr(select, 'poll'):
+    __all__.append('PollProactor')
+
+    from select import POLLIN, POLLPRI, POLLOUT, POLLHUP, POLLERR, POLLNVAL
+
+    # Indexed by READABLE/WRITABLE: the event flag to register for each kind.
+    FLAG = [POLLIN, POLLOUT]
+    # Reported flags that cause a pending read (resp. write) to be run.
+    READ_EXTRA_FLAGS = POLLIN | POLLHUP | POLLNVAL | POLLERR
+    WRITE_EXTRA_FLAGS = POLLOUT | POLLHUP | POLLNVAL | POLLERR
+
+    class PollProactor(ReadyBaseProactor):
+        """Proactor that waits for readiness with a select.poll() object."""
+        _connection_errors = {0, errno.EINPROGRESS}
+        _make_poller = select.poll
+
+        def __init__(self):
+            super().__init__()
+            self._poller = self._make_poller()
+            # fd -> event mask currently registered with self._poller
+            self._flag = {}
+
+        def _poll(self, timeout=None):
+            # timeout is in seconds; it is converted to whole milliseconds
+            # (rounded to nearest), with -1 standing for None.
+            if timeout is None:
+                ms = -1
+            elif timeout < 0:
+                raise ValueError('negative timeout')
+            else:
+                ms = int(timeout*1000 + 0.5)
+            ready = self._poller.poll(ms)
+            for fd, flags in ready:
+                if fd in self._queue[READABLE] and flags & READ_EXTRA_FLAGS:
+                    self._handle(fd, READABLE)
+                if fd in self._queue[WRITABLE] and flags & WRITE_EXTRA_FLAGS:
+                    self._handle(fd, WRITABLE)
+
+        def _handle(self, fd, kind):
+            # Run the oldest operation queued for (fd, kind) and record its
+            # future in self._results.
+            Q = self._queue[kind][fd]
+            f, callback, args = Q.pop(0)
+            try:
+                f.set_result(callback(*args))
+            except OSError as e:
+                f.set_exception(e)
+            self._results.append(f)
+            if not Q:
+                # No more operations of this kind on fd: clear its flag and
+                # unregister the fd once no flags remain.
+                del self._queue[kind][fd]
+                flag = self._flag[fd] = self._flag[fd] & ~FLAG[kind]
+                if flag == 0:
+                    del self._flag[fd]
+                    self._poller.unregister(fd)
+                else:
+                    self._poller.modify(fd, flag)
+
+        def _register(self, eager, fd, kind, callback, *args):
+            # Return a future for callback(*args).  With eager set the call
+            # is attempted at once and only queued on BlockingIOError.
+            f = self._Future()
+            if eager:
+                try:
+                    f.set_result(callback(*args))
+                    return f
+                except BlockingIOError:
+                    pass
+            queue = self._queue[kind]
+            if fd not in queue:
+                # First pending operation of this kind on fd: add the flag,
+                # registering the fd if it was not known to the poller.
+                queue[fd] = []
+                old_flag = self._flag.get(fd, 0)
+                flag = self._flag[fd] = old_flag | FLAG[kind]
+                if old_flag == 0:
+                    self._poller.register(fd, flag)
+                else:
+                    self._poller.modify(fd, flag)
+            queue[fd].append((f, callback, args))
+            return f
+
+#
+# Proactor using epoll()
+#
+
+if hasattr(select, 'epoll'):
+    assert (select.EPOLLIN, select.EPOLLOUT) == (POLLIN, POLLOUT)
+
+    __all__.append('EpollProactor')
+
+    class EpollProactor(PollProactor):
+        """Proactor that waits for readiness with a select.epoll() object.
+
+        Registration bookkeeping is inherited from PollProactor; only the
+        wait itself differs.
+        """
+        _make_poller = select.epoll
+
+        def _poll(self, timeout=None):
+            # epoll.poll() takes its timeout in (fractional) seconds, unlike
+            # poll.poll() which takes milliseconds, so the inherited _poll()
+            # would wait 1000 times too long.  -1 means wait indefinitely.
+            if timeout is None:
+                timeout = -1
+            elif timeout < 0:
+                raise ValueError('negative timeout')
+            ready = self._poller.poll(timeout)
+            for fd, flags in ready:
+                if fd in self._queue[READABLE] and flags & READ_EXTRA_FLAGS:
+                    self._handle(fd, READABLE)
+                if fd in self._queue[WRITABLE] and flags & WRITE_EXTRA_FLAGS:
+                    self._handle(fd, WRITABLE)
+
+#
+# Proactor using overlapped IO and a completion port
+#
+
+try:
+    from _overlapped import *
+except ImportError:
+    if sys.platform == 'win32':
+        # Warn on stderr and carry on with the select()-based proactor.
+        print('IOCP support not compiled', file=sys.stderr)
+else:
+    __all__.append('IocpProactor')
+
+    from _winapi import CloseHandle
+    import weakref
+
+    class IocpProactor(BaseProactor):
+        """Proactor using overlapped IO and an IO completion port."""
+
+        def __init__(self, concurrency=0xffffffff):
+            super().__init__()
+            self._iocp = CreateIoCompletionPort(
+                INVALID_HANDLE_VALUE, NULL, 0, concurrency)
+            # ov.address -> (future, ov, obj, callback) for pending operations
+            self._cache = {}
+            # objects whose handle is already associated with self._iocp
+            self._registered = weakref.WeakSet()
+
+        def pollable(self):
+            """Return True if any overlapped operation is still pending."""
+            return bool(self._cache)
+
+        def recv(self, conn, nbytes, flags=0):
+            """Start an overlapped receive; return a future for the result."""
+            self._register_obj(conn)
+            ov = Overlapped(NULL)
+            ov.WSARecv(conn.fileno(), nbytes, flags)
+            return self._register(ov, conn, ov.getresult)
+
+        def send(self, conn, buf, flags=0):
+            """Start an overlapped send; return a future for the result."""
+            self._register_obj(conn)
+            ov = Overlapped(NULL)
+            ov.WSASend(conn.fileno(), buf, flags)
+            return self._register(ov, conn, ov.getresult)
+
+        def accept(self, listener):
+            """Start an overlapped accept; the future yields (conn, addr)."""
+            self._register_obj(listener)
+            conn = self._get_accept_socket()
+            ov = Overlapped(NULL)
+            ov.AcceptEx(listener.fileno(), conn.fileno())
+            def finish_accept():
+                addr = ov.getresult()
+                conn.setsockopt(socket.SOL_SOCKET,
+                                SO_UPDATE_ACCEPT_CONTEXT, listener.fileno())
+                conn.settimeout(listener.gettimeout())
+                return conn, conn.getpeername()
+            return self._register(ov, listener, finish_accept)
+
+        def connect(self, conn, address):
+            """Start an overlapped connect; the future yields conn."""
+            self._register_obj(conn)
+            BindLocal(conn.fileno())
+            ov = Overlapped(NULL)
+            ov.ConnectEx(conn.fileno(), address)
+            def finish_connect():
+                ov.getresult()
+                conn.setsockopt(socket.SOL_SOCKET,
+                                SO_UPDATE_CONNECT_CONTEXT, 0)
+                return conn
+            return self._register(ov, conn, finish_connect)
+
+        def _readable(self, sock):
+            raise NotImplementedError('IocpProactor._readable()')
+
+        def _writable(self, sock):
+            raise NotImplementedError('IocpProactor._writable()')
+
+        def _register_obj(self, obj):
+            # Associate obj's handle with the completion port, once only.
+            if obj not in self._registered:
+                self._registered.add(obj)
+                CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
+
+        def _register(self, ov, obj, callback, discard=False):
+            # we prevent ov and obj from being garbage collected
+            # With discard set no future is created and None is returned.
+            f = None if discard else self._Future()
+            self._cache[ov.address] = (f, ov, obj, callback)
+            return f
+
+        def _get_accept_socket(self):
+            s = socket.socket()
+            s.settimeout(0)
+            return s
+
+        def _poll(self, timeout=None):
+            # timeout is in seconds; it is converted to whole milliseconds,
+            # with INFINITE standing for None.
+            if timeout is None:
+                ms = INFINITE
+            elif timeout < 0:
+                raise ValueError("negative timeout")
+            else:
+                ms = int(timeout * 1000 + 0.5)
+                if ms >= INFINITE:
+                    raise ValueError("timeout too big")
+            while True:
+                status = GetQueuedCompletionStatus(self._iocp, ms)
+                if status is None:
+                    return
+                # status[3] is the key under which _register() stored the
+                # operation (ov.address).
+                f, ov, obj, callback = self._cache.pop(status[3])
+                try:
+                    value = callback()
+                except OSError as e:
+                    if f is None:
+                        sys.excepthook(*sys.exc_info())
+                        continue
+                    f.set_exception(e)
+                    self._results.append(f)
+                else:
+                    if f is None:
+                        continue
+                    f.set_result(value)
+                    self._results.append(f)
+                # A result has been collected: drain further completions
+                # without waiting.
+                ms = 0
+
+        def close(self, *, CloseHandle=CloseHandle):
+            """Close the completion port handle; safe to call repeatedly."""
+            if self._iocp is not None:
+                CloseHandle(self._iocp)
+                self._iocp = None
+
+        __del__ = close
+
+#
+# Select default proactor (IOCP does not do SSL or ipv6)
+#
+
+for _ in ('EpollProactor', 'IocpProactor', 'PollProactor', 'SelectProactor'):
+    if _ in globals():
+        Proactor = globals()[_]
+        break
+del _
+
+# Proactor = SelectProactor

File scheduling.py

                 self.unblocker = None
         self.eventloop.call_soon(self.step)
 
-    def block_io(self, fd, flag):
-        assert isinstance(fd, int), repr(fd)
-        assert flag in ('r', 'w'), repr(flag)
-        if flag == 'r':
-            self.block(self.eventloop.remove_reader, fd)
-            self.eventloop.add_reader(fd, self.unblock)
-        else:
-            self.block(self.eventloop.remove_writer, fd)
-            self.eventloop.add_writer(fd, self.unblock)
-
     def wait(self):
         """COROUTINE: Wait until this task is finished."""
         current_task = context.current_task
     yield
 
 
-def block_r(fd):
-    """COROUTINE: Block until a file descriptor is ready for reading."""
-    context.current_task.block_io(fd, 'r')
+def block_future(future):
+    """COROUTINE: Block until future is set"""
+    task = context.current_task
+    future.add_done_callback(task.unblock)
+    task.block()
     yield
 
 
-def block_w(fd):
-    """COROUTINE: Block until a file descriptor is ready for writing."""
-    context.current_task.block_io(fd, 'w')
-    yield
+def block_r(obj):
+    """COROUTINE: Block until object is readable"""
+    # XXX not implemented for IOCP.
+    f = context.eventloop.proactor._readable(obj)
+    if not f.done():
+        yield from block_future(f)
+
+
+def block_w(obj):
+    """COROUTINE: Block until object is writable"""
+    # XXX not implemented for IOCP.
+    f = context.eventloop.proactor._writable(obj)
+    if not f.done():
+        yield from block_future(f)
 
 
 def call_in_thread(func, *args, executor=None):
     """COROUTINE: Run a function in a thread."""
     task = context.current_task
     eventloop = context.eventloop
-    future = context.threadrunner.submit(func, *args, executor=executor)
+    def reschedule(_):
+        eventloop.call_soon(task.unblock_if_alive)
+    future = context.threadrunner.submit(
+        func, *args, executor=executor, insert_callback=reschedule)
     task.block(future.cancel)
     # If the thread managed to complete before we get here,
-    # add_done_callback() will call the callback right now.  Make sure
-    # the unblock() call doesn't happen until later.  But then, the
-    # task may already have been cancelled (and it may have been too
-    # late to cancel the Future) so it should be okay if this call
-    # finds the task deceased.  For that purpose we have
-    # unblock_if_alive().
-    future.add_done_callback(
-        lambda _: eventloop.call_soon(task.unblock_if_alive))
+    # reschedule() will already have been called.  reschedule() must be the *first*
+    # callback added to future.  Otherwise the eventloop may exit
+    # before the current task is rescheduled.  The task may already
+    # have been cancelled (and it may have been too late to cancel the
+    # Future) so it should be okay if this call finds the task
+    # deceased.  For that purpose we use unblock_if_alive().
     yield
     assert future.done()
     return future.result()
+[build_ext]
+inplace=1
+from distutils.core import setup, Extension
+
+ext = Extension('_overlapped', ['overlapped.c'], libraries=['ws2_32'])
+setup(name='_overlapped', ext_modules=[ext])
 
 # Local imports.
 import scheduling
+from scheduling import context
 
 # Errno values indicating the connection was disconnected.
 _DISCONNECTED = frozenset((errno.ECONNRESET,
         returns b''.
         """
         assert n >= 0, n
+
         while True:
             try:
-                return self.sock.recv(n)
+                f = context.eventloop.proactor.recv(self.sock, n)
+                if not f.done():
+                    yield from scheduling.block_future(f)
+                return f.result()
             except socket.error as err:
                 if err.errno in _TRYAGAIN:
                     pass
                     return b''
                 else:
                     raise  # Unexpected, propagate.
-            yield from scheduling.block_r(self.sock.fileno())
 
     def send(self, data):
         """COROUTINE; Send data to the socket, blocking until all written.
         """
         while data:
             try:
-                n = self.sock.send(data)
+                f = context.eventloop.proactor.send(self.sock, data)
+                if not f.done():
+                    yield from scheduling.block_future(f)
+                n = f.result()
             except socket.error as err:
                 if err.errno in _TRYAGAIN:
                     pass
                 if n == len(data):
                     break
                 data = data[n:]
-                continue
-            yield from scheduling.block_w(self.sock.fileno())
 
         return True
 
             try:
                 self.sslsock.do_handshake()
             except ssl.SSLWantReadError:
-                yield from scheduling.block_r(self.sslsock.fileno())
+                yield from scheduling.block_r(self.sslsock)
             except ssl.SSLWantWriteError:
-                yield from scheduling.block_w(self.sslsock.fileno())
+                yield from scheduling.block_w(self.sslsock)
             else:
                 break
 
             try:
                 return self.sslsock.recv(n)
             except ssl.SSLWantReadError:
-                yield from scheduling.block_r(self.sslsock.fileno())
+                yield from scheduling.block_r(self.sslsock)
             except ssl.SSLWantWriteError:
-                yield from scheduling.block_w(self.sslsock.fileno())
+                yield from scheduling.block_w(self.sslsock)
             except socket.error as err:
                 if err.errno in _TRYAGAIN:
-                    yield from scheduling.block_r(self.sock.fileno())
+                    yield from scheduling.block_r(self.sslsock)
                 elif err.errno in _DISCONNECTED:
                     # Can this happen?
                     return b''
             try:
                 n = self.sslsock.send(data)
             except ssl.SSLWantReadError:
-                yield from scheduling.block_r(self.sslsock.fileno())
+                yield from scheduling.block_r(self.sslsock)
             except ssl.SSLWantWriteError:
-                yield from scheduling.block_w(self.sslsock.fileno())
+                yield from scheduling.block_w(self.sslsock)
             except socket.error as err:
                 if err.errno in _TRYAGAIN:
-                    yield from scheduling.block_w(self.sock.fileno())
+                    yield from scheduling.block_w(self.sslsock)
                 elif err.errno in _DISCONNECTED:
                     return False
                 else:
         count = 0
         while n > count:
             block = yield from self.read(n - count)
+            if not block:
+                break
             blocks.append(block)
             count += len(block)
         return b''.join(blocks)
 
 def connect(sock, address):
     """COROUTINE: Connect a socket to an address."""
-    try:
-        sock.connect(address)
-    except socket.error as err:
-        if err.errno != errno.EINPROGRESS:
-            raise
-    yield from scheduling.block_w(sock.fileno())
-    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-    if err != 0:
-        raise IOError(err, 'Connection refused')
+    f = context.eventloop.proactor.connect(sock, address)
+    if not f.done():
+        yield from scheduling.block_future(f)
+    return f.result()
 
 
 def getaddrinfo(host, port, af=0, socktype=0, proto=0):
         """COROUTINE: Accept a connection."""
         while True:
             try:
-                conn, addr = self.sock.accept()
+                f = context.eventloop.proactor.accept(self.sock)
+                if not f.done():
+                    yield from scheduling.block_future(f)
+                return f.result()
             except socket.error as err:
-                if err.errno in _TRYAGAIN:
-                    yield from scheduling.block_r(self.sock.fileno())
-                else:
-                    raise  # Unexpected, propagate.
-            else:
-                conn.setblocking(False)
-                return conn, addr
+                if err.errno not in _TRYAGAIN:
+                    raise
 
 
 def create_listener(host, port, af=0, socktype=0, proto=0,