Commits

Jean-François Roche  committed baae110 Merge

merge

  • Participants
  • Parent commits 52c86cb, 1159f8a

Comments (0)

Files changed (21)

+84aa8210981b1a1adc7d8e337a7e46c94b136fdd 0.1

File buildout.cfg

 sources-dir = src
 versions = versions
 extends = http://download.zope.org/Zope2/index/2.12.3/versions.cfg
+          http://dist.plone.org/release/4.0a5/versions.cfg
           versions.cfg
 develop =
     .
 
 parts =
     PILwoTK
+    zeo
     instance01
+    instance02
+    omelette
     test
     docs
 
 zcml =
     affinitic.zamqp
 
+[zeo]
+recipe = plone.recipe.zeoserver
+zeo-address = 0.0.0.0:5010
+zeo-conf-additional =
+
+[omelette]
+recipe = collective.recipe.omelette
+eggs = ${instance01:eggs}
+
 [rabbitmq]
 # required erlang-dev erlang-base erlang-ssl erlang-os-mon erlang-mnesia
 recipe = rod.recipe.rabbitmq
 user = admin:admin
 verbose-security = on
 http-address = 6080
+zeo-client = True
+zeo-address = ${zeo:zeo-address}
+zeo-client-cache-size = 100MB
 debug-mode = on
+shared-blob = on
 zcml = ${buildout:zcml}
+zserver-threads = 4
 eggs =
     PILwoTK
     Plone
     ${buildout:eggs}
 zope-conf-additional =
-    <product-config affinitic.zamqp>
-      autostart @RootTaskService, @blue
-    </product-config>
+    %import affinitic.zamqp
+    <amqp-consumer-server>
+      amqpconnection test
+      user admin
+      password admin
+      host localhost
+    </amqp-consumer-server>
+    #<amqp-consumer-server>
+    #  amqpconnection barfoo
+    #  user admin
+    #  password admin
+    #  host localhost
+    #</amqp-consumer-server>
+
+    #<product-config affinitic.zamqp>
+    #  autostart *@RootTaskService
+      #@blue
+    #</product-config>
+
+[instance02]
+recipe = ${instance01:recipe}
+debug-mode = ${instance01:debug-mode}
+verbose-security = ${instance01:verbose-security}
+zeo-client = ${instance01:zeo-client}
+zeo-address = ${instance01:zeo-address}
+user = ${instance01:user}
+http-address = 0.0.0.0:5012
+shared-blob = on
+zserver-threads = ${instance01:zserver-threads}
+eggs = ${instance01:eggs}
+zcml = ${instance01:zcml}
+zope-conf-additional = ${instance01:zope-conf-additional}
+zeo-client-cache-size = ${instance01:zeo-client-cache-size}
+zope-conf-additional =
+    %import affinitic.zamqp
+    <amqp-consumer-server>
+      amqpconnection test
+      user admin
+      password admin
+      host localhost
+    </amqp-consumer-server>
 
 [docs]
 recipe = z3c.recipe.sphinxdoc
 from setuptools import setup, find_packages
 import os
 
-version = '0.1'
+version = '0.2dev'
 
 setup(name='affinitic.zamqp',
       version=version,
         "Programming Language :: Python",
         ],
       keywords='',
-      author='',
-      author_email='',
-      url='',
+      author='Jean-Francois Roche',
+      author_email='jfroche@affinitic.be',
+      url='http://hg.affinitic.be/affinitic.zamqp',
       license='GPL',
       packages=find_packages('src', exclude=['ez_setup']),
       package_dir={'': 'src'},
       install_requires=[
           'setuptools',
           'carrot',
-          'Zope2',
-          'z3c.autoinclude',
-          'five.dbevent'])
+          'uuid', # python < 2.5
+          #'Zope2', # python = 2.6
+          'grokcore.component',
+          'z3c.autoinclude'])

File src/affinitic/zamqp/__init__.py

+
+
+

File src/affinitic/zamqp/browser/__init__.py

Empty file added.

File src/affinitic/zamqp/browser/configure.zcml

+<configure xmlns="http://namespaces.zope.org/zope"
+           xmlns:browser="http://namespaces.zope.org/browser"
+           i18n_domain="affinitic.zamqp">
+
+  <browser:page
+    for="*"
+    name="consume"
+    class=".consumer.ConsumerView"
+    permission="zope.Public"
+    />
+
+</configure>
+

File src/affinitic/zamqp/browser/consumer.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+"""
+from Products.Five import BrowserView
+from zope.event import notify
+from zope.component import queryUtility, getUtilitiesFor, getUtility
+from zope.interface import alsoProvides
+
+from carrot.messaging import ConsumerSet
+
+from affinitic.zamqp.event import ArrivedMessage
+from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection
+
+
+class ConsumerView(BrowserView):
+
+    def mark_message(self, message_data, message):
+        channelId = message.delivery_info.get('exchange')
+        interfaceClass = self.getInterfaceByChannel(channelId)
+        if interfaceClass is not None:
+            alsoProvides(message, interfaceClass)
+
+    def notify_message(self, message_data, message):
+        notify(ArrivedMessage(message))
+        message.ack()
+
+    def registerConsumer(self, connectionId):
+        conn = getUtility(IBrokerConnection, name=connectionId)
+        print conn
+        self.consumerSet = ConsumerSet(connection=conn)
+        for name, consumerUtility in getUtilitiesFor(IConsumer):
+            if consumerUtility.connection_id == connectionId:
+                self.consumerSet.add_consumer(consumerUtility)
+        self.consumerSet.register_callback(self.mark_message)
+        self.consumerSet.register_callback(self.notify_message)
+
+    def getInterfaceByChannel(self, channelId):
+        consumer = queryUtility(IConsumer, name=channelId)
+        if consumer is not None:
+            return consumer.messageInterface
+        return None
+
+    def __call__(self, message_channel):
+        self.registerConsumer(message_channel)
+        print 'in consumer loop'
+        list(self.consumerSet.iterconsume())

File src/affinitic/zamqp/component.xml

+<component prefix="affinitic.zamqp.config">
+  <import package="ZServer"/>
+  <sectiontype name="amqp-consumer-server"
+               datatype=".ZAMQPConsumerFactory"
+               implements="ZServer.server">
+     <key name="amqpconnection" datatype="string">
+       <description>
+       The traversal path (from the Zope root) to an
+       executable Zope method (Python Script, external method, product
+       method, etc).  The method must take no arguments.  Ex: "/site/methodname"
+       </description>
+     </key>
+     <key name="user" datatype="string">
+     <description>
+       A zope username. Ex: "admin"
+     </description>
+     </key>
+     <key name="password" datatype="string">
+     <description>
+      The password for the zope username provided above.  Careful: this
+      is obviously not encrypted in the config file. Ex: "123"
+     </description>
+     </key>
+     <key name="host" datatype="string">
+     <description>
+       The hostname passed in via the "Host:" header in the
+       faux request.  Could be useful if you have virtual host rules
+       set up inside Zope itself. Ex: "www.example.com"
+     </description>
+    </key>
+     <key name="sitePath" datatype="string">
+     <description>
+       The site path to your application (having a sitemanager)
+     </description>
+     </key>
+
+  </sectiontype>
+</component>

File src/affinitic/zamqp/config.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from ZServer.datatypes import ServerFactory
+from affinitic.zamqp.server import ZAMQPConsumerServer
+
+
+class ZAMQPConsumerFactory(ServerFactory):
+    """Open a storage configured via ZConfig"""
+
+    def __init__(self, section):
+        self.user = section.user
+        self.host = section.host
+        self.amqpconnection = section.amqpconnection
+        self.password = section.password
+        self.sitepath = section.sitepath
+
+    def create(self):
+        return ZAMQPConsumerServer(self.user, self.password, self.host, self.amqpconnection, self.sitepath)

File src/affinitic/zamqp/configure.zcml

     <include package="z3c.autoinclude" file="meta.zcml" />
     <includeDependencies package="." />
 
-    <subscriber
-        for="zope.app.appsetup.interfaces.IDatabaseOpenedWithRootEvent"
-        handler=".service.bootStrapSubscriber"/>
+    <include package=".browser"/>
 
 </configure>

File src/affinitic/zamqp/connection.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+import grokcore.component as grok
+from carrot.connection import BrokerConnection as CarrotBrokerConnection
+from affinitic.zamqp.interfaces import IBrokerConnection
+
+
+class BrokerConnection(grok.GlobalUtility, CarrotBrokerConnection):
+    grok.implements(IBrokerConnection)
+    grok.baseclass()
+
+    def __init__(self, hostname=None, userid=None, password=None,
+                 virtual_host=None, port=None, **kwargs):
+        self._closed = None
+        self._connection = None

File src/affinitic/zamqp/consumer.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from zope.component import getUtility
+from carrot.messaging import Consumer as CarrotConsumer
+from affinitic.zamqp.interfaces import IConsumer, IBrokerConnection
+import grokcore.component as grok
+
+
+class Consumer(grok.GlobalUtility, CarrotConsumer):
+    grok.baseclass()
+    grok.implements(IConsumer)
+
+    queue = None
+
+    def __init__(self):
+        self._connection = None
+        self._backend = None
+        self.callbacks = []
+        if self.exclusive:
+            self.auto_delete = True
+        self.consumer_tag = self._generate_consumer_tag()
+
+    @property
+    def connection(self):
+        if self._connection is None:
+            self._connection = getUtility(IBrokerConnection, name=self.connection_id)
+        return self._connection
+
+    def getBackend(self):
+        if self._backend is None:
+            self._backend = self.connection.create_backend()
+            if self.auto_declare:
+                self.declare()
+        return self._backend
+
+    def setBackend(self, backend):
+        self._backend = backend
+        self.declare()
+
+    backend = property(getBackend, setBackend)
+
+    def as_dict(self):
+        return {'exchange': self.exchange,
+                'routing_key': self.routingKey}

File src/affinitic/zamqp/event.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from zope.interface import implements
+from zope.component.interfaces import ObjectEvent
+from affinitic.zamqp.interfaces import IArrivedMessage
+
+
+class ArrivedMessage(ObjectEvent):
+    implements(IArrivedMessage)
+
+    def __init__(self, object):
+        self.object = object
+        self.data = object.payload

File src/affinitic/zamqp/interfaces.py

 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
 from zope.interface import Interface, Attribute
+from zope.component.interfaces import IObjectEvent
 
 
-class Connection(Interface):
+class IBrokerConnection(Interface):
     """
     AMQP Broker connection to an AMQP server via a specific virtual host
     """
 
+    id = Attribute('')
+
     hostname = Attribute('')
 
     port = Attribute('')
         """
 
 
-class Consumer(Interface):
+class IConsumer(Interface):
     """
     A Consumer receive messages sent to a queue via an exchange
     """
 
+    connectionId = Attribute('')
+
     queue = Attribute('')
 
     exchange = Attribute('')
 
+    exchangeType = Attribute('')
+
     routingKey = Attribute('')
+
+    auto_delete = Attribute('')
+
+    messageInterface = Attribute("Return the interface related to the message")
+
+
+class IArrivedMessage(IObjectEvent):
+    """
+    Event fired when a new message has arrived
+    """

File src/affinitic/zamqp/introduction.txt

 ----
 
 The Advanced Message Queuing Protocol [#amqp]_ is a very performant protocol that is implemented
-in many platform, many languages and thus can exchange messages between many framework/applications.
+in many platforms, many languages and thus can exchange messages between many framework/applications.
 
 The most known AMQP open source servers are: `RabbitMQ <http://www.rabbitmq.com>`_ and `Qpid <http://qpid.apache.org/>`_
 
 
 As you might also see, AMQP also permit asynchronous events in Zope 2.
 
-We use ``carrot`` to send/receive messages and enable consumers in separate threads within
+Dependencies
+------------
+
+We use `carrot <http://pypi.python.org/pypi/carrot>`_ to send/receive messages and enable consumers in separate threads within
 Zope.
 
+We tested this package with:
+
+    * Zope 2.12
+    * `five.dbevent <http://pypi.python.org/pypi/five.dbevent>`_ to add our event subscribers on Zope database initialization
+    * `RabbitMQ <http://www.rabbitmq.com>`_
+
 .. rubric:: Footnotes
 
 .. [#amqp] If needed here are some nice links that might help you to understand what AMQP is about:

File src/affinitic/zamqp/server.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+import asyncore
+import socket
+
+from ZServer.ClockServer import ClockServer, LogHelper
+from ZServer.PubCore import handle
+from ZServer.AccessLogger import access_logger
+
+
+class ZAMQPConsumerServer(ClockServer):
+
+    SERVER_IDENT = 'Zope AMQP consumer'
+    _shutdown = 0
+
+    def __init__(self, user, password, host, amqpconnection, sitePath):
+        asyncore.dispatcher.__init__(self)
+        self.user = user
+        self.password = password
+        self.host = host
+        self.method = '%s/consume' % sitePath
+        self.connection = amqpconnection
+        self.logger = LogHelper(access_logger)
+        h = self.headers = []
+        h.append('User-Agent: Zope AMQP Consumer Server Client')
+        h.append('Accept: text/html,text/plain')
+        if not host:
+            host = socket.gethostname()
+        h.append('Host: %s' % host)
+        auth = False
+        if user and password:
+            encoded = ('%s:%s' % (user, password)).encode('base64')
+            h.append('Authorization: Basic %s' % encoded)
+            auth = True
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.log_info('ZAMQP Consumer Server started')
+        self.started = False
+        self.zhandler = handle
+
+    def readable(self):
+        # generate a request at most once every self.period seconds
+        if not self.started:
+            req, zreq, resp = self.get_requests_and_response()
+            zreq.args = (self.connection,)
+            ret = self.zhandler('Zope2', zreq, resp)
+            print ret
+            self.started = True
+        return False
+
+    def clean_shutdown_control(self, phase, time_in_this_phase):
+        if phase == 1:
+            self.log_info('Shutting down ZAMQP Consumer Server')
+            self.consumer.close()
+            self.conn.close()

File src/affinitic/zamqp/service.py

-# -*- coding: utf-8 -*-
-"""
-affinitic.zamqp
-
-Licensed under the GPL license, see LICENCE.txt for more details.
-Copyright by Affinitic sprl
-
-$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
-"""
-import logging
-import threading
-import time
-
-from zope.app.publication.zopepublication import ZopePublication
-from App.config import getConfiguration
-from ZPublisher.HTTPRequest import HTTPRequest
-from ZPublisher.HTTPResponse import HTTPResponse
-import ZPublisher
-
-log = logging.getLogger('affinitic.zamqp')
-
-ERROR_MARKER = object()
-storage = threading.local()
-
-
-def getAutostartServiceNames():
-    """get a list of services to start"""
-
-    config = getConfiguration().product_config
-    if config is not None:
-        task_config = config.get('affinitic.zamqp', None)
-        if task_config:
-            autostart = task_config.get('autostart', '')
-            serviceNames = [name.strip()
-                            for name in autostart.split(',')]
-    return serviceNames
-
-
-class ConsumerProcessor(object):
-
-    def __init__(self, db, servicePath, waitTime=3.0):
-        self.db = db
-        self.servicePath = servicePath
-        self.waitTime = waitTime
-
-    def processNext(self):
-        self.call('acl_users2')
-
-    def call(self, method, args=(), errorValue=ERROR_MARKER):
-        path = [method] + self.servicePath[:]
-        path.reverse()
-        response = HTTPResponse()
-        env = {'SERVER_NAME': 'dummy',
-               'SERVER_PORT': '8080',
-               'PATH_INFO': '/' + '/'.join(path)}
-        request = HTTPRequest(None, env, response)
-        conn = self.db.open()
-        root = conn.root()
-        request['PARENTS'] = [root[ZopePublication.root_name]]
-        try:
-            try:
-                ZPublisher.Publish.publish(request, 'Zope2', [None])
-            except Exception, error:
-                # This thread should never crash, thus a blank except
-                log.error('Processor: ``%s()`` caused an error!' % method)
-                log.exception(error)
-                return errorValue is ERROR_MARKER and error or errorValue
-        finally:
-            request.close()
-            conn.close()
-            if not request.response.body:
-                time.sleep(1)
-            else:
-                return request.response.body
-
-    def __call__(self):
-        while 1:
-            result = self.processNext()
-            print result
-            # If there are no jobs available, sleep a little bit and then
-            # check again.
-            if not result:
-                print 'nothing, we sleep'
-                time.sleep(self.waitTime)
-
-
-class ConsumerService(object):
-
-    def startProcessing(self, app):
-        """See interfaces.ITaskService"""
-        # Start the thread running the processor inside.
-        processor = ConsumerProcessor(app, [])
-        thread = threading.Thread(target=processor, name='foo')
-        thread.setDaemon(True)
-        thread.running = True
-        thread.start()
-
-
-def bootStrapSubscriber(event):
-    """Start the queue processing services based on the
-       settings in zope.conf"""
-
-    serviceNames = getAutostartServiceNames()
-
-    db = event.database
-    connection = db.open()
-    root = connection.root()
-    root_folder = root.get(ZopePublication.root_name, None)
-    # we assume that portals can only added at site root level
-
-    log.info('handling event IStartRemoteTasksEvent')
-
-    for siteName, serviceName in [name.split('@')
-                                  for name in serviceNames if name]:
-        sites = [root_folder]
-        print 'add consumer service %s' % serviceName
-        #servicePath = [path for path in self.getPhysicalPath() if path]
-        consumer = ConsumerService()
-        consumer.startProcessing(db)
-#        rootServices = list(rootSM.getUtilitiesFor(interfaces.ITaskService))
-#
-#        for site in sites:
-#            csName = getattr(site, "__name__", '')
-#            if csName is None:
-#                csName = 'root'
-#            if site is not None:
-#                sm = site.getSiteManager()
-#                if serviceName == '*':
-#                    services = list(sm.getUtilitiesFor(interfaces.ITaskService))
-#                    if siteName != "*" and siteName != '':
-#                        services = [s for s in services
-#                                       if s not in rootServices]
-#                else:
-#                    services = [(serviceName,
-#                                 component.queryUtility(interfaces.ITaskService,
-#                                                       context=site,
-#                                                       name=serviceName))]
-#                serviceCount = 0
-#                for srvname, service in services:
-#                    if service is not None and not service.isProcessing():
-#                        service.startProcessing()
-#                        serviceCount += 1
-#                        msg = 'service %s on site %s started'
-#                        log.info(msg % (srvname, csName))
-#                    else:
-#                        if siteName != "*" and serviceName != "*":
-#                            msg = 'service %s on site %s not found'
-#                            log.error(msg % (srvname, csName))
-#            else:
-#                log.error('site %s not found' % siteName)
-#
-#        if (siteName == "*" or serviceName == "*") and serviceCount == 0:
-#            msg = 'no services started by directive %s'
-#            log.warn(msg % name)
-#

File src/affinitic/zamqp/tests/__init__.py

Empty file added.

File src/affinitic/zamqp/tests/testing.py

+# -*- coding: utf-8 -*-
+"""
+affinitic.zamqp
+
+Licensed under the GPL license, see LICENCE.txt for more details.
+Copyright by Affinitic sprl
+
+$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
+"""
+from time import sleep
+from zope.interface import Interface
+import grokcore.component as grok
+from zope.component import provideHandler
+
+from affinitic.zamqp.consumer import Consumer
+from affinitic.zamqp.connection import BrokerConnection
+from affinitic.zamqp.interfaces import IArrivedMessage
+
+
+class IFeedMessage(Interface):
+    """
+    Feed Message marker interface
+    """
+
+class TestConnection(BrokerConnection):
+    grok.name("test")
+    virtual_host = "test"
+    hostname = "localhost"
+    port = 5672
+    userid = "test"
+    password = "test"
+
+
+class FeedConsumer(Consumer):
+    grok.name('feed')
+    queue = "db.foo"
+    exchange = 'db.foo'
+    exchange_type = 'direct'
+    routing_key = 'importer'
+    connection_id = 'cerise'
+    messageInterface = IFeedMessage
+
+
+def handleMessage(message, event):
+    print 'consuming %s' % message.payload
+    sleep(20)
+
+provideHandler(handleMessage, (IFeedMessage, IArrivedMessage))

File src/affinitic/zamqp/tests/testing.zcml

+<configure xmlns="http://namespaces.zope.org/zope"
+           xmlns:grok="http://namespaces.zope.org/grok"
+           i18n_domain="zamqp">
+
+      <grok:grok package=".testing"/>
+
+</configure>
+

File versions.cfg

 [versions]
-Plone=4.0a3
+Plone=4.0a5
 z3c.recipe.sphinxdoc = 0.0.9dev