Source

django-pobject / pobject / __init__.py

Full commit
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
logging.basicConfig(
    #level = logging.DEBUG,
)

class PermissionExprException(Exception):
    def __init__(self, e, checker):
        super(PermissionExprException, self).__init__(checker, str(e))


class P(object):
    """AndExpr 'checker' wrapper that can be AND, OR or XOR. This is inspired by Q object in Django.

    Given 'checker' a <op> 'checker' b,
        if <op> is &, both checks must succeed
        if <op> is |, either one must succeed
        if <op> is ^, one must succeed and the other one must fail

    Order or evaluation is from left to right (top to bottom).

    It can also be used to execute 'checks' conditionally based on the type of the current request method. If request method
    matches the given method or when method is not provided, checker must succeed.

    For example,
        P(is_owner(...), "POST")
        is_owner is executed only if the request method is POST.

    """
    class BaseExpr(object):
        def __init__(self, p):
            self.p = p
            self.p.is_in_expr = True

    class AndExpr(BaseExpr):
        def __call__(self, last_rval, request, *args, **kwargs):
            if last_rval:
                logging.debug('AND')
                return self.p.is_successful(request, *args, **kwargs)
            return False


    class OrExpr(BaseExpr):
        def __call__(self, last_rval, request, *args, **kwargs):
            if not last_rval:
                logging.debug('OR')
                return self.p.is_successful(request, *args, **kwargs)
            return True


    class XorExpr(BaseExpr):
        def __call__(self, last_rval, request, *args, **kwargs):
            logging.debug('XOR')
            this_rval = self.p.is_successful(request, *args, **kwargs)
            return last_rval != this_rval


    def __init__(self, *checkers, **kwargs):
        self.checkers = checkers
        self.method = kwargs.pop('method', None)
        self.is_in_expr = False
        self.next_expr = None


    def chain_expr(self, expr):
        p = self
        while p:
            if p.next_expr:
                p = p.next_expr.p
            else:
                break
        p.next_expr = expr


    def __and__(self, other):
        if isinstance(other, P):
            self.chain_expr(P.AndExpr(other))
            return self
        raise ValueError


    def __or__(self, other):
        if isinstance(other, P):
            self.chain_expr(P.OrExpr(other))
            return self
        raise ValueError


    def __xor__(self, other):
        if isinstance(other, P):
            self.chain_expr(P.XorExpr(other))
            return self
        raise ValueError


    def __invert__(self):
        return Invertor(*self.checkers, method=self.method)

    def __repr__(self):
        names = []
        for checker in self.checkers:
            names.append(checker.__name__)
        
        return "P(%s, method=%s)" % (", ".join(names), self.method)


    def exec_checkers(self, request, *args, **kwargs):
        last_rval = True
        for checker in self.checkers:
            last_rval = checker(request, *args, **kwargs)
            if not last_rval:
                break
        return last_rval


    def is_successful(self, request, *args, **kwargs):
        if not self.is_in_expr:
            logging.debug('evaluating %r' % self)
        else:
            logging.debug('%r' % self)

        if not self.method or request.method == self.method:
            last_rval = self.exec_checkers(request, *args, **kwargs)
            if self.next_expr:
                return self.next_expr(last_rval, request, *args, **kwargs)

            return last_rval
        return False


class Invertor(P):
    def exec_checkers(self, request, *args, **kwargs):
        ret_val = super(Invertor, self).exec_checkers(request, *args, **kwargs)
        return not ret_val

    def __repr__(self):
        names = []
        for checker in self.checkers:
            names.append(checker.__name__)
        return "~P(%s, method=%s)" % (", ".join(names), self.method)