Issue #1205 new

Cherrypy dropping connection under heavy load.

Anonymous created an issue

There appears to be an issue with cherrypy wherein it simply does not process incoming connections. It accepts the connection but then ignores it. The issue looks like a race condition between shrink() and grow().

        if idle < min_spare and (max_threads < 0 or len(self._threads) < max_threads):
            self.grow(self.min_spare - idle)
        elif idle > self.max_spare and len(self._threads) > min_threads:
            self.shrink(idle - self.max_spare)
        time.sleep(1)

grow() creates a new worker thread and appends it to self._threads.

def grow(self, amount):
    """Spawn new worker threads (not above self.max)."""
    for i in xrange(amount):
        if self.max > 0 and len(self._threads) >= self.max:
            break
        worker = WorkerThread(self.server)
        worker.setName("CP WSGIServer " + worker.getName())
        self._threads.append(worker)

The worker is appended before worker.start() is called, so at this point the thread is not yet marked as ready (it has not finished start()) and its self.conn is not set.

Later, monitor_threads() finds that there are too many idle threads (those with self.conn not set) and tries to shrink the pool.

        for i in xrange(min(amount, len(self._threads) - self.min)):
            # Put a number of shutdown requests on the queue equal
            # to 'amount'. Once each of those is processed by a worker,
            # that worker will terminate and be culled from our list
            # in self.put.
            self._queue.put(_SHUTDOWNREQUEST)
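
To illustrate the miscount described above, here is a minimal toy model. It is not CherryPy code: ToyWorker, count_idle_naive and count_idle_ready_only are hypothetical names, and the sketch assumes the monitor treats any worker whose conn is unset as idle.

```python
class ToyWorker:
    """Hypothetical stand-in for a worker thread (not the CherryPy class)."""

    def __init__(self):
        self.ready = False  # would be set once the thread's run loop starts
        self.conn = None    # connection currently being served, if any


def count_idle_naive(workers):
    # Assumed monitor behaviour: no connection means idle.
    return sum(1 for w in workers if w.conn is None)


def count_idle_ready_only(workers):
    # Only workers that have actually started can be idle.
    return sum(1 for w in workers if w.ready and w.conn is None)


workers = [ToyWorker() for _ in range(5)]
for w in workers[:2]:
    w.ready = True
    w.conn = object()  # two workers are busy serving connections

# The other three were appended to the pool but have not started yet.
naive_idle = count_idle_naive(workers)
strict_idle = count_idle_ready_only(workers)
print(naive_idle, strict_idle)
```

In this model the naive count reports the three unstarted workers as spare capacity, which is the condition that would trigger shrink() right after grow().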

Eventually only the min_spare threads (5) and the main thread remain. _SHUTDOWNREQUEST was queued for all the worker threads even though their conn was never set, so the following accept loop drops new connections because self.ready is False.

    def tick(self):
        """Accept a new connection and put it on the Queue."""
        try:
            if nativessl and isinstance(self.socket, nativessl.SSLSocket):
                try:
                    s, addr = self.socket.accept()
                except nativessl.SSLError, e:
                    return
            else:
                s, addr = self.socket.accept()

            # SPL-18564 OS X occasionally returns no remote address when talking to IE6
            if addr is None:
                addr = ('::', '0')

            prevent_socket_inheritance(s)
            if not self.ready:
                return

The source of the problem is a race condition between grow() and shrink().

def shrink(self, amount):
    """Kill off worker threads (not below self.min)."""
    # Grow/shrink the pool if necessary.
    # Remove any dead threads from our list
    for t in self._threads:
        if not t.isAlive():
            self._threads.remove(t)
            amount -= 1

    if amount > 0:
        for i in xrange(min(amount, len(self._threads) - self.min)):
            # Put a number of shutdown requests on the queue equal
            # to 'amount'. Once each of those is processed by a worker,
            # that worker will terminate and be culled from our list
            # in self.put.
            self._queue.put(_SHUTDOWNREQUEST)

As a fix, avoid calling shrink() until all newly created workers are ready, or in grow() make sure each worker is ready before appending it to self._threads.

Something like:

def grow(self, amount):
    """Spawn new worker threads (not above self.max)."""
    for i in xrange(amount):
        if self.max > 0 and len(self._threads) >= self.max:
            break
        worker = WorkerThread(self.server)
        worker.setName("CP WSGIServer " + worker.getName())
        worker.start()
        while not worker.ready:
            time.sleep(.1)
        self._threads.append(worker)

Thanks -Harendra Rawat

Comments (10)

  1. HARENDRA Rawat

    On a heavily loaded system, pstack shows only 6 threads.

        Thread 6 (Thread 0x7fc3e16d4700 (LWP 2008)):
        Thread 5 (Thread 0x7fc3e0bb1700 (LWP 2009)):
        Thread 4 (Thread 0x7fc3d97fa700 (LWP 2017)):
        Thread 3 (Thread 0x7fc3d8ff9700 (LWP 4923)):
        Thread 2 (Thread 0x7fc3b57fa700 (LWP 4949)):
        Thread 1 (Thread 0x7fc3ea210720 (LWP 2004)):

    The number of ESTABLISHED connections to port 8000 is 121:

        netstat -an |grep 8000|grep EST|wc -l
        121

    If a new connection is requested, cherrypy accepts it but does not process it:

        netstat -an |grep 8000|grep EST|wc -l
        122

  2. HARENDRA Rawat

    Not only does it drop connections, but virtual memory also grows because of sockets left in CLOSE_WAIT: Python does not free the dict objects for the stale connections that cherrypy is no longer able to close.

  3. Jason R. Coombs

    In 90165fe145e3, I implemented the proposed suggestion, only adding new workers to the pool once they're ready. I refactored the code first to facilitate more readily checking and then adding all new workers at once, so you may want to visit the preceding changesets.

    I would appreciate it if someone would review the changes and also test them.

    I have a couple of concerns:

    1. Is it safe to have that loop? Is it possible that a thread might not enter the ready state, leaving the .grow method in an infinite loop?
    2. Is it possible to write a test to exercise this behavior and prove its effectiveness?
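
    One way to address the first concern is to bound the wait with a deadline. The following is a minimal sketch under stated assumptions, not the code from the changeset: ToyWorker, its ready_event and stop_event attributes, and the timeout parameter are all hypothetical, and readiness is modelled with a threading.Event rather than a plain flag.

```python
import threading
import time


class ToyWorker(threading.Thread):
    """Hypothetical worker: signals readiness, then idles until stopped."""

    def __init__(self, stop_event):
        super().__init__(daemon=True)
        self.ready_event = threading.Event()
        self.stop_event = stop_event

    def run(self):
        self.ready_event.set()  # analogous to setting worker.ready = True
        self.stop_event.wait()  # stand-in for the request-serving loop


def grow(pool, amount, stop_event, timeout=5.0):
    """Start workers; add only those that become ready before the deadline."""
    started = [ToyWorker(stop_event) for _ in range(amount)]
    for worker in started:
        worker.start()
    deadline = time.monotonic() + timeout
    added = 0
    for worker in started:
        remaining = max(0.0, deadline - time.monotonic())
        # Event.wait returns True only if the event was set in time.
        if worker.ready_event.wait(remaining):
            pool.append(worker)
            added += 1
    return added


stop = threading.Event()
pool = []
added = grow(pool, 3, stop)
stop.set()  # let the toy workers exit
for worker in pool:
    worker.join(timeout=1.0)
```

    With a shared deadline, grow() returns after at most roughly timeout seconds even if a worker never becomes ready; what to do with such a worker is left open here.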

  4. HARENDRA Rawat

    > I have a couple of concerns: Is it safe to have that loop? Is it possible that a thread might not enter the ready state, leaving the .grow method in an infinite loop?

    Highly unlikely. Even if it comes out of the loop, there is no point in continuing. If we want to avoid the loop, here is what I propose. I don't have the refactored code, so this is with respect to what I have. The idea is not to loop, but rather to save the threads that are not ready in another variable. The next time grow() is called, try to append the new threads without waiting.

        def grow(self, amount):
            """Spawn new worker threads (not above self.max)."""
            # Workers from an earlier call are still pending: promote the
            # ready ones and return without spawning more.
            if len(self.newthreads):
                self.appendToPool()
                return
            for i in xrange(amount):
                if self.max > 0 and len(self._threads) + len(self.newthreads) >= self.max:
                    break
                worker = WorkerThread(self.server)
                worker.setName("CP WSGIServer " + worker.getName())
                worker.start()
                if worker.ready:
                    self._threads.append(worker)
                else:
                    self.newthreads.append(worker)

        def appendToPool(self):
            """Append new ready worker threads."""
            # Iterate over a copy so that removing from self.newthreads is safe.
            for worker in self.newthreads[:]:
                if worker.ready:
                    self._threads.append(worker)
                    self.newthreads.remove(worker)
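
    The deferred-promotion idea can be exercised deterministically without real threads, which may also help with the question of how to test it. This is only a sketch under assumptions: FakeWorker, ToyPool and promote_ready are hypothetical names, and readiness is flipped by hand instead of by a running thread.

```python
class FakeWorker:
    """Stand-in for a worker thread; 'ready' is flipped by hand."""

    def __init__(self):
        self.ready = False


class ToyPool:
    """Hypothetical pool that keeps unready workers in a pending list."""

    def __init__(self, max_workers):
        self.max = max_workers
        self._threads = []    # workers visible to the monitor
        self.newthreads = []  # started but not yet ready

    def promote_ready(self):
        # Move ready workers into the pool; keep the rest pending.
        still_pending = []
        for worker in self.newthreads:
            if worker.ready:
                self._threads.append(worker)
            else:
                still_pending.append(worker)
        self.newthreads = still_pending

    def grow(self, amount):
        if self.newthreads:
            self.promote_ready()
            return
        for _ in range(amount):
            total = len(self._threads) + len(self.newthreads)
            if self.max > 0 and total >= self.max:
                break
            self.newthreads.append(FakeWorker())


pool = ToyPool(max_workers=4)
pool.grow(3)
after_spawn = (len(pool._threads), len(pool.newthreads))

pool.newthreads[0].ready = True
pool.newthreads[1].ready = True
pool.grow(3)  # promotes the two ready workers, spawns nothing
after_promote = (len(pool._threads), len(pool.newthreads))
print(after_spawn, after_promote)
```

    Because unready workers never appear in _threads, a monitor that only inspects _threads cannot count them as idle.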
    