# magicbus/magicbus/test/test_opsys.py

from magicbus._compat import ntob
import os
thismodule = os.path.abspath(__file__)
import sys

from magicbus import bus
from magicbus.plugins import loggers, opsys
from magicbus.test import assertEqual, Process, WebAdapter, WebService, WebHandler

#loggers.StdoutLogger(bus).subscribe()
# PID file plugin shared by the test and the spawned child process; its
# path is this module's absolute path with ".pid" appended.
pidfile = opsys.PIDFile(bus, thismodule + ".pid")

class Handler(WebHandler):
    """Request handler for the small web service used by the opsys tests.

    GET routes:
        ``/``      -- responds with "Hello World".
        ``/pid``   -- responds with the PID of the process serving the request.
        ``/exit``  -- responds with "ok", then calls ``bus.exit()``.
        otherwise  -- responds with status 404.
    """

    bus = bus

    def do_GET(self):
        """Answer a GET request according to ``self.path``."""
        requested = self.path

        if requested == '/exit':
            # Send the reply before asking the bus to exit.
            self.respond("ok")
            self.bus.exit()
            return

        if requested == '/pid':
            self.respond(str(os.getpid()))
            return

        if requested == '/':
            self.respond("Hello World")
            return

        self.respond(status=404)


# Service instance shared by the test (as client) and the child process.
service = WebService(handler_class=Handler)


class TestOpsys(object):
    """Tests for the ``magicbus.plugins.opsys`` plugins."""

    def test_daemonize(self):
        """Check that a child started in "daemonize" mode keeps serving.

        Spawns this module as a script with the "daemonize" argument, asks
        the running service for its PID, compares it with the contents of
        the PID file, then shuts the service down and checks the exit code
        of the originally spawned process.

        Returns:
            The string "skipped (not on posix)" when not running on a POSIX
            platform; otherwise None.

        Raises:
            AssertionError: if the spawned process exits with a non-zero
                code (or if one of the ``assertEqual`` checks fails).
        """
        if os.name != 'posix':
            return "skipped (not on posix)"

        # Spawn the process and wait, when this returns, the original process
        # is finished.  If it daemonized properly, we should still be able
        # to access pages.
        p = Process([sys.executable, thismodule, "daemonize"])
        p.start()
        pidfile.wait()
        try:
            # Just get the pid of the daemonization process.
            resp = service.do_GET("/pid")
            assertEqual(resp.status, 200)
            page_pid = int(resp.read())

            # Read the PID file inside a context manager so the handle is
            # closed deterministically rather than leaked.
            with open(pidfile.pidfile, 'rb') as pid_fh:
                recorded_pid = pid_fh.read()
            assertEqual(ntob(str(page_pid)), recorded_pid)
        finally:
            # Shut down the spawned process
            service.do_GET("/exit")
        pidfile.join()

        # Wait until here to test the exit code because we want to ensure
        # that we wait for the daemon to finish running before we fail.
        p.process.wait()
        if p.process.returncode != 0:
            raise AssertionError(
                "Daemonized parent process returned exit code %s." %
                p.process.returncode)


if __name__ == '__main__':
    # Script entry point, used by the test above to run the service in a
    # separate process.  The first command-line argument selects the mode.
    daemon_requested = sys.argv[1] == 'daemonize'
    if daemon_requested:
        opsys.Daemonizer(bus).subscribe()

    # Subscribe the remaining plugins in the same order as before, then
    # start the bus and block.
    pidfile.subscribe()
    WebAdapter(bus, service).subscribe()
    bus.start()
    bus.block()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.