Commits

Kyle Schaffrick committed a90bc79

Add timeout support in ConsumerSet.

Comments (0)

Files changed (1)

 from protocol import Connection
 from transport import GreenTCPTransport
 from spec.spec import spec_0_8 as spec
+import eventlet.api as ev_api
 
 import logging
 
         Consume from some set of queues. Returns a context manager object,
         specifically a ConsumerSet.
         """
-        no_ack = kwargs.get('no_ack', False)
-        prefetch = kwargs.get('prefetch', None)
-        return ConsumerSet(self._ch, queues, no_ack, prefetch)
+        return ConsumerSet(self._ch, queues, **kwargs)
 
     def ack(self, message, thru=False):
         """
     A set of consumers that can be used to asynchronously consume messages from
     some set of queues.
     """
-    def __init__(self, channel, queues, no_ack=False, prefetch=None):
+    def __init__(self, channel, queues, no_ack=False, prefetch=None,
+            timeout=None, raise_on_timeout=False):
         self._ch = channel
         self.queues = queues
         self._c_tags = {}
         self._state = 'stopped'
         self._no_ack = no_ack
         self._prefetch = prefetch
+        self._raise_on_timeout = raise_on_timeout
+        self._timeout = timeout
 
     def __enter__(self):
         self.start_consumers()
         if self._state == 'stopped':
             return
 
-        rx_m = self._ch.recv_method()
+        if self._timeout is not None:
+            try:
+                rx_m = ev_api.with_timeout(self._timeout, self._ch.recv_method)
+            except ev_api.TimeoutError:
+                if self._raise_on_timeout: raise
+                else: return None
+
+        else:
+            rx_m = self._ch.recv_method()
 
         if rx_m.method == spec.basic.deliver:
             (properties, body_len, body) = self._ch.recv_content()