Source / tests /

"""Run tests in the farm subdirectory.  Designed for nose."""

import difflib, filecmp, fnmatch, glob, os, re, shutil, sys
from nose.plugins.skip import SkipTest

from tests.backtest import run_command, execfile      # pylint: disable=W0622

from coverage.control import _TEST_NAME_FILE

def test_farm(clean_only=False):
    """A test-generating function for nose to find and run."""
    for fname in glob.glob("tests/farm/*/*.py"):
        case = FarmTestCase(fname, clean_only)
        yield (case,)

class FarmTestCase(object):
    """A test case from the farm tree.

    Tests are short Python script files, often called

        copy("src", "out")
            coverage -x
            coverage -a
            ''', rundir="out")
        compare("out", "gold", "*,cover")

    Verbs (copy, run, compare, clean) are methods in this class.  FarmTestCase
    has options to allow various uses of the test cases (normal execution,
    cleaning-only, or run and leave the results for debugging).

    def __init__(self, runpy, clean_only=False, dont_clean=False):
        """Create a test case from a file.

        `clean_only` means that only the clean() action is executed.
        `dont_clean` means that the clean() action is not executed.

        self.description = runpy
        self.dir, self.runpy = os.path.split(runpy)
        self.clean_only = clean_only
        self.dont_clean = dont_clean

    def cd(self, newdir):
        """Change the current directory, and return the old one."""
        cwd = os.getcwd()
        return cwd

    def addtopath(self, directory):
        """Add `directory` to the path, and return the old path."""
        oldpath = sys.path[:]
        if directory is not None:
            sys.path.insert(0, directory)
        return oldpath

    def restorepath(self, path):
        """Restore the system path to `path`."""
        sys.path = path

    def __call__(self):
        """Execute the test from the file.

        if _TEST_NAME_FILE:
            f = open(_TEST_NAME_FILE, "w")
            f.write(self.description.replace("/", "_"))

        cwd =

        # Prepare a dictionary of globals for the files to use.
        fns = """
            copy run runfunc compare contains doesnt_contain clean skip
        if self.clean_only:
            glo = dict([(fn, self.noop) for fn in fns])
            glo['clean'] = self.clean
            glo = dict([(fn, getattr(self, fn)) for fn in fns])
            if self.dont_clean:                 # pragma: not covered
                glo['clean'] = self.noop

        old_mods = dict(sys.modules)
            execfile(self.runpy, glo)
            # Remove any new modules imported during the test run. This lets us
            # import the same source files for more than one test.
            to_del = [m for m in sys.modules if m not in old_mods]
            for m in to_del:
                del sys.modules[m]

    def run_fully(self):        # pragma: not covered
        """Run as a full test case, with setUp and tearDown."""

    def fnmatch_list(self, files, file_pattern):
        """Filter the list of `files` to only those that match `file_pattern`.

        If `file_pattern` is None, then return the entire list of files.

        Returns a list of the filtered files.

        if file_pattern:
            files = [f for f in files if fnmatch.fnmatch(f, file_pattern)]
        return files

    def setUp(self):
        """Test set up, run by nose before __call__."""

        # Modules should be importable from the current directory.
        self.old_syspath = sys.path[:]
        sys.path.insert(0, '')

    def tearDown(self):
        """Test tear down, run by nose after __call__."""
        # Make sure no matter what, the test is cleaned up.
        if not self.dont_clean:         # pragma: part covered
            self.clean_only = True

        # Restore the original sys.path
        sys.path = self.old_syspath

    # Functions usable inside farm files

    def noop(self, *args, **kwargs):
        """A no-op function to stub out run, copy, etc, when only cleaning."""

    def copy(self, src, dst):
        """Copy a directory."""

        if os.path.exists(dst):
        shutil.copytree(src, dst)

    def run(self, cmds, rundir="src", outfile=None):
        """Run a list of commands.

        `cmds` is a string, commands separated by newlines.
        `rundir` is the directory in which to run the commands.
        `outfile` is a filename to redirect stdout to.

        cwd =
        if outfile:
            fout = open(outfile, "a+")
            for cmd in cmds.split("\n"):
                cmd = cmd.strip()
                if not cmd:
                retcode, output = run_command(cmd)
                if outfile:
                if retcode:
                    raise Exception("command exited abnormally")
            if outfile:

    def runfunc(self, fn, rundir="src", addtopath=None):
        """Run a function.

        `fn` is a callable.
        `rundir` is the directory in which to run the function.


        cwd =
        oldpath = self.addtopath(addtopath)

    def compare(self, dir1, dir2, file_pattern=None, size_within=0,
            left_extra=False, right_extra=False, scrubs=None
        """Compare files matching `file_pattern` in `dir1` and `dir2`.

        `dir2` is interpreted as a prefix, with Python version numbers appended
        to find the actual directory to compare with. "foo" will compare against
        "foo_v241", "foo_v24", "foo_v2", or "foo", depending on which directory
        is found first.

        `size_within` is a percentage delta for the file sizes.  If non-zero,
        then the file contents are not compared (since they are expected to
        often be different), but the file sizes must be within this amount.
        For example, size_within=10 means that the two files' sizes must be
        within 10 percent of each other to compare equal.

        `left_extra` true means the left directory can have extra files in it
        without triggering an assertion.  `right_extra` means the right
        directory can.

        `scrubs` is a list of pairs, regex find and replace patterns to use to
        scrub the files of unimportant differences.

        An assertion will be raised if the directories fail one of their

        # Search for a dir2 with a version suffix.
        version_suff = ''.join(map(str, sys.version_info[:3]))
        while version_suff:
            trydir = dir2 + '_v' + version_suff
            if os.path.exists(trydir):
                dir2 = trydir
            version_suff = version_suff[:-1]

        assert os.path.exists(dir1), "Left directory missing: %s" % dir1
        assert os.path.exists(dir2), "Right directory missing: %s" % dir2

        dc = filecmp.dircmp(dir1, dir2)
        diff_files = self.fnmatch_list(dc.diff_files, file_pattern)
        left_only = self.fnmatch_list(dc.left_only, file_pattern)
        right_only = self.fnmatch_list(dc.right_only, file_pattern)

        if size_within:
            # The files were already compared, use the diff_files list as a
            # guide for size comparison.
            wrong_size = []
            for f in diff_files:
                left = open(os.path.join(dir1, f), "rb").read()
                right = open(os.path.join(dir2, f), "rb").read()
                size_l, size_r = len(left), len(right)
                big, little = max(size_l, size_r), min(size_l, size_r)
                if (big - little) / float(little) > size_within/100.0:
                    # print "%d %d" % (big, little)
                    # print "Left: ---\n%s\n-----\n%s" % (left, right)
            assert not wrong_size, (
                "File sizes differ between %s and %s: %s" % (
                    dir1, dir2, wrong_size
            # filecmp only compares in binary mode, but we want text mode.  So
            # look through the list of different files, and compare them
            # ourselves.
            text_diff = []
            for f in diff_files:
                left = open(os.path.join(dir1, f), "rU").readlines()
                right = open(os.path.join(dir2, f), "rU").readlines()
                if scrubs:
                    left = self._scrub(left, scrubs)
                    right = self._scrub(right, scrubs)
                if left != right:
                    print("".join(list(difflib.Differ().compare(left, right))))
            assert not text_diff, "Files differ: %s" % text_diff

        if not left_extra:
            assert not left_only, "Files in %s only: %s" % (dir1, left_only)
        if not right_extra:
            assert not right_only, "Files in %s only: %s" % (dir2, right_only)

    def _scrub(self, strlist, scrubs):
        """Scrub uninteresting data from the strings in `strlist`.

        `scrubs is a list of (find, replace) pairs of regexes that are used on
        each string in `strlist`.  A list of scrubbed strings is returned.

        scrubbed = []
        for s in strlist:
            for rgx_find, rgx_replace in scrubs:
                s = re.sub(rgx_find, rgx_replace, s)
        return scrubbed

    def contains(self, filename, *strlist):
        """Check that the file contains all of a list of strings.

        An assert will be raised if one of the arguments in `strlist` is
        missing in `filename`.

        text = open(filename, "r").read()
        for s in strlist:
            assert s in text, "Missing content in %s: %r" % (filename, s)

    def doesnt_contain(self, filename, *strlist):
        """Check that the file contains none of a list of strings.

        An assert will be raised if any of the strings in strlist appears in

        text = open(filename, "r").read()
        for s in strlist:
            assert s not in text, "Forbidden content in %s: %r" % (filename, s)

    def clean(self, cleandir):
        """Clean `cleandir` by removing it and all its children completely."""
        # rmtree gives mysterious failures on Win7, so retry a "few" times.
        # I've seen it take over 100 tries, so, 1000!  This is probably the
        # most unpleasant hack I've written in a long time...
        tries = 1000
        while tries:                    # pragma: part covered
            if os.path.exists(cleandir):
                except OSError:         # pragma: not covered
                    if tries == 1:
                        tries -= 1

    def skip(self, msg=None):
        """Skip the current test."""
        raise SkipTest(msg)

def main():     # pragma: not covered
    """Command-line access to test_farm.


    run testcase    - Run a single test case.
    out testcase    - Run a test case, but don't clean up, to see the output.
    clean           - Clean all the output for all tests.

    op = 'help'
        op = sys.argv[1]
    except IndexError:

    if op == 'run':
        # Run the test for real.
        case = FarmTestCase(sys.argv[2])
    elif op == 'out':
        # Run the test, but don't clean up, so we can examine the output.
        case = FarmTestCase(sys.argv[2], dont_clean=True)
    elif op == 'clean':
        # Run all the tests, but just clean.
        for test in test_farm(clean_only=True):

# So that we can run just one farm at a time.
if __name__ == '__main__':