Commits

Jean-François Roche  committed 7b81834

made publisher transaction aware + some tests

  • Participants
  • Parent commits c56c511

Comments (0)

Files changed (12)

File buildout.cfg

     instance02
     omelette
     test
+    coverage-test
+    coverage-report
     docs
 
 #    rabbitmq
 
 [test]
 recipe = zc.recipe.testrunner
-eggs = affinitic.zamqp
+eggs = affinitic.zamqp [test]
 defaults = ['--tests-pattern', '^f?tests$']
 script = test
 
              collective.sphinx.includedoc
              collective.sphinx.includechangelog
              repoze.sphinx.autointerface
+
+[coverage-test]
+recipe = zc.recipe.testrunner
+eggs = affinitic.zamqp [test]
+defaults = ['--coverage', '${buildout:directory}/coverage', '--auto-progress']
+
+[coverage-report]
+recipe = zc.recipe.egg
+eggs = z3c.coverage
+scripts = coverage
+arguments = ('coverage', 'coverage/report')
       namespace_packages=['affinitic'],
       include_package_data=True,
       zip_safe=False,
+      entry_points={
+            'console_scripts': [
+                  'publishmsg = affinitic.zamqp.publisher:main']},
       extras_require=dict(
+            test=['zope.testing', 'Zope2'],
             docs=['z3c.recipe.sphinxdoc',
                   'collective.sphinx.includechangelog',
                   'repoze.sphinx.autointerface',
       install_requires=[
           'setuptools',
           'carrot',
+          'transaction',
           'five.dbevent',
+          'zope.component',
           'uuid', # python < 2.5
           #'Zope2', # python = 2.6
           'grokcore.component',

File src/affinitic/zamqp/event.py

-# -*- coding: utf-8 -*-
-"""
-affinitic.zamqp
-
-Licensed under the GPL license, see LICENCE.txt for more details.
-Copyright by Affinitic sprl
-
-$Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
-"""
-from zope.interface import implements
-from zope.component.interfaces import ObjectEvent
-from affinitic.zamqp.interfaces import IArrivedMessage
-
-
-class ArrivedMessage(ObjectEvent):
-    implements(IArrivedMessage)
-
-    def __init__(self, object):
-        self.object = object
-        self.data = object.payload

File src/affinitic/zamqp/message.py

     grok.context(IMessage)
     grok.implements(IMessageWrapper)
 
-    def __init__(self, message):
-        super(MessageWrapper, self).__init__(message)
-        self.acknoledged = False
+    acknoledged = False
 
     def ack(self):
-        print 'mark as ack'
         self.acknoledged = True
+        if not self.registered():
+            self._ackMessage()
+
+    def _ackMessage(self):
+        self.context.ack()
 
     def _finish(self):
-        print 'finish transaction'
         if self.acknoledged:
-            print 'send ack'
-            self.context.ack()
+            self._ackMessage()
+
+    def _abort(self):
+        self.acknoledged = False
 
     def __getattr__(self, name):
         try:

File src/affinitic/zamqp/publisher.py

 from carrot.messaging import Publisher as CarrotPublisher
 
 from affinitic.zamqp.interfaces import IPublisher, IBrokerConnection
+from affinitic.zamqp.transactionmanager import VTM
 
 
-class Publisher(grok.GlobalUtility, CarrotPublisher):
+class Publisher(grok.GlobalUtility, CarrotPublisher, VTM):
     grok.baseclass()
     grok.implements(IPublisher)
 
         self._connection = None
         self._backend = None
         self._closed = False
+        self._queueOfPendingMessage = None
+
+    def _begin(self):
+        self._queueOfPendingMessage = []
+        # establish a connection even if the message might
+        # not be send directly
+        backend = self.backend
+
+    _sendToBroker = CarrotPublisher.send
+
+    def send(self, message_data, routing_key=None, delivery_mode=None,
+            mandatory=False, immediate=False, priority=0, content_type=None,
+            content_encoding=None, serializer=None):
+        if self.registered():
+            msgInfo = {'data': message_data,
+                       'info': {'routing_key': routing_key,
+                                'delivery_mode': delivery_mode,
+                                'mandatory': mandatory,
+                                'immediate': immediate,
+                                'priority': priority,
+                                'content_type': content_type,
+                                'content_encoding': content_encoding,
+                                'serializer': serializer}}
+            self._queueOfPendingMessage.append(msgInfo)
+        else:
+            self._sendToBroker(message_data, routing_key, delivery_mode,
+                               mandatory, immediate, priority, content_type,
+                               content_encoding, serializer)
+
+    def _finish(self):
+        for msgInfo in self._queueOfPendingMessage:
+            message_data = msgInfo['data']
+            sendInfo = msgInfo['info']
+            self._sendToBroker(message_data, **sendInfo)
+
+    def _abort(self):
+        self._queueOfPendingMessage = None
 
     @property
     def connection(self):
 
 
 def getCommandLineConfig():
+    opts = []
     try:
         opts, args = getopt.getopt(sys.argv[1:], "ho:t:u:p:v:e:r:m:",
             ["help", "hostname=", "port=", "userid=", "password=", "virtual-host=",

File src/affinitic/zamqp/testing.zcml

+<configure xmlns="http://namespaces.zope.org/zope"
+           i18n_domain="affinitic.zamqp">
+
+    <include package="affinitic.zamqp"/>
+
+</configure>
+

File src/affinitic/zamqp/tests/connection.txt

+    >>> from affinitic.zamqp.connection import BrokerConnection
+    >>> import grokcore.component as grok
+    >>> class DummyBrokerConnection(BrokerConnection):
+    ...     grok.name('bar')
+
+    >>> from zope.component import provideUtility, createObject
+    >>> from affinitic.zamqp.interfaces import IBrokerConnection
+    >>> provideUtility(DummyBrokerConnection(), IBrokerConnection, name='bar')
+    >>> connection = createObject('AMQPBrokerConnection', 'bar')
+    >>> connection2 = createObject('AMQPBrokerConnection', 'bar')
+    >>> connection == connection2
+    False

File src/affinitic/zamqp/tests/message.txt

+Message Wrapper attributes lookup
+---------------------------------
+
+Some setup first. We create a fake amqp message.
+
+    >>> from affinitic.zamqp.interfaces import IMessage
+    >>> from zope.interface import implements
+    >>> class DummyAMQPMessage(object):
+    ...     implements(IMessage)
+    ...     _state = ''
+    ...
+    ...     def ack(self):
+    ...         self._state = 'ACK'
+
+
+    >>> from affinitic.zamqp.interfaces import IMessageWrapper
+    >>> message = DummyAMQPMessage()
+    >>> messageWrapper = IMessageWrapper(message)
+
+The message wrapper does not have a _state attribute but the
+lookup will look in the adapted object (the DummyAMQPMessage object)::
+
+    >>> messageWrapper._state
+    ''
+    >>> messageWrapper.ack()
+    >>> messageWrapper._state
+    'ACK'
+
+
+Message and transaction
+-----------------------
+
+If the message is transaction aware and is acknoledged then the message receive acknoledgement
+only on commit::
+
+    >>> message = DummyAMQPMessage()
+    >>> import transaction
+    >>> tr = transaction.begin()
+    >>> messageWrapper = IMessageWrapper(message)
+    >>> messageWrapper.acknoledged
+    False
+    >>> messageWrapper._register()
+    >>> messageWrapper.acknoledged
+    False
+    >>> message._state
+    ''
+    >>> messageWrapper.ack()
+    >>> messageWrapper.acknoledged
+    True
+    >>> message._state
+    ''
+    >>> transaction.commit()
+    >>> message._state
+    'ACK'
+
+If the transaction is aborted::
+
+    >>> message = DummyAMQPMessage()
+    >>> import transaction
+    >>> tr = transaction.begin()
+    >>> from affinitic.zamqp.interfaces import IMessageWrapper
+    >>> messageWrapper = IMessageWrapper(message)
+    >>> messageWrapper._register()
+    >>> messageWrapper.acknoledged
+    False
+    >>> message._state
+    ''
+    >>> messageWrapper.ack()
+    >>> messageWrapper.acknoledged
+    True
+    >>> message._state
+    ''
+    >>> transaction.abort()
+    >>> message._state
+    ''
+    >>> messageWrapper.acknoledged
+    False
+
+If the message is not acknoledged nothing happen on the message.
+
+    >>> tr = transaction.begin()
+    >>> message = DummyAMQPMessage()
+    >>> messageWrapper = IMessageWrapper(message)
+    >>> messageWrapper._register()
+    >>> messageWrapper.acknoledged
+    False
+    >>> message._state
+    ''
+    >>> transaction.commit()
+    >>> message._state
+    ''
+
+If we work without a transaction, message recieve directly the acknoledgment
+
+    >>> messageWrapper = IMessageWrapper(message)
+    >>> message._state
+    ''
+    >>> messageWrapper.ack()
+    >>> messageWrapper.acknoledged
+    True
+    >>> message._state
+    'ACK'

File src/affinitic/zamqp/tests/publisher.txt

+Command line parsing for publisher
+----------------------------------
+
+    >>> import sys
+    >>> exitOriginal = sys.exit
+    >>> def exit(level=0):
+    ...     print '--Exit with level %s--' % level
+    >>> sys.exit = exit
+
+If you don't provide enough argument, it will print you the usage and exit::
+
+    >>> from affinitic.zamqp.publisher import getCommandLineConfig
+    >>> getCommandLineConfig()
+    Usage: publishmsg [-h | -o hostname -t port -u (userid) -p (password) -v (virtual_host) -e (exchange) -r (routing_key) -m (message)]
+    ...
+    --Exit with level 2--
+    (None, 5672, None, None, None, None, None, None)
+
+If you provide wrong arguments, it will print you the usage and exit::
+
+    >>> sys.argv = ['cmd', '--foobar']
+    >>> getCommandLineConfig()
+    option --foobar not recognized
+    Usage: publishmsg [-h | -o hostname -t port -u (userid) -p (password) -v (virtual_host) -e (exchange) -r (routing_key) -m (message)]
+    ...
+    --Exit with level 2--
+    (None, 5672, None, None, None, None, None, None)
+
+To get help, just use `publishmsg -h` or `publishmsg --help`::
+
+    >>> sys.argv = ['cmd', '-h']
+    >>> getCommandLineConfig()
+    Usage: publishmsg [-h | -o hostname -t port -u (userid) -p (password) -v (virtual_host) -e (exchange) -r (routing_key) -m (message)]
+    ...
+    --Exit with level 0--
+    (None, 5672, None, None, None, None, None, None)
+    >>> sys.exit = exitOriginal
+
+Testing if we provide the right parameter, everything is returned correctly::
+
+    >>> sys.argv = ['cmd', '-o', 'localhost', '-t', '20', '-u', 'john', '-p', 'secret',
+    ...                    '-v', 'virtual1', '-e', 'exchange1', '-r', 'routing1',
+    ...                    '-m', 'Superb Message']
+    >>> getCommandLineConfig()
+    ('localhost', '20', 'john', 'secret', 'virtual1', 'exchange1', 'routing1', 'Superb Message')
+
+    >>> sys.argv = ['cmd', '--hostname', 'localhost', '--port', '20', '--user', 'john',
+    ...                    '--password', 'secret', '--virtual-host', 'virtual1', 
+    ...                    '--exchange', 'exchange1', '--routing-key', 'routing1',
+    ...                    '--message', 'Superb Message']
+    >>> getCommandLineConfig()
+    ('localhost', '20', 'john', 'secret', 'virtual1', 'exchange1', 'routing1', 'Superb Message')
+
+Publisher and transaction
+-------------------------
+
+If the publisher is transaction aware, a message sent to the publisher will be
+transfered to the message broker once the transaction has been commited::
+
+    >>> from affinitic.zamqp.connection import BrokerConnection
+    >>> class FakeBackend(object):
+    ...
+    ...     def establish_connection(self):
+    ...         print 'establishing connection with backend'
+    ...
+    ...     def exchange_declare(self, exchange, type, durable, auto_delete):
+    ...         self.establish_connection()
+
+    >>> class DummyBrokerConnection(BrokerConnection):
+    ...     hostname = 'localhost'
+    ...     port = 12
+    ...
+    ...     def create_backend(self):
+    ...         return FakeBackend()
+
+    >>> from zope.component import provideUtility, createObject
+    >>> from affinitic.zamqp.interfaces import IBrokerConnection
+    >>> provideUtility(DummyBrokerConnection(), IBrokerConnection, name='bar')
+
+    >>> from affinitic.zamqp.publisher import Publisher
+    >>> class FakePublisher(Publisher):
+    ...
+    ...     connection_id = 'bar'
+    ...
+    ...     def _sendToBroker(self, message_data, routing_key=None, delivery_mode=None,
+    ...                       mandatory=False, immediate=False, priority=0, content_type=None,
+    ...                       content_encoding=None, serializer=None):
+    ...         print "Sending to message broker: '%s' with priority: %s " % (message_data, priority)
+
+
+    >>> import transaction
+    >>> tr = transaction.begin()
+    >>> pub = FakePublisher()
+    >>> pub._register()
+    establishing connection with backend
+    >>> pub._queueOfPendingMessage
+    []
+
+A connection is established to the Broker, but nothing is send yet::
+
+    >>> pub.send('My Message', priority=10)
+    >>> from pprint import pprint
+    >>> pprint(pub._queueOfPendingMessage)
+    [{'data': 'My Message',
+      'info': {'content_encoding': None,
+               'content_type': None,
+               'delivery_mode': None,
+               'immediate': False,
+               'mandatory': False,
+               'priority': 10,
+               'routing_key': None,
+               'serializer': None}}]
+    >>> pub.send('My Better Message', priority=1)
+    >>> len(pub._queueOfPendingMessage)
+    2
+    >>> transaction.commit()
+    Sending to message broker: 'My Message' with priority: 10
+    Sending to message broker: 'My Better Message' with priority: 1
+
+If the transaction is aborted, nothing gets sent to the server:
+
+    >>> tr = transaction.begin()
+    >>> pub = FakePublisher()
+    >>> pub._register()
+    establishing connection with backend
+    >>> pub._queueOfPendingMessage
+    []
+    >>> pub.send('My Message', priority=10)
+    >>> from pprint import pprint
+    >>> pprint(pub._queueOfPendingMessage)
+    [{'data': 'My Message',
+      'info': {'content_encoding': None,
+               'content_type': None,
+               'delivery_mode': None,
+               'immediate': False,
+               'mandatory': False,
+               'priority': 10,
+               'routing_key': None,
+               'serializer': None}}]
+    >>> pub.send('My Better Message', priority=1)
+    >>> len(pub._queueOfPendingMessage)
+    2
+    >>> transaction.abort()
+    >>> pub._queueOfPendingMessage is None
+    True
+
+If the publisher is not transaction awe, the message gets sent directly to the server:
+
+    >>> pub = FakePublisher()
+    >>> pub._queueOfPendingMessage is None
+    True
+    >>> pub.send('My Message', priority=10)
+    Sending to message broker: 'My Message' with priority: 10
+    >>> pub.send('My Better Message', priority=1)
+    Sending to message broker: 'My Better Message' with priority: 1

File src/affinitic/zamqp/tests/test_doctest.py

 import os
 import glob
 from zope.testing import doctest
-from Globals import package_home
 from unittest import TestSuite
-from affinitic.zamqp.tests import GLOBALS
+import zope.configuration.xmlconfig
 
 OPTIONFLAGS = (doctest.ELLIPSIS |
                doctest.NORMALIZE_WHITESPACE |
                doctest.REPORT_ONLY_FIRST_FAILURE)
+testPath = os.path.normpath(os.path.dirname(__file__))
 
 
 def list_doctests():
-    home = package_home(GLOBALS)
     return [filename for filename in
-            glob.glob(os.path.sep.join([home, '*.txt']))]
+            glob.glob(os.path.sep.join([testPath, '*.txt']))]
+
+
+def setUp(suite):
+    import affinitic.zamqp
+    zope.configuration.xmlconfig.XMLConfig('testing.zcml', affinitic.zamqp)()
 
 
 def test_suite():
     filenames = list_doctests()
-    suite = TestSuite()
-    suites = [suite.addtest(os.path.basename(filename),
+    return TestSuite([doctest.DocFileSuite(
+                            os.path.basename(filename),
+               setUp=setUp,
                optionflags=OPTIONFLAGS,
-               package='affinitic.zamqp')
-              for filename in filenames]
-    return suites
+               package='affinitic.zamqp.tests')
+              for filename in filenames])

File src/affinitic/zamqp/tests/testing.py

 $Id: event.py 67630 2006-04-27 00:54:03Z jfroche $
 """
 from time import sleep
+from unittest import TestSuite
+
 from zope.interface import Interface
 import grokcore.component as grok
 from zope.component import provideHandler
     Feed Message marker interface
     """
 
+
 class TestConnection(BrokerConnection):
     grok.name("test")
     virtual_host = "test"
     sleep(20)
 
 provideHandler(handleMessage, (IFeedMessage, IArrivedMessage))
+
+
+def test_suite():
+    return TestSuite()

File src/affinitic/zamqp/transactionmanager.py

     def _begin(self):
         pass
 
+    def registered(self):
+        return self._v_registered
+
     def _register(self):
         if not self._v_registered:
             try: