pypy / pypy / module / thread / test / support.py

The branch 'remove-globals-in-jit' does not exist.
import gc
import time
import thread
import os

from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.module.thread import gil


# Base timeout in seconds.  The polling loops in this file multiply it by
# their ``delay`` argument to compute how long to wait before giving up.
NORMAL_TIMEOUT = 300.0   # 5 minutes


def waitfor(space, w_condition, delay=1):
    """Poll ``w_condition`` until it is true or the deadline passes.

    ``w_condition`` is invoked through ``space.call_function`` and its
    result is tested with ``space.is_true``.  The deadline is
    ``delay * NORMAL_TIMEOUT`` seconds after the call starts.  Between
    polls the function sleeps, starting at 0.04 s and growing the pause
    by 5% per iteration, then runs ``gc.collect()``.

    Returns None as soon as the condition holds.  On timeout it prints
    ``*** timed out ***`` and also returns None; no exception is raised.
    """
    pause = 0.04
    deadline = time.time() + delay * NORMAL_TIMEOUT
    while time.time() <= deadline:
        # The sleep is bracketed by the gil hooks -- presumably so the GIL
        # is released while this thread sleeps (confirm against the gil
        # module).
        gil.before_external_call()
        time.sleep(pause)
        gil.after_external_call()
        gc.collect()
        w_outcome = space.call_function(w_condition)
        if space.is_true(w_outcome):
            return
        pause *= 1.05
    print('*** timed out ***')


def timeout_killer(pid, delay):
    """Start a watchdog thread that kills process ``pid`` after a timeout.

    The watchdog makes ``delay * 10`` polls, sleeping 0.1 s before each
    one (so roughly ``delay`` seconds in total) and probing the process
    with signal 0.  If the probe reports that the process no longer
    exists (``errno.ESRCH``), the watchdog exits quietly: the target
    finished on its own and there is nothing to kill.  Any other
    ``OSError`` is re-raised.  If the process is still there after all
    polls, it is sent signal 9 and a message is printed.

    ``delay`` must be an integer, since it is passed to ``range``.
    The function itself returns immediately with None; all the work
    happens in the new thread.
    """
    # Local import so this function does not depend on a file-level import.
    import errno

    def kill():
        for _ in range(delay * 10):
            time.sleep(0.1)
            try:
                # Signal 0 delivers nothing; it only checks that the
                # process can still be signalled.
                os.kill(pid, 0)
            except OSError as e:
                if e.errno == errno.ESRCH:
                    # The target already exited.  Without this guard the
                    # thread would die with an unhandled OSError traceback.
                    return
                raise
        # 9 is SIGKILL on Linux and most POSIX systems.
        os.kill(pid, 9)
        print("process %s killed!" % (pid,))
    thread.start_new_thread(kill, ())


class GenericTestThread:
    """Test-class scaffolding that installs thread-test helpers.

    ``setup_class`` attaches three attributes to the class:
    ``w_waitfor``, ``w_timeout_killer`` and ``w_busywait``.
    """
    spaceconfig = {'usemodules': ('thread', 'rctime', 'signal')}

    def setup_class(cls):
        """Attach ``w_waitfor``, ``w_timeout_killer`` and ``w_busywait``.

        When ``cls.runappdirect`` is false, the helpers are built with
        ``interp2app`` and wrapped by ``cls.space``; they delegate to the
        module-level ``waitfor`` and ``timeout_killer``.  When it is true,
        ``w_waitfor`` is a plain Python polling loop and
        ``w_timeout_killer`` forwards its arguments to ``timeout_killer``.
        In both cases ``w_busywait`` is whatever ``cls.space.appexec``
        returns for a snippet that returns ``time.sleep``.
        """
        if not cls.runappdirect:
            @unwrap_spec(delay=int)
            def py_waitfor(space, w_condition, delay=1):
                waitfor(space, w_condition, delay)

            def py_timeout_killer(space, __args__):
                # Unwrap every positional and keyword argument before
                # handing them to the plain-Python helper.
                args_w, kwargs_w = __args__.unpack()
                positional = [space.unwrap(w_arg) for w_arg in args_w]
                keywords = {}
                for name, w_value in kwargs_w.iteritems():
                    keywords[name] = space.unwrap(w_value)
                timeout_killer(*positional, **keywords)

            cls.w_waitfor = cls.space.wrap(interp2app(py_waitfor))
            cls.w_timeout_killer = cls.space.wrap(interp2app(py_timeout_killer))
        else:
            def plain_waitfor(self, condition, delay=1):
                # Same polling scheme as the module-level waitfor(), minus
                # the space and gil calls: sleep, collect, test, back off.
                pause = 0.04
                deadline = time.time() + NORMAL_TIMEOUT * delay
                while time.time() <= deadline:
                    time.sleep(pause)
                    gc.collect()
                    if condition():
                        return
                    pause *= 1.05
                print('*** timed out ***')

            def py_timeout_killer(self, *args, **kwargs):
                timeout_killer(*args, **kwargs)

            cls.w_waitfor = plain_waitfor
            cls.w_timeout_killer = cls.space.wrap(py_timeout_killer)

        cls.w_busywait = cls.space.appexec([], """():
            import time
            return time.sleep
        """)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.