Commits

Edward George committed bef16a5

hubs: kqueue: squashed fixes

* try and recover from bad file descriptor error after fork
* remove some stuff in kqueue hub that was added by accident
* skip test_closure test with kqueue hub
kqueue hub cannot detect closures of file descriptors
* simplify _reinit_kqueue()

Comments (0)

Files changed (2)

eventlet/hubs/kqueue.py

+import os
 import sys
 from eventlet import patcher
 select = patcher.original('select')
 
     def __init__(self, clock=time.time):
         super(Hub, self).__init__(clock)
+        self._events = {}
+        self._init_kqueue()
+
+    def _init_kqueue(self):
         self.kqueue = select.kqueue()
-        self._events = {}
+        self._pid = os.getpid()
+
+    def _reinit_kqueue(self):
+        self.kqueue.close()
+        self._init_kqueue()
+        kqueue = self.kqueue
+        events = [e for i in self._events.itervalues()
+                  for e in i.itervalues()]
+        kqueue.control(events, 0, 0)
+
+    def _control(self, events, max_events, timeout):
+        try:
+            return self.kqueue.control(events, max_events, timeout)
+        except OSError:
+            # have we forked?
+            if os.getpid() != self._pid:
+                self._reinit_kqueue()
+                return self.kqueue.control(events, max_events, timeout)
+            raise
 
     def add(self, evtype, fileno, cb):
         listener = super(Hub, self).add(evtype, fileno, cb)
         events = self._events.setdefault(fileno, {})
         if evtype not in events:
-            event = events[evtype] = select.kevent(fileno,
-                FILTERS.get(evtype), select.KQ_EV_ADD)
-            self.kqueue.control([event], 0, 0)
+            try:
+                event = select.kevent(fileno,
+                    FILTERS.get(evtype), select.KQ_EV_ADD)
+                self._control([event], 0, 0)
+                events[evtype] = event
+            except ValueError:
+                super(Hub, self).remove(listener)
+                raise
         return listener
 
     def _delete_events(self, events):
         del_events = map(lambda e: select.kevent(e.ident, e.filter,
                          select.KQ_EV_DELETE), events)
-        self.kqueue.control(del_events, 0, 0)
+        self._control(del_events, 0, 0)
 
     def remove(self, listener):
         super(Hub, self).remove(listener)
             if seconds:
                 sleep(seconds)
             return
-        result = self.kqueue.control([], self.MAX_EVENTS, seconds)
+        result = self._control([], self.MAX_EVENTS, seconds)
         SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
         for event in result:
             fileno = event.ident
-            evfilt = event.filter 
+            evfilt = event.filter
             try:
                 if evfilt == FILTERS[READ]:
                     readers.get(fileno, noop).cb(fileno)
             except:
                 self.squelch_exception(fileno, sys.exc_info())
                 clear_sys_exc_info()
-
-
-
-
-

tests/greenio_test.py

             return False
 
 
+def using_kqueue_hub(_f):
+        try:
+            return 'kqueue' in type(get_hub()).__module__
+        except Exception:
+            return False
+
+
 class TestGreenSocket(LimitedTestCase):
     def assertWriteToClosedFileRaises(self, fd):
         if sys.version_info[0] < 3:
 
     @skip_with_pyevent
     @skip_if(using_epoll_hub)
+    @skip_if(using_kqueue_hub)
     def test_closure(self):
         def spam_to_me(address):
             sock = eventlet.connect(address)