Source

django-tagging2 / tagging2 / managers.py

Full commit
from itertools import groupby

from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Count

from models import TaggedItem, Tag
from utils import get_tag_list, LOGARITHMIC, calculate_cloud, cache_result


class TagManager(models.Manager):
    """
    Manager exposing tag-usage queries for the model it is attached to.

    Every query is scoped to ``self.model`` through its ``ContentType`` and
    goes through the ``TaggedItem`` rows that reference that model's
    instances.
    """

    def _get_usage(self, queryset, min_count=None, top=False):
        """
        Count how often each tag is used by the objects in ``queryset``.

        Returns a list of ``{'name': <tag name>, 'count': <usage count>}``
        dicts.

        If ``min_count`` is given (and non-zero), tags used fewer than
        ``min_count`` times are left out.

        If ``top`` is true the list is ordered by descending count (ties
        broken by name), otherwise alphabetically by tag name.
        """
        # Fetch bare primary keys: no model instances are built and this also
        # works for models whose primary key field is not called "id".
        object_ids = list(queryset.values_list('pk', flat=True))
        ctype = ContentType.objects.get_for_model(self.model)
        usage = TaggedItem.objects.filter(
            content_type=ctype, object_id__in=object_ids
        ).values('tag__name').annotate(count=Count('tag'))

        # Always order explicitly: it gives the documented ordering and it
        # replaces any default ``Meta.ordering`` on TaggedItem, which would
        # otherwise be added to the GROUP BY and split the per-tag counts.
        if top:
            usage = usage.order_by('-count', 'tag__name')
        else:
            usage = usage.order_by('tag__name')

        return [
            {'name': row['tag__name'], 'count': row['count']}
            for row in usage
            if not min_count or row['count'] >= min_count
        ]

    def usage_for_model(self, filters=None, min_count=None, top=False):
        """
        Obtain the tags (with usage counts) associated with the Model's
        instances, as a list of ``{'name': ..., 'count': ...}`` dicts.

        If ``min_count`` is given, only tags which have a ``count``
        greater than or equal to ``min_count`` will be returned.

        To limit the tags and counts returned to those used by a subset
        of the Model's instances, pass a dictionary of field lookups to
        be applied to the given Model as the ``filters`` argument.

        If ``top`` is true, tags are ordered by popularity, otherwise
        alphabetically.
        """
        queryset = self.model._default_manager.all()
        if filters:
            # Public API equivalent of pushing each lookup into the query;
            # the private ``Query.add_filter`` signature is not stable.
            queryset = queryset.filter(**filters)

        return self._get_usage(queryset, min_count, top)

    def cloud_for_model(self, steps=None, distribution=LOGARITHMIC,
                        filters=None, min_count=None, top=False):
        """
        Obtain a tag cloud for the tags associated with instances of the
        Model: the tag usage from ``usage_for_model`` passed through
        ``calculate_cloud``.

        ``steps`` defines the range of font sizes - ``font_size`` will
        be an integer between 1 and ``steps`` (inclusive).

        ``distribution`` defines the type of font size distribution
        algorithm which will be used - logarithmic or linear. It must
        be either ``tagging2.utils.LOGARITHMIC`` or
        ``tagging2.utils.LINEAR``.

        To limit the tags displayed in the cloud to those associated
        with a subset of the Model's instances, pass a dictionary of
        field lookups to be applied to the given Model as the
        ``filters`` argument.

        To limit the tags displayed in the cloud to those with a
        ``count`` greater than or equal to ``min_count``, pass a value
        for the ``min_count`` argument.

        ``top`` is forwarded to ``usage_for_model`` and selects
        popularity ordering instead of alphabetical ordering.
        """
        tags = self.usage_for_model(filters, min_count, top)
        return calculate_cloud(tags, steps, distribution)

    def get_items_by_tags(self, tags):
        """
        Create a ``QuerySet`` of the Model's instances that carry at
        least one of the given tags.

        ``tags`` is normalised with ``get_tag_list``.
        """
        ctype = ContentType.objects.get_for_model(self.model)
        tags = get_tag_list(tags)
        # NOTE(review): this issues one query per tag through ``tag.items``;
        # if ``items`` is the reverse side of ``TaggedItem.tag`` this could
        # be a single ``tag__in`` query - confirm against the models.
        items_id_list = [
            tagged_item.object_id
            for tag in tags
            for tagged_item in tag.items.filter(content_type=ctype)
        ]
        return self.model._default_manager.filter(pk__in=items_id_list)

    def related_tags_cloud(self, tags, steps=None, min_count=None):
        """
        Obtain a tag cloud of the tags used by instances that carry any
        of the given tags.

        ``tags`` - tag instances, tag primary keys or string
        representation of tags.

        ``steps`` is passed on to ``calculate_cloud``.

        If ``min_count`` is given, only tags used at least ``min_count``
        times by those instances are included in the cloud.
        """
        items = self.get_items_by_tags(tags)
        # Forward min_count; it used to be accepted but silently ignored.
        usage = self._get_usage(items, min_count)
        return calculate_cloud(usage, steps=steps)

    def related_tags(self, tags, min_count=None, top=False):
        """
        Obtain a list of tags related to a given list of tags - that
        is, the tags used by items which have *any* of the given tags.
        The given tags themselves are not excluded from the result.

        If ``min_count`` is given, only tags which have a ``count``
        greater than or equal to ``min_count`` will be returned.

        If ``top`` is true, tags are ordered by popularity, otherwise
        alphabetically.
        """
        items = self.get_objects_with_any_tags(tags)
        return self._get_usage(items, min_count, top)

    def _get_tagged_items_by_tags(self, tags):
        """
        Return the ``TaggedItem`` rows linking any of ``tags`` to an
        instance of the Model, ordered by ``object_id``.

        The ordering matters: ``get_objects_with_all_tags`` groups the
        rows by ``object_id`` with ``itertools.groupby``.
        """
        tags = get_tag_list(tags)
        ctype = ContentType.objects.get_for_model(self.model)
        return TaggedItem.objects.filter(
            tag__in=tags, content_type=ctype
        ).order_by("object_id")

    def get_objects_with_all_tags(self, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with *all* of the given list of tags.
        """
        tags = get_tag_list(tags)
        required_tag_ids = set(tag.pk for tag in tags)
        tagged_items = self._get_tagged_items_by_tags(tags)

        objects_id_list = []
        # Rows arrive sorted by object_id, so each group is one object.
        for object_id, rows in groupby(tagged_items, lambda row: row.object_id):
            # Compare distinct tag ids rather than row counts so duplicate
            # input tags or duplicate TaggedItem rows cannot skew the match.
            if set(row.tag_id for row in rows) == required_tag_ids:
                objects_id_list.append(object_id)

        return self.model._default_manager.filter(pk__in=objects_id_list)

    def get_objects_with_any_tags(self, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with *any* of the given list of tags.
        """
        tagged_items = self._get_tagged_items_by_tags(tags)
        # Read only the object ids and collapse repeats (an object carrying
        # several of the tags has several rows).
        objects_id_list = list(set(
            tagged_items.values_list('object_id', flat=True)
        ))
        return self.model._default_manager.filter(pk__in=objects_id_list)