Robert Brewer avatar Robert Brewer committed 373436b

Renamed from processbus to magicbus.

Comments (0)

Files changed (31)

-include processbus/LICENSE.txt
+include magicbus/LICENSE.txt
 
 
     python setup.py install
 
-* To run the regression tests, just go to the processbus/test/ directory and type:
+* To run the regression tests, just go to the magicbus/test/ directory and type:
 
     nosetests -s ./
 

magicbus/LICENSE.txt

+Copyright (c) 2004-2011, CherryPy Team (team@cherrypy.org)
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, 
+are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, 
+      this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright notice, 
+      this list of conditions and the following disclaimer in the documentation 
+      and/or other materials provided with the distribution.
+    * Neither the name of the CherryPy Team nor the names of its contributors 
+      may be used to endorse or promote products derived from this software 
+      without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE 
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

magicbus/__init__.py

+"""An implementation of the Web Site Process Bus.
+
+A Web Site Process Bus object is used to connect applications, servers,
+and frameworks with site-wide services such as daemonization, process
+reload, signal handling, drop privileges, PID file management, logging
+for all of these, and many more.
+
+The 'plugins' subpackage defines a few abstract and concrete services for
+use with the bus. Some use tool-specific channels; see the documentation
+for each class.
+"""
+
+try:
+    from magicbus import win32
+    bus = win32.Win32Bus()
+    bus.console_control_handler = win32.ConsoleCtrlHandler(bus)
+    del win32
+except ImportError:
+    from magicbus.wspbus import Bus
+    bus = Bus()
+

magicbus/_compat.py

+"""Compatibility code for using magicbus with various versions of Python.
+
+Process Bus 3.3 is compatible with Python versions 2.3+. This module provides a
+useful abstraction over the differences between Python versions, sometimes by
+preferring a newer idiom, sometimes an older one, and sometimes a custom one.
+
+In particular, Python 2 uses str and '' for byte strings, while Python 3
+uses str and '' for unicode strings. We will call each of these the 'native
+string' type for each version. Because of this major difference, this module
+provides new 'bytestr', 'unicodestr', and 'nativestr' attributes, as well as
+the function: 'ntob', which translates native strings (of type 'str') into
+byte strings regardless of Python version.
+"""
+import os
+import re
+import sys
+
+if sys.version_info >= (3, 0):
+    py3k = True
+    bytestr = bytes
+    unicodestr = str
+    nativestr = unicodestr
+    basestring = (bytes, str)
+    def ntob(n, encoding='ISO-8859-1'):
+        """Return the given native string as a byte string in the given encoding."""
+        # In Python 3, the native string type is unicode
+        return n.encode(encoding)
+else:
+    # Python 2
+    py3k = False
+    bytestr = str
+    unicodestr = unicode
+    nativestr = bytestr
+    basestring = basestring
+    def ntob(n, encoding='ISO-8859-1'):
+        """Return the given native string as a byte string in the given encoding."""
+        # In Python 2, the native string type is bytes. Assume it's already
+        # in the given encoding, which for ISO-8859-1 is almost always what
+        # was intended.
+        return n
+
+try:
+    set = set
+except NameError:
+    from sets import Set as set
+
+try:
+    from httplib import BadStatusLine
+except ImportError:
+    # Python 3
+    from http.client import BadStatusLine
+
+import threading
+if hasattr(threading.Thread, "daemon"):
+    # Python 2.6+
+    def get_daemon(t):
+        return t.daemon
+    def set_daemon(t, val):
+        t.daemon = val
+else:
+    def get_daemon(t):
+        return t.isDaemon()
+    def set_daemon(t, val):
+        t.setDaemon(val)
+
+try:
+    from _thread import get_ident as get_thread_ident
+except ImportError:
+    from thread import get_ident as get_thread_ident
+

magicbus/plugins/__init__.py

+"""Site services for use with a Web Site Process Bus."""
+
+
+class SimplePlugin(object):
+    """Plugin base class which auto-subscribes methods for known channels."""
+    
+    bus = None
+    """A :class:`Bus <magicbus.wspbus.Bus>`."""
+    
+    def __init__(self, bus):
+        self.bus = bus
+    
+    def subscribe(self):
+        """Register this object as a (multi-channel) listener on the bus."""
+        for channel in self.bus.listeners:
+            # Subscribe self.start, self.exit, etc. if present.
+            method = getattr(self, channel, None)
+            if method is not None:
+                self.bus.subscribe(channel, method)
+    
+    def unsubscribe(self):
+        """Unregister this object as a listener on the bus."""
+        for channel in self.bus.listeners:
+            # Unsubscribe self.start, self.exit, etc. if present.
+            method = getattr(self, channel, None)
+            if method is not None:
+                self.bus.unsubscribe(channel, method)
+

magicbus/plugins/opsys.py

+"""Operating system interaction for the Process Bus."""
+
+import os
+import sys
+import threading
+
+from magicbus.plugins import SimplePlugin
+from magicbus._compat import basestring, ntob
+
+
+try:
+    import pwd, grp
+except ImportError:
+    pwd, grp = None, None
+
+
+class DropPrivileges(SimplePlugin):
+    """Drop privileges. uid/gid arguments not available on Windows.
+    
+    Special thanks to Gavin Baker: http://antonym.org/node/100.
+    """
+    
+    def __init__(self, bus, umask=None, uid=None, gid=None):
+        SimplePlugin.__init__(self, bus)
+        self.finalized = False
+        self.uid = uid
+        self.gid = gid
+        self.umask = umask
+    
+    def _get_uid(self):
+        return self._uid
+    def _set_uid(self, val):
+        if val is not None:
+            if pwd is None:
+                self.bus.log("pwd module not available; ignoring uid.",
+                             level=30)
+                val = None
+            elif isinstance(val, basestring):
+                val = pwd.getpwnam(val)[2]
+        self._uid = val
+    uid = property(_get_uid, _set_uid,
+        doc="The uid under which to run. Availability: Unix.")
+    
+    def _get_gid(self):
+        return self._gid
+    def _set_gid(self, val):
+        if val is not None:
+            if grp is None:
+                self.bus.log("grp module not available; ignoring gid.",
+                             level=30)
+                val = None
+            elif isinstance(val, basestring):
+                val = grp.getgrnam(val)[2]
+        self._gid = val
+    gid = property(_get_gid, _set_gid,
+        doc="The gid under which to run. Availability: Unix.")
+    
+    def _get_umask(self):
+        return self._umask
+    def _set_umask(self, val):
+        if val is not None:
+            try:
+                os.umask
+            except AttributeError:
+                self.bus.log("umask function not available; ignoring umask.",
+                             level=30)
+                val = None
+        self._umask = val
+    umask = property(_get_umask, _set_umask,
+        doc="""The default permission mode for newly created files and directories.
+        
+        Usually expressed in octal format, for example, ``0644``.
+        Availability: Unix, Windows.
+        """)
+    
+    def start(self):
+        # uid/gid
+        def current_ids():
+            """Return the current (uid, gid) if available."""
+            name, group = None, None
+            if pwd:
+                name = pwd.getpwuid(os.getuid())[0]
+            if grp:
+                group = grp.getgrgid(os.getgid())[0]
+            return name, group
+        
+        if self.finalized:
+            if not (self.uid is None and self.gid is None):
+                self.bus.log('Already running as uid: %r gid: %r' %
+                             current_ids())
+        else:
+            if self.uid is None and self.gid is None:
+                if pwd or grp:
+                    self.bus.log('uid/gid not set', level=30)
+            else:
+                self.bus.log('Started as uid: %r gid: %r' % current_ids())
+                if self.gid is not None:
+                    os.setgid(self.gid)
+                    os.setgroups([])
+                if self.uid is not None:
+                    os.setuid(self.uid)
+                self.bus.log('Running as uid: %r gid: %r' % current_ids())
+        
+        # umask
+        if self.finalized:
+            if self.umask is not None:
+                self.bus.log('umask already set to: %03o' % self.umask)
+        else:
+            if self.umask is None:
+                self.bus.log('umask not set', level=30)
+            else:
+                old_umask = os.umask(self.umask)
+                self.bus.log('umask old: %03o, new: %03o' %
+                             (old_umask, self.umask))
+        
+        self.finalized = True
+    # This is slightly higher than the priority for server.start
+    # in order to facilitate the most common use: starting on a low
+    # port (which requires root) and then dropping to another user.
+    start.priority = 77
+
+
+class Daemonizer(SimplePlugin):
+    """Daemonize the running script.
+    
+    Use this with a Web Site Process Bus via::
+    
+        Daemonizer(bus).subscribe()
+    
+    When this component finishes, the process is completely decoupled from
+    the parent environment. Please note that when this component is used,
+    the return code from the parent process will still be 0 if a startup
+    error occurs in the forked children. Errors in the initial daemonizing
+    process still return proper exit codes. Therefore, if you use this
+    plugin to daemonize, don't use the return code as an accurate indicator
+    of whether the process fully started. In fact, that return code only
+    indicates if the process succesfully finished the first fork.
+    """
+    
+    def __init__(self, bus, stdin='/dev/null', stdout='/dev/null',
+                 stderr='/dev/null'):
+        SimplePlugin.__init__(self, bus)
+        self.stdin = stdin
+        self.stdout = stdout
+        self.stderr = stderr
+        self.finalized = False
+    
+    def start(self):
+        if self.finalized:
+            self.bus.log('Already deamonized.')
+        
+        # forking has issues with threads:
+        # http://www.opengroup.org/onlinepubs/000095399/functions/fork.html
+        # "The general problem with making fork() work in a multi-threaded
+        #  world is what to do with all of the threads..."
+        # So we check for active threads:
+        if threading.activeCount() != 1:
+            self.bus.log('There are %r active threads. '
+                         'Daemonizing now may cause strange failures.' %
+                         threading.enumerate(), level=30)
+        
+        # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
+        # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7)
+        # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
+        
+        # Finish up with the current stdout/stderr
+        sys.stdout.flush()
+        sys.stderr.flush()
+        
+        # Do first fork.
+        try:
+            pid = os.fork()
+            if pid == 0:
+                # This is the child process. Continue.
+                pass
+            else:
+                # This is the first parent. Exit, now that we've forked.
+                self.bus.log('Forking once.')
+                os._exit(0)
+        except OSError:
+            # Python raises OSError rather than returning negative numbers.
+            exc = sys.exc_info()[1]
+            sys.exit("%s: fork #1 failed: (%d) %s\n"
+                     % (sys.argv[0], exc.errno, exc.strerror))
+        
+        os.setsid()
+        
+        # Do second fork
+        try:
+            pid = os.fork()
+            if pid > 0:
+                self.bus.log('Forking twice.')
+                os._exit(0) # Exit second parent
+        except OSError:
+            exc = sys.exc_info()[1]
+            sys.exit("%s: fork #2 failed: (%d) %s\n"
+                     % (sys.argv[0], exc.errno, exc.strerror))
+        
+        os.chdir("/")
+        os.umask(0)
+        
+        si = open(self.stdin, "r")
+        so = open(self.stdout, "a+")
+        se = open(self.stderr, "a+")
+
+        # os.dup2(fd, fd2) will close fd2 if necessary,
+        # so we don't explicitly close stdin/out/err.
+        # See http://docs.python.org/lib/os-fd-ops.html
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+        
+        self.bus.log('Daemonized to PID: %s' % os.getpid())
+        self.finalized = True
+    start.priority = 65
+
+
+class PIDFile(SimplePlugin):
+    """Maintain a PID file via a WSPBus."""
+    
+    def __init__(self, bus, pidfile):
+        SimplePlugin.__init__(self, bus)
+        self.pidfile = pidfile
+        self.finalized = False
+    
+    def start(self):
+        pid = os.getpid()
+        if self.finalized:
+            self.bus.log('PID %r already written to %r.' % (pid, self.pidfile))
+        else:
+            open(self.pidfile, "wb").write(ntob("%s" % pid, 'utf8'))
+            self.bus.log('PID %r written to %r.' % (pid, self.pidfile))
+            self.finalized = True
+    start.priority = 70
+    
+    def exit(self):
+        try:
+            os.remove(self.pidfile)
+            self.bus.log('PID file removed: %r.' % self.pidfile)
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            pass
+

magicbus/plugins/servers.py

+"""
+Multiple servers/ports
+======================
+
+If you need to start more than one HTTP server (to serve on multiple ports, or
+protocols, etc.), you can manually register each one and then start them all
+with bus.start::
+
+    s1 = ServerAdapter(bus, MyWSGIServer(host='0.0.0.0', port=80))
+    s2 = ServerAdapter(bus, another.HTTPServer(host='127.0.0.1', SSL=True))
+    s1.subscribe()
+    s2.subscribe()
+    bus.start()
+
+.. index:: SCGI
+
+FastCGI/SCGI
+============
+
+There are also Flup\ **F**\ CGIServer and Flup\ **S**\ CGIServer classes in
+:mod:`magicbus.plugins.servers`. To start an fcgi server, for example,
+wrap an instance of it in a ServerAdapter::
+
+    addr = ('0.0.0.0', 4000)
+    f = servers.FlupFCGIServer(application=mywsgiapp, bindAddress=addr)
+    s = servers.ServerAdapter(bus, httpserver=f, bind_addr=addr)
+    s.subscribe()
+
+Note that you need to download and install `flup <http://trac.saddi.com/flup>`_
+yourself.
+
+.. _fastcgi:
+.. index:: FastCGI
+
+FastCGI
+-------
+
+A very simple setup lets your server run with FastCGI.
+You just need the flup library,
+plus a running Apache server (with ``mod_fastcgi``) or lighttpd server.
+
+Apache
+^^^^^^
+
+At the top level in httpd.conf::
+
+    FastCgiIpcDir /tmp
+    FastCgiServer /path/to/myapp.fcgi -idle-timeout 120 -processes 4
+
+And inside the relevant VirtualHost section::
+
+    # FastCGI config
+    AddHandler fastcgi-script .fcgi
+    ScriptAliasMatch (.*$) /path/to/myapp.fcgi$1
+
+Lighttpd
+^^^^^^^^
+
+For `Lighttpd <http://www.lighttpd.net/>`_ you can follow these
+instructions. Within ``lighttpd.conf`` make sure ``mod_fastcgi`` is
+active within ``server.modules``. Then, within your ``$HTTP["host"]``
+directive, configure your fastcgi script like the following::
+
+    $HTTP["url"] =~ "" {
+      fastcgi.server = (
+        "/" => (
+          "script.fcgi" => (
+            "bin-path" => "/path/to/your/script.fcgi",
+            "socket"          => "/tmp/script.sock",
+            "check-local"     => "disable",
+            "disable-time"    => 1,
+            "min-procs"       => 1,
+            "max-procs"       => 1, # adjust as needed
+          ),
+        ),
+      )
+    } # end of $HTTP["url"] =~ "^/"
+
+Please see `Lighttpd FastCGI Docs
+<http://redmine.lighttpd.net/wiki/lighttpd/Docs:ModFastCGI>`_ for an explanation 
+of the possible configuration options.
+"""
+
+import sys
+import time
+
+
+class ServerAdapter(object):
+    """Adapter for an HTTP server.
+    
+    If you need to start more than one HTTP server (to serve on multiple
+    ports, or protocols, etc.), you can manually register each one and then
+    start them all with bus.start:
+    
+        s1 = ServerAdapter(bus, MyWSGIServer(host='0.0.0.0', port=80))
+        s2 = ServerAdapter(bus, another.HTTPServer(host='127.0.0.1', SSL=True))
+        s1.subscribe()
+        s2.subscribe()
+        bus.start()
+    """
+    
+    def __init__(self, bus, httpserver=None, bind_addr=None):
+        self.bus = bus
+        self.httpserver = httpserver
+        self.bind_addr = bind_addr
+        self.interrupt = None
+        self.running = False
+    
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+    
+    def unsubscribe(self):
+        self.bus.unsubscribe('start', self.start)
+        self.bus.unsubscribe('stop', self.stop)
+    
+    def start(self):
+        """Start the HTTP server."""
+        if self.bind_addr is None:
+            on_what = "unknown interface (dynamic?)"
+        elif isinstance(self.bind_addr, tuple):
+            host, port = self.bind_addr
+            on_what = "%s:%s" % (host, port)
+        else:
+            on_what = "socket file: %s" % self.bind_addr
+        
+        if self.running:
+            self.bus.log("Already serving on %s" % on_what)
+            return
+        
+        self.interrupt = None
+        if not self.httpserver:
+            raise ValueError("No HTTP server has been created.")
+        
+        # Start the httpserver in a new thread.
+        if isinstance(self.bind_addr, tuple):
+            wait_for_free_port(*self.bind_addr)
+        
+        import threading
+        t = threading.Thread(target=self._start_http_thread)
+        t.setName("HTTPServer " + t.getName())
+        t.start()
+        
+        self.wait()
+        self.running = True
+        self.bus.log("Serving on %s" % on_what)
+    start.priority = 75
+    
+    def _start_http_thread(self):
+        """HTTP servers MUST be running in new threads, so that the
+        main thread persists to receive KeyboardInterrupt's. If an
+        exception is raised in the httpserver's thread then it's
+        trapped here, and the bus (and therefore our httpserver)
+        are shut down.
+        """
+        try:
+            self.httpserver.start()
+        except KeyboardInterrupt:
+            self.bus.log("<Ctrl-C> hit: shutting down HTTP server")
+            self.interrupt = sys.exc_info()[1]
+            self.bus.exit()
+        except SystemExit:
+            self.bus.log("SystemExit raised: shutting down HTTP server")
+            self.interrupt = sys.exc_info()[1]
+            self.bus.exit()
+            raise
+        except:
+            self.interrupt = sys.exc_info()[1]
+            self.bus.log("Error in HTTP server: shutting down",
+                         traceback=True, level=40)
+            self.bus.exit()
+            raise
+    
+    def wait(self):
+        """Wait until the HTTP server is ready to receive requests."""
+        while not getattr(self.httpserver, "ready", False):
+            if self.interrupt:
+                raise self.interrupt
+            time.sleep(.1)
+        
+        # Wait for port to be occupied
+        if isinstance(self.bind_addr, tuple):
+            host, port = self.bind_addr
+            wait_for_occupied_port(host, port)
+    
+    def stop(self):
+        """Stop the HTTP server."""
+        if self.running:
+            # stop() MUST block until the server is *truly* stopped.
+            self.httpserver.stop()
+            # Wait for the socket to be truly freed.
+            if isinstance(self.bind_addr, tuple):
+                wait_for_free_port(*self.bind_addr)
+            self.running = False
+            self.bus.log("HTTP Server %s shut down" % self.httpserver)
+        else:
+            self.bus.log("HTTP Server %s already shut down" % self.httpserver)
+    stop.priority = 25
+    
+    def restart(self):
+        """Restart the HTTP server."""
+        self.stop()
+        self.start()
+
+
+class FlupCGIServer(object):
+    """Adapter for a flup.server.cgi.WSGIServer."""
+   
+    def __init__(self, *args, **kwargs):
+        self.args = args
+        self.kwargs = kwargs
+        self.ready = False
+    
+    def start(self):
+        """Start the CGI server."""
+        # We have to instantiate the server class here because its __init__
+        # starts a threadpool. If we do it too early, daemonize won't work.
+        from flup.server.cgi import WSGIServer
+       
+        self.cgiserver = WSGIServer(*self.args, **self.kwargs)
+        self.ready = True
+        self.cgiserver.run()
+    
+    def stop(self):
+        """Stop the HTTP server."""
+        self.ready = False
+
+
+class FlupFCGIServer(object):
+    """Adapter for a flup.server.fcgi.WSGIServer."""
+    
+    def __init__(self, *args, **kwargs):
+        if kwargs.get('bindAddress', None) is None:
+            import socket
+            if not hasattr(socket, 'fromfd'):
+                raise ValueError(
+                    'Dynamic FCGI server not available on this platform. '
+                    'You must use a static or external one by providing a '
+                    'legal bindAddress.')
+        self.args = args
+        self.kwargs = kwargs
+        self.ready = False
+    
+    def start(self):
+        """Start the FCGI server."""
+        # We have to instantiate the server class here because its __init__
+        # starts a threadpool. If we do it too early, daemonize won't work.
+        from flup.server.fcgi import WSGIServer
+        self.fcgiserver = WSGIServer(*self.args, **self.kwargs)
+        # TODO: report this bug upstream to flup.
+        # If we don't set _oldSIGs on Windows, we get:
+        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+        #   line 108, in run
+        #     self._restoreSignalHandlers()
+        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+        #   line 156, in _restoreSignalHandlers
+        #     for signum,handler in self._oldSIGs:
+        #   AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
+        self.fcgiserver._installSignalHandlers = lambda: None
+        self.fcgiserver._oldSIGs = []
+        self.ready = True
+        self.fcgiserver.run()
+    
+    def stop(self):
+        """Stop the HTTP server."""
+        # Forcibly stop the fcgi server main event loop.
+        self.fcgiserver._keepGoing = False
+        # Force all worker threads to die off.
+        self.fcgiserver._threadPool.maxSpare = self.fcgiserver._threadPool._idleCount
+        self.ready = False
+
+
+class FlupSCGIServer(object):
+    """Adapter for a flup.server.scgi.WSGIServer."""
+    
+    def __init__(self, *args, **kwargs):
+        self.args = args
+        self.kwargs = kwargs
+        self.ready = False
+    
+    def start(self):
+        """Start the SCGI server."""
+        # We have to instantiate the server class here because its __init__
+        # starts a threadpool. If we do it too early, daemonize won't work.
+        from flup.server.scgi import WSGIServer
+        self.scgiserver = WSGIServer(*self.args, **self.kwargs)
+        # TODO: report this bug upstream to flup.
+        # If we don't set _oldSIGs on Windows, we get:
+        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+        #   line 108, in run
+        #     self._restoreSignalHandlers()
+        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+        #   line 156, in _restoreSignalHandlers
+        #     for signum,handler in self._oldSIGs:
+        #   AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
+        self.scgiserver._installSignalHandlers = lambda: None
+        self.scgiserver._oldSIGs = []
+        self.ready = True
+        self.scgiserver.run()
+    
+    def stop(self):
+        """Stop the HTTP server."""
+        self.ready = False
+        # Forcibly stop the scgi server main event loop.
+        self.scgiserver._keepGoing = False
+        # Force all worker threads to die off.
+        self.scgiserver._threadPool.maxSpare = 0
+
+
+def client_host(server_host):
+    """Return the host on which a client can connect to the given listener."""
+    if server_host == '0.0.0.0':
+        # 0.0.0.0 is INADDR_ANY, which should answer on localhost.
+        return '127.0.0.1'
+    if server_host in ('::', '::0', '::0.0.0.0'):
+        # :: is IN6ADDR_ANY, which should answer on localhost.
+        # ::0 and ::0.0.0.0 are non-canonical but common ways to write IN6ADDR_ANY.
+        return '::1'
+    return server_host
+
+def check_port(host, port, timeout=1.0):
+    """Raise an error if the given port is not free on the given host."""
+    if not host:
+        raise ValueError("Host values of '' or None are not allowed.")
+    host = client_host(host)
+    port = int(port)
+    
+    import socket
+    
+    # AF_INET or AF_INET6 socket
+    # Get the correct address family for our host (allows IPv6 addresses)
+    try:
+        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+                                  socket.SOCK_STREAM)
+    except socket.gaierror:
+        if ':' in host:
+            info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
+        else:
+            info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]
+    
+    for res in info:
+        af, socktype, proto, canonname, sa = res
+        s = None
+        try:
+            s = socket.socket(af, socktype, proto)
+            # See http://groups.google.com/group/cherrypy-users/
+            #        browse_frm/thread/bbfe5eb39c904fe0
+            s.settimeout(timeout)
+            s.connect((host, port))
+            s.close()
+            raise IOError("Port %s is in use on %s; perhaps the previous "
+                          "httpserver did not shut down properly." %
+                          (repr(port), repr(host)))
+        except socket.error:
+            if s:
+                s.close()
+
+
+# Feel free to increase these defaults on slow systems:
+free_port_timeout = 0.1
+occupied_port_timeout = 1.0
+
+def wait_for_free_port(host, port, timeout=None):
+    """Wait for the specified port to become free (drop requests)."""
+    if not host:
+        raise ValueError("Host values of '' or None are not allowed.")
+    if timeout is None:
+        timeout = free_port_timeout
+    
+    for trial in range(50):
+        try:
+            # we are expecting a free port, so reduce the timeout
+            check_port(host, port, timeout=timeout)
+        except IOError:
+            # Give the old server thread time to free the port.
+            time.sleep(timeout)
+        else:
+            return
+    
+    raise IOError("Port %r not free on %r" % (port, host))
+
+def wait_for_occupied_port(host, port, timeout=None):
+    """Wait for the specified port to become active (receive requests)."""
+    if not host:
+        raise ValueError("Host values of '' or None are not allowed.")
+    if timeout is None:
+        timeout = occupied_port_timeout
+    
+    for trial in range(50):
+        try:
+            check_port(host, port, timeout=timeout)
+        except IOError:
+            return
+        else:
+            time.sleep(timeout)
+    
+    raise IOError("Port %r not bound on %r" % (port, host))
+

magicbus/plugins/signalhandler.py

+"""Signal handling for the Process Bus."""
+
+import os
+import signal as _signal
+import sys
+
+from magicbus._compat import basestring
+
+
+class SignalHandler(object):
+    """Register bus channels (and listeners) for system signals.
+    
+    You can modify what signals your application listens for, and what it does
+    when it receives signals, by modifying :attr:`SignalHandler.handlers`,
+    a dict of {signal name: callback} pairs. The default set is::
+    
+        handlers = {'SIGTERM': self.bus.exit,
+                    'SIGHUP': self.handle_SIGHUP,
+                    'SIGUSR1': self.bus.graceful,
+                   }
+    
+    The :func:`SignalHandler.handle_SIGHUP`` method calls
+    :func:`bus.restart()<magicbus.wspbus.Bus.restart>`
+    if the process is daemonized, but
+    :func:`bus.exit()<magicbus.wspbus.Bus.exit>`
+    if the process is attached to a TTY. This is because Unix window
+    managers tend to send SIGHUP to terminal windows when the user closes them.
+    
+    Feel free to add signals which are not available on every platform. The
+    :class:`SignalHandler` will ignore errors raised from attempting to register
+    handlers for unknown signals.
+    """
+    
+    handlers = {}
+    """A map from signal names (e.g. 'SIGTERM') to handlers (e.g. bus.exit)."""
+    
+    signals = {}
+    """A map from signal numbers to names."""
+    
+    for k, v in vars(_signal).items():
+        if k.startswith('SIG') and not k.startswith('SIG_'):
+            signals[v] = k
+    del k, v
+    
+    def __init__(self, bus):
+        self.bus = bus
+        # Set default handlers
+        self.handlers = {'SIGTERM': self.bus.exit,
+                         'SIGHUP': self.handle_SIGHUP,
+                         'SIGUSR1': self.bus.graceful,
+                         }
+
+        if sys.platform[:4] == 'java':
+            del self.handlers['SIGUSR1']
+            self.handlers['SIGUSR2'] = self.bus.graceful
+            self.bus.log("SIGUSR1 cannot be set on the JVM platform. "
+                         "Using SIGUSR2 instead.")
+            self.handlers['SIGINT'] = self._jython_SIGINT_handler
+
+        self._previous_handlers = {}
+    
+    def _jython_SIGINT_handler(self, signum=None, frame=None):
+        # See http://bugs.jython.org/issue1313
+        self.bus.log('Keyboard Interrupt: shutting down bus')
+        self.bus.exit()
+        
+    def subscribe(self):
+        """Subscribe self.handlers to signals."""
+        for sig, func in self.handlers.items():
+            try:
+                self.set_handler(sig, func)
+            except ValueError:
+                pass
+    
+    def unsubscribe(self):
+        """Unsubscribe self.handlers from signals."""
+        for signum, handler in self._previous_handlers.items():
+            signame = self.signals[signum]
+            
+            if handler is None:
+                self.bus.log("Restoring %s handler to SIG_DFL." % signame)
+                handler = _signal.SIG_DFL
+            else:
+                self.bus.log("Restoring %s handler %r." % (signame, handler))
+            
+            try:
+                our_handler = _signal.signal(signum, handler)
+                if our_handler is None:
+                    self.bus.log("Restored old %s handler %r, but our "
+                                 "handler was not registered." %
+                                 (signame, handler), level=30)
+            except ValueError:
+                self.bus.log("Unable to restore %s handler %r." %
+                             (signame, handler), level=40, traceback=True)
+    
+    def set_handler(self, signal, listener=None):
+        """Subscribe a handler for the given signal (number or name).
+        
+        If the optional 'listener' argument is provided, it will be
+        subscribed as a listener for the given signal's channel.
+        
+        If the given signal name or number is not available on the current
+        platform, ValueError is raised.
+        """
+        if isinstance(signal, basestring):
+            signum = getattr(_signal, signal, None)
+            if signum is None:
+                raise ValueError("No such signal: %r" % signal)
+            signame = signal
+        else:
+            try:
+                signame = self.signals[signal]
+            except KeyError:
+                raise ValueError("No such signal: %r" % signal)
+            signum = signal
+        
+        prev = _signal.signal(signum, self._handle_signal)
+        self._previous_handlers[signum] = prev
+        
+        if listener is not None:
+            self.bus.log("Listening for %s." % signame)
+            self.bus.subscribe(signame, listener)
+    
+    def _handle_signal(self, signum=None, frame=None):
+        """Python signal handler (self.set_handler subscribes it for you)."""
+        signame = self.signals[signum]
+        self.bus.log("Caught signal %s." % signame)
+        self.bus.publish(signame)
+    
+    def handle_SIGHUP(self):
+        """Restart if daemonized, else exit."""
+        if os.isatty(sys.stdin.fileno()):
+            # not daemonized (may be foreground or background)
+            self.bus.log("SIGHUP caught but not daemonized. Exiting.")
+            self.bus.exit()
+        else:
+            self.bus.log("SIGHUP caught while daemonized. Restarting.")
+            self.bus.restart()
+

magicbus/plugins/tasks.py

+"""Repeating tasks and monitors for the Web Site Process Bus."""
+
+import os
+import re
+import sys
+import time
+import threading
+
+from magicbus.plugins import SimplePlugin
+from magicbus._compat import get_daemon, get_thread_ident, set
+
+# _module__file__base is used by Autoreload to make
+# absolute any filenames retrieved from sys.modules which are not
+# already absolute paths.  This is to work around Python's quirk
+# of importing the startup script and using a relative filename
+# for it in sys.modules.
+#
+# Autoreload examines sys.modules afresh every time it runs. If an application
+# changes the current directory by executing os.chdir(), then the next time
+# Autoreload runs, it will not be able to find any filenames which are
+# not absolute paths, because the current directory is not the same as when the
+# module was first imported.  Autoreload will then wrongly conclude the file has
+# "changed", and initiate the shutdown/re-exec sequence.
+# See ticket #917.
+# For this workaround to have a decent probability of success, this module
+# needs to be imported as early as possible, before the app has much chance
+# to change the working directory.
+_module__file__base = os.getcwd()
+
+
+class PerpetualTimer(threading._Timer):
+    """A responsive subclass of threading._Timer whose run() method repeats.
+    
+    Use this timer only when you really need a very interruptible timer;
+    this checks its 'finished' condition up to 20 times a second, which can
+    results in pretty high CPU usage 
+    """
+    
+    def run(self):
+        while True:
+            self.finished.wait(self.interval)
+            if self.finished.isSet():
+                return
+            try:
+                self.function(*self.args, **self.kwargs)
+            except Exception:
+                self.bus.log("Error in perpetual timer thread function %r." %
+                             self.function, level=40, traceback=True)
+                # Quit on first error to avoid massive logs.
+                raise
+
+
+class BackgroundTask(threading.Thread):
+    """A subclass of threading.Thread whose run() method repeats.
+    
+    Use this class for most repeating tasks. It uses time.sleep() to wait
+    for each interval, which isn't very responsive; that is, even if you call
+    self.cancel(), you'll have to wait until the sleep() call finishes before
+    the thread stops. To compensate, it defaults to being daemonic, which means
+    it won't delay stopping the whole process.
+    """
+    
+    def __init__(self, interval, function, args=[], kwargs={}, bus=None):
+        threading.Thread.__init__(self)
+        self.interval = interval
+        self.function = function
+        self.args = args
+        self.kwargs = kwargs
+        self.running = False
+        self.bus = bus
+    
+    def cancel(self):
+        self.running = False
+    
+    def run(self):
+        self.running = True
+        while self.running:
+            time.sleep(self.interval)
+            if not self.running:
+                return
+            try:
+                self.function(*self.args, **self.kwargs)
+            except Exception:
+                if self.bus:
+                    self.bus.log("Error in background task thread function %r."
+                                 % self.function, level=40, traceback=True)
+                # Quit on first error to avoid massive logs.
+                raise
+    
+    def _set_daemon(self):
+        return True
+
+
+class Monitor(SimplePlugin):
+    """WSPBus listener to periodically run a callback in its own thread."""
+    
+    callback = None
+    """The function to call at intervals."""
+    
+    frequency = 60
+    """The time in seconds between callback runs."""
+    
+    thread = None
+    """A :class:`BackgroundTask<magicbus.plugins.tasks.BackgroundTask>` thread."""
+    
+    def __init__(self, bus, callback, frequency=60, name=None):
+        SimplePlugin.__init__(self, bus)
+        self.callback = callback
+        self.frequency = frequency
+        self.thread = None
+        self.name = name
+    
+    def start(self):
+        """Start our callback in its own background thread."""
+        if self.frequency > 0:
+            threadname = self.name or self.__class__.__name__
+            if self.thread is None:
+                self.thread = BackgroundTask(self.frequency, self.callback,
+                                             bus = self.bus)
+                self.thread.setName(threadname)
+                self.thread.start()
+                self.bus.log("Started monitor thread %r." % threadname)
+            else:
+                self.bus.log("Monitor thread %r already started." % threadname)
+    start.priority = 70
+    
+    def stop(self):
+        """Stop our callback's background task thread."""
+        if self.thread is None:
+            self.bus.log("No thread running for %s." % self.name or self.__class__.__name__)
+        else:
+            if self.thread is not threading.currentThread():
+                name = self.thread.getName()
+                self.thread.cancel()
+                if not get_daemon(self.thread):
+                    self.bus.log("Joining %r" % name)
+                    self.thread.join()
+                self.bus.log("Stopped thread %r." % name)
+            self.thread = None
+    
+    def graceful(self):
+        """Stop the callback's background task thread and restart it."""
+        self.stop()
+        self.start()
+
+
+class Autoreloader(Monitor):
+    """Monitor which re-executes the process when files change.
+    
+    This :ref:`plugin<plugins>` restarts the process (via :func:`os.execv`)
+    if any of the files it monitors change (or is deleted). By default, the
+    autoreloader monitors all imported modules; you can add to the
+    set by adding to ``autoreload.files``::
+    
+        bus.autoreload.files.add(myFile)
+    
+    If there are imported files you do *not* wish to monitor, you can adjust the
+    ``match`` attribute, a regular expression. For example, to stop monitoring
+    the bus itself::
+    
+        bus.autoreload.match = r'^(?!magicbus).+'
+    
+    Like all :class:`Monitor<magicbus.plugins.tasks.Monitor>` plugins,
+    the autoreload plugin takes a ``frequency`` argument. The default is
+    1 second; that is, the autoreloader will examine files once each second.
+    """
+    
+    files = None
+    """The set of files to poll for modifications."""
+    
+    frequency = 1
+    """The interval in seconds at which to poll for modified files."""
+    
+    match = '.*'
+    """A regular expression by which to match filenames."""
+    
+    def __init__(self, bus, frequency=1, match='.*'):
+        self.mtimes = {}
+        self.files = set()
+        self.match = match
+        Monitor.__init__(self, bus, self.run, frequency)
+    
+    def start(self):
+        """Start our own background task thread for self.run."""
+        if self.thread is None:
+            self.mtimes = {}
+        Monitor.start(self)
+    start.priority = 70 
+    
+    def sysfiles(self):
+        """Return a Set of sys.modules filenames to monitor."""
+        files = set()
+        for k, m in sys.modules.items():
+            if re.match(self.match, k):
+                if hasattr(m, '__loader__') and hasattr(m.__loader__, 'archive'):
+                    f = m.__loader__.archive
+                else:
+                    f = getattr(m, '__file__', None)
+                    if f is not None and not os.path.isabs(f):
+                        # ensure absolute paths so a os.chdir() in the app doesn't break me
+                        f = os.path.normpath(os.path.join(_module__file__base, f))
+                files.add(f)
+        return files
+    
+    def run(self):
+        """Reload the process if registered files have been modified."""
+        for filename in self.sysfiles() | self.files:
+            if filename:
+                if filename.endswith('.pyc'):
+                    filename = filename[:-1]
+                
+                oldtime = self.mtimes.get(filename, 0)
+                if oldtime is None:
+                    # Module with no .py file. Skip it.
+                    continue
+                
+                try:
+                    mtime = os.stat(filename).st_mtime
+                except OSError:
+                    # Either a module with no .py file, or it's been deleted.
+                    mtime = None
+                
+                if filename not in self.mtimes:
+                    # If a module has no .py file, this will be None.
+                    self.mtimes[filename] = mtime
+                else:
+                    if mtime is None or mtime > oldtime:
+                        # The file has been deleted or modified.
+                        self.bus.log("Restarting because %s changed." % filename)
+                        self.thread.cancel()
+                        self.bus.log("Stopped thread %r." % self.thread.getName())
+                        self.bus.restart()
+                        return
+
+
+class ThreadManager(SimplePlugin):
+    """Manager for HTTP request threads.
+    
+    If you have control over thread creation and destruction, publish to
+    the 'acquire_thread' and 'release_thread' channels (for each thread).
+    This will register/unregister the current thread and publish to
+    'start_thread' and 'stop_thread' listeners in the bus as needed.
+    
+    If threads are created and destroyed by code you do not control
+    (e.g., Apache), then, at the beginning of every HTTP request,
+    publish to 'acquire_thread' only. You should not publish to
+    'release_thread' in this case, since you do not know whether
+    the thread will be re-used or not. The bus will call
+    'stop_thread' listeners for you when it stops.
+    """
+    
+    threads = None
+    """A map of {thread ident: index number} pairs."""
+    
+    def __init__(self, bus):
+        self.threads = {}
+        SimplePlugin.__init__(self, bus)
+        self.bus.listeners.setdefault('acquire_thread', set())
+        self.bus.listeners.setdefault('start_thread', set())
+        self.bus.listeners.setdefault('release_thread', set())
+        self.bus.listeners.setdefault('stop_thread', set())
+
+    def acquire_thread(self):
+        """Run 'start_thread' listeners for the current thread.
+        
+        If the current thread has already been seen, any 'start_thread'
+        listeners will not be run again.
+        """
+        thread_ident = get_thread_ident()
+        if thread_ident not in self.threads:
+            # We can't just use get_ident as the thread ID
+            # because some platforms reuse thread ID's.
+            i = len(self.threads) + 1
+            self.threads[thread_ident] = i
+            self.bus.publish('start_thread', i)
+    
+    def release_thread(self):
+        """Release the current thread and run 'stop_thread' listeners."""
+        thread_ident = get_thread_ident()
+        i = self.threads.pop(thread_ident, None)
+        if i is not None:
+            self.bus.publish('stop_thread', i)
+    
+    def stop(self):
+        """Release all threads and run all 'stop_thread' listeners."""
+        for thread_ident, i in self.threads.items():
+            self.bus.publish('stop_thread', i)
+        self.threads.clear()
+    graceful = stop
+

magicbus/test/__init__.py

+"""Regression test suite for magicbus.
+
+Run 'nosetests -s test/' to exercise all tests.
+"""
+

magicbus/test/helper.py

+"""A library of helper functions for the magicbus test suite."""
+
+import os
+thisdir = os.path.abspath(os.path.dirname(__file__))
+
+import sys
+import time
+
+from magicbus._compat import basestring, ntob
+
+# --------------------------- Spawning helpers --------------------------- #
+
+
+class Process(object):
+    
+    pid_file = os.path.join(thisdir, 'test.pid')
+    config_file = os.path.join(thisdir, 'test.conf')
+    
+    def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
+        self.wait = wait
+        self.daemonize = daemonize
+        self.ssl = ssl
+        self.host = socket_host or cherrypy.server.socket_host
+        self.port = socket_port or cherrypy.server.socket_port
+    
+    def start(self, imports=None):
+        """Start the subprocess."""
+        servers.wait_for_free_port(self.host, self.port)
+        
+        args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'),
+                '-c', self.config_file, '-p', self.pid_file]
+        
+        if not isinstance(imports, (list, tuple)):
+            imports = [imports]
+        for i in imports:
+            if i:
+                args.append('-i')
+                args.append(i)
+        
+        if self.daemonize:
+            args.append('-d')
+
+        env = os.environ.copy()
+        # Make sure we import the cherrypy package in which this module is defined.
+        grandparentdir = os.path.abspath(os.path.join(thisdir, '..', '..'))
+        if env.get('PYTHONPATH', ''):
+            env['PYTHONPATH'] = os.pathsep.join((grandparentdir, env['PYTHONPATH']))
+        else:
+            env['PYTHONPATH'] = grandparentdir
+        if self.wait:
+            self.exit_code = os.spawnve(os.P_WAIT, sys.executable, args, env)
+        else:
+            os.spawnve(os.P_NOWAIT, sys.executable, args, env)
+            servers.wait_for_occupied_port(self.host, self.port)
+        
+        # Give the engine a wee bit more time to finish STARTING
+        if self.daemonize:
+            time.sleep(2)
+        else:
+            time.sleep(1)
+    
+    def get_pid(self):
+        return int(open(self.pid_file, 'rb').read())
+    
+    def join(self):
+        """Wait for the process to exit."""
+        try:
+            try:
+                # Mac, UNIX
+                os.wait()
+            except AttributeError:
+                # Windows
+                try:
+                    pid = self.get_pid()
+                except IOError:
+                    # Assume the subprocess deleted the pidfile on shutdown.
+                    pass
+                else:
+                    os.waitpid(pid, 0)
+        except OSError:
+            x = sys.exc_info()[1]
+            if x.args != (10, 'No child processes'):
+                raise
+

magicbus/test/test_bus.py

+import threading
+import time
+import unittest
+
+from magicbus._compat import get_daemon, set
+from magicbus import wspbus
+
+
+msg = "Listener %d on channel %s: %s."
+
+
+class PublishSubscribeTests(unittest.TestCase):
+
+    def get_listener(self, channel, index):
+        def listener(arg=None):
+            self.responses.append(msg % (index, channel, arg))
+        return listener
+
+    def test_builtin_channels(self):
+        b = wspbus.Bus()
+
+        self.responses, expected = [], []
+
+        for channel in b.listeners:
+            for index, priority in enumerate([100, 50, 0, 51]):
+                b.subscribe(channel, self.get_listener(channel, index), priority)
+
+        for channel in b.listeners:
+            b.publish(channel)
+            expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
+            b.publish(channel, arg=79347)
+            expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
+
+        self.assertEqual(self.responses, expected)
+
+    def test_custom_channels(self):
+        b = wspbus.Bus()
+
+        self.responses, expected = [], []
+
+        custom_listeners = ('hugh', 'louis', 'dewey')
+        for channel in custom_listeners:
+            for index, priority in enumerate([None, 10, 60, 40]):
+                b.subscribe(channel, self.get_listener(channel, index), priority)
+
+        for channel in custom_listeners:
+            b.publish(channel, 'ah so')
+            expected.extend([msg % (i, channel, 'ah so') for i in (1, 3, 0, 2)])
+            b.publish(channel)
+            expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])
+
+        self.assertEqual(self.responses, expected)
+
+    def test_listener_errors(self):
+        b = wspbus.Bus()
+
+        self.responses, expected = [], []
+        channels = [c for c in b.listeners if c != 'log']
+
+        for channel in channels:
+            b.subscribe(channel, self.get_listener(channel, 1))
+            # This will break since the lambda takes no args.
+            b.subscribe(channel, lambda: None, priority=20)
+
+        for channel in channels:
+            self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123)
+            expected.append(msg % (1, channel, 123))
+
+        self.assertEqual(self.responses, expected)
+
+
+class BusMethodTests(unittest.TestCase):
+
+    def log(self, bus):
+        self._log_entries = []
+        def logit(msg, level):
+            self._log_entries.append(msg)
+        bus.subscribe('log', logit)
+
+    def assertLog(self, entries):
+        self.assertEqual(self._log_entries, entries)
+
+    def get_listener(self, channel, index):
+        def listener(arg=None):
+            self.responses.append(msg % (index, channel, arg))
+        return listener
+
+    def test_start(self):
+        b = wspbus.Bus()
+        self.log(b)
+
+        self.responses = []
+        num = 3
+        for index in range(num):
+            b.subscribe('start', self.get_listener('start', index))
+
+        b.start()
+        try:
+            # The start method MUST call all 'start' listeners.
+            self.assertEqual(set(self.responses),
+                             set([msg % (i, 'start', None) for i in range(num)]))
+            # The start method MUST move the state to STARTED
+            # (or EXITING, if errors occur)
+            self.assertEqual(b.state, b.states.STARTED)
+            # The start method MUST log its states.
+            self.assertLog(['Bus STARTING', 'Bus STARTED'])
+        finally:
+            # Exit so the atexit handler doesn't complain.
+            b.exit()
+
+    def test_stop(self):
+        b = wspbus.Bus()
+        self.log(b)
+
+        self.responses = []
+        num = 3
+        for index in range(num):
+            b.subscribe('stop', self.get_listener('stop', index))
+
+        b.stop()
+
+        # The stop method MUST call all 'stop' listeners.
+        self.assertEqual(set(self.responses),
+                         set([msg % (i, 'stop', None) for i in range(num)]))
+        # The stop method MUST move the state to STOPPED
+        self.assertEqual(b.state, b.states.STOPPED)
+        # The stop method MUST log its states.
+        self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
+
+    def test_graceful(self):
+        b = wspbus.Bus()
+        self.log(b)
+
+        self.responses = []
+        num = 3
+        for index in range(num):
+            b.subscribe('graceful', self.get_listener('graceful', index))
+
+        b.graceful()
+
+        # The graceful method MUST call all 'graceful' listeners.
+        self.assertEqual(set(self.responses),
+                         set([msg % (i, 'graceful', None) for i in range(num)]))
+        # The graceful method MUST log its states.
+        self.assertLog(['Bus graceful'])
+
+    def test_exit(self):
+        b = wspbus.Bus()
+        self.log(b)
+
+        self.responses = []
+        num = 3
+        for index in range(num):
+            b.subscribe('stop', self.get_listener('stop', index))
+            b.subscribe('exit', self.get_listener('exit', index))
+
+        b.exit()
+
+        # The exit method MUST call all 'stop' listeners,
+        # and then all 'exit' listeners.
+        self.assertEqual(set(self.responses),
+                         set([msg % (i, 'stop', None) for i in range(num)] +
+                             [msg % (i, 'exit', None) for i in range(num)]))
+        # The exit method MUST move the state to EXITING
+        self.assertEqual(b.state, b.states.EXITING)
+        # The exit method MUST log its states.
+        self.assertLog(['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
+
+    def test_wait(self):
+        b = wspbus.Bus()
+
+        def f(method):
+            time.sleep(0.2)
+            getattr(b, method)()
+
+        for method, states in [('start', [b.states.STARTED]),
+                               ('stop', [b.states.STOPPED]),
+                               ('start', [b.states.STARTING, b.states.STARTED]),
+                               ('exit', [b.states.EXITING]),
+                               ]:
+            threading.Thread(target=f, args=(method,)).start()
+            b.wait(states)
+
+            # The wait method MUST wait for the given state(s).
+            if b.state not in states:
+                self.fail("State %r not in %r" % (b.state, states))
+
+    def test_block(self):
+        b = wspbus.Bus()
+        self.log(b)
+
+        def f():
+            time.sleep(0.2)
+            b.exit()
+        def g():
+            time.sleep(0.4)
+        threading.Thread(target=f).start()
+        threading.Thread(target=g).start()
+        threads = [t for t in threading.enumerate() if not get_daemon(t)]
+        self.assertEqual(len(threads), 3)
+
+        b.block()
+
+        # The block method MUST wait for the EXITING state.
+        self.assertEqual(b.state, b.states.EXITING)
+        # The block method MUST wait for ALL non-main, non-daemon threads to finish.
+        threads = [t for t in threading.enumerate() if not get_daemon(t)]
+        self.assertEqual(len(threads), 1)
+        # The last message will mention an indeterminable thread name; ignore it
+        self.assertEqual(self._log_entries[:-1],
+                         ['Bus STOPPING', 'Bus STOPPED',
+                          'Bus EXITING', 'Bus EXITED',
+                          'Waiting for child threads to terminate...'])
+
+    def test_start_with_callback(self):
+        b = wspbus.Bus()
+        self.log(b)
+        try:
+            events = []
+            def f(*args, **kwargs):
+                events.append(("f", args, kwargs))
+            def g():
+                events.append("g")
+            b.subscribe("start", g)
+            b.start_with_callback(f, (1, 3, 5), {"foo": "bar"})
+            # Give wait() time to run f()
+            time.sleep(0.2)
+
+            # The callback method MUST wait for the STARTED state.
+            self.assertEqual(b.state, b.states.STARTED)
+            # The callback method MUST run after all start methods.
+            self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})])
+        finally:
+            b.exit()
+
+    def test_log(self):
+        b = wspbus.Bus()
+        self.log(b)
+        self.assertLog([])
+
+        # Try a normal message.
+        expected = []
+        for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]:
+            b.log(msg)
+            expected.append(msg)
+            self.assertLog(expected)
+
+        # Try an error message
+        try:
+            foo
+        except NameError:
+            b.log("You are lost and gone forever", traceback=True)
+            lastmsg = self._log_entries[-1]
+            if "Traceback" not in lastmsg or "NameError" not in lastmsg:
+                self.fail("Last log message %r did not contain "
+                          "the expected traceback." % lastmsg)
+        else:
+            self.fail("NameError was not raised as expected.")
+
+
+if __name__ == "__main__":
+    unittest.main()

magicbus/test/test_states.py

+from magicbus._compat import BadStatusLine, ntob
+import os
+import sys
+import threading
+import time
+
+import cherrypy
+engine = cherrypy.engine
+thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
+
+
+class Dependency:
+
+    def __init__(self, bus):
+        self.bus = bus
+        self.running = False
+        self.startcount = 0
+        self.gracecount = 0
+        self.threads = {}
+
+    def subscribe(self):
+        self.bus.subscribe('start', self.start)
+        self.bus.subscribe('stop', self.stop)
+        self.bus.subscribe('graceful', self.graceful)
+        self.bus.subscribe('start_thread', self.startthread)
+        self.bus.subscribe('stop_thread', self.stopthread)
+
+    def start(self):
+        self.running = True
+        self.startcount += 1
+
+    def stop(self):
+        self.running = False
+
+    def graceful(self):
+        self.gracecount += 1
+
+    def startthread(self, thread_id):
+        self.threads[thread_id] = None
+
+    def stopthread(self, thread_id):
+        del self.threads[thread_id]
+
+db_connection = Dependency(engine)
+
+def setup_server():
+    class Root:
+        def index(self):
+            return "Hello World"
+        index.exposed = True
+
+        def ctrlc(self):
+            raise KeyboardInterrupt()
+        ctrlc.exposed = True
+
+        def graceful(self):
+            engine.graceful()
+            return "app was (gracefully) restarted succesfully"
+        graceful.exposed = True
+
+        def block_explicit(self):
+            while True:
+                if cherrypy.response.timed_out:
+                    cherrypy.response.timed_out = False
+                    return "broken!"
+                time.sleep(0.01)
+        block_explicit.exposed = True
+
+        def block_implicit(self):
+            time.sleep(0.5)
+            return "response.timeout = %s" % cherrypy.response.timeout
+        block_implicit.exposed = True
+
+    cherrypy.tree.mount(Root())
+    cherrypy.config.update({
+        'environment': 'test_suite',
+        'engine.deadlock_poll_freq': 0.1,
+        })
+
+    db_connection.subscribe()
+
+
+
+# ------------ Enough helpers. Time for real live test cases. ------------ #
+
+
+from cherrypy.test import helper
+
+class ServerStateTests(helper.CPWebCase):
+    setup_server = staticmethod(setup_server)
+
+    def setUp(self):
+        cherrypy.server.socket_timeout = 0.1
+        self.do_gc_test = False
+
+    def test_0_NormalStateFlow(self):
+        engine.stop()
+        # Our db_connection should not be running
+        self.assertEqual(db_connection.running, False)
+        self.assertEqual(db_connection.startcount, 1)
+        self.assertEqual(len(db_connection.threads), 0)
+
+        # Test server start
+        engine.start()
+        self.assertEqual(engine.state, engine.states.STARTED)
+
+        host = cherrypy.server.socket_host
+        port = cherrypy.server.socket_port
+        self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)
+
+        # The db_connection should be running now
+        self.assertEqual(db_connection.running, True)
+        self.assertEqual(db_connection.startcount, 2)
+        self.assertEqual(len(db_connection.threads), 0)
+
+        self.getPage("/")
+        self.assertBody("Hello World")
+        self.assertEqual(len(db_connection.threads), 1)
+
+        # Test engine stop. This will also stop the HTTP server.
+        engine.stop()
+        self.assertEqual(engine.state, engine.states.STOPPED)
+
+        # Verify that our custom stop function was called
+        self.assertEqual(db_connection.running, False)
+        self.assertEqual(len(db_connection.threads), 0)
+
+        # Block the main thread now and verify that exit() works.
+        def exittest():
+            self.getPage("/")
+            self.assertBody("Hello World")
+            engine.exit()
+        cherrypy.server.start()
+        engine.start_with_callback(exittest)
+        engine.block()
+        self.assertEqual(engine.state, engine.states.EXITING)
+
+    def test_1_Restart(self):
+        cherrypy.server.start()
+        engine.start()
+
+        # The db_connection should be running now
+        self.assertEqual(db_connection.running, True)
+        grace = db_connection.gracecount
+
+        self.getPage("/")
+        self.assertBody("Hello World")
+        self.assertEqual(len(db_connection.threads), 1)
+
+        # Test server restart from this thread
+        engine.graceful()
+        self.assertEqual(engine.state, engine.states.STARTED)
+        self.getPage("/")
+        self.assertBody("Hello World")
+        self.assertEqual(db_connection.running, True)
+        self.assertEqual(db_connection.gracecount, grace + 1)
+        self.assertEqual(len(db_connection.threads), 1)
+
+        # Test server restart from inside a page handler
+        self.getPage("/graceful")
+        self.assertEqual(engine.state, engine.states.STARTED)
+        self.assertBody("app was (gracefully) restarted succesfully")
+        self.assertEqual(db_connection.running, True)
+        self.assertEqual(db_connection.gracecount, grace + 2)
+        # Since we are requesting synchronously, is only one thread used?
+        # Note that the "/graceful" request has been flushed.
+        self.assertEqual(len(db_connection.threads), 0)
+
+        engine.stop()
+        self.assertEqual(engine.state, engine.states.STOPPED)
+        self.assertEqual(db_connection.running, False)
+        self.assertEqual(len(db_connection.threads), 0)
+
+    def test_2_KeyboardInterrupt(self):
+        # Raise a keyboard interrupt in the HTTP server's main thread.
+        # We must start the server in this, the main thread
+        engine.start()
+        cherrypy.server.start()
+
+        self.persistent = True
+        try:
+            # Make the first request and assert there's no "Connection: close".
+            self.getPage("/")
+            self.assertStatus('200 OK')
+            self.assertBody("Hello World")
+            self.assertNoHeader("Connection")
+
+            cherrypy.server.httpserver.interrupt = KeyboardInterrupt
+            engine.block()
+
+            self.assertEqual(db_connection.running, False)
+            self.assertEqual(len(db_connection.threads), 0)
+            self.assertEqual(engine.state, engine.states.EXITING)
+        finally:
+            self.persistent = False
+
+        # Raise a keyboard interrupt in a page handler; on multithreaded
+        # servers, this should occur in one of the worker threads.
+        # This should raise a BadStatusLine error, since the worker
+        # thread will just die without writing a response.
+        engine.start()
+        cherrypy.server.start()
+
+        try:
+            self.getPage("/ctrlc")
+        except BadStatusLine:
+            pass
+        else:
+            print(self.body)
+            self.fail("AssertionError: BadStatusLine not raised")
+
+        engine.block()
+        self.assertEqual(db_connection.running, False)
+        self.assertEqual(len(db_connection.threads), 0)
+
+    def test_3_Deadlocks(self):
+        cherrypy.config.update({'response.timeout': 0.2})
+
+        engine.start()
+        cherrypy.server.start()
+        try:
+            self.assertNotEqual(engine.timeout_monitor.thread, None)
+
+            # Request a "normal" page.
+            self.assertEqual(engine.timeout_monitor.servings, [])
+            self.getPage("/")
+            self.assertBody("Hello World")
+            # request.close is called async.
+            while engine.timeout_monitor.servings:
+                sys.stdout.write(".")
+                time.sleep(0.01)
+
+            # Request a page that explicitly checks itself for deadlock.
+            # The deadlock_timeout should be 2 secs.
+            self.getPage("/block_explicit")
+            self.assertBody("broken!")
+
+            # Request a page that implicitly breaks deadlock.
+            # If we deadlock, we want to touch as little code as possible,
+            # so we won't even call handle_error, just bail ASAP.
+            self.getPage("/block_implicit")
+            self.assertStatus(500)
+            self.assertInBody("raise cherrypy.TimeoutError()")
+        finally:
+            engine.exit()
+
+    def test_4_Autoreload(self):
+        # Start the demo script in a new process
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
+        p.write_conf(
+                extra='test_case_name: "test_4_Autoreload"')
+        p.start(imports='cherrypy.test._test_states_demo')
+        try:
+            self.getPage("/start")
+            start = float(self.body)
+
+            # Give the autoreloader time to cache the file time.
+            time.sleep(2)
+
+            # Touch the file
+            os.utime(os.path.join(thisdir, "_test_states_demo.py"), None)
+
+            # Give the autoreloader time to re-exec the process
+            time.sleep(2)
+            host = cherrypy.server.socket_host
+            port = cherrypy.server.socket_port
+            cherrypy._cpserver.wait_for_occupied_port(host, port)
+
+            self.getPage("/start")
+            if not (float(self.body) > start):
+                raise AssertionError("start time %s not greater than %s" %
+                                     (float(self.body), start))
+        finally:
+            # Shut down the spawned process
+            self.getPage("/exit")
+        p.join()
+
+    def test_5_Start_Error(self):
+        # If a process errors during start, it should stop the engine
+        # and exit with a non-zero exit code.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
+                             wait=True)
+        p.write_conf(
+                extra="""starterror: True
+test_case_name: "test_5_Start_Error"
+"""
+        )
+        p.start(imports='cherrypy.test._test_states_demo')
+        if p.exit_code == 0:
+            self.fail("Process failed to return nonzero exit code.")
+
+
+class PluginTests(helper.CPWebCase):
+    def test_daemonize(self):
+        if os.name not in ['posix']:
+            return self.skip("skipped (not on posix) ")
+        self.HOST = '127.0.0.1'
+        self.PORT = 8081
+        # Spawn the process and wait, when this returns, the original process
+        # is finished.  If it daemonized properly, we should still be able
+        # to access pages.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
+                             wait=True, daemonize=True,
+                             socket_host='127.0.0.1',
+                             socket_port=8081)
+        p.write_conf(
+             extra='test_case_name: "test_daemonize"')
+        p.start(imports='cherrypy.test._test_states_demo')
+        try:
+            # Just get the pid of the daemonization process.
+            self.getPage("/pid")
+            self.assertStatus(200)
+            page_pid = int(self.body)
+            self.assertEqual(page_pid, p.get_pid())
+        finally:
+            # Shut down the spawned process
+            self.getPage("/exit")
+        p.join()
+
+        # Wait until here to test the exit code because we want to ensure
+        # that we wait for the daemon to finish running before we fail.
+        if p.exit_code != 0:
+            self.fail("Daemonized parent process failed to exit cleanly.")
+
+
+class SignalHandlingTests(helper.CPWebCase):
+    def test_SIGHUP_tty(self):
+        # When not daemonized, SIGHUP should shut down the server.
+        try:
+            from signal import SIGHUP
+        except ImportError:
+            return self.skip("skipped (no SIGHUP) ")
+
+        # Spawn the process.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
+        p.write_conf(
+                extra='test_case_name: "test_SIGHUP_tty"')
+        p.start(imports='cherrypy.test._test_states_demo')
+        # Send a SIGHUP
+        os.kill(p.get_pid(), SIGHUP)
+        # This might hang if things aren't working right, but meh.
+        p.join()
+
+    def test_SIGHUP_daemonized(self):
+        # When daemonized, SIGHUP should restart the server.
+        try:
+            from signal import SIGHUP
+        except ImportError:
+            return self.skip("skipped (no SIGHUP) ")
+
+        if os.name not in ['posix']:
+            return self.skip("skipped (not on posix) ")
+
+        # Spawn the process and wait, when this returns, the original process
+        # is finished.  If it daemonized properly, we should still be able
+        # to access pages.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
+                             wait=True, daemonize=True)
+        p.write_conf(
+             extra='test_case_name: "test_SIGHUP_daemonized"')
+        p.start(imports='cherrypy.test._test_states_demo')
+
+        pid = p.get_pid()
+        try:
+            # Send a SIGHUP
+            os.kill(pid, SIGHUP)
+            # Give the server some time to restart
+            time.sleep(2)
+            self.getPage("/pid")
+            self.assertStatus(200)
+            new_pid = int(self.body)
+            self.assertNotEqual(new_pid, pid)
+        finally:
+            # Shut down the spawned process
+            self.getPage("/exit")
+        p.join()
+
+    def test_SIGTERM(self):
+        # SIGTERM should shut down the server whether daemonized or not.
+        try:
+            from signal import SIGTERM
+        except ImportError:
+            return self.skip("skipped (no SIGTERM) ")
+
+        try:
+            from os import kill
+        except ImportError:
+            return self.skip("skipped (no os.kill) ")
+
+        # Spawn a normal, undaemonized process.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
+        p.write_conf(
+                extra='test_case_name: "test_SIGTERM"')
+        p.start(imports='cherrypy.test._test_states_demo')
+        # Send a SIGTERM
+        os.kill(p.get_pid(), SIGTERM)
+        # This might hang if things aren't working right, but meh.
+        p.join()
+
+        if os.name in ['posix']:
+            # Spawn a daemonized process and test again.
+            p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
+                                 wait=True, daemonize=True)
+            p.write_conf(
+                 extra='test_case_name: "test_SIGTERM_2"')
+            p.start(imports='cherrypy.test._test_states_demo')
+            # Send a SIGTERM
+            os.kill(p.get_pid(), SIGTERM)
+            # This might hang if things aren't working right, but meh.
+            p.join()
+
+    def test_signal_handler_unsubscribe(self):
+        try:
+            from signal import SIGTERM
+        except ImportError:
+            return self.skip("skipped (no SIGTERM) ")
+
+        try:
+            from os import kill
+        except ImportError:
+            return self.skip("skipped (no os.kill) ")
+
+        # Spawn a normal, undaemonized process.
+        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
+        p.write_conf(
+            extra="""unsubsig: True
+test_case_name: "test_signal_handler_unsubscribe"
+""")
+        p.start(imports='cherrypy.test._test_states_demo')
+        # Send a SIGTERM
+        os.kill(p.get_pid(), SIGTERM)
+        # This might hang if things aren't working right, but meh.
+        p.join()
+
+        # Assert the old handler ran.
+        target_line = open(p.error_log, 'rb').readlines()[-10]
+        if not ntob("I am an old SIGTERM handler.") in target_line:
+            self.fail("Old SIGTERM handler did not run.\n%r" % target_line)
+

magicbus/win32.py

+"""Windows service. Requires pywin32."""
+
+import os
+import win32api
+import win32con
+import win32event
+import win32service
+import win32serviceutil
+
+from magicbus import wspbus, plugins
+
+
+class ConsoleCtrlHandler(plugins.SimplePlugin):
+    """A WSPBus plugin for handling Win32 console events (like Ctrl-C)."""
+    
+    def __init__(self, bus):
+        self.is_set = False
+        plugins.SimplePlugin.__init__(self, bus)
+    
+    def start(self):
+        if self.is_set:
+            self.bus.log('Handler for console events already set.', level=40)
+            return
+        
+        result = win32api.SetConsoleCtrlHandler(self.handle, 1)
+        if result == 0:
+            self.bus.log('Could not SetConsoleCtrlHandler (error %r)' %
+                         win32api.GetLastError(), level=40)
+        else:
+            self.bus.log('Set handler for console events.', level=40)
+            self.is_set = True
+    
+    def stop(self):
+        if not self.is_set:
+            self.bus.log('Handler for console events already off.', level=40)
+            return
+        
+        try:
+            result = win32api.SetConsoleCtrlHandler(self.handle, 0)
+        except ValueError:
+            # "ValueError: The object has not been registered"
+            result = 1
+        
+        if result == 0:
+            self.bus.log('Could not remove SetConsoleCtrlHandler (error %r)' %
+                         win32api.GetLastError(), level=40)
+        else:
+            self.bus.log('Removed handler for console events.', level=40)
+            self.is_set = False
+    
+    def handle(self, event):
+        """Handle console control events (like Ctrl-C)."""
+        if event in (win32con.CTRL_C_EVENT, win32con.CTRL_LOGOFF_EVENT,
+                     win32con.CTRL_BREAK_EVENT, win32con.CTRL_SHUTDOWN_EVENT,
+                     win32con.CTRL_CLOSE_EVENT):
+            self.bus.log('Console event %s: shutting down bus' % event)
+            
+            # Remove self immediately so repeated Ctrl-C doesn't re-call it.
+            try:
+                self.stop()
+            except ValueError:
+                pass
+            
+            self.bus.exit()
+            # 'First to return True stops the calls'
+            return 1
+        return 0
+
+
+class Win32Bus(wspbus.Bus):
+    """A Web Site Process Bus implementation for Win32.
+    
+    Instead of time.sleep, this bus blocks using native win32event objects.
+    """
+    
+    def __init__(self):
+        self.events = {}
+        wspbus.Bus.__init__(self)
+    
+    def _get_state_event(self, state):
+        """Return a win32event for the given state (creating it if needed)."""
+        try:
+            return self.events[state]
+        except KeyError:
+            event = win32event.CreateEvent(None, 0, 0,
+                                           "WSPBus %s Event (pid=%r)" %
+                                           (state.name, os.getpid()))
+            self.events[state] = event
+            return event
+    
+    def _get_state(self):
+        return self._state
+    def _set_state(self, value):
+        self._state = value
+        event = self._get_state_event(value)
+        win32event.PulseEvent(event)
+    state = property(_get_state, _set_state)
+    
+    def wait(self, state, interval=0.1, channel=None):
+        """Wait for the given state(s), KeyboardInterrupt or SystemExit.
+        
+        Since this class uses native win32event objects, the interval
+        argument is ignored.
+        """
+        if isinstance(state, (tuple, list)):
+            # Don't wait for an event that beat us to the punch ;)
+            if self.state not in state:
+                events = tuple([self._get_state_event(s) for s in state])
+                win32event.WaitForMultipleObjects(events, 0, win32event.INFINITE)
+        else:
+            # Don't wait for an event that beat us to the punch ;)
+            if self.state != state:
+                event = self._get_state_event(state)
+                win32event.WaitForSingleObject(event, win32event.INFINITE)
+
+
+class _ControlCodes(dict):
+    """Control codes used to "signal" a service via ControlService.
+    
+    User-defined control codes are in the range 128-255. We generally use
+    the standard Python value for the Linux signal and add 128. Example:
+    
+        >>> signal.SIGUSR1
+        10
+        control_codes['graceful'] = 128 + 10
+    """
+    
+    def key_for(self, obj):