Commits

Anonymous committed 1007dfe

Added WSGIThreadPoolServer, which provides much better performance than
WSGIServer. paste.httpserver.serve now uses the thread-pooled server
by default. Users can set the number of worker threads (defaults to 10)
or switch back to the normal WSGIServer with config file toggles.
submitted by: james@jamestaylor.org (Thanks!)
resolves: #112
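
For illustration, a minimal sketch of the new keyword arguments this commit
adds to paste.httpserver.serve (the WSGI app below is just a placeholder):

    from paste.httpserver import serve

    def app(environ, start_response):
        start_response('200 OK', [('Content-type', 'text/plain')])
        return ['served from the thread pool\n']

    # The thread pool is now the default; threadpool_workers sets its size,
    # and use_threadpool=False restores the old one-thread-per-request
    # WSGIServer behavior.
    serve(app, host='127.0.0.1', port='8080', threadpool_workers=10)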

  • Parent commits ce0f4d2

Files changed (2)

File docs/news.txt

   and PkgResourcesParser where, with some servers, you could escape
   the document root.
 
+* Significantly improved ``paste.httpserver``'s (egg:Paste#http)
+  performance. It now uses a thread pool: previously it created a new
+  thread for every request. To revert to the old, slower behavior,
+  set:
+
+  use_threadpool = false
+
+  in the [server:main] section of the config file.
+
 * More control of where the output of ``paste.debug.prints`` goes.
 
 * Added a warning to ``paste.wsgilib.add_close`` if the upstream
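
For reference, the toggle described in this news entry lives in a Paste
Deploy config file; a hypothetical [server:main] section (the host and port
values here are made up for illustration) might look like:

    [server:main]
    use = egg:Paste#http
    host = 127.0.0.1
    port = 8080
    # New in this commit; defaults to true
    use_threadpool = true
    threadpool_workers = 10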

File paste/httpserver.py

 # @@: add support for chunked encoding, this is not a 1.1 server
 #     till this is completed.
 
+import errno, socket, sys, threading, urlparse, Queue
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 from SocketServer import ThreadingMixIn
-import urlparse, sys, socket
+from paste.deploy import converters
 
 __all__ = ['WSGIHandlerMixin','WSGIServer','WSGIHandler', 'serve']
 __version__ = "0.5"
         except SocketErrors, exce:
             self.wsgi_connection_drop(exce)
 
-class WSGIServer(ThreadingMixIn, SecureHTTPServer):
-    daemon_threads = False
+class ThreadPool(object):
+    """
+    Generic thread pool with a queue of callables to consume.
+    """
+    SHUTDOWN = object()
 
+    def __init__(self, nworkers, name="ThreadPool"):
+        """
+        Create thread pool with `nworkers` worker threads.
+        """
+        self.nworkers = nworkers
+        self.name = name
+        self.queue = Queue.Queue()
+        self.workers = []
+        for i in range(self.nworkers):
+            worker = threading.Thread(target=self.worker_thread_callback,
+                                      name=("%s worker %d" % (self.name, i)))
+            worker.start()
+            self.workers.append(worker)
+
+    def worker_thread_callback(self):
+        """
+        Worker thread should call this method to get and process queued
+        callables.
+        """
+        while True:
+            runnable = self.queue.get()
+            if runnable is ThreadPool.SHUTDOWN:
+                return
+            else:
+                runnable()
+
+    def shutdown(self):
+        """
+        Shut down the worker threads (after finishing any pending requests).
+        """
+        # Add a shutdown request for every worker
+        for i in range(self.nworkers):
+            self.queue.put(ThreadPool.SHUTDOWN)
+        # Wait for each thread to terminate
+        for worker in self.workers:
+            worker.join()
+
+class ThreadPoolMixIn:
+    """
+    Mix-in class to process requests from a thread pool.
+    """
+    def __init__(self, nworkers):
+        # Create and start the workers
+        self.running = True
+        assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
+        self.thread_pool = ThreadPool(nworkers,
+            "ThreadPoolMixIn HTTP server on %s:%d"
+                % (self.server_name, self.server_port))
+
+    def process_request(self, request, client_address):
+        """
+        Queue the request to be processed by one of the thread pool threads.
+        """
+        # Set the socket back to blocking mode (and no timeout), since it
+        # may take the thread pool a little while to get back to it.
+        # (Blocking is the default, but we set a timeout on the parent
+        # socket so that we can trap interrupts, so it must be restored here.)
+        request.setblocking(1)
+        # Queue processing of the request
+        self.thread_pool.queue.put(
+            lambda: self.process_request_in_thread(request, client_address))
+
+    def process_request_in_thread(self, request, client_address):
+        """
+        The worker thread should call back here to do the rest of the
+        request processing. Error handling normally done in 'handle_request'
+        must be done here.
+        """
+        try:
+            self.finish_request(request, client_address)
+            self.close_request(request)
+        except:
+            self.handle_error(request, client_address)
+            self.close_request(request)
+
+    def serve_forever(self):
+        """
+        Overrides `serve_forever` to shut the threadpool down cleanly.
+        """
+        try:
+            while self.running:
+                try:
+                    self.handle_request()
+                except socket.timeout:
+                    # The timeout is expected; it gives interrupts a chance
+                    # to propagate, so just keep handling requests
+                    pass
+        finally:
+            self.thread_pool.shutdown()
+
+    def server_activate(self):
+        """
+        Overrides server_activate to set timeout on our listener socket.
+        """
+        # We set the timeout here so that we can trap interrupts on Windows
+        self.socket.settimeout(1)
+        self.socket.listen(self.request_queue_size)
+
+    def server_close(self):
+        """
+        Finish pending requests and shut down the server.
+        """
+        self.running = False
+        self.socket.close()
+
+class WSGIServerBase(SecureHTTPServer):
     def __init__(self, wsgi_application, server_address,
                  RequestHandlerClass=None, ssl_context=None):
         SecureHTTPServer.__init__(self, server_address,
             conn.settimeout(self.wsgi_socket_timeout)
         return (conn, info)
 
+class WSGIServer(ThreadingMixIn, WSGIServerBase):
+    daemon_threads = False
+
+class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
+    def __init__(self, wsgi_application, server_address,
+                 RequestHandlerClass=None, ssl_context=None, nworkers=10):
+        WSGIServerBase.__init__(self, wsgi_application, server_address,
+                                RequestHandlerClass, ssl_context)
+        ThreadPoolMixIn.__init__(self, nworkers)
+
 def serve(application, host=None, port=None, handler=None, ssl_pem=None,
           server_version=None, protocol_version=None, start_loop=True,
-          daemon_threads=None, socket_timeout=None):
+          daemon_threads=None, socket_timeout=None, use_threadpool=True,
+          threadpool_workers=10):
     """
     Serves your ``application`` over HTTP(S) via WSGI interface
 
         disconnect, but at a later time it might follow the RFC a bit
         more closely.
 
+    ``use_threadpool``
+
+        Serve requests from a pool of worker threads (``threadpool_workers``)
+        rather than creating a new thread for each request. This can
+        substantially reduce latency since there is a high cost associated
+        with thread creation.
+
+    ``threadpool_workers``
+
+        Number of worker threads to create when ``use_threadpool`` is true. This
+        can be a string or an integer value.
     """
     ssl_context = None
     if ssl_pem:
         assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1')
         handler.protocol_version = protocol_version
 
-    server = WSGIServer(application, server_address, handler, ssl_context)
-    if daemon_threads:
-        server.daemon_threads = daemon_threads
+
+    if converters.asbool(use_threadpool):
+        server = WSGIThreadPoolServer(application, server_address, handler,
+                                      ssl_context, int(threadpool_workers))
+    else:
+        server = WSGIServer(application, server_address, handler, ssl_context)
+        if daemon_threads:
+            server.daemon_threads = daemon_threads
+
     if socket_timeout:
         server.wsgi_socket_timeout = int(socket_timeout)
 
-    if start_loop:
+    if converters.asbool(start_loop):
         print "serving on %s:%s" % server.server_address
         try:
             server.serve_forever()
     #serve(dump_environ, ssl_pem="test.pem")
     serve(dump_environ, server_version="Wombles/1.0",
           protocol_version="HTTP/1.1", port="8888")
-
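
The ThreadPool class added above is generic (a queue of callables consumed by
worker threads), so as a rough, hypothetical sketch of using it outside the
HTTP server:

    from paste.httpserver import ThreadPool

    pool = ThreadPool(4, name="demo pool")
    results = []

    def make_task(n):
        # Each queued item is just a zero-argument callable; the workers
        # pull callables off pool.queue and run them.
        return lambda: results.append(n * n)

    for n in range(10):
        pool.queue.put(make_task(n))

    # shutdown() enqueues one SHUTDOWN marker per worker (behind the pending
    # tasks) and joins the worker threads.
    pool.shutdown()
    print sorted(results)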