Commits

Jean-François Roche committed f6d4f0f

abort transaction correctly on error - send error to utility if available

Comments (0)

Files changed (3)

src/affinitic/zamqp/interfaces.py

         :param connectionId: the id of the broker connection where the consumers are connected to
         :rtype: ConsumerSet
         """
+
+
+class IErrorHandler(Interface):
+    """
+    Error handler for a specific exchange
+    """
+
+    def __call__(message, error, traceback):
+        """
+        Do something with the error and the traceback that we got while consuming message
+        """

src/affinitic/zamqp/message.py

         self.acknowledged = False
 
     def __getattr__(self, name):
-        try:
-            return super(MessageWrapper, self).__getattr__(name)
-        except AttributeError:
+        if hasattr(self.__class__, name):
+            return object.__getattribute__(self, name)
+            #return super(MessageWrapper, self).__getattr__(name)
+        else:
             return getattr(self.context, name)

src/affinitic/zamqp/processor.py

 """
 import random
 import logging
+import sys
 import threading
 import transaction
 from time import sleep
 from ZODB.POSException import ConflictError
-from zope.component import createObject
+from zope.component import createObject, queryUtility
 from zope.app.publication.zopepublication import ZopePublication
 from zope.app.component.hooks import setSite
 
 from affinitic.zamqp import logger
-from affinitic.zamqp.interfaces import IArrivedMessage
+from affinitic.zamqp.interfaces import IArrivedMessage, IErrorHandler
 
 log = logging.getLogger('affinitic.zamqp')
 
         try:
             results = self.sm.subscribers((message,), IArrivedMessage)
         except Exception, error:
-            #XXX Send to Error queue ?
-            log.error('Error while running job %s on exchange %s' % (messageId, exchange))
-            log.exception(error)
+            transaction.abort()
+            exc_type, exc_value, exc_traceback = sys.exc_info()
+            errorHandler = queryUtility(IErrorHandler, name=exchange)
+            if errorHandler is not None:
+                errorHandler(message, error, exc_traceback)
+            else:
+                log.error('Error while running job %s on exchange %s' % (messageId, exchange))
+                log.exception(error)
         else:
             logger.debug("Before commit Message %s (status = '%s')" % (messageId,
                                                                  message._state))
                 transaction.commit()
             except ConflictError:
                 logger.error('Conflict while working on message %s' % messageId)
-            logger.debug("Handled Message %s (status = '%s')" % (messageId,
-                                                                 message._state))
-            if not results:
-                #If there is no result
-                #XXX nobody handled the message: error queue/dead letter queue ?
-                pass
+                transaction.abort()
+            else:
+                logger.debug("Handled Message %s (status = '%s')" % (messageId,
+                                                                     message._state))
+                if not results:
+                    #If there is no result
+                    #XXX nobody handled the message: error queue/dead letter queue ?
+                    logger.warning('Nobody handled message %s on exchange %s' % (messageId, exchange))
 
 
 class MultiProcessor(object):