1. Jean-François Roche
  2. affinitic.zamqp

Commits

Jean-François Roche  committed bdacb73 Merge

merge

  • Participants
  • Parent commits baae110, 7cc0331
  • Branches default

Comments (0)

Files changed (11)

File setup.py

View file
       install_requires=[
           'setuptools',
           'carrot',
+          'five.dbevent',
           'uuid', # python < 2.5
           #'Zope2', # python = 2.6
           'grokcore.component',

File src/affinitic/zamqp/__init__.py

View file
-
-
-
+import logging
+LOGGER = 'affinitic.zamqp'
+logger = logging.getLogger(LOGGER)
+logger.setLevel(logging.DEBUG)
+fh = logging.handlers.TimedRotatingFileHandler("amqp.log", 'midnight', 1)
+fh.suffix = "%Y-%m-%d-%H-%M"
+formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+fh.setFormatter(formatter)
+fh.setLevel(logging.DEBUG)
+logger.addHandler(fh)

File src/affinitic/zamqp/browser/__init__.py

Empty file removed.

File src/affinitic/zamqp/browser/configure.zcml

-<configure xmlns="http://namespaces.zope.org/zope"
-           xmlns:browser="http://namespaces.zope.org/browser"
-           i18n_domain="affinitic.zamqp">
-
-  <browser:page
-    for="*"
-    name="consume"
-    class=".consumer.ConsumerView"
-    permission="zope.Public"
-    />
-
-</configure>
-

File src/affinitic/zamqp/browser/consumer.py

-# -*- coding: utf-8 -*-
-"""
-affinitic.zamqp
-
-Licensed under the GPL license, see LICENCE.txt for more details.
-Copyright by Affinitic sprl
-"""
-from Products.Five import BrowserView
-from zope.event import notify
-from zope.component import queryUtility, getUtilitiesFor, getUtility
-from zope.interface import alsoProvides
-
-from carrot.messaging import ConsumerSet
-
-from affinitic.zamqp.event import ArrivedMessage
-from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection
-
-
-class ConsumerView(BrowserView):
-
-    def mark_message(self, message_data, message):
-        channelId = message.delivery_info.get('exchange')
-        interfaceClass = self.getInterfaceByChannel(channelId)
-        if interfaceClass is not None:
-            alsoProvides(message, interfaceClass)
-
-    def notify_message(self, message_data, message):
-        notify(ArrivedMessage(message))
-        message.ack()
-
-    def registerConsumer(self, connectionId):
-        conn = getUtility(IBrokerConnection, name=connectionId)
-        print conn
-        self.consumerSet = ConsumerSet(connection=conn)
-        for name, consumerUtility in getUtilitiesFor(IConsumer):
-            if consumerUtility.connection_id == connectionId:
-                self.consumerSet.add_consumer(consumerUtility)
-        self.consumerSet.register_callback(self.mark_message)
-        self.consumerSet.register_callback(self.notify_message)
-
-    def getInterfaceByChannel(self, channelId):
-        consumer = queryUtility(IConsumer, name=channelId)
-        if consumer is not None:
-            return consumer.messageInterface
-        return None
-
-    def __call__(self, message_channel):
-        self.registerConsumer(message_channel)
-        print 'in consumer loop'
-        list(self.consumerSet.iterconsume())

File src/affinitic/zamqp/component.xml

-<component prefix="affinitic.zamqp.config">
-  <import package="ZServer"/>
-  <sectiontype name="amqp-consumer-server"
-               datatype=".ZAMQPConsumerFactory"
-               implements="ZServer.server">
-     <key name="amqpconnection" datatype="string">
-       <description>
-       The traversal path (from the Zope root) to an
-       executable Zope method (Python Script, external method, product
-       method, etc).  The method must take no arguments.  Ex: "/site/methodname"
-       </description>
-     </key>
-     <key name="user" datatype="string">
-     <description>
-       A zope username. Ex: "admin"
-     </description>
-     </key>
-     <key name="password" datatype="string">
-     <description>
-      The password for the zope username provided above.  Careful: this
-      is obviously not encrypted in the config file. Ex: "123"
-     </description>
-     </key>
-     <key name="host" datatype="string">
-     <description>
-       The hostname passed in via the "Host:" header in the
-       faux request.  Could be useful if you have virtual host rules
-       set up inside Zope itself. Ex: "www.example.com"
-     </description>
-    </key>
-     <key name="sitePath" datatype="string">
-     <description>
-       The site path to your application (having a sitemanager)
-     </description>
-     </key>
-
-  </sectiontype>
-</component>

File src/affinitic/zamqp/config.py

-# -*- coding: utf-8 -*-
-"""
-affinitic.zamqp
-
-Licensed under the GPL license, see LICENCE.txt for more details.
-Copyright by Affinitic sprl
-
-$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
-"""
-from ZServer.datatypes import ServerFactory
-from affinitic.zamqp.server import ZAMQPConsumerServer
-
-
-class ZAMQPConsumerFactory(ServerFactory):
-    """Open a storage configured via ZConfig"""
-
-    def __init__(self, section):
-        self.user = section.user
-        self.host = section.host
-        self.amqpconnection = section.amqpconnection
-        self.password = section.password
-        self.sitepath = section.sitepath
-
-    def create(self):
-        return ZAMQPConsumerServer(self.user, self.password, self.host, self.amqpconnection, self.sitepath)

File src/affinitic/zamqp/configure.zcml

View file
     <include package="z3c.autoinclude" file="meta.zcml" />
     <includeDependencies package="." />
 
-    <include package=".browser"/>
+    <subscriber
+        for="zope.app.appsetup.interfaces.IDatabaseOpenedWithRootEvent"
+        handler=".service.bootStrapSubscriber"/>
 
 </configure>

File src/affinitic/zamqp/processor.py

View file
+# -*- coding: utf-8 -*-
+"""
+<+ MODULE_NAME +>
+
+Licensed under the <+ LICENSE +> license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+import random
+import logging
+import threading
+import transaction
+from time import sleep
+from ZODB.POSException import ConflictError
+from zope.component import queryUtility, getUtilitiesFor, getUtility
+from zope.app.publication.zopepublication import ZopePublication
+from zope.app.component.hooks import setSite
+
+from carrot.messaging import ConsumerSet
+
+from affinitic.zamqp import logger
+from zope.interface import alsoProvides
+from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection, IArrivedMessage
+
+log = logging.getLogger('affinitic.zamqp')
+
+ERROR_MARKER = object()
+storage = threading.local()
+
+THREAD_STARTUP_WAIT = 0.05
+
+
+class ConsumerProcessor(object):
+
+    def __init__(self, site, waitTime=3.0):
+        self.site = site
+        self.sm = site.getSiteManager()
+
+    def __call__(self, message):
+        messageId = message.delivery_info.get('delivery_tag')
+        exchange = message.delivery_info.get('exchange')
+        logger.debug('Notify new message %s in exchange: %s' % (messageId,
+                                                                exchange))
+        stateBeforeNotification = message._state
+        setSite(self.site)
+        transaction.begin()
+        try:
+            self.sm.subscribers((message,), IArrivedMessage)
+        except Exception, error:
+            #XXX Send to Error queue ?
+            log.error('Error while running job %s on exchange %s' % (messageId, exchange))
+            log.exception(error)
+        else:
+            try:
+                transaction.commit()
+            except ConflictError:
+                logger.error('Conflict while working on message %s' % messageId)
+            else:
+                ack = getattr(message, '_ack', False)
+                if ack:
+                    message.ack()
+            logger.debug("Handled Message %s (status = '%s')" % (messageId,
+                                                                 message._state))
+            if message._state == stateBeforeNotification:
+                #XXX nobody used the message: error queue/dead letter queue ?
+                pass
+
+
+class MultiProcessor(object):
+
+    def __init__(self, db, sitePath, connectionId, waitTime=3.0):
+        self.db = db
+        self.connection = self.db.open()
+        self.waitTime = waitTime
+        self.connectionId = connectionId
+        self.sitePath = sitePath
+        self.registerConsumers(self.connectionId)
+        self.threads = []
+        self.maxThreads = 1
+
+    @property
+    def threadName(self):
+        return threading.currentThread().getName()
+
+    def getInterfaceByChannel(self, channelId):
+        consumer = queryUtility(IConsumer, name=channelId)
+        if consumer is not None:
+            return consumer.messageInterface
+        return None
+
+    def getSite(self):
+        self.root = self.connection.root()
+        return getattr(self.root[ZopePublication.root_name], self.sitePath)
+
+    def getSiteManager(self):
+        return self.getSite().getSiteManager()
+
+    def registerConsumers(self, connectionId):
+        conn = getUtility(IBrokerConnection, name=connectionId).__class__()
+        self.consumerSet = ConsumerSet(connection=conn)
+        for name, consumerUtility in getUtilitiesFor(IConsumer):
+            if consumerUtility.connection_id == connectionId:
+                self.consumerSet.add_consumer(consumerUtility)
+        self.consumerSet.register_callback(self.mark_message)
+        self.consumerSet.register_callback(self.notify_message)
+
+    def mark_message(self, message_data, message):
+        channelId = message.delivery_info.get('exchange')
+        interfaceClass = self.getInterfaceByChannel(channelId)
+        if interfaceClass is not None:
+            logger.debug('Thread %s - Mark message %s with interface %s' % (self.threadName,
+                                                                            message.delivery_info.get('delivery_tag'),
+                                                                            interfaceClass.__name__))
+            alsoProvides(message, interfaceClass)
+
+    def notify_message(self, message_data, message):
+        while True:
+            for thread in self.threads:
+                if not thread.isAlive():
+                    self.threads.remove(thread)
+            if len(self.threads) == self.maxThreads:
+                sleep(self.waitTime)
+                continue
+            else:
+                sleep(random.random())
+                break
+        processor = ConsumerProcessor(self.getSite())
+        thread = threading.Thread(
+            target=processor, args=(message,))
+        self.threads.append(thread)
+        thread.start()
+
+    def __call__(self):
+        list(self.consumerSet.iterconsume())

File src/affinitic/zamqp/server.py

-# -*- coding: utf-8 -*-
-"""
-affinitic.zamqp
-
-Licensed under the GPL license, see LICENCE.txt for more details.
-Copyright by Affinitic sprl
-
-$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
-"""
-import asyncore
-import socket
-
-from ZServer.ClockServer import ClockServer, LogHelper
-from ZServer.PubCore import handle
-from ZServer.AccessLogger import access_logger
-
-
-class ZAMQPConsumerServer(ClockServer):
-
-    SERVER_IDENT = 'Zope AMQP consumer'
-    _shutdown = 0
-
-    def __init__(self, user, password, host, amqpconnection, sitePath):
-        asyncore.dispatcher.__init__(self)
-        self.user = user
-        self.password = password
-        self.host = host
-        self.method = '%s/consume' % sitePath
-        self.connection = amqpconnection
-        self.logger = LogHelper(access_logger)
-        h = self.headers = []
-        h.append('User-Agent: Zope AMQP Consumer Server Client')
-        h.append('Accept: text/html,text/plain')
-        if not host:
-            host = socket.gethostname()
-        h.append('Host: %s' % host)
-        auth = False
-        if user and password:
-            encoded = ('%s:%s' % (user, password)).encode('base64')
-            h.append('Authorization: Basic %s' % encoded)
-            auth = True
-        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.log_info('ZAMQP Consumer Server started')
-        self.started = False
-        self.zhandler = handle
-
-    def readable(self):
-        # generate a request at most once every self.period seconds
-        if not self.started:
-            req, zreq, resp = self.get_requests_and_response()
-            zreq.args = (self.connection,)
-            ret = self.zhandler('Zope2', zreq, resp)
-            print ret
-            self.started = True
-        return False
-
-    def clean_shutdown_control(self, phase, time_in_this_phase):
-        if phase == 1:
-            self.log_info('Shutting down ZAMQP Consumer Server')
-            self.consumer.close()
-            self.conn.close()

File src/affinitic/zamqp/service.py

View file
+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from time import sleep
+import threading
+from App.config import getConfiguration
+from affinitic.zamqp.processor import MultiProcessor
+import logging
+logger = logging.getLogger('affinitic.zamqp')
+
+
+def getAutostartServiceNames():
+    """get a list of services to start"""
+    config = getConfiguration().product_config
+    if config is not None:
+        task_config = config.get('affinitic.zamqp', None)
+        if task_config:
+            return task_config
+    return {}
+
+
+class ConsumerService(object):
+
+    def startProcessing(self, serviceId, db, siteName, connectionId):
+        """See interfaces.ITaskService"""
+        # Start the thread running the processor inside.
+        processor = MultiProcessor(db, siteName, connectionId)
+        thread = threading.Thread(target=processor, name=serviceId)
+        thread.setDaemon(True)
+        thread.running = True
+        thread.start()
+        sleep(1)
+
+
+def bootStrapSubscriber(event):
+    """Start the queue processing services based on the
+       settings in zope.conf"""
+    serviceItems = getAutostartServiceNames()
+    db = event.database
+    for serviceId, serviceName in serviceItems.items():
+        siteName, serviceName = serviceName.split('@')
+        consumer = ConsumerService()
+        logger.info('Starting consumer %s' % serviceId)
+        consumer.startProcessing(serviceId, db, siteName, serviceName)