Robert Brewer avatar Robert Brewer committed 08c4591

New test_opsys.py. New bus.debug attribute.

Comments (0)

Files changed (6)

magicbus/_compat.py

     # Python 3
     from http.client import BadStatusLine
 
+try:
+    from http.server import HTTPServer, BaseHTTPRequestHandler as HTTPHandler
+except ImportError:
+    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler as HTTPHandler
+
+try:
+    from http.client import HTTPConnection
+except ImportError:
+    from httplib import HTTPConnection
+
 import threading
 if hasattr(threading.Thread, "daemon"):
     # Python 2.6+

magicbus/plugins/opsys.py

 import os
 import sys
 import threading
+import time
 
 from magicbus.plugins import SimplePlugin
 from magicbus._compat import basestring, ntob
             raise
         except:
             pass
+    
+    def wait(self, timeout=None, poll_interval=0.1):
+        """Return when the PID file exists, or the timeout expires."""
+        starttime = time.time()
+        while timeout is None or time.time() - starttime <= timeout:
+            if os.path.exists(self.pidfile):
+                return
+            time.sleep(poll_interval)
+    
+    def join(self, timeout=None, poll_interval=0.1):
+        """Return when the PID file does not exist, or the timeout expires."""
+        starttime = time.time()
+        while timeout is None or time.time() - starttime <= timeout:
+            if not os.path.exists(self.pidfile):
+                return
+            time.sleep(poll_interval)
 
+
+

magicbus/test/__init__.py

 Run 'nosetests -s test/' to exercise all tests.
 """
 
+from magicbus._compat import HTTPServer, HTTPConnection, HTTPHandler
+from subprocess import Popen
+import threading
+
+
 def assertEqual(x, y, msg=None):
     if not x == y:
         raise AssertionError(msg or "%r != %r" % (x, y))
 
+
+class Process(object):
+
+    def __init__(self, args):
+        self.args = args
+        self.process = None
+
+    def start(self):
+        # Exceptions in the child will be re-raised in the parent,
+        # so if yyou're expecting one, trap this call and check for it.
+        self.process = Popen(self.args)
+
+    def stop(self):
+        if self.process is not None:
+            self.process.kill()
+
+    def join(self):
+        return self.process.wait()
+
+
+class WebServer(HTTPServer):
+
+    def stop(self):
+        """Stops the serve_forever loop without waiting."""
+        # Sigh. Really, standard library, really? Double underscores?
+        self._BaseServer__shutdown_request = True
+
+
+class WebService(object):
+
+    def __init__(self, bus, address=('127.0.0.1', 8000), handler_class=None):
+        self.bus = bus
+        self.address = address
+        self.handler_class = handler_class
+        self.httpd = None
+        self.running = False
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+
+    def start(self):
+        self.httpd = WebServer(self.address, self.handler_class)
+        threading.Thread(target=self.httpd.serve_forever).start()
+        self.running = True
+    # Make sure we start httpd after the daemonizer.
+    start.priority = 75
+
+    def stop(self):
+        if self.httpd is not None:
+            self.httpd.stop()
+        self.running = False
+    stop.priority = 25
+
+    def do_GET(self, uri):
+        conn = HTTPConnection(*self.address)
+        try:
+            conn.request("GET", uri)
+            return conn.getresponse()
+        finally:
+            conn.close()
+
+
+class WebHandler(HTTPHandler):
+
+    def log_request(self, code="-", size="-"):
+        if self.bus.debug:
+            HTTPHandler.log_request(self, code, size)
+
+    def respond(self, body=None, status=200, headers=None):
+        if headers is None:
+            headers = []
+        if body is not None:
+            if isinstance(body, str):
+                body = body.encode('utf-8')
+            if 'Content-Length' not in (k for k, v in headers):
+                headers.append(('Content-Length', str(len(body))))
+        self.send_response(status)
+        for k, v in headers:
+            self.send_header(k, v)
+        self.end_headers()
+        if body is not None:
+            self.wfile.write(body)
+
+    def handle(self, *args, **kwargs):
+        self.bus.publish('acquire_thread')
+        HTTPHandler.handle(self, *args, **kwargs)
+
+
+class Counter(object):
+
+    def __init__(self, bus):
+        self.bus = bus
+        self.running = False
+        self.startcount = 0
+        self.gracecount = 0
+        self.threads = {}
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+        self.bus.subscribe('graceful', self.graceful)
+        self.bus.subscribe('start_thread', self.startthread)
+        self.bus.subscribe('stop_thread', self.stopthread)
+
+    def start(self):
+        self.running = True
+        self.startcount += 1
+
+    def stop(self):
+        self.running = False
+
+    def graceful(self):
+        self.gracecount += 1
+
+    def startthread(self, thread_id):
+        self.threads[thread_id] = None
+
+    def stopthread(self, thread_id):
+        del self.threads[thread_id]
+

magicbus/test/test_opsys.py

+from magicbus._compat import ntob
+import os
+thismodule = os.path.abspath(__file__)
+import sys
+
+from magicbus import bus
+from magicbus.plugins import loggers, opsys
+from magicbus.test import assertEqual, Process, WebService, WebHandler
+
+#loggers.StdoutLogger(bus).subscribe()
+pidfile = opsys.PIDFile(bus, os.path.join(thismodule + ".pid"))
+
+class Handler(WebHandler):
+
+    bus = bus
+
+    def do_GET(self):
+        if self.path == '/':
+            self.respond("Hello World")
+        elif self.path == '/pid':
+            self.respond(str(os.getpid()))
+        elif self.path == '/exit':
+            self.respond("ok")
+            self.bus.exit()
+        else:
+            self.respond(status=404)
+service = WebService(bus, handler_class=Handler)
+
+
+class TestOpsys(object):
+
+    def test_daemonize(self):
+        if os.name not in ['posix']:
+            return self.skip("skipped (not on posix) ")
+
+        # Spawn the process and wait, when this returns, the original process
+        # is finished.  If it daemonized properly, we should still be able
+        # to access pages.
+        p = Process([sys.executable, thismodule, "daemonize"])
+        p.start()
+        pidfile.wait()
+        try:
+            # Just get the pid of the daemonization process.
+            resp = service.do_GET("/pid")
+            assertEqual(resp.status, 200)
+            page_pid = int(resp.read())
+            assertEqual(ntob(str(page_pid)), open(pidfile.pidfile, 'rb').read())
+        finally:
+            # Shut down the spawned process
+            service.do_GET("/exit")
+        pidfile.join()
+
+        # Wait until here to test the exit code because we want to ensure
+        # that we wait for the daemon to finish running before we fail.
+        p.process.wait()
+        if p.process.returncode != 0:
+            raise AssertionError(
+                "Daemonized parent process returned exit code %s." %
+                p.process.returncode)
+
+
+if __name__ == '__main__':
+    mode = sys.argv[1]
+    if mode == 'daemonize':
+        opsys.Daemonizer(bus).subscribe()
+    pidfile.subscribe()
+    service.subscribe()
+    bus.start()
+    bus.block()
+

magicbus/test/test_tasks.py

-from magicbus._compat import BadStatusLine, ntob
-import os
-import sys
-import threading
-import time
-
+from magicbus._compat import ntob
 from magicbus import bus
 from magicbus.plugins import tasks
-from magicbus.test import assertEqual
-thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
+from magicbus.test import assertEqual, Counter, WebService, WebHandler
 
 
-class WebService:
+class Handler(WebHandler):
 
-    def __init__(self, bus):
-        self.bus = bus
-        self.running = False
+    bus = bus
 
-    def subscribe(self):
-        self.bus.subscribe('start', self.start)
-        self.bus.subscribe('stop', self.stop)
-
-    def start(self):
-        self.running = True
-
-    def stop(self):
-        self.running = False
-
-    def __call__(self, path):
-        self.bus.publish('acquire_thread')
-        if path == '/':
-            return "Hello World"
-        elif path == '/graceful':
-            bus.graceful()
-            return "app was (gracefully) restarted succesfully"
-        elif path == '/ctrlc':
+    def do_GET(self):
+        if self.path == '/':
+            self.respond("Hello World")
+        elif self.path == '/graceful':
+            self.bus.graceful()
+            self.respond("app was (gracefully) restarted succesfully")
+        elif self.path == '/ctrlc':
             raise KeyboardInterrupt
-        raise ValueError("Unknown URI")
-
-
-class Dependency:
-
-    def __init__(self, bus):
-        self.bus = bus
-        self.running = False
-        self.startcount = 0
-        self.gracecount = 0
-        self.threads = {}
-
-    def subscribe(self):
-        self.bus.subscribe('start', self.start)
-        self.bus.subscribe('stop', self.stop)
-        self.bus.subscribe('graceful', self.graceful)
-        self.bus.subscribe('start_thread', self.startthread)
-        self.bus.subscribe('stop_thread', self.stopthread)
-
-    def start(self):
-        self.running = True
-        self.startcount += 1
-
-    def stop(self):
-        self.running = False
-
-    def graceful(self):
-        self.gracecount += 1
-
-    def startthread(self, thread_id):
-        self.threads[thread_id] = None
-
-    def stopthread(self, thread_id):
-        del self.threads[thread_id]
+        else:
+            self.respond(status=404)
 
 
 class TestTasks(object):
 
     def test_thread_manager(self):
         bus.clear()
-        thread_manager = tasks.ThreadManager(bus)
-        thread_manager.subscribe()
-        service = WebService(bus)
+
+        service = WebService(bus, handler_class=Handler)
         service.subscribe()
-        db = Dependency(bus)
-        db.subscribe()
 
-        # Our db should not be running
-        assertEqual(db.running, False)
-        assertEqual(db.startcount, 0)
-        assertEqual(len(db.threads), 0)
+        tm = tasks.ThreadManager(bus)
+        tm.subscribe()
+        assertEqual(len(tm.threads), 0)
 
         # Test server start
         bus.start()
-        assertEqual(bus.state, bus.states.STARTED)
-        assertEqual(service.running, True)
+        try:
+            assertEqual(bus.state, bus.states.STARTED)
+            assertEqual(service.running, True)
+            assertEqual(len(tm.threads), 0)
 
-        # The db should be running now
-        assertEqual(db.running, True)
-        assertEqual(db.startcount, 1)
-        assertEqual(len(db.threads), 0)
+            assertEqual(service.do_GET("/").read(), ntob("Hello World"))
+            assertEqual(len(tm.threads), 1)
 
-        assertEqual(service("/"), "Hello World")
-        assertEqual(len(db.threads), 1)
+            # Test bus stop. This will also stop the WebService.
+            bus.stop()
+            assertEqual(bus.state, bus.states.STOPPED)
 
-        # Test bus stop. This will also stop the WebService.
-        bus.stop()
-        assertEqual(bus.state, bus.states.STOPPED)
+            # Verify that our custom stop function was called
+            assertEqual(len(tm.threads), 0)
+        finally:
+            bus.exit()
 
-        # Verify that our custom stop function was called
-        assertEqual(db.running, False)
-        assertEqual(len(db.threads), 0)
-
-        # Block the main thread now and verify that exit() works.
-        def exittest():
-            assertEqual(service("/"), "Hello World")
-            bus.exit()
-        bus.start_with_callback(exittest)
-        bus.block()
-        assertEqual(bus.state, bus.states.EXITING)
-
-    def test_restart(self):
-        bus.clear()
-        thread_manager = tasks.ThreadManager(bus)
-        thread_manager.subscribe()
-        service = WebService(bus)
-        service.subscribe()
-        db = Dependency(bus)
-        db.subscribe()
-
-        bus.start()
-
-        # The db should be running now
-        assertEqual(db.running, True)
-        grace = db.gracecount
-
-        assertEqual(service("/"), "Hello World")
-        assertEqual(len(db.threads), 1)
-
-        # Test server restart from this thread
-        bus.graceful()
-        assertEqual(bus.state, bus.states.STARTED)
-
-        assertEqual(service("/"), "Hello World")
-        assertEqual(db.running, True)
-        assertEqual(db.gracecount, grace + 1)
-        assertEqual(len(db.threads), 1)
-
-        # Test server restart from inside a page handler
-        result = service("/graceful")
-        assertEqual(bus.state, bus.states.STARTED)
-        assertEqual(result, "app was (gracefully) restarted succesfully")
-        assertEqual(db.running, True)
-        assertEqual(db.gracecount, grace + 2)
-        # Since we are requesting synchronously, is only one thread used?
-        # Note that the "/graceful" request has been flushed.
-        assertEqual(len(db.threads), 0)
-
-        bus.stop()
-        assertEqual(bus.state, bus.states.STOPPED)
-        assertEqual(db.running, False)
-        assertEqual(len(db.threads), 0)
-

magicbus/wspbus.py

     
     states = states
     state = states.STOPPED
+    debug = False
     execv = False
     max_cloexec_files = max_files
     
             [(channel, set()) for channel
              in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
         self._priorities = {}
+        self.debug = False
     
     def subscribe(self, channel, callback, priority=None):
         """Add the given callback at the given channel (if not present)."""
             items.sort()
         for priority, listener in items:
             try:
-                output.append(listener(*args, **kwargs))
+                if self.debug and channel != 'log':
+                    self.log("Publishing to %s: %s(*%s, **%s)" %
+                             (channel, listener, args, kwargs))
+                result = listener(*args, **kwargs)
+                if self.debug and channel != 'log':
+                    self.log("Publishing to %s: %s(*%s, **%s) = %s" %
+                             (channel, listener, args, kwargs, result))
+                output.append(result)
             except KeyboardInterrupt:
                 raise
             except SystemExit:
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.