Source

tweetstream / servercontext.py

import threading
import contextlib
import time
import os
import socket
import random
from functools import partial
from inspect import isclass
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SocketServer import BaseRequestHandler


class ServerError(Exception):
    pass


class ServerContext(object):
    """Context object with information about a running test server."""

    def __init__(self, address, port):
        self.address = address or "localhost"
        self.port = port

    @property
    def baseurl(self):
        return "http://%s:%s" % (self.address, self.port)

    def __str__(self):
        return "<ServerContext %s >" % self.baseurl

    __repr__ = __str__


class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler):

    def __init__(self, *args, **kwargs):
        self.logging = kwargs.get("logging", False)
        SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, *args, **kwargs):
        if self.logging:
            SimpleHTTPRequestHandler.log_message(self, *args, **kwargs)


class _TestHandler(BaseHTTPRequestHandler):
    """RequestHandler class that handles requests that use a custom handler
    callable."""

    def __init__(self, handler, methods, *args, **kwargs):
        self._handler = handler
        self._methods = methods
        self._response_sent = False
        self._headers_sent = False
        self.logging = kwargs.get("logging", False)
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, *args, **kwargs):
        if self.logging:
            BaseHTTPRequestHandler.log_message(self, *args, **kwargs)

    def send_response(self, *args, **kwargs):
        self._response_sent = True
        BaseHTTPRequestHandler.send_response(self, *args, **kwargs)

    def end_headers(self, *args, **kwargs):
        self._headers_sent = True
        BaseHTTPRequestHandler.end_headers(self, *args, **kwargs)

    def _do_whatever(self):
        """Called in place of do_METHOD"""
        data = self._handler(self)

        if hasattr(data, "next"):
            # assume it's something supporting generator protocol
            self._handle_with_iterator(data)
        else:
            # Nothing more to do then.
            pass


    def __getattr__(self, name):
        if name.startswith("do_") and name[3:].lower() in self._methods:
            return self._do_whatever
        else:
            # fixme instance or class?
            raise AttributeError(name)

    def _handle_with_iterator(self, iterator):
        self.connection.settimeout(0.1)
        for data in iterator:
            if not self.server.server_thread.running:
                return

            if not self._response_sent:
                self.send_response(200)
            if not self._headers_sent:
                self.end_headers()

            self.wfile.write(data)
            # flush immediatly. We may want to do trickling writes
            # or something else tha trequires bypassing normal caching
            self.wfile.flush()

class _TestServerThread(threading.Thread):
    """Thread class for a running test server"""

    def __init__(self, handler, methods, cwd, port, address):
        threading.Thread.__init__(self)
        self.startup_finished = threading.Event()
        self._methods = methods
        self._cwd = cwd
        self._orig_cwd = None
        self._handler = self._wrap_handler(handler, methods)
        self._setup()
        self.running = True
        self.serverloc = (address, port)
        self.error = None

    def _wrap_handler(self, handler, methods):
        if isclass(handler) and issubclass(handler, BaseRequestHandler):
            return handler # It's OK. user passed in a proper handler
        elif callable(handler):
            return partial(_TestHandler, handler, methods)
            # it's a callable, so wrap in a req handler
        else:
            raise ServerError("handler must be callable or RequestHandler")

    def _setup(self):
        if self._cwd != "./":
            self._orig_cwd = os.getcwd()
            os.chdir(self._cwd)

    def _init_server(self):
        """Hooks up the server socket"""
        try:
            if self.serverloc[1] == "random":
                retries = 10 # try getting an available port max this many times
                while True:
                    try:
                        self.serverloc = (self.serverloc[0],
                                          random.randint(1025, 49151))
                        self._server = HTTPServer(self.serverloc, self._handler)
                    except socket.error:
                        retries -= 1
                        if not retries: # not able to get a port.
                            raise
                    else:
                        break
            else: # use specific port. this might throw, that's expected
                self._server = HTTPServer(self.serverloc, self._handler)
        except socket.error, e:
            self.running = False
            self.error = e
            # set this here, since we'll never enter the serve loop where
            # it is usually set:
            self.startup_finished.set()
            return

        self._server.allow_reuse_address = True # lots of tests, same port
        self._server.timeout = 0.1
        self._server.server_thread = self


    def run(self):
        self._init_server()

        while self.running:
            self._server.handle_request() # blocks for self.timeout secs
            # First time this falls through, signal the parent thread that
            # the server is ready for incomming connections
            if not self.startup_finished.is_set():
                self.startup_finished.set()

        self._cleanup()

    def stop(self):
        """Stop the server and attempt to make the thread terminate.
        This happens async but the calling code can check periodically
        the isRunning flag on the thread object.
        """
        # actual stopping happens in the run method
        self.running = False

    def _cleanup(self):
        """Do some rudimentary cleanup."""
        if self._orig_cwd:
            os.chdir(self._orig_cwd)


@contextlib.contextmanager
def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514,
                address="", methods=("get", "head"), cwd="./"):
    """Context that makes available a web server in a separate thread"""
    thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd,
                               port=port, address=address)
    thread.start()

    # fixme: should this be daemonized? If it isn't it will block the entire
    # app, but that should never happen anyway..
    thread.startup_finished.wait()

    if thread.error: # startup failed! Bail, throw whatever the server did
        raise thread.error

    exc = None
    try:
        yield ServerContext(*thread.serverloc)
    except Exception, exc:
        pass
    thread.stop()
    thread.join(5) # giving it a lot of leeway. should never happen

    if exc:
        raise exc

    # fixme: this takes second priorty after the internal exception but would
    # still be nice to signal back to calling code.

    if thread.isAlive():
        raise Warning("Test server could not be stopped")