Commits

Armin Rigo committed e80549f

Test for acquire_timed() returning RPY_LOCK_INTR.

Comments (0)

Files changed (3)

rpython/rlib/rthread.py

         return bool(res)
 
     def acquire_timed(self, timeout):
-        "timeout is in microseconds."
+        """Timeout is in microseconds.  Returns 0 in case of failure,
+        1 in case it works, 2 if interrupted by a signal."""
         res = c_thread_acquirelock_timed(self._lock, timeout, 1)
         res = rffi.cast(lltype.Signed, res)
-        return bool(res)
+        return res
 
     def release(self):
         # Sanity check: the lock must be locked

rpython/rlib/test/test_rthread.py

             l = allocate_lock()
             l.acquire(True)
             t1 = time.time()
-            ok = l.acquire_timed(1000000)
+            ok = l.acquire_timed(1000001)
             t2 = time.time()
             delay = t2 - t1
-            if ok:
+            if ok == 0:        # RPY_LOCK_FAILURE
+                return -delay
+            elif ok == 2:      # RPY_LOCK_INTR
                 return delay
-            else:
-                return -delay
+            else:              # RPY_LOCK_ACQUIRED
+                return 0.0
         fn = self.getcompiled(f, [])
         res = fn()
         assert res < -1.0
 
+    def test_acquire_timed_alarm(self):
+        import sys
+        if not sys.platform.startswith('linux'):
+            py.test.skip("skipped on non-linux")
+        import time
+        from rpython.rlib import rsignal
+        def f():
+            l = allocate_lock()
+            l.acquire(True)
+            #
+            rsignal.pypysig_setflag(rsignal.SIGALRM)
+            rsignal.c_alarm(1)
+            #
+            t1 = time.time()
+            ok = l.acquire_timed(2500000)
+            t2 = time.time()
+            delay = t2 - t1
+            if ok == 0:        # RPY_LOCK_FAILURE
+                return -delay
+            elif ok == 2:      # RPY_LOCK_INTR
+                return delay
+            else:              # RPY_LOCK_ACQUIRED
+                return 0.0
+        fn = self.getcompiled(f, [])
+        res = fn()
+        assert res >= 0.95
+
 #class TestRunDirectly(AbstractThreadTests):
 #    def getcompiled(self, f, argtypes):
 #        return f

rpython/translator/c/src/thread.h

 typedef enum RPyLockStatus {
     RPY_LOCK_FAILURE = 0,
     RPY_LOCK_ACQUIRED = 1,
-    RPY_LOCK_INTR
+    RPY_LOCK_INTR = 2
 } RPyLockStatus;
 
 #ifdef _WIN32