Commits

Jean-François Roche committed cc479f4

more documentation

  • Participants
  • Parent commits 11490f5

Comments (0)

Files changed (6)

 This package defines basic components of a Messaging Gateway integrated inside Zope using AMQP.
 
-ZCA is about loosely coupled design in an Application.
+Zope Component Architecture (ZCA) is about loosely coupled design in an Application.
 AMQP is about loosely coupled communication between Applications. This package tries to join the two.
 
   * Documentation: http://docs.affinitic.be/affinitic.zamqp
 from setuptools import setup, find_packages
 import os
 
-version = '0.2dev'
+version = '0.1'
 
 setup(name='affinitic.zamqp',
       version=version,
             'console_scripts': [
                   'publishmsg = affinitic.zamqp.publisher:main']},
       extras_require=dict(
-            test=['zope.testing', 'Zope2'],
+            test=['zope.testing', 'Zope2', 'five.dbevent'],
             zope210=['five.dbevent', 'uuid'],
             zope212=['five.dbevent', 'Zope2'],
             docs=['z3c.recipe.sphinxdoc',

src/affinitic/zamqp/introduction.txt

 
 This package define basic components of a Messaging Gateway [#MessagingGateway]_ integrated inside Zope using AMQP.
 
-ZCA is about loosely coupled design in an Application.
+Zope Component Architecture (ZCA) is about loosely coupled design in an Application.
 AMQP is about loosely coupled communication between Applications. This package tries to join the two.
 
   * Documentation: http://docs.affinitic.be/affinitic.zamqp

src/affinitic/zamqp/processor.py

 # -*- coding: utf-8 -*-
 """
-<+ MODULE_NAME +>
+affinitic.zamqp
 
-Licensed under the <+ LICENSE +> license, see LICENCE.txt for more details.
+Licensed under the GPL license, see LICENCE.txt for more details.
 Copyright by Affinitic sprl
 
 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 import transaction
 from time import sleep
 from ZODB.POSException import ConflictError
-from zope.component import queryUtility, createObject
+from zope.component import createObject
 from zope.app.publication.zopepublication import ZopePublication
 from zope.app.component.hooks import setSite
 
 from affinitic.zamqp import logger
-from affinitic.zamqp.interfaces import IConsumer, IArrivedMessage
+from affinitic.zamqp.interfaces import IArrivedMessage
 
 log = logging.getLogger('affinitic.zamqp')
 
 storage = threading.local()
 
 
-class ConsumerProcessor(object):
+class ConsumerWorker(object):
 
     def __init__(self, site, waitTime=3.0):
         self.site = site
                                                                  message._state))
             if not results:
                 #If there is no result
-                #XXX nobody used the message: error queue/dead letter queue ?
+                #XXX nobody handled the message: error queue/dead letter queue ?
                 pass
 
 
     def threadName(self):
         return threading.currentThread().getName()
 
-    def getInterfaceByChannel(self, channelId):
-        consumer = queryUtility(IConsumer, name=channelId)
-        if consumer is not None:
-            return consumer.messageInterface
-        return None
-
     def getSite(self):
         self.root = self.connection.root()
         return getattr(self.root[ZopePublication.root_name], self.sitePath)
 
-    def getSiteManager(self):
-        return self.getSite().getSiteManager()
-
     def registerConsumers(self, connectionId):
         self.consumerSet = createObject('ConsumerSet', connectionId)
         self.consumerSet.register_callback(self.notify_message)
             else:
                 sleep(random.random())
                 break
-        processor = ConsumerProcessor(self.getSite())
+        processor = ConsumerWorker(self.getSite())
+        newThreadName = 'ConsumerWork-%s' % (len(self.threads))
+        logger.debug('Thread %s is starting new thread %s' % (self.threadName, newThreadName))
         thread = threading.Thread(
+            name=newThreadName,
             target=processor, args=(message,))
         self.threads.append(thread)
         thread.start()

src/affinitic/zamqp/service.txt

     </product-config>
 
 The service will then create a *Processor* on the connection with id `CONNECTIONID` (see BrokerConnection).
-All new incoming messages will be notified on the site manager of the site which is on path `SITEPATH`.
+All new incoming messages will be notified by the site manager of the site which is on path `SITEPATH`.
 For logging purpose, the multiprocessor thread will be named as `SERVICENAME`.
 
 Here is how we define a new amqp broker connection to be used by the service::
 
     <product-config affinitic.zamqp>
-      connection1 /site1@fooService
+      connection1 site1@fooService
     </product-config>
 
 The `ConnectionBroker` utility named `connection1` has to be defined in zcml and at least one `Consumer` should exist for this connection.
 
-ZODB Event subscription
------------------------
+ZODB initialization event subscription
+--------------------------------------
 
-The ZODB events are not really present by default in Zope 2. The package `five.dbevent` patch the correct code that enables us to plug our subscriber just after instance initialization.
+The ZODB initialization events are not really present by default in Zope 2. The package `five.dbevent` patch the correct code that enables us to plug our subscriber just after instance initialization.
 The subscriber is configured in configure.zcml::
 
     <subscriber
 ----------------
 
 One *Processor* (see `MultiProcessor`) thread create a ConsumerSet (see `ConsumerSet`) on the broker connection id given by the configuration.
-Once the ConsumerSet is created, it waits for messages on the different Consumer queues. When a message arrive it create a `ConsumerProcessor` thread
+Once the ConsumerSet is created, it waits for messages on the different Consumer queues. When a message arrive it create a `ConsumerWorker` thread
 that will consume the message and immediately returns waiting for new message.
 
 Message Consumer
 ----------------
 
-The `ConsumerProcessor` thread just take the message, register it into the ZODB transaction system and pass it to the zope event system. This allow
-loose coupling of components: as the message is marked with the Consumer interface, it allow us to create the corresponding subscribers for type of message
-coming from a queue.
+The `ConsumerWorker` thread just take the message, register it into the ZODB transaction system and pass it to the zope event system. This allow loose coupling of components: as the message is marked with the Consumer interface, it allow us to create the corresponding subscribers for type of message coming from a queue.
 
 Example and Tests
 -----------------
 
 .. includedoc:: affinitic.zamqp.tests:/service.txt
+.. includedoc:: affinitic.zamqp.tests:/processor.txt

src/affinitic/zamqp/tests/processor.txt

+Handling incoming messages using Processor and Worker
+=====================================================
+
+We have two separated component that compose a Consumer in Zope:
+
+    1. ``MultiProcessor`` which is waiting for message to come. Once it has one it pass it start a
+       new thread and pass the message over to one of its worker.
+
+    2. ``ConsumingWorker`` which recieve a message, register it into the transaction machinery and pass it to the Zope event subscribers.
+
+Create a bar connection first::
+
+    >>> from affinitic.zamqp.connection import BrokerConnection
+    >>> from affinitic.zamqp.interfaces import IBrokerConnection
+    >>> from zope.component import provideUtility
+
+    >>> class DummyBarBrokerConnection(BrokerConnection):
+    ...     id = 'bar'
+    >>> conn = DummyBarBrokerConnection()
+    >>> provideUtility(conn, IBrokerConnection, name='bar')
+
+We fake an Application, a site, a database object and a db connection::
+
+    >>> from zope.component import getGlobalSiteManager
+    >>> class Site(object):
+    ...     """
+    ...     A fake site with a site manager
+    ...     """
+    ...
+    ...     def getSiteManager(self):
+    ...         return getGlobalSiteManager()
+
+    >>> class Application(object):
+    ...     """
+    ...     A fake Application with one site
+    ...     """
+    ...
+    ...     site1 = Site()
+
+    >>> class Connection(object):
+    ...     """
+    ...     A fake connection to the root of a ZODB
+    ...     """
+    ...
+    ...     def root(self):
+    ...         return {'Application':Application()}
+
+    >>> class FakeDb(object):
+    ...     """
+    ...     A fake database
+    ...     """
+    ...
+    ...     def open(self):
+    ...         return Connection()
+
+    >>> db = FakeDb()
+
+We create by hand a MultiProcessor. It is normally instantiated by the ``bootStrapSubscriber``
+once the connection to the ZODB is ready after Zope startup.
+
+We pass it the database connection, the name of the site that will call the subscribers and then
+name of the connection that will be used to instantiate the ``ConsumerSet``::
+
+    >>> from affinitic.zamqp.processor import MultiProcessor
+    >>> processor = MultiProcessor(db, 'site1', 'bar')
+
+The site related to this processor is the one created previously::
+
+    >>> processor.getSite()
+     <Site object ...>
+
+We fake now a message ::
+
+    >>> from affinitic.zamqp.interfaces import IMessage
+    >>> from zope.interface import implements
+    >>> class DummyAMQPMessage(object):
+    ...     implements(IMessage)
+    ...     _state = 'NEW'
+    ...     delivery_info = {'delivery_tag': 999,
+    ...                      'exchange': 'SuperExchange'}
+    ...
+    ...     def ack(self):
+    ...         self._state = 'ACK'
+
+    >>> from affinitic.zamqp.interfaces import IMessageWrapper
+    >>> message = DummyAMQPMessage()
+
+We wrap the message (just as the ``Consumer`` or ``ConsumerSet`` are doing)::
+
+    >>> messageWrapper = IMessageWrapper(message)
+
+We also mark the message wrapper with a marker interface so that subscribers will be able
+to subscribe to this specific type of message::
+
+    >>> from zope.interface import Interface, alsoProvides
+    >>> class IDummyMessage(Interface):
+    ...     """
+    ...     Marker interface for dummy message
+    ...     """
+    >>> alsoProvides(messageWrapper, IDummyMessage)
+
+To be able to get the output from the threads that will be created, we
+need to setup some logging (which is thread safe)::
+
+    >>> import logging
+    >>> import StringIO
+    >>> logger = logging.getLogger('affinitic.zamqp')
+    >>> logger.setLevel(logging.DEBUG)
+    >>> logger.handlers = []
+    >>> stream = StringIO.StringIO()
+    >>> h = logging.StreamHandler(stream)
+    >>> logger.addHandler(h)
+
+Process a message without subscription adapter
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+We notify a new incoming message (this is normally called by the processor loop)::
+
+    >>> processor.notify_message('A Message', messageWrapper)
+
+This is asynchronous so let's wait a bit for the ConsumerWorker to do its job
+
+    >>> from time import sleep
+    >>> sleep(1)
+
+We read what happen thanks to the logging. And we see that no subscriber handled our message and
+nobody acknowledged it::
+
+    >>> print stream.getvalue()
+    Thread MainThread is starting new thread ConsumerWork-0
+    Notify new message 999 in exchange: SuperExchange
+    Before commit Message 999 (status = 'NEW')
+    Handled Message 999 (status = 'NEW')
+
+Process a message with one subscription adapter
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Now let's use a new message and define a subscriber to see what happens.
+
+Reset the logging stream::
+
+    >>> stream.seek(0)
+
+Mark the message as being another one::
+
+    >>> message.delivery_info['delivery_tag'] = 1000
+
+Register a subscription adapter that log a dummy message and acknowledge the dummy message::
+
+    >>> from zope.component import provideSubscriptionAdapter
+    >>> def loggingSubscriber(message):
+    ...     logger = logging.getLogger('affinitic.zamqp')
+    ...     logger.debug('Subscriber handling message %s' % message)
+    ...     message.ack()
+
+    >>> from affinitic.zamqp.interfaces import IArrivedMessage
+    >>> provideSubscriptionAdapter(loggingSubscriber, [IDummyMessage], IArrivedMessage)
+
+We notify again new incoming message (this is normally called by the processor loop)::
+
+    >>> processor.notify_message('A Message', messageWrapper)
+
+This is asynchronous so let's wait a bit for the ConsumerWorker to do its job
+
+    >>> sleep(1)
+
+We read what happen thanks to the logging. And we see that our subscriber handled our message and
+acknowledged it. You can see also that before transaction commit the message is still marked as not
+being acknowledged::
+
+    >>> print stream.getvalue()
+    Thread MainThread is starting new thread ConsumerWork-0
+    Notify new message 1000 in exchange: SuperExchange
+    Subscriber handling message <affinitic.zamqp.message.MessageWrapper object ...>
+    Before commit Message 1000 (status = 'NEW')
+    Handled Message 1000 (status = 'ACK')
+
+Process a message with one failing subscription adapter
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Now let's see what happen if one of the subscription adapter is failing.
+
+Reset the logging stream::
+
+    >>> stream.seek(0)
+
+Mark the message as being another one::
+
+    >>> message.delivery_info['delivery_tag'] = 1001
+
+Register a subscription adapter that just fails::
+
+    >>> def failingSubscriber(message):
+    ...     raise AttributeError('Fake error')
+
+    >>> provideSubscriptionAdapter(failingSubscriber, [IDummyMessage], IArrivedMessage)
+
+We notify again new incoming message (this is normally called by the processor loop)::
+
+    >>> processor.notify_message('A Message', messageWrapper)
+
+This is asynchronous so let's wait a bit for the ConsumerWorker to do its job
+
+    >>> sleep(1)
+
+We read what happen thanks to the logging. And we see that our subscriber handled our message and
+failed. As the transaction was not commited the message is not acknowledged.
+
+    >>> print stream.getvalue()
+    Thread MainThread is starting new thread ConsumerWork-0
+    Notify new message 1001 in exchange: SuperExchange
+    Subscriber handling message <affinitic.zamqp.message.MessageWrapper object ...>
+    Error while running job 1001 on exchange SuperExchange
+    Fake error
+    Traceback (most recent call last):
+    ...
+    AttributeError: Fake error
+