Source

3Task / project / utils.py

Full commit
import copy
import re

from django.db.models import Min, Q
from django.db import models
from django.contrib.contenttypes.models import ContentType

from openauth.utils import parse_template
from reversion.models import Version

from project.models import Status, StatusRule, Task, ReleaseTask
from project import settings


def get_tasks_tree(task, sort, depth, filter_, project=None):
    '''
    Pass task=None to get list of root nodes
    '''
    new_depth = max(0, depth - 1)
    if task:
        subtasks = task.get_children()\
                       .order_by(*sort)\
                       .select_related('task_type', 'status', 'priority', 'project', 'parent')
    else:
        subtasks = Task.tree.root_nodes()\
                       .filter(project=project)\
                       .order_by(*sort)\
                       .select_related('task_type', 'status', 'priority', 'project', 'parent')

    if filter_ == 'open':
        subtasks = subtasks.filter(status__visibility=settings.VISIBILITY_ACTIVE)
    elif filter_ == 'opendone':
        subtasks = subtasks.filter(Q(status__visibility=settings.VISIBILITY_ACTIVE) |
                                   Q(status__visibility=settings.VISIBILITY_DONE))
    elif filter_ == 'features':
        subtasks = subtasks.filter(task_type__behavior=settings.TYPE_FEATURE)
    elif filter_ == 'all':
        pass
    else:
        raise Exception('Invalid filter_ value: %s' % filter_)

    task_list = []
    for t in subtasks:
        task_list.append(t)
        if depth > 0:
            task_list += get_tasks_tree(t, sort, new_depth, filter_)
    return task_list


def update_parent_statuses(request, task):
    """
    recursevly update all parents
    """
    project = task.project
    parent = task.parent
    if parent:
        old_version = copy.deepcopy(parent)

        # get min weight of all descendands tasks
        try:
            weight = [x.status.weight for x in parent.get_descendants().select_related('status')]
            min_weight = min(weight)
        except ValueError:
            min_weight = parent.status.weight

        # get all possible destinations that parent could switch from current status
        dst_variants = [x.dst for x in StatusRule.objects.filter(src=parent.status)]
        dst_variants.append(parent.status)

        if dst_variants:

            # sort variants according to closest weight to minimal
            def compare_weight(a, b):
                return cmp(abs(min_weight - a.weight), abs(min_weight - b.weight))
            dst_variants.sort(compare_weight)

            closest_dst = dst_variants[0]

            # update parent status only if its different from closest destination.
            if parent.status != closest_dst:
                parent.status = closest_dst
                parent.status_comment = 'Descendant task #%d: status changed to %s' % (
                    task.task_number, task.status.name)
                parent.save()

            if request:
                find_changes(request, old_version, parent)

        # Go level up
        update_parent_statuses(request, parent)


class dummy_writer:
    def __init__(self):
        self.out = []
    def write(self, new):
        self.out.append(new.replace('\r\n', ''))


def add_row(writer, task, root_level):
    writer.writerow([
                     "—" * (task.level - root_level),
                     task.title,
                     task.desc,
                     task.priority.name if task.priority else 'normal',
                     task.status.name if task.status else '',
                     task.reporter.username,
                     task.owner.username if task.owner else ''
                     ])


def email_template(rcpt, template_path, **kwargs):
    """
    Load, render and email template.

    **kwargs may contain variables for template rendering.
    """
    from django.conf import settings
    from django.core.mail import EmailMessage

    subject, content = parse_template(template_path, **kwargs)
    msg = EmailMessage(subject, content, settings.DEFAULT_FROM_EMAIL, [rcpt])
    msg.content_subtype = "html"
    msg.send()


def find_changes(request, old_version, obj, old_m2m_data={}):
    changed_data = {}
    action = settings.EDIT
    for field in obj._meta.fields:
        try:
            old_value = getattr(old_version, field.name)
        except field.rel.to.DoesNotExist:
            action = settings.ADD
            old_value = None
        new_value = getattr(obj, field.name)
        if old_value != new_value:
            if issubclass(field.__class__, models.ForeignKey):
                changed_data.update({field.verbose_name: unicode(new_value)})

            elif issubclass(field.__class__, (models.DateTimeField, models.DateField)):
                continue
            else:
                changed_data.update({field.verbose_name: new_value})

    for m2mfield in Task._meta.many_to_many:
        try:
            new_value = ", ".join(i.__unicode__() for i in m2mfield.value_from_object(obj))
            if old_m2m_data[m2mfield.verbose_name] != new_value:
                changed_data.update({m2mfield.verbose_name: unicode(new_value)})
        except KeyError:
            if new_value:
                changed_data.update({m2mfield.verbose_name: unicode(new_value)})


    request.changed.append({
        'property': changed_data,
        'object_pk': obj.task_number,
        'ctype': ContentType.objects.get_for_model(obj),
        'action': action,
        'old_version': old_version,
        'new_version': obj,
        })


def get_m2m_fields_value(obj):
    """
    Return dict with string representaiton of all m2m
    fields value in object.
    """
    data = {}
    for m2mfield in obj._meta.many_to_many:
        value = ", ".join(i.__unicode__() for i in m2mfield.value_from_object(obj))
        data.update({m2mfield.verbose_name: unicode(value)})
    return data


def role_changed(request, _type, changed_role):
    data = {
            'type': _type,
            'role': changed_role,
            'project': request.project,
            'modified_by': request.user,
            'url': request.build_absolute_uri(request.project.show_url),
            }
    template = "project/mail/role_changed.html"
    for role in request.project.roles.all():
        if role.user == request.user and request.user.notification_set.filter(project=request.project)[0].myself:
            continue
        email_template(role.user.email, template, **data)



def update_descendant_releases(request, task, removed, added):
    """
    For each descendant of given task update releases:
    remove those contains in removed list and add those
    contains in added list.
    """

    for child in task.get_descendants():
        old_m2m_data = get_m2m_fields_value(child)
        for r in removed:
            ReleaseTask.objects.filter(task=child, release=r).delete()
        for r in added:
            try:
                ReleaseTask.objects.get(task=child, release=r)
            except ReleaseTask.DoesNotExist:
                ReleaseTask.objects.create(task=child, release=r)
        find_changes(request, child, child, old_m2m_data)


def count_fetched_descendants(task, object_list, counter=0):
    for t in object_list:
        if t.parent == task:
            counter = count_fetched_descendants(t, object_list, counter + 1)
    return counter


def parse_sort_argument(sort_string, sort_config, default_sort_key='priority',
                        default_sort_order=''):
    """
    Extract the field name and the order by which objects should be sorted.

    Accept two arguments:
    * the value from GET request
    * the config - mapping of readeable keys on ORM valid field names
    Return two values:
    * the correct argument for django queryset function order_by
    * the sort argument for using in HTML links
    
    sort logic described here http://3task.com/p/3task/task/151/
    """

    match = re.match(r'(-|)([a-z]+)$', sort_string or '')
    if match:
        sort_order = match.group(1)
        sort_key = match.group(2)
    if not match or not sort_key in sort_config:
        sort_key = default_sort_key
        sort_order = default_sort_order
    orm_sort_key = ['%s%s' % (sort_order, sort_config[sort_key])]
    sort_string = '%s%s' % (sort_order, sort_key)
    if sort_key == "priority":
        orm_sort_key.append(sort_config['title'])
    else:
        orm_sort_key.append(sort_config['priority'])
    return orm_sort_key, sort_string


def show_diff_sequnece(seqm):
    """Unify operations between two compared string sequences
    seqm is a difflib.SequenceMatcher instance whose a & b are 
    list of string lines"""

    output= []
    for group in seqm.get_grouped_opcodes(2):
        for opcode, a0, a1, b0, b1 in group:
            if opcode == 'equal':
                output.append("".join(seqm.a[a0:a1]))
            elif opcode == 'insert':
                output.append("<ins>" + "".join(seqm.b[b0:b1]) + "</ins>")
            elif opcode == 'delete':
                output.append("<del>" + "".join(seqm.a[a0:a1]) + "</del>")
            elif opcode == 'replace':
                output.append("<del>" + "".join(seqm.a[a0:a1]) + "</del>" + "<ins>" + "".join(seqm.b[b0:b1]) + "</ins>")
            else:
                raise RuntimeError, "unexpected opcode"
    return ''.join(output)


def get_type_and_filter(request):
    """
    save filtering in session seperate for each project and
    return type and filter fields
    """
    if 'list' in request.GET:
        request.session[request.project.slug + "-list_value"] = request.GET['list']
    if 'filter' in request.GET and request.GET['filter'] in ['open', 'opendone', 'features', 'all']:
        request.session[request.project.slug + "-filter_value"] = request.GET['filter']

    default_list_value = request.session.get(request.project.slug + "-list_value", "smart")
    default_filter_value = request.session.get(request.project.slug + "-filter_value", "open")

    list_type = request.GET.get('list', default_list_value)
    filter_ = request.GET.get('filter', default_filter_value)
    return default_list_value, default_filter_value, list_type, filter_


def get_templates(user):
    templates = [(0, "Basic Template")]
    for project in user.project_set.all():
        templates.append((project.id, project.name))
    return templates


def copy_to_project(project, obj, **kwargs):
    """
    Copy object to another project
    """
    obj = copy.deepcopy(obj)
    obj.pk = None
    obj.project = project
    for k,v in kwargs.items():
        setattr(obj, k, v)
    obj.save()
    return obj