Commits

Brian Kearns committed e102f57

add a modified test of the signals_enabled context manager that works untranslated

Comments (0)

Files changed (1)

pypy/module/__pypy__/test/test_signal.py

 
 
 class AppTestThreadSignal:
+    spaceconfig = dict(usemodules=['__pypy__', 'thread', 'signal', 'time'])
+
+    def test_enable_signals(self):
+        import __pypy__, thread, signal, time
+
+        def subthread():
+            try:
+                with __pypy__.thread.signals_enabled:
+                    thread.interrupt_main()
+                    for i in range(10):
+                        print 'x'
+                        time.sleep(0.1)
+            except BaseException, e:
+                interrupted.append(e)
+            finally:
+                done.append(None)
+
+        # This is normally called by app_main.py
+        signal.signal(signal.SIGINT, signal.default_int_handler)
+
+        for i in range(10):
+            __pypy__.thread._signals_exit()
+            try:
+                done = []
+                interrupted = []
+                thread.start_new_thread(subthread, ())
+                for i in range(10):
+                    if len(done): break
+                    print '.'
+                    time.sleep(0.1)
+                assert len(done) == 1
+                assert len(interrupted) == 1
+                assert 'KeyboardInterrupt' in interrupted[0].__class__.__name__
+            finally:
+                __pypy__.thread._signals_enter()
+
+
+class AppTestThreadSignalLock:
     spaceconfig = dict(usemodules=['__pypy__', 'thread', 'signal'])
 
     def setup_class(cls):
 
     def test_enable_signals(self):
         import __pypy__, thread, signal, time
-        #
+
         interrupted = []
         lock = thread.allocate_lock()
         lock.acquire()
-        #
+
         def subthread():
             try:
                 time.sleep(0.25)
                     thread.interrupt_main()
             except BaseException, e:
                 interrupted.append(e)
-            lock.release()
-        #
+            finally:
+                lock.release()
+
         thread.start_new_thread(subthread, ())
         lock.acquire()
         assert len(interrupted) == 1