Zed Shaw committed 53bc2d3

Fixes an edge case in ZeroMQ's use of edge triggered events and eventlet trampoline race conditions.

Comments (0)

Files changed (2)


 from eventlet import sleep
 from eventlet.hubs import trampoline, _threadlocal
 from eventlet.patcher import slurp_properties
+from eventlet.timeout import Timeout
 __patched__ = ['Context', 'Socket']
 slurp_properties(__zmq__, globals(), ignore=__patched__)
     def _sock_wait(self, read=False, write=False):
-        First checks if there are events in the socket, to avoid
-        edge trigger problems with race conditions.  Then if there
-        are none it will trampoline and when coming back check
-        for the events.
+        Tries to do a wait that works around a couple of race conditions.  ZMQ
+        is edge-triggered, and the IO fd it uses can go off before eventlet can
+        check for the IO event.  That means we need to trampoline only if
+        necessary, only on READ, and we need to time out the trampoline so we
+        can catch the missed events and then try the recv/send again.
         events = self.getsockopt(__zmq__.EVENTS)
             return events
             # ONLY trampoline on read events for the zmq FD
-            trampoline(self.getsockopt(__zmq__.FD), read=True)
+            try:
+                trampoline(self.getsockopt(__zmq__.FD), read=True, timeout=0.1)
+            except Timeout:
+                # this handles the edge case of missing the real event on the FD
+                # even though zmq thinks there is none
+                pass 
             return self.getsockopt(__zmq__.EVENTS)
+    def _nasty_call_wrapper(self, func, read, flags, *args, **kw):
+        """
+        This is what its name implies:  A nasty set of workarounds to minimize
+        a race condition between zmq and eventlet.  Basically, zmq will report
+        that there are NO read events on a zmq socket, but by the time you
+        trampoline you've missed an event that came in on the internal FD.  Since
+        zmq is edge-triggered and eventlet doesn't know to check
+        getsockopt(EVENTS), it then stalls.
+        The solution is to try to read, catch the EAGAIN, then try to wait, but
+        only if getsockopt(EVENTS) says you need to, and *then* also time out the
+        trampoline so that you exit when you miss an event.  In the event of a
+        timeout you try the read again and repeat the process.
+        Kind of gross, but it works and you only hit the 0.1 sleep time when
+        there's this rare stall or if you're idle.
+        """
+        if flags & __zmq__.NOBLOCK:
+            # explicitly requesting noblock to do it and return
+            return func(*args, flags=flags, **kw)
+        # we have to force a noblock on this so that we can do a lame
+        # poll to work around a race condition between ZMQ and eventlet
+        flags |= __zmq__.NOBLOCK
+        while True:
+            try:
+                m = func(*args, flags=flags, **kw)
+                # there's a situation where a recv returns None but that means
+                # try again, which is weird but whatever
+                if read and m != None: 
+                    return m
+                elif not read:
+                    return
+                # else: a read returned None, so just loop and try again
+            except __zmq__.ZMQError, e:
+                # we got an EAGAIN, so now we try to wait
+                if e.errno == EAGAIN:
+                    self._sock_wait(write=not read, read=read)
+                else:
+                    # well looks like something bad happened
+                    raise
     def send(self, msg, flags=0, copy=True, track=False):
         Override this instead of the internal _send_* methods 
         since those change and it's not clear when/how they're
         called in real code.
-        if flags & __zmq__.NOBLOCK:
-            super(Socket, self).send(msg, flags=flags, track=track, copy=copy)
-            return
+        func = super(Socket, self).send
+        return self._nasty_call_wrapper(func, False, flags, msg, copy=copy, track=track)
-        flags |= __zmq__.NOBLOCK
-        while True:
-            try:
-                self._sock_wait(write=True)
-                super(Socket, self).send(msg, flags=flags, track=track,
-                                         copy=copy)
-                return
-            except __zmq__.ZMQError, e:
-                if e.errno != EAGAIN:
-                    raise
     def recv(self, flags=0, copy=True, track=False):
         since those change and it's not clear when/how they're
         called in real code.
-        if flags & __zmq__.NOBLOCK:
-            return super(Socket, self).recv(flags=flags, track=track, copy=copy)
+        func = super(Socket, self).recv
+        return self._nasty_call_wrapper(func, True, flags, copy=copy, track=track)
-        flags |= __zmq__.NOBLOCK
-        while True:
-            try:
-                self._sock_wait(read=True)
-                m = super(Socket, self).recv(flags=flags, track=track, copy=copy)
-                if m is not None:
-                    return m
-            except __zmq__.ZMQError, e:
-                if e.errno != EAGAIN:
-                    raise


     def test_recv_multipart_bug68(self):
         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
-        msg = ['']
+        msg = ['test']
         recieved_msg = rep.recv_multipart()
         self.assertEqual(recieved_msg, msg)
         # Send a message back the other way
-        msg2 = [""]
+        msg2 = ["test2"]
         rep.send_multipart(msg2, copy=False)
         # When receiving a copy it's a zmq.core.message.Message you get back
         recieved_msg = req.recv_multipart(copy=False)