Commits

Robert Brewer committed 6655429

Moved the thread pool and worker to a new workers subpackage.

  • Participants
  • Parent commits 8c3a2be

Comments (0)

Files changed (4)

File cheroot/server.py

 
 __all__ = ['HTTPRequest', 'HTTPConnection', 'HTTPServer',
            'SizeCheckWrapper', 'KnownLengthRFile', 'ChunkedRFile',
-           'WorkerThread', 'ThreadPool', 'Gateway']
+           'Gateway']
 
 from cheroot._compat import bytestr, unicodestr, basestring, ntob, py3k
 from cheroot._compat import HTTPDate, format_exc, unquote
 FORWARD_SLASH = ntob('/')
 
 import os
-try:
-    import queue
-except:
-    import Queue as queue
 import re
 import socket
 import sys
 else:
     DEFAULT_BUFFER_SIZE = -1
 
-import threading
 import time
-import warnings
+
+from cheroot.workers import threadpool
 
 from cheroot import errors
 if py3k:
             pass
 
 
-class TrueyZero(object):
-    """An object which equals and does math like the integer '0' but evals True."""
-    def __add__(self, other):
-        return other
-    def __radd__(self, other):
-        return other
-trueyzero = TrueyZero()
-
-
-_SHUTDOWNREQUEST = None
-
-class WorkerThread(threading.Thread):
-    """Thread which continuously polls a Queue for Connection objects.
-    
-    Due to the timing issues of polling a Queue, a WorkerThread does not
-    check its own 'ready' flag after it has started. To stop the thread,
-    it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
-    (one for each running WorkerThread).
-    """
-    
-    conn = None
-    """The current connection pulled off the Queue, or None."""
-    
-    server = None
-    """The HTTP Server which spawned this thread, and which owns the
-    Queue and is placing active connections into it."""
-    
-    ready = False
-    """A simple flag for the calling server to know when this thread
-    has begun polling the Queue."""
-    
-    
-    def __init__(self, server):
-        self.ready = False
-        self.server = server
-        
-        self.requests_seen = 0
-        self.bytes_read = 0
-        self.bytes_written = 0
-        self.start_time = None
-        self.work_time = 0
-        self.stats = {
-            'Requests': lambda s: self.requests_seen + ((self.start_time is None) and trueyzero or self.conn.requests_seen),
-            'Bytes Read': lambda s: self.bytes_read + ((self.start_time is None) and trueyzero or self.conn.rfile.bytes_read),
-            'Bytes Written': lambda s: self.bytes_written + ((self.start_time is None) and trueyzero or self.conn.wfile.bytes_written),
-            'Work Time': lambda s: self.work_time + ((self.start_time is None) and trueyzero or time.time() - self.start_time),
-            'Read Throughput': lambda s: s['Bytes Read'](s) / (s['Work Time'](s) or 1e-6),
-            'Write Throughput': lambda s: s['Bytes Written'](s) / (s['Work Time'](s) or 1e-6),
-        }
-        threading.Thread.__init__(self)
-    
-    def run(self):
-        self.server.stats['Worker Threads'][self.getName()] = self.stats
-        try:
-            self.ready = True
-            while True:
-                conn = self.server.requests.get()
-                if conn is _SHUTDOWNREQUEST:
-                    return
-                
-                self.conn = conn
-                if self.server.stats['Enabled']:
-                    self.start_time = time.time()
-                try:
-                    conn.communicate()
-                finally:
-                    conn.close()
-                    if self.server.stats['Enabled']:
-                        self.requests_seen += self.conn.requests_seen
-                        self.bytes_read += self.conn.rfile.bytes_read
-                        self.bytes_written += self.conn.wfile.bytes_written
-                        self.work_time += time.time() - self.start_time
-                        self.start_time = None
-                    self.conn = None
-        except (KeyboardInterrupt, SystemExit):
-            exc = sys.exc_info()[1]
-            self.server.interrupt = exc
-
-
-class ThreadPool(object):
-    """A Request Queue for an HTTPServer which pools threads.
-    
-    ThreadPool objects must provide min, get(), put(obj), start()
-    and stop(timeout) attributes.
-    """
-    
-    def __init__(self, server, min=10, max=-1):
-        self.server = server
-        self.min = min
-        self.max = max
-        self._threads = []
-        self._queue = queue.Queue()
-        self.get = self._queue.get
-    
-    def start(self):
-        """Start the pool of threads."""
-        for i in range(self.min):
-            self._threads.append(WorkerThread(self.server))
-        for worker in self._threads:
-            worker.setName("CP Server " + worker.getName())
-            worker.start()
-        for worker in self._threads:
-            while not worker.ready:
-                time.sleep(.1)
-    
-    def _get_idle(self):
-        """Number of worker threads which are idle. Read-only."""
-        return len([t for t in self._threads if t.conn is None])
-    idle = property(_get_idle, doc=_get_idle.__doc__)
-    
-    def put(self, obj):
-        self._queue.put(obj)
-        if obj is _SHUTDOWNREQUEST:
-            return
-    
-    def grow(self, amount):
-        """Spawn new worker threads (not above self.max)."""
-        for i in range(amount):
-            if self.max > 0 and len(self._threads) >= self.max:
-                break
-            worker = WorkerThread(self.server)
-            worker.setName("CP Server " + worker.getName())
-            self._threads.append(worker)
-            worker.start()
-    
-    def shrink(self, amount):
-        """Kill off worker threads (not below self.min)."""
-        # Grow/shrink the pool if necessary.
-        # Remove any dead threads from our list
-        for t in self._threads:
-            if not t.isAlive():
-                self._threads.remove(t)
-                amount -= 1
-        
-        if amount > 0:
-            for i in range(min(amount, len(self._threads) - self.min)):
-                # Put a number of shutdown requests on the queue equal
-                # to 'amount'. Once each of those is processed by a worker,
-                # that worker will terminate and be culled from our list
-                # in self.put.
-                self._queue.put(_SHUTDOWNREQUEST)
-    
-    def stop(self, timeout=5):
-        # Must shut down threads here so the code that calls
-        # this method can know when all threads are stopped.
-        for worker in self._threads:
-            self._queue.put(_SHUTDOWNREQUEST)
-        
-        # Don't join currentThread (when stop is called inside a request).
-        current = threading.currentThread()
-        if timeout and timeout >= 0:
-            endtime = time.time() + timeout
-        while self._threads:
-            worker = self._threads.pop()
-            if worker is not current and worker.isAlive():
-                try:
-                    if timeout is None or timeout < 0:
-                        worker.join()
-                    else:
-                        remaining_time = endtime - time.time()
-                        if remaining_time > 0:
-                            worker.join(remaining_time)
-                        if worker.isAlive():
-                            # We exhausted the timeout.
-                            # Forcibly shut down the socket.
-                            c = worker.conn
-                            if c and not c.rfile.closed:
-                                try:
-                                    c.socket.shutdown(socket.SHUT_RD)
-                                except TypeError:
-                                    # pyOpenSSL sockets don't take an arg
-                                    c.socket.shutdown()
-                            worker.join()
-                except (AssertionError,
-                        # Ignore repeated Ctrl-C.
-                        # See http://www.cherrypy.org/ticket/691.
-                        KeyboardInterrupt):
-                    pass
-    
-    def _get_qsize(self):
-        return self._queue.qsize()
-    qsize = property(_get_qsize)
-
-
-
 try:
     import fcntl
 except ImportError:
         self.bind_addr = bind_addr
         self.gateway = gateway
         
-        self.requests = ThreadPool(self, min=minthreads or 1, max=maxthreads)
+        self.requests = threadpool.ThreadPool(
+            self, min=minthreads or 1, max=maxthreads)
         
         if not server_name:
             server_name = socket.gethostname()

File cheroot/workers/__init__.py

Empty file added.

File cheroot/workers/threadpool.py

+"""A thread-based worker pool."""
+
+__all__ = ['WorkerThread', 'ThreadPool']
+
+try:
+    import queue
+except:
+    import Queue as queue
+import socket
+import sys
+import threading
+import time
+
+from cheroot import errors
+
+
+class TrueyZero(object):
+    """An object which equals and does math like the integer '0' but evals True."""
+    def __add__(self, other):
+        return other
+    def __radd__(self, other):
+        return other
+trueyzero = TrueyZero()
+
+
+_SHUTDOWNREQUEST = None
+
+class WorkerThread(threading.Thread):
+    """Thread which continuously polls a Queue for Connection objects.
+    
+    Due to the timing issues of polling a Queue, a WorkerThread does not
+    check its own 'ready' flag after it has started. To stop the thread,
+    it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
+    (one for each running WorkerThread).
+    """
+    
+    conn = None
+    """The current connection pulled off the Queue, or None."""
+    
+    server = None
+    """The HTTP Server which spawned this thread, and which owns the
+    Queue and is placing active connections into it."""
+    
+    ready = False
+    """A simple flag for the calling server to know when this thread
+    has begun polling the Queue."""
+
+    def __init__(self, server):
+        self.ready = False
+        self.server = server
+
+        self.requests_seen = 0
+        self.bytes_read = 0
+        self.bytes_written = 0
+        self.start_time = None
+        self.work_time = 0
+        self.stats = {
+            'Requests': lambda s: self.requests_seen + ((self.start_time is None) and trueyzero or self.conn.requests_seen),
+            'Bytes Read': lambda s: self.bytes_read + ((self.start_time is None) and trueyzero or self.conn.rfile.bytes_read),
+            'Bytes Written': lambda s: self.bytes_written + ((self.start_time is None) and trueyzero or self.conn.wfile.bytes_written),
+            'Work Time': lambda s: self.work_time + ((self.start_time is None) and trueyzero or time.time() - self.start_time),
+            'Read Throughput': lambda s: s['Bytes Read'](s) / (s['Work Time'](s) or 1e-6),
+            'Write Throughput': lambda s: s['Bytes Written'](s) / (s['Work Time'](s) or 1e-6),
+        }
+        threading.Thread.__init__(self)
+    
+    def run(self):
+        self.server.stats['Worker Threads'][self.getName()] = self.stats
+        try:
+            self.ready = True
+            while True:
+                conn = self.server.requests.get()
+                if conn is _SHUTDOWNREQUEST:
+                    return
+                
+                self.conn = conn
+                if self.server.stats['Enabled']:
+                    self.start_time = time.time()
+                try:
+                    conn.communicate()
+                finally:
+                    conn.close()
+                    if self.server.stats['Enabled']:
+                        self.requests_seen += self.conn.requests_seen
+                        self.bytes_read += self.conn.rfile.bytes_read
+                        self.bytes_written += self.conn.wfile.bytes_written
+                        self.work_time += time.time() - self.start_time
+                        self.start_time = None
+                    self.conn = None
+        except (KeyboardInterrupt, SystemExit):
+            exc = sys.exc_info()[1]
+            self.server.interrupt = exc
+
+
+class ThreadPool(object):
+    """A Request Queue for an HTTPServer which pools threads.
+    
+    ThreadPool objects must provide min, get(), put(obj), start()
+    and stop(timeout) attributes.
+    """
+    
+    def __init__(self, server, min=10, max=-1):
+        self.server = server
+        self.min = min
+        self.max = max
+        self._threads = []
+        self._queue = queue.Queue()
+        self.get = self._queue.get
+    
+    def start(self):
+        """Start the pool of threads."""
+        for i in range(self.min):
+            self._threads.append(WorkerThread(self.server))
+        for worker in self._threads:
+            worker.setName("CP Server " + worker.getName())
+            worker.start()
+        for worker in self._threads:
+            while not worker.ready:
+                time.sleep(.1)
+    
+    def _get_idle(self):
+        """Number of worker threads which are idle. Read-only."""
+        return len([t for t in self._threads if t.conn is None])
+    idle = property(_get_idle, doc=_get_idle.__doc__)
+    
+    def put(self, obj):
+        self._queue.put(obj)
+        if obj is _SHUTDOWNREQUEST:
+            return
+    
+    def grow(self, amount):
+        """Spawn new worker threads (not above self.max)."""
+        for i in range(amount):
+            if self.max > 0 and len(self._threads) >= self.max:
+                break
+            worker = WorkerThread(self.server)
+            worker.setName("CP Server " + worker.getName())
+            self._threads.append(worker)
+            worker.start()
+    
+    def shrink(self, amount):
+        """Kill off worker threads (not below self.min)."""
+        # Grow/shrink the pool if necessary.
+        # Remove any dead threads from our list
+        for t in self._threads:
+            if not t.isAlive():
+                self._threads.remove(t)
+                amount -= 1
+        
+        if amount > 0:
+            for i in range(min(amount, len(self._threads) - self.min)):
+                # Put a number of shutdown requests on the queue equal
+                # to 'amount'. Once each of those is processed by a worker,
+                # that worker will terminate and be culled from our list
+                # in self.put.
+                self._queue.put(_SHUTDOWNREQUEST)
+    
+    def stop(self, timeout=5):
+        # Must shut down threads here so the code that calls
+        # this method can know when all threads are stopped.
+        for worker in self._threads:
+            self._queue.put(_SHUTDOWNREQUEST)
+        
+        # Don't join currentThread (when stop is called inside a request).
+        current = threading.currentThread()
+        if timeout and timeout >= 0:
+            endtime = time.time() + timeout
+        while self._threads:
+            worker = self._threads.pop()
+            if worker is not current and worker.isAlive():
+                try:
+                    if timeout is None or timeout < 0:
+                        worker.join()
+                    else:
+                        remaining_time = endtime - time.time()
+                        if remaining_time > 0:
+                            worker.join(remaining_time)
+                        if worker.isAlive():
+                            # We exhausted the timeout.
+                            # Forcibly shut down the socket.
+                            c = worker.conn
+                            if c and not c.rfile.closed:
+                                try:
+                                    c.socket.shutdown(socket.SHUT_RD)
+                                except TypeError:
+                                    # pyOpenSSL sockets don't take an arg
+                                    c.socket.shutdown()
+                            worker.join()
+                except (AssertionError,
+                        # Ignore repeated Ctrl-C.
+                        # See http://www.cherrypy.org/ticket/691.
+                        KeyboardInterrupt):
+                    pass
+    
+    def _get_qsize(self):
+        return self._queue.qsize()
+    qsize = property(_get_qsize)
+

File cheroot/wsgi.py

 """WSGI gateways for the Cheroot HTTP server."""
 
-import socket
 import sys
 
-from cheroot.server import HTTPServer, ThreadPool, Gateway
+from cheroot.server import HTTPServer, Gateway
 from cheroot._compat import basestring, ntob, ntou, tonative, py3k, unicodestr