Source

dag-discovery / src / testing.py

Full commit
# Efficient non-chatty discovery of nodes missing in one DAG vs another.
#
# Copyright 2009 Peter Arrenbrecht <peter@arrenbrecht.ch>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2, incorporated herein by reference.

import dagutil, dagprinter
import sys, random, time

# Required test cases:
#
#  * Both DAGs empty.
#  * One DAG empty.
#  * Both DAGs equal.
#  * One DAG a small superset of the other.
#  * One DAG a large superset of the other.
#  * Both have a separate long chain leading off from a common root.
#  * Bugfix off of an old root.

def makedags(desc):
    """Build the two DAGs under test from one textual description.

    ``desc`` is parsed into a single in-memory DAG by
    ``dagutil.MemDAG.fromdesc``; the nodes labelled ``a`` and ``b`` in that
    DAG are then each passed to ``subset``.  Returns the two resulting
    subsets as a list, ``a`` first.
    """
    full = dagutil.MemDAG.fromdesc(desc)
    subsets = []
    for label in ('a', 'b'):
        subsets.append(full.subset(full.labels[label]))
    return subsets

def assertnodes(exp, act):
    """Report a mismatch between expected and actual values.

    Nothing is written when ``exp`` and ``act`` compare equal.  Otherwise
    the marker line "Bad" goes to stderr and the two values go to stdout,
    expected first, one per line.  No exception is raised and nothing is
    returned.
    """
    if exp != act:
        sys.stderr.write("Bad\n")
        # Explicit writes, like the rest of this file, instead of the print
        # statement.  The 1-tuple keeps tuple arguments from being unpacked
        # by the % operator.
        sys.stdout.write("%s\n" % (exp,))
        sys.stdout.write("%s\n" % (act,))

class Writer(object):
    """Indented progress reporter writing to ``sys.stdout``.

    Output is organised in nested levels: ``heading`` and ``section`` print
    a title and open a level, ``unindent`` closes it again.  A level opened
    with ``quiesce=True`` silences ``show`` for itself and for every level
    nested inside it.  ``step`` and ``done`` bracket a timed operation;
    ``done`` prints the elapsed clock value.
    """

    def __init__(self):
        # Indent strings of the enclosing levels, restored by unindent().
        self._stack = []
        # Current indent prefix; None means show() is suppressed.
        self._indent = ''
        # Clock reading taken when the pending step started, or None when
        # no step is pending.
        self._lastclock = None

    @staticmethod
    def _clock():
        """Return the current reading of the timing clock.

        Uses ``time.clock`` wherever it exists, and falls back to
        ``time.perf_counter`` on interpreters that no longer provide it.
        """
        clock = getattr(time, 'clock', None) or time.perf_counter
        return clock()

    def indent(self, quiesce=False):
        """Open a nested level, two spaces deeper than the current one.

        If ``quiesce`` is true, or the current level is already suppressed,
        the new level is suppressed as well.
        """
        self._stack.append(self._indent)
        if self._indent is None or quiesce:
            self._indent = None
        else:
            self._indent += '  '

    def unindent(self):
        """Finish any pending step and return to the enclosing level."""
        self.done()
        self._indent = self._stack.pop()

    def show(self, s=None, emptyline=False, newline=True):
        """Write ``s`` at the current indent unless output is suppressed.

        ``s`` may be None, in which case only the indent (and newline, if
        any) is written.  ``emptyline`` writes a blank line first.  While a
        step is pending, a newline is written before the text and the
        trailing newline is left to the following ``done``.
        """
        if self._indent is None:
            return
        # Compare against None: a clock reading of 0.0 is a valid start time.
        pending = self._lastclock is not None
        if pending:
            sys.stdout.write("\n")
        if emptyline:
            sys.stdout.write("\n")
        sys.stdout.write(self._indent)
        if s is not None:
            sys.stdout.write(s)
        if newline and not pending:
            sys.stdout.write("\n")

    def heading(self, title, quiesce=False):
        """Print ``title`` after a blank line, then open a nested level.

        The title itself is printed at the current level, so ``quiesce``
        only affects what follows inside the new level.
        """
        self.show(title, emptyline=True)
        self.indent(quiesce=quiesce)

    def section(self, title, quiesce=False):
        """Open a nested level, printing ``title`` first unless quiesced."""
        if not quiesce:
            self.show(title, emptyline=False)
        self.indent(quiesce=quiesce)

    def step(self, title):
        """Finish any pending step, print ``title...`` and start timing."""
        self.done()
        self.show(title + "...", newline=False)
        # NOTE(review): the clock is started even when output is suppressed,
        # so done() still prints the timing in a quiesced level - confirm
        # that this is intended.
        self._lastclock = self._clock()

    def done(self):
        """Print the elapsed time of the pending step, if there is one."""
        if self._lastclock is not None:
            now = self._clock()
            sys.stdout.write(" -> %f\n" % (now - self._lastclock))
            self._lastclock = None

class DiscoveryTests(object):
    """Driver that runs a discovery test over pairs of DAGs.

    Subclasses supply the actual work by overriding ``setupdags`` and
    ``test``; both do nothing here.  Every pair of DAGs is tested in both
    directions (first against second, then second against first).
    """

    def __init__(self, writer=None, quiet=False):
        """Create a test driver.

        ``writer`` receives all progress output; a fresh ``Writer`` is used
        when none is given.  ``quiet`` is passed as ``quiesce`` to the
        headings of the individual directional tests.
        """
        # When true, testdag() prints each DAG before testing it.
        self.glog = False
        self.writer = writer or Writer()
        self.quiet = quiet

    def setupdags(self, a, b, ans, bns):
        """Hook called once per DAG pair before the directional tests.

        ``a`` and ``b`` are the two DAGs, ``ans`` and ``bns`` their node
        sets.  The default implementation does nothing.
        """
        pass

    def test(self, cltdag, srvdag, cltnodes, srvnodes):
        """Hook called once per direction; does nothing by default.

        Receives two DAGs and their node sets (presumably client and server
        side, judging by the parameter names).
        """
        pass

    def testdags(self, dags):
        """Run ``test`` on a pair of DAGs in both directions.

        ``dags`` must unpack into exactly two DAGs.  Each direction gets a
        heading of the form ``a (total, only-in-a) -> b (total, only-in-b)``.
        """
        a, b = dags
        ans, bns = [d.nodeset() for d in dags]
        self.setupdags(a, b, ans, bns)

        # Nodes present on one side only.
        aout = ans - bns
        bout = bns - ans

        self.writer.heading(
            "a (%d, %d) -> b (%d, %d)"
            % (len(ans), len(aout), len(bns), len(bout)),
            quiesce=self.quiet)
        self.test(a, b, ans, bns)
        self.writer.unindent()

        self.writer.heading(
            "b (%d, %d) -> a (%d, %d)"
            % (len(bns), len(bout), len(ans), len(aout)),
            quiesce=self.quiet)
        self.test(b, a, bns, ans)
        self.writer.unindent()

    def testdag(self, name, desc):
        """Test the two DAGs described by ``desc`` under the heading ``name``.

        When ``self.glog`` is set, both DAGs are printed first.
        """
        self.writer.heading(name)
        dags = makedags(desc)
        if self.glog:
            for d in dags:
                dagprinter.printfmtwalk(d.fmtwalk())
        self.testdags(dags)
        self.writer.unindent()

    def testfile(self, fname, pairs=None, n=5, samplesize=30):
        """Test pairs of subsets of the DAG stored in file ``fname``.

        ``pairs`` is a sequence of 2-tuples of head lists, each tuple
        defining the two subsets to compare.  When it is empty or None,
        ``n`` random pairs are generated instead: each side is the result of
        ``headsof`` on ``samplesize`` randomly drawn nodes, or on all nodes
        if the DAG has fewer than that.
        """
        self.writer.heading(fname)
        # step() appends the "..." itself.
        self.writer.step("parsing")
        dag = dagutil.MemDAG.fromfile(fname)
        self.writer.done()
        if not pairs:
            # Sample from a list: random.sample needs a sequence, and the
            # sample size must not exceed the population size.
            population = list(dag.nodeset())
            k = min(samplesize, len(population))
            pairs = [(dag.headsof(random.sample(population, k)),
                      dag.headsof(random.sample(population, k)))
                     for _i in range(n)]
        for p in pairs:
            self.writer.heading("heads: %s" % format([list(hs) for hs in p]))
            ds = [dag.subset(list(hs)) for hs in p]
            self.testdags(ds)
            self.writer.unindent()
        self.writer.unindent()

    def testpredefined(self):
        """Run ``testdag`` on a fixed list of named DAG descriptions."""
        # Each description is built from adjacent string literals, which are
        # joined as written, without any separator added between them.
        tests = [
                 ('small superset',
                  '+2:f +1:a:b'
                  '<f +4 :a'
                  '+5 :b'
                  '<f +3 :b'
                 ),
                 ('many new',
                  '+2:f +3:a +3:b '
                  '<f +30 :a'
                 ),
                 ('both many new with stub',
                  '+2:f +2:a +30 :b'
                  '<f +30 :a'
                 ),
                 ('both many new',
                  '+2:f +30 :b'
                  '<f +30 :a'
                 ),
                 ('both many new skewed',
                  '+2:f +30 :b'
                  '<f +50 :a'
                 ),
                 ('very long separate branches',
                  '+90000:f +90000:a <f +90000:b'
                 ),
                ]
        for name, desc in tests:
            self.testdag(name, desc)

    def testall(self):
        """Entry point: run the currently selected set of tests.

        The commented-out calls are alternative selections.
        """
        #self.testpredefined()
        #self.testfile("../data/linux.dag", pairs=[([160371], [162753])])
        #self.testfile("../data/linux.dag", pairs=[([158097, 167257], [167019])])
        self.testfile("../data/linux.dag")