freehg / repos /

from django.db import models
from django.contrib.auth.models import User
from django.conf import settings
from django.dispatch import dispatcher
from django.utils.html import escape, linebreaks, urlize

import os.path
import subprocess
import shutil
import re

from mercurial import hg, ui

# Matches "<REPO_PATH>/<owner>/<name>", optionally followed by "/.hg" and/or a
# trailing "/".  Group 1 is the owner's username and group 2 the repository
# name (which must start and end with a word character and be at least three
# characters long).  REPO_PATH is escaped so that regex metacharacters in the
# configured directory (e.g. ".") are matched literally, and the pattern is a
# raw string so "\w" and "\." are unambiguous regex escapes.
path_re = re.compile(
    re.escape(settings.REPO_PATH) + r"/(\w+)/(\w(\w|-|\.)+\w)(/\.hg)?/?$")

class RepoManager(models.Manager):
    """
    Manager for ``Repo`` with filesystem-path lookup and bulk maintenance
    helpers.
    """

    def get_from_path(self, path):
        """
        Returns the repository that corresponds to the given filesystem
        path.

        ``path`` may optionally include the ".hg" directory.

        Raises ``ValueError`` when ``path`` does not have the form
        ``<REPO_PATH>/<owner>/<name>``.  A well-formed path with no matching
        row raises whatever ``self.get`` raises.
        """
        if path.endswith("/.hg"):
            path = path[:-4]
        match = path_re.match(path)
        if not match:
            raise ValueError("Invalid repository path: %r" % path)
        owner_name, repo_name = match.group(1, 2)
        return self.get(owner__username=owner_name, name=repo_name)

    def get_user_disk_usage(self, user):
        """
        Returns the sum of ``disk_usage`` over all repositories owned by
        ``user``.

        The value comes straight from SQL ``sum()``, so it is ``None`` when
        there is no row with a recorded size to add up.
        """
        from django.db import connection
        cursor = connection.cursor()
        # The username is passed as a query parameter, never interpolated.
        cursor.execute("SELECT sum(r.disk_usage) "
                "FROM repos_repo AS r JOIN auth_user AS u ON u.id = r.owner_id "
                "WHERE u.username=%s ", [user.username])
        return cursor.fetchone()[0]

    def update_all_disk_usage(self):
        """
        Re-measures the on-disk size of every repository and stores it.
        """
        for repo in self.all():
            repo.update_disk_usage()
            # Saved explicitly so the new size is persisted regardless of
            # whether update_disk_usage() saves the row itself.
            repo.save()

    def update_all_hgrc(self):
        """
        Rewrites the ``.hg/hgrc`` file of every repository.
        """
        for repo in self.all():
            repo.write_hgrc()

class Repo(models.Model):
    """
    A Mercurial repository owned by a user.

    Each row is backed by a directory at ``<REPO_PATH>/<owner>/<name>``; see
    ``file_path``.
    """
    # 'name' is the name used in the url and filesystem.
    name = models.CharField(max_length=30)

    # Optional display title and free-form description; write_hgrc() copies
    # both into the repository's hgrc.
    long_name = models.CharField(max_length=50, blank=True)
    description = models.TextField(max_length=5000, blank=True)

    owner = models.ForeignKey(User)
    # NOTE(review): the continuation line of this call is missing from the
    # source; only the closing parenthesis is restored here.  Confirm whether
    # a further option (e.g. blank=True) was intended.
    allow_push = models.ManyToManyField(User, related_name="allow_push_set")

    creation_date = models.DateTimeField(auto_now_add=True)

    # Size in bytes as reported by ``du``; NULL until update_disk_usage() has
    # run and the row has been saved.
    disk_usage = models.IntegerField(blank=True, null=True)

    objects = RepoManager()

    class Admin:
        list_display = ("name", "owner")
        search_fields = ("name", "owner__username")

    class Meta:
        unique_together = (("name", "owner"),)

    @property
    def file_path(self):
        """
        Filesystem directory of this repository.

        Built by plain concatenation (not ``os.path.join``) so that it stays
        in step with the module-level ``path_re``, which parses the same
        ``<REPO_PATH>/<owner>/<name>`` layout.
        """
        return "%s/%s/%s" % (settings.REPO_PATH, self.owner.username,
                             self.name)

    @property
    def hgrc_filename(self):
        """Path of this repository's ``.hg/hgrc`` file."""
        return self.file_path + "/.hg/hgrc"

    def get_absolute_url(self):
        """URL of the repository's public page."""
        return "/u/%s/%s/" % (self.owner.username, self.name)

    def manage_url(self):
        """URL of the repository's management page."""
        return "/repos/manage/%s/%s/" % (self.owner.username, self.name)

    def __str__(self):
        return "%s/%s" % (self.owner.username, self.name)

    def __repr__(self):
        return "<Repo %s %s>" % (self.owner.username, self.name)

    def write_hgrc(self):
        """
        (Re)writes ``.hg/hgrc`` from the current field values.

        The web-interface settings go into a ``[web]`` section and the
        changegroup hook into a ``[hooks]`` section.  The file is written as
        UTF-8 and is always closed, even if writing fails.
        """
        def one_line(value):
            # An hgrc value ends at the line break, so an embedded newline in
            # user-supplied text would let it smuggle in extra settings.
            return u" ".join(value.splitlines())

        contact = self.owner.get_full_name() or self.owner.username
        lines = [
            u"[web]",
            u"push_ssl = false",
            u"allow_push = %s" % self.owner.username,
            # get_long_name() falls back to the short name when no long name
            # is set, so one line covers both cases.
            u"name = %s" % escape(one_line(self.get_long_name())),
        ]
        if self.description:
            # Stored as single-line HTML; '%' is doubled as in the original
            # code (presumably for ConfigParser-style interpolation -- TODO
            # confirm against the Mercurial version in use).
            lines.append(u"description = %s" %
                self.get_html_description().replace('\n', '').replace('%', '%%'))
        lines.append(u"contact = %s" % one_line(contact))
        lines.append(u"")
        lines.append(u"[hooks]")
        lines.append(u"changegroup = python:freehg.repos.hooks.changegroup")

        f = open(self.hgrc_filename, 'wb')
        try:
            f.write((u"\n".join(lines) + u"\n").encode('utf8'))
        finally:
            f.close()

    def update_disk_usage(self):
        """
        Measures the repository directory with ``du -s -b`` and stores the
        result in ``self.disk_usage``.

        The model is not saved here; the caller decides when to persist it.

        Raises ``RuntimeError`` if ``du`` exits with a non-zero status and
        ``ValueError`` if its output does not start with a number.
        """
        p = subprocess.Popen(['du', '-s', '-b', self.file_path],
                             stdout=subprocess.PIPE)
        # communicate() drains the pipe while waiting, so the child can never
        # block on a full pipe the way wait()-then-read() could.
        result = p.communicate()[0]
        if p.returncode != 0:
            raise RuntimeError("Calling du failed (returned %d.)" %
                               p.returncode)
        match = re.match(r"^(\d+)", result)
        if not match:
            raise ValueError("Could not parse du output: %r" % result)
        self.disk_usage = int(match.groups()[0])

    def get_html_description(self):
        """
        Returns the description as HTML: escaped, with URLs turned into
        links and line breaks turned into paragraph/break markup.
        """
        return linebreaks(urlize(escape(self.description)))

    def get_long_name(self):
        """
        Returns ``long_name`` if set, otherwise the short ``name``.
        """
        if self.long_name:
            return self.long_name
        return self.name

    def __unicode__(self):
        # NOTE(review): the body of this method is missing from the source;
        # it is restored to mirror __str__.  Confirm the intended text.
        return u"%s/%s" % (self.owner.username, self.name)

def save_repo(instance):
    """
    Signal handler: makes sure the saved repository exists on disk.

    A new Mercurial repository is created at ``instance.file_path`` when
    nothing exists there yet; an existing path is left untouched.  Always
    returns ``True``.
    """
    # NOTE(review): nothing here writes the hgrc file; confirm that some
    # other caller invokes write_hgrc() for newly created repositories.
    already_on_disk = os.path.exists(instance.file_path)
    if already_on_disk:
        return True
    hg.repository(ui.ui(), instance.file_path, create=True)
    return True
# Run save_repo after every Repo save (post_save signal, Repo sender only).
dispatcher.connect(save_repo, signal=models.signals.post_save, sender=Repo)

def delete_repo(instance):
    """
    Signal handler: removes the deleted repository's directory from disk.

    Returns ``True`` when a directory existed at ``instance.file_path`` and
    was removed, and ``None`` when there was nothing to remove.  Errors from
    ``shutil.rmtree`` propagate to the caller.
    """
    repo_dir = instance.file_path
    if os.path.exists(repo_dir):
        # Irreversible: this deletes the whole working copy and its history.
        shutil.rmtree(repo_dir)
        return True
# Run delete_repo after every Repo delete (post_delete signal, Repo sender only).
dispatcher.connect(delete_repo, signal=models.signals.post_delete, sender=Repo)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.