1. Kyle Schaffrick
  2. amqpev

Source

amqpev / examples / consumer_simple.py

from __future__ import with_statement
import amqpev.api as api

def main(host='dev.rabbitmq.com'):
    """Connect to a broker, run the example consumer, then close the connection.

    Args:
        host: Broker host handed to ``api.BrokerServiceFactory``. Defaults to
            'dev.rabbitmq.com', the host this example originally hard-coded.
    """
    # Each one of these creates a single connection to the broker.
    service_factory = api.BrokerServiceFactory(host)

    # Calling the factory creates a new broker service object on its own channel.
    # Each channel may be used from a separate coroutine, and the traffic between
    # it and the broker will be multiplexed on the connection.
    try:
        with service_factory() as svc:
            consume(svc)
    finally:
        # Always release the factory, even if consuming raised.
        service_factory.close()


def consume(svc):
    """Print and acknowledge messages from the broker log exchange.

    Binds a queue to the 'amq.rabbitmq.log' topic exchange, then prints the
    routing key and body of every delivered message and acknowledges it. A
    message whose body begins with "STOP" cancels the consumers; iteration
    ends once the consumer set stops yielding messages.

    Args:
        svc: Broker service object providing ``exchange``, ``queue``,
            ``consume_from`` and ``ack``.
    """
    # Bind a queue to the topic exchange. The routing key "#" is the AMQP
    # topic wildcard for "zero or more words", so every message is matched.
    # To consume from a named queue instead, use svc.queue('foo'): if you have
    # never declared that queue before, it will be declared on the broker and
    # created for you. If it has been previously declared, you will get the
    # same object.
    exch = svc.exchange('amq.rabbitmq.log', type='topic')
    q = svc.queue().bind(exch, routing_key="#")

    # Create a consumer set. consume_from is variadic and will accept any
    # number of queue objects from which to consume simultaneously.
    with svc.consume_from(q) as cs:

        # The consumer set is an iterable object, and it will continue to
        # return messages as they are delivered, blocking your coroutine as
        # necessary. The messages are generated in the order they are delivered
        # by the broker.
        for msg in cs:

            # A single pre-formatted argument keeps the output identical
            # whether print is a statement (Python 2) or a function (Python 3).
            print("%s %s" % (msg.delivery_headers['routing_key'], msg.body))
            svc.ack(msg)

            # stop_consumers will notify the broker to stop sending messages
            # from our consumer set by cancelling the consume request(s).
            # However, the consumer set may not end iteration immediately if
            # messages have been delivered to us that we have not seen yet.
            # Once all messages have been seen, iteration will end.
            if msg.body[:4] == "STOP":
                cs.stop_consumers()

# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()