Commits

Robert Brewer  committed d8433a0 Merge

Merge.

  • Participants
  • Parent commits 5b2a6ad, f4b78ca

Comments (0)

Files changed (9)

File magicbus/LICENSE.txt

-Copyright (c) 2004-2011, CherryPy Team (team@cherrypy.org)
+Copyright (c) 2004-2014, CherryPy Team (team@cherrypy.org)
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification, 

File magicbus/__init__.py

-"""A pub/sub Bus for managing process states.
+"""A pub/sub Bus for managing states and transitions.
 
-A Bus object is used to connect applications, servers,
-and frameworks with site-wide services such as daemonization, process
-reload, signal handling, drop privileges, PID file management, logging
-for all of these, and many more.
+The 'process' subpackage defines a ProcessBus object, which is used to
+connect applications, servers, and frameworks with site-wide services
+such as daemonization, process reload, signal handling, drop privileges,
+PID file management, logging for all of these, and many more.
 
 The 'plugins' subpackage defines a few abstract and concrete services for
-use with the bus. Some use tool-specific channels; see the documentation
-for each class.
+use with a Bus. Some use custom channels; see the documentation for each class.
 """
 
 from magicbus.base import ChannelFailures
+
 try:
-    from magicbus.win32 import Win32Bus as Bus
+    from magicbus.win32 import Win32Bus as Bus, Win32ProcessBus as ProcessBus
 except ImportError:
     from magicbus.base import Bus
+    from magicbus.process import ProcessBus
+bus = ProcessBus()
 
-bus = Bus()
+__all__ = ["ChannelFailures", "Bus", "ProcessBus", "bus"]

File magicbus/base.py

-"""A pub/sub Bus for managing process states.
+"""A pub/sub Bus for managing states.
 
-This package is completely standalone, depending only on the stdlib.
-
-A Bus object is used to contain and manage site-wide behavior:
-daemonization, socket server start/stop, process reload, signal handling,
-drop privileges, PID file management, logging for all of these,
-and many more.
-
-In addition, a Bus object provides a place for frameworks
-to register code that runs in response to site-wide events (like
-process start and stop), or which controls or otherwise interacts with
-the site-wide components mentioned above. For example, a framework which
-uses file-based templates would add known template filenames to an
-autoreload component.
-
-Ideally, a Bus object will be flexible enough to be useful in a variety
-of invocation scenarios:
-
- 1. The deployer starts a site from the command line via a
-    framework-neutral deployment script; applications from multiple frameworks
-    are mixed in a single site. Command-line arguments and configuration
-    files are used to define site-wide components such as a socket server,
-    component graphs, autoreload behavior, signal handling, etc.
- 2. The deployer starts a site via some other process, such as Apache;
-    applications from multiple frameworks are mixed in a single site.
-    Autoreload and signal handling (from Python at least) are disabled.
- 3. The deployer starts a site via a framework-specific mechanism;
-    for example, when running tests, exploring tutorials, or deploying
-    single applications from a single framework. The framework controls
-    which site-wide components are enabled as it sees fit.
+A Bus object is used to contain and manage behavior for any system
+of diverse components. A Bus object provides a place for frameworks
+to register code that runs in response to events, or which controls
+or otherwise interacts with the components.
 
 The Bus object in this package uses topic-based publish-subscribe
-messaging to accomplish all this. A few topic channels are built in
-('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
-site containers are free to define their own. If a message is sent to a
+messaging to accomplish all this. Frameworks and site containers
+are free to define their own channels. If a message is sent to a
 channel that has not been defined or has no listeners, there is no effect.
-
-In general, there should only ever be a single Bus object per process.
-Frameworks and site containers share a single Bus object by publishing
-messages and subscribing listeners.
-
-The Bus object works as a finite state machine which models the current
-state of the process. Bus methods move it from one state to another;
-those methods then publish to subscribed listeners on the channel for
-the new state.::
-
-                        O
-                        |
-                        V
-       STOPPING --> STOPPED --> EXITING -> X
-          A   A         |
-          |    \___     |
-          |        \    |
-          |         V   V
-        STARTED <-- STARTING
-
 """
 
-import atexit
-import os
 import sys
-import threading
 import time
 import traceback as _traceback
-import warnings
-
-# Here I save the value of os.getcwd(), which, if I am imported early enough,
-# will be the directory from which the startup script was run.  This is needed
-# by _do_execv(), to change back to the original directory before execv()ing a
-# new process.  This is a defense against the application having changed the
-# current working directory (which could make sys.executable "not found" if
-# sys.executable is a relative-path, and/or cause other problems).
-_startup_cwd = os.getcwd()
 
 
 class ChannelFailures(Exception):
-    """Exception raised when errors occur in a listener during Bus.publish().
-    """
+    """Exception raised when errors occur in a listener during .publish()."""
     delimiter = '\n'
 
     def __init__(self, *args, **kwargs):
     __nonzero__ = __bool__
 
 
-# Use a flag to indicate the state of the bus.
-class _StateEnum(object):
+class State(object):
 
-    class State(object):
-        name = None
+    def __init__(self, name):
+        self.name = name
 
-        def __repr__(self):
-            return "states.%s" % self.name
+    def __repr__(self):
+        return "State(%s)" % repr(self.name)
 
-    def __setattr__(self, key, value):
-        if isinstance(value, self.State):
-            value.name = key
-        object.__setattr__(self, key, value)
-states = _StateEnum()
-states.STOPPED = states.State()
-states.STARTING = states.State()
-states.STARTED = states.State()
-states.STOPPING = states.State()
-states.EXITING = states.State()
 
-
-try:
-    import fcntl
-except ImportError:
-    max_files = 0
-else:
-    try:
-        max_files = os.sysconf('SC_OPEN_MAX')
-    except AttributeError:
-        max_files = 1024
+class StateEnum(object):
+    """An object with enumerated state attributes."""
+    pass
 
 
 class Bus(object):
-    """Process state-machine and messenger.
+    """State machine and pub/sub messenger.
 
     All listeners for a given channel are guaranteed to be called even
     if others at the same channel fail. Each failure is logged, but
     whole server.
     """
 
-    states = states
-    state = states.STOPPED
-    debug = False
-    execv = False
-    max_cloexec_files = max_files
+    def __init__(self, states=None, state=None, channels=None):
+        if isinstance(states, StateEnum):
+            self.states = states
+        else:
+            self.states = StateEnum()
+            for s in states or []:
+                setattr(self.states, s, State(s))
 
-    def __init__(self):
-        self.execv = False
-        self.state = states.STOPPED
-        self.listeners = dict(
-            [(channel, set()) for channel
-             in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
+        if state is not None:
+            state = getattr(self.states, state)
+        self.state = state
+
+        if channels is None:
+            channels = ('log',)
+        self.listeners = dict((channel, set()) for channel in channels)
+
         self._priorities = {}
         self.debug = False
 
 
     def clear(self):
         """Discard all subscribed callbacks."""
-        self.execv = False
-        self.state = states.STOPPED
         # Use items() as a snapshot instead of while+pop so that callers
         # can be slightly lax in subscribing new listeners while the old
         # ones are being removed.
             raise exc
         return output
 
-    def _clean_exit(self):
-        """An atexit handler which asserts the Bus is not running."""
-        if self.state != states.EXITING:
-            warnings.warn(
-                "The main thread is exiting, but the Bus is in the %r state; "
-                "shutting it down automatically now. You must either call "
-                "bus.block() after start(), or call bus.exit() before the "
-                "main thread exits." % self.state, RuntimeWarning)
-            self.exit()
-
-    def start(self):
-        """Start all services."""
-        atexit.register(self._clean_exit)
-
-        self.state = states.STARTING
-        self.log('Bus STARTING')
-        try:
-            self.publish('start')
-            self.state = states.STARTED
-            self.log('Bus STARTED')
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            self.log("Shutting down due to error in start listener:",
-                     level=40, traceback=True)
-            e_info = sys.exc_info()[1]
-            try:
-                self.exit()
-            except:
-                # Any stop/exit errors will be logged inside publish().
-                pass
-            # Re-raise the original error
-            raise e_info
-
-    def exit(self):
-        """Stop all services and prepare to exit the process."""
-        exitstate = self.state
-        try:
-            self.stop()
-
-            self.state = states.EXITING
-            self.log('Bus EXITING')
-            self.publish('exit')
-            # This isn't strictly necessary, but it's better than seeing
-            # "Waiting for child threads to terminate..." and then nothing.
-            self.log('Bus EXITED')
-        except:
-            # This method is often called asynchronously (whether thread,
-            # signal handler, console handler, or atexit handler), so we
-            # can't just let exceptions propagate out unhandled.
-            # Assume it's been logged and just die.
-            os._exit(70)  # EX_SOFTWARE
-
-        if exitstate == states.STARTING:
-            # exit() was called before start() finished, possibly due to
-            # Ctrl-C because a start listener got stuck. In this case,
-            # we could get stuck in a loop where Ctrl-C never exits the
-            # process, so we just call os.exit here.
-            os._exit(70)  # EX_SOFTWARE
-
-    def restart(self):
-        """Restart the process (may close connections).
-
-        This method does not restart the process from the calling thread;
-        instead, it stops the bus and asks the main thread to call execv.
-        """
-        self.execv = True
-        self.exit()
-
-    def graceful(self):
-        """Advise all services to reload."""
-        self.log('Bus graceful')
-        self.publish('graceful')
-
-    def block(self, interval=0.1):
-        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
-
-        This function is intended to be called only by the main thread.
-        After waiting for the EXITING state, it also waits for all threads
-        to terminate, and then calls os.execv if self.execv is True. This
-        design allows another thread to call bus.restart, yet have the main
-        thread perform the actual execv call (required on some platforms).
-        """
-        try:
-            self.wait(states.EXITING, interval=interval, channel='main')
-        except (KeyboardInterrupt, IOError):
-            # The time.sleep call might raise
-            # "IOError: [Errno 4] Interrupted function call" on KBInt.
-            self.log('Keyboard Interrupt: shutting down bus')
-            self.exit()
-        except SystemExit:
-            self.log('SystemExit raised: shutting down bus')
-            self.exit()
-            raise
-
-        # Waiting for ALL child threads to finish is necessary on OS X.
-        # See https://bitbucket.org/cherrypy/cherrypy/issue/581.
-        # It's also good to let them all shut down before allowing
-        # the main thread to call atexit handlers.
-        # See https://bitbucket.org/cherrypy/cherrypy/issue/751.
-        self.log("Waiting for child threads to terminate...")
-        for t in threading.enumerate():
-            if t != threading.currentThread() and t.isAlive():
-                # Note that any dummy (external) threads are always daemonic.
-                if not t.daemon:
-                    self.log("Waiting for thread %s." % t.getName())
-                    t.join()
-
-        if self.execv:
-            self._do_execv()
-
     def wait(self, state, interval=0.1, channel=None):
         """Poll for the given state(s) at intervals; publish to channel."""
         if isinstance(state, (tuple, list)):
-            _states = state
+            _states_to_wait_for = state
         else:
-            _states = [state]
+            _states_to_wait_for = [state]
 
         def _wait():
-            while self.state not in _states:
+            while self.state not in _states_to_wait_for:
                 time.sleep(interval)
                 self.publish(channel)
 
 
         _wait()
 
-    def _do_execv(self):
-        """Re-execute the current process.
-
-        This must be called from the main thread, because certain platforms
-        (OS X) don't allow execv to be called in a child thread very well.
-        """
-        args = sys.argv[:]
-        self.log('Re-spawning %s' % ' '.join(args))
-
-        if sys.platform[:4] == 'java':
-            from _systemrestart import SystemRestart
-            raise SystemRestart
-        else:
-            args.insert(0, sys.executable)
-            if sys.platform == 'win32':
-                args = ['"%s"' % arg for arg in args]
-
-            os.chdir(_startup_cwd)
-            if self.max_cloexec_files:
-                self._set_cloexec()
-            os.execv(sys.executable, args)
-
-    def _set_cloexec(self):
-        """Set the CLOEXEC flag on all open files (except stdin/out/err).
-
-        If self.max_cloexec_files is an integer (the default), then on
-        platforms which support it, it represents the max open files setting
-        for the operating system. This function will be called just before
-        the process is restarted via os.execv() to prevent open files
-        from persisting into the new process.
-
-        Set self.max_cloexec_files to 0 to disable this behavior.
-        """
-        for fd in range(3, self.max_cloexec_files):  # skip stdin/out/err
-            try:
-                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
-            except IOError:
-                continue
-            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-
-    def stop(self):
-        """Stop all services."""
-        self.state = states.STOPPING
-        self.log('Bus STOPPING')
-        self.publish('stop')
-        self.state = states.STOPPED
-        self.log('Bus STOPPED')
-
-    def start_with_callback(self, func, args=None, kwargs=None):
-        """Start 'func' in a new thread T, then start self (and return T)."""
-        if args is None:
-            args = ()
-        if kwargs is None:
-            kwargs = {}
-        args = (func,) + args
-
-        def _callback(func_, *a, **kw):
-            self.wait(states.STARTED)
-            func_(*a, **kw)
-        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
-        t.setName('Bus Callback ' + t.getName())
-        t.start()
-
-        self.start()
-
-        return t
-
     def log(self, msg="", level=20, traceback=False):
         """Log the given message. Append the last traceback if requested."""
         if traceback:

File magicbus/compat.py

 """Compatibility code for using magicbus with various versions of Python.
 
-Process Bus 3.3 is compatible with Python versions 2.7+. This module provides a
+Magic Bus 3.3 is compatible with Python versions 2.7+. This module provides a
 useful abstraction over the differences between Python versions, sometimes by
 preferring a newer idiom, sometimes an older one, and sometimes a custom one.
 

File magicbus/plugins/__init__.py

     def subscribe(self):
         """Register this object as a (multi-channel) listener on the bus."""
         for channel in self.bus.listeners:
-            # Subscribe self.start, self.exit, etc. if present.
             method = getattr(self, channel, None)
             if method is not None:
                 self.bus.subscribe(channel, method)
     def unsubscribe(self):
         """Unregister this object as a listener on the bus."""
         for channel in self.bus.listeners:
-            # Unsubscribe self.start, self.exit, etc. if present.
             method = getattr(self, channel, None)
             if method is not None:
                 self.bus.unsubscribe(channel, method)

File magicbus/process.py

+import atexit
+import os
+import sys
+import threading
+import warnings
+
+try:
+    import fcntl
+except ImportError:
+    max_files = 0
+else:
+    try:
+        max_files = os.sysconf('SC_OPEN_MAX')
+    except AttributeError:
+        max_files = 1024
+
+from magicbus import base
+
+# Here I save the value of os.getcwd(), which, if I am imported early enough,
+# will be the directory from which the startup script was run.  This is needed
+# by _do_execv(), to change back to the original directory before execv()ing a
+# new process.  This is a defense against the application having changed the
+# current working directory (which could make sys.executable "not found" if
+# sys.executable is a relative-path, and/or cause other problems).
+_startup_cwd = os.getcwd()
+
+
+class ProcessBus(base.Bus):
+    """Process state-machine and messenger.
+
+    All listeners for a given channel are guaranteed to be called even
+    if others at the same channel fail. Each failure is logged, but
+    execution proceeds on to the next listener. The only way to stop all
+    processing from inside a listener is to raise SystemExit and stop the
+    whole server.
+
+    In general, there should only ever be a single ProcessBus object per process.
+    Frameworks and site containers share a single ProcessBus object by publishing
+    messages and subscribing listeners.
+
+    The ProcessBus works as a finite state machine which models the current
+    state of the process. ProcessBus methods move it from one state to another;
+    those methods then publish to subscribed listeners on the channel for
+    the new state.::
+
+                            O
+                            |
+                            V
+           STOPPING --> STOPPED --> EXITING -> X
+              A   A         |
+              |    \___     |
+              |        \    |
+              |         V   V
+            STARTED <-- STARTING
+    """
+
+    execv = False
+    max_cloexec_files = max_files
+
+    def __init__(self):
+        self.execv = False
+        base.Bus.__init__(
+            self,
+            states=('STOPPED', 'STARTING', 'STARTED', 'STOPPING', 'EXITING'),
+            state='STOPPED',
+            channels=('start', 'stop', 'exit', 'graceful', 'log', 'main')
+        )
+
+    def _clean_exit(self):
+        """An atexit handler which asserts the Bus is not running."""
+        if self.state != self.states.EXITING:
+            warnings.warn(
+                "The main thread is exiting, but the Bus is in the %r state; "
+                "shutting it down automatically now. You must either call "
+                "bus.block() after start(), or call bus.exit() before the "
+                "main thread exits." % self.state, RuntimeWarning)
+            self.exit()
+
+    def start(self):
+        """Start all services."""
+        atexit.register(self._clean_exit)
+
+        self.state = self.states.STARTING
+        self.log('Bus STARTING')
+        try:
+            self.publish('start')
+            self.state = self.states.STARTED
+            self.log('Bus STARTED')
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            self.log("Shutting down due to error in start listener:",
+                     level=40, traceback=True)
+            e_info = sys.exc_info()[1]
+            try:
+                self.exit()
+            except:
+                # Any stop/exit errors will be logged inside publish().
+                pass
+            # Re-raise the original error
+            raise e_info
+
+    def exit(self):
+        """Stop all services and prepare to exit the process."""
+        exitstate = self.state
+        try:
+            self.stop()
+
+            self.state = self.states.EXITING
+            self.log('Bus EXITING')
+            self.publish('exit')
+            # This isn't strictly necessary, but it's better than seeing
+            # "Waiting for child threads to terminate..." and then nothing.
+            self.log('Bus EXITED')
+        except:
+            # This method is often called asynchronously (whether thread,
+            # signal handler, console handler, or atexit handler), so we
+            # can't just let exceptions propagate out unhandled.
+            # Assume it's been logged and just die.
+            os._exit(70)  # EX_SOFTWARE
+
+        if exitstate is self.states.STARTING:
+            # exit() was called before start() finished, possibly due to
+            # Ctrl-C because a start listener got stuck. In this case,
+            # we could get stuck in a loop where Ctrl-C never exits the
+            # process, so we just call os.exit here.
+            os._exit(70)  # EX_SOFTWARE
+
+    def restart(self):
+        """Restart the process (may close connections).
+
+        This method does not restart the process from the calling thread;
+        instead, it stops the bus and asks the main thread to call execv.
+        """
+        self.execv = True
+        self.exit()
+
+    def graceful(self):
+        """Advise all services to reload."""
+        self.log('Bus graceful')
+        self.publish('graceful')
+
+    def block(self, interval=0.1):
+        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
+
+        This function is intended to be called only by the main thread.
+        After waiting for the EXITING state, it also waits for all threads
+        to terminate, and then calls os.execv if self.execv is True. This
+        design allows another thread to call bus.restart, yet have the main
+        thread perform the actual execv call (required on some platforms).
+        """
+        try:
+            self.wait(self.states.EXITING, interval=interval, channel='main')
+        except (KeyboardInterrupt, IOError):
+            # The time.sleep call might raise
+            # "IOError: [Errno 4] Interrupted function call" on KBInt.
+            self.log('Keyboard Interrupt: shutting down bus')
+            self.exit()
+        except SystemExit:
+            self.log('SystemExit raised: shutting down bus')
+            self.exit()
+            raise
+
+        # Waiting for ALL child threads to finish is necessary on OS X.
+        # See http://www.cherrypy.org/ticket/581.
+        # It's also good to let them all shut down before allowing
+        # the main thread to call atexit handlers.
+        # See http://www.cherrypy.org/ticket/751.
+        self.log("Waiting for child threads to terminate...")
+        for t in threading.enumerate():
+            if t != threading.currentThread() and t.isAlive():
+                # Note that any dummy (external) threads are always daemonic.
+                if not t.daemon:
+                    self.log("Waiting for thread %s." % t.getName())
+                    t.join()
+
+        if self.execv:
+            self._do_execv()
+
+    def _do_execv(self):
+        """Re-execute the current process.
+
+        This must be called from the main thread, because certain platforms
+        (OS X) don't allow execv to be called in a child thread very well.
+        """
+        args = sys.argv[:]
+        self.log('Re-spawning %s' % ' '.join(args))
+
+        if sys.platform[:4] == 'java':
+            from _systemrestart import SystemRestart
+            raise SystemRestart
+        else:
+            args.insert(0, sys.executable)
+            if sys.platform == 'win32':
+                args = ['"%s"' % arg for arg in args]
+
+            os.chdir(_startup_cwd)
+            if self.max_cloexec_files:
+                self._set_cloexec()
+            os.execv(sys.executable, args)
+
+    def _set_cloexec(self):
+        """Set the CLOEXEC flag on all open files (except stdin/out/err).
+
+        If self.max_cloexec_files is an integer (the default), then on
+        platforms which support it, it represents the max open files setting
+        for the operating system. This function will be called just before
+        the process is restarted via os.execv() to prevent open files
+        from persisting into the new process.
+
+        Set self.max_cloexec_files to 0 to disable this behavior.
+        """
+        for fd in range(3, self.max_cloexec_files):  # skip stdin/out/err
+            try:
+                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+            except IOError:
+                continue
+            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+    def stop(self):
+        """Stop all services."""
+        self.state = self.states.STOPPING
+        self.log('Bus STOPPING')
+        self.publish('stop')
+        self.state = self.states.STOPPED
+        self.log('Bus STOPPED')
+
+    def start_with_callback(self, func, args=None, kwargs=None):
+        """Start 'func' in a new thread T, then start self (and return T)."""
+        if args is None:
+            args = ()
+        if kwargs is None:
+            kwargs = {}
+        args = (func,) + args
+
+        def _callback(func_, *a, **kw):
+            self.wait(self.states.STARTED)
+            func_(*a, **kw)
+        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
+        t.setName('Bus Callback ' + t.getName())
+        t.start()
+
+        self.start()
+
+        return t

File magicbus/test/__init__.py

 
     def start(self):
         # Exceptions in the child will be re-raised in the parent,
-        # so if yyou're expecting one, trap this call and check for it.
+        # so if you're expecting one, trap this call and check for it.
         cwd = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
         env = os.environ.copy()
         env['PYTHONPATH'] = cwd

File magicbus/test/test_bus.py

 import time
 import unittest
 
-from magicbus import Bus, ChannelFailures
+from magicbus.base import Bus, ChannelFailures
+from magicbus.process import ProcessBus
 
 
 msg = "Listener %d on channel %s: %s."
         return listener
 
     def test_builtin_channels(self):
-        b = Bus()
+        b = ProcessBus()
 
         self.responses, expected = [], []
 
         return listener
 
     def test_start(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
 
         self.responses = []
             b.exit()
 
     def test_stop(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
 
         self.responses = []
         self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
 
     def test_graceful(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
 
         self.responses = []
         self.assertLog(['Bus graceful'])
 
     def test_exit(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
 
         self.responses = []
             ['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
 
     def test_wait(self):
-        b = Bus()
+        b = ProcessBus()
 
         def f(method):
             time.sleep(0.2)
                 self.fail("State %r not in %r" % (b.state, states))
 
     def test_block(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
 
         def f():
         self.assertEqual(len(threads), 1)
         # The last message will mention an indeterminable thread name; ignore
         # it
-        self.assertEqual(self._log_entries[:-1],
-                         ['Bus STOPPING', 'Bus STOPPED',
-                          'Bus EXITING', 'Bus EXITED',
-                          'Waiting for child threads to terminate...'])
+        self.assertEqual(
+            [entry for entry in self._log_entries
+             if not entry.startswith("Waiting")],
+            [
+                'Bus STOPPING',
+                'Bus STOPPED',
+                'Bus EXITING',
+                'Bus EXITED',
+            ]
+        )
 
     def test_start_with_callback(self):
-        b = Bus()
+        b = ProcessBus()
         self.log(b)
         try:
             events = []

File magicbus/win32.py

-"""Windows service. Requires pywin32."""
+"""Windows implementations. Requires pywin32."""
 
 import os
 import win32api
 from magicbus import base, plugins
 
 
+class Win32Bus(base.Bus):
+    """A Bus implementation for Win32.
+
+    Instead of time.sleep, this bus blocks using native win32event objects.
+    """
+
+    def __init__(self):
+        self.events = {}
+        super(base.Bus, self).__init__()
+        self.console_control_handler = ConsoleCtrlHandler(self)
+
+    def _get_state_event(self, state):
+        """Return a win32event for the given state (creating it if needed)."""
+        try:
+            return self.events[state]
+        except KeyError:
+            event = win32event.CreateEvent(None, 0, 0,
+                                           "Bus %s Event (pid=%r)" %
+                                           (state.name, os.getpid()))
+            self.events[state] = event
+            return event
+
+    def _get_state(self):
+        return self._state
+
+    def _set_state(self, value):
+        self._state = value
+        event = self._get_state_event(value)
+        win32event.PulseEvent(event)
+    state = property(_get_state, _set_state)
+
+    def wait(self, state, interval=0.1, channel=None):
+        """Wait for the given state(s), KeyboardInterrupt or SystemExit.
+
+        Since this class uses native win32event objects, the interval
+        argument is ignored.
+        """
+        if isinstance(state, (tuple, list)):
+            # Don't wait for an event that beat us to the punch ;)
+            if self.state not in state:
+                events = tuple([self._get_state_event(s) for s in state])
+                win32event.WaitForMultipleObjects(
+                    events, 0, win32event.INFINITE)
+        else:
+            # Don't wait for an event that beat us to the punch ;)
+            if self.state != state:
+                event = self._get_state_event(state)
+                win32event.WaitForSingleObject(event, win32event.INFINITE)
+
+
 class ConsoleCtrlHandler(plugins.SimplePlugin):
+
     """A Bus plugin for handling Win32 console events (like Ctrl-C)."""
 
     def __init__(self, bus):
         return 0
 
 
-class Win32Bus(base.Bus):
-    """A Bus implementation for Win32.
-
-    Instead of time.sleep, this bus blocks using native win32event objects.
-    """
-
-    def __init__(self):
-        self.events = {}
-        base.Bus.__init__(self)
-        self.console_control_handler = ConsoleCtrlHandler(self)
-
-    def _get_state_event(self, state):
-        """Return a win32event for the given state (creating it if needed)."""
-        try:
-            return self.events[state]
-        except KeyError:
-            event = win32event.CreateEvent(None, 0, 0,
-                                           "Bus %s Event (pid=%r)" %
-                                           (state.name, os.getpid()))
-            self.events[state] = event
-            return event
-
-    @property
-    def state(self):
-        return self._state
-
-    @state.setter
-    def state(self, value):
-        self._state = value
-        event = self._get_state_event(value)
-        win32event.PulseEvent(event)
-
-    def wait(self, state, interval=0.1, channel=None):
-        """Wait for the given state(s), KeyboardInterrupt or SystemExit.
-
-        Since this class uses native win32event objects, the interval
-        argument is ignored.
-        """
-        if isinstance(state, (tuple, list)):
-            # Don't wait for an event that beat us to the punch ;)
-            if self.state not in state:
-                events = tuple([self._get_state_event(s) for s in state])
-                win32event.WaitForMultipleObjects(
-                    events, 0, win32event.INFINITE)
-        else:
-            # Don't wait for an event that beat us to the punch ;)
-            if self.state != state:
-                event = self._get_state_event(state)
-                win32event.WaitForSingleObject(event, win32event.INFINITE)
-
+# ----------------------------- Win32 Service ----------------------------- #
 
 class _ControlCodes(dict):
+
     """Control codes used to "signal" a service via ControlService.
 
     User-defined control codes are in the range 128-255. We generally use
 
 
 class PyWebService(win32serviceutil.ServiceFramework):
+
     """Python Web Service."""
 
     _svc_name_ = "Python Web Service"