Robert Brewer avatar Robert Brewer committed af33286

Mostly moved cherrypy.process to separate processbus package dependency.

Comments (0)

Files changed (12)

cherrypy/__init__.py

 from cherrypy._cptree import Application
 from cherrypy import _cpwsgi as wsgi
 
-from cherrypy import process
-try:
-    from cherrypy.process import win32
-    engine = win32.Win32Bus()
-    engine.console_control_handler = win32.ConsoleCtrlHandler(engine)
-    del win32
-except ImportError:
-    engine = process.bus
-
+import processbus
+engine = processbus.bus
+from processbus.plugins.tasks import Autoreloader, Monitor, ThreadManager
+from processbus.plugins.signalhandler import SignalHandler
 
 # Timeout monitor. We add two channels to the engine
 # to which cherrypy.Application will publish.
 engine.listeners['before_request'] = set()
 engine.listeners['after_request'] = set()
 
-class _TimeoutMonitor(process.plugins.Monitor):
+class _TimeoutMonitor(Monitor):
     
     def __init__(self, bus):
         self.servings = []
-        process.plugins.Monitor.__init__(self, bus, self.run)
+        Monitor.__init__(self, bus, self.run)
     
     def before_request(self):
         self.servings.append((serving.request, serving.response))
 engine.timeout_monitor = _TimeoutMonitor(engine)
 engine.timeout_monitor.subscribe()
 
-engine.autoreload = process.plugins.Autoreloader(engine)
+engine.autoreload = Autoreloader(engine)
 engine.autoreload.subscribe()
 
-engine.thread_manager = process.plugins.ThreadManager(engine)
+engine.thread_manager = ThreadManager(engine)
 engine.thread_manager.subscribe()
 
-engine.signal_handler = process.plugins.SignalHandler(engine)
+engine.signal_handler = SignalHandler(engine)
 
 
 from cherrypy import _cpserver

cherrypy/_cpserver.py

 
 # We import * because we want to export check_port
 # et al as attributes of this module.
-from cherrypy.process.servers import *
+from processbus.plugins.servers import *
 
 
 class Server(ServerAdapter):
 import sys
 
 import cherrypy
-from cherrypy.process import plugins, servers
+from processbus.plugins import opsys, servers
 from cherrypy import Application
 
 def start(configfiles=None, daemonize=False, environment=None,
     if daemonize:
         # Don't print anything to stdout/sterr.
         cherrypy.config.update({'log.screen': False})
-        plugins.Daemonizer(engine).subscribe()
+        opsys.Daemonizer(engine).subscribe()
     
     if pidfile:
-        plugins.PIDFile(engine, pidfile).subscribe()
+        opsys.PIDFile(engine, pidfile).subscribe()
     
     if hasattr(engine, "signal_handler"):
         engine.signal_handler.subscribe()

cherrypy/lib/gctools.py

 
 import cherrypy
 from cherrypy import _cprequest, _cpwsgi
-from cherrypy.process.plugins import SimplePlugin
+from processbus.plugins import SimplePlugin
 
 
 class ReferrerTree(object):

cherrypy/lib/sessions.py

 import types
 from warnings import warn
 
+from processbus.plugins.tasks import Monitor
+
 import cherrypy
 from cherrypy._cpcompat import copyitems, pickle, random20, unicodestr
 from cherrypy.lib import httputil
         if self.clean_freq and not cls.clean_thread:
             # clean_up is in instancemethod and not a classmethod,
             # so that tool config can be accessed inside the method.
-            t = cherrypy.process.plugins.Monitor(
-                cherrypy.engine, self.clean_up, self.clean_freq * 60,
-                name='Session cleanup')
+            t = Monitor(cherrypy.engine, self.clean_up, self.clean_freq * 60,
+                        name='Session cleanup')
             t.subscribe()
             cls.clean_thread = t
             t.start()

cherrypy/process/__init__.py

-"""Site container for an HTTP server.
-
-A Web Site Process Bus object is used to connect applications, servers,
-and frameworks with site-wide services such as daemonization, process
-reload, signal handling, drop privileges, PID file management, logging
-for all of these, and many more.
-
-The 'plugins' module defines a few abstract and concrete services for
-use with the bus. Some use tool-specific channels; see the documentation
-for each class.
-"""
-
-from cherrypy.process.wspbus import bus
-from cherrypy.process import plugins, servers

cherrypy/process/plugins.py

-"""Site services for use with a Web Site Process Bus."""
-
-import os
-import re
-import signal as _signal
-import sys
-import time
-import threading
-
-from cherrypy._cpcompat import basestring, get_daemon, get_thread_ident, ntob, set
-
-# _module__file__base is used by Autoreload to make
-# absolute any filenames retrieved from sys.modules which are not
-# already absolute paths.  This is to work around Python's quirk
-# of importing the startup script and using a relative filename
-# for it in sys.modules.
-#
-# Autoreload examines sys.modules afresh every time it runs. If an application
-# changes the current directory by executing os.chdir(), then the next time
-# Autoreload runs, it will not be able to find any filenames which are
-# not absolute paths, because the current directory is not the same as when the
-# module was first imported.  Autoreload will then wrongly conclude the file has
-# "changed", and initiate the shutdown/re-exec sequence.
-# See ticket #917.
-# For this workaround to have a decent probability of success, this module
-# needs to be imported as early as possible, before the app has much chance
-# to change the working directory.
-_module__file__base = os.getcwd()
-
-
-class SimplePlugin(object):
-    """Plugin base class which auto-subscribes methods for known channels."""
-    
-    bus = None
-    """A :class:`Bus <cherrypy.process.wspbus.Bus>`, usually cherrypy.engine."""
-    
-    def __init__(self, bus):
-        self.bus = bus
-    
-    def subscribe(self):
-        """Register this object as a (multi-channel) listener on the bus."""
-        for channel in self.bus.listeners:
-            # Subscribe self.start, self.exit, etc. if present.
-            method = getattr(self, channel, None)
-            if method is not None:
-                self.bus.subscribe(channel, method)
-    
-    def unsubscribe(self):
-        """Unregister this object as a listener on the bus."""
-        for channel in self.bus.listeners:
-            # Unsubscribe self.start, self.exit, etc. if present.
-            method = getattr(self, channel, None)
-            if method is not None:
-                self.bus.unsubscribe(channel, method)
-
-
-
-class SignalHandler(object):
-    """Register bus channels (and listeners) for system signals.
-    
-    You can modify what signals your application listens for, and what it does
-    when it receives signals, by modifying :attr:`SignalHandler.handlers`,
-    a dict of {signal name: callback} pairs. The default set is::
-    
-        handlers = {'SIGTERM': self.bus.exit,
-                    'SIGHUP': self.handle_SIGHUP,
-                    'SIGUSR1': self.bus.graceful,
-                   }
-    
-    The :func:`SignalHandler.handle_SIGHUP`` method calls
-    :func:`bus.restart()<cherrypy.process.wspbus.Bus.restart>`
-    if the process is daemonized, but
-    :func:`bus.exit()<cherrypy.process.wspbus.Bus.exit>`
-    if the process is attached to a TTY. This is because Unix window
-    managers tend to send SIGHUP to terminal windows when the user closes them.
-    
-    Feel free to add signals which are not available on every platform. The
-    :class:`SignalHandler` will ignore errors raised from attempting to register
-    handlers for unknown signals.
-    """
-    
-    handlers = {}
-    """A map from signal names (e.g. 'SIGTERM') to handlers (e.g. bus.exit)."""
-    
-    signals = {}
-    """A map from signal numbers to names."""
-    
-    for k, v in vars(_signal).items():
-        if k.startswith('SIG') and not k.startswith('SIG_'):
-            signals[v] = k
-    del k, v
-    
-    def __init__(self, bus):
-        self.bus = bus
-        # Set default handlers
-        self.handlers = {'SIGTERM': self.bus.exit,
-                         'SIGHUP': self.handle_SIGHUP,
-                         'SIGUSR1': self.bus.graceful,
-                         }
-
-        if sys.platform[:4] == 'java':
-            del self.handlers['SIGUSR1']
-            self.handlers['SIGUSR2'] = self.bus.graceful
-            self.bus.log("SIGUSR1 cannot be set on the JVM platform. "
-                         "Using SIGUSR2 instead.")
-            self.handlers['SIGINT'] = self._jython_SIGINT_handler
-
-        self._previous_handlers = {}
-    
-    def _jython_SIGINT_handler(self, signum=None, frame=None):
-        # See http://bugs.jython.org/issue1313
-        self.bus.log('Keyboard Interrupt: shutting down bus')
-        self.bus.exit()
-        
-    def subscribe(self):
-        """Subscribe self.handlers to signals."""
-        for sig, func in self.handlers.items():
-            try:
-                self.set_handler(sig, func)
-            except ValueError:
-                pass
-    
-    def unsubscribe(self):
-        """Unsubscribe self.handlers from signals."""
-        for signum, handler in self._previous_handlers.items():
-            signame = self.signals[signum]
-            
-            if handler is None:
-                self.bus.log("Restoring %s handler to SIG_DFL." % signame)
-                handler = _signal.SIG_DFL
-            else:
-                self.bus.log("Restoring %s handler %r." % (signame, handler))
-            
-            try:
-                our_handler = _signal.signal(signum, handler)
-                if our_handler is None:
-                    self.bus.log("Restored old %s handler %r, but our "
-                                 "handler was not registered." %
-                                 (signame, handler), level=30)
-            except ValueError:
-                self.bus.log("Unable to restore %s handler %r." %
-                             (signame, handler), level=40, traceback=True)
-    
-    def set_handler(self, signal, listener=None):
-        """Subscribe a handler for the given signal (number or name).
-        
-        If the optional 'listener' argument is provided, it will be
-        subscribed as a listener for the given signal's channel.
-        
-        If the given signal name or number is not available on the current
-        platform, ValueError is raised.
-        """
-        if isinstance(signal, basestring):
-            signum = getattr(_signal, signal, None)
-            if signum is None:
-                raise ValueError("No such signal: %r" % signal)
-            signame = signal
-        else:
-            try:
-                signame = self.signals[signal]
-            except KeyError:
-                raise ValueError("No such signal: %r" % signal)
-            signum = signal
-        
-        prev = _signal.signal(signum, self._handle_signal)
-        self._previous_handlers[signum] = prev
-        
-        if listener is not None:
-            self.bus.log("Listening for %s." % signame)
-            self.bus.subscribe(signame, listener)
-    
-    def _handle_signal(self, signum=None, frame=None):
-        """Python signal handler (self.set_handler subscribes it for you)."""
-        signame = self.signals[signum]
-        self.bus.log("Caught signal %s." % signame)
-        self.bus.publish(signame)
-    
-    def handle_SIGHUP(self):
-        """Restart if daemonized, else exit."""
-        if os.isatty(sys.stdin.fileno()):
-            # not daemonized (may be foreground or background)
-            self.bus.log("SIGHUP caught but not daemonized. Exiting.")
-            self.bus.exit()
-        else:
-            self.bus.log("SIGHUP caught while daemonized. Restarting.")
-            self.bus.restart()
-
-
-try:
-    import pwd, grp
-except ImportError:
-    pwd, grp = None, None
-
-
-class DropPrivileges(SimplePlugin):
-    """Drop privileges. uid/gid arguments not available on Windows.
-    
-    Special thanks to Gavin Baker: http://antonym.org/node/100.
-    """
-    
-    def __init__(self, bus, umask=None, uid=None, gid=None):
-        SimplePlugin.__init__(self, bus)
-        self.finalized = False
-        self.uid = uid
-        self.gid = gid
-        self.umask = umask
-    
-    def _get_uid(self):
-        return self._uid
-    def _set_uid(self, val):
-        if val is not None:
-            if pwd is None:
-                self.bus.log("pwd module not available; ignoring uid.",
-                             level=30)
-                val = None
-            elif isinstance(val, basestring):
-                val = pwd.getpwnam(val)[2]
-        self._uid = val
-    uid = property(_get_uid, _set_uid,
-        doc="The uid under which to run. Availability: Unix.")
-    
-    def _get_gid(self):
-        return self._gid
-    def _set_gid(self, val):
-        if val is not None:
-            if grp is None:
-                self.bus.log("grp module not available; ignoring gid.",
-                             level=30)
-                val = None
-            elif isinstance(val, basestring):
-                val = grp.getgrnam(val)[2]
-        self._gid = val
-    gid = property(_get_gid, _set_gid,
-        doc="The gid under which to run. Availability: Unix.")
-    
-    def _get_umask(self):
-        return self._umask
-    def _set_umask(self, val):
-        if val is not None:
-            try:
-                os.umask
-            except AttributeError:
-                self.bus.log("umask function not available; ignoring umask.",
-                             level=30)
-                val = None
-        self._umask = val
-    umask = property(_get_umask, _set_umask,
-        doc="""The default permission mode for newly created files and directories.
-        
-        Usually expressed in octal format, for example, ``0644``.
-        Availability: Unix, Windows.
-        """)
-    
-    def start(self):
-        # uid/gid
-        def current_ids():
-            """Return the current (uid, gid) if available."""
-            name, group = None, None
-            if pwd:
-                name = pwd.getpwuid(os.getuid())[0]
-            if grp:
-                group = grp.getgrgid(os.getgid())[0]
-            return name, group
-        
-        if self.finalized:
-            if not (self.uid is None and self.gid is None):
-                self.bus.log('Already running as uid: %r gid: %r' %
-                             current_ids())
-        else:
-            if self.uid is None and self.gid is None:
-                if pwd or grp:
-                    self.bus.log('uid/gid not set', level=30)
-            else:
-                self.bus.log('Started as uid: %r gid: %r' % current_ids())
-                if self.gid is not None:
-                    os.setgid(self.gid)
-                    os.setgroups([])
-                if self.uid is not None:
-                    os.setuid(self.uid)
-                self.bus.log('Running as uid: %r gid: %r' % current_ids())
-        
-        # umask
-        if self.finalized:
-            if self.umask is not None:
-                self.bus.log('umask already set to: %03o' % self.umask)
-        else:
-            if self.umask is None:
-                self.bus.log('umask not set', level=30)
-            else:
-                old_umask = os.umask(self.umask)
-                self.bus.log('umask old: %03o, new: %03o' %
-                             (old_umask, self.umask))
-        
-        self.finalized = True
-    # This is slightly higher than the priority for server.start
-    # in order to facilitate the most common use: starting on a low
-    # port (which requires root) and then dropping to another user.
-    start.priority = 77
-
-
-class Daemonizer(SimplePlugin):
-    """Daemonize the running script.
-    
-    Use this with a Web Site Process Bus via::
-    
-        Daemonizer(bus).subscribe()
-    
-    When this component finishes, the process is completely decoupled from
-    the parent environment. Please note that when this component is used,
-    the return code from the parent process will still be 0 if a startup
-    error occurs in the forked children. Errors in the initial daemonizing
-    process still return proper exit codes. Therefore, if you use this
-    plugin to daemonize, don't use the return code as an accurate indicator
-    of whether the process fully started. In fact, that return code only
-    indicates if the process succesfully finished the first fork.
-    """
-    
-    def __init__(self, bus, stdin='/dev/null', stdout='/dev/null',
-                 stderr='/dev/null'):
-        SimplePlugin.__init__(self, bus)
-        self.stdin = stdin
-        self.stdout = stdout
-        self.stderr = stderr
-        self.finalized = False
-    
-    def start(self):
-        if self.finalized:
-            self.bus.log('Already deamonized.')
-        
-        # forking has issues with threads:
-        # http://www.opengroup.org/onlinepubs/000095399/functions/fork.html
-        # "The general problem with making fork() work in a multi-threaded
-        #  world is what to do with all of the threads..."
-        # So we check for active threads:
-        if threading.activeCount() != 1:
-            self.bus.log('There are %r active threads. '
-                         'Daemonizing now may cause strange failures.' %
-                         threading.enumerate(), level=30)
-        
-        # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
-        # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7)
-        # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
-        
-        # Finish up with the current stdout/stderr
-        sys.stdout.flush()
-        sys.stderr.flush()
-        
-        # Do first fork.
-        try:
-            pid = os.fork()
-            if pid == 0:
-                # This is the child process. Continue.
-                pass
-            else:
-                # This is the first parent. Exit, now that we've forked.
-                self.bus.log('Forking once.')
-                os._exit(0)
-        except OSError:
-            # Python raises OSError rather than returning negative numbers.
-            exc = sys.exc_info()[1]
-            sys.exit("%s: fork #1 failed: (%d) %s\n"
-                     % (sys.argv[0], exc.errno, exc.strerror))
-        
-        os.setsid()
-        
-        # Do second fork
-        try:
-            pid = os.fork()
-            if pid > 0:
-                self.bus.log('Forking twice.')
-                os._exit(0) # Exit second parent
-        except OSError:
-            exc = sys.exc_info()[1]
-            sys.exit("%s: fork #2 failed: (%d) %s\n"
-                     % (sys.argv[0], exc.errno, exc.strerror))
-        
-        os.chdir("/")
-        os.umask(0)
-        
-        si = open(self.stdin, "r")
-        so = open(self.stdout, "a+")
-        se = open(self.stderr, "a+")
-
-        # os.dup2(fd, fd2) will close fd2 if necessary,
-        # so we don't explicitly close stdin/out/err.
-        # See http://docs.python.org/lib/os-fd-ops.html
-        os.dup2(si.fileno(), sys.stdin.fileno())
-        os.dup2(so.fileno(), sys.stdout.fileno())
-        os.dup2(se.fileno(), sys.stderr.fileno())
-        
-        self.bus.log('Daemonized to PID: %s' % os.getpid())
-        self.finalized = True
-    start.priority = 65
-
-
-class PIDFile(SimplePlugin):
-    """Maintain a PID file via a WSPBus."""
-    
-    def __init__(self, bus, pidfile):
-        SimplePlugin.__init__(self, bus)
-        self.pidfile = pidfile
-        self.finalized = False
-    
-    def start(self):
-        pid = os.getpid()
-        if self.finalized:
-            self.bus.log('PID %r already written to %r.' % (pid, self.pidfile))
-        else:
-            open(self.pidfile, "wb").write(ntob("%s" % pid, 'utf8'))
-            self.bus.log('PID %r written to %r.' % (pid, self.pidfile))
-            self.finalized = True
-    start.priority = 70
-    
-    def exit(self):
-        try:
-            os.remove(self.pidfile)
-            self.bus.log('PID file removed: %r.' % self.pidfile)
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            pass
-
-
-class PerpetualTimer(threading._Timer):
-    """A responsive subclass of threading._Timer whose run() method repeats.
-    
-    Use this timer only when you really need a very interruptible timer;
-    this checks its 'finished' condition up to 20 times a second, which can
-    results in pretty high CPU usage 
-    """
-    
-    def run(self):
-        while True:
-            self.finished.wait(self.interval)
-            if self.finished.isSet():
-                return
-            try:
-                self.function(*self.args, **self.kwargs)
-            except Exception:
-                self.bus.log("Error in perpetual timer thread function %r." %
-                             self.function, level=40, traceback=True)
-                # Quit on first error to avoid massive logs.
-                raise
-
-
-class BackgroundTask(threading.Thread):
-    """A subclass of threading.Thread whose run() method repeats.
-    
-    Use this class for most repeating tasks. It uses time.sleep() to wait
-    for each interval, which isn't very responsive; that is, even if you call
-    self.cancel(), you'll have to wait until the sleep() call finishes before
-    the thread stops. To compensate, it defaults to being daemonic, which means
-    it won't delay stopping the whole process.
-    """
-    
-    def __init__(self, interval, function, args=[], kwargs={}, bus=None):
-        threading.Thread.__init__(self)
-        self.interval = interval
-        self.function = function
-        self.args = args
-        self.kwargs = kwargs
-        self.running = False
-        self.bus = bus
-    
-    def cancel(self):
-        self.running = False
-    
-    def run(self):
-        self.running = True
-        while self.running:
-            time.sleep(self.interval)
-            if not self.running:
-                return
-            try:
-                self.function(*self.args, **self.kwargs)
-            except Exception:
-                if self.bus:
-                    self.bus.log("Error in background task thread function %r."
-                                 % self.function, level=40, traceback=True)
-                # Quit on first error to avoid massive logs.
-                raise
-    
-    def _set_daemon(self):
-        return True
-
-
-class Monitor(SimplePlugin):
-    """WSPBus listener to periodically run a callback in its own thread."""
-    
-    callback = None
-    """The function to call at intervals."""
-    
-    frequency = 60
-    """The time in seconds between callback runs."""
-    
-    thread = None
-    """A :class:`BackgroundTask<cherrypy.process.plugins.BackgroundTask>` thread."""
-    
-    def __init__(self, bus, callback, frequency=60, name=None):
-        SimplePlugin.__init__(self, bus)
-        self.callback = callback
-        self.frequency = frequency
-        self.thread = None
-        self.name = name
-    
-    def start(self):
-        """Start our callback in its own background thread."""
-        if self.frequency > 0:
-            threadname = self.name or self.__class__.__name__
-            if self.thread is None:
-                self.thread = BackgroundTask(self.frequency, self.callback,
-                                             bus = self.bus)
-                self.thread.setName(threadname)
-                self.thread.start()
-                self.bus.log("Started monitor thread %r." % threadname)
-            else:
-                self.bus.log("Monitor thread %r already started." % threadname)
-    start.priority = 70
-    
-    def stop(self):
-        """Stop our callback's background task thread."""
-        if self.thread is None:
-            self.bus.log("No thread running for %s." % self.name or self.__class__.__name__)
-        else:
-            if self.thread is not threading.currentThread():
-                name = self.thread.getName()
-                self.thread.cancel()
-                if not get_daemon(self.thread):
-                    self.bus.log("Joining %r" % name)
-                    self.thread.join()
-                self.bus.log("Stopped thread %r." % name)
-            self.thread = None
-    
-    def graceful(self):
-        """Stop the callback's background task thread and restart it."""
-        self.stop()
-        self.start()
-
-
-class Autoreloader(Monitor):
-    """Monitor which re-executes the process when files change.
-    
-    This :ref:`plugin<plugins>` restarts the process (via :func:`os.execv`)
-    if any of the files it monitors change (or is deleted). By default, the
-    autoreloader monitors all imported modules; you can add to the
-    set by adding to ``autoreload.files``::
-    
-        cherrypy.engine.autoreload.files.add(myFile)
-    
-    If there are imported files you do *not* wish to monitor, you can adjust the
-    ``match`` attribute, a regular expression. For example, to stop monitoring
-    cherrypy itself::
-    
-        cherrypy.engine.autoreload.match = r'^(?!cherrypy).+'
-    
-    Like all :class:`Monitor<cherrypy.process.plugins.Monitor>` plugins,
-    the autoreload plugin takes a ``frequency`` argument. The default is
-    1 second; that is, the autoreloader will examine files once each second.
-    """
-    
-    files = None
-    """The set of files to poll for modifications."""
-    
-    frequency = 1
-    """The interval in seconds at which to poll for modified files."""
-    
-    match = '.*'
-    """A regular expression by which to match filenames."""
-    
-    def __init__(self, bus, frequency=1, match='.*'):
-        self.mtimes = {}
-        self.files = set()
-        self.match = match
-        Monitor.__init__(self, bus, self.run, frequency)
-    
-    def start(self):
-        """Start our own background task thread for self.run."""
-        if self.thread is None:
-            self.mtimes = {}
-        Monitor.start(self)
-    start.priority = 70 
-    
-    def sysfiles(self):
-        """Return a Set of sys.modules filenames to monitor."""
-        files = set()
-        for k, m in sys.modules.items():
-            if re.match(self.match, k):
-                if hasattr(m, '__loader__') and hasattr(m.__loader__, 'archive'):
-                    f = m.__loader__.archive
-                else:
-                    f = getattr(m, '__file__', None)
-                    if f is not None and not os.path.isabs(f):
-                        # ensure absolute paths so a os.chdir() in the app doesn't break me
-                        f = os.path.normpath(os.path.join(_module__file__base, f))
-                files.add(f)
-        return files
-    
-    def run(self):
-        """Reload the process if registered files have been modified."""
-        for filename in self.sysfiles() | self.files:
-            if filename:
-                if filename.endswith('.pyc'):
-                    filename = filename[:-1]
-                
-                oldtime = self.mtimes.get(filename, 0)
-                if oldtime is None:
-                    # Module with no .py file. Skip it.
-                    continue
-                
-                try:
-                    mtime = os.stat(filename).st_mtime
-                except OSError:
-                    # Either a module with no .py file, or it's been deleted.
-                    mtime = None
-                
-                if filename not in self.mtimes:
-                    # If a module has no .py file, this will be None.
-                    self.mtimes[filename] = mtime
-                else:
-                    if mtime is None or mtime > oldtime:
-                        # The file has been deleted or modified.
-                        self.bus.log("Restarting because %s changed." % filename)
-                        self.thread.cancel()
-                        self.bus.log("Stopped thread %r." % self.thread.getName())
-                        self.bus.restart()
-                        return
-
-
-class ThreadManager(SimplePlugin):
-    """Manager for HTTP request threads.
-    
-    If you have control over thread creation and destruction, publish to
-    the 'acquire_thread' and 'release_thread' channels (for each thread).
-    This will register/unregister the current thread and publish to
-    'start_thread' and 'stop_thread' listeners in the bus as needed.
-    
-    If threads are created and destroyed by code you do not control
-    (e.g., Apache), then, at the beginning of every HTTP request,
-    publish to 'acquire_thread' only. You should not publish to
-    'release_thread' in this case, since you do not know whether
-    the thread will be re-used or not. The bus will call
-    'stop_thread' listeners for you when it stops.
-    """
-    
-    threads = None
-    """A map of {thread ident: index number} pairs."""
-    
-    def __init__(self, bus):
-        self.threads = {}
-        SimplePlugin.__init__(self, bus)
-        self.bus.listeners.setdefault('acquire_thread', set())
-        self.bus.listeners.setdefault('start_thread', set())
-        self.bus.listeners.setdefault('release_thread', set())
-        self.bus.listeners.setdefault('stop_thread', set())
-
-    def acquire_thread(self):
-        """Run 'start_thread' listeners for the current thread.
-        
-        If the current thread has already been seen, any 'start_thread'
-        listeners will not be run again.
-        """
-        thread_ident = get_thread_ident()
-        if thread_ident not in self.threads:
-            # We can't just use get_ident as the thread ID
-            # because some platforms reuse thread ID's.
-            i = len(self.threads) + 1
-            self.threads[thread_ident] = i
-            self.bus.publish('start_thread', i)
-    
-    def release_thread(self):
-        """Release the current thread and run 'stop_thread' listeners."""
-        thread_ident = get_thread_ident()
-        i = self.threads.pop(thread_ident, None)
-        if i is not None:
-            self.bus.publish('stop_thread', i)
-    
-    def stop(self):
-        """Release all threads and run all 'stop_thread' listeners."""
-        for thread_ident, i in self.threads.items():
-            self.bus.publish('stop_thread', i)
-        self.threads.clear()
-    graceful = stop
-

cherrypy/process/servers.py

-"""
-Starting in CherryPy 3.1, cherrypy.server is implemented as an
-:ref:`Engine Plugin<plugins>`. It's an instance of
-:class:`cherrypy._cpserver.Server`, which is a subclass of
-:class:`cherrypy.process.servers.ServerAdapter`. The ``ServerAdapter`` class
-is designed to control other servers, as well.
-
-Multiple servers/ports
-======================
-
-If you need to start more than one HTTP server (to serve on multiple ports, or
-protocols, etc.), you can manually register each one and then start them all
-with engine.start::
-
-    s1 = ServerAdapter(cherrypy.engine, MyWSGIServer(host='0.0.0.0', port=80))
-    s2 = ServerAdapter(cherrypy.engine, another.HTTPServer(host='127.0.0.1', SSL=True))
-    s1.subscribe()
-    s2.subscribe()
-    cherrypy.engine.start()
-
-.. index:: SCGI
-
-FastCGI/SCGI
-============
-
-There are also Flup\ **F**\ CGIServer and Flup\ **S**\ CGIServer classes in
-:mod:`cherrypy.process.servers`. To start an fcgi server, for example,
-wrap an instance of it in a ServerAdapter::
-
-    addr = ('0.0.0.0', 4000)
-    f = servers.FlupFCGIServer(application=cherrypy.tree, bindAddress=addr)
-    s = servers.ServerAdapter(cherrypy.engine, httpserver=f, bind_addr=addr)
-    s.subscribe()
-
-The :doc:`cherryd</deployguide/cherryd>` startup script will do the above for
-you via its `-f` flag.
-Note that you need to download and install `flup <http://trac.saddi.com/flup>`_
-yourself, whether you use ``cherryd`` or not.
-
-.. _fastcgi:
-.. index:: FastCGI
-
-FastCGI
--------
-
-A very simple setup lets your cherry run with FastCGI.
-You just need the flup library,
-plus a running Apache server (with ``mod_fastcgi``) or lighttpd server.
-
-CherryPy code
-^^^^^^^^^^^^^
-
-hello.py::
-
-    #!/usr/bin/python
-    import cherrypy
-    
-    class HelloWorld:
-        \"""Sample request handler class.\"""
-        def index(self):
-            return "Hello world!"
-        index.exposed = True
-    
-    cherrypy.tree.mount(HelloWorld())
-    # CherryPy autoreload must be disabled for the flup server to work
-    cherrypy.config.update({'engine.autoreload_on':False})
-
-Then run :doc:`/deployguide/cherryd` with the '-f' arg::
-
-    cherryd -c <myconfig> -d -f -i hello.py
-
-Apache
-^^^^^^
-
-At the top level in httpd.conf::
-
-    FastCgiIpcDir /tmp
-    FastCgiServer /path/to/cherry.fcgi -idle-timeout 120 -processes 4
-
-And inside the relevant VirtualHost section::
-
-    # FastCGI config
-    AddHandler fastcgi-script .fcgi
-    ScriptAliasMatch (.*$) /path/to/cherry.fcgi$1
-
-Lighttpd
-^^^^^^^^
-
-For `Lighttpd <http://www.lighttpd.net/>`_ you can follow these
-instructions. Within ``lighttpd.conf`` make sure ``mod_fastcgi`` is
-active within ``server.modules``. Then, within your ``$HTTP["host"]``
-directive, configure your fastcgi script like the following::
-
-    $HTTP["url"] =~ "" {
-      fastcgi.server = (
-        "/" => (
-          "script.fcgi" => (
-            "bin-path" => "/path/to/your/script.fcgi",
-            "socket"          => "/tmp/script.sock",
-            "check-local"     => "disable",
-            "disable-time"    => 1,
-            "min-procs"       => 1,
-            "max-procs"       => 1, # adjust as needed
-          ),
-        ),
-      )
-    } # end of $HTTP["url"] =~ "^/"
-
-Please see `Lighttpd FastCGI Docs
-<http://redmine.lighttpd.net/wiki/lighttpd/Docs:ModFastCGI>`_ for an explanation 
-of the possible configuration options.
-"""
-
-import sys
-import time
-
-
-class ServerAdapter(object):
-    """Adapter for an HTTP server.
-    
-    If you need to start more than one HTTP server (to serve on multiple
-    ports, or protocols, etc.), you can manually register each one and then
-    start them all with bus.start:
-    
-        s1 = ServerAdapter(bus, MyWSGIServer(host='0.0.0.0', port=80))
-        s2 = ServerAdapter(bus, another.HTTPServer(host='127.0.0.1', SSL=True))
-        s1.subscribe()
-        s2.subscribe()
-        bus.start()
-    """
-    
-    def __init__(self, bus, httpserver=None, bind_addr=None):
-        self.bus = bus
-        self.httpserver = httpserver
-        self.bind_addr = bind_addr
-        self.interrupt = None
-        self.running = False
-    
-    def subscribe(self):
-        self.bus.subscribe('start', self.start)
-        self.bus.subscribe('stop', self.stop)
-    
-    def unsubscribe(self):
-        self.bus.unsubscribe('start', self.start)
-        self.bus.unsubscribe('stop', self.stop)
-    
-    def start(self):
-        """Start the HTTP server."""
-        if self.bind_addr is None:
-            on_what = "unknown interface (dynamic?)"
-        elif isinstance(self.bind_addr, tuple):
-            host, port = self.bind_addr
-            on_what = "%s:%s" % (host, port)
-        else:
-            on_what = "socket file: %s" % self.bind_addr
-        
-        if self.running:
-            self.bus.log("Already serving on %s" % on_what)
-            return
-        
-        self.interrupt = None
-        if not self.httpserver:
-            raise ValueError("No HTTP server has been created.")
-        
-        # Start the httpserver in a new thread.
-        if isinstance(self.bind_addr, tuple):
-            wait_for_free_port(*self.bind_addr)
-        
-        import threading
-        t = threading.Thread(target=self._start_http_thread)
-        t.setName("HTTPServer " + t.getName())
-        t.start()
-        
-        self.wait()
-        self.running = True
-        self.bus.log("Serving on %s" % on_what)
-    start.priority = 75
-    
-    def _start_http_thread(self):
-        """HTTP servers MUST be running in new threads, so that the
-        main thread persists to receive KeyboardInterrupt's. If an
-        exception is raised in the httpserver's thread then it's
-        trapped here, and the bus (and therefore our httpserver)
-        are shut down.
-        """
-        try:
-            self.httpserver.start()
-        except KeyboardInterrupt:
-            self.bus.log("<Ctrl-C> hit: shutting down HTTP server")
-            self.interrupt = sys.exc_info()[1]
-            self.bus.exit()
-        except SystemExit:
-            self.bus.log("SystemExit raised: shutting down HTTP server")
-            self.interrupt = sys.exc_info()[1]
-            self.bus.exit()
-            raise
-        except:
-            self.interrupt = sys.exc_info()[1]
-            self.bus.log("Error in HTTP server: shutting down",
-                         traceback=True, level=40)
-            self.bus.exit()
-            raise
-    
-    def wait(self):
-        """Wait until the HTTP server is ready to receive requests."""
-        while not getattr(self.httpserver, "ready", False):
-            if self.interrupt:
-                raise self.interrupt
-            time.sleep(.1)
-        
-        # Wait for port to be occupied
-        if isinstance(self.bind_addr, tuple):
-            host, port = self.bind_addr
-            wait_for_occupied_port(host, port)
-    
-    def stop(self):
-        """Stop the HTTP server."""
-        if self.running:
-            # stop() MUST block until the server is *truly* stopped.
-            self.httpserver.stop()
-            # Wait for the socket to be truly freed.
-            if isinstance(self.bind_addr, tuple):
-                wait_for_free_port(*self.bind_addr)
-            self.running = False
-            self.bus.log("HTTP Server %s shut down" % self.httpserver)
-        else:
-            self.bus.log("HTTP Server %s already shut down" % self.httpserver)
-    stop.priority = 25
-    
-    def restart(self):
-        """Restart the HTTP server."""
-        self.stop()
-        self.start()
-
-
-class FlupCGIServer(object):
-    """Adapter for a flup.server.cgi.WSGIServer."""
-   
-    def __init__(self, *args, **kwargs):
-        self.args = args
-        self.kwargs = kwargs
-        self.ready = False
-    
-    def start(self):
-        """Start the CGI server."""
-        # We have to instantiate the server class here because its __init__
-        # starts a threadpool. If we do it too early, daemonize won't work.
-        from flup.server.cgi import WSGIServer
-       
-        self.cgiserver = WSGIServer(*self.args, **self.kwargs)
-        self.ready = True
-        self.cgiserver.run()
-    
-    def stop(self):
-        """Stop the HTTP server."""
-        self.ready = False
-
-
-class FlupFCGIServer(object):
-    """Adapter for a flup.server.fcgi.WSGIServer."""
-    
-    def __init__(self, *args, **kwargs):
-        if kwargs.get('bindAddress', None) is None:
-            import socket
-            if not hasattr(socket, 'fromfd'):
-                raise ValueError(
-                    'Dynamic FCGI server not available on this platform. '
-                    'You must use a static or external one by providing a '
-                    'legal bindAddress.')
-        self.args = args
-        self.kwargs = kwargs
-        self.ready = False
-    
-    def start(self):
-        """Start the FCGI server."""
-        # We have to instantiate the server class here because its __init__
-        # starts a threadpool. If we do it too early, daemonize won't work.
-        from flup.server.fcgi import WSGIServer
-        self.fcgiserver = WSGIServer(*self.args, **self.kwargs)
-        # TODO: report this bug upstream to flup.
-        # If we don't set _oldSIGs on Windows, we get:
-        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
-        #   line 108, in run
-        #     self._restoreSignalHandlers()
-        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
-        #   line 156, in _restoreSignalHandlers
-        #     for signum,handler in self._oldSIGs:
-        #   AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
-        self.fcgiserver._installSignalHandlers = lambda: None
-        self.fcgiserver._oldSIGs = []
-        self.ready = True
-        self.fcgiserver.run()
-    
-    def stop(self):
-        """Stop the HTTP server."""
-        # Forcibly stop the fcgi server main event loop.
-        self.fcgiserver._keepGoing = False
-        # Force all worker threads to die off.
-        self.fcgiserver._threadPool.maxSpare = self.fcgiserver._threadPool._idleCount
-        self.ready = False
-
-
-class FlupSCGIServer(object):
-    """Adapter for a flup.server.scgi.WSGIServer."""
-    
-    def __init__(self, *args, **kwargs):
-        self.args = args
-        self.kwargs = kwargs
-        self.ready = False
-    
-    def start(self):
-        """Start the SCGI server."""
-        # We have to instantiate the server class here because its __init__
-        # starts a threadpool. If we do it too early, daemonize won't work.
-        from flup.server.scgi import WSGIServer
-        self.scgiserver = WSGIServer(*self.args, **self.kwargs)
-        # TODO: report this bug upstream to flup.
-        # If we don't set _oldSIGs on Windows, we get:
-        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
-        #   line 108, in run
-        #     self._restoreSignalHandlers()
-        #   File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
-        #   line 156, in _restoreSignalHandlers
-        #     for signum,handler in self._oldSIGs:
-        #   AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
-        self.scgiserver._installSignalHandlers = lambda: None
-        self.scgiserver._oldSIGs = []
-        self.ready = True
-        self.scgiserver.run()
-    
-    def stop(self):
-        """Stop the HTTP server."""
-        self.ready = False
-        # Forcibly stop the scgi server main event loop.
-        self.scgiserver._keepGoing = False
-        # Force all worker threads to die off.
-        self.scgiserver._threadPool.maxSpare = 0
-
-
-def client_host(server_host):
-    """Return the host on which a client can connect to the given listener."""
-    if server_host == '0.0.0.0':
-        # 0.0.0.0 is INADDR_ANY, which should answer on localhost.
-        return '127.0.0.1'
-    if server_host in ('::', '::0', '::0.0.0.0'):
-        # :: is IN6ADDR_ANY, which should answer on localhost.
-        # ::0 and ::0.0.0.0 are non-canonical but common ways to write IN6ADDR_ANY.
-        return '::1'
-    return server_host
-
-def check_port(host, port, timeout=1.0):
-    """Raise an error if the given port is not free on the given host."""
-    if not host:
-        raise ValueError("Host values of '' or None are not allowed.")
-    host = client_host(host)
-    port = int(port)
-    
-    import socket
-    
-    # AF_INET or AF_INET6 socket
-    # Get the correct address family for our host (allows IPv6 addresses)
-    try:
-        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
-                                  socket.SOCK_STREAM)
-    except socket.gaierror:
-        if ':' in host:
-            info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
-        else:
-            info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]
-    
-    for res in info:
-        af, socktype, proto, canonname, sa = res
-        s = None
-        try:
-            s = socket.socket(af, socktype, proto)
-            # See http://groups.google.com/group/cherrypy-users/
-            #        browse_frm/thread/bbfe5eb39c904fe0
-            s.settimeout(timeout)
-            s.connect((host, port))
-            s.close()
-            raise IOError("Port %s is in use on %s; perhaps the previous "
-                          "httpserver did not shut down properly." %
-                          (repr(port), repr(host)))
-        except socket.error:
-            if s:
-                s.close()
-
-
-# Feel free to increase these defaults on slow systems:
-free_port_timeout = 0.1
-occupied_port_timeout = 1.0
-
-def wait_for_free_port(host, port, timeout=None):
-    """Wait for the specified port to become free (drop requests)."""
-    if not host:
-        raise ValueError("Host values of '' or None are not allowed.")
-    if timeout is None:
-        timeout = free_port_timeout
-    
-    for trial in range(50):
-        try:
-            # we are expecting a free port, so reduce the timeout
-            check_port(host, port, timeout=timeout)
-        except IOError:
-            # Give the old server thread time to free the port.
-            time.sleep(timeout)
-        else:
-            return
-    
-    raise IOError("Port %r not free on %r" % (port, host))
-
-def wait_for_occupied_port(host, port, timeout=None):
-    """Wait for the specified port to become active (receive requests)."""
-    if not host:
-        raise ValueError("Host values of '' or None are not allowed.")
-    if timeout is None:
-        timeout = occupied_port_timeout
-    
-    for trial in range(50):
-        try:
-            check_port(host, port, timeout=timeout)
-        except IOError:
-            return
-        else:
-            time.sleep(timeout)
-    
-    raise IOError("Port %r not bound on %r" % (port, host))

cherrypy/process/win32.py

-"""Windows service. Requires pywin32."""
-
-import os
-import win32api
-import win32con
-import win32event
-import win32service
-import win32serviceutil
-
-from cherrypy.process import wspbus, plugins
-
-
-class ConsoleCtrlHandler(plugins.SimplePlugin):
-    """A WSPBus plugin for handling Win32 console events (like Ctrl-C)."""
-    
-    def __init__(self, bus):
-        self.is_set = False
-        plugins.SimplePlugin.__init__(self, bus)
-    
-    def start(self):
-        if self.is_set:
-            self.bus.log('Handler for console events already set.', level=40)
-            return
-        
-        result = win32api.SetConsoleCtrlHandler(self.handle, 1)
-        if result == 0:
-            self.bus.log('Could not SetConsoleCtrlHandler (error %r)' %
-                         win32api.GetLastError(), level=40)
-        else:
-            self.bus.log('Set handler for console events.', level=40)
-            self.is_set = True
-    
-    def stop(self):
-        if not self.is_set:
-            self.bus.log('Handler for console events already off.', level=40)
-            return
-        
-        try:
-            result = win32api.SetConsoleCtrlHandler(self.handle, 0)
-        except ValueError:
-            # "ValueError: The object has not been registered"
-            result = 1
-        
-        if result == 0:
-            self.bus.log('Could not remove SetConsoleCtrlHandler (error %r)' %
-                         win32api.GetLastError(), level=40)
-        else:
-            self.bus.log('Removed handler for console events.', level=40)
-            self.is_set = False
-    
-    def handle(self, event):
-        """Handle console control events (like Ctrl-C)."""
-        if event in (win32con.CTRL_C_EVENT, win32con.CTRL_LOGOFF_EVENT,
-                     win32con.CTRL_BREAK_EVENT, win32con.CTRL_SHUTDOWN_EVENT,
-                     win32con.CTRL_CLOSE_EVENT):
-            self.bus.log('Console event %s: shutting down bus' % event)
-            
-            # Remove self immediately so repeated Ctrl-C doesn't re-call it.
-            try:
-                self.stop()
-            except ValueError:
-                pass
-            
-            self.bus.exit()
-            # 'First to return True stops the calls'
-            return 1
-        return 0
-
-
-class Win32Bus(wspbus.Bus):
-    """A Web Site Process Bus implementation for Win32.
-    
-    Instead of time.sleep, this bus blocks using native win32event objects.
-    """
-    
-    def __init__(self):
-        self.events = {}
-        wspbus.Bus.__init__(self)
-    
-    def _get_state_event(self, state):
-        """Return a win32event for the given state (creating it if needed)."""
-        try:
-            return self.events[state]
-        except KeyError:
-            event = win32event.CreateEvent(None, 0, 0,
-                                           "WSPBus %s Event (pid=%r)" %
-                                           (state.name, os.getpid()))
-            self.events[state] = event
-            return event
-    
-    def _get_state(self):
-        return self._state
-    def _set_state(self, value):
-        self._state = value
-        event = self._get_state_event(value)
-        win32event.PulseEvent(event)
-    state = property(_get_state, _set_state)
-    
-    def wait(self, state, interval=0.1, channel=None):
-        """Wait for the given state(s), KeyboardInterrupt or SystemExit.
-        
-        Since this class uses native win32event objects, the interval
-        argument is ignored.
-        """
-        if isinstance(state, (tuple, list)):
-            # Don't wait for an event that beat us to the punch ;)
-            if self.state not in state:
-                events = tuple([self._get_state_event(s) for s in state])
-                win32event.WaitForMultipleObjects(events, 0, win32event.INFINITE)
-        else:
-            # Don't wait for an event that beat us to the punch ;)
-            if self.state != state:
-                event = self._get_state_event(state)
-                win32event.WaitForSingleObject(event, win32event.INFINITE)
-
-
-class _ControlCodes(dict):
-    """Control codes used to "signal" a service via ControlService.
-    
-    User-defined control codes are in the range 128-255. We generally use
-    the standard Python value for the Linux signal and add 128. Example:
-    
-        >>> signal.SIGUSR1
-        10
-        control_codes['graceful'] = 128 + 10
-    """
-    
-    def key_for(self, obj):
-        """For the given value, return its corresponding key."""
-        for key, val in self.items():
-            if val is obj:
-                return key
-        raise ValueError("The given object could not be found: %r" % obj)
-
-control_codes = _ControlCodes({'graceful': 138})
-
-
-def signal_child(service, command):
-    if command == 'stop':
-        win32serviceutil.StopService(service)
-    elif command == 'restart':
-        win32serviceutil.RestartService(service)
-    else:
-        win32serviceutil.ControlService(service, control_codes[command])
-
-
-class PyWebService(win32serviceutil.ServiceFramework):
-    """Python Web Service."""
-    
-    _svc_name_ = "Python Web Service"
-    _svc_display_name_ = "Python Web Service"
-    _svc_deps_ = None        # sequence of service names on which this depends
-    _exe_name_ = "pywebsvc"
-    _exe_args_ = None        # Default to no arguments
-    
-    # Only exists on Windows 2000 or later, ignored on windows NT
-    _svc_description_ = "Python Web Service"
-    
-    def SvcDoRun(self):
-        from cherrypy import process
-        process.bus.start()
-        process.bus.block()
-    
-    def SvcStop(self):
-        from cherrypy import process
-        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
-        process.bus.exit()
-    
-    def SvcOther(self, control):
-        process.bus.publish(control_codes.key_for(control))
-
-
-if __name__ == '__main__':
-    win32serviceutil.HandleCommandLine(PyWebService)

cherrypy/process/wspbus.py

-"""An implementation of the Web Site Process Bus.
-
-This module is completely standalone, depending only on the stdlib.
-
-Web Site Process Bus
---------------------
-
-A Bus object is used to contain and manage site-wide behavior:
-daemonization, HTTP server start/stop, process reload, signal handling,
-drop privileges, PID file management, logging for all of these,
-and many more.
-
-In addition, a Bus object provides a place for each web framework
-to register code that runs in response to site-wide events (like
-process start and stop), or which controls or otherwise interacts with
-the site-wide components mentioned above. For example, a framework which
-uses file-based templates would add known template filenames to an
-autoreload component.
-
-Ideally, a Bus object will be flexible enough to be useful in a variety
-of invocation scenarios:
-
- 1. The deployer starts a site from the command line via a
-    framework-neutral deployment script; applications from multiple frameworks
-    are mixed in a single site. Command-line arguments and configuration
-    files are used to define site-wide components such as the HTTP server,
-    WSGI component graph, autoreload behavior, signal handling, etc.
- 2. The deployer starts a site via some other process, such as Apache;
-    applications from multiple frameworks are mixed in a single site.
-    Autoreload and signal handling (from Python at least) are disabled.
- 3. The deployer starts a site via a framework-specific mechanism;
-    for example, when running tests, exploring tutorials, or deploying
-    single applications from a single framework. The framework controls
-    which site-wide components are enabled as it sees fit.
-
-The Bus object in this package uses topic-based publish-subscribe
-messaging to accomplish all this. A few topic channels are built in
-('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
-site containers are free to define their own. If a message is sent to a
-channel that has not been defined or has no listeners, there is no effect.
-
-In general, there should only ever be a single Bus object per process.
-Frameworks and site containers share a single Bus object by publishing
-messages and subscribing listeners.
-
-The Bus object works as a finite state machine which models the current
-state of the process. Bus methods move it from one state to another;
-those methods then publish to subscribed listeners on the channel for
-the new state.::
-
-                        O
-                        |
-                        V
-       STOPPING --> STOPPED --> EXITING -> X
-          A   A         |
-          |    \___     |
-          |        \    |
-          |         V   V
-        STARTED <-- STARTING
-
-"""
-
-import atexit
-import os
-import sys
-import threading
-import time
-import traceback as _traceback
-import warnings
-
-from cherrypy._cpcompat import set
-
-# Here I save the value of os.getcwd(), which, if I am imported early enough,
-# will be the directory from which the startup script was run.  This is needed
-# by _do_execv(), to change back to the original directory before execv()ing a
-# new process.  This is a defense against the application having changed the
-# current working directory (which could make sys.executable "not found" if
-# sys.executable is a relative-path, and/or cause other problems).
-_startup_cwd = os.getcwd()
-
-class ChannelFailures(Exception):
-    """Exception raised when errors occur in a listener during Bus.publish()."""
-    delimiter = '\n'
-    
-    def __init__(self, *args, **kwargs):
-        # Don't use 'super' here; Exceptions are old-style in Py2.4
-        # See http://www.cherrypy.org/ticket/959
-        Exception.__init__(self, *args, **kwargs)
-        self._exceptions = list()
-    
-    def handle_exception(self):
-        """Append the current exception to self."""
-        self._exceptions.append(sys.exc_info()[1])
-    
-    def get_instances(self):
-        """Return a list of seen exception instances."""
-        return self._exceptions[:]
-    
-    def __str__(self):
-        exception_strings = map(repr, self.get_instances())
-        return self.delimiter.join(exception_strings)
-
-    __repr__ = __str__
-
-    def __bool__(self):
-        return bool(self._exceptions)
-    __nonzero__ = __bool__
-
-# Use a flag to indicate the state of the bus.
-class _StateEnum(object):
-    class State(object):
-        name = None
-        def __repr__(self):
-            return "states.%s" % self.name
-    
-    def __setattr__(self, key, value):
-        if isinstance(value, self.State):
-            value.name = key
-        object.__setattr__(self, key, value)
-states = _StateEnum()
-states.STOPPED = states.State()
-states.STARTING = states.State()
-states.STARTED = states.State()
-states.STOPPING = states.State()
-states.EXITING = states.State()
-
-
-try:
-    import fcntl
-except ImportError:
-    max_files = 0
-else:
-    try:
-        max_files = os.sysconf('SC_OPEN_MAX')
-    except AttributeError:
-        max_files = 1024
-
-
-class Bus(object):
-    """Process state-machine and messenger for HTTP site deployment.
-    
-    All listeners for a given channel are guaranteed to be called even
-    if others at the same channel fail. Each failure is logged, but
-    execution proceeds on to the next listener. The only way to stop all
-    processing from inside a listener is to raise SystemExit and stop the
-    whole server.
-    """
-    
-    states = states
-    state = states.STOPPED
-    execv = False
-    max_cloexec_files = max_files
-    
-    def __init__(self):
-        self.execv = False
-        self.state = states.STOPPED
-        self.listeners = dict(
-            [(channel, set()) for channel
-             in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
-        self._priorities = {}
-    
-    def subscribe(self, channel, callback, priority=None):
-        """Add the given callback at the given channel (if not present)."""
-        if channel not in self.listeners:
-            self.listeners[channel] = set()
-        self.listeners[channel].add(callback)
-        
-        if priority is None:
-            priority = getattr(callback, 'priority', 50)
-        self._priorities[(channel, callback)] = priority
-    
-    def unsubscribe(self, channel, callback):
-        """Discard the given callback (if present)."""
-        listeners = self.listeners.get(channel)
-        if listeners and callback in listeners:
-            listeners.discard(callback)
-            del self._priorities[(channel, callback)]
-    
-    def publish(self, channel, *args, **kwargs):
-        """Return output of all subscribers for the given channel."""
-        if channel not in self.listeners:
-            return []
-        
-        exc = ChannelFailures()
-        output = []
-        
-        items = [(self._priorities[(channel, listener)], listener)
-                 for listener in self.listeners[channel]]
-        try:
-            items.sort(key=lambda item: item[0])
-        except TypeError:
-            # Python 2.3 had no 'key' arg, but that doesn't matter
-            # since it could sort dissimilar types just fine.
-            items.sort()
-        for priority, listener in items:
-            try:
-                output.append(listener(*args, **kwargs))
-            except KeyboardInterrupt:
-                raise
-            except SystemExit:
-                e = sys.exc_info()[1]
-                # If we have previous errors ensure the exit code is non-zero
-                if exc and e.code == 0:
-                    e.code = 1
-                raise
-            except:
-                exc.handle_exception()
-                if channel == 'log':
-                    # Assume any further messages to 'log' will fail.
-                    pass
-                else:
-                    self.log("Error in %r listener %r" % (channel, listener),
-                             level=40, traceback=True)
-        if exc:
-            raise exc
-        return output
-    
-    def _clean_exit(self):
-        """An atexit handler which asserts the Bus is not running."""
-        if self.state != states.EXITING:
-            warnings.warn(
-                "The main thread is exiting, but the Bus is in the %r state; "
-                "shutting it down automatically now. You must either call "
-                "bus.block() after start(), or call bus.exit() before the "
-                "main thread exits." % self.state, RuntimeWarning)
-            self.exit()
-    
-    def start(self):
-        """Start all services."""
-        atexit.register(self._clean_exit)
-        
-        self.state = states.STARTING
-        self.log('Bus STARTING')
-        try:
-            self.publish('start')
-            self.state = states.STARTED
-            self.log('Bus STARTED')
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            self.log("Shutting down due to error in start listener:",
-                     level=40, traceback=True)
-            e_info = sys.exc_info()[1]
-            try:
-                self.exit()
-            except:
-                # Any stop/exit errors will be logged inside publish().
-                pass
-            # Re-raise the original error
-            raise e_info
-    
-    def exit(self):
-        """Stop all services and prepare to exit the process."""
-        exitstate = self.state
-        try:
-            self.stop()
-            
-            self.state = states.EXITING
-            self.log('Bus EXITING')
-            self.publish('exit')
-            # This isn't strictly necessary, but it's better than seeing
-            # "Waiting for child threads to terminate..." and then nothing.
-            self.log('Bus EXITED')
-        except:
-            # This method is often called asynchronously (whether thread,
-            # signal handler, console handler, or atexit handler), so we
-            # can't just let exceptions propagate out unhandled.
-            # Assume it's been logged and just die.
-            os._exit(70) # EX_SOFTWARE
-        
-        if exitstate == states.STARTING:
-            # exit() was called before start() finished, possibly due to
-            # Ctrl-C because a start listener got stuck. In this case,
-            # we could get stuck in a loop where Ctrl-C never exits the
-            # process, so we just call os.exit here.
-            os._exit(70) # EX_SOFTWARE
-    
-    def restart(self):
-        """Restart the process (may close connections).
-        
-        This method does not restart the process from the calling thread;
-        instead, it stops the bus and asks the main thread to call execv.
-        """
-        self.execv = True
-        self.exit()
-    
-    def graceful(self):
-        """Advise all services to reload."""
-        self.log('Bus graceful')
-        self.publish('graceful')
-    
-    def block(self, interval=0.1):
-        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
-        
-        This function is intended to be called only by the main thread.
-        After waiting for the EXITING state, it also waits for all threads
-        to terminate, and then calls os.execv if self.execv is True. This
-        design allows another thread to call bus.restart, yet have the main
-        thread perform the actual execv call (required on some platforms).
-        """
-        try:
-            self.wait(states.EXITING, interval=interval, channel='main')
-        except (KeyboardInterrupt, IOError):
-            # The time.sleep call might raise
-            # "IOError: [Errno 4] Interrupted function call" on KBInt.
-            self.log('Keyboard Interrupt: shutting down bus')
-            self.exit()
-        except SystemExit:
-            self.log('SystemExit raised: shutting down bus')
-            self.exit()
-            raise
-        
-        # Waiting for ALL child threads to finish is necessary on OS X.
-        # See http://www.cherrypy.org/ticket/581.
-        # It's also good to let them all shut down before allowing
-        # the main thread to call atexit handlers.
-        # See http://www.cherrypy.org/ticket/751.
-        self.log("Waiting for child threads to terminate...")
-        for t in threading.enumerate():
-            if t != threading.currentThread() and t.isAlive():
-                # Note that any dummy (external) threads are always daemonic.
-                if hasattr(threading.Thread, "daemon"):
-                    # Python 2.6+
-                    d = t.daemon
-                else:
-                    d = t.isDaemon()
-                if not d:
-                    self.log("Waiting for thread %s." % t.getName())
-                    t.join()
-        
-        if self.execv:
-            self._do_execv()
-    
-    def wait(self, state, interval=0.1, channel=None):
-        """Poll for the given state(s) at intervals; publish to channel."""
-        if isinstance(state, (tuple, list)):
-            states = state
-        else:
-            states = [state]
-        
-        def _wait():
-            while self.state not in states:
-                time.sleep(interval)
-                self.publish(channel)
-        
-        # From http://psyco.sourceforge.net/psycoguide/bugs.html:
-        # "The compiled machine code does not include the regular polling
-        # done by Python, meaning that a KeyboardInterrupt will not be
-        # detected before execution comes back to the regular Python
-        # interpreter. Your program cannot be interrupted if caught
-        # into an infinite Psyco-compiled loop."
-        try:
-            sys.modules['psyco'].cannotcompile(_wait)
-        except (KeyError, AttributeError):
-            pass
-        
-        _wait()
-    
-    def _do_execv(self):
-        """Re-execute the current process.
-        
-        This must be called from the main thread, because certain platforms
-        (OS X) don't allow execv to be called in a child thread very well.
-        """
-        args = sys.argv[:]
-        self.log('Re-spawning %s' % ' '.join(args))
-        
-        if sys.platform[:4] == 'java':
-            from _systemrestart import SystemRestart
-            raise SystemRestart
-        else:
-            args.insert(0, sys.executable)
-            if sys.platform == 'win32':
-                args = ['"%s"' % arg for arg in args]
-
-            os.chdir(_startup_cwd)
-            if self.max_cloexec_files:
-                self._set_cloexec()
-            os.execv(sys.executable, args)
-    
-    def _set_cloexec(self):
-        """Set the CLOEXEC flag on all open files (except stdin/out/err).
-        
-        If self.max_cloexec_files is an integer (the default), then on
-        platforms which support it, it represents the max open files setting
-        for the operating system. This function will be called just before
-        the process is restarted via os.execv() to prevent open files
-        from persisting into the new process.
-        
-        Set self.max_cloexec_files to 0 to disable this behavior.
-        """
-        for fd in range(3, self.max_cloexec_files): # skip stdin/out/err
-            try:
-                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
-            except IOError:
-                continue
-            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-    
-    def stop(self):
-        """Stop all services."""
-        self.state = states.STOPPING
-        self.log('Bus STOPPING')
-        self.publish('stop')
-        self.state = states.STOPPED
-        self.log('Bus STOPPED')
-    
-    def start_with_callback(self, func, args=None, kwargs=None):
-        """Start 'func' in a new thread T, then start self (and return T)."""
-        if args is None:
-            args = ()
-        if kwargs is None:
-            kwargs = {}
-        args = (func,) + args
-        
-        def _callback(func, *a, **kw):
-            self.wait(states.STARTED)
-            func(*a, **kw)
-        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
-        t.setName('Bus Callback ' + t.getName())
-        t.start()
-        
-        self.start()
-        
-        return t
-    
-    def log(self, msg="", level=20, traceback=False):
-        """Log the given message. Append the last traceback if requested."""
-        if traceback:
-            msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info()))
-        self.publish('log', msg, level)
-
-bus = Bus()

cherrypy/test/test_bus.py

-import threading
-import time
-import unittest
-
-import cherrypy
-from cherrypy._cpcompat import get_daemon, set
-from cherrypy.process import wspbus
-
-
-msg = "Listener %d on channel %s: %s."
-
-
-class PublishSubscribeTests(unittest.TestCase):
-
-    def get_listener(self, channel, index):
-        def listener(arg=None):
-            self.responses.append(msg % (index, channel, arg))
-        return listener
-
-    def test_builtin_channels(self):
-        b = wspbus.Bus()
-
-        self.responses, expected = [], []
-
-        for channel in b.listeners:
-            for index, priority in enumerate([100, 50, 0, 51]):
-                b.subscribe(channel, self.get_listener(channel, index), priority)
-
-        for channel in b.listeners:
-            b.publish(channel)
-            expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
-            b.publish(channel, arg=79347)
-            expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
-
-        self.assertEqual(self.responses, expected)
-
-    def test_custom_channels(self):
-        b = wspbus.Bus()
-
-        self.responses, expected = [], []
-
-        custom_listeners = ('hugh', 'louis', 'dewey')
-        for channel in custom_listeners:
-            for index, priority in enumerate([None, 10, 60, 40]):
-                b.subscribe(channel, self.get_listener(channel, index), priority)
-
-        for channel in custom_listeners:
-            b.publish(channel, 'ah so')
-            expected.extend([msg % (i, channel, 'ah so') for i in (1, 3, 0, 2)])
-            b.publish(channel)
-            expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])
-
-        self.assertEqual(self.responses, expected)
-
-    def test_listener_errors(self):
-        b = wspbus.Bus()
-
-        self.responses, expected = [], []
-        channels = [c for c in b.listeners if c != 'log']
-
-        for channel in channels:
-            b.subscribe(channel, self.get_listener(channel, 1))
-            # This will break since the lambda takes no args.
-            b.subscribe(channel, lambda: None, priority=20)
-
-        for channel in channels:
-            self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123)
-            expected.append(msg % (1, channel, 123))
-
-        self.assertEqual(self.responses, expected)
-
-
-class BusMethodTests(unittest.TestCase):
-
-    def log(self, bus):
-        self._log_entries = []
-        def logit(msg, level):
-            self._log_entries.append(msg)
-        bus.subscribe('log', logit)
-
-    def assertLog(self, entries):
-        self.assertEqual(self._log_entries, entries)
-
-    def get_listener(self, channel, index):
-        def listener(arg=None):
-            self.responses.append(msg % (index, channel, arg))
-        return listener
-
-    def test_start(self):
-        b = wspbus.Bus()
-        self.log(b)
-
-        self.responses = []
-        num = 3
-        for index in range(num):
-            b.subscribe('start', self.get_listener('start', index))
-
-        b.start()
-        try:
-            # The start method MUST call all 'start' listeners.
-            self.assertEqual(set(self.responses),
-                             set([msg % (i, 'start', None) for i in range(num)]))
-            # The start method MUST move the state to STARTED
-            # (or EXITING, if errors occur)
-            self.assertEqual(b.state, b.states.STARTED)
-            # The start method MUST log its states.
-            self.assertLog(['Bus STARTING', 'Bus STARTED'])
-        finally:
-            # Exit so the atexit handler doesn't complain.
-            b.exit()
-
-    def test_stop(self):
-        b = wspbus.Bus()
-        self.log(b)
-
-        self.responses = []
-        num = 3
-        for index in range(num):
-            b.subscribe('stop', self.get_listener('stop', index))
-
-        b.stop()
-
-        # The stop method MUST call all 'stop' listeners.
-        self.assertEqual(set(self.responses),
-                         set([msg % (i, 'stop', None) for i in range(num)]))
-        # The stop method MUST move the state to STOPPED
-        self.assertEqual(b.state, b.states.STOPPED)
-        # The stop method MUST log its states.
-        self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
-
-    def test_graceful(self):
-        b = wspbus.Bus()
-        self.log(b)
-
-        self.responses = []
-        num = 3
-        for index in range(num):
-            b.subscribe('graceful', self.get_listener('graceful', index))
-
-        b.graceful()
-
-        # The graceful method MUST call all 'graceful' listeners.
-        self.assertEqual(set(self.responses),
-                         set([msg % (i, 'graceful', None) for i in range(num)]))
-        # The graceful method MUST log its states.
-        self.assertLog(['Bus graceful'])
-
-    def test_exit(self):
-        b = wspbus.Bus()
-        self.log(b)
-
-        self.responses = []
-        num = 3
-        for index in range(num):
-            b.subscribe('stop', self.get_listener('stop', index))
-            b.subscribe('exit', self.get_listener('exit', index))
-
-        b.exit()
-
-        # The exit method MUST call all 'stop' listeners,
-        # and then all 'exit' listeners.
-        self.assertEqual(set(self.responses),
-                         set([msg % (i, 'stop', None) for i in range(num)] +
-                             [msg % (i, 'exit', None) for i in range(num)]))
-        # The exit method MUST move the state to EXITING
-        self.assertEqual(b.state, b.states.EXITING)
-        # The exit method MUST log its states.
-        self.assertLog(['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
-
-    def test_wait(self):
-        b = wspbus.Bus()
-
-        def f(method):
-            time.sleep(0.2)
-            getattr(b, method)()
-
-        for method, states in [('start', [b.states.STARTED]),
-                               ('stop', [b.states.STOPPED]),
-                               ('start', [b.states.STARTING, b.states.STARTED]),
-                               ('exit', [b.states.EXITING]),
-                               ]:
-            threading.Thread(target=f, args=(method,)).start()
-            b.wait(states)
-
-            # The wait method MUST wait for the given state(s).
-            if b.state not in states:
-                self.fail("State %r not in %r" % (b.state, states))
-
-    def test_block(self):
-        b = wspbus.Bus()
-        self.log(b)
-
-        def f():
-            time.sleep(0.2)
-            b.exit()
-        def g():
-            time.sleep(0.4)
-        threading.Thread(target=f).start()
-        threading.Thread(target=g).start()
-        threads = [t for t in threading.enumerate() if not get_daemon(t)]
-        self.assertEqual(len(threads), 3)
-
-        b.block()
-
-        # The block method MUST wait for the EXITING state.
-        self.assertEqual(b.state, b.states.EXITING)
-        # The block method MUST wait for ALL non-main, non-daemon threads to finish.
-        threads = [t for t in threading.enumerate() if not get_daemon(t)]
-        self.assertEqual(len(threads), 1)
-        # The last message will mention an indeterminable thread name; ignore it
-        self.assertEqual(self._log_entries[:-1],
-                         ['Bus STOPPING', 'Bus STOPPED',
-                          'Bus EXITING', 'Bus EXITED',
-                          'Waiting for child threads to terminate...'])
-
-    def test_start_with_callback(self):
-        b = wspbus.Bus()
-        self.log(b)
-        try:
-            events = []
-            def f(*args, **kwargs):
-                events.append(("f", args, kwargs))
-            def g():
-                events.append("g")
-            b.subscribe("start", g)
-            b.start_with_callback(f, (1, 3, 5), {"foo": "bar"})
-            # Give wait() time to run f()
-            time.sleep(0.2)
-
-            # The callback method MUST wait for the STARTED state.
-            self.assertEqual(b.state, b.states.STARTED)
-            # The callback method MUST run after all start methods.
-            self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})])
-        finally:
-            b.exit()
-
-    def test_log(self):
-        b = wspbus.Bus()
-        self.log(b)
-        self.assertLog([])
-
-        # Try a normal message.
-        expected = []
-        for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]:
-            b.log(msg)
-            expected.append(msg)
-            self.assertLog(expected)
-
-        # Try an error message
-        try:
-            foo
-        except NameError:
-            b.log("You are lost and gone forever", traceback=True)
-            lastmsg = self._log_entries[-1]
-            if "Traceback" not in lastmsg or "NameError" not in lastmsg:
-                self.fail("Last log message %r did not contain "
-                          "the expected traceback." % lastmsg)
-        else:
-            self.fail("NameError was not raised as expected.")
-
-
-if __name__ == "__main__":
-    unittest.main()
         data_files=data_files,
         scripts=scripts,
         cmdclass=cmd_class,
+        install_requires=[
+            "processbus>=3.3.0alpha",
+        ],
     )
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.