Commits

Erik van Zijst committed 98f2b11

Added support for sharing a single timeout quota/allotment across multiple calls to timeout().

Comments (0)

Files changed (4)

             pass
 
 
+Quotas
+------
+
+You can allocate a quota of time and then share it across multiple invocations
+to ``timeout()``. This is especially useful if you need to use timeouts inside
+a loop::
+
+    from interruptingcow import timeout, Quota
+
+    quota = Quota(1.0)
+    for i in something:
+        try:
+            with timeout(quota, RuntimeError):
+                # perform a slow operation
+                pass
+        except RuntimeError:
+            # do a cheaper thing instead
+
+Here the first iterations of the loop will be able to perform the expensive
+operation, until the shared quota of 1 second runs out and then the remaining
+iterations will perform the cheaper alternative.
+
+A single quota instance can also be shared across all calls to ``timeout()``
+your application makes (including nested calls), to give place an upper bound
+on the total runtime, regardless of how many calls to ``timeout()`` you have.
+
 Caveats
 -------
 

interruptingcow/__init__.py

 class StateException(Exception):
     pass
 
+class Quota(object):
+    def __init__(self, seconds):
+        if seconds <= 0:
+            raise ValueError('Invalid timeout: %s' % seconds)
+        else:
+            self._timeleft = seconds
+        self._depth = 0
+        self._starttime = None
+
+    def _start(self):
+        if self._depth is 0:
+            self._starttime = time.time()
+        self._depth += 1
+
+    def _stop(self):
+        if self._depth is 1:
+            self._timeleft = self.remaining()
+            self._starttime = None
+        self._depth -= 1
+
+    def running(self):
+        return self._depth > 0
+
+    def remaining(self):
+        if self.running():
+            return self._timeleft - (time.time() - self._starttime)
+        else:
+            return self._timeleft
+
 def _bootstrap():
 
     Timer = namedtuple('Timer', 'expiration exception')
                                  'programs that use SIGALRM.')
 
     def timeout(seconds, exception):
-        if seconds <= 0:
-            raise ValueError('Invalid timeout: %s' % seconds)
         if threading.currentThread().name != 'MainThread':
             raise StateException('Interruptingcow can only be used from the '
                                  'MainThread.')
+        if isinstance(seconds, Quota):
+            quota = seconds
+        elif seconds <= 0:
+            raise ValueError('Invalid timeout: %s' % seconds)
+        else:
+            quota = Quota(seconds)
         set_sighandler()
+        seconds = quota.remaining()
 
         depth = len(timers)
-        timeleft = signal.getitimer(signal.ITIMER_REAL)[0]
-        if not timers or timeleft > seconds:
+        parenttimeleft = signal.getitimer(signal.ITIMER_REAL)[0]
+        if not timers or parenttimeleft > seconds:
             try:
+                quota._start()
                 signal.setitimer(signal.ITIMER_REAL, seconds)
                 timers.append(Timer(time.time() + seconds, exception))
                 yield
             finally:
+                quota._stop()
                 if len(timers) > depth:
                     # cancel our timer
                     signal.setitimer(signal.ITIMER_REAL, 0)
                     timers.pop()
                     if timers:
                         # reinstall the parent timer
-                        timeleft = timers[-1].expiration - time.time()
-                        if timeleft > 0:
-                            signal.setitimer(signal.ITIMER_REAL, timeleft)
+                        parenttimeleft = timers[-1].expiration - time.time()
+                        if parenttimeleft > 0:
+                            signal.setitimer(signal.ITIMER_REAL, parenttimeleft)
                         else:
                             # the parent timer has expired, trigger the handler
                             handler()
         else:
             # not enough time left on the parent timer
-            yield
+            try:
+                quota._start()
+                yield
+            finally:
+                quota._stop()
 
     class Timeout(GeneratorContextManager):
         """This class allows us to use timeout() both as an inline
         'Topic :: Utilities',
     ],
     description='A watchdog that interrupts long running code.',
-    download_url='https://bitbucket.org/evzijst/interruptingcow/downloads/interruptingcow-0.6.tar.gz',
+    download_url='https://bitbucket.org/evzijst/interruptingcow/downloads/interruptingcow-0.7.tar.gz',
     keywords='debug watchdog timer interrupt',
     license='MIT',
     long_description=long_description(),
     name='interruptingcow',
     packages=['interruptingcow'],
     url='https://bitbucket.org/evzijst/interruptingcow',
-    version='0.6',
+    version='0.7',
 )

tests/__init__.py

 import threading
 import unittest
 
-from interruptingcow import timeout, StateException
+from interruptingcow import timeout, Quota, StateException
 
 class TimeoutError(Exception):
     pass
 
 if __name__ == '__main__':
     unittest.main()
+
+class TestQuota(unittest.TestCase):
+    def test_remaining(self):
+        q = Quota(1)
+        self.assertEquals(1, q.remaining())
+
+        q._start()
+        self.assertTrue(q.running())
+        time.sleep(0.5)
+        q._stop()
+
+        self.assertFalse(q.running())
+        self.assertLessEqual(q.remaining(), 0.5)
+
+    def test_nesting(self):
+        q = Quota(1)
+
+        q._start()
+        self.assertTrue(q.running())
+        q._start()
+        self.assertTrue(q.running())
+        time.sleep(0.5)
+        q._stop()
+        self.assertTrue(q.running())
+        q._stop()
+        self.assertFalse(q.running())
+
+        self.assertLessEqual(q.remaining(), 0.5)
+
+    def test_continuation(self):
+        q = Quota(1)
+
+        q._start()
+        q._stop()
+        time.sleep(0.2)
+        q._start()
+        q._stop()
+
+        # tolerate a little time spent outside the sleep:
+        self.assertGreater(q.remaining(), 0.9)
+
+class TestQuotaTimeouts(unittest.TestCase):
+    def test_timeout(self):
+        q = Quota(1)
+        with timeout(q, RuntimeError):
+            time.sleep(0.2)
+        self.assertTrue(0.75 < q.remaining() <= 0.8)
+        time.sleep(1)
+        self.assertTrue(0.75 < q.remaining() <= 0.8)
+        with timeout(q, RuntimeError):
+            time.sleep(0.2)
+        self.assertTrue(0.55 < q.remaining() <= 0.6)
+
+        try:
+            with timeout(q, RuntimeError):
+                time.sleep(0.7)
+            self.fail()
+        except RuntimeError:
+            pass
+
+
+    def test_nesting(self):
+        q1 = Quota(0.5)
+        q2 = Quota(1)
+
+        try:
+            with timeout(q1, Outer):
+                with timeout(q2, Inner):
+                    time.sleep(1)
+            self.fail()
+        except Outer:
+            self.assertFalse(q1.running())
+            self.assertLessEqual(q1.remaining(), 0.05)
+
+            self.assertFalse(q2.running())
+            self.assertTrue(0.45 < q2.remaining() <= 0.5)