Source

pypy / rpython / translator / c / gcc / test / test_asmgcroot.py

import py
import sys, os, gc
from rpython.translator.c.test import test_newgc
from rpython.translator.translator import TranslationContext
from rpython.translator.c.genc import CStandaloneBuilder
from rpython.annotator.listdef import s_list_of_strings
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.translator.platform import platform as compiler
from rpython.rlib.rarithmetic import is_emulated_long
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib.entrypoint import entrypoint, secondary_entrypoints
from rpython.rtyper.lltypesystem.lloperation import llop

# Flags describing the C compiler selected by rpython.translator.platform,
# derived from its 'name' attribute.
_MSVC = (compiler.name == "msvc")
_MINGW = (compiler.name == "mingw32")
_WIN32 = compiler.name in ("msvc", "mingw32")
# XXX get rid of 'is_emulated_long' and have a real config here.
_WIN64 = is_emulated_long if _WIN32 else False

class AbstractTestAsmGCRoot:
    # the asmgcroot gc transformer doesn't generate gc_reload_possibly_moved
    # instructions:
    should_be_moving = False

    @classmethod
    def make_config(cls):
        if _MSVC and _WIN64:
            py.test.skip("all asmgcroot tests disabled for MSVC X64")
        from rpython.config.translationoption import get_combined_translation_config
        config = get_combined_translation_config(translating=True)
        config.translation.gc = cls.gcpolicy
        config.translation.gcrootfinder = "asmgcc"
        config.translation.taggedpointers = getattr(cls, "taggedpointers", False)
        return config

    @classmethod
    def _makefunc_str_int(cls, func):
        def main(argv):
            arg0 = argv[1]
            arg1 = int(argv[2])
            try:
                res = func(arg0, arg1)
            except MemoryError:
                print 'Result: MemoryError'
            else:
                print 'Result: "%s"' % (res,)
            return 0
        config = cls.make_config()
        t = TranslationContext(config=config)
        a = t.buildannotator()
        sec_ep = getattr(cls, 'secondary_entrypoints', [])
        for f, inputtypes in sec_ep:
            a.build_types(f, inputtypes, False)
        a.build_types(main, [s_list_of_strings])
        t.buildrtyper().specialize()
        t.checkgraphs()

        cbuilder = CStandaloneBuilder(t, main, config=config,
                secondary_entrypoints=sec_ep)
        c_source_filename = cbuilder.generate_source(
            defines = cbuilder.DEBUG_DEFINES)
        cls._patch_makefile(cbuilder.targetdir)
        if py.test.config.option.view:
            t.view()
        exe_name = cbuilder.compile()

        def run(arg0, arg1):
            lines = []
            print >> sys.stderr, 'RUN: starting', exe_name, arg0, arg1
            if sys.platform == 'win32':
                redirect = ' 2> NUL'
            else:
                redirect = ''
            if config.translation.shared and os.name == 'posix':
                env = 'LD_LIBRARY_PATH="%s" ' % (exe_name.dirpath(),)
            else:
                env = ''
            cwd = os.getcwd()
            try:
                os.chdir(str(exe_name.dirpath()))
                g = os.popen(
                    '%s"%s" %s %d%s' % (env, exe_name, arg0, arg1, redirect), 'r')
            finally:
                os.chdir(cwd)
            for line in g:
                print >> sys.stderr, 'RUN:', line.rstrip()
                lines.append(line)
            g.close()
            if not lines:
                py.test.fail("no output from subprocess")
            if not lines[-1].startswith('Result:'):
                py.test.fail("unexpected output from subprocess")
            result = lines[-1][len('Result:'):].strip()
            if result == 'MemoryError':
                raise MemoryError("subprocess got an RPython MemoryError")
            if result.startswith('"') and result.endswith('"'):
                return result[1:-1]
            else:
                return int(result)
        return run

    @classmethod
    def _patch_makefile(cls, targetdir):
        # for testing, patch the Makefile to add the -r option to
        # trackgcroot.py.
        makefile = targetdir.join('Makefile')
        f = makefile.open()
        lines = f.readlines()
        f.close()
        found = False
        for i in range(len(lines)):
            if 'trackgcroot.py' in lines[i]:
                lines[i] = lines[i].replace('trackgcroot.py',
                                            'trackgcroot.py -r')
                found = True
        assert found
        f = makefile.open('w')
        f.writelines(lines)
        f.close()

    if sys.platform == 'win32':
        def test_callback_with_collect(self):
            py.test.skip("No libffi yet with mingw32")

        def define_callback_with_collect(cls):
            return lambda: 0

class TestAsmGCRootWithSemiSpaceGC(AbstractTestAsmGCRoot,
                                   test_newgc.TestSemiSpaceGC):
    """Run the TestSemiSpaceGC tests, plus a few extra ones, with asmgcroot.

    Each ``define_xxx`` method returns the function that the matching
    ``test_xxx`` method executes through ``self.run('xxx')``.
    """
    # for the individual tests see
    # ====> ../../test/test_newgc.py
    secondary_entrypoints = []

    def define_large_function(cls):
        """Return a function that calls a generated, very long function."""
        class A(object):
            def __init__(self):
                self.x = 0
        d = dict(A=A)
        # Build the source of g(a): 1000 'a.x += 1' statements followed by
        # a 'return A()'.
        exec ("def g(a):\n" +
              "    a.x += 1\n" * 1000 +
              "    return A()\n"
              ) in d
        g = d['g']
        def f():
            a = A()
            g(a)
            # a.x was incremented 1000 times inside g()
            return a.x
        return f

    def test_large_function(self):        
        res = self.run('large_function')
        assert res == 1000

    def define_callback_simple(cls):
        """Return a function whose C helper calls back into a collecting
        callback."""
        # C helper: invokes the callback twice and adds the two results.
        c_source = py.code.Source("""
        int mystuff(int(*cb)(int, int))
        {
            return cb(40, 2) + cb(3, 4);
        }
        """)
        eci = ExternalCompilationInfo(separate_module_sources=[c_source])
        S = lltype.GcStruct('S', ('x', lltype.Signed))
        CALLBACK = lltype.FuncType([lltype.Signed, lltype.Signed],
                                   lltype.Signed)
        z = rffi.llexternal('mystuff', [lltype.Ptr(CALLBACK)], lltype.Signed,
                            compilation_info=eci)

        def mycallback(a, b):
            # force a collection while we are being called from C
            gc.collect()
            return a + b

        def f():
            p = lltype.malloc(S)
            p.x = 100
            result = z(mycallback)
            # p is read again after the call during which gc.collect() ran
            return result * p.x

        return f

    def test_callback_simple(self):
        res = self.run('callback_simple')
        # (40 + 2) + (3 + 4) == 49, multiplied by p.x == 100
        assert res == 4900

    def define_secondary_entrypoint_callback(cls):
        """Like define_callback_simple, but the C code calls the callback
        by name, as a secondary entry point exported as 'callback'."""
        # XXX this is baaaad, cleanup global state
        try:
            del secondary_entrypoints["x42"]
        except KeyError:
            pass
        
        @entrypoint("x42", [lltype.Signed, lltype.Signed], c_name='callback')
        def mycallback(a, b):
            llop.gc_stack_bottom(lltype.Void)
            # stacks_counter is incremented around the collection and then
            # restored
            rffi.stackcounter.stacks_counter += 1
            gc.collect()
            rffi.stackcounter.stacks_counter -= 1
            return a + b

        # C helper: calls the exported 'callback' symbol twice and adds the
        # two results.
        c_source = py.code.Source("""
        int mystuff2()
        {
            return callback(40, 2) + callback(3, 4);
        }
        """)

        eci = ExternalCompilationInfo(separate_module_sources=[c_source])
        z = rffi.llexternal('mystuff2', [], lltype.Signed,
                            compilation_info=eci)
        S = lltype.GcStruct('S', ('x', lltype.Signed))

        # store the "x42" entry points on the class, where
        # _makefunc_str_int() looks them up via getattr()
        cls.secondary_entrypoints = secondary_entrypoints["x42"]

        def f():
            p = lltype.malloc(S)
            p.x = 100
            result = z()
            # p is read again after the call during which gc.collect() ran
            return result * p.x

        return f

    def test_secondary_entrypoint_callback(self):
        res = self.run('secondary_entrypoint_callback')
        # (40 + 2) + (3 + 4) == 49, multiplied by p.x == 100
        assert res == 4900

class TestAsmGCRootWithSemiSpaceGC_Mingw32(TestAsmGCRootWithSemiSpaceGC):
    """The semispace asmgcroot tests, compiled with ``cc = 'mingw32'``."""
    # for the individual tests see
    # ====> ../../test/test_newgc.py

    @classmethod
    def setup_class(cls):
        """Skip unless on win32 with a mingw gcc and GNU make, then run the
        setup_class of test_newgc.TestSemiSpaceGC for this class."""
        if sys.platform != 'win32':
            py.test.skip("mingw32 specific test")

        toolchain_missing = "mingw32 and MSYS are required for this test"
        gcc_banner = os.popen('gcc --version').read()
        if 'mingw' not in gcc_banner:
            py.test.skip(toolchain_missing)
        # 'make' is only queried once gcc has been identified as mingw
        make_banner = os.popen('make --version').read()
        if 'GNU' not in make_banner:
            py.test.skip(toolchain_missing)

        base_setup = test_newgc.TestSemiSpaceGC.setup_class.im_func
        base_setup(cls)

    @classmethod
    def make_config(cls):
        """Return the parent class's config with the compiler set to mingw32."""
        mingw_config = TestAsmGCRootWithSemiSpaceGC.make_config()
        mingw_config.translation.cc = 'mingw32'
        return mingw_config

    def test_callback_with_collect(self):
        # always skipped for this configuration
        py.test.skip("No libffi yet with mingw32")

    def define_callback_with_collect(cls):
        # trivial placeholder for the skipped test above
        return lambda: 0

class TestAsmGCRootWithSemiSpaceGC_Shared(TestAsmGCRootWithSemiSpaceGC):
    """The semispace asmgcroot tests with ``translation.shared`` enabled."""

    @classmethod
    def make_config(cls):
        """Return the parent class's config with ``translation.shared`` on."""
        shared_config = TestAsmGCRootWithSemiSpaceGC.make_config()
        shared_config.translation.shared = True
        return shared_config

class TestAsmGCRootWithHybridTagged(AbstractTestAsmGCRoot,
                                    test_newgc.TestHybridTaggedPointers):
    # Nothing to add: the tests come from test_newgc.TestHybridTaggedPointers,
    # while AbstractTestAsmGCRoot (first in the bases) supplies the asmgcc
    # config and the build/run helpers.
    pass
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.