Robert Brewer avatar Robert Brewer committed 1675297

Hacking and slashing on the test suite

Comments (0)

Files changed (7)

-* To install, change to the directory where setup.py is located and type (python-2.3 or later needed):
+* To install, change to the directory where setup.py is located and
+  type (python-2.3 or later needed):
 
     python setup.py install
 
-* To run the regression tests, just go to the magicbus/test/ directory and type:
+* To run the regression tests, just go to the magicbus/test/ directory
+  and type:
 
     nosetests -s ./
 

magicbus/test/__init__.py

 Run 'nosetests -s test/' to exercise all tests.
 """
 
+def assertEqual(x, y, msg=None):
+    if not x == y:
+        raise AssertionError(msg or "%r != %r" % (x, y))
+

magicbus/test/helper.py

 """A library of helper functions for the magicbus test suite."""
 
-import os
-thisdir = os.path.abspath(os.path.dirname(__file__))
-
-import sys
-import time
-
-from magicbus._compat import basestring, ntob
-
-# --------------------------- Spawning helpers --------------------------- #
-
-
-class Process(object):
-    
-    pid_file = os.path.join(thisdir, 'test.pid')
-    config_file = os.path.join(thisdir, 'test.conf')
-    
-    def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
-        self.wait = wait
-        self.daemonize = daemonize
-        self.ssl = ssl
-        self.host = socket_host or cherrypy.server.socket_host
-        self.port = socket_port or cherrypy.server.socket_port
-    
-    def start(self, imports=None):
-        """Start the subprocess."""
-        servers.wait_for_free_port(self.host, self.port)
-        
-        args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'),
-                '-c', self.config_file, '-p', self.pid_file]
-        
-        if not isinstance(imports, (list, tuple)):
-            imports = [imports]
-        for i in imports:
-            if i:
-                args.append('-i')
-                args.append(i)
-        
-        if self.daemonize:
-            args.append('-d')
-
-        env = os.environ.copy()
-        # Make sure we import the cherrypy package in which this module is defined.
-        grandparentdir = os.path.abspath(os.path.join(thisdir, '..', '..'))
-        if env.get('PYTHONPATH', ''):
-            env['PYTHONPATH'] = os.pathsep.join((grandparentdir, env['PYTHONPATH']))
-        else:
-            env['PYTHONPATH'] = grandparentdir
-        if self.wait:
-            self.exit_code = os.spawnve(os.P_WAIT, sys.executable, args, env)
-        else:
-            os.spawnve(os.P_NOWAIT, sys.executable, args, env)
-            servers.wait_for_occupied_port(self.host, self.port)
-        
-        # Give the engine a wee bit more time to finish STARTING
-        if self.daemonize:
-            time.sleep(2)
-        else:
-            time.sleep(1)
-    
-    def get_pid(self):
-        return int(open(self.pid_file, 'rb').read())
-    
-    def join(self):
-        """Wait for the process to exit."""
-        try:
-            try:
-                # Mac, UNIX
-                os.wait()
-            except AttributeError:
-                # Windows
-                try:
-                    pid = self.get_pid()
-                except IOError:
-                    # Assume the subprocess deleted the pidfile on shutdown.
-                    pass
-                else:
-                    os.waitpid(pid, 0)
-        except OSError:
-            x = sys.exc_info()[1]
-            if x.args != (10, 'No child processes'):
-                raise
-

magicbus/test/test_states.py

 import threading
 import time
 
-import cherrypy
-engine = cherrypy.engine
+from magicbus import bus
 thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
 
 
+class WebService:
+
+    def __init__(self, bus):
+        self.bus = bus
+        self.running = False
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+
+    def start(self):
+        self.running = True
+
+    def stop(self):
+        self.running = False
+
+    def __call__(self, environ, start_response):
+        self.bus.publish('acquire_thread')
+        try:
+            pi = environ["PATH_INFO"]
+            if pi == '/':
+                start_response('200 OK', [('Content-type', 'text/plain')])
+                return ["Hello world!"]
+            raise ValueError("Unknown URI")
+        finally:
+            self.bus.publish('release_thread')
+
+
 class Dependency:
 
     def __init__(self, bus):
     def stopthread(self, thread_id):
         del self.threads[thread_id]
 
-db_connection = Dependency(engine)
-
-def setup_server():
-    class Root:
-        def index(self):
-            return "Hello World"
-        index.exposed = True
 
         def ctrlc(self):
             raise KeyboardInterrupt()
         ctrlc.exposed = True
 
         def graceful(self):
-            engine.graceful()
+            bus.graceful()
             return "app was (gracefully) restarted succesfully"
         graceful.exposed = True
 
-        def block_explicit(self):
-            while True:
-                if cherrypy.response.timed_out:
-                    cherrypy.response.timed_out = False
-                    return "broken!"
-                time.sleep(0.01)
-        block_explicit.exposed = True
 
-        def block_implicit(self):
-            time.sleep(0.5)
-            return "response.timeout = %s" % cherrypy.response.timeout
-        block_implicit.exposed = True
+class StateTests(object):
 
-    cherrypy.tree.mount(Root())
-    cherrypy.config.update({
-        'environment': 'test_suite',
-        'engine.deadlock_poll_freq': 0.1,
-        })
+    def test_normal_flow(self):
+        bus.clear()
 
-    db_connection.subscribe()
+        service = WebService(bus)
 
-
-
-# ------------ Enough helpers. Time for real live test cases. ------------ #
-
-
-from cherrypy.test import helper
-
-class ServerStateTests(helper.CPWebCase):
-    setup_server = staticmethod(setup_server)
-
-    def setUp(self):
-        cherrypy.server.socket_timeout = 0.1
-        self.do_gc_test = False
-
-    def test_0_NormalStateFlow(self):
-        engine.stop()
         # Our db_connection should not be running
+        db_connection = Dependency(bus)
+        db_connection.subscribe()
         self.assertEqual(db_connection.running, False)
         self.assertEqual(db_connection.startcount, 1)
         self.assertEqual(len(db_connection.threads), 0)
 
         # Test server start
-        engine.start()
-        self.assertEqual(engine.state, engine.states.STARTED)
+        bus.start()
+        self.assertEqual(bus.state, bus.states.STARTED)
 
-        host = cherrypy.server.socket_host
-        port = cherrypy.server.socket_port
-        self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)
+        self.assertEqual(service.running, True)
 
         # The db_connection should be running now
         self.assertEqual(db_connection.running, True)
         self.assertEqual(db_connection.startcount, 2)
         self.assertEqual(len(db_connection.threads), 0)
 
-        self.getPage("/")
-        self.assertBody("Hello World")
+        response = service("/")
+        self.assertEqual(response, "Hello World")
         self.assertEqual(len(db_connection.threads), 1)
 
-        # Test engine stop. This will also stop the HTTP server.
-        engine.stop()
-        self.assertEqual(engine.state, engine.states.STOPPED)
+        # Test bus stop. This will also stop the HTTP server.
+        bus.stop()
+        self.assertEqual(bus.state, bus.states.STOPPED)
 
         # Verify that our custom stop function was called
         self.assertEqual(db_connection.running, False)
         def exittest():
             self.getPage("/")
             self.assertBody("Hello World")
-            engine.exit()
+            bus.exit()
         cherrypy.server.start()
-        engine.start_with_callback(exittest)
-        engine.block()
-        self.assertEqual(engine.state, engine.states.EXITING)
+        bus.start_with_callback(exittest)
+        bus.block()
+        self.assertEqual(bus.state, bus.states.EXITING)
 
     def test_1_Restart(self):
         cherrypy.server.start()
-        engine.start()
+        bus.start()
 
         # The db_connection should be running now
         self.assertEqual(db_connection.running, True)
         self.assertEqual(len(db_connection.threads), 1)
 
         # Test server restart from this thread
-        engine.graceful()
-        self.assertEqual(engine.state, engine.states.STARTED)
+        bus.graceful()
+        self.assertEqual(bus.state, bus.states.STARTED)
         self.getPage("/")
         self.assertBody("Hello World")
         self.assertEqual(db_connection.running, True)
 
         # Test server restart from inside a page handler
         self.getPage("/graceful")
-        self.assertEqual(engine.state, engine.states.STARTED)
+        self.assertEqual(bus.state, bus.states.STARTED)
         self.assertBody("app was (gracefully) restarted succesfully")
         self.assertEqual(db_connection.running, True)
         self.assertEqual(db_connection.gracecount, grace + 2)
         # Note that the "/graceful" request has been flushed.
         self.assertEqual(len(db_connection.threads), 0)
 
-        engine.stop()
-        self.assertEqual(engine.state, engine.states.STOPPED)
+        bus.stop()
+        self.assertEqual(bus.state, bus.states.STOPPED)
         self.assertEqual(db_connection.running, False)
         self.assertEqual(len(db_connection.threads), 0)
 
     def test_2_KeyboardInterrupt(self):
         # Raise a keyboard interrupt in the HTTP server's main thread.
         # We must start the server in this, the main thread
-        engine.start()
+        bus.start()
         cherrypy.server.start()
 
         self.persistent = True
             self.assertNoHeader("Connection")
 
             cherrypy.server.httpserver.interrupt = KeyboardInterrupt
-            engine.block()
+            bus.block()
 
             self.assertEqual(db_connection.running, False)
             self.assertEqual(len(db_connection.threads), 0)
-            self.assertEqual(engine.state, engine.states.EXITING)
+            self.assertEqual(bus.state, bus.states.EXITING)
         finally:
             self.persistent = False
 
         # servers, this should occur in one of the worker threads.
         # This should raise a BadStatusLine error, since the worker
         # thread will just die without writing a response.
-        engine.start()
+        bus.start()
         cherrypy.server.start()
 
         try:
             print(self.body)
             self.fail("AssertionError: BadStatusLine not raised")
 
-        engine.block()
+        bus.block()
         self.assertEqual(db_connection.running, False)
         self.assertEqual(len(db_connection.threads), 0)
 
-    def test_3_Deadlocks(self):
-        cherrypy.config.update({'response.timeout': 0.2})
-
-        engine.start()
-        cherrypy.server.start()
-        try:
-            self.assertNotEqual(engine.timeout_monitor.thread, None)
-
-            # Request a "normal" page.
-            self.assertEqual(engine.timeout_monitor.servings, [])
-            self.getPage("/")
-            self.assertBody("Hello World")
-            # request.close is called async.
-            while engine.timeout_monitor.servings:
-                sys.stdout.write(".")
-                time.sleep(0.01)
-
-            # Request a page that explicitly checks itself for deadlock.
-            # The deadlock_timeout should be 2 secs.
-            self.getPage("/block_explicit")
-            self.assertBody("broken!")
-
-            # Request a page that implicitly breaks deadlock.
-            # If we deadlock, we want to touch as little code as possible,
-            # so we won't even call handle_error, just bail ASAP.
-            self.getPage("/block_implicit")
-            self.assertStatus(500)
-            self.assertInBody("raise cherrypy.TimeoutError()")
-        finally:
-            engine.exit()
-
     def test_4_Autoreload(self):
         # Start the demo script in a new process
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
         p.write_conf(
                 extra='test_case_name: "test_4_Autoreload"')
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         try:
             self.getPage("/start")
             start = float(self.body)
         p.join()
 
     def test_5_Start_Error(self):
-        # If a process errors during start, it should stop the engine
+        # If a process errors during start, it should stop the bus
         # and exit with a non-zero exit code.
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
                              wait=True)
 test_case_name: "test_5_Start_Error"
 """
         )
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         if p.exit_code == 0:
             self.fail("Process failed to return nonzero exit code.")
 
                              socket_port=8081)
         p.write_conf(
              extra='test_case_name: "test_daemonize"')
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         try:
             # Just get the pid of the daemonization process.
             self.getPage("/pid")
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
         p.write_conf(
                 extra='test_case_name: "test_SIGHUP_tty"')
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         # Send a SIGHUP
         os.kill(p.get_pid(), SIGHUP)
         # This might hang if things aren't working right, but meh.
                              wait=True, daemonize=True)
         p.write_conf(
              extra='test_case_name: "test_SIGHUP_daemonized"')
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
 
         pid = p.get_pid()
         try:
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
         p.write_conf(
                 extra='test_case_name: "test_SIGTERM"')
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         # Send a SIGTERM
         os.kill(p.get_pid(), SIGTERM)
         # This might hang if things aren't working right, but meh.
                                  wait=True, daemonize=True)
             p.write_conf(
                  extra='test_case_name: "test_SIGTERM_2"')
-            p.start(imports='cherrypy.test._test_states_demo')
+            p.start(imports='magicbus.test._test_states_demo')
             # Send a SIGTERM
             os.kill(p.get_pid(), SIGTERM)
             # This might hang if things aren't working right, but meh.
             extra="""unsubsig: True
 test_case_name: "test_signal_handler_unsubscribe"
 """)
-        p.start(imports='cherrypy.test._test_states_demo')
+        p.start(imports='magicbus.test._test_states_demo')
         # Send a SIGTERM
         os.kill(p.get_pid(), SIGTERM)
         # This might hang if things aren't working right, but meh.

magicbus/test/test_tasks.py

+from magicbus._compat import BadStatusLine, ntob
+import os
+import sys
+import threading
+import time
+
+from magicbus import bus
+from magicbus.plugins import tasks
+from magicbus.test import assertEqual
+thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
+
+
+class WebService:
+
+    def __init__(self, bus):
+        self.bus = bus
+        self.running = False
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+
+    def start(self):
+        self.running = True
+
+    def stop(self):
+        self.running = False
+
+    def __call__(self, path):
+        self.bus.publish('acquire_thread')
+        if path == '/':
+            return "Hello World"
+        elif path == '/graceful':
+            bus.graceful()
+            return "app was (gracefully) restarted succesfully"
+        elif path == '/ctrlc':
+            raise KeyboardInterrupt
+        raise ValueError("Unknown URI")
+
+
+class Dependency:
+
+    def __init__(self, bus):
+        self.bus = bus
+        self.running = False
+        self.startcount = 0
+        self.gracecount = 0
+        self.threads = {}
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+        self.bus.subscribe('graceful', self.graceful)
+        self.bus.subscribe('start_thread', self.startthread)
+        self.bus.subscribe('stop_thread', self.stopthread)
+
+    def start(self):
+        self.running = True
+        self.startcount += 1
+
+    def stop(self):
+        self.running = False
+
+    def graceful(self):
+        self.gracecount += 1
+
+    def startthread(self, thread_id):
+        self.threads[thread_id] = None
+
+    def stopthread(self, thread_id):
+        del self.threads[thread_id]
+
+
+class TestTasks(object):
+
+    def test_thread_manager(self):
+        bus.clear()
+        thread_manager = tasks.ThreadManager(bus)
+        thread_manager.subscribe()
+        service = WebService(bus)
+        service.subscribe()
+        db = Dependency(bus)
+        db.subscribe()
+
+        # Our db should not be running
+        assertEqual(db.running, False)
+        assertEqual(db.startcount, 0)
+        assertEqual(len(db.threads), 0)
+
+        # Test server start
+        bus.start()
+        assertEqual(bus.state, bus.states.STARTED)
+        assertEqual(service.running, True)
+
+        # The db should be running now
+        assertEqual(db.running, True)
+        assertEqual(db.startcount, 1)
+        assertEqual(len(db.threads), 0)
+
+        assertEqual(service("/"), "Hello World")
+        assertEqual(len(db.threads), 1)
+
+        # Test bus stop. This will also stop the WebService.
+        bus.stop()
+        assertEqual(bus.state, bus.states.STOPPED)
+
+        # Verify that our custom stop function was called
+        assertEqual(db.running, False)
+        assertEqual(len(db.threads), 0)
+
+        # Block the main thread now and verify that exit() works.
+        def exittest():
+            assertEqual(service("/"), "Hello World")
+            bus.exit()
+        bus.start_with_callback(exittest)
+        bus.block()
+        assertEqual(bus.state, bus.states.EXITING)
+
+    def test_restart(self):
+        bus.clear()
+        thread_manager = tasks.ThreadManager(bus)
+        thread_manager.subscribe()
+        service = WebService(bus)
+        service.subscribe()
+        db = Dependency(bus)
+        db.subscribe()
+
+        bus.start()
+
+        # The db should be running now
+        assertEqual(db.running, True)
+        grace = db.gracecount
+
+        assertEqual(service("/"), "Hello World")
+        assertEqual(len(db.threads), 1)
+
+        # Test server restart from this thread
+        bus.graceful()
+        assertEqual(bus.state, bus.states.STARTED)
+
+        assertEqual(service("/"), "Hello World")
+        assertEqual(db.running, True)
+        assertEqual(db.gracecount, grace + 1)
+        assertEqual(len(db.threads), 1)
+
+        # Test server restart from inside a page handler
+        result = service("/graceful")
+        assertEqual(bus.state, bus.states.STARTED)
+        assertEqual(result, "app was (gracefully) restarted succesfully")
+        assertEqual(db.running, True)
+        assertEqual(db.gracecount, grace + 2)
+        # Since we are requesting synchronously, is only one thread used?
+        # Note that the "/graceful" request has been flushed.
+        assertEqual(len(db.threads), 0)
+
+        bus.stop()
+        assertEqual(bus.state, bus.states.STOPPED)
+        assertEqual(db.running, False)
+        assertEqual(len(db.threads), 0)
+

magicbus/wspbus.py

             listeners.discard(callback)
             del self._priorities[(channel, callback)]
     
+    def clear(self):
+        """Discard all subscribed callbacks."""
+        self.execv = False
+        self.state = states.STOPPED
+        # Use items() as a snapshot instead of while+pop so that callers
+        # can be slightly lax in subscribing new listeners while the old
+        # ones are being removed.
+        for channel, listeners in self.listeners.items():
+            for callback in list(listeners):
+                listeners.discard(callback)
+                del self._priorities[(channel, callback)]
+    
     def publish(self, channel, *args, **kwargs):
         """Return output of all subscribers for the given channel."""
         if channel not in self.listeners:
     from distutils.core import setup
 
 from distutils.command.install import INSTALL_SCHEMES
-from distutils.command.build_py import build_py
 import sys
-import os
-import re
 
 ###############################################################################
 # arguments for the setup command
     ('magicbus/test', []),
 ]
 
-if sys.version_info >= (3, 0):
-    required_python_version = '3.0'
-else:
-    required_python_version = '2.3'
-
 ###############################################################################
 # end arguments for setup
 ###############################################################################
     data_files = [(r'\PURELIB\%s' % path, files) for path, files in data_files]
 
 def main():
-    if sys.version < required_python_version:
-        s = "I'm sorry, but %s %s requires Python %s or later."
-        print(s % (name, version, required_python_version))
+    if sys.version < '2.4':
+        s = "I'm sorry, but %s %s requires Python 2.4 or later."
+        print(s % (name, version))
         sys.exit(1)
     # set default location for "data_files" to
     # platform specific "site-packages" location
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.