Commits

Jean-François Roche  committed c56c511

add transaction aware messages

  • Participants
  • Parent commits bdacb73

Comments (0)

Files changed (9)

File src/affinitic/zamqp/configure.zcml

 <configure xmlns="http://namespaces.zope.org/zope"
+           xmlns:grok="http://namespaces.zope.org/grok"
            i18n_domain="zamqp">
 
     <include package="z3c.autoinclude" file="meta.zcml" />
         for="zope.app.appsetup.interfaces.IDatabaseOpenedWithRootEvent"
         handler=".service.bootStrapSubscriber"/>
 
+    <grok:grok package="."/>
+
 </configure>

File src/affinitic/zamqp/connection.py

 
 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
+from zope.component import getUtility
+from zope.component.interfaces import IFactory
 import grokcore.component as grok
 from carrot.connection import BrokerConnection as CarrotBrokerConnection
 from affinitic.zamqp.interfaces import IBrokerConnection
     grok.implements(IBrokerConnection)
     grok.baseclass()
 
-    def __init__(self, hostname=None, userid=None, password=None,
-                 virtual_host=None, port=None, **kwargs):
+    def __init__(self):
         self._closed = None
         self._connection = None
+
+
+class BrokerConnectionFactory(object):
+    grok.implements(IFactory)
+
+    def __call__(self, connectionId):
+        return getUtility(IBrokerConnection, name=connectionId).__class__()
+
+
+grok.global_utility(BrokerConnectionFactory,
+    provides=IFactory, name='AMQPBrokerConnection')

File src/affinitic/zamqp/consumer.py

 
 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
+from zope.interface import alsoProvides
 from zope.component import getUtility
 from carrot.messaging import Consumer as CarrotConsumer
-from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection
+from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection, IMessage, IMessageWrapper
 import grokcore.component as grok
 
 
     def as_dict(self):
         return {'exchange': self.exchange,
                 'routing_key': self.routingKey}
+
+    def receive(self, message_data, message):
+        alsoProvides(message, IMessage)
+        message = IMessageWrapper(message)
+        alsoProvides(message, self.messageInterface)
+        if not self.callbacks:
+            raise NotImplementedError("No consumer callbacks registered")
+        for callback in self.callbacks:
+            callback(message_data, message)

File src/affinitic/zamqp/consumerset.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from zope.component import getUtilitiesFor, createObject, getUtility
+from zope.component.interfaces import IFactory
+from zope.interface import alsoProvides
+from carrot.messaging import ConsumerSet as CarrotConsumerSet
+from affinitic.zamqp.interfaces import IMessageWrapper
+from affinitic.zamqp.interfaces import IMessage, IConsumer
+import grokcore.component as grok
+
+
+class ConsumerSet(CarrotConsumerSet):
+
+    def receive(self, message_data, message):
+        alsoProvides(message, IMessage)
+        message = IMessageWrapper(message)
+        consumer = getUtility(IConsumer, message.delivery_info['exchange'])
+        alsoProvides(message, consumer.messageInterface)
+        if not self.callbacks:
+            raise NotImplementedError("No consumer callbacks registered")
+        for callback in self.callbacks:
+            callback(message_data, message)
+
+
+class ConsumerSetFactory(object):
+
+    def __call__(self, connectionId):
+        conn = createObject('AMQPBrokerConnection', connectionId)
+        consumerSet = ConsumerSet(conn)
+        for name, consumerUtility in getUtilitiesFor(IConsumer):
+            if consumerUtility.connection_id == connectionId:
+                consumerSet.add_consumer(consumerUtility)
+        return consumerSet
+
+grok.global_utility(ConsumerSetFactory,
+    provides=IFactory, name='ConsumerSet')

File src/affinitic/zamqp/interfaces.py

     """
     Event fired when a new message has arrived
     """
+
+
+class IMessageWrapper(Interface):
+    """
+    A Message wrapper
+    """
+
+
+class IMessage(Interface):
+    """
+    """
+
+
+class IPublisher(Interface):
+    """
+    A Publisher send message to a queue via an exchange
+    """
+
+    connectionId = Attribute('')

File src/affinitic/zamqp/message.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from affinitic.zamqp.interfaces import IMessageWrapper, IMessage
+from affinitic.zamqp.transactionmanager import VTM
+import grokcore.component as grok
+
+
+class MessageWrapper(grok.Adapter, VTM):
+    """
+    """
+    grok.context(IMessage)
+    grok.implements(IMessageWrapper)
+
+    def __init__(self, message):
+        super(MessageWrapper, self).__init__(message)
+        self.acknoledged = False
+
+    def ack(self):
+        print 'mark as ack'
+        self.acknoledged = True
+
+    def _finish(self):
+        print 'finish transaction'
+        if self.acknoledged:
+            print 'send ack'
+            self.context.ack()
+
+    def __getattr__(self, name):
+        try:
+            return super(MessageWrapper, self).__getattr__(name)
+        except AttributeError:
+            return getattr(self.context, name)

File src/affinitic/zamqp/processor.py

 import transaction
 from time import sleep
 from ZODB.POSException import ConflictError
-from zope.component import queryUtility, getUtilitiesFor, getUtility
+from zope.component import queryUtility, createObject
 from zope.app.publication.zopepublication import ZopePublication
 from zope.app.component.hooks import setSite
 
-from carrot.messaging import ConsumerSet
-
 from affinitic.zamqp import logger
-from zope.interface import alsoProvides
-from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection, IArrivedMessage
+from affinitic.zamqp.interfaces import IConsumer, IArrivedMessage
 
 log = logging.getLogger('affinitic.zamqp')
 
-ERROR_MARKER = object()
 storage = threading.local()
 
-THREAD_STARTUP_WAIT = 0.05
-
 
 class ConsumerProcessor(object):
 
         exchange = message.delivery_info.get('exchange')
         logger.debug('Notify new message %s in exchange: %s' % (messageId,
                                                                 exchange))
-        stateBeforeNotification = message._state
         setSite(self.site)
+        #XXX all this should be implemented with a with statement
         transaction.begin()
+        message._register()
         try:
-            self.sm.subscribers((message,), IArrivedMessage)
+            results = self.sm.subscribers((message,), IArrivedMessage)
         except Exception, error:
             #XXX Send to Error queue ?
             log.error('Error while running job %s on exchange %s' % (messageId, exchange))
             log.exception(error)
         else:
+            logger.debug("Before commit Message %s (status = '%s')" % (messageId,
+                                                                 message._state))
             try:
                 transaction.commit()
             except ConflictError:
                 logger.error('Conflict while working on message %s' % messageId)
-            else:
-                ack = getattr(message, '_ack', False)
-                if ack:
-                    message.ack()
             logger.debug("Handled Message %s (status = '%s')" % (messageId,
                                                                  message._state))
-            if message._state == stateBeforeNotification:
+            if not results:
+                #If there is no result
                 #XXX nobody used the message: error queue/dead letter queue ?
                 pass
 
         return self.getSite().getSiteManager()
 
     def registerConsumers(self, connectionId):
-        conn = getUtility(IBrokerConnection, name=connectionId).__class__()
-        self.consumerSet = ConsumerSet(connection=conn)
-        for name, consumerUtility in getUtilitiesFor(IConsumer):
-            if consumerUtility.connection_id == connectionId:
-                self.consumerSet.add_consumer(consumerUtility)
-        self.consumerSet.register_callback(self.mark_message)
+        self.consumerSet = createObject('ConsumerSet', connectionId)
         self.consumerSet.register_callback(self.notify_message)
 
-    def mark_message(self, message_data, message):
-        channelId = message.delivery_info.get('exchange')
-        interfaceClass = self.getInterfaceByChannel(channelId)
-        if interfaceClass is not None:
-            logger.debug('Thread %s - Mark message %s with interface %s' % (self.threadName,
-                                                                            message.delivery_info.get('delivery_tag'),
-                                                                            interfaceClass.__name__))
-            alsoProvides(message, interfaceClass)
-
     def notify_message(self, message_data, message):
         while True:
             for thread in self.threads:

File src/affinitic/zamqp/tests/test_doctest.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+import os
+import glob
+from zope.testing import doctest
+from Globals import package_home
+from unittest import TestSuite
+from affinitic.zamqp.tests import GLOBALS
+
+OPTIONFLAGS = (doctest.ELLIPSIS |
+               doctest.NORMALIZE_WHITESPACE |
+               doctest.REPORT_ONLY_FIRST_FAILURE)
+
+
+def list_doctests():
+    home = package_home(GLOBALS)
+    return [filename for filename in
+            glob.glob(os.path.sep.join([home, '*.txt']))]
+
+
+def test_suite():
+    filenames = list_doctests()
+    suite = TestSuite()
+    suites = [suite.addtest(os.path.basename(filename),
+               optionflags=OPTIONFLAGS,
+               package='affinitic.zamqp')
+              for filename in filenames]
+    return suites

File src/affinitic/zamqp/transactionmanager.py

+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Provide support for linking an external transaction manager with Zope's
+"""
+import transaction
+
+
+class VTM:
+    """Mix-in class that provides transaction management support
+
+    A sub class should call self._register() whenever it performs any
+    transaction-dependent operations (e.g. sql statements).
+
+    The sub class will need to override _finish, to finalize work,
+    _abort, to roll-back work, and perhaps _begin, if any work is
+    needed at the start of a transaction.
+
+    A subclass that uses locking during transaction commit must
+    defined a sortKey() method.
+
+    The VTM variety can be mixed-in with persistent classes.
+    """
+
+    _v_registered = 0
+    _v_finalize = 0
+
+    def _begin(self):
+        pass
+
+    def _register(self):
+        if not self._v_registered:
+            try:
+                transaction.get().register(Surrogate(self))
+                self._begin()
+                self._v_registered = 1
+                self._v_finalize = 0
+            except:
+                pass
+
+    def tpc_begin(self, *ignored):
+        pass
+
+    commit = tpc_begin
+
+    def _finish(self):
+        raise NotImplementedError
+
+    def _abort(self):
+        raise NotImplementedError
+
+    def tpc_vote(self, *ignored):
+        self._v_finalize = 1
+
+    def tpc_finish(self, *ignored):
+        if self._v_finalize:
+            try:
+                self._finish()
+            finally:
+                self._v_registered = 0
+                self._v_finalize = 0
+
+    def abort(self, *ignored):
+        try:
+            self._abort()
+        finally:
+            self._v_registered = 0
+            self._v_finalize = 0
+
+    tpc_abort = abort
+
+    def sortKey(self, *ignored):
+        """ The sortKey method is used for recent ZODB compatibility which
+            needs to have a known commit order for lock acquisition.  Most
+            DA's talking to RDBMS systems do not care about commit order, so
+            return the constant 1
+        """
+        return 10
+
+
+class Surrogate:
+
+    def __init__(self, db):
+        self._p_jar = db
+        self.__inform_commit__ = db.tpc_finish
+        self.__inform_abort__ = db.tpc_abort