1. Jean-François Roche
  2. affinitic.zamqp

Commits

Jean-François Roche  committed 41ffec8

more documentation

  • Participants
  • Parent commits bb979d8
  • Branches default

Comments (0)

Files changed (18)

File README.txt

View file
   * Documentation: http://docs.affinitic.be/affinitic.zamqp
   * Code Repository: http://bitbucket.org/jfroche/affinitic.zamqp
   * Buildbot: http://buildbot.affinitic.be/builders/affinitic.zamqp%20linux_debian/
-  * Test Coverage: http://coverage.affinitic.be/affinitic.zamqp
+  * Test Coverage: http://coverage.affinitic.be/affinitic.zamqp/affinitic.zamqp.html

File src/affinitic/zamqp/connection.py

View file
 
 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
+import grokcore.component as grok
 from zope.component import getUtility
 from zope.component.interfaces import IFactory
 from zope.interface import implementedBy
-import grokcore.component as grok
+
 from carrot.connection import BrokerConnection as CarrotBrokerConnection
+
 from affinitic.zamqp.interfaces import IBrokerConnection, IBrokerConnectionFactory
 
 

File src/affinitic/zamqp/connection.txt

View file
     :show-inheritance:
     :inherited-members:
 
+.. autoclass:: BrokerConnectionFactory
+    :members:
+    :show-inheritance:
+    :inherited-members:
 
 Example and Tests
 -----------------

File src/affinitic/zamqp/consumer.py

View file
         self._connection = connection
         self.backend = kwargs.get("backend", None)
         self.queue = queue or self.queue
-
-        # Binding.
         self.queue = queue or self.queue
         self.exchange = exchange or self.exchange
         self.routing_key = routing_key or self.routing_key
         self.callbacks = []
-
-        # Options
         self.durable = kwargs.get("durable", self.durable)
         self.exclusive = kwargs.get("exclusive", self.exclusive)
         self.auto_delete = kwargs.get("auto_delete", self.auto_delete)
                                          self.warn_if_exists)
         self.auto_ack = kwargs.get("auto_ack", self.auto_ack)
         self.auto_declare = kwargs.get("auto_declare", self.auto_declare)
-
-        # exclusive implies auto-delete.
         self._backend = None
         if self.exclusive:
             self.auto_delete = True
     def receive(self, message_data, message):
         message = self._markMessage(message)
         message = self._adaptMessage(message)
-        message = self._markMessage(message)
         if not self.callbacks:
             raise NotImplementedError("No consumer callbacks registered")
         for callback in self.callbacks:
             callback(message_data, message)
+
+    receive.__doc__ = CarrotConsumer.receive.__doc__

File src/affinitic/zamqp/consumerset.py

View file
 
 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
-from zope.component import getUtilitiesFor, createObject, getUtility
+import grokcore.component as grok
+from zope.component import getUtilitiesFor, createObject, getUtility, queryAdapter
 from zope.component.interfaces import IFactory
-from zope.interface import alsoProvides
+from zope.interface import alsoProvides, implements, implementedBy
+
 from carrot.messaging import ConsumerSet as CarrotConsumerSet
-from affinitic.zamqp.interfaces import IMessageWrapper
+
+from affinitic.zamqp.interfaces import IMessageWrapper, IConsumerSet, IConsumerSetFactory
 from affinitic.zamqp.interfaces import IMessage, IConsumer
-import grokcore.component as grok
 
 
 class ConsumerSet(CarrotConsumerSet):
+    implements(IConsumerSet)
+
+    def _adaptMessage(self, message):
+        alsoProvides(message, IMessage)
+        return queryAdapter(message, IMessageWrapper, default=message)
+
+    def _markMessage(self, message):
+        consumer = getUtility(IConsumer, message.delivery_info['exchange'])
+        return consumer._markMessage(message)
 
     def receive(self, message_data, message):
-        alsoProvides(message, IMessage)
-        message = IMessageWrapper(message)
-        consumer = getUtility(IConsumer, message.delivery_info['exchange'])
-        alsoProvides(message, consumer.messageInterface)
+        message = self._markMessage(message)
+        message = self._adaptMessage(message)
         if not self.callbacks:
             raise NotImplementedError("No consumer callbacks registered")
         for callback in self.callbacks:
 
 
 class ConsumerSetFactory(object):
+    grok.implements(IConsumerSetFactory)
+
+    title = 'ConsumerSet Factory'
+    description = 'Help creating a new Consumer Set'
+
+    def getInterfaces(self):
+        return implementedBy(ConsumerSet)
 
     def __call__(self, connectionId):
         conn = createObject('AMQPBrokerConnection', connectionId)
         consumerSet = ConsumerSet(conn)
+        consumerSet.connection_id = connectionId
         for name, consumerUtility in getUtilitiesFor(IConsumer):
             if consumerUtility.connection_id == connectionId:
                 consumerSet.add_consumer(consumerUtility)

File src/affinitic/zamqp/consumerset.txt

View file
+ConsumerSet
+===========
+
+A `ConsumerSet <#affinitic.zamqp.consumerset.ConsumerSet>`_ is a set of consumers that are listening to the same `BrokerConnection <connection.html#affinitic.zamqp.interfaces.IBrokerConnection>`_ grouped into one object.
+
+We also define a factory to ease creation of a ConsumerSet, see `IConsumerSetFactory <#affinitic.zamqp.interfaces.IConsumerSetFactory>`_
+
+Interfaces
+----------
+
+.. autointerface:: affinitic.zamqp.interfaces.IConsumerSet
+    :show-inheritance:
+    :inherited-members:
+
+.. autointerface:: affinitic.zamqp.interfaces.IConsumerSetFactory
+    :members:
+    :show-inheritance:
+    :inherited-members:
+
+Implementation
+--------------
+
+.. module:: affinitic.zamqp.consumerset
+
+.. autoclass:: ConsumerSet
+    :members:
+    :show-inheritance:
+    :inherited-members:
+
+Example and Tests
+-----------------
+
+.. includedoc:: affinitic.zamqp.tests:/consumerset.txt

File src/affinitic/zamqp/index.txt

View file
    :maxdepth: 3
 
    introduction
+   service
    connection
    consumer
+   consumerset
    publisher
    message
    changelog

File src/affinitic/zamqp/interfaces.py

View file
     """
 
     connection_id = Attribute('The BrokerConnection id where the queue is/will be registered')
+
+
+class IConsumerSet(Interface):
+    """
+    A Set of consumers connected to the same broker connection
+    """
+
+
+class IConsumerSetFactory(IFactory):
+
+    def __call__(connectionId):
+        """
+        Create a ConsumerSet and link the corresponding consumers
+        based on the ``connectionId``
+
+        :param connectionId: the id of the broker connection where the consumers are connected to
+        :rtype: ConsumerSet
+        """

File src/affinitic/zamqp/introduction.txt

View file
   * Documentation: http://docs.affinitic.be/affinitic.zamqp
   * Code Repository: http://bitbucket.org/jfroche/affinitic.zamqp
   * Buildbot: http://buildbot.affinitic.be/builders/affinitic.zamqp%20linux_debian/
-  * Test Coverage: http://coverage.affinitic.be/affinitic.zamqp
+  * Test Coverage: http://coverage.affinitic.be/affinitic.zamqp/affinitic.zamqp.html
 
 AMQP
 ----
 We use `grok <http://grok.zope.org>`_ to define our zope components. This avoids us to write too much
 zcml.
 
+Tested with
+-----------
+
 We tested this package with:
 
     * Python 2.4 and Python 2.6

File src/affinitic/zamqp/message.py

View file
 
 class MessageWrapper(grok.Adapter, VTM):
     """
+    A message wrapper that can be transaction aware
     """
     grok.context(IMessage)
     grok.implements(IMessageWrapper)
 
-    acknoledged = False
+    acknowledged = False
 
     def ack(self):
-        self.acknoledged = True
+        """
+        Mark the message as acknowledge.
+
+        If the message is registered in a transaction, we defer transmition of acknowledgement.
+
+        If the message is not registered in a transaction, we transmit acknowledgement.
+        """
+        self.acknowledged = True
         if not self.registered():
             self._ackMessage()
 
     def _ackMessage(self):
+        """
+        Transmit acknowledgement to the message broker
+        """
         self.context.ack()
 
     def _finish(self):
-        if self.acknoledged:
+        if self.acknowledged:
             self._ackMessage()
 
     def _abort(self):
-        self.acknoledged = False
+        self.acknowledged = False
 
     def __getattr__(self, name):
         try:

File src/affinitic/zamqp/message.txt

View file
-Message
-=======
+MessageWrapper
+==============
 
-A MessageWrapper enables transaction support and defered acknoledgement.
+A MessageWrapper is an adapter that wraps an incoming message, enable transaction support and defered acknoledgement.
 
-This means that we a message is received it is registered in the transaction so that if someone acknoledge the message, the acknoledgement will be send only if the transaction is correctly commited. Also if a message is acknoledge and for some reason the transaction is aborted, the message acknoledgement never gets sent.
+This means that we a message is received it is registered in the transaction so that if someone acknoledge the message, the acknoledgement will be send to the message broker only if the transaction is correctly commited. Also if a message is acknoledge and for some reason the transaction is aborted, the message acknoledgement never gets sent.
+
+Implementation
+--------------
+
+.. module:: affinitic.zamqp.message
+
+.. autoclass:: MessageWrapper
+    :members:
+    :show-inheritance:
+    :inherited-members:
+
+Example and Tests
+-----------------
 
 .. includedoc:: affinitic.zamqp.tests:/message.txt
-

File src/affinitic/zamqp/publisher.py

View file
 
 import grokcore.component as grok
 from zope.component import getUtility
+
 from carrot.connection import BrokerConnection
 from carrot.messaging import Publisher as CarrotPublisher
 
                                mandatory, immediate, priority, content_type,
                                content_encoding, serializer)
 
+    send.__doc__ = CarrotPublisher.send.__doc__
+
     def _finish(self):
         for msgInfo in self._queueOfPendingMessage:
             message_data = msgInfo['data']

File src/affinitic/zamqp/service.txt

View file
+Consumer Service in Zope
+========================
+
+Service creation
+----------------
+
+Once a Zope instance is started, after instance initialization (ZServers startup, ZODB startup, zcml parsing ...) an event (`DatabaseOpenedWithRootEvent`) is trigerred.
+
+We have a subscriber that catch this event. At first it reads the the instance configuration (zope.conf) to see if there is any ampq connection defined.
+
+If there is an amqp connection defined, it creates a new thread with a *Processor* that connect to the specified amqp connection and which consume messages on exchange related to this connection. The thread is constantly waiting for new message.
+
+A processor has a list of worker threads, that's why we call it `MultiProcessor`. Once a message arrive in the *Processor* thread, it delegates processing of the message to one of these worker thread (`ConsumerProcessor`).
+
+Service Configuration
+---------------------
+
+In the Zope 2 configuration of an instance (zope.conf), there is a section called *product-config*. In this section you can define product specific configuration in the instance.
+
+The syntax is::
+
+    <product-config affinitic.zamqp>
+        CONNECTIONID SITEPATH@SERVICENAME
+    </product-config>
+
+The service will then create a *Processor* on the connection with id `CONNECTIONID` (see BrokerConnection).
+All new incoming messages will be notified on the site manager of the site which is on path `SITEPATH`.
+For logging purpose, the multiprocessor thread will be named as `SERVICENAME`.
+
+Here is how we define a new amqp broker connection to be used by the service::
+
+    <product-config affinitic.zamqp>
+      connection1 /site1@fooService
+    </product-config>
+
+The `ConnectionBroker` utility named `connection1` has to be defined in zcml and at least one `Consumer` should exist for this connection.
+
+ZODB Event subscription
+-----------------------
+
+The ZODB events are not really present by default in Zope 2. The package `five.dbevent` patch the correct code that enables us to plug our subscriber just after instance initialization.
+The subscriber is configured in configure.zcml::
+
+    <subscriber
+        for="zope.app.appsetup.interfaces.IDatabaseOpenedWithRootEvent"
+        handler=".service.bootStrapSubscriber"/>
+
+Processor thread
+----------------
+
+One *Processor* (see `MultiProcessor`) thread create a ConsumerSet (see `ConsumerSet`) on the broker connection id given by the configuration.
+Once the ConsumerSet is created, it waits for messages on the different Consumer queues. When a message arrive it create a `ConsumerProcessor` thread
+that will consume the message and immediately returns waiting for new message.
+
+Message Consumer
+----------------
+
+The `ConsumerProcessor` thread just take the message, register it into the ZODB transaction system and pass it to the zope event system. This allow
+loose coupling of components: as the message is marked with the Consumer interface, it allow us to create the corresponding subscribers for type of message
+coming from a queue.
+
+Example and Tests
+-----------------
+
+.. includedoc:: affinitic.zamqp.tests:/service.txt

File src/affinitic/zamqp/tests/connection.txt

View file
     >>> verifyObject(IBrokerConnectionFactory, factory)
     True
 
-The provided interface by the factory is IBrokerConnection
+The provided interface by the objects created by the factory is IBrokerConnection
 
     >>> implemented = factory.getInterfaces()
     >>> implemented.isOrExtends(IBrokerConnection)

File src/affinitic/zamqp/tests/consumer.txt

View file
     ...     body = 'Hello from Message Broker'
     >>> message = Message()
 
-The ``receive`` method will call first the ``_markMessage`` which will add our marker interface on the incoming message::
+The ``receive`` method on the consumer will call first the ``_markMessage`` method which will add our marker interface on the incoming message::
 
     >>> message = consumer._markMessage(message)
     >>> IBarMessage.providedBy(message)
     True
 
-Then the ``receive`` method will call the ``_adaptMessage`` that adapts our message::
+Then the ``receive`` method will call the ``_adaptMessage`` method that can adapt our message::
 
     >>> adaptedMessage = consumer._adaptMessage(message)
     >>> adaptedMessage

File src/affinitic/zamqp/tests/consumerset.txt

View file
+Defining a set of consumers
+===========================
+
+Instead of creating connections for different exchanges on the same virtual host, you can create one
+connection to the virtual host and subscribe to different exchange.
+
+At first you need to define a connection::
+
+    >>> from affinitic.zamqp.connection import BrokerConnection
+    >>> import grokcore.component as grok
+    >>> class DummyBrokerConnection(BrokerConnection):
+    ...     id = 'bar'
+    ...     grok.name(id)
+    >>> from affinitic.zamqp.interfaces import IBrokerConnection
+    >>> from zope.component import provideUtility
+    >>> conn = DummyBrokerConnection()
+    >>> provideUtility(conn, IBrokerConnection, name=conn.id)
+
+Then define different consumers (with two different marker interface) and link them to the same connection::
+
+    >>> from zope.interface import Interface
+    >>> class IBlueMessage(Interface):
+    ...     """
+    ...     Mark incoming message on the blue queue
+    ...     """
+    >>> from affinitic.zamqp.consumer import Consumer
+    >>> class BlueMessageConsumer(Consumer):
+    ...     connection_id = 'bar'
+    ...     exchange = 'blue-queue'
+    ...     exchange = 'blue'
+    ...     messageInterface = IBlueMessage
+    ...
+    ...     def declare(self):
+    ...         """
+    ...         Fake this method as we don't want to create any queue
+    ...         in this test
+    ...         """
+
+We provide the utility with the same name as the exchange::
+
+    >>> provideUtility(BlueMessageConsumer(), name='blue')
+
+Another message consumer, for red messages this time::
+
+    >>> from zope.interface import Interface
+    >>> class IRedMessage(Interface):
+    ...     """
+    ...     Mark incoming message on the red queue
+    ...     """
+    >>> class RedMessageConsumer(Consumer):
+    ...     connection_id = 'bar'
+    ...     exchange = 'red-queue'
+    ...     exchange = 'red'
+    ...     messageInterface = IRedMessage
+    ...
+    ...     def declare(self):
+    ...         """
+    ...         Fake this method as we don't want to create any queue
+    ...         in this test
+    ...         """
+
+Again, we provide the utility with the same name as the exchange::
+
+    >>> provideUtility(RedMessageConsumer(), name='red')
+
+Now we can define our ConsumerSet which will wait for incoming message on the red and the blue queue.
+To easy consumer set creation, we have defined a factory. The only parameter that the factory takes is
+the id of the connection::
+
+    >>> from zope.component import createObject
+    >>> consumerSet = createObject('ConsumerSet', 'bar')
+    >>> consumerSet
+    <affinitic.zamqp.consumerset.ConsumerSet object at ...>
+
+We already have our two consumers in this consumerSet::
+
+    >>> consumerSet.consumers
+    [<BlueMessageConsumer object at ...>, <RedMessageConsumer object at ...>]
+
+Now let's fake reception of a message on the Blue queue. We create a basic message first::
+
+    >>> class Message(object):
+    ...
+    ...     def __init__(self, body, queue):
+    ...         self.body = body
+    ...         self.delivery_info = {}
+    ...         self.delivery_info['exchange'] = queue
+
+    >>> message = Message('World is Blue', 'blue')
+
+The ``receive`` method on the consumer set will call first the ``_markMessage`` method which will add
+the marker interface of the corresponding consumer on the incoming message::
+
+    >>> message = consumerSet._markMessage(message)
+    >>> IBlueMessage.providedBy(message)
+    True
+
+Then the ``receive`` method will call the ``_adaptMessage`` method that can adapt our message. Let's
+define a the adapter first::
+
+    >>> from zope.component import provideAdapter, adapts
+    >>> from datetime import datetime
+    >>> from affinitic.zamqp.message import MessageWrapper
+    >>> class MessageWithTimeStamp(MessageWrapper):
+    ...     adapts(IBlueMessage)
+    ...
+    ...     @property
+    ...     def incomingDate(self):
+    ...         return datetime(2010, 1, 1)
+
+    >>> provideAdapter(MessageWithTimeStamp)
+
+    >>> adaptedMessage = consumerSet._adaptMessage(message)
+    >>> adaptedMessage
+    <MessageWithTimeStamp object at ...>
+
+As we did in the consumer we need a callback on our consumer set as we want the consumer set to do
+something with the incoming message. Otherwise we get an exception::
+
+    >>> consumerSet.receive(message.body, message)
+    Traceback (most recent call last):
+    ...
+    NotImplementedError: No consumer callbacks registered
+
+So we define a dummy callback that print the message content::
+
+    >>> def printMessage(message_data, message):
+    ...     print '%s received on %s queue with body "%s"' % (message,
+    ...                                                       message.delivery_info['exchange'],
+    ...                                                       message_data)
+    >>> consumerSet.register_callback(printMessage)
+
+Now each time a blue message is received, we adapt the message and print it::
+
+    >>> consumerSet.receive(message.body, message)
+    <MessageWithTimeStamp object ...> received on blue queue with body "World is Blue"
+
+If we pass a red message, it is also printed and adapted with the default adapter for message, ``MessageWrapper``::
+
+    >>> message = Message('World is Red', 'red')
+    >>> consumerSet.receive(message.body, message)
+    <affinitic.zamqp.message.MessageWrapper object ...> received on red queue with body "World is Red"
+
+Interface conformance
+=====================
+
+Using ``zope.interface`` to check wheter our implementation does what it promise to implement.
+
+    >>> from zope.interface.verify import verifyClass
+
+For ConsumerSet::
+
+    >>> from affinitic.zamqp.interfaces import IConsumerSet
+    >>> from affinitic.zamqp.consumerset import ConsumerSet
+    >>> verifyClass(IConsumerSet, ConsumerSet)
+    True
+
+For ConsumerSetFactory::
+
+    >>> from zope.interface.verify import verifyObject
+    >>> from affinitic.zamqp.interfaces import IConsumerSetFactory
+    >>> from affinitic.zamqp.consumerset import ConsumerSetFactory
+    >>> factory = ConsumerSetFactory()
+    >>> verifyObject(IConsumerSetFactory, factory)
+    True
+
+The provided interface by the objects created by the factory is IConsumerSet
+
+    >>> implemented = factory.getInterfaces()
+    >>> implemented.isOrExtends(IConsumerSet)
+    True
+

File src/affinitic/zamqp/tests/message.txt

View file
 Message and transaction
 -----------------------
 
-If the message is transaction aware and is acknoledged then the message broker receives acknoledgement
+If the message is transaction aware and is acknowledged then the message broker receives acknowledgement
 only on commit::
 
     >>> message = DummyAMQPMessage()
     >>> import transaction
     >>> tr = transaction.begin()
     >>> messageWrapper = IMessageWrapper(message)
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     False
     >>> messageWrapper._register()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     False
     >>> message._state
     ''
     >>> messageWrapper.ack()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     True
     >>> message._state
     ''
     >>> from affinitic.zamqp.interfaces import IMessageWrapper
     >>> messageWrapper = IMessageWrapper(message)
     >>> messageWrapper._register()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     False
     >>> message._state
     ''
     >>> messageWrapper.ack()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     True
     >>> message._state
     ''
     >>> transaction.abort()
     >>> message._state
     ''
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     False
 
-If the message is not acknoledged nothing happen on the message.
+If the message is not acknowledged nothing happen on the message.
 
     >>> tr = transaction.begin()
     >>> message = DummyAMQPMessage()
     >>> messageWrapper = IMessageWrapper(message)
     >>> messageWrapper._register()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     False
     >>> message._state
     ''
     >>> message._state
     ''
 
-If we work without a transaction, message recieve directly the acknoledgment
+If we work without a transaction, message receive directly the acknowledgment
 
     >>> messageWrapper = IMessageWrapper(message)
     >>> message._state
     ''
     >>> messageWrapper.ack()
-    >>> messageWrapper.acknoledged
+    >>> messageWrapper.acknowledged
     True
     >>> message._state
     'ACK'

File src/affinitic/zamqp/tests/service.txt

View file
+Product configuration from zope.conf
+====================================
+
+To test the configuration that will be coming from the zope.conf file we change the global config object that is given by ``getConfiguration``::
+
+    >>> from affinitic.zamqp.service import getAutostartServiceNames
+    >>> from App.config import getConfiguration
+    >>> config = getConfiguration()
+
+If we don't configure any connection, an empty dictionary is returned::
+
+    >>> config.product_config = {}
+    >>> getAutostartServiceNames()
+    {}
+
+If we configure two connections, we get the two connections in two key/values of a dictionary::
+
+    >>> config = getConfiguration()
+    >>> config.product_config = {'affinitic.zamqp': {'conn1' : 'foo@bar',
+    ...                                              'conn2' : 'bar@foo'}}
+    >>> getAutostartServiceNames()
+    {'conn2': 'bar@foo', 'conn1': 'foo@bar'}
+
+ConsumerSet loop after Zope initialization
+==========================================
+
+We define the two connection corresponding to the previous configuration.
+
+The bar connection first::
+
+    >>> from affinitic.zamqp.connection import BrokerConnection
+    >>> from affinitic.zamqp.interfaces import IBrokerConnection
+    >>> from zope.component import provideUtility
+
+    >>> class DummyBarBrokerConnection(BrokerConnection):
+    ...     id = 'bar'
+    >>> conn = DummyBarBrokerConnection()
+    >>> provideUtility(conn, IBrokerConnection, name='bar')
+
+Then the foo connection is created::
+
+    >>> class DummyFooBrokerConnection(BrokerConnection):
+    ...     id = 'foo'
+    >>> conn = DummyFooBrokerConnection()
+    >>> provideUtility(conn, IBrokerConnection, name='foo')
+
+To be able to get the output from the two threads that will be created, we
+need to setup some logging (which is thread safe)::
+
+    >>> import logging
+    >>> import StringIO
+    >>> logger = logging.getLogger('test log')
+    >>> logger.setLevel(logging.DEBUG)
+    >>> stream = StringIO.StringIO()
+    >>> h = logging.StreamHandler(stream)
+    >>> logger.addHandler(h)
+
+As we can't receive messages in this test from a message broker, we mock the iterconsume method from the ConsumerSet so that it just log the fact that it should be consuming and don't make it blocking::
+
+    >>> def fakeIterConsume(self):
+    ...     logger.debug('Consuming on %s with connection id %s' % (self, self.connection_id))
+    ...     return []
+    >>> from carrot.messaging import ConsumerSet
+    >>> ConsumerSet.iterconsume_bak = ConsumerSet.iterconsume
+    >>> ConsumerSet.iterconsume = fakeIterConsume
+
+One more fake object, a database::
+
+    >>> from affinitic.zamqp.service import bootStrapSubscriber
+    >>> class FakeDb(object):
+    ...
+    ...     def open(self):
+    ...         pass
+    >>> db = FakeDb()
+
+And we fake the ``IDatabaseOpenedWithRootEvent`` event::
+
+    >>> class FakeDatabaseOpenedWithRootEvent(object):
+    ...     database = db
+    >>> event = FakeDatabaseOpenedWithRootEvent()
+
+We can at last read the configuration and initialise the two thread, one for consuming messages on connection bar and another for consuming messages on connection foo::
+
+    >>> bootStrapSubscriber(event)
+
+The log should have now informations telling us that the two thread were consuming::
+
+    >>> print stream.getvalue()
+    Consuming on <affinitic.zamqp.consumerset.ConsumerSet object ...> with connection id foo
+    Consuming on <affinitic.zamqp.consumerset.ConsumerSet object ...> with connection id bar
+
+Back to normal, removing our changes::
+
+    >>> logger.removeHandler(h)
+    >>> ConsumerSet.iterconsume = ConsumerSet.iterconsume_bak