Source

cheesecake-service / webui.py

#!/usr/bin/env python

import os
import random
import sys
import time

import web

from store import Store
from config import LOG_DIRECTORY, TIMESTAMP_FILE, DB_NAME, DB_USER, DB_PASSWORD, STATIC_FILES_DIRECTORY, TEMPLATES_DIRECTORY

################################################################################
## Helpers.
################################################################################

def read_file_contents(path):
    fd = file(path)
    contents = fd.read()
    fd.close()
    return contents

def remove_duplicates(seq):
    return list(set(seq))

def put_anchors(log_contents):
    """Put anchors between logfile lines to divide it into sections.
    """
    mapping = {
        'Trying to download package': '<a name="download"></a>Trying to download package',
        'Trying to install package': '<a name="install"></a>Trying to install package',
    }

    for pattern, replacement in mapping.iteritems():
        log_contents = log_contents.replace(pattern, replacement)

    return log_contents

################################################################################
## Controllers.
################################################################################

urls = (
    '/', 'index',

    '/messages', 'messages',
    '/message/(.*)', 'message',

    '/scores', 'scores',
    '/scores/installability', 'scores_installability',
    '/scores/documentation', 'scores_documentation',
    '/scores/code_kwalitee', 'scores_code_kwalitee',
    '/score/(.*)/(.*)', 'score',
    '/easy_installability', 'easy_installability',

    '/log/(.*)', 'log',

    '/top10', 'top10',

    '/(style.css)', 'static',
    '/(cheesecake.gif)', 'static',
)


class HandlerWithStore(object):
    def GET(self, *args, **kwds):
        self.store = Store(directory=LOG_DIRECTORY,
                           timestamp_file=TIMESTAMP_FILE,
                           postgres_config=("dbname=%s user=%s password=%s" % (DB_NAME, DB_USER, DB_PASSWORD)))
        self._GET(*args, **kwds)
        self.store.close()

def get_top10_lists(store):
        top10_installability = list(store.score.iteritems())
        top10_installability.sort(lambda x,y: cmp(x[1].installability.relative,
                                                  y[1].installability.relative),
                                  reverse=True)

        top10_documentation = list(store.score.iteritems())
        top10_documentation.sort(lambda x,y: cmp(x[1].documentation.relative,
                                                  y[1].documentation.relative),
                                 reverse=True)

        top10_code_kwalitee = list(store.score.iteritems())
        top10_code_kwalitee.sort(lambda x,y: cmp(x[1].code_kwalitee.relative,
                                                  y[1].code_kwalitee.relative),
                                 reverse=True)

        return top10_installability, top10_documentation, top10_code_kwalitee

class index(HandlerWithStore):
    def _GET(self):
        last_timestamp = time.strftime('%b %d %Y, %H:%M:%S GMT', time.gmtime(self.store.timestamp))
        messages = web.select("messages", limit=3, order='time DESC')

        if self.store.score:
            random_release, random_score = random.sample(list(self.store.score.iteritems()), 1)[0]
        else:
            random_release = None

        failures = self.store.failures[:10]

        top10_installability, top10_documentation, top10_code_kwalitee = get_top10_lists(self.store)

        random_year = random.randint(1800, 2006)

        web.render('index.html')

class top10(HandlerWithStore):
    def _GET(self):
        top10_installability, top10_documentation, top10_code_kwalitee = get_top10_lists(self.store)

        web.render('top10.html')

class messages:
    def GET(self):
        messages = web.select("messages", limit=10, order='time DESC')
        web.render('messages.html')

class message:
    def GET(self, id):
        message = web.select("messages", where=["id = %s", (id,)])[0]
        level, text = message.message.split(' ', 1)
        web.render('message.html')

class scores(HandlerWithStore):
    def _GET(self):
        scores = list(self.store.score.iteritems())
        scores.sort(lambda x,y: cmp(x[0][0].lower(), y[0][0].lower()))

        web.render('scores.html')

class scores_installability(HandlerWithStore):
    def _GET(self):
        scores = get_top10_lists(self.store)[0]

        web.render('scores_installability.html')

class scores_documentation(HandlerWithStore):
    def _GET(self):
        scores = get_top10_lists(self.store)[1]

        web.render('scores_documentation.html')

class scores_code_kwalitee(HandlerWithStore):
    def _GET(self):
        scores = get_top10_lists(self.store)[2]

        web.render('scores_code_kwalitee.html')

class score(HandlerWithStore):
    def _GET(self, name, version):
        score = None
        logs = []
        if (name, version) in self.store.score:
            score = self.store.score[(name, version)]
            logs = self.store.logs_for_release(name, version)

        web.render('score.html')

class easy_installability(HandlerWithStore):
    def _GET(self):
        easy_installable_releases = []
        not_easy_installable_releases = []
        last_logname_of = {}
        _not_installable_because_of_download = []
        _not_installable_because_of_install = []

        for release, score in self.store.score.iteritems():
            if score.can_be_installed():
                easy_installable_releases.append(release)
            else:
                not_easy_installable_releases.append(release)
                last_logname_of[release] = self.store.logs_for_release(*release)[-1]
                _not_installable_because_of_install.append(release)

        # Append all failed runs as well.
        for run in self.store._runs_failure:
            release = (run.package, run.version)

            not_easy_installable_releases.append(release)
            last_logname_of[release] = run.logname()
            _not_installable_because_of_download.append(release)

        not_easy_installable_releases = remove_duplicates(not_easy_installable_releases)

        easy_installable_releases.sort()
        not_easy_installable_releases.sort()

        not_installable_because_of_download = lambda r: r in _not_installable_because_of_download
        not_installable_because_of_install = lambda r: r in _not_installable_because_of_install

        web.render('easy_installability.html')

class StaticHandler(object):
    def outputter(self, content, _):
        sys.stdout.write(content)

    def GET(self, filename):
        if not filename:
            web.notfound()
            return

        path = os.path.join(os.getcwd(),
                            os.path.join(self.top_directory, filename))
        path = os.path.realpath(path)

        base = os.path.realpath(self.top_directory)

        try:
            # Allow reading files only below top_directory.
            if not path.startswith(base):
                raise ValueError

            self.outputter(read_file_contents(path), filename)
        except (IOError, ValueError):
            web.notfound()

class static(StaticHandler):
    top_directory = STATIC_FILES_DIRECTORY

class log(StaticHandler):
    top_directory = LOG_DIRECTORY

    def outputter(self, content, logname):
        content = '<pre>%s</pre>' % put_anchors(web.htmlquote(content).replace('\n', '<br />'))
        web.render('log.html')


web.db_parameters = dict(dbn='postgres', db=DB_NAME, user=DB_USER, pw=DB_PASSWORD)
main = web.wsgifunc(web.webpyfunc(urls))

if __name__ == '__main__':
    web.run(urls, web.reloader)