Commits

Jean-François Roche committed 555c160

add multithreaded consumer (default to 1)

Comments (0)

Files changed (4)

src/affinitic/zamqp/consumer.py

 
     def setBackend(self, backend):
         self._backend = backend
-        self.declare()
+        if self.auto_declare:
+            self.declare()
 
     backend = property(getBackend, setBackend)
 

src/affinitic/zamqp/consumerset.py

 $Id$
 """
 import grokcore.component as grok
-from zope.component import getUtilitiesFor, createObject, getUtility, queryAdapter
+from zope.component import getUtilitiesFor, createObject, getUtility, queryAdapter, queryUtility
 from zope.component.interfaces import IFactory
 from zope.interface import alsoProvides, implements, implementedBy
 
 class ConsumerSet(CarrotConsumerSet):
     implements(IConsumerSet)
 
+    maxThreads = 1
+
     def _adaptMessage(self, message):
         alsoProvides(message, IMessage)
         return queryAdapter(message, IMessageWrapper, default=message)
 
     def _markMessage(self, message):
-        consumer = getUtility(IConsumer, message.delivery_info['exchange'])
+        exchange = message.delivery_info['exchange']
+        consumer = queryUtility(IConsumer, exchange)
+        if consumer is None:
+            routingKey = message.delivery_info['routing_key']
+            consumer = getUtility(IConsumer, "%s.%s" % (exchange, routingKey))
         return consumer._markMessage(message)
 
     def receive(self, message_data, message):

src/affinitic/zamqp/processor.py

 
 class MultiProcessor(object):
 
-    def __init__(self, db, sitePath, connectionId, waitTime=3.0):
+    def __init__(self, db, sitePath, connectionId, maxThreads, waitTime=0.3):
         self.db = db
         self.connection = self.db.open()
         self.waitTime = waitTime
         self.sitePath = sitePath
         self.registerConsumers(self.connectionId)
         self.threads = []
-        self.maxThreads = 1
+        self.maxThreads = maxThreads
 
     @property
     def threadName(self):
         thread.start()
 
     def __call__(self):
+        sleep(1)
         list(self.consumerSet.iterconsume())

src/affinitic/zamqp/service.py

 
 $Id$
 """
-from time import sleep
 import threading
 from App.config import getConfiguration
 from affinitic.zamqp.processor import MultiProcessor
 
 class ConsumerService(object):
 
-    def startProcessing(self, serviceId, db, siteName, connectionId):
+    def startProcessing(self, serviceId, db, siteName, connectionId, threads):
         """See interfaces.ITaskService"""
         # Start the thread running the processor inside.
-        processor = MultiProcessor(db, siteName, connectionId)
+        processor = MultiProcessor(db, siteName, connectionId, maxThreads=threads)
         thread = threading.Thread(target=processor, name=serviceId)
         thread.setDaemon(True)
         thread.running = True
         thread.start()
-        sleep(1)
 
 
 def bootStrapSubscriber(event):
     db = event.database
     for serviceId, serviceName in serviceItems.items():
         siteName, serviceName = serviceName.split('@')
+        threads = 1
+        nameAndThreads = serviceName.split(' ')
+        serviceName = nameAndThreads[0]
+        if len(nameAndThreads) > 1:
+            threads = int(nameAndThreads[1])
         consumer = ConsumerService()
         logger.info('Starting consumer %s' % serviceId)
-        consumer.startProcessing(serviceId, db, siteName, serviceName)
+        consumer.startProcessing(serviceId, db, siteName, serviceName, threads)