Anonymous avatar Anonymous committed 6696d30

Add support for using Connection and Channel objects in Python 'with'
statements.

Comments (0)

Files changed (4)

     when closing Connections and Channels, to help garbage collection.
     Thanks for majek04@... for pointing these out.
 
+    Add support for using Connection and Channel objects as context
+    managers for 'with' statements (available in Python >= 2.5), so you
+    can write code like:
+
+        with conn as Connection():
+            with ch as conn.channel()
+                # do stuff with ch and conn
+
+    and have the Channel and Connection objects closed automatically
+    when the blocks exit.
+
+
 Version 0.6
 
     Very large rearrangement of code, breaking the large client_0_8.py

amqplib/client_0_8/abstract_channel.py

         self.auto_decode = False
 
 
+    def __enter__(self):
+        """
+        Support for Python >= 2.5 'with' statements.
+
+        """
+        return self
+
+
+    def __exit__(self, type, value, traceback):
+        """
+        Support for Python >= 2.5 'with' statements.
+
+        """
+        self.close()
+
+
     def _send_method(self, method_sig, args='', content=None):
         """
         Send a method for our channel.
             method_sig, args, content)
 
 
+    def close(self):
+        """
+        Close this Channel or Connection
+
+        """
+        raise NotImplementedError('Must be overriden in subclass')
+
+
+
     def wait(self, allowed_methods=None):
         """
         Wait for a method that matches our allowed_methods parameter (the

tests/client_0_8/run_all.py

 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
 
+import sys
 import unittest
 
 import settings
 
-def main():
-    suite = unittest.TestLoader().loadTestsFromNames([
+TEST_NAMES = [
         'test_exceptions',
         'test_serialization',
         'test_basic_message',
         'test_connection',
         'test_channel',
-        ])
+        ]
 
+if sys.version_info >= (2, 5):
+    TEST_NAMES.append('test_with')
+
+def main():
+    suite = unittest.TestLoader().loadTestsFromNames(TEST_NAMES)
     unittest.TextTestRunner(**settings.test_args).run(suite)
 
 if __name__ == '__main__':

tests/client_0_8/test_with.py

+#!/usr/bin/env python
+"""
+Test support for 'with' statements in Python >= 2.5
+
+"""
+# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+
+from __future__ import with_statement
+import unittest
+
+import settings
+
+from amqplib.client_0_8 import Connection, Message
+
+
+class TestChannel(unittest.TestCase):
+
+    def test_with(self):
+        with Connection(**settings.connect_args) as conn:
+            self.assertEqual(conn.transport is None, False)
+
+            with conn.channel(1) as ch:
+                self.assertEqual(1 in conn.channels, True)
+
+                #
+                # Do something with the channel
+                #
+                ch.access_request('/data', active=True, write=True)
+                ch.exchange_declare('unittest.fanout', 'fanout', auto_delete=True)
+
+                msg = Message('unittest message',
+                    content_type='text/plain',
+                    application_headers={'foo': 7, 'bar': 'baz'})
+
+                ch.basic_publish(msg, 'unittest.fanout')
+
+            #
+            # check that the channel was closed
+            #
+            self.assertEqual(1 in conn.channels, False)
+            self.assertEqual(ch.is_open, False)
+
+        #
+        # Check that the connection was closed
+        #
+        self.assertEqual(conn.transport, None)
+
+
+def main():
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel)
+    unittest.TextTestRunner(**settings.test_args).run(suite)
+
+
+if __name__ == '__main__':
+    main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.