Commits

Serhiy Storchaka  committed 467c46e

Issue #16165: Fix sched.scheduler.run() method was block a scheduler for
other threads.

  • Participants
  • Parent commits 1c9c0f9
  • Branches 3.3

Comments (0)

Files changed (3)

File Lib/sched.py

         """
         # localize variable access to minimize overhead
         # and to improve thread safety
-        with self._lock:
-            q = self._queue
-            delayfunc = self.delayfunc
-            timefunc = self.timefunc
-            pop = heapq.heappop
-            while q:
-                time, priority, action, argument, kwargs = checked_event = q[0]
+        lock = self._lock
+        q = self._queue
+        delayfunc = self.delayfunc
+        timefunc = self.timefunc
+        pop = heapq.heappop
+        while True:
+            with lock:
+                if not q:
+                    break
+                time, priority, action, argument, kwargs = q[0]
                 now = timefunc()
-                if now < time:
-                    if not blocking:
-                        return time - now
-                    delayfunc(time - now)
+                if time > now:
+                    delay = True
                 else:
-                    event = pop(q)
-                    # Verify that the event was not removed or altered
-                    # by another thread after we last looked at q[0].
-                    if event is checked_event:
-                        action(*argument, **kwargs)
-                        delayfunc(0)   # Let other threads run
-                    else:
-                        heapq.heappush(q, event)
+                    delay = False
+                    pop(q)
+            if delay:
+                if not blocking:
+                    return time - now
+                delayfunc(time - now)
+            else:
+                action(*argument, **kwargs)
+                delayfunc(0)   # Let other threads run
 
     @property
     def queue(self):

File Lib/test/test_sched.py

 import time
 import unittest
 from test import support
-
+try:
+    import threading
+except ImportError:
+    threading = None
 
 class TestCase(unittest.TestCase):
 
         scheduler.run()
         self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
 
+    @unittest.skipUnless(threading, 'Threading required for this test.')
+    def test_enter_concurrent(self):
+        l = []
+        fun = lambda x: l.append(x)
+        scheduler = sched.scheduler(time.time, time.sleep)
+        scheduler.enter(0.03, 1, fun, (0.03,))
+        t = threading.Thread(target=scheduler.run)
+        t.start()
+        for x in [0.05, 0.04, 0.02, 0.01]:
+            z = scheduler.enter(x, 1, fun, (x,))
+        scheduler.run()
+        t.join()
+        self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+
     def test_priority(self):
         l = []
         fun = lambda x: l.append(x)
         scheduler.run()
         self.assertEqual(l, [0.02, 0.03, 0.04])
 
+    @unittest.skipUnless(threading, 'Threading required for this test.')
+    def test_cancel_concurrent(self):
+        l = []
+        fun = lambda x: l.append(x)
+        scheduler = sched.scheduler(time.time, time.sleep)
+        now = time.time()
+        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
+        event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
+        event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
+        event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
+        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
+        t = threading.Thread(target=scheduler.run)
+        t.start()
+        scheduler.cancel(event1)
+        scheduler.cancel(event5)
+        t.join()
+        self.assertEqual(l, [0.02, 0.03, 0.04])
+
     def test_empty(self):
         l = []
         fun = lambda x: l.append(x)
 Library
 -------
 
+- Issue #16165: Fix sched.scheduler.run() method was block a scheduler for
+  other threads.
+
 - Issue #16641: Fix default values of sched.scheduler.enter arguments were
   modifiable.