1. Unofficial Redmine Clones
  2. Untitled project
  3. hgredmine

Source

hgredmine / hgredmine.py

# hgredmine.py - Web interface for repositories configured in
#                Redmine projects.
#
# Copyright 1 Oct 2009 - (c) 2010 Brant Young <brant@9thsoft.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from base64 import b64decode
import logging
import hashlib

import os, re, time
from mercurial.i18n import _
from mercurial import ui, hg, util, templater
from mercurial import error, encoding
from mercurial.hgweb.common import ErrorResponse, get_mtime, staticfile, paritygen,\
                   get_contact, HTTP_OK, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
from mercurial.hgweb.hgweb_mod import hgweb
from mercurial.hgweb.request import wsgirequest
import mercurial.hgweb.webutil

from mercurial.hgweb.common import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, HTTP_METHOD_NOT_ALLOWED
from mercurial.hgweb.hgwebdir_mod import hgwebdir
from mercurial.hgweb import hgweb_mod

def _is_admin(req):
    """
    Find repos from Redmine database.
    
    @return True / False
    """
    return req.env.get('REMOTE_USER_ADMIN', 'f') == 't'

class HgRedmine(hgwebdir):
    """A simple HTTP basic authentication implementation (RFC 2617) usable
    as WSGI middleware.
    """
    
    def __init__(self, realm, dsn, conf, baseui=None):
        self.dsn = dsn
        self.realm = realm
        self.placeholder = None
        
        hgwebdir.__init__(self, conf, baseui)
    
    def findrepos(self, db):
        """
        Find repos from Redmine database.
        """
        dbcur = db.cursor()
        dbcur.execute('SELECT projects.identifier, repositories.url FROM projects, repositories '
                            'WHERE repositories.type="Mercurial" AND projects.id = repositories.project_id'
                       )
        
        repos = {}
        row = dbcur.fetchone()
        
        while row:
            repos[row[0]] = row[1]
            row = dbcur.fetchone()
            
        self.repos = repos.items()
        
    def _send_challenge(self, req):
        req.header([('WWW-Authenticate', 'Basic realm="%s"' % self.realm)])
        raise ErrorResponse(HTTP_UNAUTHORIZED, 'List Redmine repositories is unauthorized')
    
    def _user_login(self, db, req):
        req.env['REMOTE_USER'] = None
        req.env['REMOTE_USER_ADMIN'] = 'f'
        
        header = req.env.get('HTTP_AUTHORIZATION')
        if not header or not header.startswith('Basic'):
            return False
        
        creds = b64decode(header[6:]).split(':')
        if len(creds) != 2:
            return False
        
        username, password = creds
        
        hashed_password = hashlib.sha1(password).hexdigest()
        
        dbcur = db.cursor()
        dbcur.execute('SELECT users.admin FROM users '
                            'WHERE users.login=%(ph)s AND users.hashed_password=%(ph)s' 
                             % {'ph':self.placeholder},
                      (username, hashed_password)
                     )
        
        row = dbcur.fetchone()
        if not row:
            return False
        
        req.env['AUTH_TYPE'] = 'Basic'
        req.env['REMOTE_USER'] = username
        is_admin = row[0]
        if type(is_admin) == int:
            if is_admin == 0:
                is_admin = 'f'
            else:
                is_admin = 't'
        req.env['REMOTE_USER_ADMIN'] = is_admin
        
        return True
        
        
    def _setup_repo(self, db, repo, project_id):
        dbcur = db.cursor()
        dbcur.execute('SELECT projects.name, projects.description FROM projects '
                            'WHERE projects.identifier=%s' % (self.placeholder,),
                      (project_id, )
                     )
        
        row = dbcur.fetchone()
        if not row:
            return
        
        repo.ui.setconfig('web', 'name', row[0])
        repo.ui.setconfig('web', 'description', row[1])
        repo.ui.setconfig('web', 'contact', 'Project Owner')
    
    def run_wsgi(self, req):
        try:
            try:
                db, self.placeholder = connect(self.dsn)
                
                self.refresh()
                self.findrepos(db)

                virtual = req.env.get("PATH_INFO", "").strip('/')
                tmpl = self.templater(req)
                ctype = tmpl('mimetype', encoding=encoding.encoding)
                ctype = templater.stringify(ctype)

                # a static file
                if virtual.startswith('static/') or 'static' in req.form:
                    if virtual.startswith('static/'):
                        fname = virtual[7:]
                    else:
                        fname = req.form['static'][0]
                    static = templater.templatepath('static')
                    return (staticfile(static, fname, req),)
                
                self._user_login(db, req)
                
                # top-level index
                if not virtual:
                    # only administrators can list repositories
                    if _is_admin(req):
                        req.respond(HTTP_OK, ctype)
                        return self.makeindex(req, tmpl)
                    else:
                        self._send_challenge(req)
                
                # navigate to hgweb
                project_id = virtual.split('/')[0]
                
                repos = dict(self.repos)
                real = repos.get(project_id)
                
                if real:
                    req.env['REPO_NAME'] = project_id
                    
                    try:
                        repo = hg.repository(self.ui, real)
                        self._setup_repo(db, repo, project_id)
                        return HgwebRedmine(db, self.placeholder, self.realm, repo).run_wsgi(req)
                    except IOError, inst:
                        msg = inst.strerror
                        raise ErrorResponse(HTTP_SERVER_ERROR, msg)
                    except error.RepoError, inst:
                        raise ErrorResponse(HTTP_SERVER_ERROR, str(inst))
                
                # prefixes not found
                req.respond(HTTP_NOT_FOUND, ctype)
                return tmpl("notfound", repo=virtual)

            except ErrorResponse, err:
                req.respond(err, ctype)
                return tmpl('error', error=err.message or '')
        finally:
            if vars().has_key('db'):
                db.close()
            
            db = None
            tmpl = None


perms = {
    'lookup': 'pull',
    'heads': 'pull',
    'branches': 'pull',
    'between': 'pull',
    'capabilities': 'pull',
    'branchmap': 'pull',
    
    'changegroup': 'pull',
    'changegroupsubset': 'pull',
    'unbundle': 'push',
    'stream_out': 'pull',
}
hgweb_mod.perms = perms
            
class HgwebRedmine(hgweb):
    def __init__(self, dbconn, placeholder, realm, repo, name=None):
        self.db = dbconn
        self.realm = realm
        self.placeholder = placeholder
        
        hgweb.__init__(self, repo, name)
    
    def _send_challenge(self, req, msg):
        req.header([('WWW-Authenticate', 'Basic realm="%s"' % self.realm)])
        raise ErrorResponse(HTTP_UNAUTHORIZED, msg)
        
    def _get_perms(self, user, project_id):
        """
        Find member permissions from Redmine database.
        
        Redmine repository relate permissions:
            repo admin - :manage_repository
            allow_read - :browse_repository
            allow_pull - :view_changesets
            allow_push - :commit_access
        
        @return (allow_read, allow_pull, allow_push) tuple
        """
        is_public = self._is_public_repo(project_id)
        
        if not user: # anonymous user
            if is_public:
                return (True, True, False)
            
            return (False, False, False)
        
        # Redmine member
        
        dbcur = self.db.cursor()
        dbcur.execute('SELECT roles.permissions FROM users, projects, members, roles, member_roles '
                            'WHERE users.login=%(ph)s AND projects.identifier=%(ph)s' 
                            ' AND projects.id = members.project_id AND users.id = members.user_id '
                            'AND members.id = member_roles.member_id '
                            'AND roles.id = member_roles.role_id' % {'ph':self.placeholder},
                      (user, project_id)
                     )
        
        row = dbcur.fetchone()
        if not row:
            # user doesn't have any permits
            return (False, False, False)
        
        perms = row[0].splitlines()
        
        if '- :manage_repository' in perms or '- :commit_access' in perms :
            return (True, True, True)
        
        if '- :view_changesets' in perms:
            return (True, True, False)
        
        if '- :browse_repository' in perms:
            return (True, False, False)
        
        return (False, False, False)
            
    def _is_public_repo(self, project_id):        
        dbcur = self.db.cursor()
        dbcur.execute('SELECT projects.is_public FROM projects '
                            'WHERE projects.identifier=%s' % (self.placeholder,),
                      (project_id, )
                     )
        
        row = dbcur.fetchone()
        if not row:
            return False
            
        return row[0] == 't'
    
    def check_perm(self, req, op):
        '''Check permission for operation based on request data (including
        authentication info). Return if op allowed, else raise an ErrorResponse
        exception.'''
        
        user = req.env.get('REMOTE_USER')
        project_id = req.env.get('REPO_NAME')
        
        if _is_admin(req) :
            allow_read, allow_pull, allow_push = (True, True, True)
        else :
            allow_read, allow_pull, allow_push = self._get_perms(user, project_id)
        
        if not allow_read:
            self._send_challenge(req, 'read not authorized')
        
        if op == 'pull' and not self.allowpull:
            raise ErrorResponse(HTTP_UNAUTHORIZED, 'pull not authorized')
        elif op == 'pull' and not allow_pull :
            self._send_challenge(req, 'pull not authorized')
        elif op == 'pull' or op is None: # op is None for interface requests
            return
        
        # enforce that you can only push using POST requests
        if req.env['REQUEST_METHOD'] != 'POST':
            msg = 'push requires POST request'
            raise ErrorResponse(HTTP_METHOD_NOT_ALLOWED, msg)

        # require ssl by default for pushing, auth info cannot be sniffed
        # and replayed
        scheme = req.env.get('wsgi.url_scheme')
        if self.configbool('web', 'push_ssl', True) and scheme != 'https':
            raise ErrorResponse(HTTP_OK, 'ssl required')
        
        if not allow_push:
            self._send_challenge(req, 'push not authorized')


def connect(dsn):
    """
    Connect to database parsing dsn.

    @param dsn Database specification.
    @return Database object.
    """
    
    driver = dsn['ENGINE']
    host = dsn['HOST']
    user = dsn['USER']
    password = dsn['PASSWORD']
    dbname = dsn['NAME']
    port = dsn['PORT']
    
    # Try to import database driver
    if driver == 'mysql':
        import MySQLdb

        # Create database
        db = MySQLdb.connect(
            user=user, passwd=password, host=host,
            port=port, db=dbname, use_unicode=True
        )
        placeholder = "%s"
        
    elif driver == 'postgresql':
        import psycopg2, psycopg2.extras, psycopg2.extensions
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        
        if not port:
            port = '5432'
        
        dsn = "dbname='%s' user='%s' host='%s' password='%s' port=%s" % (
                dbname, user, host, password, port
        )
        
        db = psycopg2.connect(dsn)
        db.set_client_encoding('UTF-8')
        placeholder = "%s" #not so sure ;)

    elif driver == 'sqlite3':
        import sqlite3
        
        # Create database
        db = sqlite3.connect(dbname)
        placeholder = "?"
    else:
        raise ValueError('Unknown database type %s' % (driver, ))
    
    return [db, placeholder]