# django-acl / acl / utils.py

from django.db import connection, models
from django.db.models import Q
from django.core.cache import cache
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from acl.models import Role, RoleData, ACL, ACL_YES, ACL_NEVER, ACL_NO

# Shortcut for the active database backend's identifier-quoting function;
# used below to quote table and column names in the hand-written SQL.
qn = connection.ops.quote_name

def get_model_label_and_name(model):
    """
    Return an ``(app_label, model_name)`` pair for ``model``.

    ``model`` may be a model instance, a model class, or a string of the
    form ``'app_label.model_name'``. For instances and classes the name is
    the lower-cased class name; a string is simply split on the dot.
    """
    if isinstance(model, models.Model):
        # Instances are resolved through their class.
        model = model.__class__
    if isinstance(model, str):
        label, name = model.split('.')
        return label, name
    return model._meta.app_label, model.__name__.lower()

def get_model_contenttype(model):
    """
    Return the dotted ``'app_label.model_name'`` path for ``model``.

    Accepts whatever ``get_model_label_and_name`` accepts.
    """
    app_label, model_name = get_model_label_and_name(model)
    return '%s.%s' % (app_label, model_name)


# Alias: the same helper under the name ``get_model_path``.
get_model_path = get_model_contenttype

def permissions_for_groups(groups=None):
    """
    Placeholder for a group-based permission lookup.

    Not implemented: always raises NotImplementedError, whatever ``groups``
    is.
    """
    message = 'will I really implement this?'
    raise NotImplementedError(message)

def permissions_for_roles(roles=None):
    """
    Return the permissions granted through roles, keyed by role id.

    The result maps each role id to a list of
    ``(app_label, codename, setting)`` rows, read from the role data table
    joined with the permission and content type tables.

    If ``roles`` (an iterable of role ids) is non-empty, only those roles
    are looked up: entries found under the ``'acl.roles'`` cache key are
    reused and only the remaining ids are queried. If ``roles`` is empty or
    None, every role is queried. In both cases the rows collected by this
    call are merged into the ``'acl.roles'`` cache entry.

    The ``roles`` argument itself is never modified.
    """
    permissions = dict()
    cached_roles = dict()
    params = None
    sql = """
        SELECT rd.%s, ct.%s, p.%s, rd.%s
        FROM %s rd
        INNER JOIN %s p ON (rd.%s = p.%s)
        INNER JOIN %s ct ON (p.%s = ct.%s)
        """ % tuple(qn(param) for param in
                ('role_id', 'app_label', 'codename', 'setting',
                'acl_roledata', 'auth_permission', 'permission_id', 'id',
                'django_content_type', 'content_type_id', 'id'))

    if roles:
        cached_roles = cache.get('acl.roles') or cached_roles
        # Split the request into cache hits and ids that still need a query,
        # without touching the caller's container.
        missing = list()
        for role in roles:
            if role in cached_roles:
                permissions[role] = cached_roles[role]
            else:
                missing.append(role)
        if not missing:
            return permissions
        # One placeholder per id: a single placeholder bound to a
        # comma-joined string would be compared as one literal value.
        placeholders = ', '.join(['%s'] * len(missing))
        where = "WHERE rd.%s IN (%s)" % (qn('role_id'), placeholders)
        sql = '\n'.join((sql, where))
        params = missing

    cursor = connection.cursor()
    cursor.execute(sql, params)
    for row in cursor.fetchall():
        permissions.setdefault(row[0], list()).append(row[1:])

    cached_roles.update(permissions)
    cache.set('acl.roles', cached_roles)
    return permissions



# XXX deprecated, should be removed. I am keeping it here only for the SQL
# query.
def _permissions_for_model(user, model):
    """
    Deprecated: collect ``user``'s permissions on ``model`` per object id.

    Runs one hand-written SQL query over the ACL table, selecting the rows
    that belong to ``user`` directly or through one of the user's groups
    and whose content type matches ``model`` (anything accepted by
    ``get_model_label_and_name``). Rows that reference a role are expanded
    into that role's permissions via ``permissions_for_roles``.

    Returns a dict ``{object_id: {'app_label.codename': setting}}``. Once a
    permission is ACL_NEVER for an object, later rows do not override it.
    """
    cursor = connection.cursor()
    permissions = dict()

    app_label, model = get_model_label_and_name(model)

    def set_perm(obj_id, label, codename, setting):
        # we're assuming that ACL_NO are not stored.
        p_name = '%s.%s' % (label, codename)
        p_dict = permissions.setdefault(obj_id, dict())
        if p_dict.get(p_name) != ACL_NEVER:
            p_dict[p_name] = setting

    sql = """
        SELECT acl.%s, acl.%s, acl.%s,
            pct.%s, p.%s, acl.%s
        FROM %s acl
        INNER JOIN %s act ON (acl.%s = act.%s)
        LEFT OUTER JOIN %s p ON (acl.%s = p.%s)
        LEFT OUTER JOIN %s pct ON (p.%s = pct.%s)
        LEFT OUTER JOIN %s ug ON (acl.%s = ug.%s)
        WHERE (acl.%s = %%s OR ug.%s = %%s)
            AND act.%s = %%s
            AND act.%s = %%s;
        """ % tuple(qn(param) for param in (
                'role_id', 'group_id', 'object_id', 'app_label', 'codename',
                'setting', 'acl_acl', 'django_content_type', 'content_type_id',
                'id', 'auth_permission', 'permission_id', 'id',
                'django_content_type',  'content_type_id', 'id',
                'auth_user_groups', 'group_id', 'group_id', 'user_id',
                'user_id', 'app_label', 'model'))

    cursor.execute(sql, [user.id, user.id, app_label, model])
    res = cursor.fetchall()

    # Row layout: (role_id, group_id, object_id, app_label, codename, setting)
    roles = set(row[0] for row in res if row[0])
    roles_permissions = dict()
    if roles:
        roles_permissions = permissions_for_roles(roles)

    for role, group, obj_id, label, codename, setting in res:
        if role:
            # A role without any role data simply grants nothing.
            for perm in roles_permissions.get(role, ()):
                set_perm(obj_id, *perm)
        else:
            set_perm(obj_id, label, codename, setting)

    return permissions

def get_permissions(user, model=None, all=True):
    """
    Return the ACL settings that apply to ``user`` as a nested dict.

    ACL rows are selected when they belong to ``user`` directly or to a
    group the user is in. If ``model`` is given (anything accepted by
    ``get_model_label_and_name``) only rows for that content type are used;
    otherwise, with ``all=False``, only rows without an object id are used.

    The result is keyed by path: rows with a content type are stored under
    ``result[app_label][model][object_id]``, rows without one under
    ``result[None]``. Each leaf maps ``'app_label.codename'`` to a setting.
    A row that references a role contributes every permission of that role
    (see ``permissions_for_roles``); a role without role data contributes
    nothing. Once a permission is ACL_NEVER at a given path, later rows do
    not override it.
    """
    permissions = dict()

    def set_perm(path, perm, setting):
        # we're assuming that ACL_NO are not stored.
        p_dict = permissions
        for key in path:
            p_dict = p_dict.setdefault(key, dict())
        p_name = '%s.%s' % perm
        if p_dict.get(p_name) != ACL_NEVER:
            p_dict[p_name] = setting

    # this query can be optimized
    query = ACL.objects.values_list(
                'content_type__app_label', 'content_type__model', 'object_id',
                'role_id', 'permission__content_type__app_label',
                'permission__codename', 'setting'
            ).filter(Q(user=user) | Q(group__user=user))
    if model:
        app_label, model = get_model_label_and_name(model)
        query = query.filter(content_type__app_label=app_label,
                             content_type__model=model)
    elif not all:
        query = query.filter(object_id=None)

    # The rows are walked twice (role ids first, then the settings), so
    # evaluate the queryset once up front.
    rows = list(query)
    roles = set(row[3] for row in rows if row[3])
    roles_permissions = permissions_for_roles(roles) if roles else dict()

    for app, model_name, object_id, role, p_label, p_code, setting in rows:
        # Rows without a content type are global and live under the None key.
        path = (app, model_name, object_id) if app else (None,)
        if role:
            for perm in roles_permissions.get(role, ()):
                # perm is (app_label, codename, setting)
                set_perm(path, tuple(perm[0:2]), perm[2])
        else:
            set_perm(path, (p_label, p_code), setting)

    return permissions

# tyrion.set_perm('pybb.view_forum')
# tyrion.set_acl('pybb.view_forum', model=a_forum)
# tyrion.set_acl('pybb.view_forum')
def set_acl(**params):
    """
    Set permission or role to setting for user or group.
    ::

        >>> # the user john will *never* be able to add a thread.
        >>> set_acl(user=john, permission='pybb.add_thread', setting=ACL_NEVER)

        >>> # treat the anonymous user as a normal forum user.
        >>> set_acl(user=AnonymousUser(), role='ForumUser')

        >>> # the moderators will not be able to add a forum.
        >>> set_acl(group=moderators, permission='pybb.add_forum', setting=ACL_NO)

    Keyword arguments:

    ``user`` / ``group``
        At least one is required; both are used as ACL lookup fields.
    ``permission``
        An ``'app_label.codename'`` string, resolved to a Permission.
    ``role``
        A role name, resolved to a Role. Only used when no permission is
        given; ``setting`` is not stored for roles.
    ``setting``
        Defaults to ACL_YES. For a permission, ACL_NO deletes the matching
        ACL rows instead of storing anything.
    ``model``
        A django.models.Model instance or subclass. A subclass scopes the
        ACL to that model's content type; an instance additionally scopes
        it to that object's id.
    ``glob``
        Defaults to True. If no content type can be determined (a role
        without a model) and ``glob`` is False, ValueError is raised.

        # this permission is valid on all pybb.Forum(s)
        >>> set_acl(user=u, model=pybb.Forum, permission='pybb.add_forum')

        # permission valid only for the given Forum instance.
        >>> set_acl(user=u, model=Forum.objects.get(pk=1), ...)

    Without ``model``, the content type of a permission is guessed from the
    permission string: ``'pybb.add_forum'`` gives app ``pybb`` and model
    ``forum`` (the part of the codename after the last underscore).

    NOTE(review): earlier documentation described a permission set without
    ``model`` as global unless ``glob=False`` was passed, but the content
    type has been guessed from the codename in that case regardless of
    ``glob``. That behaviour is kept here -- confirm which one is intended.

    Raises ValueError for a missing user/group, a missing permission/role,
    a ``model`` that is not a Model, or ``glob=False`` without a usable
    permission or model. The Permission, Role and ContentType lookups use
    ``.get()``.
    """
    if not ('group' in params or 'user' in params):
        raise ValueError('provide either "group" or "user"')

    app_label = model_name = None
    setting = params.pop('setting', ACL_YES)
    model = params.pop('model', None)
    # ``glob`` is an option of this function, not an ACL field: always take
    # it out so it never reaches the ORM lookups below.
    glob = params.pop('glob', True)

    if model:
        is_instance = isinstance(model, models.Model)
        is_class = isinstance(model, type) and issubclass(model, models.Model)
        if not (is_instance or is_class):
            raise ValueError('"model" must be a django.models.Model')
        app_label, model_name = get_model_label_and_name(model)
        if is_instance:
            params['object_id'] = model.id

    if 'permission' in params:
        perm_label, codename = params['permission'].split('.')
        if not model:
            # No explicit model: guess the content type from the permission.
            app_label = perm_label
            model_name = codename.rsplit('_', 1)[-1]
        params['permission'] = Permission.objects.get(codename=codename,
                content_type__app_label=perm_label)
    elif 'role' in params:
        params['role'] = Role.objects.get(name=params['role'])
        setting = None
    else:
        raise ValueError('provide either "permission" or "role"')

    if app_label and model_name:
        ct = ContentType.objects.get(app_label=app_label, model=model_name)
        params['content_type'] = ct
    elif not glob:
        raise ValueError('glob=False must be used with permission or model')

    if 'permission' in params and setting == ACL_NO:
        # ACL_NO is represented by the absence of a row.
        ACL.objects.filter(**params).delete()
    else:
        acl = ACL.objects.get_or_create(**params)[0]
        if 'permission' in params:
            acl.setting = setting
            acl.save()

def set_role_acl(role, permissions, setting=ACL_YES):
    """
    Set permissions to setting for a given role.
    Setting defaults to ACL_YES.

    ``role`` is a role name; the role is created if it does not exist yet.
    ``permissions`` is an iterable of ``'app_label.codename'`` strings, each
    resolved to a Permission with ``.get()``.

    >>> set_role_acl(role='ForumUser', setting=ACL_YES, permissions=(
    ...             'pybb.view_forum', 'pybb.add_post', 'pybb.add_thread'))
    """
    def perm_from_string(string):
        app_label, codename = string.split('.')
        return Permission.objects.get(codename=codename,
                content_type__app_label=app_label)

    role = Role.objects.get_or_create(name=role)[0]
    for perm in permissions:
        rd = RoleData.objects.get_or_create(role=role,
                      permission=perm_from_string(perm))[0]
        rd.setting = setting
        rd.save()
        role.roledata_set.add(rd)

    # permissions_for_roles() keeps role permissions under this cache key;
    # drop it so the change is visible on the next lookup.
    cache.delete('acl.roles')

def update_permissions(a, b):
    """
    Merge the permissions dict ``b`` into ``a``, in place.

    Both arguments are nested dicts whose leaves map a permission name to a
    setting. Nested dicts are merged key by key at any depth, so both a
    ``{content_type: {pk: {perm: setting}}}`` layout and deeper layouts are
    handled. A setting from ``b`` replaces the one in ``a`` unless the
    value already in ``a`` is ACL_NEVER.

    Returns None; ``b`` is not modified.
    """
    for key, value in b.items():
        if isinstance(value, dict):
            # Descend, creating the matching level in ``a`` when missing.
            update_permissions(a.setdefault(key, dict()), value)
        elif a.get(key) != ACL_NEVER:
            a[key] = value


# get_perm(permissions, model, perm)


def get_perm(permissions, path, perm):
    """
    Tell whether ``perm`` is granted at ``path`` in a permissions dict.

    ``permissions`` is a nested dict of settings. ``path`` is either a
    tuple of keys into it -- ``('pybb', 'forum', 3)``, or ``(None,)`` for
    the global entry -- or a model (instance), which is turned into
    ``(app_label, lower-cased model name, id)``.

    A tuple path is looked up as is; a missing key counts as ACL_NO.
    For a model path whose own setting is not ACL_NEVER, the global entry
    ``(None,)`` is looked up as well, and its result is returned when the
    model's own setting is ACL_NO or the global result equals ACL_NEVER.
    The return value is always a bool.

    >>> perms = {
    ...     None: {'pybb.view_forum': True},
    ...     'pybb': {
    ...         'forum': {3: {'pybb.view_forum': False}},
    ...     },
    ... }
    >>> get_perm(perms, ('pybb', 'forum', 3), 'pybb.view_forum')
    False
    >>> get_perm(perms, (None,), 'pybb.view_forum')
    True
    """
    try:
        # Lower-case the model name, as get_model_label_and_name() does:
        # get_permissions() keys its result by the content type's model
        # name, which is matched against that lower-cased form.
        path = (path._meta.app_label,
                path._meta.object_name.lower(),
                getattr(path, 'id', None))
        get_global = True
    except AttributeError:
        # Not a model: ``path`` is already a tuple of keys.
        get_global = False

    p_dict = permissions
    for key in path:
        try:
            p_dict = p_dict[key]
        except KeyError:
            s_perm = ACL_NO
            break
    else:
        s_perm = p_dict.get(perm, ACL_NO)
    if not s_perm == ACL_NEVER and get_global:
        g_perm = get_perm(permissions, (None,), perm)
        # NOTE(review): g_perm is already a bool here, so the ACL_NEVER
        # comparison only matches if ACL_NEVER compares equal to True or
        # False -- confirm against the ACL_* constant values.
        if s_perm == ACL_NO or g_perm == ACL_NEVER:
            return bool(g_perm)
    return bool(s_perm)
# u.set_perm('pybb.view_forum', forum)
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.