Armin Rigo avatar Armin Rigo committed 83962cb Draft

Finish the tests, and write a simple handling of ec.topframeref.

Comments (0)

Files changed (2)

pypy/module/_continuation/interp_continuation.py

         ec = self.space.getexecutioncontext()
         if ec.stacklet_thread is not self.sthread:
             raise geterror(self.space, "inter-thread support is missing")
+        return ec
 
     def descr_init(self, w_callable, __args__):
         if self.h:
             raise geterror(self.space, "continuation not initialized yet")
         if self.sthread.is_empty_handle(self.h):
             raise geterror(self.space, "continuation already finished")
+        ec = self.check_sthread()
+        saved_frame_top = ec.topframeref
         start_state.w_value = w_value
         try:
             self.h = self.sthread.switch(self.h)
         except MemoryError:
             raise getmemoryerror(self.space)
+        ec = self.sthread.ec
+        ec.topframeref = saved_frame_top
         if start_state.propagate_exception:
             e = start_state.propagate_exception
             start_state.propagate_exception = None
         start_state.w_value = None
         return w_value
 
+    def descr_is_pending(self):
+        valid = bool(self.h) and not self.sthread.is_empty_handle(self.h)
+        return self.space.newbool(valid)
+
 
 def W_Continuation___new__(space, w_subtype, __args__):
     r = space.allocate_instance(W_Continuation, w_subtype)
     __new__     = interp2app(W_Continuation___new__),
     __init__    = interp2app(W_Continuation.descr_init),
     switch      = interp2app(W_Continuation.descr_switch),
-    #is_pending = interp2app(W_Stacklet.is_pending),
+    is_pending  = interp2app(W_Continuation.descr_is_pending),
     )
 
 

pypy/module/_continuation/test/test_stacklet.py

         assert seen == [1, 2, 3, 4]
 
     def test_exception_with_switch(self):
-        from _continuation import new
+        from _continuation import continuation
         #
-        def depth1(h):
+        def depth1(c):
             seen.append(1)
-            h = h.switch()
+            c.switch()
             seen.append(3)
             raise ValueError
         #
         seen = []
-        h = new(depth1)
+        c = continuation(depth1)
+        seen.append(0)
+        c.switch()
         seen.append(2)
-        raises(ValueError, h.switch)
-        assert seen == [1, 2, 3]
+        raises(ValueError, c.switch)
+        assert seen == [0, 1, 2, 3]
+
+    def test_is_pending(self):
+        from _continuation import continuation
+        #
+        def switchbackonce_callback(c):
+            assert c.is_pending()
+            res = c.switch('a')
+            assert res == 'b'
+            assert c.is_pending()
+            return 'c'
+        #
+        c = continuation.__new__(continuation)
+        assert not c.is_pending()
+        c.__init__(switchbackonce_callback)
+        assert c.is_pending()
+        res = c.switch()
+        assert res == 'a'
+        assert c.is_pending()
+        res = c.switch('b')
+        assert res == 'c'
+        assert not c.is_pending()
+
+    def test_switch_alternate(self):
+        from _continuation import continuation
+        #
+        def func_lower(c):
+            res = c.switch('a')
+            assert res == 'b'
+            res = c.switch('c')
+            assert res == 'd'
+            return 'e'
+        #
+        def func_upper(c):
+            res = c.switch('A')
+            assert res == 'B'
+            res = c.switch('C')
+            assert res == 'D'
+            return 'E'
+        #
+        c_lower = continuation(func_lower)
+        c_upper = continuation(func_upper)
+        res = c_lower.switch()
+        assert res == 'a'
+        res = c_upper.switch()
+        assert res == 'A'
+        res = c_lower.switch('b')
+        assert res == 'c'
+        res = c_upper.switch('B')
+        assert res == 'C'
+        res = c_lower.switch('d')
+        assert res == 'e'
+        res = c_upper.switch('D')
+        assert res == 'E'
 
     def test_exception_with_switch_depth2(self):
-        from _continuation import new
+        from _continuation import continuation
         #
-        def depth2(h):
+        def depth2(c):
             seen.append(4)
-            h = h.switch()
+            c.switch()
             seen.append(6)
             raise ValueError
         #
-        def depth1(h):
+        def depth1(c):
             seen.append(1)
-            h = h.switch()
+            c.switch()
             seen.append(3)
-            h2 = new(depth2)
+            c2 = continuation(depth2)
+            c2.switch()
             seen.append(5)
-            raises(ValueError, h2.switch)
-            assert not h2.is_pending()
+            raises(ValueError, c2.switch)
+            assert not c2.is_pending()
             seen.append(7)
+            assert c.is_pending()
             raise KeyError
         #
         seen = []
-        h = new(depth1)
+        c = continuation(depth1)
+        c.switch()
         seen.append(2)
-        raises(KeyError, h.switch)
-        assert not h.is_pending()
+        raises(KeyError, c.switch)
+        assert not c.is_pending()
         assert seen == [1, 2, 3, 4, 5, 6, 7]
 
     def test_various_depths(self):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.