Commits

Alexander Hesse committed a3ebad4

Moved rpython tests to rpython

  • Participants
  • Parent commits dd8815a
  • Branches split-rpython

Comments (0)

Files changed (28)

pypy/module/thread/test/test_rthread.py

-import gc
-from rpython.rlib.rthread import *
-from rpython.translator.c.test.test_boehm import AbstractGCTestClass
-from rpython.rtyper.lltypesystem import lltype, rffi
-import py
-
-def setup_module(mod):
-    # Hack to avoid a deadlock if the module is run after other test files :-(
-    # In this module, we assume that rthread.start_new_thread() is not
-    # providing us with a GIL equivalent, except in test_gc_locking
-    # which installs its own aroundstate.
-    rffi.aroundstate._cleanup_()
-
-def test_lock():
-    l = allocate_lock()
-    ok1 = l.acquire(True)
-    ok2 = l.acquire(False)
-    l.release()
-    ok3 = l.acquire(False)
-    res = ok1 and not ok2 and ok3
-    assert res == 1
-
-def test_thread_error():
-    l = allocate_lock()
-    try:
-        l.release()
-    except error:
-        pass
-    else:
-        py.test.fail("Did not raise")
-
-
-class AbstractThreadTests(AbstractGCTestClass):
-    use_threads = True
-
-    def test_start_new_thread(self):
-        import time
-
-        class State:
-            pass
-        state = State()
-
-        def bootstrap1():
-            state.my_thread_ident1 = get_ident()
-        def bootstrap2():
-            state.my_thread_ident2 = get_ident()
-
-        def f():
-            state.my_thread_ident1 = get_ident()
-            state.my_thread_ident2 = get_ident()
-            start_new_thread(bootstrap1, ())
-            start_new_thread(bootstrap2, ())
-            willing_to_wait_more = 1000
-            while (state.my_thread_ident1 == get_ident() or
-                   state.my_thread_ident2 == get_ident()):
-                willing_to_wait_more -= 1
-                if not willing_to_wait_more:
-                    raise Exception("thread didn't start?")
-                time.sleep(0.01)
-            return 42
-
-        fn = self.getcompiled(f, [])
-        res = fn()
-        assert res == 42
-
-    def test_gc_locking(self):
-        import time
-        from rpython.rlib.objectmodel import invoke_around_extcall
-        from rpython.rlib.debug import ll_assert
-
-        class State:
-            pass
-        state = State()
-
-        class Z:
-            def __init__(self, i, j):
-                self.i = i
-                self.j = j
-            def run(self):
-                j = self.j
-                if self.i > 1:
-                    g(self.i-1, self.j * 2)
-                    ll_assert(j == self.j, "1: bad j")
-                    g(self.i-2, self.j * 2 + 1)
-                else:
-                    if len(state.answers) % 7 == 5:
-                        gc.collect()
-                    state.answers.append(self.j)
-                ll_assert(j == self.j, "2: bad j")
-            run._dont_inline_ = True
-
-        def before_extcall():
-            release_NOAUTO(state.gil)
-        before_extcall._gctransformer_hint_cannot_collect_ = True
-        # ^^^ see comments in gil.py about this hint
-
-        def after_extcall():
-            acquire_NOAUTO(state.gil, True)
-            gc_thread_run()
-        after_extcall._gctransformer_hint_cannot_collect_ = True
-        # ^^^ see comments in gil.py about this hint
-
-        def bootstrap():
-            # after_extcall() is called before we arrive here.
-            # We can't just acquire and release the GIL manually here,
-            # because it is unsafe: bootstrap() is called from a rffi
-            # callback which checks for and reports exceptions after
-            # bootstrap() returns.  The exception checking code must be
-            # protected by the GIL too.
-            z = state.z
-            state.z = None
-            state.bootstrapping.release()
-            z.run()
-            gc_thread_die()
-            # before_extcall() is called after we leave here
-
-        def g(i, j):
-            state.bootstrapping.acquire(True)
-            state.z = Z(i, j)
-            gc_thread_prepare()
-            start_new_thread(bootstrap, ())
-
-        def f():
-            state.gil = allocate_ll_lock()
-            acquire_NOAUTO(state.gil, True)
-            state.bootstrapping = allocate_lock()
-            state.answers = []
-            state.finished = 0
-            # the next line installs before_extcall() and after_extcall()
-            # to be called automatically around external function calls.
-            invoke_around_extcall(before_extcall, after_extcall)
-
-            g(10, 1)
-            done = False
-            willing_to_wait_more = 2000
-            while not done:
-                if not willing_to_wait_more:
-                    break
-                willing_to_wait_more -= 1
-                done = len(state.answers) == expected
-
-                time.sleep(0.01)
-
-            time.sleep(0.1)
-
-            return len(state.answers)
-
-        expected = 89
-        try:
-            fn = self.getcompiled(f, [])
-        finally:
-            rffi.aroundstate._cleanup_()
-        answers = fn()
-        assert answers == expected
-
-#class TestRunDirectly(AbstractThreadTests):
-#    def getcompiled(self, f, argtypes):
-#        return f
-# These are disabled because they crash occasionally for bad reasons
-# related to the fact that ll2ctypes is not at all thread-safe
-
-class TestUsingBoehm(AbstractThreadTests):
-    gcpolicy = 'boehm'
-
-class TestUsingFramework(AbstractThreadTests):
-    gcpolicy = 'generation'

pypy/tool/test/test_cache.py

-from rpython.rlib.cache import Cache 
-
-class MyCache(Cache):
-    counter = 0
-    def _build(self, key):
-        self.counter += 1
-        return key*7
-
-class TestCache: 
-    def test_getorbuild(self):
-        cache = MyCache()
-        assert cache.getorbuild(1) == 7
-        assert cache.counter == 1
-        assert cache.getorbuild(1) == 7
-        assert cache.counter == 1
-        assert cache.getorbuild(3) == 21
-        assert cache.counter == 2
-        assert cache.getorbuild(1) == 7
-        assert cache.counter == 2
-        assert cache.getorbuild(3) == 21
-        assert cache.counter == 2

pypy/tool/test/test_descriptor.py

-rpython.tool.descriptor import InstanceMethod
-
-class X(object):
-    def f(self, *args, **kwds):
-        return args, kwds
-
-def test_bound():
-    obj = X()
-    obj.x = 12
-    meth = InstanceMethod(X.f.im_func, obj, X)
-    assert meth(1, z=2) == ((1,), {'z': 2})
-
-def test_unbound():
-    obj = X()
-    obj.x = 12
-    meth = InstanceMethod(X.f.im_func, None, X)
-    assert meth(obj, 1, z=2) == ((1,), {'z': 2})
-
-def test_eq_hash():
-    obj1 = X()
-    obj1.x = 12
-    meth1 = InstanceMethod(X.f.im_func, obj1, X)
-    meth1bis = InstanceMethod(X.f.im_func, obj1, X)
-    obj2 = X()
-    obj2.x = 12
-    meth2 = InstanceMethod(X.f.im_func, obj2, X)
-    d = {meth1: 123, meth2: 456}
-    assert len(d) == 2
-    assert d[meth1bis] == 123

pypy/tool/test/test_error.py

-
-""" Tests some error handling routines
-"""
-
-from rpython.translator.translator import TranslationContext
-from rpython.tool.error import AnnotatorError
-from rpython.annotator.model import UnionError
-
-import py
-
-
-def compile_function(function, annotation=[]):
-    t = TranslationContext()
-    t.buildannotator().build_types(function, annotation)
-
-class AAA(object):
-    pass
-
-def test_blocked_inference1():
-    def blocked_inference():
-        return AAA().m()
-
-    py.test.raises(AnnotatorError, compile_function, blocked_inference)
-
-def test_blocked_inference2():
-    def blocked_inference():
-        a = AAA()
-        b = a.x
-        return b
-
-    py.test.raises(AnnotatorError, compile_function, blocked_inference)
-
-def test_someobject():
-    def someobject_degeneration(n):
-        if n == 3:
-            a = "a"
-        else:
-            a = 9
-        return a
-
-    py.test.raises(UnionError, compile_function, someobject_degeneration, [int])
-
-def test_someobject2():
-    def someobject_deg(n):
-        if n == 3:
-            a = "a"
-        else:
-            return AAA()
-        return a
-
-    py.test.raises(UnionError, compile_function, someobject_deg, [int])
-
-def test_eval_someobject():
-    exec("def f(n):\n if n == 2:\n  return 'a'\n else:\n  return 3")
-
-    py.test.raises(UnionError, compile_function, f, [int])
-
-def test_someobject_from_call():
-    def one(x):
-        return str(x)
-
-    def two(x):
-        return int(x)
-
-    def fn(n):
-        if n:
-            to_call = one
-        else:
-            to_call = two
-        return to_call(n)
-
-    try:
-        compile_function(fn, [int])
-    except UnionError, e:
-        assert 'function one' in str(e)
-        assert 'function two' in str(e)

pypy/tool/test/test_frozenlist.py

-import py
-from rpython.tool.frozenlist import frozenlist
-
-def test_frozenlist():
-    l = frozenlist([1, 2, 3])
-    assert l[0] == 1
-    assert l[:2] == [1, 2]
-    assert l.index(2) == 1
-    py.test.raises(TypeError, "l[0] = 1")
-    py.test.raises(TypeError, "del l[0]")
-    py.test.raises(TypeError, "l[:] = []")
-    py.test.raises(TypeError, "del l[:]")
-    py.test.raises(TypeError, "l += []")
-    py.test.raises(TypeError, "l *= 2")
-    py.test.raises(TypeError, "l.append(1)")
-    py.test.raises(TypeError, "l.insert(0, 0)")
-    py.test.raises(TypeError, "l.pop()")
-    py.test.raises(TypeError, "l.remove(1)")
-    py.test.raises(TypeError, "l.reverse()")
-    py.test.raises(TypeError, "l.sort()")
-    py.test.raises(TypeError, "l.extend([])")

pypy/tool/test/test_identitydict.py

-import py
-from rpython.tool.identity_dict import identity_dict, IdentityDictPurePython
-
-class TestIdentityDictNative:
-    identity_dict = identity_dict
-
-    def test_numbers(self):
-        d = self.identity_dict()
-        d[0] = 1
-        d[0.0] = 2
-        d[long(0)] = 3
-
-        assert d
-        assert len(d) == 3
-        d.clear()
-        assert not d
-
-    def test_get(self):
-        d = self.identity_dict()
-        d[None] = 1
-
-        assert d.get(None, 42) == 1
-        assert d.get(None) == 1
-        assert d.get(1) is None
-        assert d.get(1, 42) == 42
-
-    def test_unhashable(self):
-        d = self.identity_dict()
-        d[[]] = 1
-        d[[]] = 2
-        a = []
-        d[a] = 3
-        assert len(d) == 3
-        d[a] = 4
-        assert len(d) == 3
-        assert d[a] == 4
-
-        raises(KeyError, d.__getitem__, [])
-
-    def test_keys(self):
-        d = self.identity_dict()
-        d[[]] = 1
-        d[[]] = 2
-        d[[]] = 3
-
-        assert d.keys() == [[], [], []]
-        assert sorted(d.values()) == [1, 2, 3]
-
-    def test_in(self):
-        d = self.identity_dict()
-        d[None] = 1
-
-        assert None in d
-        assert [] not in d
-
-
-class TestIdentityDictPurePython(TestIdentityDictNative):
-    identity_dict = IdentityDictPurePython

pypy/tool/test/test_killsubprocess.py

-import sys, time
-import subprocess
-from rpython.tool.killsubprocess import killsubprocess
-
-def waitdead(process):
-    for i in range(50):
-        time.sleep(0.1)
-        if process.poll() is not None:
-            break       # ok
-    else:
-        raise AssertionError("the subprocess did not die within 5 seconds")
-
-def test_killsubprocess():
-    popen = subprocess.Popen([sys.executable, '-c', 'raw_input()'],
-                             stdin=subprocess.PIPE)
-    time.sleep(0.9)
-    assert popen.poll() is None
-    assert popen.poll() is None
-    killsubprocess(popen)
-    waitdead(popen)
-
-def test_already_dead_but_no_poll():
-    popen = subprocess.Popen([sys.executable, '-c', 'pass'],
-                             stdin=subprocess.PIPE)
-    time.sleep(3)    # a safe margin to be sure the subprocess is already dead
-    killsubprocess(popen)
-    assert popen.poll() is not None
-
-def test_already_dead_and_polled():
-    popen = subprocess.Popen([sys.executable, '-c', 'pass'],
-                             stdin=subprocess.PIPE)
-    waitdead(popen)
-    killsubprocess(popen)
-    assert popen.poll() is not None

pypy/tool/test/test_leakfinder.py

-import py
-from rpython.tool import leakfinder
-
-def test_start_stop():
-    leakfinder.start_tracking_allocations()
-    assert leakfinder.TRACK_ALLOCATIONS
-    leakfinder.stop_tracking_allocations(True)
-    assert not leakfinder.TRACK_ALLOCATIONS
-
-def test_start_stop_nested():
-    leakfinder.start_tracking_allocations()
-    p2 = leakfinder.start_tracking_allocations()
-    assert leakfinder.TRACK_ALLOCATIONS
-    leakfinder.stop_tracking_allocations(True, prev=p2)
-    assert leakfinder.TRACK_ALLOCATIONS
-    leakfinder.stop_tracking_allocations(True)
-    assert not leakfinder.TRACK_ALLOCATIONS
-
-def test_remember_free():
-    leakfinder.start_tracking_allocations()
-    x = 1234
-    leakfinder.remember_malloc(x)
-    leakfinder.remember_free(x)
-    leakfinder.stop_tracking_allocations(True)
-
-def test_remember_forget():
-    leakfinder.start_tracking_allocations()
-    x = 1234
-    leakfinder.remember_malloc(x)
-    py.test.raises(leakfinder.MallocMismatch,
-                   leakfinder.stop_tracking_allocations, True)
-
-def test_nested_remember_forget_1():
-    leakfinder.start_tracking_allocations()
-    x = 1234
-    leakfinder.remember_malloc(x)
-    p2 = leakfinder.start_tracking_allocations()
-    leakfinder.stop_tracking_allocations(True, prev=p2)
-    py.test.raises(leakfinder.MallocMismatch,
-                   leakfinder.stop_tracking_allocations, True)
-
-def test_nested_remember_forget_2():
-    p2 = leakfinder.start_tracking_allocations()
-    x = 1234
-    leakfinder.remember_malloc(x)
-    py.test.raises(leakfinder.MallocMismatch,
-                   leakfinder.stop_tracking_allocations, True, prev=p2)
-    leakfinder.stop_tracking_allocations(True)
-
-def test_traceback():
-    leakfinder.start_tracking_allocations()
-    x = 1234
-    leakfinder.remember_malloc(x)
-    res = leakfinder.stop_tracking_allocations(check=False)
-    assert res.keys() == [x]
-    print res[x]
-    assert isinstance(res[x], str)
-    assert 'test_traceback' in res[x]
-    assert 'leakfinder.remember_malloc(x)' in res[x]
-
-def test_malloc_mismatch():
-    import sys, traceback, cStringIO
-    sio = cStringIO.StringIO()
-    traceback.print_stack(sys._getframe(), limit=10, file=sio)
-    tb = sio.getvalue()
-    e = leakfinder.MallocMismatch({1234: tb, 2345: tb})
-    print str(e)
-    # grouped entries for 1234 and 2345
-    assert '1234:\n2345:\n' in str(e) or '2345:\n1234:\n' in str(e)
-    assert tb[-80:] in str(e)

pypy/tool/test/test_logparser.py

-from rpython.tool.udir import udir
-from rpython.tool.logparser import *
-
-
-globalpath = udir.join('test_logparser.log')
-globalpath.write("""\
-test1
-[12a0] {foo
-test2a
-test2b
-[12b0] {bar
-test3
-[12e0] bar}
-test4
-[12e5] {bar
-test5a
-test5b
-[12e6] bar}
-test6
-[12f0] foo}
-test7
-""")
-
-
-def test_parse_log_file():
-    log = parse_log_file(str(globalpath))
-    assert log == [
-        ('debug_print', 'test1'),
-        ('foo', 0x12a0, 0x12f0, [
-            ('debug_print', 'test2a'),
-            ('debug_print', 'test2b'),
-            ('bar', 0x12b0, 0x12e0, [
-                ('debug_print', 'test3')]),
-            ('debug_print', 'test4'),
-            ('bar', 0x12e5, 0x12e6, [
-                ('debug_print', 'test5a'),
-                ('debug_print', 'test5b')]),
-            ('debug_print', 'test6')]),
-        ('debug_print', 'test7')]
-
-def test_extract_category():
-    log = parse_log_file(str(globalpath))
-    catbar = list(extract_category(log, 'bar'))
-    assert catbar == ["test3\n", "test5a\ntest5b\n"]
-    assert catbar == list(extract_category(log, 'ba'))
-    catfoo = list(extract_category(log, 'foo'))
-    assert catfoo == ["test2a\ntest2b\ntest4\ntest6\n"]
-    assert catfoo == list(extract_category(log, 'f'))
-    catall = list(extract_category(log, ''))
-    assert catall == catfoo + catbar
-
-def test_gettotaltimes():
-    result = gettotaltimes([
-        ('foo', 2, 17, [
-            ('bar', 4, 5, []),
-            ('bar', 7, 9, []),
-            ]),
-        ('bar', 20, 30, []),
-        ])
-    assert result == {None: 3,              # the hole between 17 and 20
-                      'foo': 15 - 1 - 2,
-                      'bar': 1 + 2 + 10}

pypy/tool/test/test_nullpath.py

-import sys, os
-import py
-from rpython.tool.nullpath import NullPyPathLocal
-
-def test_nullpath(tmpdir):
-    path = NullPyPathLocal(tmpdir)
-    assert repr(path).endswith('[fake]')
-    foo_txt = path.join('foo.txt')
-    assert isinstance(foo_txt, NullPyPathLocal)
-    #
-    f = foo_txt.open('w')
-    assert f.name == os.devnull

pypy/tool/test/test_pairtype.py

-
-from rpython.tool.pairtype import pairtype, pair, extendabletype
-
-def test_binop(): 
-    ### Binary operation example
-    class __extend__(pairtype(int, int)):
-        def add((x, y)):
-            return 'integer: %s+%s' % (x, y)
-        def sub((x, y)):
-            return 'integer: %s-%s' % (x, y)
-
-    class __extend__(pairtype(bool, bool)):
-        def add((x, y)):
-            return 'bool: %s+%s' % (x, y)
-
-    assert pair(3,4).add() == 'integer: 3+4'
-    assert pair(3,4).sub() == 'integer: 3-4'
-    assert pair(3,True).add() == 'integer: 3+True'
-    assert pair(3,True).sub() == 'integer: 3-True'
-    assert pair(False,4).add() == 'integer: False+4'
-    assert pair(False,4).sub() == 'integer: False-4'
-    assert pair(False,True).add() == 'bool: False+True'
-    assert pair(False,True).sub() == 'integer: False-True'
-
-def test_somebuiltin(): 
-    ### Operation on built-in types
-    class MiniPickler:
-        def __init__(self):
-            self.data = []
-        def emit(self, datum):
-            self.data.append(datum)
-
-    class __extend__(pairtype(MiniPickler, int)):
-        def write((pickler, x)):
-            pickler.emit('I%d' % x)
-
-    class __extend__(pairtype(MiniPickler, str)):
-        def write((pickler, x)):
-            pickler.emit('S%s' % x)
-
-    class __extend__(pairtype(MiniPickler, list)):
-        def write((pickler, x)):
-            for item in x:
-                pair(pickler, item).write()
-            pickler.emit('L%d' % len(x))
-
-    p = MiniPickler()
-    pair(p, [1, 2, ['hello', 3]]).write()
-    assert p.data == ['I1', 'I2', 'Shello', 'I3', 'L2', 'L3']
-
-def test_some_multimethod(): 
-    ### Another multimethod example
-    class Block:
-        def __init__(self, exit):
-            self.exit = exit
-    class Jump:
-        pass
-    class Switch:
-        pass
-    
-    class C_Generator:
-        def __init__(self):
-            self.lines = []
-
-    class __extend__(pairtype(C_Generator, Block)):
-        def emit((gen, block), inputvars):
-            gen.lines.append("C code for block")
-            outputvars = inputvars + ['v4', 'v5']
-            pair(gen, block.exit).emit(outputvars)
-
-    class __extend__(pairtype(C_Generator, Jump)):
-        def emit((gen, jump), inputvars):
-            gen.lines.append("goto xyz")
-
-    class __extend__(pairtype(C_Generator, Switch)):
-        def emit((gen, jump), inputvars):
-            gen.lines.append("switch (%s) { ... }" % inputvars[-1])
-
-    g = C_Generator()
-    pair(g, Block(Switch())).emit(['v1', 'v2'])
-    assert g.lines == ["C code for block", "switch (v5) { ... }"] 
-
-    class Lisp_Generator:
-        def __init__(self):
-            self.progn = []
-
-    class __extend__(pairtype(Lisp_Generator, Block)):
-        def emit((gen, block), inputvars):
-            gen.progn.append("(do 'something)")
-
-    g = Lisp_Generator()
-    pair(g, Block(Switch())).emit(['v1', 'v2'])
-    assert g.progn == ["(do 'something)"]
-
-def test_multiple_extend():
-    class A:
-        __metaclass__ = extendabletype
-    class B:
-        __metaclass__ = extendabletype
-
-    class __extend__(A,B):
-
-        def f(self):
-            pass
-
-    assert hasattr(A, 'f')
-    assert hasattr(B, 'f')
-    
-
-        

pypy/tool/test/test_runsubprocess.py

-import py, os
-from pypy.tool.runsubprocess import run_subprocess
-
-def test_no_such_command():
-    py.test.raises(EnvironmentError, run_subprocess,
-                   'this_command_does_not_exist', [])
-    py.test.raises(EnvironmentError, run_subprocess,
-                   'this_command_does_not_exist', [])
-
-def test_echo():
-    if not os.path.exists('/bin/echo'):
-        py.test.skip("there is no /bin/echo")
-    returncode, stdout, stderr = run_subprocess('/bin/echo', 'FooBar')
-    assert returncode == 0
-    assert stdout == 'FooBar\n'
-    assert stderr == ''
-
-def test_false():
-    if not os.path.exists('/bin/false'):
-        py.test.skip("there is no /bin/false")
-    returncode, stdout, stderr = run_subprocess('/bin/false', [])
-    assert returncode == 1
-    assert stdout == ''
-    assert stderr == ''
-
-def test_cat_fail():
-    if not os.path.exists('/bin/cat'):
-        py.test.skip("there is no /bin/cat")
-    returncode, stdout, stderr = run_subprocess('/bin/cat', 'no/such/filename')
-    assert returncode == 1
-    assert stdout == ''
-    assert 'no/such/filename' in stderr
-
-def test_recover_lost_process():
-    if not hasattr(os, 'fork'):
-        py.test.skip("there is no os.fork()")
-    from pypy.tool import runsubprocess
-    import signal
-    os.kill(runsubprocess._child.pid, signal.SIGTERM)
-    runsubprocess._child.wait()
-    test_echo()

pypy/tool/test/test_sourcetools.py

-from rpython.tool.sourcetools import func_with_new_name, func_renamer, rpython_wrapper
-
-def test_rename():
-    def f(x, y=5):
-        return x + y
-    f.prop = int
-
-    g = func_with_new_name(f, "g")
-    assert g(4, 5) == 9
-    assert g.func_name == "g"
-    assert f.func_defaults == (5,)
-    assert g.prop is int
-
-def test_rename_decorator():
-    @func_renamer("g")
-    def f(x, y=5):
-        return x + y
-    f.prop = int
-
-    assert f(4, 5) == 9
-
-    assert f.func_name == "g"
-    assert f.func_defaults == (5,)
-    assert f.prop is int
-
-def test_func_rename_decorator():
-    def bar():
-        'doc'
-
-    bar2 = func_with_new_name(bar, 'bar2')
-    assert bar.func_doc == bar2.func_doc == 'doc'
-
-    bar.func_doc = 'new doc'
-    bar3 = func_with_new_name(bar, 'bar3')
-    assert bar3.func_doc == 'new doc'
-    assert bar2.func_doc != bar3.func_doc
-
-
-def test_rpython_wrapper():
-    calls = []
-
-    def bar(a, b):
-        calls.append(('bar', a, b))
-        return a+b
-
-    template = """
-        def {name}({arglist}):
-            calls.append(('decorated', {arglist}))
-            return {original}({arglist})
-    """
-    bar = rpython_wrapper(bar, template, calls=calls)
-    assert bar(40, 2) == 42
-    assert calls == [
-        ('decorated', 40, 2),
-        ('bar', 40, 2),
-        ]
-
-        

pypy/tool/test/test_udir.py

-
-from rpython.tool import udir
-
-def test_make_udir():
-    root = str(udir.udir.ensure('make_udir1', dir=1))
-    p1 = udir.make_udir(dir=root)
-    p2 = udir.make_udir(dir=root)
-    assert p1.relto(root).startswith('usession-')
-    assert p2.relto(root).startswith('usession-')
-    assert p1.basename.endswith('-0')
-    assert p2.basename.endswith('-1')
-
-def test_make_udir_with_basename():
-    root = str(udir.udir.ensure('make_udir2', dir=1))
-    p1 = udir.make_udir(dir=root, basename='foobar')
-    def assert_relto(path, root, expected):
-        assert path.relto(root) == expected, path.relto(root)
-    assert p1.relto(root) == 'usession-foobar-0'
-    p1 = udir.make_udir(dir=root, basename='-foobar')
-    assert p1.relto(root) == 'usession-foobar-1'
-    p1 = udir.make_udir(dir=root, basename='foobar-')
-    assert p1.relto(root) == 'usession-foobar-2'
-    p1 = udir.make_udir(dir=root, basename='-foobar-')
-    assert p1.relto(root) == 'usession-foobar-3'
-    p1 = udir.make_udir(dir=root, basename='')
-    assert p1.relto(root) == 'usession-0'
-    p1 = udir.make_udir(dir=root, basename='-')
-    assert p1.relto(root) == 'usession-1'
-    p1 = udir.make_udir(dir=root, basename='fun/bar')
-    assert p1.relto(root) == 'usession-fun--bar-0'

rpython/rlib/test/test_cache.py

+from rpython.rlib.cache import Cache 
+
+class MyCache(Cache):
+    counter = 0
+    def _build(self, key):
+        self.counter += 1
+        return key*7
+
+class TestCache: 
+    def test_getorbuild(self):
+        cache = MyCache()
+        assert cache.getorbuild(1) == 7
+        assert cache.counter == 1
+        assert cache.getorbuild(1) == 7
+        assert cache.counter == 1
+        assert cache.getorbuild(3) == 21
+        assert cache.counter == 2
+        assert cache.getorbuild(1) == 7
+        assert cache.counter == 2
+        assert cache.getorbuild(3) == 21
+        assert cache.counter == 2

rpython/rlib/test/test_rthread.py

+import gc
+from rpython.rlib.rthread import *
+from rpython.translator.c.test.test_boehm import AbstractGCTestClass
+from rpython.rtyper.lltypesystem import lltype, rffi
+import py
+
+def setup_module(mod):
+    # Hack to avoid a deadlock if the module is run after other test files :-(
+    # In this module, we assume that rthread.start_new_thread() is not
+    # providing us with a GIL equivalent, except in test_gc_locking
+    # which installs its own aroundstate.
+    rffi.aroundstate._cleanup_()
+
+def test_lock():
+    l = allocate_lock()
+    ok1 = l.acquire(True)
+    ok2 = l.acquire(False)
+    l.release()
+    ok3 = l.acquire(False)
+    res = ok1 and not ok2 and ok3
+    assert res == 1
+
+def test_thread_error():
+    l = allocate_lock()
+    try:
+        l.release()
+    except error:
+        pass
+    else:
+        py.test.fail("Did not raise")
+
+
+class AbstractThreadTests(AbstractGCTestClass):
+    use_threads = True
+
+    def test_start_new_thread(self):
+        import time
+
+        class State:
+            pass
+        state = State()
+
+        def bootstrap1():
+            state.my_thread_ident1 = get_ident()
+        def bootstrap2():
+            state.my_thread_ident2 = get_ident()
+
+        def f():
+            state.my_thread_ident1 = get_ident()
+            state.my_thread_ident2 = get_ident()
+            start_new_thread(bootstrap1, ())
+            start_new_thread(bootstrap2, ())
+            willing_to_wait_more = 1000
+            while (state.my_thread_ident1 == get_ident() or
+                   state.my_thread_ident2 == get_ident()):
+                willing_to_wait_more -= 1
+                if not willing_to_wait_more:
+                    raise Exception("thread didn't start?")
+                time.sleep(0.01)
+            return 42
+
+        fn = self.getcompiled(f, [])
+        res = fn()
+        assert res == 42
+
+    def test_gc_locking(self):
+        import time
+        from rpython.rlib.objectmodel import invoke_around_extcall
+        from rpython.rlib.debug import ll_assert
+
+        class State:
+            pass
+        state = State()
+
+        class Z:
+            def __init__(self, i, j):
+                self.i = i
+                self.j = j
+            def run(self):
+                j = self.j
+                if self.i > 1:
+                    g(self.i-1, self.j * 2)
+                    ll_assert(j == self.j, "1: bad j")
+                    g(self.i-2, self.j * 2 + 1)
+                else:
+                    if len(state.answers) % 7 == 5:
+                        gc.collect()
+                    state.answers.append(self.j)
+                ll_assert(j == self.j, "2: bad j")
+            run._dont_inline_ = True
+
+        def before_extcall():
+            release_NOAUTO(state.gil)
+        before_extcall._gctransformer_hint_cannot_collect_ = True
+        # ^^^ see comments in gil.py about this hint
+
+        def after_extcall():
+            acquire_NOAUTO(state.gil, True)
+            gc_thread_run()
+        after_extcall._gctransformer_hint_cannot_collect_ = True
+        # ^^^ see comments in gil.py about this hint
+
+        def bootstrap():
+            # after_extcall() is called before we arrive here.
+            # We can't just acquire and release the GIL manually here,
+            # because it is unsafe: bootstrap() is called from a rffi
+            # callback which checks for and reports exceptions after
+            # bootstrap() returns.  The exception checking code must be
+            # protected by the GIL too.
+            z = state.z
+            state.z = None
+            state.bootstrapping.release()
+            z.run()
+            gc_thread_die()
+            # before_extcall() is called after we leave here
+
+        def g(i, j):
+            state.bootstrapping.acquire(True)
+            state.z = Z(i, j)
+            gc_thread_prepare()
+            start_new_thread(bootstrap, ())
+
+        def f():
+            state.gil = allocate_ll_lock()
+            acquire_NOAUTO(state.gil, True)
+            state.bootstrapping = allocate_lock()
+            state.answers = []
+            state.finished = 0
+            # the next line installs before_extcall() and after_extcall()
+            # to be called automatically around external function calls.
+            invoke_around_extcall(before_extcall, after_extcall)
+
+            g(10, 1)
+            done = False
+            willing_to_wait_more = 2000
+            while not done:
+                if not willing_to_wait_more:
+                    break
+                willing_to_wait_more -= 1
+                done = len(state.answers) == expected
+
+                time.sleep(0.01)
+
+            time.sleep(0.1)
+
+            return len(state.answers)
+
+        expected = 89
+        try:
+            fn = self.getcompiled(f, [])
+        finally:
+            rffi.aroundstate._cleanup_()
+        answers = fn()
+        assert answers == expected
+
+#class TestRunDirectly(AbstractThreadTests):
+#    def getcompiled(self, f, argtypes):
+#        return f
+# These are disabled because they crash occasionally for bad reasons
+# related to the fact that ll2ctypes is not at all thread-safe
+
+class TestUsingBoehm(AbstractThreadTests):
+    gcpolicy = 'boehm'
+
+class TestUsingFramework(AbstractThreadTests):
+    gcpolicy = 'generation'

rpython/tool/test/test_descriptor.py

+rpython.tool.descriptor import InstanceMethod
+
+class X(object):
+    def f(self, *args, **kwds):
+        return args, kwds
+
+def test_bound():
+    obj = X()
+    obj.x = 12
+    meth = InstanceMethod(X.f.im_func, obj, X)
+    assert meth(1, z=2) == ((1,), {'z': 2})
+
+def test_unbound():
+    obj = X()
+    obj.x = 12
+    meth = InstanceMethod(X.f.im_func, None, X)
+    assert meth(obj, 1, z=2) == ((1,), {'z': 2})
+
+def test_eq_hash():
+    obj1 = X()
+    obj1.x = 12
+    meth1 = InstanceMethod(X.f.im_func, obj1, X)
+    meth1bis = InstanceMethod(X.f.im_func, obj1, X)
+    obj2 = X()
+    obj2.x = 12
+    meth2 = InstanceMethod(X.f.im_func, obj2, X)
+    d = {meth1: 123, meth2: 456}
+    assert len(d) == 2
+    assert d[meth1bis] == 123

rpython/tool/test/test_error.py

+
+""" Tests some error handling routines
+"""
+
+from rpython.translator.translator import TranslationContext
+from rpython.tool.error import AnnotatorError
+from rpython.annotator.model import UnionError
+
+import py
+
+
+def compile_function(function, annotation=[]):
+    t = TranslationContext()
+    t.buildannotator().build_types(function, annotation)
+
+class AAA(object):
+    pass
+
+def test_blocked_inference1():
+    def blocked_inference():
+        return AAA().m()
+
+    py.test.raises(AnnotatorError, compile_function, blocked_inference)
+
+def test_blocked_inference2():
+    def blocked_inference():
+        a = AAA()
+        b = a.x
+        return b
+
+    py.test.raises(AnnotatorError, compile_function, blocked_inference)
+
+def test_someobject():
+    def someobject_degeneration(n):
+        if n == 3:
+            a = "a"
+        else:
+            a = 9
+        return a
+
+    py.test.raises(UnionError, compile_function, someobject_degeneration, [int])
+
+def test_someobject2():
+    def someobject_deg(n):
+        if n == 3:
+            a = "a"
+        else:
+            return AAA()
+        return a
+
+    py.test.raises(UnionError, compile_function, someobject_deg, [int])
+
+def test_eval_someobject():
+    exec("def f(n):\n if n == 2:\n  return 'a'\n else:\n  return 3")
+
+    py.test.raises(UnionError, compile_function, f, [int])
+
+def test_someobject_from_call():
+    def one(x):
+        return str(x)
+
+    def two(x):
+        return int(x)
+
+    def fn(n):
+        if n:
+            to_call = one
+        else:
+            to_call = two
+        return to_call(n)
+
+    try:
+        compile_function(fn, [int])
+    except UnionError, e:
+        assert 'function one' in str(e)
+        assert 'function two' in str(e)

rpython/tool/test/test_frozenlist.py

+import py
+from rpython.tool.frozenlist import frozenlist
+
+def test_frozenlist():
+    l = frozenlist([1, 2, 3])
+    assert l[0] == 1
+    assert l[:2] == [1, 2]
+    assert l.index(2) == 1
+    py.test.raises(TypeError, "l[0] = 1")
+    py.test.raises(TypeError, "del l[0]")
+    py.test.raises(TypeError, "l[:] = []")
+    py.test.raises(TypeError, "del l[:]")
+    py.test.raises(TypeError, "l += []")
+    py.test.raises(TypeError, "l *= 2")
+    py.test.raises(TypeError, "l.append(1)")
+    py.test.raises(TypeError, "l.insert(0, 0)")
+    py.test.raises(TypeError, "l.pop()")
+    py.test.raises(TypeError, "l.remove(1)")
+    py.test.raises(TypeError, "l.reverse()")
+    py.test.raises(TypeError, "l.sort()")
+    py.test.raises(TypeError, "l.extend([])")

rpython/tool/test/test_identitydict.py

+import py
+from rpython.tool.identity_dict import identity_dict, IdentityDictPurePython
+
+class TestIdentityDictNative:
+    identity_dict = identity_dict
+
+    def test_numbers(self):
+        d = self.identity_dict()
+        d[0] = 1
+        d[0.0] = 2
+        d[long(0)] = 3
+
+        assert d
+        assert len(d) == 3
+        d.clear()
+        assert not d
+
+    def test_get(self):
+        d = self.identity_dict()
+        d[None] = 1
+
+        assert d.get(None, 42) == 1
+        assert d.get(None) == 1
+        assert d.get(1) is None
+        assert d.get(1, 42) == 42
+
+    def test_unhashable(self):
+        d = self.identity_dict()
+        d[[]] = 1
+        d[[]] = 2
+        a = []
+        d[a] = 3
+        assert len(d) == 3
+        d[a] = 4
+        assert len(d) == 3
+        assert d[a] == 4
+
+        raises(KeyError, d.__getitem__, [])
+
+    def test_keys(self):
+        d = self.identity_dict()
+        d[[]] = 1
+        d[[]] = 2
+        d[[]] = 3
+
+        assert d.keys() == [[], [], []]
+        assert sorted(d.values()) == [1, 2, 3]
+
+    def test_in(self):
+        d = self.identity_dict()
+        d[None] = 1
+
+        assert None in d
+        assert [] not in d
+
+
+class TestIdentityDictPurePython(TestIdentityDictNative):
+    identity_dict = IdentityDictPurePython

rpython/tool/test/test_killsubprocess.py

+import sys, time
+import subprocess
+from rpython.tool.killsubprocess import killsubprocess
+
+def waitdead(process):
+    for i in range(50):
+        time.sleep(0.1)
+        if process.poll() is not None:
+            break       # ok
+    else:
+        raise AssertionError("the subprocess did not die within 5 seconds")
+
+def test_killsubprocess():
+    popen = subprocess.Popen([sys.executable, '-c', 'raw_input()'],
+                             stdin=subprocess.PIPE)
+    time.sleep(0.9)
+    assert popen.poll() is None
+    assert popen.poll() is None
+    killsubprocess(popen)
+    waitdead(popen)
+
+def test_already_dead_but_no_poll():
+    popen = subprocess.Popen([sys.executable, '-c', 'pass'],
+                             stdin=subprocess.PIPE)
+    time.sleep(3)    # a safe margin to be sure the subprocess is already dead
+    killsubprocess(popen)
+    assert popen.poll() is not None
+
+def test_already_dead_and_polled():
+    popen = subprocess.Popen([sys.executable, '-c', 'pass'],
+                             stdin=subprocess.PIPE)
+    waitdead(popen)
+    killsubprocess(popen)
+    assert popen.poll() is not None

rpython/tool/test/test_leakfinder.py

+import py
+from rpython.tool import leakfinder
+
+def test_start_stop():
+    leakfinder.start_tracking_allocations()
+    assert leakfinder.TRACK_ALLOCATIONS
+    leakfinder.stop_tracking_allocations(True)
+    assert not leakfinder.TRACK_ALLOCATIONS
+
+def test_start_stop_nested():
+    leakfinder.start_tracking_allocations()
+    p2 = leakfinder.start_tracking_allocations()
+    assert leakfinder.TRACK_ALLOCATIONS
+    leakfinder.stop_tracking_allocations(True, prev=p2)
+    assert leakfinder.TRACK_ALLOCATIONS
+    leakfinder.stop_tracking_allocations(True)
+    assert not leakfinder.TRACK_ALLOCATIONS
+
+def test_remember_free():
+    leakfinder.start_tracking_allocations()
+    x = 1234
+    leakfinder.remember_malloc(x)
+    leakfinder.remember_free(x)
+    leakfinder.stop_tracking_allocations(True)
+
+def test_remember_forget():
+    leakfinder.start_tracking_allocations()
+    x = 1234
+    leakfinder.remember_malloc(x)
+    py.test.raises(leakfinder.MallocMismatch,
+                   leakfinder.stop_tracking_allocations, True)
+
+def test_nested_remember_forget_1():
+    leakfinder.start_tracking_allocations()
+    x = 1234
+    leakfinder.remember_malloc(x)
+    p2 = leakfinder.start_tracking_allocations()
+    leakfinder.stop_tracking_allocations(True, prev=p2)
+    py.test.raises(leakfinder.MallocMismatch,
+                   leakfinder.stop_tracking_allocations, True)
+
+def test_nested_remember_forget_2():
+    p2 = leakfinder.start_tracking_allocations()
+    x = 1234
+    leakfinder.remember_malloc(x)
+    py.test.raises(leakfinder.MallocMismatch,
+                   leakfinder.stop_tracking_allocations, True, prev=p2)
+    leakfinder.stop_tracking_allocations(True)
+
+def test_traceback():
+    leakfinder.start_tracking_allocations()
+    x = 1234
+    leakfinder.remember_malloc(x)
+    res = leakfinder.stop_tracking_allocations(check=False)
+    assert res.keys() == [x]
+    print res[x]
+    assert isinstance(res[x], str)
+    assert 'test_traceback' in res[x]
+    assert 'leakfinder.remember_malloc(x)' in res[x]
+
+def test_malloc_mismatch():
+    import sys, traceback, cStringIO
+    sio = cStringIO.StringIO()
+    traceback.print_stack(sys._getframe(), limit=10, file=sio)
+    tb = sio.getvalue()
+    e = leakfinder.MallocMismatch({1234: tb, 2345: tb})
+    print str(e)
+    # grouped entries for 1234 and 2345
+    assert '1234:\n2345:\n' in str(e) or '2345:\n1234:\n' in str(e)
+    assert tb[-80:] in str(e)

rpython/tool/test/test_logparser.py

+from rpython.tool.udir import udir
+from rpython.tool.logparser import *
+
+
+globalpath = udir.join('test_logparser.log')
+globalpath.write("""\
+test1
+[12a0] {foo
+test2a
+test2b
+[12b0] {bar
+test3
+[12e0] bar}
+test4
+[12e5] {bar
+test5a
+test5b
+[12e6] bar}
+test6
+[12f0] foo}
+test7
+""")
+
+
+def test_parse_log_file():
+    log = parse_log_file(str(globalpath))
+    assert log == [
+        ('debug_print', 'test1'),
+        ('foo', 0x12a0, 0x12f0, [
+            ('debug_print', 'test2a'),
+            ('debug_print', 'test2b'),
+            ('bar', 0x12b0, 0x12e0, [
+                ('debug_print', 'test3')]),
+            ('debug_print', 'test4'),
+            ('bar', 0x12e5, 0x12e6, [
+                ('debug_print', 'test5a'),
+                ('debug_print', 'test5b')]),
+            ('debug_print', 'test6')]),
+        ('debug_print', 'test7')]
+
+def test_extract_category():
+    log = parse_log_file(str(globalpath))
+    catbar = list(extract_category(log, 'bar'))
+    assert catbar == ["test3\n", "test5a\ntest5b\n"]
+    assert catbar == list(extract_category(log, 'ba'))
+    catfoo = list(extract_category(log, 'foo'))
+    assert catfoo == ["test2a\ntest2b\ntest4\ntest6\n"]
+    assert catfoo == list(extract_category(log, 'f'))
+    catall = list(extract_category(log, ''))
+    assert catall == catfoo + catbar
+
+def test_gettotaltimes():
+    result = gettotaltimes([
+        ('foo', 2, 17, [
+            ('bar', 4, 5, []),
+            ('bar', 7, 9, []),
+            ]),
+        ('bar', 20, 30, []),
+        ])
+    assert result == {None: 3,              # the hole between 17 and 20
+                      'foo': 15 - 1 - 2,
+                      'bar': 1 + 2 + 10}

rpython/tool/test/test_nullpath.py

+import sys, os
+import py
+from rpython.tool.nullpath import NullPyPathLocal
+
+def test_nullpath(tmpdir):
+    path = NullPyPathLocal(tmpdir)
+    assert repr(path).endswith('[fake]')
+    foo_txt = path.join('foo.txt')
+    assert isinstance(foo_txt, NullPyPathLocal)
+    #
+    f = foo_txt.open('w')
+    assert f.name == os.devnull

rpython/tool/test/test_pairtype.py

+
+from rpython.tool.pairtype import pairtype, pair, extendabletype
+
+def test_binop(): 
+    ### Binary operation example
+    class __extend__(pairtype(int, int)):
+        def add((x, y)):
+            return 'integer: %s+%s' % (x, y)
+        def sub((x, y)):
+            return 'integer: %s-%s' % (x, y)
+
+    class __extend__(pairtype(bool, bool)):
+        def add((x, y)):
+            return 'bool: %s+%s' % (x, y)
+
+    assert pair(3,4).add() == 'integer: 3+4'
+    assert pair(3,4).sub() == 'integer: 3-4'
+    assert pair(3,True).add() == 'integer: 3+True'
+    assert pair(3,True).sub() == 'integer: 3-True'
+    assert pair(False,4).add() == 'integer: False+4'
+    assert pair(False,4).sub() == 'integer: False-4'
+    assert pair(False,True).add() == 'bool: False+True'
+    assert pair(False,True).sub() == 'integer: False-True'
+
+def test_somebuiltin(): 
+    ### Operation on built-in types
+    class MiniPickler:
+        def __init__(self):
+            self.data = []
+        def emit(self, datum):
+            self.data.append(datum)
+
+    class __extend__(pairtype(MiniPickler, int)):
+        def write((pickler, x)):
+            pickler.emit('I%d' % x)
+
+    class __extend__(pairtype(MiniPickler, str)):
+        def write((pickler, x)):
+            pickler.emit('S%s' % x)
+
+    class __extend__(pairtype(MiniPickler, list)):
+        def write((pickler, x)):
+            for item in x:
+                pair(pickler, item).write()
+            pickler.emit('L%d' % len(x))
+
+    p = MiniPickler()
+    pair(p, [1, 2, ['hello', 3]]).write()
+    assert p.data == ['I1', 'I2', 'Shello', 'I3', 'L2', 'L3']
+
+def test_some_multimethod(): 
+    ### Another multimethod example
+    class Block:
+        def __init__(self, exit):
+            self.exit = exit
+    class Jump:
+        pass
+    class Switch:
+        pass
+    
+    class C_Generator:
+        def __init__(self):
+            self.lines = []
+
+    class __extend__(pairtype(C_Generator, Block)):
+        def emit((gen, block), inputvars):
+            gen.lines.append("C code for block")
+            outputvars = inputvars + ['v4', 'v5']
+            pair(gen, block.exit).emit(outputvars)
+
+    class __extend__(pairtype(C_Generator, Jump)):
+        def emit((gen, jump), inputvars):
+            gen.lines.append("goto xyz")
+
+    class __extend__(pairtype(C_Generator, Switch)):
+        def emit((gen, jump), inputvars):
+            gen.lines.append("switch (%s) { ... }" % inputvars[-1])
+
+    g = C_Generator()
+    pair(g, Block(Switch())).emit(['v1', 'v2'])
+    assert g.lines == ["C code for block", "switch (v5) { ... }"] 
+
+    class Lisp_Generator:
+        def __init__(self):
+            self.progn = []
+
+    class __extend__(pairtype(Lisp_Generator, Block)):
+        def emit((gen, block), inputvars):
+            gen.progn.append("(do 'something)")
+
+    g = Lisp_Generator()
+    pair(g, Block(Switch())).emit(['v1', 'v2'])
+    assert g.progn == ["(do 'something)"]
+
+def test_multiple_extend():
+    class A:
+        __metaclass__ = extendabletype
+    class B:
+        __metaclass__ = extendabletype
+
+    class __extend__(A,B):
+
+        def f(self):
+            pass
+
+    assert hasattr(A, 'f')
+    assert hasattr(B, 'f')
+    
+
+        

rpython/tool/test/test_runsubprocess.py

+import py, os
+from pypy.tool.runsubprocess import run_subprocess
+
+def test_no_such_command():
+    py.test.raises(EnvironmentError, run_subprocess,
+                   'this_command_does_not_exist', [])
+    py.test.raises(EnvironmentError, run_subprocess,
+                   'this_command_does_not_exist', [])
+
+def test_echo():
+    if not os.path.exists('/bin/echo'):
+        py.test.skip("there is no /bin/echo")
+    returncode, stdout, stderr = run_subprocess('/bin/echo', 'FooBar')
+    assert returncode == 0
+    assert stdout == 'FooBar\n'
+    assert stderr == ''
+
+def test_false():
+    if not os.path.exists('/bin/false'):
+        py.test.skip("there is no /bin/false")
+    returncode, stdout, stderr = run_subprocess('/bin/false', [])
+    assert returncode == 1
+    assert stdout == ''
+    assert stderr == ''
+
+def test_cat_fail():
+    if not os.path.exists('/bin/cat'):
+        py.test.skip("there is no /bin/cat")
+    returncode, stdout, stderr = run_subprocess('/bin/cat', 'no/such/filename')
+    assert returncode == 1
+    assert stdout == ''
+    assert 'no/such/filename' in stderr
+
+def test_recover_lost_process():
+    if not hasattr(os, 'fork'):
+        py.test.skip("there is no os.fork()")
+    from pypy.tool import runsubprocess
+    import signal
+    os.kill(runsubprocess._child.pid, signal.SIGTERM)
+    runsubprocess._child.wait()
+    test_echo()

rpython/tool/test/test_sourcetools.py

+from rpython.tool.sourcetools import func_with_new_name, func_renamer, rpython_wrapper
+
+def test_rename():
+    def f(x, y=5):
+        return x + y
+    f.prop = int
+
+    g = func_with_new_name(f, "g")
+    assert g(4, 5) == 9
+    assert g.func_name == "g"
+    assert f.func_defaults == (5,)
+    assert g.prop is int
+
+def test_rename_decorator():
+    @func_renamer("g")
+    def f(x, y=5):
+        return x + y
+    f.prop = int
+
+    assert f(4, 5) == 9
+
+    assert f.func_name == "g"
+    assert f.func_defaults == (5,)
+    assert f.prop is int
+
+def test_func_rename_decorator():
+    def bar():
+        'doc'
+
+    bar2 = func_with_new_name(bar, 'bar2')
+    assert bar.func_doc == bar2.func_doc == 'doc'
+
+    bar.func_doc = 'new doc'
+    bar3 = func_with_new_name(bar, 'bar3')
+    assert bar3.func_doc == 'new doc'
+    assert bar2.func_doc != bar3.func_doc
+
+
+def test_rpython_wrapper():
+    calls = []
+
+    def bar(a, b):
+        calls.append(('bar', a, b))
+        return a+b
+
+    template = """
+        def {name}({arglist}):
+            calls.append(('decorated', {arglist}))
+            return {original}({arglist})
+    """
+    bar = rpython_wrapper(bar, template, calls=calls)
+    assert bar(40, 2) == 42
+    assert calls == [
+        ('decorated', 40, 2),
+        ('bar', 40, 2),
+        ]
+
+        

rpython/tool/test/test_udir.py

+
+from rpython.tool import udir
+
+def test_make_udir():
+    root = str(udir.udir.ensure('make_udir1', dir=1))
+    p1 = udir.make_udir(dir=root)
+    p2 = udir.make_udir(dir=root)
+    assert p1.relto(root).startswith('usession-')
+    assert p2.relto(root).startswith('usession-')
+    assert p1.basename.endswith('-0')
+    assert p2.basename.endswith('-1')
+
+def test_make_udir_with_basename():
+    root = str(udir.udir.ensure('make_udir2', dir=1))
+    p1 = udir.make_udir(dir=root, basename='foobar')
+    def assert_relto(path, root, expected):
+        assert path.relto(root) == expected, path.relto(root)
+    assert p1.relto(root) == 'usession-foobar-0'
+    p1 = udir.make_udir(dir=root, basename='-foobar')
+    assert p1.relto(root) == 'usession-foobar-1'
+    p1 = udir.make_udir(dir=root, basename='foobar-')
+    assert p1.relto(root) == 'usession-foobar-2'
+    p1 = udir.make_udir(dir=root, basename='-foobar-')
+    assert p1.relto(root) == 'usession-foobar-3'
+    p1 = udir.make_udir(dir=root, basename='')
+    assert p1.relto(root) == 'usession-0'
+    p1 = udir.make_udir(dir=root, basename='-')
+    assert p1.relto(root) == 'usession-1'
+    p1 = udir.make_udir(dir=root, basename='fun/bar')
+    assert p1.relto(root) == 'usession-fun--bar-0'