Commits

Denis Bilenko  committed 48d227b

threadpool.py: forward unhandled errors to hub's handle_error; better manage threads

spawn manager greenlet if necessary to finish off excess threads

  • Participants
  • Parent commits 1d52de4

Comments (0)

Files changed (1)

File gevent/threadpool.py

             hub = get_hub()
         self.hub = hub
         self._maxsize = 0
-        self._init(maxsize)
+        self.manager = None
         self.pid = os.getpid()
         self.fork_watcher = hub.loop.fork(ref=False)
-        self.fork_watcher.start(self._on_fork)
+        self._init(maxsize)
 
     def _set_maxsize(self, maxsize):
         if not isinstance(maxsize, integer_types):
         difference = maxsize - self._maxsize
         self._semaphore.counter += difference
         self._maxsize = maxsize
-        self._remove_threads()
-        self._add_threads()
+        self.adjust()
         # make sure all currently blocking spawn() start unlocking if maxsize increased
         self._semaphore._start_notify()
 
             delay = min(delay * 2, .05)
 
     def kill(self):
-        delay = 0.0005
-        while self._size > 0:
-            self._remove_threads(0)
+        if self.manager:
+            self.manager.kill()
+        self._manage(0)
+
+    def _adjust(self, maxsize):
+        if maxsize is None:
+            maxsize = self._maxsize
+        while self.task_queue.unfinished_tasks > self._size and self._size < maxsize:
+            self._add_thread()
+        while self._size - maxsize > self.task_queue.unfinished_tasks:
+            self.task_queue.put(None)
+        if self._size:
+            self.fork_watcher.start(self._on_fork)
+        else:
+            self.fork_watcher.stop()
+
+    def _manage(self, maxsize=None):
+        if maxsize is None:
+            maxsize = self._maxsize
+        delay = 0.0001
+        while True:
+            self._adjust(maxsize)
+            if self._size <= maxsize:
+                return
             sleep(delay)
             delay = min(delay * 2, .05)
 
-    def _add_threads(self):
-        while self.task_queue.unfinished_tasks > self._size:
-            if self._size >= self.maxsize:
-                break
-            self._add_thread()
-
-    def _remove_threads(self, maxsize=None):
-        if maxsize is None:
-            maxsize = self._maxsize
-        excess = self._size - maxsize
-        if excess > 0:
-            while excess > self.task_queue.qsize():
-                self.task_queue.put(None)
+    def adjust(self):
+        if self.manager:
+            return
+        if self._adjust(self.maxsize):
+            return
+        self.manager = Greenlet.spawn(self._manage)
 
     def _add_thread(self):
         with self._lock:
         try:
             task_queue = self.task_queue
             result = AsyncResult()
-            tr = ThreadResult(result, hub=self.hub)
-            self._remove_threads()
-            task_queue.put((func, args, kwargs, tr))
-            self._add_threads()
+            thread_result = ThreadResult(result, hub=self.hub)
+            task_queue.put((func, args, kwargs, thread_result))
+            self.adjust()
+            # rawlink() must be the last call
             result.rawlink(lambda *args: self._semaphore.release())
         except:
             semaphore.release()
             raise
         return result
 
+    def _decrease_size(self):
+        if sys is None:
+            return
+        _lock = getattr(self, '_lock', None)
+        if _lock is not None:
+            with _lock:
+                self._size -= 1
+
     def _worker(self):
+        need_decrease = True
         try:
             while True:
                 task_queue = self.task_queue
                 task = task_queue.get()
                 try:
                     if task is None:
+                        need_decrease = False
+                        self._decrease_size()
+                        # we want first to decrease size, then decrease unfinished_tasks
+                        # otherwise, _adjust might think there's one more idle thread that
+                        # needs to be killed
                         return
                     func, args, kwargs, result = task
                     try:
                         exc_info = getattr(sys, 'exc_info', None)
                         if exc_info is None:
                             return
-                        result.set_exception(exc_info()[1])
+                        result.handle_error(func, exc_info())
                     else:
                         if sys is None:
                             return
                         return
                     task_queue.task_done()
         finally:
-            if sys is None:
-                return
-            _lock = getattr(self, '_lock', None)
-            if _lock is not None:
-                with _lock:
-                    self._size -= 1
+            if need_decrease:
+                self._decrease_size()
 
     def apply(self, func, args=None, kwds=None):
         """Equivalent of the apply() builtin function. It blocks till the result is ready."""
     def __init__(self, receiver, hub=None):
         if hub is None:
             hub = get_hub()
+        self.receiver = receiver
+        self.hub = hub
         self.value = None
-        self.exception = None
-        self.receiver = receiver
+        self.context = None
+        self.exc_info = None
         self.async = hub.loop.async()
         self.async.start(self._on_async)
 
     def _on_async(self):
         self.async.stop()
-        if self.receiver is not None:
-            self.receiver(self)
+        try:
+            if self.exc_info is not None:
+                try:
+                    self.hub.handle_error(self.context, *self.exc_info)
+                finally:
+                    self.exc_info = None
+            self.context = None
+            self.async = None
+            self.hub = None
+            if self.receiver is not None:
+                self.receiver(self)
+        finally:
             self.receiver = None
-
-    def successful(self):
-        return self.exception is None
+            self.value = None
 
     def set(self, value):
         self.value = value
         self.async.send()
 
-    def set_exception(self, value):
-        self.exception = value
+    def handle_error(self, context, exc_info):
+        self.context = context
+        self.exc_info = exc_info
         self.async.send()
+
+    # link protocol:
+    def successful(self):
+        return True