# Source
#
# pypi-appengine / fetch.py
#
# Full commit
import httplib, xmlrpclib, time, pickle, urllib2, binascii, os, logging, re
import cStringIO, gzip, zlib
try:
    from xml.etree.cElementTree import *
except ImportError:
    try:
      from xml.etree.ElementTree import *
    except ImportError:
      from elementtree.ElementTree import *
from google.appengine.api.labs import taskqueue
from google.appengine.api.urlfetch import DownloadError
import model
from model import get_package_key_name

# User-Agent header value sent with the requests made to PyPI.
UA = "appengine-mirror"
# Host that mirrored files are transferred from.  For testing against a
# local PyPI instance, set this to 'localhost:8000' instead.
filesrc = "pypi.python.org"

def rpc():
    """Return a new XML-RPC proxy for PyPI's index API (no request is made yet)."""
    endpoint = 'http://pypi.python.org/pypi'
    return xmlrpclib.ServerProxy(endpoint)

class StepFailed(Exception):
    """Raised by a todo action when it could not complete (e.g. on DownloadError).

    step() catches it, logs a warning and returns without saving the todo
    list, so the failed step stays at the head of the stored queue.
    """

################### Transfer ######################################

def simple_page(m, uproject):
    logging.info("%s %s" % (m, uproject))
    project = uproject.encode('utf-8')
    h = httplib.HTTPConnection('pypi.python.org')
    if project:
        h.putrequest('GET', '/simple/'+urllib2.quote(project)+'/')
        obj = model.Project.get_by_key_name(get_package_key_name(uproject))
    else:
        h.putrequest('GET', '/simple/')
        obj = m
    h.putheader('Accept-Encoding', 'gzip')
    h.putheader('User-Agent', UA)
    h.endheaders()
    try:
        r = h.getresponse()
    except DownloadError, e:
        raise StepFailed(e)
    html = r.read()
    if r.getheader('Content-Encoding') == 'gzip':
        html = gzip.GzipFile(fileobj=cStringIO.StringIO(html), mode='r').read()
    if r.status == 404:
        if project:
            delete_package(obj)
        return None
    if r.status == 301:
        # package not existant anymore, however, similarly-spelled
        # package exists
        if project:
            delete_package(obj)
        return None
    if r.status != 200:
        raise ValueError, "Status %d on %s" % (r.status, project)
    html = zlib.compress(html)
    if obj:
        obj.simple = html
    else:
        obj = model.Project(key_name=get_package_key_name(uproject),
                            simple=html)
    if project:
        # the root index is not signed
        h.putrequest('GET', '/serversig/'+urllib2.quote(project)+'/')
        h.putheader('User-Agent', UA)
        h.endheaders()
        r = h.getresponse()
        obj.sig = r.read()
    obj.put()
    return html

def get_state():
    """Return the MirrorState entity, creating and seeding it on first use.

    If a MirrorState already exists, the first one fetched is returned
    unchanged.  Otherwise a new one is created with a random hex upload key
    and epoch 0.  Its todo list is seeded with:
      * the root index,
      * every project from PyPI's list_packages(),
      * a final ('last_modified', t) entry, where t is taken before the
        package list is requested.
    The new entity is saved before it is returned.
    """
    # Taken before list_packages() so that the recorded time does not
    # postdate the listing.
    t = int(time.time())
    existing = model.MirrorState.all().fetch(1)
    if existing:
        return existing[0]
    secret = binascii.b2a_hex(os.urandom(10))
    # NOTE(review): this writes the upload secret to the application log.
    logging.info("Secret key "+secret)
    state = model.MirrorState(upload_key=secret, epoch=0)
    todo = [('package', '')]
    todo.extend([('package', p) for p in rpc().list_packages()])
    todo.append(('last_modified', t))
    state.todo = pickle.dumps(todo)
    state.put()
    return state

def package(m, todo, name):
    """Refresh project *name* and queue a 'file' task for each of its files.

    name -- project name; '' means the root /simple/ index.

    Fetches and stores the simple page via simple_page().  If that returns
    nothing (the project is gone), there is nothing more to do.  Otherwise:
      * every link into ../../packages/ becomes a ('file', (name, path))
        task, inserted right after the current task (todo[0]);
      * for a real project, stored files no longer linked from the page
        are deleted.
    """
    data = simple_page(m, name)
    if not data:
        return
    prefix = '../../packages/'
    files = set()
    for a in fromstring(data).findall(".//a"):
        # Anchors without an href are skipped instead of crashing the step
        # with a KeyError.
        url = a.get('href', '')
        if not url.startswith(prefix):
            continue
        # Drop any '#...' fragment, then the relative prefix.
        url = url.split('#')[0][len(prefix):]
        # insert after current task
        todo.insert(1, ('file', (name, url)))
        files.add(url)
    if name:
        delete_extra_files(name, files)

def copy_file(m, todo, (package, path)):
    project = model.Project.get_by_key_name(get_package_key_name(package))
    f = model.File.all().filter("path = ", path).fetch(1)
    if f:
        f = f[0]
    h = httplib.HTTPConnection('pypi.python.org')
    h.putrequest('HEAD', '/packages/'+urllib2.quote(path))
    if f and f.etag:
        h.putheader("If-none-match", f.etag)
    h.endheaders()
    try:
        r = h.getresponse()
    except DownloadError, e:
        # move job to the end of the list
        logging.error('Downloading %s failed: %s' % (path, str(e)))
        j = todo[0]
        todo.insert(-1, j)
        return
    if r.status == 304:
        # Not modified
        logging.info('%s not modified' % path)
        return
    if r.status == 200:
        if not f:
            #import pdb, sys
            #sys.__stdout__.write('\a')
            #sys.__stdout__.flush()
            #debugger = pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__)
            #debugger.set_trace(sys._getframe().f_back) 
            parent, name = path.rsplit("/",1)
            model.Directory.mkdir(parent)
            secret = binascii.b2a_hex(os.urandom(10))
            f = model.File(path=path, dotdot=parent, project=project,
                           etag=r.msg.get('etag'), secret=secret)
        else:
            f.contents.delete()
            f.contents = None
            f.etag = r.msg.get('etag')
        f.put()
        # double-check it's a singleton
        files = model.File.all().filter("path = ", path).fetch(10)
        if len(files) != 1:
            logging.error("Duplicate file for "+path)
            for f in files[1:]:
                delete_file(f)
    elif r.status == 404:
        logging.error('File %s not found' % path)
    else:
        raise ValueError, "Bad HTTP status (%s %s)" % (path, r.status)

# NOTE: delete_file() is defined once, in the "Incremental Updates" section
# below.  An identical copy used to live here; it was dead code because the
# later definition replaced it when the module was imported.

def last_modified(m, todo, t):
    """Todo action: record timestamp *t* on state *m*, bump its epoch, save it.

    *todo* is unused; it is accepted because every action is called as
    handler(m, todo, param).
    """
    m.last_modified = int(t)
    m.epoch += 1
    m.put()

def transfer_file(m):
    "Trigger transmission of a file, return False if nothing needs to be done."
    # If no upload is in progress, pick the first File whose contents are
    # still missing.  Record it and its start time on the state.
    if not m.current_upload:
        pending = model.File.all().filter("contents = ", None).fetch(1)
        if not pending:
            return False
        m.current_upload = pending[0].path
        m.current_upload_start = int(time.time())
        m.put()
    # Tell the file source where to push to (this app's host[:port]) and
    # authenticate with the upload key.
    env = os.environ
    host = env['SERVER_NAME']
    port = env['SERVER_PORT']
    if port != '80':
        host = host + ':' + port
    conn = httplib.HTTPConnection(filesrc)
    conn.putrequest('GET', '/pypi?:action=gae_file&host=%s&secret=%s' %
                    (host, m.upload_key))
    conn.endheaders()
    resp = conn.getresponse()
    if resp.status != 204:
        logging.error('Failed to initiate transfer (%s)' % resp.status)
    # A transfer is pending either way, so the caller should not step on.
    return True

############## Incremental Updates #################

def delete_file(f):
    """Delete File entity *f*, first deleting its stored contents (if any)."""
    contents = f.contents
    if contents:
        contents.delete()
    f.delete()

def delete_package(obj):
    """Delete a Project entity together with every File that references it.

    A None (or otherwise falsy) *obj* is ignored.
    """
    if obj:
        for entry in model.File.all().filter("project = ", obj):
            delete_file(entry)
        obj.delete()

def delete_extra_files(name, files):
    """Delete the stored files of project *name* whose path is not in *files*.

    name  -- project name, mapped to its key via get_package_key_name().
    files -- collection of paths (relative to /packages/) that should remain.

    If the Project entity cannot be found, nothing is deleted.  Querying with
    project=None would otherwise match File entities whose project is unset;
    copy_file() can create such files when its own project lookup fails.
    """
    p = model.Project.get_by_key_name(get_package_key_name(name))
    if p is None:
        logging.warning("delete_extra_files: project %r not found" % (name,))
        return
    for f in model.File.all().filter("project = ", p):
        if f.path not in files:
            delete_file(f)

def check_modifications(m, todo):
    now = int(time.time())
    try:
        modified = rpc().changed_packages(m.last_modified-1)
    except DownloadError, e:
        logging.warning('changed_packages call failed: '+str(e))
        return
    add_count = 0
    for name in modified:
        if ('package', name) in todo:
            continue
        todo.append(('package', name))
        add_count += 1
    logging.debug("added %d steps " % (add_count,))
    if modified:
        todo.append(('package', ''))
    todo.append(('last_modified', now))

############## Statistics ##########################

def integrate_stats(m, todo=None, param=None):
    """Todo action: run the project's stats.integrate().

    Registered in ``actions`` under 'integrate_stats'.  step() calls every
    action as ``handler(m, todo, param)``.  The extra arguments are
    therefore accepted (and ignored); previously this signature took only
    *m*, so dispatching it always raised TypeError.
    """
    import stats
    stats.integrate()

############## Queuing #############################

# Dispatch table used by step(): maps the action tag of a todo entry to its
# handler, which step() calls as handler(m, todo, param).
actions = dict(package=package,
               file=copy_file,
               last_modified=last_modified,
               integrate_stats=integrate_stats)

def queue_step():
    """Enqueue an (unnamed) '/step' task to run after a 2-second countdown."""
    taskqueue.add(url='/step', countdown=2)

def step():
    """Run one unit of mirroring work and chain the next /step task.

    Order of operations:
      1. Record ``last_step`` on the state (cron() uses it to detect a
         stalled chain).
      2. If transfer_file() reports a pending/just-requested file transfer,
         stop here.
      3. If the todo list is empty, ask check_modifications() for new work;
         return "" if there is still none.
      4. Run the head task through ``actions``.  On StepFailed, return
         "failed" without saving, so the stored todo list still has the task.
      5. Otherwise drop the head, save the list, and enqueue a named /step
         task for the new head.

    Returns a short status string.
    """
    m = get_state()
    m.last_step = int(time.time())
    m.put()
    todo = pickle.loads(m.todo)
    if transfer_file(m):
        # a file transfer is pending or was just (re)requested; do no other
        # work until it is done
        return "file transfer started"
    if not todo:
        check_modifications(m, todo)
        if not todo:
            return ""
    num_steps = len(todo)
    logging.debug("%s steps left to complete" % (num_steps,))
    action, param = todo[0]
    logging.info('step %s %r' % (action, param))
    try:
        # The action may itself modify todo (insert follow-up tasks or
        # re-queue the current one), so todo[0] is removed only afterwards.
        actions[action](m, todo, param)
    except StepFailed, e:
        logging.warning("Step %s/%s failed: %s" % (action, param, e))
        return "failed"
    del todo[0]
    m.todo = pickle.dumps(todo, pickle.HIGHEST_PROTOCOL)
    m.put()
    if todo:
        # name the task, so that no two of them will be added
        try:
            # Allow re-running a task once a day, per epoch
            # (day counter: days since the Unix epoch minus 14820; the offset
            # presumably just keeps the number short)
            day = int(time.time())//3600/24-14820
            n, p = todo[0]
            if n == 'file':
                # file tasks carry (package, path); name them by the path
                p = p[1]
            # hash(p) presumably disambiguates names that collide after the
            # character sanitising below
            name = '%s-%s-%d-%d-%s' % (n, p, hash(p) & 0xFFFF, day, m.epoch)
            name = re.sub('[^a-zA-Z0-9-]', '-', name)
            taskqueue.add(name=name, url='/step')
        except taskqueue.InvalidTaskError, e:
            # likely, task already existed, or was tombstoned
            logging.error("Queing task %s failed: %s" % (name,e.__class__.__name__))
    return "OK (%s %s)" % (action, param)

def cron():
    'Run every 5 minutes'
    # Watchdog for the step chain and for file transfers:
    #  * a file upload running for more than 10 minutes is re-requested;
    #  * if no step ran during the last minute, a step is run directly.
    # Returns a short status string.
    m = model.MirrorState.instance()
    now = time.time()
    if m.current_upload and now-m.current_upload_start > 600:
        # restart failed downloads after 10 minutes
        logging.error("Restarting download of "+m.current_upload)
        # Reset the clock: transfer_file() does not touch it for an upload
        # already in progress.  Otherwise every following cron run would
        # restart the same transfer again.
        m.current_upload_start = int(now)
        m.put()
        transfer_file(m)
        return "download restarted"
    elif now-m.last_step > 60:
        # if there was no step in the last minute, restart stepping
        return step()
    return "Nothing to do"