# dag-discovery / src / discovery_tonfa.py

# Efficient non-chatty discovery of nodes missing in one DAG vs another.
#
# Copyright 2009 Peter Arrenbrecht <peter@arrenbrecht.ch>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2, incorporated herein by reference.

import random
from collections import deque

from testing import DiscoveryTests, assertnodes

# Size threshold used by both sides: clever_sample() returns every node when
# it is given fewer than this many, and otherwise trims an oversized sample
# down to this size; Server.discover() hands back its remaining nodes
# outright once fewer than this many are left.
MAX_SAMPLE = 200
# When true, extra progress output is emitted while sampling and querying.
TRACE = False


class Config(object):
    """Settings holder handed to every participant; currently has no fields."""

    def __init__(self):
        """Create a configuration object with no attributes."""


class Participant(object):
    """Common base for both ends of a discovery exchange.

    Stores the three collaborators every participant is constructed with:
    the DAG it works on, the writer used for output, and the configuration.
    """

    def __init__(self, dag, writer, cfg):
        """Remember ``dag``, ``writer`` and ``cfg`` as same-named attributes."""
        self.dag, self.writer, self.cfg = dag, writer, cfg


def log2(n):
    """Count how many floor-halvings it takes to bring ``n`` down to zero.

    For a positive integer this equals its bit length, i.e.
    ``floor(log2(n)) + 1``; zero and negative values give 0.
    """
    steps = 0
    while n > 0:
        n, steps = n // 2, steps + 1
    return steps

def _walk_breadth_first(starts, neighbours, seen):
    """Walk breadth-first from ``starts`` and return ``(order, dist)``.

    ``neighbours(node)`` yields the nodes to enqueue after ``node``.
    ``seen`` is a mutable set of nodes to skip; visited nodes are added to it.
    ``order`` lists the visited nodes in visiting order.  ``dist`` maps a node
    to its 1-based level, fixed the first time the node is reached (it may
    also hold entries for enqueued nodes that were never visited).
    """
    dist = {}
    order = []
    queue = deque(starts)  # O(1) left pops; list.pop(0) is O(n) per pop
    while queue:
        curr = queue.popleft()
        if curr in seen:
            continue
        d = dist.setdefault(curr, 1)
        order.append(curr)
        seen.add(curr)
        for nxt in neighbours(curr):
            dist.setdefault(nxt, d + 1)
            queue.append(nxt)
    return order, dist


def _add_exponential_picks(order, dist, sample):
    """Add to ``sample`` the nodes of ``order`` at distance 1, 2, 4, 8, ..."""
    factor = 1
    for n in order:
        if dist[n] > factor:
            factor *= 2
        if dist[n] == factor:
            sample.add(n)


def clever_sample(dag, nodes, stop):
    """Pick a set of at most roughly MAX_SAMPLE nodes out of ``nodes``.

    If ``nodes`` has fewer than MAX_SAMPLE entries, all of them are returned.
    Otherwise the sample is built from:

    * the heads reported by ``dag.headsofconnectedset(nodes)`` (always kept),
    * nodes at exponentially growing distances (1, 2, 4, ...) walking from
      those heads along ``dag.parents`` until a node of ``stop`` is met,
    * nodes at exponentially growing distances walking back from the roots
      of that walk along ``dag.children``, restricted to ``nodes``.

    The result is then randomly trimmed or randomly topped up (from the nodes
    visited by the first walk) so that it holds MAX_SAMPLE nodes whenever
    enough candidates exist.

    dag   -- object providing ``headsofconnectedset``, ``parents``, ``children``
    nodes -- container of candidate nodes (needs ``len`` and ``in``)
    stop  -- iterable of nodes at which the heads -> roots walk stops
    """
    if len(nodes) < MAX_SAMPLE:
        return set(nodes)

    if TRACE:
        print("headsof")
    heads = dag.headsofconnectedset(nodes)
    if TRACE:
        print(heads)
    # Set form for the set arithmetic below; ``heads`` itself keeps whatever
    # iteration order the dag returned, which seeds the first walk.
    headset = set(heads)

    if TRACE:
        print("heads -> roots")
    cands, dist = _walk_breadth_first(heads, dag.parents, set(stop))
    # A root is a visited node all of whose parents lie in ``stop``
    # (which includes having no parents at all).
    roots = set(n for n in cands
                if not any(p not in stop for p in dag.parents(n)))

    if TRACE:
        print("sample")
    sample = set()
    _add_exponential_picks(cands, dist, sample)

    if TRACE:
        print("roots -> heads")
    order, dist = _walk_breadth_first(
        roots,
        lambda n: (c for c in dag.children(n) if c in nodes),
        set())

    if TRACE:
        print("sample")
    _add_exponential_picks(order, dist, sample)

    assert sample
    sample -= headset
    # Room left for non-head nodes; negative if the heads alone exceed the cap.
    budget = MAX_SAMPLE - len(headset)
    if len(sample) > budget:
        # random.sample() needs a sequence (sets are rejected on Python 3.11+)
        # and a non-negative size.
        sample = set(random.sample(list(sample), max(budget, 0)))
    elif len(sample) < budget:
        if TRACE:
            print("Filling from %d" % (len(sample) + len(headset)))
        pool = list(set(cands) - sample - headset)
        # Never ask for more nodes than the pool can supply.
        sample.update(random.sample(pool, min(budget - len(sample), len(pool))))
    sample |= headset
    return sample

class Client(Participant):
    """Discovery client: sorts its own nodes into common and missing.

    The client repeatedly sends a sample of its still-unclassified nodes to a
    server and uses the answer to grow the ``_common`` and ``_missing`` sets
    until nothing is left unclassified.
    """

    def __init__(self, dag, writer, cfg):
        Participant.__init__(self, dag, writer, cfg)
        # Nodes not yet classified (presumably starts as every node of the
        # local DAG -- confirm against dag.nodeset()).
        self._unknown = self.dag.nodeset()
        # Nodes established as known to the server as well.
        self._common = set()
        # Nodes established as absent from the server.
        self._missing = set()

    def commonheads(self, server):
        """Placeholder: no implementation is provided for this method yet."""
        raise NotImplementedError("Client.commonheads is not implemented")

    def common(self, server):
        """Run discovery rounds against ``server`` and return the common set.

        Each round samples the unclassified nodes with ``clever_sample``,
        asks ``server.discover(sample)`` and then:

        * marks the descendants of sampled nodes the server did not report
          as common as missing,
        * marks the ancestors of the reported common nodes as common,
        * if the server also returned its remaining nodes (``remain`` is not
          None), marks the ancestors of those that are still unclassified
          here as common and stops,
        * otherwise removes everything classified from the unknown set.

        The number of unknown nodes and the sample size are reported through
        ``self.writer`` every round, and the round count at the end.
        """
        rounds = 0
        while self._unknown:
            if TRACE:
                self.writer.show("sampling...")
            sample = clever_sample(self.dag, self._unknown, self._common)
            if TRACE:
                self.writer.show("querying...")
            common, remain = server.discover(sample)
            self.writer.show("number of unknown left: %i, sample size: %s"
                             % (len(self._unknown), len(sample)))
            rounds += 1

            if TRACE:
                self.writer.show("updating missing...")
            self._missing.update(self.dag.descendants(
                (n for n in sample if n not in common), self._missing))
            if TRACE:
                self.writer.show("updating common...")
            self._common.update(self.dag.ancestors(
                [n for n in common if n in self._unknown], self._common))

            if remain is not None:
                if TRACE:
                    self.writer.show("updating common...")
                # The already-known common set is passed to ancestors(), the
                # same way as in the update just above.
                self._common.update(self.dag.ancestors(
                    [n for n in remain if n in self._unknown], self._common))
                break

            if TRACE:
                self.writer.show("updating unknown...")
            self._unknown.difference_update(self._missing)
            self._unknown.difference_update(self._common)

        self.writer.show("number of iterations: %i" % rounds)
        return self._common


class Server(Participant):
    """Discovery server: answers a client's sample queries from its own DAG."""

    def __init__(self, dag, writer, cfg):
        Participant.__init__(self, dag, writer, cfg)

    def discover(self, sample):
        """Answer one query and return ``(common, remain)``.

        ``common`` is the set of sampled nodes found in this server's node
        set.  ``remain`` is the server's node set minus
        ``dag.nodeset(list(common))`` (presumably the common nodes together
        with their ancestors -- confirm against the dag implementation) when
        fewer than MAX_SAMPLE such nodes are left, and None otherwise.  The
        size of that remainder is reported through ``self.writer``.
        """
        known = self.dag.nodeset()
        common = set(n for n in sample if n in known)

        remaining = known - self.dag.nodeset(list(common))
        self.writer.show("server remaining: %i" % len(remaining))

        if len(remaining) >= MAX_SAMPLE:
            return common, None
        return common, remaining


class Tests(DiscoveryTests):
    """Runs the client/server discovery above through the shared test harness."""

    def __init__(self):
        DiscoveryTests.__init__(self, quiet=False)
        self.cfg = Config()

    def setupdags(self, a, b, ans, bns):
        """Record the expected result: the nodes present in both node sets."""
        self.expected = ans & bns

    def test(self, cdag, sdag, cns, sns):
        """Run discovery from ``cdag`` against ``sdag`` and check the outcome."""
        server = Server(sdag, self.writer, self.cfg)
        client = Client(cdag, self.writer, self.cfg)

        self.writer.section("traffic", quiesce=False)
        found = client.common(server)
        self.writer.unindent()
        assertnodes(self.expected, found)

if __name__ == "__main__":
    # Fixed seed so the random parts of the sampling repeat between runs.
    random.seed(0)
    Tests().testall()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.