1. Andrew Godwin
  2. py-amqplib

Commits

bar...@macbook.home  committed 6696d30

Add support for using Connection and Channel objects in Python 'with'
statements.

  • Participants
  • Parent commits bf5dae0
  • Branches default

Comments (0)

Files changed (4)

File CHANGES

View file
     when closing Connections and Channels, to help garbage collection.
     Thanks for majek04@... for pointing these out.
 
+    Add support for using Connection and Channel objects as context
+    managers for 'with' statements (available in Python >= 2.5), so you
+    can write code like:
+
+        with conn as Connection():
+            with ch as conn.channel()
+                # do stuff with ch and conn
+
+    and have the Channel and Connection objects closed automatically
+    when the blocks exit.
+
+
 Version 0.6
 
     Very large rearrangement of code, breaking the large client_0_8.py

File amqplib/client_0_8/abstract_channel.py

View file
         self.auto_decode = False
 
 
+    def __enter__(self):
+        """
+        Support for Python >= 2.5 'with' statements.
+
+        """
+        return self
+
+
+    def __exit__(self, type, value, traceback):
+        """
+        Support for Python >= 2.5 'with' statements.
+
+        """
+        self.close()
+
+
     def _send_method(self, method_sig, args='', content=None):
         """
         Send a method for our channel.
             method_sig, args, content)
 
 
+    def close(self):
+        """
+        Close this Channel or Connection
+
+        """
+        raise NotImplementedError('Must be overriden in subclass')
+
+
+
     def wait(self, allowed_methods=None):
         """
         Wait for a method that matches our allowed_methods parameter (the

File tests/client_0_8/run_all.py

View file
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
 
+import sys
 import unittest
 
 import settings
 
-def main():
-    suite = unittest.TestLoader().loadTestsFromNames([
+TEST_NAMES = [
         'test_exceptions',
         'test_serialization',
         'test_basic_message',
         'test_connection',
         'test_channel',
-        ])
+        ]
 
+if sys.version_info >= (2, 5):
+    TEST_NAMES.append('test_with')
+
+def main():
+    suite = unittest.TestLoader().loadTestsFromNames(TEST_NAMES)
     unittest.TextTestRunner(**settings.test_args).run(suite)
 
 if __name__ == '__main__':

File tests/client_0_8/test_with.py

View file
+#!/usr/bin/env python
+"""
+Test support for 'with' statements in Python >= 2.5
+
+"""
+# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+
+from __future__ import with_statement
+import unittest
+
+import settings
+
+from amqplib.client_0_8 import Connection, Message
+
+
+class TestChannel(unittest.TestCase):
+
+    def test_with(self):
+        with Connection(**settings.connect_args) as conn:
+            self.assertEqual(conn.transport is None, False)
+
+            with conn.channel(1) as ch:
+                self.assertEqual(1 in conn.channels, True)
+
+                #
+                # Do something with the channel
+                #
+                ch.access_request('/data', active=True, write=True)
+                ch.exchange_declare('unittest.fanout', 'fanout', auto_delete=True)
+
+                msg = Message('unittest message',
+                    content_type='text/plain',
+                    application_headers={'foo': 7, 'bar': 'baz'})
+
+                ch.basic_publish(msg, 'unittest.fanout')
+
+            #
+            # check that the channel was closed
+            #
+            self.assertEqual(1 in conn.channels, False)
+            self.assertEqual(ch.is_open, False)
+
+        #
+        # Check that the connection was closed
+        #
+        self.assertEqual(conn.transport, None)
+
+
+def main():
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel)
+    unittest.TextTestRunner(**settings.test_args).run(suite)
+
+
+if __name__ == '__main__':
+    main()