Source

magicbus / magicbus / test / test_tasks.py

from magicbus._compat import ntob
from magicbus import bus
from magicbus.plugins import tasks
from magicbus.test import assertEqual, WebAdapter, WebService, WebHandler


class Handler(WebHandler):
    """Request handler for the task tests; answers a few fixed GET paths."""

    # Expose the module-level bus on the class so do_GET reaches it via self.
    bus = bus

    def do_GET(self):
        """Dispatch a GET request according to ``self.path``.

        * ``/`` responds with a greeting.
        * ``/graceful`` calls ``self.bus.graceful()`` and then responds.
        * ``/ctrlc`` raises ``KeyboardInterrupt``.
        * Any other path responds with status 404.

        ``respond`` is not defined in this class; presumably it is
        inherited from ``WebHandler`` (confirm against that class).
        """
        requested = self.path

        if requested == '/':
            self.respond("Hello World")
            return

        if requested == '/graceful':
            # Trigger the graceful transition first, then acknowledge it.
            self.bus.graceful()
            self.respond("app was (gracefully) restarted succesfully")
            return

        if requested == '/ctrlc':
            raise KeyboardInterrupt

        # Unknown path.
        self.respond(status=404)


class TestTasks(object):
    """Tests for the plugins in ``magicbus.plugins.tasks``."""

    def test_thread_manager(self):
        """Check ThreadManager's thread count over a bus start/stop cycle.

        The manager's ``threads`` collection is expected to be empty before
        any request is served, hold one entry after a single GET, and be
        empty again once the bus has been stopped.
        """
        bus.clear()

        web_service = WebService(handler_class=Handler)
        WebAdapter(bus, web_service).subscribe()

        manager = tasks.ThreadManager(bus)
        manager.subscribe()
        # Nothing has been started yet, so nothing should be tracked.
        assertEqual(len(manager.threads), 0)

        # Bring the bus (and with it the web service) up.
        bus.start()
        try:
            assertEqual(bus.state, bus.states.STARTED)
            assertEqual(web_service.ready, True)
            # Starting alone must not register any thread.
            assertEqual(len(manager.threads), 0)

            # Serve one request and check both its body and the thread count.
            response = web_service.do_GET("/")
            body = response.read()
            assertEqual(body, ntob("Hello World"))
            assertEqual(len(manager.threads), 1)

            # Stopping the bus will also stop the WebService.
            bus.stop()
            assertEqual(bus.state, bus.states.STOPPED)

            # The manager's stop handling should have released its threads.
            assertEqual(len(manager.threads), 0)
        finally:
            # Always exit the bus, even if an assertion above failed.
            bus.exit()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.