Zed Shaw avatar Zed Shaw committed 53bc2d3

Fixes an edge case in ZeroMQ's use of edge triggered events and eventlet trampoline race conditions.

Comments (0)

Files changed (2)

eventlet/green/zmq.py

 from eventlet import sleep
 from eventlet.hubs import trampoline, _threadlocal
 from eventlet.patcher import slurp_properties
+from eventlet.timeout import Timeout
 
 __patched__ = ['Context', 'Socket']
 slurp_properties(__zmq__, globals(), ignore=__patched__)
 
     def _sock_wait(self, read=False, write=False):
         """
-        First checks if there are events in the socket, to avoid
-        edge trigger problems with race conditions.  Then if there
-        are none it will trampoline and when coming back check
-        for the events.
+        Tries to do a wait that works around a couple race conditions.  ZMQ is
+        edge triggered, and the IO fd it uses can go off before eventlet can
+        check for the IO event.  That means we need to trampoline only if 
+        necessary, only on READ, and we need to timeout the trampoline so we
+        can catch the missed events and then try the recv/send again.
         """
         events = self.getsockopt(__zmq__.EVENTS)
 
             return events
         else:
             # ONLY trampoline on read events for the zmq FD
-            trampoline(self.getsockopt(__zmq__.FD), read=True)
+            try:
+                trampoline(self.getsockopt(__zmq__.FD), read=True, timeout=0.1)
+            except Timeout:
+                # this handle the edge case of missing the real event on the FD
+                # event though zmq thinks there is none
+                pass 
+                
             return self.getsockopt(__zmq__.EVENTS)
 
+    def _nasty_call_wrapper(self, func, read, flags, *args, **kw):
+        """
+        This is what its name implies:  A nasty set of work arounds to minimize
+        a race condition between zmq and eventlet.  Basically, zmq will report
+        that there are NO read events on a zmq socket, but by the time you 
+        trampoline you'll miss an event that comes in on the internal FD.  Since
+        zmq is edge triggered and eventlet doesn't know to check the
+        getsockopt(EVENTS), it then stalls.
+
+        The solution is to try to read, catch the EAGAIN, then try to wait, but
+        only if getsockopt(EVENTS) says you need to, and *then* also timeout the
+        trampoline so that you exit when you miss an event.  In the event of a
+        timeout then you try the read again and repeat the process.
+
+        Kind of gross, but works and you only hit the 0.1 sleep time when
+        there's this rare stall or if you're idle.
+        """
+        if flags & __zmq__.NOBLOCK:
+            # explicitly requesting noblock to do it and return
+            return func(*args, flags=flags, **kw)
+
+        # we have to force a noblock on this so that we can do a lame
+        # poll to work around a race condition between ZMQ and eventlet
+        flags |= __zmq__.NOBLOCK
+
+        while True:
+            try:
+                m = func(*args, flags=flags, **kw)
+
+                # there's a situation where a recv returns None but that means
+                # try again, which is weird but whatever
+                if read and m != None: 
+                    return m
+                elif not read:
+                    return
+                # else: on this is to just try again
+
+            except __zmq__.ZMQError, e:
+                # we got an EAGAIN, so now we try to wait
+                if e.errno == EAGAIN:
+                    self._sock_wait(write=not read, read=read)
+                else:
+                    # well looks like something bad happened
+                    raise
+
     def send(self, msg, flags=0, copy=True, track=False):
         """
         Override this instead of the internal _send_* methods 
         since those change and it's not clear when/how they're
         called in real code.
         """
-        if flags & __zmq__.NOBLOCK:
-            super(Socket, self).send(msg, flags=flags, track=track, copy=copy)
-            return
+        func = super(Socket, self).send
+        return self._nasty_call_wrapper(func, False, flags, msg, copy=copy, track=track)
 
-        flags |= __zmq__.NOBLOCK
-
-        while True:
-            try:
-                self._sock_wait(write=True)
-                super(Socket, self).send(msg, flags=flags, track=track,
-                                         copy=copy)
-                return
-            except __zmq__.ZMQError, e:
-                if e.errno != EAGAIN:
-                    raise
 
     def recv(self, flags=0, copy=True, track=False):
         """
         since those change and it's not clear when/how they're
         called in real code.
         """
-        if flags & __zmq__.NOBLOCK:
-            return super(Socket, self).recv(flags=flags, track=track, copy=copy)
+        func = super(Socket, self).recv
+        return self._nasty_call_wrapper(func, True, flags, copy=copy, track=track)
 
-        flags |= __zmq__.NOBLOCK
-
-        while True:
-            try:
-                self._sock_wait(read=True)
-                m = super(Socket, self).recv(flags=flags, track=track, copy=copy)
-                if m is not None:
-                    return m
-            except __zmq__.ZMQError, e:
-                if e.errno != EAGAIN:
-                    raise
-
-

tests/zmq_test.py

     @skip_unless(zmq_supported)
     def test_recv_multipart_bug68(self):
         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
-        msg = ['']
+        msg = ['test']
+
         req.send_multipart(msg)
         recieved_msg = rep.recv_multipart()
         self.assertEqual(recieved_msg, msg)
 
         # Send a message back the other way
-        msg2 = [""]
+        msg2 = ["test2"]
         rep.send_multipart(msg2, copy=False)
         # When receiving a copy it's a zmq.core.message.Message you get back
         recieved_msg = req.recv_multipart(copy=False)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.