Commits

Robert Brewer committed 644465d

New loggers.FileLogger. New test_signals.py

Comments (0)

Files changed (7)

magicbus/plugins/loggers.py

 """Logging plugins for magicbus."""
-
+from magicbus._compat import ntob
+import datetime
 import sys
 
 from magicbus.plugins import SimplePlugin
             sys.stdout.flush()
 
 
+class FileLogger(SimplePlugin):
 
+    def __init__(self, bus, filename=None, file=None, encoding='utf8', level=None):
+        SimplePlugin.__init__(self, bus)
+        self.filename = filename
+        self.file = file
+        self.encoding = encoding
+        self.level = level
+
+    def start(self):
+        if self.filename is not None:
+            self.file = open(self.filename, 'wb')
+    start.priority = 0
+
+    def log(self, msg, level):
+        if (self.level is None or self.level <= level) and self.file is not None:
+            if isinstance(msg, str):
+                msg = msg.encode(self.encoding)
+            self.file.write(ntob('[%s] ' % datetime.datetime.now().isoformat())
+                            + msg + ntob('\n'))
+            self.file.flush()
+
+    def stop(self):
+        if self.filename is not None:
+            self.file.close()
+            self.file = None
+    stop.priority = 100
+

magicbus/plugins/opsys.py

             pass
     
     def wait(self, timeout=None, poll_interval=0.1):
-        """Return when the PID file exists, or the timeout expires."""
+        """Return the PID when the file exists, or None when timeout expires."""
         starttime = time.time()
         while timeout is None or time.time() - starttime <= timeout:
             if os.path.exists(self.pidfile):
-                return
+                return int(open(self.pidfile, 'rb').read())
             time.sleep(poll_interval)
     
     def join(self, timeout=None, poll_interval=0.1):
                 return
             time.sleep(poll_interval)
 
-
-

magicbus/plugins/signalhandler.py

             self.handlers['SIGINT'] = self._jython_SIGINT_handler
 
         self._previous_handlers = {}
-    
+
     def _jython_SIGINT_handler(self, signum=None, frame=None):
         # See http://bugs.jython.org/issue1313
         self.bus.log('Keyboard Interrupt: shutting down bus')
         self.bus.exit()
-        
+
     def subscribe(self):
+        self.bus.subscribe('start', self.subscribe_handlers)
+
+    def subscribe_handlers(self):
         """Subscribe self.handlers to signals."""
         for sig, func in self.handlers.items():
             try:
                 self.set_handler(sig, func)
             except ValueError:
                 pass
-    
-    def unsubscribe(self):
+    # Only run after Daemonizer.start
+    subscribe_handlers.priority = 70
+
+    def unsubscribe_handlers(self):
         """Unsubscribe self.handlers from signals."""
         for signum, handler in self._previous_handlers.items():
             signame = self.signals[signum]
         signame = self.signals[signum]
         self.bus.log("Caught signal %s." % signame)
         self.bus.publish(signame)
-    
+
     def handle_SIGHUP(self):
         """Restart if daemonized, else exit."""
         if os.isatty(sys.stdin.fileno()):

magicbus/test/__init__.py

     if not x == y:
         raise AssertionError(msg or "%r != %r" % (x, y))
 
+def assertNotEqual(x, y, msg=None):
+    if x == y:
+        raise AssertionError(msg or "%r == %r" % (x, y))
+
 
 class Process(object):
 

magicbus/test/test_opsys.py

 
     def test_daemonize(self):
         if os.name not in ['posix']:
-            return self.skip("skipped (not on posix) ")
+            return "skipped (not on posix)"
 
         # Spawn the process and wait, when this returns, the original process
         # is finished.  If it daemonized properly, we should still be able

magicbus/test/test_signals.py

+from magicbus._compat import ntob
+import errno
+import os
+thismodule = os.path.abspath(__file__)
+import sys
+import time
+
+from magicbus import bus
+from magicbus.plugins import loggers, opsys, signalhandler
+from magicbus.test import assertEqual, assertNotEqual
+from magicbus.test import Process
+
+loggers.FileLogger(bus, "test_signals.log").subscribe()
+pidfile = opsys.PIDFile(bus, os.path.join(thismodule + ".pid"))
+
+
+class TestSignalHandling(object):
+
+    def test_SIGHUP_tty(self):
+        # When not daemonized, SIGHUP should exit the process.
+        try:
+            from signal import SIGHUP
+        except ImportError:
+            return "skipped (no SIGHUP)"
+
+        try:
+            from os import kill
+        except ImportError:
+            return "skipped (no os.kill)"
+
+        p = Process([sys.executable, thismodule, "tty"])
+        p.start()
+        pid = pidfile.wait()
+        os.kill(pid, SIGHUP)
+        pidfile.join()
+
+    def test_SIGHUP_daemonized(self):
+        # When daemonized, SIGHUP should restart the server.
+        try:
+            from signal import SIGHUP, SIGTERM, SIGKILL
+        except ImportError:
+            return "skipped (no SIGHUP)"
+
+        try:
+            from os import kill
+        except ImportError:
+            return "skipped (no os.kill)"
+
+        if os.name not in ['posix']:
+            return "skipped (not on posix)"
+
+        p = Process([sys.executable, thismodule, "daemonize"])
+        p.start()
+        pid = pidfile.wait()
+        os.kill(pid, SIGHUP)
+
+        # Give the server some time to restart
+        time.sleep(1)
+        new_pid = pidfile.wait()
+        assertNotEqual(new_pid, pid)
+        os.kill(new_pid, SIGTERM)
+        pidfile.join()
+
+    def test_SIGTERM_tty(self):
+        # SIGTERM should shut down the server whether daemonized or not.
+        try:
+            from signal import SIGTERM
+        except ImportError:
+            return "skipped (no SIGTERM)"
+
+        try:
+            from os import kill
+        except ImportError:
+            return "skipped (no os.kill)"
+
+        # Spawn a normal, undaemonized process.
+        p = Process([sys.executable, thismodule, "tty"])
+        p.start()
+        pid = pidfile.wait()
+        os.kill(pid, SIGTERM)
+        pidfile.join()
+
+    def test_SIGTERM_daemonized(self):
+        # SIGTERM should shut down the server whether daemonized or not.
+        try:
+            from signal import SIGTERM
+        except ImportError:
+            return "skipped (no SIGTERM)"
+
+        try:
+            from os import kill
+        except ImportError:
+            return "skipped (no os.kill)"
+
+        if os.name not in ['posix']:
+            return "skipped (not on posix)"
+
+        # Spawn a daemonized process and test again.
+        p = Process([sys.executable, thismodule, "daemonize"])
+        p.start()
+        pid = pidfile.wait()
+        os.kill(pid, SIGTERM)
+        pidfile.join()
+
+
+if __name__ == '__main__':
+    mode = sys.argv[1]
+    if mode == 'daemonize':
+        opsys.Daemonizer(bus).subscribe()
+    pidfile.subscribe()
+    signalhandler.SignalHandler(bus).subscribe()
+    bus.debug = True
+    bus.start()
+    bus.block()
+

magicbus/test/test_states.py

 thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
 
 
-class WebService:
-
-    def __init__(self, bus):
-        self.bus = bus
-        self.running = False
-
-    def subscribe(self):
-        self.bus.subscribe('start', self.start)
-        self.bus.subscribe('stop', self.stop)
-
-    def start(self):
-        self.running = True
-
-    def stop(self):
-        self.running = False
-
-    def __call__(self, environ, start_response):
-        self.bus.publish('acquire_thread')
-        try:
-            pi = environ["PATH_INFO"]
-            if pi == '/':
-                start_response('200 OK', [('Content-type', 'text/plain')])
-                return ["Hello world!"]
-            raise ValueError("Unknown URI")
-        finally:
-            self.bus.publish('release_thread')
-
-
-class Dependency:
-
-    def __init__(self, bus):
-        self.bus = bus
-        self.running = False
-        self.startcount = 0
-        self.gracecount = 0
-        self.threads = {}
-
-    def subscribe(self):
-        self.bus.subscribe('start', self.start)
-        self.bus.subscribe('stop', self.stop)
-        self.bus.subscribe('graceful', self.graceful)
-        self.bus.subscribe('start_thread', self.startthread)
-        self.bus.subscribe('stop_thread', self.stopthread)
-
-    def start(self):
-        self.running = True
-        self.startcount += 1
-
-    def stop(self):
-        self.running = False
-
-    def graceful(self):
-        self.gracecount += 1
-
-    def startthread(self, thread_id):
-        self.threads[thread_id] = None
-
-    def stopthread(self, thread_id):
-        del self.threads[thread_id]
-
-
-        def ctrlc(self):
-            raise KeyboardInterrupt()
-        ctrlc.exposed = True
-
-        def graceful(self):
-            bus.graceful()
-            return "app was (gracefully) restarted succesfully"
-        graceful.exposed = True
-
-
 class StateTests(object):
 
-    def test_normal_flow(self):
-        bus.clear()
-
-        service = WebService(bus)
-
-        # Our db_connection should not be running
-        db_connection = Dependency(bus)
-        db_connection.subscribe()
-        self.assertEqual(db_connection.running, False)
-        self.assertEqual(db_connection.startcount, 1)
-        self.assertEqual(len(db_connection.threads), 0)
-
-        # Test server start
-        bus.start()
-        self.assertEqual(bus.state, bus.states.STARTED)
-
-        self.assertEqual(service.running, True)
-
-        # The db_connection should be running now
-        self.assertEqual(db_connection.running, True)
-        self.assertEqual(db_connection.startcount, 2)
-        self.assertEqual(len(db_connection.threads), 0)
-
-        response = service("/")
-        self.assertEqual(response, "Hello World")
-        self.assertEqual(len(db_connection.threads), 1)
-
-        # Test bus stop. This will also stop the HTTP server.
-        bus.stop()
-        self.assertEqual(bus.state, bus.states.STOPPED)
-
-        # Verify that our custom stop function was called
-        self.assertEqual(db_connection.running, False)
-        self.assertEqual(len(db_connection.threads), 0)
-
-        # Block the main thread now and verify that exit() works.
-        def exittest():
-            self.getPage("/")
-            self.assertBody("Hello World")
-            bus.exit()
-        cherrypy.server.start()
-        bus.start_with_callback(exittest)
-        bus.block()
-        self.assertEqual(bus.state, bus.states.EXITING)
-
-    def test_1_Restart(self):
-        cherrypy.server.start()
-        bus.start()
-
-        # The db_connection should be running now
-        self.assertEqual(db_connection.running, True)
-        grace = db_connection.gracecount
-
-        self.getPage("/")
-        self.assertBody("Hello World")
-        self.assertEqual(len(db_connection.threads), 1)
-
-        # Test server restart from this thread
-        bus.graceful()
-        self.assertEqual(bus.state, bus.states.STARTED)
-        self.getPage("/")
-        self.assertBody("Hello World")
-        self.assertEqual(db_connection.running, True)
-        self.assertEqual(db_connection.gracecount, grace + 1)
-        self.assertEqual(len(db_connection.threads), 1)
-
-        # Test server restart from inside a page handler
-        self.getPage("/graceful")
-        self.assertEqual(bus.state, bus.states.STARTED)
-        self.assertBody("app was (gracefully) restarted succesfully")
-        self.assertEqual(db_connection.running, True)
-        self.assertEqual(db_connection.gracecount, grace + 2)
-        # Since we are requesting synchronously, is only one thread used?
-        # Note that the "/graceful" request has been flushed.
-        self.assertEqual(len(db_connection.threads), 0)
-
-        bus.stop()
-        self.assertEqual(bus.state, bus.states.STOPPED)
-        self.assertEqual(db_connection.running, False)
-        self.assertEqual(len(db_connection.threads), 0)
-
     def test_2_KeyboardInterrupt(self):
         # Raise a keyboard interrupt in the HTTP server's main thread.
         # We must start the server in this, the main thread
         self.assertEqual(db_connection.running, False)
         self.assertEqual(len(db_connection.threads), 0)
 
-    def test_4_Autoreload(self):
-        # Start the demo script in a new process
-        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
-        p.write_conf(
-                extra='test_case_name: "test_4_Autoreload"')
-        p.start(imports='magicbus.test._test_states_demo')
-        try:
-            self.getPage("/start")
-            start = float(self.body)
-
-            # Give the autoreloader time to cache the file time.
-            time.sleep(2)
-
-            # Touch the file
-            os.utime(os.path.join(thisdir, "_test_states_demo.py"), None)
-
-            # Give the autoreloader time to re-exec the process
-            time.sleep(2)
-            host = cherrypy.server.socket_host
-            port = cherrypy.server.socket_port
-            cherrypy._cpserver.wait_for_occupied_port(host, port)
-
-            self.getPage("/start")
-            if not (float(self.body) > start):
-                raise AssertionError("start time %s not greater than %s" %
-                                     (float(self.body), start))
-        finally:
-            # Shut down the spawned process
-            self.getPage("/exit")
-        p.join()
-
     def test_5_Start_Error(self):
         # If a process errors during start, it should stop the bus
         # and exit with a non-zero exit code.
             self.fail("Process failed to return nonzero exit code.")
 
 
-class PluginTests(helper.CPWebCase):
-    def test_daemonize(self):
-        if os.name not in ['posix']:
-            return self.skip("skipped (not on posix) ")
-        self.HOST = '127.0.0.1'
-        self.PORT = 8081
-        # Spawn the process and wait, when this returns, the original process
-        # is finished.  If it daemonized properly, we should still be able
-        # to access pages.
-        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
-                             wait=True, daemonize=True,
-                             socket_host='127.0.0.1',
-                             socket_port=8081)
-        p.write_conf(
-             extra='test_case_name: "test_daemonize"')
-        p.start(imports='magicbus.test._test_states_demo')
-        try:
-            # Just get the pid of the daemonization process.
-            self.getPage("/pid")
-            self.assertStatus(200)
-            page_pid = int(self.body)
-            self.assertEqual(page_pid, p.get_pid())
-        finally:
-            # Shut down the spawned process
-            self.getPage("/exit")
-        p.join()
-
-        # Wait until here to test the exit code because we want to ensure
-        # that we wait for the daemon to finish running before we fail.
-        if p.exit_code != 0:
-            self.fail("Daemonized parent process failed to exit cleanly.")
-
-
 class SignalHandlingTests(helper.CPWebCase):
     def test_SIGHUP_tty(self):
         # When not daemonized, SIGHUP should shut down the server.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.