Commits

Jean-François Roche committed a04b682

add ErrorManager that consumes errors from error queues and delegate fix to registered subscribers

  • Participants
  • Parent commits b3681d5

Comments (0)

Files changed (3)

src/affinitic/zamqp/error.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+"""
+from zope.component import createObject
+from zope.component import subscribers
+from zope.component import getUtilitiesFor
+from zope.component import IFactory
+import grokcore.component as grok
+from affinitic.zamqp.consumerset import ConsumerSetFactory, ConsumerSet
+from affinitic.zamqp.interfaces import IErrorFixerHandler
+from affinitic.zamqp.interfaces import IConsumer, IErrorConsumer
+
+
+class ErrorManager(object):
+
+    def __init__(self, connectionId):
+        self.connectionId
+
+    @property
+    def errors(self):
+        consumers = createObject('ErrorConsumerSet', self.connectionId)
+        consumers.register_callback(self.handleErrorMessage)
+        return consumers.iterconsume()
+
+    def handleErrorMessage(self, data, message):
+        for errorFixerHandler in subscribers((data, message), IErrorFixerHandler):
+            if errorFixerHandler.match():
+                errorFixerHandler.fix()
+
+    def main(self):
+        list(self.errors)
+
+
+class ErrorConsumerSetFactory(ConsumerSetFactory):
+
+    def __call__(self, connectionId):
+        conn = createObject('AMQPBrokerConnection', connectionId)
+        consumerSet = ConsumerSet(conn)
+        consumerSet.connection_id = connectionId
+        for name, consumerUtility in getUtilitiesFor(IConsumer):
+            if consumerUtility.connection_id == connectionId and \
+                IErrorConsumer.providedBy(consumerUtility):
+                consumerSet.add_consumer(consumerUtility)
+        return consumerSet
+
+grok.global_utility(ConsumerSetFactory,
+    provides=IFactory, name='ErrorConsumerSet')

src/affinitic/zamqp/interfaces.py

         """
         Do something with the error and the traceback that we got while consuming message
         """
+
+
+class IErrorFixerHandler(Interface):
+    """
+    Error Handler that can fix a problem
+    """
+
+
+class IErrorConsumer(Interface):
+    """
+    Marker interface for a Consumer that handle error messages
+    """

src/affinitic/zamqp/processor.py

 from ZODB.POSException import ConflictError
 from zope.component import createObject, queryUtility
 from zope.app.publication.zopepublication import ZopePublication
-from zope.app.component.hooks import setSite
+from zope.component.hooks import setSite
 
 from affinitic.zamqp import logger
 from affinitic.zamqp.interfaces import IArrivedMessage, IErrorHandler