Alexey Borzenkov avatar Alexey Borzenkov committed c9bda72

Implement vfd for libev on win32 (no-op on other platforms)

Comments (0)

Files changed (4)

 from python cimport *
 
 
+libev.vfd_init()
+
+
 __all__ = ['get_version',
            'get_header_version',
            'supported_backends',
     ACTIVE($1)')
 
 
-define(INIT, `def __cinit__(self, loop loop$2):
+define(INIT, `def __init__(self, loop loop$2):
         libev.ev_$1_init(&self._watcher, <void *>gevent_callback_$1$3)
         self.loop = loop
         self._incref = 0')
         return ''
 
 
-cdef int _open_osfhandle(int handle) except -1:
-    cdef int fd = handle
-    IFDEF_WINDOWS()
-    fd = libev._open_osfhandle(fd, 0)
-    if fd < 0:
-        raise IOError("handle=%s does not have a valid file descriptor" % handle)
-    if fd >= FD_SETSIZE:
-        raise IOError("fd %s (handle=%s) is bigger than FD_SETSIZE (%s)" % (fd, handle, FD_SETSIZE))
-    cdef unsigned long arg
-    if ioctlsocket(handle, FIONREAD, &arg) != 0:
-        raise IOError("fd=%s (handle=%s) is not a socket descriptor (file descriptors are not supported)" % (fd, handle))
-    ENDIF()
-    return fd
-
-
 cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
 
     WATCHER(io)
 
     cdef int _initialized # for __dealloc__, whether io is initialized
 
-    def __cinit__(self, loop loop, int fd, int events):
-        IFDEF_WINDOWS()
-        fd = _open_osfhandle(fd)
-        ENDIF()
-        libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events)
+    def __init__(self, loop loop, long fd, int events):
+        cdef int vfd = libev.vfd_open(fd)
+        if self._initialized:
+            libev.vfd_free(self._watcher.fd)
+        libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, events)
         self._initialized = 1
         self.loop = loop
         self._incref = 0
 
     def __dealloc__(self):
-        IFDEF_WINDOWS()
         if self._initialized:
-            libev.close(self._watcher.fd)
-        ENDIF()
+            libev.vfd_free(self._watcher.fd)
 
     property fd:
 
         def __get__(self):
-            cdef int fd = self._watcher.fd
-            IFDEF_WINDOWS()
-            fd = libev._get_osfhandle(fd)
-            ENDIF()
-            return fd
+            return libev.vfd_get(self._watcher.fd)
 
-        def __set__(self, int fd):
+        def __set__(self, long fd):
             if libev.ev_is_active(&self._watcher):
                 raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active")
-            IFDEF_WINDOWS()
-            fd = _open_osfhandle(fd) # careful, might raise
-            libev.close(self._watcher.fd) # close the old one
-            ENDIF()
-            libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, self._watcher.events)
+            cdef int vfd = libev.vfd_open(fd)
+            libev.vfd_free(self._watcher.fd)
+            libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, self._watcher.events)
 
     property events:
 
             return _events_to_str(self._watcher.events)
 
     def _format(self):
-        return ' fd=%s events=%s' % (self._watcher.fd, self.events_str)
+        return ' fd=%s events=%s' % (self.fd, self.events_str)
 
 
 cdef public class timer(watcher) [object PyGeventTimerObject, type PyGeventTimer_Type]:
+cdef extern from "libev_vfd.h":
+    void vfd_init()
+    long vfd_get(int)
+    int vfd_open(long) except -1
+    void vfd_free(int)
+
 cdef extern from "libev.h":
     int EV_MINPRI
     int EV_MAXPRI
     void ev_ref(ev_loop*)
     void ev_unref(ev_loop*)
     void ev_break(ev_loop*, int)
-
-    # not from libev, but it's there to avoid collisions with gevent.core._open_osfhandle
-    int _open_osfhandle(int, int)
-    int _get_osfhandle(int)
-    void close(int)

gevent/libev_vfd.h

+#ifdef _WIN32
+#ifdef EV_STANDALONE
+/*
+ * If libev on win32 is embedded, then we can use an
+ * arbitrary mapping between integer fds and OS
+ * handles. Then by defining special macros libev
+ * will use our functions.
+ */
+
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#include <winsock.h>
+
+typedef struct vfd_entry_t
+{
+	long handle; /* OS handle, i.e. SOCKET */
+	int count; /* Reference count, 0 if free */
+	int next; /* Next free fd, -1 if last */
+} vfd_entry;
+
+#define VFD_INCREMENT 128
+static int vfd_num = 0; /* num allocated fds */
+static int vfd_max = 0; /* max allocated fds */
+static int vfd_next = -1; /* next free fd for reuse */
+static PyObject* vfd_map = NULL; /* map OS handle -> virtual fd */
+static vfd_entry* vfd_entries = NULL; /* list of virtual fd entries */
+
+#ifdef WITH_THREAD
+static CRITICAL_SECTION vfd_lock;
+#define VFD_LOCK_INIT   InitializeCriticalSection(&vfd_lock)
+#define VFD_LOCK_ENTER  EnterCriticalSection(&vfd_lock)
+#define VFD_LOCK_LEAVE  LeaveCriticalSection(&vfd_lock)
+#define VFD_GIL_DECLARE PyGILState_STATE ___save
+#define VFD_GIL_ENSURE  ___save = PyGILState_Ensure()
+#define VFD_GIL_RELEASE PyGILState_Release(___save)
+#else
+#define VFD_LOCK_INIT
+#define VFD_LOCK_ENTER
+#define VFD_LOCK_LEAVE
+#define VFD_GIL_DECLARE
+#define VFD_GIL_ENSURE
+#define VFD_GIL_RELEASE
+#endif
+
+static void vfd_init()
+{
+	VFD_LOCK_INIT;
+}
+
+/*
+ * Given a virtual fd returns an OS handle or -1
+ * This function is speed critical, so it cannot use GIL
+ */
+static long vfd_get(int fd)
+{
+	int handle = -1;
+	VFD_LOCK_ENTER;
+	if (vfd_entries != NULL && fd >= 0 && fd < vfd_num)
+		handle = vfd_entries[fd].handle;
+	VFD_LOCK_LEAVE;
+	return handle;
+}
+
+#define EV_FD_TO_WIN32_HANDLE(fd) vfd_get((fd))
+
+/*
+ * Given an OS handle finds or allocates a virtual fd
+ * Returns -1 on failure and sets Python exception if pyexc is non-zero
+ */
+static int vfd_open_(long handle, int pyexc)
+{
+	VFD_GIL_DECLARE;
+	int fd = -1;
+	unsigned long arg;
+	PyObject* key = NULL;
+	PyObject* value;
+
+	VFD_GIL_ENSURE;
+	if (ioctlsocket(handle, FIONREAD, &arg) != 0) {
+		if (pyexc)
+			PyErr_Format(PyExc_IOError, "%ld is not a socket (files are not supported)", handle);
+		goto done;
+	}
+	if (vfd_map == NULL) {
+		vfd_map = PyDict_New();
+		if (vfd_map == NULL)
+			goto done;
+	}
+	if (vfd_entries == NULL) {
+		vfd_entry* entries = PyMem_Malloc(sizeof(vfd_entry) * VFD_INCREMENT);
+		if (entries == NULL) {
+			if (pyexc)
+				PyErr_NoMemory();
+			goto done;
+		}
+		VFD_LOCK_ENTER;
+		vfd_entries = entries;
+		vfd_max = VFD_INCREMENT;
+		vfd_num = 0;
+		VFD_LOCK_LEAVE;
+	}
+	key = PyLong_FromLong(handle);
+	/* check if it's already in the dict */
+	value = PyDict_GetItem(vfd_map, key);
+	if (value != NULL) {
+		/* is it safe to use PyInt_AS_LONG(value) here? */
+		fd = PyInt_AsLong(value);
+		if (fd >= 0) {
+			++vfd_entries[fd].count;
+			goto done;
+		}
+	}
+	/* use the free entry, if available */
+	if (vfd_next >= 0) {
+		fd = vfd_next;
+		vfd_next = vfd_entries[fd].next;
+		VFD_LOCK_ENTER;
+		goto allocated;
+	}
+	/* check if it would be out of bounds */
+	if (vfd_num >= FD_SETSIZE) {
+		/* libev's select doesn't support more that FD_SETSIZE fds */
+		if (pyexc)
+			PyErr_Format(PyExc_IOError, "cannot watch more than %d sockets", (int)FD_SETSIZE);
+		goto done;
+	}
+	/* allocate more space if needed */
+	VFD_LOCK_ENTER;
+	if (vfd_num >= vfd_max) {
+		int newsize = vfd_max + VFD_INCREMENT;
+		vfd_entry* entries = PyMem_Realloc(vfd_entries, sizeof(vfd_entry) * newsize);
+		if (entries == NULL) {
+			VFD_LOCK_LEAVE;
+			if (pyexc)
+				PyErr_NoMemory();
+			goto done;
+		}
+		vfd_entries = entries;
+		vfd_max += VFD_INCREMENT;
+	}
+	fd = vfd_num++;
+allocated:
+	/* vfd_lock must be acquired when entering here */
+	vfd_entries[fd].handle = handle;
+	vfd_entries[fd].count = 1;
+	value = PyInt_FromLong(fd);
+	PyDict_SetItem(vfd_map, key, value);
+	Py_DECREF(value);
+	VFD_LOCK_LEAVE;
+done:
+	Py_XDECREF(key);
+	VFD_GIL_RELEASE;
+	return fd;
+}
+
+#define vfd_open(fd) vfd_open_((fd), 1)
+#define EV_WIN32_HANDLE_TO_FD(handle) vfd_open_((handle), 0)
+
+static void vfd_free_(int fd, int needclose)
+{
+	VFD_GIL_DECLARE;
+	PyObject* key;
+
+	VFD_GIL_ENSURE;
+	if (fd < 0 || fd >= vfd_num)
+		goto done; /* out of bounds */
+	if (vfd_entries[fd].count <= 0)
+		goto done; /* free entry, ignore */
+	if (!--vfd_entries[fd].count) {
+		/* fd has just been freed */
+		long handle = vfd_entries[fd].handle;
+		vfd_entries[fd].handle = -1;
+		vfd_entries[fd].next = vfd_next;
+		vfd_next = fd;
+		if (needclose)
+			closesocket(handle);
+		/* vfd_map is assumed to be != NULL */
+		key = PyLong_FromLong(handle);
+		PyDict_DelItem(vfd_map, key);
+		Py_DECREF(key);
+	}
+done:
+	VFD_GIL_RELEASE;
+}
+
+#define vfd_free(fd) vfd_free_((fd), 0)
+#define EV_WIN32_CLOSE_FD(fd) vfd_free_((fd), 1)
+
+#else
+/*
+ * If libev on win32 is not embedded in gevent, then
+ * the only way to map vfds is to use the default of
+ * using runtime fds in libev. Note that it will leak
+ * fds, because there's no way of closing them safely
+ */
+#define vfd_init() do {} while(0)
+#define vfd_get(fd) _get_osfhandle((fd))
+#define vfd_open(fd) _open_osfhandle((fd), 0)
+#define vfd_free(fd)
+#endif
+#else
+/*
+ * On non-win32 platforms vfd_* are noop macros
+ */
+#define vfd_init() do {} while(0)
+#define vfd_get(fd) (fd)
+#define vfd_open(fd) ((int)(fd))
+#define vfd_free(fd)
+#endif
         io = self.hub.loop.io
         self._read_event = io(fileno, 1)
         self._write_event = io(fileno, 2)
-        if is_windows:
-            self._connect_event = io(fileno, 3)
-        else:
-            self._connect_event = self._write_event
 
     def __repr__(self):
         return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo())
         # use self._read_event.loop.run_callback
         self.hub.cancel_wait(self._read_event, cancel_wait_ex)
         self.hub.cancel_wait(self._write_event, cancel_wait_ex)
-        if self._connect_event is not self._write_event:
-            self.hub.cancel_wait(self._connect_event, cancel_wait_ex)
         self._sock = _closedsocket()
         dummy = self._sock._dummy
         for method in _delegate_methods:
                 if not result or result == EISCONN:
                     break
                 elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
-                    self._wait(self._connect_event)
+                    self._wait(self._write_event)
                 else:
                     raise error(result, strerror(result))
         finally:
         else:
             self.hub.cancel_wait(self._read_event, cancel_wait_ex)
             self.hub.cancel_wait(self._write_event, cancel_wait_ex)
-        if self._connect_event is not self._write_event:
-            self.hub.cancel_wait(self._connect_event, cancel_wait_ex)
         self._sock.shutdown(how)
 
     family = property(lambda self: self._sock.family, doc="the socket family")
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.