Sergey Shepelev avatar Sergey Shepelev committed 807fcc8 Merge

merge

Comments (0)

Files changed (1)

eventlet/green/zmq.py

 slurp_properties(__zmq__, globals(), ignore=__patched__)
 
 from collections import deque
-from types import MethodType
 
 class _QueueLock(object):
     """A Lock that can be acquired by at most one thread. Any other
         * send_multipart
         * recv_multipart
     """
+    def __init__(self, context, socket_type):
+        super(Socket, self).__init__(context, socket_type)
 
-    _event_listener = None
-    def __init__(self, context, socket_type):
-        _Socket.__init__(self, context, socket_type)
-
-        self._send_event = _BlockedThread()
-        self._recv_event = _BlockedThread()
-
-        # customize send and recv methods based on socket type
-        ops = self._eventlet_ops.get(socket_type)
-        if ops:
-            self._send_lock = None
-            self._recv_lock = None
-            send, msend, recv, mrecv = ops
-            if send:
-                self._send_lock = _QueueLock()
-                self.send = MethodType(send, self, Socket)
-                self.send_multipart = MethodType(msend, self, Socket)
-            else:
-                self.send = self.send_multipart = self._send_not_supported
-
-            if recv:
-                self._recv_lock = _QueueLock()
-                self.recv = MethodType(recv, self, Socket)
-                self.recv_multipart = MethodType(mrecv, self, Socket)
-            else:
-                self.recv = self.recv_multipart = self._send_not_supported
+        self._eventlet_send_event = _BlockedThread()
+        self._eventlet_recv_event = _BlockedThread()
+        self._eventlet_send_lock = _QueueLock()
+        self._eventlet_recv_lock = _QueueLock()
 
         def event(fd):
             # Some events arrived at the zmq socket. This may mean
             # there's a message that can be read or there's space for
             # a message to be written.
-            self._send_event.wake()
-            self._recv_event.wake()
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
 
         hub = hubs.get_hub()
-        self._event_listener = hub.add(hub.READ, self.getsockopt(FD), event)
+        self._eventlet_listener = hub.add(hub.READ, self.getsockopt(FD), event)
 
     @_wraps(_Socket.close)
     def close(self):
-        if self._event_listener is not None:
-            hubs.get_hub().remove(self._event_listener)
-            self._event_listener = None
+        _Socket.close(self)
+        if self._eventlet_listener is not None:
+            hubs.get_hub().remove(self._eventlet_listener)
+            self._eventlet_listener = None
             # wake any blocked threads
-            self._send_event.wake()
-            self._recv_event.wake()
-
-        _Socket.close(self)
-
-    @_wraps(_Socket.send)
-    def send(self, msg, flags=0, copy=True, track=False):
-        """Send method used by REP and REQ sockets. The lock-step
-        send->recv->send->recv restriction of these sockets makes this
-        implementation simple.
-        """
-        if flags & NOBLOCK:
-            return _Socket_send(self, msg, flags, copy, track)
-
-        flags |= NOBLOCK
-
-        while True:
-            # Avoid recreating a Message object each time Socket.send
-            # is called. TODO: Not sure if this is benefitial or not.
-            #if not copy and not track and type(msg) is str:
-            #    msg = Message(msg)
-
-            try:
-                return _Socket_send(self, msg, flags, copy, track)
-            except ZMQError, e:
-                if e.errno == EAGAIN:
-                    self._send_event.block()
-                else:
-                    raise
-
-    @_wraps(_Socket.recv)
-    def recv(self, flags=0, copy=True, track=False):
-        """Recv method used by REP and REQ sockets. The lock-step
-        send->recv->send->recv restriction of these sockets makes this
-        implementation simple.
-        """
-        if flags & NOBLOCK:
-            return _Socket_recv(self, flags, copy, track)
-
-        flags |= NOBLOCK
-
-        while True:
-            try:
-                return _Socket_recv(self, flags, copy, track)
-            except ZMQError, e:
-                if e.errno == EAGAIN:
-                    self._recv_event.block()
-                else:
-                    raise
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
 
     @_wraps(_Socket.getsockopt)
     def getsockopt(self, option):
         return result
 
     @_wraps(_Socket.send)
-    def _send_not_supported(self, msg, flags, copy, track):
-        raise ZMQError(ENOTSUP)
-
-    @_wraps(_Socket.recv)
-    def _recv_not_supported(self, flags, copy, track):
-        raise ZMQError(ENOTSUP)
-
-    @_wraps(_Socket.send)
-    def _xsafe_send(self, msg, flags=0, copy=True, track=False):
+    def send(self, msg, flags=0, copy=True, track=False):
         """A send method that's safe to use when multiple greenthreads
         are calling send, send_multipart, recv and recv_multipart on
         the same socket.
             # Instead of calling both wake methods, could call
             # self.getsockopt(EVENTS) which would trigger wakeups if
             # needed.
-            self._send_event.wake()
-            self._recv_event.wake()
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
             return result
 
         # TODO: pyzmq will copy the message buffer and create Message
         # objects under some circumstances. We could do that work here
         # once to avoid doing it every time the send is retried.
         flags |= NOBLOCK
-        with self._send_lock:
+        with self._eventlet_send_lock:
             while True:
                 try:
                     return _Socket_send(self, msg, flags, copy, track)
                 except ZMQError, e:
                     if e.errno == EAGAIN:
-                        self._send_event.block()
+                        self._eventlet_send_event.block()
                     else:
                         raise
                 finally:
                     # The call to send processes 0mq events and may
                     # make the socket ready to recv. Wake the next
                     # receiver. (Could check EVENTS for POLLIN here)
-                    self._recv_event.wake()
+                    self._eventlet_recv_event.wake()
 
 
     @_wraps(_Socket.send_multipart)
-    def _xsafe_send_multipart(self, msg_parts, flags=0, copy=True, track=False):
+    def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
         """A send_multipart method that's safe to use when multiple
         greenthreads are calling send, send_multipart, recv and
         recv_multipart on the same socket.
 
         # acquire lock here so the subsequent calls to send for the
         # message parts after the first don't block
-        with self._send_lock:
+        with self._eventlet_send_lock:
             return _Socket_send_multipart(self, msg_parts, flags, copy, track)
 
     @_wraps(_Socket.recv)
-    def _xsafe_recv(self, flags=0, copy=True, track=False):
+    def recv(self, flags=0, copy=True, track=False):
         """A recv method that's safe to use when multiple greenthreads
         are calling send, send_multipart, recv and recv_multipart on
         the same socket.
             # Instead of calling both wake methods, could call
             # self.getsockopt(EVENTS) which would trigger wakeups if
             # needed.
-            self._send_event.wake()
-            self._recv_event.wake()
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
             return msg
 
         flags |= NOBLOCK
-        with self._recv_lock:
+        with self._eventlet_recv_lock:
             while True:
                 try:
                     return _Socket_recv(self, flags, copy, track)
                 except ZMQError, e:
                     if e.errno == EAGAIN:
-                        self._recv_event.block()
+                        self._eventlet_recv_event.block()
                     else:
                         raise
                 finally:
                     # The call to recv processes 0mq events and may
                     # make the socket ready to send. Wake the next
                     # receiver. (Could check EVENTS for POLLOUT here)
-                    self._send_event.wake()
+                    self._eventlet_send_event.wake()
 
     @_wraps(_Socket.recv_multipart)
-    def _xsafe_recv_multipart(self, flags=0, copy=True, track=False):
+    def recv_multipart(self, flags=0, copy=True, track=False):
         """A recv_multipart method that's safe to use when multiple
         greenthreads are calling send, send_multipart, recv and
         recv_multipart on the same socket.
 
         # acquire lock here so the subsequent calls to recv for the
         # message parts after the first don't block
-        with self._recv_lock:
+        with self._eventlet_recv_lock:
             return _Socket_recv_multipart(self, flags, copy, track)
-
-    # The behavior of the send and recv methods depends on the socket
-    # type. See http://api.zeromq.org/2-1:zmq-socket for explanation
-    # of socket types. For the green Socket, our main concern is
-    # supporting calling send or recv from multiple greenthreads when
-    # it makes sense for the socket type.
-    _send_only_ops = (_xsafe_send, _xsafe_send_multipart, None, None)
-    _recv_only_ops = (None, None, _xsafe_recv, _xsafe_recv_multipart)
-    _full_ops = (_xsafe_send, _xsafe_send_multipart, _xsafe_recv, _xsafe_recv_multipart)
-
-    _eventlet_ops = {
-        PUB: _send_only_ops,
-        SUB: _recv_only_ops,
-
-        PUSH: _send_only_ops,
-        PULL: _recv_only_ops,
-
-        PAIR: _full_ops
-        }
-
-    try:
-        _eventlet_ops[XREP] = _full_ops
-        _eventlet_ops[XREQ] = _full_ops
-    except AttributeError:
-        # XREP and XREQ are being renamed ROUTER and DEALER
-        _eventlet_ops[ROUTER] = _full_ops
-        _eventlet_ops[DEALER] = _full_ops
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.