# Source: sqlalchemy / lib / sqlalchemy / testing / exclusions.py
# (full commit)
import operator
from nose import SkipTest
from ..util import decorator
from . import config
from .. import util
import contextlib


class skip_if(object):
    """Decorator that skips the wrapped function when a predicate is true.

    ``predicate`` is coerced with ``_as_predicate``.  ``reason``, when
    given, is used in the skip message in place of the predicate's string
    form.
    """

    def __init__(self, predicate, reason=None):
        self.predicate = _as_predicate(predicate)
        self.reason = reason

    # Secondary ``skip_if`` installed by fails_on(); when set, a function
    # that is not skipped is run inside that object's fail_if() block.
    _fails_on = None

    @property
    def enabled(self):
        """True when the predicate currently evaluates false."""
        return not self.predicate()

    @contextlib.contextmanager
    def fail_if(self, name='block'):
        """Context manager expecting the body to fail when the predicate holds.

        * body raises, predicate true: the failure is printed and swallowed.
        * body raises, predicate false: the exception propagates unchanged.
        * body succeeds, predicate true: ``AssertionError`` is raised.
        * body succeeds, predicate false: nothing happens.

        ``name`` only labels the printed / raised message.
        """
        try:
            yield
        except Exception as ex:
            if self.predicate():
                print("%s failed as expected (%s): %s " % (
                    name, self.predicate, str(ex)))
            else:
                raise
        else:
            if self.predicate():
                raise AssertionError(
                    "Unexpected success for '%s' (%s)" %
                    (name, self.predicate))

    def __call__(self, fn):
        """Wrap ``fn`` so that calling it raises ``SkipTest`` when the
        predicate is true, and otherwise runs it (inside the
        ``_fails_on`` failure block if fails_on() was used).
        """
        @decorator
        def decorate(fn, *args, **kw):
            if self.predicate():
                if self.reason:
                    msg = "'%s' : %s" % (
                            fn.__name__,
                            self.reason
                        )
                else:
                    msg = "'%s': %s" % (
                            fn.__name__, self.predicate
                        )
                raise SkipTest(msg)
            else:
                if self._fails_on:
                    with self._fails_on.fail_if(name=fn.__name__):
                        return fn(*args, **kw)
                else:
                    return fn(*args, **kw)
        return decorate(fn)

    def fails_on(self, other, reason=None):
        """Attach a secondary ``skip_if`` built from ``other`` and ``reason``.

        Returns ``self`` so the call can be chained.
        """
        self._fails_on = skip_if(other, reason)
        return self

class fails_if(skip_if):
    """Variant of ``skip_if`` that never skips.

    The decorated function is always executed, wrapped in this object's
    own ``fail_if()`` block.
    """

    def __call__(self, fn):
        @decorator
        def decorate(fn, *positional, **named):
            expected_failure = self.fail_if(name=fn.__name__)
            with expected_failure:
                return fn(*positional, **named)
        return decorate(fn)


def only_if(predicate, reason=None):
    """Return a ``skip_if`` built on the negation of ``predicate``."""
    negated = NotPredicate(_as_predicate(predicate))
    return skip_if(negated, reason)


def succeeds_if(predicate, reason=None):
    """Return a ``fails_if`` built on the negation of ``predicate``."""
    negated = NotPredicate(_as_predicate(predicate))
    return fails_if(negated, reason)


class Predicate(object):
    """Base class for the callable predicates defined in this module."""

    @classmethod
    def as_predicate(cls, predicate):
        """Coerce ``predicate`` into a ``Predicate`` instance.

        Accepted forms:

        * a ``Predicate`` -- returned unchanged
        * a list -- each element is coerced and the results are combined
          in an ``OrPredicate``
        * a tuple -- unpacked as the arguments of ``SpecPredicate``
        * a string -- a ``SpecPredicate`` with no operator or spec
        * any other callable -- wrapped in a ``LambdaPredicate``

        Raises ``AssertionError`` for anything else.
        """
        if isinstance(predicate, Predicate):
            return predicate
        elif isinstance(predicate, list):
            return OrPredicate([cls.as_predicate(pred) for pred in predicate])
        elif isinstance(predicate, tuple):
            return SpecPredicate(*predicate)
        elif isinstance(predicate, basestring):
            return SpecPredicate(predicate, None, None)
        elif util.callable(predicate):
            return LambdaPredicate(predicate)
        else:
            # Raised explicitly instead of ``assert False`` so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                "unknown predicate type: %s" % (predicate,))


class BooleanPredicate(Predicate):
    """Predicate that always evaluates to a fixed, pre-supplied value."""

    def __init__(self, value, description=None):
        self.value = value
        if description:
            self.description = description
        else:
            self.description = "boolean %s" % value

    def __call__(self):
        return self.value

    def _as_string(self, negate=False):
        """Return the description, prefixed with "not " when negated."""
        return "not " + self.description if negate else self.description

    def __str__(self):
        return self._as_string()


class SpecPredicate(Predicate):
    """Predicate matching an engine by dialect, driver and server version.

    ``db`` is a dialect name, optionally followed by ``+driver``.  ``op``
    is either a key of ``_ops`` or a callable taking
    ``(server_version, spec)``.  ``description``, when given, is used
    verbatim as the string form.
    """

    def __init__(self, db, op=None, spec=None, description=None):
        self.db = db
        self.op = op
        self.spec = spec
        self.description = description

    # Every operator is invoked as ``oper(server_version, spec)``.
    _ops = {
        '<': operator.lt,
        '>': operator.gt,
        '==': operator.eq,
        '!=': operator.ne,
        '<=': operator.le,
        '>=': operator.ge,
        # operator.contains(a, b) means ``b in a``; with the
        # (version, spec) argument order that would test ``spec in
        # version``, so spell out the intended membership test.
        'in': lambda val, choices: val in choices,
        'between': lambda val, pair: val >= pair[0] and val <= pair[1],
    }

    def __call__(self, engine=None):
        """Return True if ``engine`` (default ``config.db``) matches.

        The dialect name and, when given, the driver must be equal to the
        engine's; if ``op`` is set the server version must additionally
        satisfy ``op`` against ``spec``.

        Raises ``AssertionError`` if a version operator is combined with
        a ``+driver`` spec.
        """
        if engine is None:
            engine = config.db

        if "+" in self.db:
            dialect, driver = self.db.split('+')
        else:
            dialect, driver = self.db, None

        if dialect and engine.name != dialect:
            return False
        if driver is not None and engine.driver != driver:
            return False

        if self.op is None:
            return True

        if driver is not None:
            # Explicit raise so the check survives ``python -O``.
            raise AssertionError("DBAPI version specs not supported yet")

        version = _server_version(engine)
        if hasattr(self.op, '__call__'):
            oper = self.op
        else:
            oper = self._ops[self.op]
        return oper(version, self.spec)

    def _as_string(self, negate=False):
        """Render the spec as text.

        An explicit ``description`` is returned as-is, regardless of
        ``negate``; otherwise the db (and op/spec, if any) is rendered
        with a "not " prefix when negated.
        """
        if self.description is not None:
            return self.description

        prefix = "not " if negate else ""
        if self.op is None:
            return "%s%s" % (prefix, self.db)
        return "%s%s %s %s" % (prefix, self.db, self.op, self.spec)

    def __str__(self):
        return self._as_string()


class LambdaPredicate(Predicate):
    """Predicate delegating to an arbitrary callable.

    The callable is invoked with the stored ``args`` and ``kw``.  The
    description falls back to the callable's docstring and then to
    "custom function".
    """

    def __init__(self, lambda_, description=None, args=None, kw=None):
        self.lambda_ = lambda_
        self.args = args or ()
        self.kw = kw or {}
        self.description = (
            description or lambda_.__doc__ or "custom function")

    def __call__(self):
        return self.lambda_(*self.args, **self.kw)

    def _as_string(self, negate=False):
        """Return the description, prefixed with "not " when negated."""
        return "not " + self.description if negate else self.description

    def __str__(self):
        return self._as_string()


class NotPredicate(Predicate):
    """Predicate that inverts the result of another predicate."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, *arg, **kw):
        return not self.predicate(*arg, **kw)

    def _as_string(self, negate=False):
        """Render the wrapped predicate with the opposite negation.

        Provided so this class can itself be nested inside predicates
        that render their members through ``_as_string()``.
        """
        return self.predicate._as_string(not negate)

    def __str__(self):
        return self._as_string()


class OrPredicate(Predicate):
    """Predicate that is true when any of its member predicates is true.

    ``description``, when given, is a ``%``-format string that may refer
    to ``%(spec)s``, which is filled with the member that matched on the
    most recent call (or ``None``).
    """

    # Member predicate that matched on the most recent call; None if the
    # last call matched nothing or no call has been made yet.
    _str = None

    def __init__(self, predicates, description=None):
        # Materialize the members: a generator argument would otherwise
        # be exhausted after the first evaluation or string rendering.
        self.predicates = list(predicates)
        self.description = description

    def __call__(self, *arg, **kw):
        """Return True as soon as one member predicate returns true."""
        # Drop the match recorded by an earlier call so the string form
        # never reports a stale predicate.
        self._str = None
        for pred in self.predicates:
            if pred(*arg, **kw):
                self._str = pred
                return True
        return False

    def _eval_str(self, negate=False):
        """Render the matched member, or all members joined together."""
        if self._str is None:
            if negate:
                conjunction = " and "
            else:
                conjunction = " or "
            return conjunction.join(p._as_string(negate=negate)
                            for p in self.predicates)
        else:
            return self._str._as_string(negate=negate)

    def _negation_str(self):
        if self.description is not None:
            return "Not " + (self.description % {"spec": self._str})
        else:
            return self._eval_str(negate=True)

    def _as_string(self, negate=False):
        if negate:
            return self._negation_str()
        else:
            if self.description is not None:
                return self.description % {"spec": self._str}
            else:
                return self._eval_str()

    def __str__(self):
        return self._as_string()


# Module-level alias for Predicate.as_predicate; skip_if, only_if and
# succeeds_if use it to coerce their ``predicate`` argument.
_as_predicate = Predicate.as_predicate

def _is_excluded(db, op, spec):
    """Build a ``SpecPredicate`` from the arguments and return its result."""
    spec_predicate = SpecPredicate(db, op, spec)
    return spec_predicate()


def _server_version(engine):
    """Return a server_version_info tuple.

    A connection is opened first so the dialect has a chance to populate
    ``server_version_info``; it is always closed again, even if reading
    the attribute raises.  An empty tuple is returned when the dialect
    has no such attribute.
    """

    # force metadata to be retrieved
    conn = engine.connect()
    try:
        return getattr(engine.dialect, 'server_version_info', ())
    finally:
        conn.close()


def db_spec(*dbs):
    """Return an ``OrPredicate`` over each of ``dbs`` coerced to a predicate."""
    # Build a list, not a generator: the predicate is iterated every time
    # it is evaluated or rendered, and a generator is empty after one pass.
    return OrPredicate(
            [Predicate.as_predicate(db) for db in dbs]
        )


def open():
    """Return a ``skip_if`` whose predicate is a constant False."""
    never = BooleanPredicate(False, "mark as execute")
    return skip_if(never)


def closed():
    """Return a ``skip_if`` whose predicate is a constant True."""
    always = BooleanPredicate(True, "marked as skip")
    return skip_if(always)


@decorator
def future(fn, *args, **kw):
    """Return a ``fails_if`` labelled "Future feature" built around ``fn``.

    ``fn`` is wrapped in a ``LambdaPredicate`` and that predicate, together
    with the reason "Future feature", is passed to ``fails_if``.
    """
    # NOTE(review): ``*args`` / ``**kw`` are forwarded positionally, so the
    # first extra positional argument fills LambdaPredicate's
    # ``description`` parameter rather than ``args`` -- confirm this is
    # intended.  How ``decorator`` invokes this function is defined outside
    # this module; verify against its implementation.
    return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")

def fails_on(db, reason):
    """Return a ``fails_if`` keyed on a ``SpecPredicate`` for ``db``."""
    target = SpecPredicate(db)
    return fails_if(target, reason)


def fails_on_everything_except(*dbs):
    """Return a ``succeeds_if`` over the OR of a ``SpecPredicate`` per db."""
    exempt = [SpecPredicate(name) for name in dbs]
    return succeeds_if(OrPredicate(exempt))


def skip(db, reason):
    """Return a ``skip_if`` keyed on a ``SpecPredicate`` for ``db``."""
    target = SpecPredicate(db)
    return skip_if(target, reason)


def only_on(dbs, reason):
    """Return an ``only_if`` over the OR of a ``SpecPredicate`` per db.

    ``dbs`` may be a single db spec or a sequence of them (normalized with
    ``util.to_list``).  ``reason`` is forwarded to ``only_if`` so that it
    reaches the skip message instead of being silently discarded.
    """
    return only_if(
            OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)]),
            reason
    )



def exclude(db, op, spec, reason):
    """Return a ``skip_if`` keyed on ``SpecPredicate(db, op, spec)``."""
    version_spec = SpecPredicate(db, op, spec)
    return skip_if(version_spec, reason)



def against(*queries):
    """Coerce each query to a predicate and return whether any is true."""
    candidates = [Predicate.as_predicate(query) for query in queries]
    return OrPredicate(candidates)()