Commits

__russ__ committed dc6e774

Added test_rawtcp to check concurrent connections
Started split of HTTP objects so they derive from TCP objects (TCP/HTTPConnection and TCP/HTTPRequest)
About to start HTTPServer/TCPServer split
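
For orientation, a rough sketch of the class hierarchy this commit works toward is below. The TCPServer/HTTPServer pair is only planned (names taken from the commit message and the TODO in test_rawtcp.py) and is not part of this commit.

# Hierarchy sketch (illustrative only; see the diff below for the real classes)
class TCPRequest(object): pass             # protocol-agnostic request handler
class HTTPRequest(TCPRequest): pass        # adds HTTP parsing/response

class TCPConnection(object): pass          # owns the socket, drives communicate()
class HTTPConnection(TCPConnection): pass  # adds rfile/wfile and HTTP error handling

# Planned but not started in this commit (names assumed):
# class TCPServer(object): ...
# class HTTPServer(TCPServer): ...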

Comments (0)

Files changed (3)

cheroot/dprint.py

-if 0: #flip this between 1 and 0 (true/false) to enable/disable dprinting
+if 1: #flip this between 1 and 0 (true/false) to enable/disable dprinting
     def dprint(msg, skipnewline = False):
         if skipnewline:
             print msg,
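
The hunk above only shows the top of the toggle. For context, a minimal sketch of what the complete enable/disable pattern in dprint.py presumably looks like (the else branch and the disabled no-op are assumptions, not part of this commit):

if 1: #flip this between 1 and 0 (true/false) to enable/disable dprinting
    def dprint(msg, skipnewline = False):
        if skipnewline:
            print msg,      #trailing comma suppresses the newline (Python 2)
        else:
            print msg
else:
    def dprint(msg, skipnewline = False):
        pass                #dprinting disabled: do nothing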

cheroot/server.py

 import select
 import collections
 import struct
+import threading
 
 MSG_PEEK = socket.MSG_PEEK         #character saver
 MSG_DONTWAIT = socket.MSG_DONTWAIT #character saver
             line = self.readline(sizehint)
 
 
-class HTTPRequest(object):
-    """An HTTP Request (and response).
+class TCPRequest(object):
+    """Base handler class for all TCP connections.  Must be subclassed.
     
-    A single HTTP connection may consist of multiple request/response pairs.
+    At a minimum, 'parse_request' and 'respond' must be implemented.
+    
+    A single TCP connection may consist of multiple request/response pairs (if
+    the main server has recycle_threads==False).
+    
     """
     
     server = None
-    """The HTTPServer object which is receiving this request."""
+    """The server object which is receiving this request."""
     
     conn = None
-    """The HTTPConnection object on which this request connected."""
-    
-    inheaders = {}
-    """A dict of request headers."""
-    
-    outheaders = []
-    """A list of header tuples to write in the response."""
+    """The TCPConnection object on which this request connected."""
     
     ready = False
     """When True, the request has been parsed and is ready to begin generating
     not imply an error! The client and/or server may each request that the
     connection be closed."""
     
+    def __init__(self, server, conn):
+        self.server = server
+        self.conn = conn
+        
+        self.ready = False
+        self.started_request = False
+
+    def parse_request(self):
+        """Parse incoming TCP data.  Must be overridden.
+        
+        The socket is available through self.conn.socket.
+        
+        This method can completely handle lengthy bidirectional communication
+        if you want, but a worker thread will be consumed for the duration
+        of this call.
+        
+        Note that self.respond is called automatically by the server after
+        parse_request is completed, unless self.ready is set to False in this
+        function.
+        
+        """
+        raise NotImplementedError
+    
+    def respond(self):
+        """Respond to the client.
+        
+        Really, this is just a function called by the server after
+        parse_request is completed (as long as self.ready == True on
+        completion). Depending on the implementation of the higher level
+        protocol, it is ok to do nothing here and handle the entire
+        communication in parse_request.
+        
+        See parse_request for further notes.
+        
+        """
+        raise NotImplementedError
+
+class HTTPRequest(TCPRequest):
+    """An HTTP Request (and response).
+    
+    A single HTTP connection may consist of multiple request/response pairs
+    (if the server has recycle_threads == False).
+    """
+    
+    inheaders = {}
+    """A dict of request headers."""
+    
+    outheaders = []
+    """A list of header tuples to write in the response."""
+    
     chunked_write = False
     """If True, output will be encoded with the "chunked" transfer-coding.
     
     This value is set automatically inside send_headers."""
     
     def __init__(self, server, conn):
-        self.server= server
-        self.conn = conn
+        TCPRequest.__init__(self, server, conn)
         
-        self.ready = False
-        self.started_request = False
         self.scheme = ntob("http")
         if self.server.ssl_adapter is not None:
             self.scheme = ntob("https")
             self.simple_response("414 Request-URI Too Long",
                 "The Request-URI sent with the request exceeds the maximum "
                 "allowed bytes.")
+            self.conn.one_request_seen = True
             return
         else:
             if not success:
+                self.conn.one_request_seen = True
                 return
         
         try:
             self.simple_response("413 Request Entity Too Large",
                 "The headers sent with the request exceed the maximum "
                 "allowed bytes.")
+            self.conn.one_request_seen = True
             return
         else:
             if not success:
+                self.conn.one_request_seen = True
                 return
         
+        self.conn.one_request_seen = True
         self.ready = True
     
     def read_request_line(self):
         buf.append(CRLF)
         write(self.conn.wfile, EMPTY.join(buf))
 
+class TCPConnection(object):
+    """A TCP connection (active socket).
+    
+    server: the Server object which received this connection.
+    socket: the raw socket object (usually TCP) for this connection.
+    
+    """
+    ssl_env = None
+    rbufsize = DEFAULT_BUFFER_SIZE
+    wbufsize = DEFAULT_BUFFER_SIZE
+    RequestHandlerClass = TCPRequest
+    
+    def __init__(self, server, sock):
+        self.server = server
+        self.socket = sock
+        self.requests_seen = 0
+        self.remote_addr = None
+        self.remote_port = None
+        self.linger = False
+    
+    
+    def communicate(self, recycle_threads):
+        """Read each request and respond appropriately.
+        
+        Returns True if the connection should be closed when done.
+        
+        When recycle_threads==False, the return value is meaningless since the
+        calling logic will always close the connection in this mode. In this
+        mode, this method continually processes incoming requests, not
+        returning until the client closes the connection or an error occurs.
+        During this time, the current Worker thread can do nothing else
+        (idling between requests).
+        
+        If recycle_threads==True, the boolean return value is very important.
+        Determination of this return value is via the close_connection
+        property of the RequestHandlerClass in use, or is True in the event of
+        error. If the connection is to be kept alive, once the request is
+        completely handled the socket is returned to the main server to
+        monitor for future incoming requests (at which time the request will
+        be given to any available Worker).
+        
+        """
+        dprint("COMMUNICATE STARTED!!!")
+        #NOTE: the loop in this routine has been kept in order to replicate
+        #legacy operation where recycle_threads=False.
+        try:
+            while True:
+                req = None #keeps req defined for the except clauses below
+                req = self.RequestHandlerClass(self.server, self)
+                
+                # This order of operations should guarantee correct pipelining.
+                req.parse_request()
+                if self.server.stats['Enabled']:
+                    self.requests_seen += 1
+                if not req.ready:
+                    # Something went wrong in the parsing (and the server has
+                    # probably already made a simple_response). Return and
+                    # let the conn close.
+                    return True #indicates to force socket close
+                
+                req.respond()
+                if recycle_threads:
+                    dprint("** communicate RETURNING %d from fd %d" % \
+                           (req.close_connection, self.socket.fileno()))
+                    return req.close_connection
+                else:
+                    if req.close_connection:
+                        return True #indicates to force socket close
+        
+        except socket.error:
+            e = sys.exc_info()[1]
+            return self.handle_exc_socket_error(e, req)
+        except KeyboardInterrupt:
+            e = sys.exc_info()[1]
+            return self.handle_exc_KeyboardInterrupt(e, req)
+        except SystemExit:
+            e = sys.exc_info()[1]
+            return self.handle_exc_SystemExit(e, req)
+        except errors.FatalSSLAlert:
+            e = sys.exc_info()[1]
+            return self.handle_exc_FatalSSLAlert(e, req)
+        except errors.NoSSLError:
+            e = sys.exc_info()[1]
+            return self.handle_exc_NoSSLError(e, req)
+        except Exception:
+            e = sys.exc_info()[1]
+            return self.handle_exc_Exception(e, req)
+    
+    def handle_exc_KeyboardInterrupt(self, exc, req):
+        #Generically, this is managed by the server for proper shutdown.
+        raise exc
+    
+    def handle_exc_SystemExit(self, exc, req):
+        #Generically, this is managed by the server for proper shutdown.
+        raise exc
+    
+    def _generic_exc_handler(self, exc, req):
+        self.server.error_log(repr(exc), level=logging.ERROR, traceback=True)
+        if req is not None:
+            req.ready = False #ensures no subsequent respond() call, if relevant
+        return True #close the connection
+    
+    #For raw TCP, we have no protocol with which to handle any other
+    #exceptions. Your implementation must handle these.
+    handle_exc_socket_error = _generic_exc_handler
+    handle_exc_FatalSSLAlert = _generic_exc_handler
+    handle_exc_NoSSLError = _generic_exc_handler
+    handle_exc_Exception = _generic_exc_handler
+    
+    def close(self):
+        """Close the socket underlying this connection."""
+        if not self.linger:
+            # Python's socket module does NOT call close on the kernel socket
+            # when you call socket.close(). We do so manually here because we
+            # want this server to send a FIN TCP segment immediately. Note this
+            # must be called *before* calling socket.close(), because the latter
+            # drops its reference to the kernel socket.
+            # Python 3 *probably* fixed this with socket._real_close; hard to tell.
+            if not py3k:
+                if hasattr(self.socket, '_sock'):
+                    self.socket._sock.close()
+            self.socket.close()
+        else:
+            # On the other hand, sometimes we want to hang around for a bit
+            # to make sure the client has a chance to read our entire
+            # response. Skipping the close() calls here delays the FIN
+            # packet until the socket object is garbage-collected later.
+            # Someday, perhaps, we'll do the full lingering_close that
+            # Apache does, but not today.
+            pass
 
-class HTTPConnection(object):
+class HTTPConnection(TCPConnection):
+    """An HTTP connection (active socket).
+    
+    server: the Server object which received this connection.
+    socket: the raw socket object (usually TCP) for this connection.
+    makefile: a class for reading from the socket.
+    """
+    ssl_env = None
+    rbufsize = DEFAULT_BUFFER_SIZE
+    wbufsize = DEFAULT_BUFFER_SIZE
+    RequestHandlerClass = HTTPRequest
+    
+    def __init__(self, server, sock, makefile=makefile):
+        TCPConnection.__init__(self, server, sock)
+        
+        self.rfile = makefile(sock, "rb", self.rbufsize)
+        self.wfile = makefile(sock, "wb", self.wbufsize)
+        self.one_request_seen = False
+    
+    def handle_exc_socket_error(self, exc, req):
+        errnum = exc.args[0]
+        # sadly SSL sockets return a different (longer) time out string
+        if errnum == 'timed out' or errnum == 'The read operation timed out':
+            # Don't error if we're between requests; only error
+            # if 1) no request has been started at all, or 2) we're
+            # in the middle of a request.
+            # See http://www.cherrypy.org/ticket/853
+            if (not self.one_request_seen) or (req and req.started_request):
+                # Don't bother writing the 408 if the response
+                # has already started being written.
+                if req and not req.sent_headers:
+                    try:
+                        req.simple_response("408 Request Timeout")
+                    except errors.FatalSSLAlert:
+                        # Close the connection.
+                        return True
+        elif errnum not in errors.socket_errors_to_ignore:
+            self.server.error_log("socket.error %s" % repr(errnum),
+                                  level=logging.WARNING, traceback=True)
+            if req and not req.sent_headers:
+                try:
+                    req.simple_response("500 Internal Server Error")
+                except errors.FatalSSLAlert:
+                    # Close the connection.
+                    return True
+        return True #close the connection
+    
+    def handle_exc_FatalSSLAlert(self, exc, req):
+        # Close the connection.
+        return True
+    
+    def handle_exc_NoSSLError(self, exc, req):
+        msg = ("The client sent a plain HTTP request, but "
+               "this server only speaks HTTPS on this port.")
+        self.server.error_log(msg)
+
+        if req and not req.sent_headers:
+            # Unwrap our wfile
+            self.wfile = makefile(self.socket._sock, "wb", self.wbufsize)
+            req.simple_response("400 Bad Request", msg)
+            #Setting linger=True tells close() to skip the explicit socket
+            #close so the response has a chance to make it back to the
+            #client. See TCPConnection.close().
+            self.linger = True
+        return True #close the connection
+    
+    def handle_exc_Exception(self, exc, req):
+        self.server.error_log(repr(exc), level=logging.ERROR, traceback=True)
+        if req and not req.sent_headers:
+            try:
+                req.simple_response("500 Internal Server Error")
+            except errors.FatalSSLAlert:
+                # Close the connection.
+                return True
+        return True #close the connection
+    
+    
+    def close(self):
+        """Close the socket underlying this connection."""
+        self.rfile.close()
+        TCPConnection.close(self)
+
+class old_HTTPConnection(object):
     """An HTTP connection (active socket).
     
     server: the Server object which received this connection.
     makefile: a class for reading from the socket.
     """
     
-    remote_addr = None
-    remote_port = None
     ssl_env = None
     rbufsize = DEFAULT_BUFFER_SIZE
     wbufsize = DEFAULT_BUFFER_SIZE
         self.rfile = makefile(sock, "rb", self.rbufsize)
         self.wfile = makefile(sock, "wb", self.wbufsize)
         self.requests_seen = 0
+        self.remote_addr = None
+        self.remote_port = None
     
     def communicate(self, recycle_threads):
         """Read each request and respond appropriately.
             # Apache does, but not today.
             pass
 
-
 try:
     import fcntl
 except ImportError:
 
         self.clear_stats()
         
+        self._socket_checking_complete = threading.Event()
+        self._socket_checking_complete.set() #in case of exit prior to starting!
+        
         self._recycle_threads = recycle_threads
         if recycle_threads == False:
             self._get_needy_connection_info = self._accept
         
         self.ready = True
         self._start_time = time.time()
-        while self.ready:
-            try:
-                self.tick()
-            except (KeyboardInterrupt, SystemExit):
-                raise
-            except:
-                self.error_log("Error in HTTPServer.tick", level=logging.ERROR,
-                               traceback=True)
-            
-            if self.interrupt:
-                while self.interrupt is True:
-                    # Wait for self.stop() to complete. See _set_interrupt.
-                    time.sleep(0.1)
+        try:
+            self._socket_checking_complete.clear()
+            while self.ready:
+                try:
+                    self.tick()
+                except (KeyboardInterrupt, SystemExit):
+                    raise
+                except:
+                    self.error_log("Error in HTTPServer.tick",
+                                   level=logging.ERROR,
+                                   traceback=True)
+                
                 if self.interrupt:
-                    raise self.interrupt
+                    #self.interrupt setter calls stop internally, which needs the
+                    #_socket_checking_complete event set (and we're done).
+                    self._socket_checking_complete.set()
+                    while self.interrupt is True:
+                        # Wait for self.stop() to complete. See _set_interrupt.
+                        time.sleep(0.1)
+                    if self.interrupt:
+                        raise self.interrupt
+        finally:
+            self._socket_checking_complete.set()
 
     def error_log(self, msg="", level=20, traceback=False):
         # Override this in subclasses as desired
             pass
         
         try:
-            pollret = self._poller.poll(1000)
+            pollret = self._poller.poll(250) #TODO: determine a decent poll timeout
             dprint("pollret = %r" % pollret)
         except Exception as exc:
             dprint("CAUGHT poll() EXCEPTION: %r" % exc)
                 #The thread will have sent the relevant fd (as a 32 bit int,
                 #just in case) and it needs to be registered again.
                 returned_fd = recv_int32(sock)
+                if returned_fd == 0:
+                    #0 is special/safe signal to quit this loop immediately
+                    raise NoNeedySockets
                 #dprint("SOCKET WITH fd %d STARTING TO MONITOR AGAIN" % returned_fd)
                 self._poller.register(returned_fd, select.POLLIN)
                 
                          doc="Set this to an Exception instance to "
                              "interrupt the server.")
     
+    
+    def _stop_polling_loop(self):
+        """exit the polling loop as soon as possible."""
+        #Sending 0 is a signal down the worker->main pipe signals to leave
+        threadpool.send_int32(self._thread_comm_pipe__threadsock, 0)
+        
     def stop(self):
         """Gracefully shutdown a server that is serving forever."""
         dprint("************* SERVER STOP REQUESTED *******************")
         
         sock = getattr(self, "socket", None)
         if sock:
+            if self._recycle_threads:
+                #Exit the polling loop
+                self._stop_polling_loop()
+            
             if not isinstance(self.bind_addr, basestring):
                 # Touch our own socket to make accept() return immediately.
                 try:
                 sock.close()
             self.socket = None
         
+        #shut down any active worker threads
         self.requests.stop(self.shutdown_timeout)
-        dprint("************* SERVER SHUT DOWN *******************")
+        
+        #Ensure that we're definitely out of the main socket checking loop...
+        self._socket_checking_complete.wait(2)
+        if not self._socket_checking_complete.isSet():
+            print "GIVING UP ON EVENT!!"
+        
+        if self._recycle_threads:
+            #Close all active sockets, now that we're sure that the workers will
+            #not alter the dict.
+            for fd, ci in self._active_socket_info.iteritems():
+                ci[0].close()
+        
+        #dprint("************* SERVER SHUT DOWN *******************")
 
 
 class Gateway(object):
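
Taken together, the TCPRequest and TCPConnection docstrings above describe a small contract: a subclass implements parse_request() (reading from self.conn.socket and leaving self.ready set if respond() should run) plus respond(), and communicate() uses close_connection to decide whether the socket stays open. A minimal sketch of a handler following that contract, assuming a trivial newline-terminated echo protocol (the handler name and protocol are illustrative, not part of this commit):

from cheroot import server

class LineEcho_RequestHandler(server.TCPRequest):
    """Hypothetical handler: read one LF-terminated line, echo it back."""

    def __init__(self, srv, conn):
        server.TCPRequest.__init__(self, srv, conn)
        self.ready = True             #so respond() is called after parse_request()
        self.close_connection = False #ask communicate() to keep the socket open

    def parse_request(self):
        #Read from the raw socket until a newline (or disconnect) arrives.
        data = []
        while True:
            chunk = self.conn.socket.recv(1)
            if not chunk:
                self.ready = False    #client went away; skip respond()
                self.close_connection = True
                return
            data.append(chunk)
            if chunk == "\n":
                break
        self._line = "".join(data)

    def respond(self):
        self.conn.socket.sendall(self._line)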

cheroot/test/test_rawtcp.py

+"""Tests for TCP handling *only* (ie: nothing to do with HTTP whatsoever)."""
+
+import unittest
+import struct
+import threading
+import socket
+import time
+import nose
+
+from cheroot import server
+from cheroot.test import helper
+
+ECHO_SERVER_ADDR = ('127.0.0.1', 54583 + 1)
+PACK_FMT = "@I" #native unsigned int (go native since same machine ensured)
+
+INT32_LEN = 4
+def recv_int32(sock):
+    """Read int32 from provided socket. 4 bytes MUST be there."""
+    buf = "" #TODO: Py3k handles bytes differently, I think.  struct impact?
+    while len(buf) < INT32_LEN:
+        buf += sock.recv(INT32_LEN - len(buf))
+    return struct.unpack(PACK_FMT, buf)[0] #native byte order (same machine!)
+
+class Int32Echo_RequestHandler(server.TCPRequest):
+    """A simple TCP server handler for testing known-length raw TCP messages.
+    
+    Expects 4 bytes from the client, and returns the same 4 bytes.
+    
+    Server keeps connection persistent after request.
+    
+    """
+    #Note: TCPRequest was only just split out of HTTPRequest, a result of
+    #cheroot's HTTP heritage (it was initially made for dealing with HTTP only).
+    def __init__(self, server_instance, conn):
+        #Doing base class init is probably a waste of time for raw TCP, but it
+        #seems safe to do (and low cost) at this point since some useful
+        #server tracking may be initialized and things like
+        #self.started_request are set.
+        server.TCPRequest.__init__(self, server_instance, conn)
+        self.ready = True #makes sure no abort between request and respond
+        #Set to stay open until client closes (or server gives up on client).
+        self.close_connection = False
+    
+    def parse_request(self):
+        #Unpacking and repacking isn't really necessary for the echo protocol,
+        #but may want to do something with the int at some point, so
+        #unpack/repack it is.
+        self._rx = recv_int32(self.conn.socket)
+    
+    def respond(self):
+        self.conn.socket.sendall(struct.pack(PACK_FMT, self._rx))
+
+def CreateAndStartInt32EchoServer(bind_addr = ECHO_SERVER_ADDR,
+                                  minthreads = 10,
+                                  maxthreads = -1,
+                                  ssl_adapter = None,
+                                  recycle_threads = True,
+                                  ):
+    #Use HTTPServer, even though we're going to have nothing to do with
+    #HTTP (this stems from cheroot's HTTP heritage).
+    #TODO: make a TCPServer class (TCPRequest and TCPConnection done already... one more to go)
+    srv = server.HTTPServer(bind_addr = bind_addr,
+                            gateway = None, #needed?
+                            minthreads = minthreads,
+                            maxthreads = maxthreads,
+                            ssl_adapter = ssl_adapter,
+                            recycle_threads = recycle_threads,
+                           )
+    #Jack up the listening request queue size because we are going to hit it
+    #hard. If left at the default size of 5, large numbers of simultaneous
+    #connections stutter badly.
+    srv.request_queue_size = 500
+    #Set the request handler to our Int32Echo handler...
+    # - some more HTTP legacy stuff here with "HTTPConnection"
+    oldReqHandler = srv.ConnectionClass.RequestHandlerClass
+    try:
+        srv.ConnectionClass.RequestHandlerClass = Int32Echo_RequestHandler
+        #Launch the server on a thread...
+        slt = threading.Thread(target = srv.safe_start)
+        slt.setDaemon(1)
+        slt.start()
+        #Give it a chance to bind...
+        time.sleep(0.1)
+        #print "Server started"
+    finally:
+        #Restore the old request handler on the ConnectionClass (this is a
+        #temporary hack until HTTPServer is moved to a higher level above a
+        #base TCPServer)
+        srv.ConnectionClass.RequestHandlerClass = oldReqHandler
+    return srv
+
+class Int32EchoClient(object):
+    def __init__(self, server_addr, client_no):
+        self._server_addr = server_addr
+        self._client_no = client_no
+        #open the connection (better is to open on first request, but for
+        #testing purposes we will open on instantiation).
+        # - must support python < 2.6, so no socket.create_connection
+        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self._sock.connect(self._server_addr)
+    
+    def get_echo(self, x):
+        """Send x (an int32) to the echo server nd get it back"""
+        #no checking on x.  Just let struct.pack do it.
+        self._sock.sendall(struct.pack(PACK_FMT, x))
+        ret = recv_int32(self._sock)
+        return ret
+    
+    def close(self):
+        """close the client"""
+        self._sock.close()
+        
+class RawTcpPersistenceTests(unittest.TestCase):
+    #Going with a base test case here since a lot of the stuff in
+    #CherootWebCase is completely unnecessary for these tests and may cause
+    #issues.
+    
+    #TODO: integrate in any missing essentials, such as code coverage tricks
+    
+    def _test_500PersistentConnections(self):
+        #tests are run serially, so the server must handle them all while all stay open.
+        #Set the number of connections to test...
+        # - default is 500 since many systems limit to 1024
+        # - Watch your max file descriptors here or get "too many files"
+        #   - ('ulimit -n' on linux, increase with 'ulimit -Sn <value>' up to
+        #     the hard limit of 'ulimit -Hn', at least without hoop jumping.)
+        # - also make sure request_queue_size is large, or it stutters on
+        #   the connection slam we're about to do (but still works).
+        #   - seems to be a max here for stutter-free connections, at least on
+        #     the box this was coded on.
+        # - I have tested up to 30000 connections
+        n = 500 #client count
+        clients = {} #k=index; v=client
+        for i in xrange(n):
+            clients[i + 1] = Int32EchoClient(ECHO_SERVER_ADDR, i)
+            time.sleep(0.002) #try and give the listener a break
+            #print "Connected client %d" % i
+        
+        #time.sleep(1)
+        #x=raw_input("Press enter!!")
+        #at this point we will have n open connections.  Now exercise them.
+        for i, client in clients.iteritems():
+            echo = client.get_echo(i)
+            self.assertEqual(echo, i)
+            if i % 100 == 0:
+                print "Echo 1 complete for %d/%d clients" % (i, n)
+            #print "client %d worked!" % i
+
+        #Exercise them again to prove that our persistent connections still work...
+        for i, client in clients.iteritems():
+            echo = client.get_echo(i)
+            self.assertEqual(echo, i)
+            #if i % 100 == 0:
+                #print "Echo 2 complete for %d/%d clients" % (i, n)
+            #print "client %d worked!" % i
+        
+        #now politely close the clients down, rather than wait for gc
+        for i, client in clients.iteritems():
+            client.close()
+            #print "client %d closed" % i
+    
+    def test_500PersistentConnections(self):
+        #Set up 500 simultaneous/persistent connections and test them
+        srv = None
+        try:
+            srv = CreateAndStartInt32EchoServer(ssl_adapter=None,
+                                                recycle_threads=True)
+            self._test_500PersistentConnections()
+        finally:
+            if srv:
+                srv.stop()
+    
+    def test_500PersistentConnections_SSL(self):
+        raise nose.SkipTest("Need to make the connection SSL")
+        srv = None
+        ssl_adapter = helper.get_default_ssl_adapter()
+        try:
+            srv = CreateAndStartInt32EchoServer(ssl_adapter = ssl_adapter,
+                                                recycle_threads = True)
+            self._test_500PersistentConnections()
+        finally:
+            if srv:
+                srv.stop()
+
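
For a quick manual check outside the unittest runner, the helpers in this module can also be driven directly; a sketch (assumes the default ECHO_SERVER_ADDR port is free on the local machine):

from cheroot.test import test_rawtcp

srv = test_rawtcp.CreateAndStartInt32EchoServer()
try:
    client = test_rawtcp.Int32EchoClient(test_rawtcp.ECHO_SERVER_ADDR, 0)
    assert client.get_echo(42) == 42 #round-trip one int32 through the echo server
    client.close()
finally:
    srv.stop()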