Source

amqpev / api.py

Full commit
from protocol import Connection
from transport import GreenTCPTransport
from spec.spec import spec_0_8 as spec

import logging

# Module-level logger used by the classes in this file to report errors raised
# while shutting down.
log = logging.getLogger("amqpev.api")

class BrokerServiceFactory(object):
    """
    Callable factory that owns a single Connection and hands out a
    BrokerService for every call, each built on whatever
    self.conn.channel() returns.
    """
    def __init__(self, host):
        """
        Build the connection shared by every service this factory creates.

        host -- passed unchanged to GreenTCPTransport.
        """
        self.conn = Connection(GreenTCPTransport(host))

    def __call__(self):
        """
        Return a new BrokerService wrapping a channel of the connection.
        """
        channel = self.conn.channel()
        return BrokerService(channel)

    def close(self):
        """
        Close the underlying connection.
        """
        self.conn.close()


class BrokerService(object):
    """
    A BrokerService object along with its helper classes for the various
    objects in the AMQP model provide a high-level API for interacting with the
    AMQP broker services. A BrokerService is essentially an adapter around a
    pre-configured channel.

    Exchange and Queue helpers are cached by name in identity maps, so asking
    for the same name twice returns the same helper object. The service can be
    used as a context manager; leaving the block closes the channel.
    """
    def __init__(self, channel):
        """
        channel -- the channel object every operation is issued on.
        """
        self._ch = channel
        # Seed the identity map with exchanges flagged as pre-declared, so the
        # Exchange helper sends no declare for them.
        self._exchanges = {
                '': Exchange(self._ch, "", 'direct', False, False, {},
                    pre_declared=True),
                'amq.direct': Exchange(self._ch, "amq.direct", 'direct', False,
                    False, {}, pre_declared=True),
                'amq.fanout': Exchange(self._ch, "amq.fanout", 'fanout', False,
                    False, {}, pre_declared=True),
                'amq.topic': Exchange(self._ch, "amq.topic", 'topic', False,
                    False, {}, pre_declared=True) }
        self._queues = {}

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        """
        Close the service when the with-block ends.

        A failure in close() is re-raised only when the block itself finished
        cleanly. If the block is already unwinding with an exception, the
        close failure is logged and dropped so the original exception is the
        one that propagates.
        """
        try:
            self.close()
        except:
            # Deliberately broad: any failure while closing must not replace
            # an exception that is already propagating out of the with-block.
            if type is None:
                log.exception("Exception while closing BrokerService.")
                raise
            else:
                log.exception("Masking exception while shutting down"
                        " BrokerService")

    def close(self):
        """
        Close the underlying channel.
        """
        self._ch.close()

    @property
    def channel(self):
        """
        The channel this service wraps.
        """
        return self._ch

    def exchange(self, name=None, type='direct', passive=False, durable=False,
            arguments=None):
        """
        If we've seen this exchange name before, return the same Exchange
        object from our identity map. If we haven't, declare it on the channel,
        save, and return the new Exchange.

        For a name that is already cached, the remaining parameters are
        ignored. Raises ValueError if no name is given.

        arguments -- optional dict forwarded to the declare; defaults to a
                     fresh empty dict per call.
        """
        if name is None:
            raise ValueError("Exchanges must be explicitly named")
        if name in self._exchanges:
            return self._exchanges[name]

        # A fresh dict per call avoids sharing one mutable default between
        # every exchange declared without explicit arguments.
        if arguments is None:
            arguments = {}

        ex = Exchange(self._ch, name, type, passive, durable, arguments)
        self._exchanges[ex.name] = ex
        return ex

    def queue(self, name=None, passive=False, durable=False, exclusive=None,
            auto_delete=None, arguments=None):
        """
        If we've seen this queue name before, return the same Queue object from
        our identity map. If we haven't, declare it on the channel, save, and
        return.

        If no name is given, a new, exclusive, auto-delete queue will always be
        declared and returned.

        exclusive and auto_delete default to True for unnamed queues and to
        False for named ones when left as None.
        """
        if exclusive is None:
            exclusive = name is None
        if auto_delete is None:
            auto_delete = name is None
        arguments = arguments or {}

        if name is not None and name in self._queues:
            return self._queues[name]
        else:
            q = Queue(self._ch, self, name, passive, durable, exclusive,
                    auto_delete, arguments)
            # Cache under the name the Queue reports, which may differ from
            # the requested one (e.g. when no name was given).
            self._queues[q.name] = q
            return q

    def publish(self, message, routing_key, **kwargs):
        """
        Convenience for publishing on the default direct exchange.

        Extra keyword arguments are forwarded to Exchange.publish().
        """
        return self.exchange('').publish(message, routing_key, **kwargs)

    def consume_from(self, *queues, **kwargs):
        """
        Consume from some set of queues. Returns a context manager object,
        specifically a ConsumerSet.

        Only the 'no_ack' keyword (default False) is read; any other keyword
        argument is ignored.
        """
        no_ack = kwargs.get('no_ack', False)
        return ConsumerSet(self._ch, queues, no_ack)

    def ack(self, message, thru=False):
        """
        Acknowledge a message.

        The delivery tag is taken from message.delivery_headers, and thru is
        sent as the 'multiple' flag of the ack.
        """
        self._ch.send_method(spec.basic.ack,
                delivery_tag=message.delivery_headers['delivery_tag'],
                multiple=thru)


class Exchange(object):
    """
    High-level API helper standing for one AMQP exchange on a given channel.
    """
    def __init__(self, channel, name, type, passive, durable, arguments,
            pre_declared=False):
        """
        Remember the channel and name and, unless pre_declared is true,
        synchronously declare the exchange on the channel.
        """
        self._ch = channel
        self._name = name

        if pre_declared:
            # Nothing is sent for exchanges flagged as already declared.
            return

        declare_fields = dict(ticket=0, exchange=name, type=type,
                passive=passive, durable=durable, auto_delete=False,
                internal=False, nowait=False, arguments=arguments)
        expected_replies = (spec.exchange.declare_ok,)
        self._ch.sync_method(spec.exchange.declare, expected_replies,
                **declare_fields)

    @property
    def name(self):
        """
        The exchange name given at construction.
        """
        return self._name

    def publish(self, message, routing_key="", mandatory=False, immediate=False):
        """
        Publish message to this exchange: send the publish method first, then
        the message's properties and body as content.
        """
        channel = self._ch
        channel.send_method(spec.basic.publish, ticket=0,
                exchange=self._name, routing_key=routing_key,
                mandatory=mandatory, immediate=immediate)

        props = message.properties
        payload = message.body
        channel.send_content(spec.basic, props, payload, len(payload))


class Queue(object):
    """
    This helper object represents an AMQP queue in the high-level API.
    """
    def __init__(self, channel, broker_svc, name, passive, durable, exclusive,
            auto_delete, arguments):
        """
        Synchronously declare the queue on the channel.

        broker_svc -- the owning service, used by publish().
        name       -- queue name; None is sent as the empty string.

        After the declare, self.name is whatever queue name the declare_ok
        reply carries.
        """
        self._ch = channel
        self._bs = broker_svc
        self._name = "" if name is None else name

        rx_m = self._ch.sync_method(spec.queue.declare,
                (spec.queue.declare_ok,), ticket=0, queue=self._name,
                passive=passive, durable=durable, exclusive=exclusive,
                auto_delete=auto_delete, nowait=False, arguments=arguments)

        # Adopt the name from the reply rather than the one requested.
        self._name = rx_m.args['queue']

    @property
    def name(self):
        """
        The queue name as reported by the declare reply.
        """
        return self._name

    def bind(self, exch, routing_key="", arguments=None):
        """
        Synchronously bind this queue to the exchange object exch.

        Returns self so calls can be chained.
        """
        arguments = arguments or {}
        self._ch.sync_method(spec.queue.bind, (spec.queue.bind_ok,), ticket=0,
                queue=self.name, exchange=exch.name, routing_key=routing_key,
                nowait=False, arguments=arguments)
        return self

    def publish(self, message, **kwargs):
        """
        Convenience for publishing to this queue via the default direct
        exchange.
        """
        return self._bs.publish(message, self.name, **kwargs)

    def get(self, no_ack=True):
        """
        Synchronously fetch one message from this queue.

        Returns None when the reply is get_empty. Otherwise returns a Message
        whose delivery_headers are a copy of the reply arguments and which is
        tagged as consumed from this queue.
        """
        rx_m = self._ch.sync_method(spec.basic.get,
                (spec.basic.get_ok, spec.basic.get_empty),
                ticket=0, queue=self.name, no_ack=no_ack)

        if rx_m.method == spec.basic.get_empty:
            return None

        (headers, body_len, body) = self._ch.recv_content()
        msg = Message(body, headers)
        msg.delivery_headers = rx_m.args.copy()
        msg._consumed_from_queue = self
        return msg

    def __contains__(self, message):
        """
        This magic method allows neatly testing if a delivered message came
        from a particular queue using the expression "msg in queue". Returns
        true if the message was delivered from this queue.

        A message that was never tagged with a source queue (for example one
        built locally for publishing) is reported as not contained instead of
        raising AttributeError.
        """
        return getattr(message, '_consumed_from_queue', None) is self


class Message(object):
    """
    An AMQP message, with payload and headers.
    """
    def __init__(self, body, properties=None):
        """
        body       -- the message payload.
        properties -- optional dict of message properties; a falsy value is
                      replaced by a fresh empty dict.
        """
        self.body = body
        self.properties = properties or {}
        # Filled in with the delivery method's arguments when the message is
        # received from the broker.
        self.delivery_headers = {}
        # The queue object this message was delivered from. It stays None for
        # messages that were created locally, so that "msg in queue" can be
        # evaluated on them without an AttributeError.
        self._consumed_from_queue = None

    def encode(self):
        """
        Return the payload to send; this implementation returns body as is.
        """
        return self.body

    @classmethod
    def decode(cls, encoded_body):
        """
        Return encoded_body unchanged.
        """
        return encoded_body


class ConsumerSet(object):
    """
    A set of consumers that can be used to asynchronously consume messages from
    some set of queues.

    The set moves between three states: 'stopped' (no consumers registered),
    'running' (consumers registered by start_consumers()) and 'stop-wait'
    (cancels sent by stop_consumers(), confirmations still outstanding).
    Iterating the set yields messages until next_message() returns None. Used
    as a context manager it starts the consumers on entry and stops and drains
    them on exit.
    """
    def __init__(self, channel, queues, no_ack=False):
        """
        channel -- the channel to consume on.
        queues  -- iterable of queue objects (anything with a .name).
        no_ack  -- forwarded as the no_ack flag of every consume request.
        """
        self._ch = channel
        self.queues = queues
        # Maps consumer tag -> the queue object it was registered for.
        self._c_tags = {}
        self._state = 'stopped'
        self._no_ack = no_ack

    def __enter__(self):
        self.start_consumers()
        return self

    def __exit__(self, type, value, tb):
        """
        Stop the consumers and drain what is still pending.

        A failure during shutdown is re-raised only when the with-block itself
        finished cleanly; otherwise it is logged and dropped so the original
        exception is the one that propagates.
        """
        try:
            self.stop_consumers()
            # This is to consume any pending messages from the receive queues.
            # stop_consumers() does not do this because the application may
            # call stop_consumers() and continue iterating to actually consume
            # those messages instead of discarding them.
            list(self)
        except:
            # Deliberately broad: a shutdown failure must not replace an
            # exception that is already propagating out of the with-block.
            if type is None:
                log.exception("Exception while shutting down ConsumerSet.")
                raise
            else:
                log.exception("Masking exception while shutting down"
                        " ConsumerSet.")

    def start_consumers(self):
        """
        Register one consumer per queue and switch to 'running'.

        Does nothing if the set is already running.
        """
        if self._state == 'running':
            return

        for queue in self.queues:
            rx_m = self._ch.sync_method(spec.basic.consume,
                    (spec.basic.consume_ok,), ticket=0, queue=queue.name,
                    consumer_tag='', no_local=False, no_ack=self._no_ack,
                    exclusive=False, nowait=False)
            # Remember which queue each tag from the reply belongs to, so
            # deliveries can be attributed to their queue.
            self._c_tags[rx_m.args['consumer_tag']] = queue

        self._state = 'running'

    def stop_consumers(self):
        """
        Send a cancel for every registered consumer.

        The confirmations are not awaited here; they are picked up by
        next_message(). Does nothing unless the set is running.
        """
        if self._state != 'running':
            return

        for c_tag in self._c_tags:
            self._ch.send_method(spec.basic.cancel, consumer_tag=c_tag,
                    nowait=False)

        # With no registered consumers no cancel was sent, so there is no
        # cancel_ok to wait for: go straight to 'stopped' instead of leaving
        # next_message() reading the channel for a confirmation.
        self._state = 'stop-wait' if self._c_tags else 'stopped'

    def next_message(self):
        """
        Return the next delivered Message, or None when there is none to
        return.

        None is returned when the set is stopped, when the last outstanding
        cancel_ok has been received, or when a method other than deliver or
        cancel_ok arrives. A cancel_ok that leaves other consumers
        outstanding does not end the call; reading continues so that a drain
        sees every remaining delivery and confirmation.
        """
        while self._state != 'stopped':
            rx_m = self._ch.recv_method()

            if rx_m.method == spec.basic.deliver:
                (headers, body_len, body) = self._ch.recv_content()
                msg = Message(body, headers)
                msg.delivery_headers = rx_m.args.copy()
                msg._consumed_from_queue = self._c_tags[
                        msg.delivery_headers['consumer_tag']]
                return msg

            elif rx_m.method == spec.basic.cancel_ok:
                self._c_tags.pop(rx_m.args['consumer_tag'])

                if not self._c_tags:
                    # Every consumer is confirmed cancelled; the loop
                    # condition now ends the call.
                    self._state = 'stopped'

            else:
                # Any other method ends the call with no message.
                return None

        return None

    def __iter__(self):
        """
        Yield messages from next_message() until it returns None.
        """
        while True:
            msg = self.next_message()
            if msg is None:
                return
            else:
                yield msg