Commits

kiwi committed 074d72c

first commit

  • Participants

Comments (0)

Files changed (8)

+============
+Peerdownload
+============
+
+Peerdownload is a plugin for yum which attempts to download rpms from
+participating computers in the local area network (peers).
+It is composed of two components:
+
+* the actual plugin, which scans the local network and fetches the rpms
+* the server, which lets other peers download our cached files
+
+
+How to use
+==========
+
+#. get peerdownload and install it (hint: use a virtualenv)
+#. ``ln -s plugin.py /usr/lib/yum-plugins/peerdownload.py``
+#. enable it:
+   ``echo -e "[main]\nenabled=1" > /etc/yum/pluginconf.d/peerdownload.conf``
+#. start the server with ``python -m peerdownload.main``
+#. temporarily disable selinux as it prevents dbus from working in yum
+   (or fix it and send a patch :) )
+#. make yum keep the downloaded files: add ``keepcache=1`` in the main section of
+   yum.conf
+#. use yum normally
+
+Done!

peerdownload/__init__.py

Empty file added.

peerdownload/collection.py

+# -*- coding: utf-8 -*-
+import os
+import re, glob
+import sqlite3
+
+import logging
+log = logging.getLogger(__name__)
+
# Schema version of the files database; bump this when the layout changes
# and add a matching _upgrade_to_<n> migration function below.
VERSION = 0

# Hash algorithms recorded for every file, and their hashlib constructors
# (kept in the same order so the two tuples can be zipped together).
HASH_ALGOS = ("sha256", "md5")
from hashlib import sha256, md5
HASH_FUNCS = (sha256, md5)

#def _upgrade_to_1(db):
#    pass

def upgrade(db, initial_version):
    """Migrate *db* from *initial_version* up to the current VERSION.

    Each step calls the module-level function ``_upgrade_to_<target>``:
    going from 0 to 2 runs ``_upgrade_to_1`` then ``_upgrade_to_2``.
    Raises KeyError when a migration function is missing.
    """
    for v in range(initial_version, VERSION):
        # BUG FIX: the original eval()'d "_upgrade_to_<v>" — the *source*
        # version — so upgrading from 0 would look for _upgrade_to_0.
        # Upgrading FROM v targets v + 1, and a globals() lookup avoids eval.
        globals()["_upgrade_to_{0}".format(v + 1)](db)
+
def checkschema(collection):
    """Validate the schema version of *collection*'s database.

    Upgrades the database in place when it is older than VERSION.
    Raises ValueError when the metadata row is missing or when the
    database was created by a newer release than this code supports.
    """
    db = collection.db
    c = db.cursor()
    c.execute("""select version from metadata""")
    metadata = c.fetchone()
    if not metadata:
        # A files db without a metadata row was not created by initdb().
        raise ValueError("database has no metadata row")
    version = metadata[0]
    if version > VERSION:
        raise ValueError(
            "database version {0} is newer than supported version {1}".format(
                version, VERSION))
    elif version < VERSION:
        upgrade(db, int(version))
    else:
        log.debug("version is current")
+
def initdb(collection):
    """Create the files and metadata tables in a fresh database."""
    log.debug("initializing db")
    db = collection.db
    # One "h_<algo> blob unique" column per supported hash algorithm.
    hashfields = ",".join("h_{0} blob unique".format(algo) for algo in HASH_ALGOS)
    db.execute("""create table files
             (filename text,
              path text,
              mtime int not null,""" + hashfields + ")")
    db.execute("""create table metadata
             (version int)""")
    db.execute("""INSERT INTO metadata
             VALUES (?)""", (VERSION,))
    db.commit()
+
+
+
+
class File(object):
    """Read-only wrapper around one row of the ``files`` table."""

    def __init__(self, row):
        # row: a sqlite3.Row (or any mapping) with at least the keys
        # 'filename', 'path' and one 'h_<algo>' entry per stored digest.
        self.row = row

    def gethash(self, hashtype):
        """Return the stored hex digest for *hashtype* (e.g. "sha256")."""
        field = "h_{0}".format(hashtype)
        return self.row[field]

    @property
    def name(self):
        """Base file name, without the directory part."""
        return self.row['filename']

    @property
    def path(self):
        """Directory containing the file."""
        return self.row['path']

    @property
    def fullpath(self):
        """Full path composed of directory and file name."""
        return os.path.join(self.row['path'], self.row['filename'])

    def get_file(self):
        """Return an open handle on the file's content.

        BUG FIX: rpm packages are binary, so open in binary mode — text
        mode would mangle the payload that serve.py streams to peers.
        """
        return open(self.fullpath, 'rb')
+
+PATTERNS = ['.*\.rpm']
+
class Collection(object):
    """An indexed collection of files under *path*, backed by sqlite.

    For every file matching one of *patterns* the database records its
    location, mtime, and one digest per algorithm in HASH_ALGOS.
    """

    def __init__(self, path=None, dbpath=None, patterns=None):
        """Open (or create) the index database at *dbpath*.

        path: directory tree to scan; dbpath: sqlite file location;
        patterns: regex strings matched against file names (defaults to
        the module-level PATTERNS).
        """
        if patterns is None:
            patterns = PATTERNS
        self.path = path
        self.patterns = [re.compile(p) for p in patterns]
        self.dbpath = dbpath
        existing = os.path.exists(dbpath)
        self.db = sqlite3.connect(dbpath)
        self.db.row_factory = sqlite3.Row
        if existing:
            checkschema(self)
            self.algos = self.get_supported_algos()
        else:
            self.algos = HASH_ALGOS
            initdb(self)
        self.algos_funcs = HASH_FUNCS

    def get_supported_algos(self):
        """Return the hash algorithms stored in the files table.

        Derived from the h_<algo> column names; empty tuple when the
        table has no rows to inspect.
        """
        row = self.db.execute('SELECT * from files limit 1').fetchone()
        if row is None:
            return tuple()
        # BUG FIX: materialize the result — the original returned a lazy
        # filter/map pipeline under Python 3.
        return tuple(key[2:] for key in row.keys() if key.startswith('h_'))

    def compute_hashes(self, f):
        """Digest file object *f* (opened in binary mode) with every algo.

        Returns a dict mapping algorithm name to hex digest.
        """
        hashmachines = [func() for func in self.algos_funcs]
        while True:
            chunk = f.read(4096)
            if not chunk:
                break  # EOF
            for machine in hashmachines:
                machine.update(chunk)
        return dict((algo, machine.hexdigest())
                    for machine, algo in zip(hashmachines, self.algos))

    def by_fullpath(self, fullpath):
        """Return the File recorded at *fullpath*, or None."""
        c = self.db.cursor()
        path, name = os.path.split(fullpath)
        c.execute("SELECT * from files WHERE path = ? and filename = ?",
                  (path, name))
        row = c.fetchone()
        if row is None:
            return None
        return File(row)

    def by_filename(self, filename):
        """Return every File whose base name is *filename* (may be empty)."""
        c = self.db.cursor()
        c.execute("SELECT * from files WHERE filename = ?", (filename,))
        return [File(row) for row in c.fetchall()]

    def by_hash(self, hashtype, hashvalue):
        """Return the File whose h_<hashtype> digest equals *hashvalue*.

        NOTE: *hashtype* is interpolated into the column name, so callers
        must validate it (serve.py whitelists [a-zA-Z0-9_-]) before
        passing untrusted input.
        """
        c = self.db.cursor()
        field = 'h_' + hashtype
        q = "SELECT * from files WHERE {} = ?".format(field)
        c.execute(q, (hashvalue,))
        row = c.fetchone()
        if row is None:
            return None
        return File(row)

    def delete(self, row, many=False):
        """Remove *row* (matched on filename+path) from the index.

        *many* is accepted for backward compatibility but unused.
        """
        c = self.db.cursor()
        c.execute("DELETE FROM files WHERE filename=? and path=?",
                  (row['filename'], row['path']))
        self.db.commit()

    def upsert(self, path, name, hashes, mtime):
        """Insert or replace the record for *path*/*name* with *hashes*."""
        c = self.db.cursor()
        c.execute("DELETE FROM files WHERE filename = ? and path = ?",
                  (name, path))
        c.execute("INSERT into files (path, filename, mtime) VALUES (?, ?, ?)",
                  (path, name, mtime))
        for key, value in hashes.items():
            # Column name comes from our own HASH_ALGOS, not user input.
            q = "UPDATE files SET h_{} = ? WHERE \
                filename = ? AND path = ?".format(key)
            c.execute(q, (value, name, path))
        self.db.commit()

    def allrows(self):
        """Yield every indexed file as a File object."""
        c = self.db.cursor()
        c.execute("SELECT * from files")
        while True:
            row = c.fetchone()
            if row is None:
                return
            yield File(row)

    def scan(self):
        """Walk self.path, (re)hash new or changed files, purge vanished ones."""
        for root, dirs, files in os.walk(self.path):
            for fname in files:
                for pattern in self.patterns:
                    if pattern.match(fname):
                        fullpath = os.path.join(root, fname)
                        mtime = os.stat(fullpath).st_mtime
                        thisfile = self.by_fullpath(fullpath)
                        # Re-hash when unknown, when no mtime was recorded,
                        # or when the file on disk is newer than the record.
                        if (not thisfile or not thisfile.row['mtime']
                                or thisfile.row['mtime'] < mtime):
                            log.debug("updating {0}".format(fullpath))
                            # BUG FIX: binary mode — we hash raw bytes.
                            f = open(fullpath, 'rb')
                            try:
                                hashes = self.compute_hashes(f)
                            finally:
                                f.close()
                            self.upsert(root, fname, hashes, mtime)
                        else:
                            log.debug("skipping {0} update".format(fullpath))
        # Second pass: drop records whose file no longer exists on disk.
        for f in self.allrows():
            try:
                os.stat(f.fullpath)
            except OSError as e:
                if e.errno == 2:  # ENOENT: the file vanished
                    log.debug("cleaning up " + f.fullpath)
                    self.delete(f.row)
                else:
                    # BUG FIX: the original `raise()` raised a TypeError;
                    # a bare raise re-raises the unexpected stat error.
                    raise
+
+

peerdownload/main.py

# -*- coding: utf-8 -*-
"""Entry point for the peerdownload server: index the yum cache, announce
the service over avahi, and serve cached files to LAN peers."""

import logging
import sys
logging.basicConfig(level=logging.DEBUG)

# BUG FIX: the original `sys.argv[1] or default` raised IndexError when no
# argument was passed — the fallback was unreachable. Guard on argv length.
DBPATH = sys.argv[1] if len(sys.argv) > 1 else "/var/lib/yum/filecache.db"

if __name__ == "__main__":
    from peerdownload import collection, publish, serve
    c = collection.Collection("/var/cache/yum", DBPATH)
    c.scan()
    publish.register()
    serve.run(collection=c)

peerdownload/plugin.py

+# -*- coding: utf-8 -*-
+
+import random
+import os
+import json
+import subprocess
+import tempfile
+from urllib2 import urlopen,HTTPError
+import urlgrabber
+
+from yum.plugins import TYPE_CORE
+
+requires_api_version = '2.6'
+plugin_type = (TYPE_CORE)
+NAME = "PeerDownload"
+NAME_ = NAME.lower()
+CHECKSUMS_TYPES = ('sha256',) # a whitelist of decent checksums
+subp = None
+tempf = None
+
def prereposetup_hook(conduit):
    """Yum hook: start the background LAN peer scan unless disabled."""
    global subp, tempf
    opts, commands = conduit.getCmdLine()
    if opts and opts.disablepeerdownload:
        conduit.info(5, ('{0} disabled by user request').format(NAME))
        return
    conduit.info(2, '{} enabled'.format(NAME))
    # The scanner (this module run as a script) writes the discovered peers
    # as JSON into tempf; predownload_hook reads it back later.
    fd, tempf = tempfile.mkstemp()
    os.close(fd)  # BUG FIX: mkstemp returns an open fd; don't leak it
    subp = subprocess.Popen(['/usr/bin/env', 'python', __file__, tempf],
                            bufsize=-1)
    conduit.info(3, 'Background scan for peers started')
+
+
+
def config_hook(conduit):
    """Yum hook: register the --disablepeerdownload command line switch."""
    parser = conduit.getOptParser()
    if not parser:
        return
    parser.add_option('', '--disable' + NAME_, dest='disablepeerdownload',
                      action='store_true', default=False,
                      help=("disable {0} plugin").format(NAME))
+
def predownload_hook(conduit):
    """Yum hook: try to fetch pending packages from LAN peers first."""
    global subp
    if subp is None:
        # no repo setup, probably some offline op (remove)
        return
    subp.poll()
    # BUG FIX: the original condition was inverted — it waited only when
    # returncode was already set (scan finished) and otherwise read a
    # result file the scanner had not written yet. Wait while it is
    # still running (returncode is None).
    if subp.returncode is None:
        conduit.info(4, 'waiting for scan to finish…')
        subp.wait()
        conduit.info(4, 'done')
    try:
        # Close the handle instead of leaking it like the original did.
        with open(tempf) as scanfile:
            hosts = json.loads(scanfile.read())
    except Exception:
        conduit.info(1, "Cannot parse network scan output")
        hosts = []
    os.unlink(tempf)
    if not hosts:
        conduit.info(5, "No peers found, exiting early")
        return
    packages = conduit.getDownloadPackages()
    for po in packages:
        # Shuffle per package so load spreads across peers.
        myhosts = list(hosts)
        random.shuffle(myhosts)
        if not po.verifyLocalPkg():
            if (hasattr(po, 'pkgtype') and po.pkgtype == 'local') or \
                po.remote_url.startswith("file:/"):
                conduit.info(5, "Package {0}.{1} is local".format(
                    po.name,
                    po.arch))
                continue
            conduit.info(3, "Package {0}.{1} is missing, asking peers".format(
                    po.name,
                    po.arch))
            getpackage(conduit, po, myhosts)
        else:
            conduit.info(4, "Package {0}.{1} already there".format(
                    po.name,
                    po.arch))
+
def posttrans_hook(conduit):
    """Yum hook: re-index the package cache after a completed transaction."""
    rescan(conduit)
+
def close_hook(conduit):
    """Yum hook: make sure the background scanner does not outlive yum."""
    global subp
    if subp is not None:
        try:
            subp.kill()
        except OSError:
            # Best effort: the scanner has usually exited on its own by
            # now; narrowed from the original bare except.
            pass
+
def rescan(conduit):
    """Ask the local peerdownload server to re-index the package cache.

    Best effort: failures are reported through yum's logging instead of
    the stray debug `print` the original left in.
    """
    conduit.info(4, "Triggering package rescan…")
    try:
        urlopen("http://localhost:2426/rescan").read()
    except Exception as e:
        conduit.info(4, "peerdownload package rescan failed: {0}".format(e))
+
def getpackage(conduit, po, hosts):
    """Download package *po* from the first peer in *hosts* that has it.

    Peers are tried in order; a download only counts once yum's own
    verifyLocalPkg() accepts the file. Returns None when the package
    offers no whitelisted checksum type.
    """
    path = po.localpath
    checksums = dict()
    for (csumtype, csum, csumid) in po.checksums:
        checksums[csumtype] = csum
    # Pick the first whitelisted checksum type the package offers.
    for checksum_type in CHECKSUMS_TYPES:
        if checksum_type in checksums:
            break
    else:
        return None
    for host in hosts:
        d = dict(checksum_type=checksum_type,
                 checksum=checksums[checksum_type],
                 host=host[0],
                 port=host[1])
        url = "http://{host}:{port}/files/{checksum_type}/{checksum}".format(**d)
        try:
            progress = urlgrabber.progress.TextMeter()
            resp = urlgrabber.urlopen(url, progress=progress)
        except Exception as e:
            if isinstance(e, HTTPError) and e.code == 404:
                conduit.info(4, "Host {2} does not have {0}.{1}".format(
                    po.name,
                    po.arch,
                    host[0]))
                continue
            else:
                conduit.info(3, "Download of {0}.{1}  from {2} failed".format(
                    po.name,
                    po.arch,
                    host[0]))
                continue
        try:
            os.unlink(path)
        except OSError:
            pass
        f = open(path, 'wb')
        f.write(resp.read())
        # BUG FIX: close before verifying, otherwise yum may checksum a
        # partially-flushed file.
        f.close()
        if po.verifyLocalPkg():
            # BUG FIX: stop after the first good download. The original
            # never broke out, so it re-downloaded from every peer and the
            # for/else below ALWAYS reported "No good peer found".
            break
        conduit.info(2, "Download of {0}.{1}  from {2} is bad!".format(
                po.name,
                po.arch,
                host[0]))
    else:
        conduit.info(2, "No good peer found for {0}.{1}".format(
                po.name,
                po.arch))
+
+
+
# Script mode: prereposetup_hook runs this file as a subprocess. It browses
# avahi for "_filecache._tcp" peers for ~2 seconds, then writes the list of
# (address, port) pairs as JSON to the path given in sys.argv[1] and exits.
# NOTE(review): written for Python 2 (print statements) — matches the
# interpreter the hook launches it with.
if __name__ == '__main__':
    import dbus, gobject, avahi
    from dbus.mainloop.glib import DBusGMainLoop
    import sys

    TYPE = '_filecache._tcp'
    hosts = []  # accumulates (address, port) tuples from resolved services

    def service_resolved(*args):
        # Callback for ResolveService; positional args follow the avahi
        # ServiceResolver signature (name at 2, address at 7, port at 8).
        print '\nservice resolved'
        print 'name:', args[2]
        print 'address:', args[7]
        print 'port:', args[8]
        h = (args[7], args[8])
        hosts.append(h)

    def print_error(*args):
        # Error callback: just report the first (message) argument.
        print args[0]

    def myhandler(interface, protocol, name, stype, domain, flags):
        # ItemNew callback: resolve every discovered service except our own.
        #print "Found service '%s' type '%s' domain '%s' " % (name, stype, domain)

        if flags & avahi.LOOKUP_RESULT_LOCAL:
                # local service, skip
                pass
        else:
            server.ResolveService(interface, protocol, name, stype,
            domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
            reply_handler=service_resolved, error_handler=print_error)

    # Wire a service browser for TYPE on the system bus.
    loop = DBusGMainLoop()
    bus = dbus.SystemBus(mainloop=loop)
    server = dbus.Interface( bus.get_object(avahi.DBUS_NAME, '/'),
            'org.freedesktop.Avahi.Server')
    sbrowser = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
            server.ServiceBrowserNew(avahi.IF_UNSPEC,
                avahi.PROTO_UNSPEC, TYPE, 'local', dbus.UInt32(0))),
            avahi.DBUS_INTERFACE_SERVICE_BROWSER)
    sbrowser.connect_to_signal("ItemNew", myhandler)

    l = gobject.MainLoop()

    def stop():
        # Dump whatever was found within the timeout and terminate.
        f = open(sys.argv[1], 'wb')
        json.dump(hosts, f)
        f.close()
        sys.exit(0)

    # Give the browser two seconds to discover peers, then stop.
    gobject.timeout_add(2000, stop)
    l.run()
+
+

peerdownload/publish.py

+import dbus
+import gobject
+import avahi
+from dbus.mainloop.glib import DBusGMainLoop
+
+
+serviceName = "File Caching Service"
+serviceType = "_filecache._tcp" # See http://www.dns-sd.org/ServiceTypes.html
+servicePort = 2426
+serviceTXT = {'gattini':'pelosi'}  #TXT record for the service
+
+domain = "" # Domain to publish on, default to .local
+host = "" # Host to publish records for, default to loc
+
+group = None #our entry group
+bus = None
+server = None
+rename_count = 12 # Counter so we only rename after collisions a sensible number of times
+
def add_service():
    """Publish the file-cache service on the LAN through avahi."""
    global group, serviceName, serviceType, servicePort, serviceTXT, domain, host, bus
    if group is None:
        group = dbus.Interface(
                bus.get_object( avahi.DBUS_NAME, server.EntryGroupNew()),
                avahi.DBUS_INTERFACE_ENTRY_GROUP)
        #group.connect_to_signal('StateChanged', entry_group_state_changed)

    print("Adding service '%s' of type '%s' ..." % (serviceName, serviceType))

    group.AddService(
            avahi.IF_UNSPEC,    #interface
            avahi.PROTO_UNSPEC, #protocol
            dbus.UInt32(0),                  #flags
            serviceName, serviceType,
            domain, host,
            dbus.UInt16(servicePort),
            # BUG FIX: serviceTXT is a dict, so it must go through
            # dict_to_txt_array; string_array_to_txt_array expects a list
            # of "key=value" strings and would drop the values.
            avahi.dict_to_txt_array(serviceTXT)
            )
    group.Commit()
+
def remove_service():
    """Withdraw the published avahi service, if one exists."""
    global group
    if group is not None:
        group.Reset()
+
def server_state_changed(state):
    """React to avahi daemon state: publish when running, withdraw on collision."""
    if state == avahi.SERVER_RUNNING:
        add_service()
    elif state == avahi.SERVER_COLLISION:
        print("WARNING: Server name collision")
        remove_service()
+
+def entry_group_state_changed(state, error):
+    global serviceName, server, rename_count, main_loop
+
+    print "state change: %i" % state
+
+    if state == avahi.ENTRY_GROUP_ESTABLISHED:
+        print "Service established."
+    elif state == avahi.ENTRY_GROUP_COLLISION:
+
+        rename_count = rename_count - 1
+        if rename_count > 0:
+            name = server.GetAlternativeServiceName(name)
+            print "WARNING: Service name collision, changing name to '%s' ..." % name
+            remove_service()
+            add_service()
+
+        else:
+            print "ERROR: No suitable service name found after %i retries, exiting." % n_rename
+            #main_loop.quit()
+    elif state == avahi.ENTRY_GROUP_FAILURE:
+        print "Error in group state changed", error
+        #main_loop.quit()
+        return
+
+
+
+
def register():
    """Connect to the system bus and announce our service through avahi."""
    global bus, server, main_loop
    DBusGMainLoop( set_as_default=True )

    bus = dbus.SystemBus()

    avahi_server_obj = bus.get_object( avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER )
    server = dbus.Interface( avahi_server_obj, avahi.DBUS_INTERFACE_SERVER )

    # Publish (or withdraw) immediately based on the daemon's current state.
    server_state_changed( server.GetState() )

peerdownload/serve.py

+# -*- coding: utf-8 -*-
+import bottle
+import re
+
+
def run(collection, address='', port=2426):
    """Serve *collection* over HTTP so LAN peers can fetch cached files.

    Exposes /files/<hashtype>/<hashcontent> for downloads and /rescan
    (localhost only) to re-index the cache. Blocks in bottle's server loop.
    """
    app = bottle.Bottle()

    @app.route("/files/<hashtype>/<hashcontent>")
    def getfile(hashtype, hashcontent):
        # SECURITY FIX: hashtype is interpolated into a column name inside
        # Collection.by_hash, so the whole segment must match the
        # whitelist. The original unanchored re.match accepted any string
        # with a valid *prefix* (e.g. "sha256 OR 1=1").
        if not re.match(r"\A[a-zA-Z0-9_-]+\Z", hashtype):
            raise bottle.HTTPError(403)
        wantedfile = collection.by_hash(hashtype, hashcontent)
        if wantedfile is None:
            raise bottle.HTTPError(404)
        bottle.response.content_type = "application/octet-stream"
        return wantedfile.get_file()

    @app.route("/rescan")
    def rescan():
        # Only the local machine may trigger a rescan.
        if bottle.request.remote_addr not in {"127.0.0.1", "::1"}:
            raise bottle.HTTPError(403)
        # FIXME: be async
        collection.scan()
        return ':)'

    bottle.run(app, port=port, host=address)
# -*- coding: utf-8 -*-
# NOTE(review): this appears to be the project's setup.py — the file header
# was lost in the diff rendering; confirm against the repository layout.

from setuptools import setup, find_packages

# Packaging metadata for the peerdownload yum plugin and its server.
setup(
    name='yum-peercache',
    version='0.0.1',
    description='.',
    author='Stefano Cavallari',
    author_email='spiky.kiwi@gmail.com',
    url='',
    packages = find_packages(),
    install_requires=[
        'urlgrabber',
        'bottle',
        ],

)