Source

whoosh / tests / test_queries.py

from nose.tools import assert_equal, assert_not_equal  #@UnresolvedImport

import copy

from whoosh import fields, query
from whoosh.compat import u
from whoosh.filedb.filestore import RamStorage
from whoosh.qparser import QueryParser
from whoosh.query import (And, AndMaybe, ConstantScoreQuery, Every, DateRange,
                          DisjunctionMax, FuzzyTerm, Not, NullQuery,
                          NumericRange, Or, Phrase, Prefix, Require, Term,
                          TermRange, Variations, Wildcard)
from whoosh.spans import SpanContains, SpanFirst, SpanNear, SpanNot, SpanOr


def test_all_terms():
    q = QueryParser("a", None).parse(u('hello b:there c:"my friend"'))
    ts = q.all_terms(phrases=False)
    assert_equal(sorted(ts), [("a", "hello"), ("b", "there")])
    ts = q.all_terms(phrases=True)
    assert_equal(sorted(ts), [("a", "hello"), ("b", "there"), ("c", "friend"), ("c", "my")])

def test_existing_terms():
    s = fields.Schema(key=fields.ID, value=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    w.add_document(key=u("a"), value=u("alfa bravo charlie delta echo"))
    w.add_document(key=u("b"), value=u("foxtrot golf hotel india juliet"))
    w.commit()

    r = ix.reader()
    q = QueryParser("value", None).parse(u('alfa hotel tango "sierra bravo"'))

    ts = q.existing_terms(r, phrases=False)
    assert_equal(sorted(ts), [("value", "alfa"), ("value", "hotel")])

    ts = q.existing_terms(r)
    assert_equal(sorted(ts), [("value", "alfa"), ("value", "bravo"), ("value", "hotel")])

    ts = set()
    q.existing_terms(r, ts, reverse=True)
    assert_equal(sorted(ts), [("value", "sierra"), ("value", "tango")])

def test_wildcard_existing_terms():
    s = fields.Schema(key=fields.ID, value=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    w.add_document(key=u("a"), value=u("alfa bravo bear charlie delta"))
    w.add_document(key=u("a"), value=u("boggle echo render rendering renders"))
    w.commit()
    r = ix.reader()
    qp = QueryParser("value", ix.schema)

    def words(terms):
        z = []
        for t in terms:
            assert t[0] == "value"
            z.append(t[1])
        return " ".join(sorted(z))

    q = qp.parse(u("b*"))
    ts = q.existing_terms(r)
    assert_equal(ts, set())
    ts = q.existing_terms(r, expand=True)
    assert_equal(words(ts), "bear boggle bravo")

    q = qp.parse(u("[a TO f]"))
    ts = q.existing_terms(r)
    assert_equal(ts, set())
    ts = q.existing_terms(r, expand=True)
    assert_equal(words(ts), "alfa bear boggle bravo charlie delta echo")

    q = query.Variations("value", "render")
    ts = q.existing_terms(r, expand=False)
    assert_equal(ts, set())
    ts = q.existing_terms(r, expand=True)
    assert_equal(words(ts), "render rendering renders")

def test_replace():
    q = And([Or([Term("a", "b"), Term("b", "c")], boost=1.2), Variations("a", "b", boost=2.0)])
    q = q.replace("a", "b", "BB")
    assert_equal(q, And([Or([Term("a", "BB"), Term("b", "c")], boost=1.2),
                         Variations("a", "BB", boost=2.0)]))

def test_apply():
    def visit(q):
        if isinstance(q, (Term, Variations, FuzzyTerm)):
            q.text = q.text.upper()
            return q
        return q.apply(visit)

    before = And([Not(Term("a", u("b"))), Variations("a", u("c")), Not(FuzzyTerm("a", u("d")))])
    after = visit(before)
    assert_equal(after, And([Not(Term("a", u("B"))), Variations("a", u("C")),
                             Not(FuzzyTerm("a", u("D")))]))

    def term2var(q):
        if isinstance(q, Term):
            return Variations(q.fieldname, q.text)
        else:
            return q.apply(term2var)

    q = And([Term("f", "alfa"), Or([Term("f", "bravo"), Not(Term("f", "charlie"))])])
    q = term2var(q)
    assert_equal(q, And([Variations('f', 'alfa'),
                         Or([Variations('f', 'bravo'), Not(Variations('f', 'charlie'))])]))

def test_accept():
    def boost_phrases(q):
        if isinstance(q, Phrase):
            q.boost *= 2.0
        return q

    before = And([Term("a", u("b")), Or([Term("c", u("d")), Phrase("a", [u("e"), u("f")])]),
                  Phrase("a", [u("g"), u("h")], boost=0.25)])
    after = before.accept(boost_phrases)
    assert_equal(after, And([Term("a", u("b")),
                             Or([Term("c", u("d")), Phrase("a", [u("e"), u("f")], boost=2.0)]),
                             Phrase("a", [u("g"), u("h")], boost=0.5)]))

    before = Phrase("a", [u("b"), u("c")], boost=2.5)
    after = before.accept(boost_phrases)
    assert_equal(after, Phrase("a", [u("b"), u("c")], boost=5.0))

def test_simplify():
    s = fields.Schema(k=fields.ID, v=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    w.add_document(k=u("1"), v=u("aardvark apple allan alfa bear bee"))
    w.add_document(k=u("2"), v=u("brie glue geewhiz goop julia"))
    w.commit()

    r = ix.reader()
    q1 = And([Prefix("v", "b", boost=2.0), Term("v", "juliet")])
    q2 = And([Or([Term('v', u('bear'), boost=2.0), Term('v', u('bee'), boost=2.0),
                  Term('v', u('brie'), boost=2.0)]), Term('v', 'juliet')])
    assert_equal(q1.simplify(r), q2)

def test_merge_ranges():
    q = And([TermRange("f1", u("a"), None), TermRange("f1", None, u("z"))])
    assert_equal(q.normalize(), TermRange("f1", u("a"), u("z")))

    q = And([NumericRange("f1", None, u("aaaaa")), NumericRange("f1", u("zzzzz"), None)])
    assert_equal(q.normalize(), q)

    q = And([TermRange("f1", u("a"), u("z")), TermRange("f1", "b", "x")])
    assert_equal(q.normalize(), TermRange("f1", u("a"), u("z")))

    q = And([TermRange("f1", u("a"), u("m")), TermRange("f1", u("f"), u("q"))])
    assert_equal(q.normalize(), TermRange("f1", u("f"), u("m")))

    q = Or([TermRange("f1", u("a"), u("m")), TermRange("f1", u("f"), u("q"))])
    assert_equal(q.normalize(), TermRange("f1", u("a"), u("q")))

    q = Or([TermRange("f1", u("m"), None), TermRange("f1", None, u("n"))])
    assert_equal(q.normalize(), Every("f1"))

    q = And([Every("f1"), Term("f1", "a"), Variations("f1", "b")])
    assert_equal(q.normalize(), Every("f1"))

    q = Or([Term("f1", u("q")), TermRange("f1", u("m"), None), TermRange("f1", None, u("n"))])
    assert_equal(q.normalize(), Every("f1"))

    q = And([Or([Term("f1", u("a")), Term("f1", u("b"))]), Every("f1")])
    assert_equal(q.normalize(), Every("f1"))

    q = And([Term("f1", u("a")), And([Or([Every("f1")])])])
    assert_equal(q.normalize(), Every("f1"))

def test_normalize_compound():
    def oq():
        return Or([Term("a", u("a")), Term("a", u("b"))])
    def nq(level):
        if level == 0:
            return oq()
        else:
            return Or([nq(level - 1), nq(level - 1), nq(level - 1)])

    q = nq(7)
    q = q.normalize()
    assert_equal(q, Or([Term("a", u("a")), Term("a", u("b"))]))

def test_duplicates():
    q = And([Term("a", u("b")), Term("a", u("b"))])
    assert_equal(q.normalize(), Term("a", u("b")))

    q = And([Prefix("a", u("b")), Prefix("a", u("b"))])
    assert_equal(q.normalize(), Prefix("a", u("b")))

    q = And([Variations("a", u("b")), And([Variations("a", u("b")), Term("a", u("b"))])])
    assert_equal(q.normalize(), And([Variations("a", u("b")), Term("a", u("b"))]))

    q = And([Term("a", u("b")), Prefix("a", u("b")), Term("a", u("b"), boost=1.1)])
    assert_equal(q.normalize(), q)

    # Wildcard without * or ? normalizes to Term
    q = And([Wildcard("a", u("b")), And([Wildcard("a", u("b")), Term("a", u("b"))])])
    assert_equal(q.normalize(), Term("a", u("b")))


# TODO: FIX THIS
def test_query_copy_hash():
    def do(q1, q2):
        q1a = copy.deepcopy(q1)
        assert_equal(q1, q1a)
        assert_equal(hash(q1), hash(q1a))
        assert_not_equal(q1, q2)

    do(Term("a", u("b"), boost=1.1), Term("a", u("b"), boost=1.5))
    do(And([Term("a", u("b")), Term("c", u("d"))], boost=1.1),
       And([Term("a", u("b")), Term("c", u("d"))], boost=1.5))
    do(Or([Term("a", u("b"), boost=1.1), Term("c", u("d"))]),
       Or([Term("a", u("b"), boost=1.8), Term("c", u("d"))], boost=1.5))
    do(DisjunctionMax([Term("a", u("b"), boost=1.8), Term("c", u("d"))]),
       DisjunctionMax([Term("a", u("b"), boost=1.1), Term("c", u("d"))], boost=1.5))
    do(Not(Term("a", u("b"), boost=1.1)), Not(Term("a", u("b"), boost=1.5)))
    do(Prefix("a", u("b"), boost=1.1), Prefix("a", u("b"), boost=1.5))
    do(Wildcard("a", u("b*x?"), boost=1.1), Wildcard("a", u("b*x?"), boost=1.5))
    do(FuzzyTerm("a", u("b"), constantscore=True),
       FuzzyTerm("a", u("b"), constantscore=False))
    do(FuzzyTerm("a", u("b"), boost=1.1), FuzzyTerm("a", u("b"), boost=1.5))
    do(TermRange("a", u("b"), u("c")), TermRange("a", u("b"), u("d")))
    do(TermRange("a", None, u("c")), TermRange("a", None, None))
    do(TermRange("a", u("b"), u("c"), boost=1.1),
       TermRange("a", u("b"), u("c"), boost=1.5))
    do(TermRange("a", u("b"), u("c"), constantscore=True),
       TermRange("a", u("b"), u("c"), constantscore=False))
    do(NumericRange("a", 1, 5), NumericRange("a", 1, 6))
    do(NumericRange("a", None, 5), NumericRange("a", None, None))
    do(NumericRange("a", 3, 6, boost=1.1), NumericRange("a", 3, 6, boost=1.5))
    do(NumericRange("a", 3, 6, constantscore=True),
       NumericRange("a", 3, 6, constantscore=False))
    # do(DateRange)
    do(Variations("a", u("render")), Variations("a", u("renders")))
    do(Variations("a", u("render"), boost=1.1),
       Variations("a", u("renders"), boost=1.5))
    do(Phrase("a", [u("b"), u("c"), u("d")]), Phrase("a", [u("b"), u("c"), u("e")]))
    do(Phrase("a", [u("b"), u("c"), u("d")], boost=1.1),
       Phrase("a", [u("b"), u("c"), u("d")], boost=1.5))
    do(Phrase("a", [u("b"), u("c"), u("d")], slop=1),
       Phrase("a", [u("b"), u("c"), u("d")], slop=2))
    # do(Ordered)
    do(Every(), Every("a"))
    do(Every("a"), Every("b"))
    do(Every("a", boost=1.1), Every("a", boost=1.5))
    do(NullQuery, Term("a", u("b")))
    do(ConstantScoreQuery(Term("a", u("b"))), ConstantScoreQuery(Term("a", u("c"))))
    do(ConstantScoreQuery(Term("a", u("b")), score=2.0),
       ConstantScoreQuery(Term("a", u("c")), score=2.1))
    do(Require(Term("a", u("b")), Term("c", u("d"))),
       Require(Term("a", u("b"), boost=1.1), Term("c", u("d"))))
    # do(Require)
    # do(AndMaybe)
    # do(AndNot)
    # do(Otherwise)

    do(SpanFirst(Term("a", u("b")), limit=1), SpanFirst(Term("a", u("b")), limit=2))
    do(SpanNear(Term("a", u("b")), Term("c", u("d"))),
       SpanNear(Term("a", u("b")), Term("c", u("e"))))
    do(SpanNear(Term("a", u("b")), Term("c", u("d")), slop=1),
       SpanNear(Term("a", u("b")), Term("c", u("d")), slop=2))
    do(SpanNear(Term("a", u("b")), Term("c", u("d")), mindist=1),
       SpanNear(Term("a", u("b")), Term("c", u("d")), mindist=2))
    do(SpanNear(Term("a", u("b")), Term("c", u("d")), ordered=True),
       SpanNear(Term("a", u("b")), Term("c", u("d")), ordered=False))
    do(SpanNot(Term("a", u("b")), Term("a", u("c"))),
       SpanNot(Term("a", u("b")), Term("a", u("d"))))
    do(SpanOr([Term("a", u("b")), Term("a", u("c")), Term("a", u("d"))]),
       SpanOr([Term("a", u("b")), Term("a", u("c")), Term("a", u("e"))]))
    do(SpanContains(Term("a", u("b")), Term("a", u("c"))),
       SpanContains(Term("a", u("b")), Term("a", u("d"))))
    # do(SpanBefore)
    # do(SpanCondition)

def test_requires():
    a = Term("f", u("a"))
    b = Term("f", u("b"))
    assert_equal(And([a, b]).requires(), set([a, b]))
    assert_equal(Or([a, b]).requires(), set())
    assert_equal(AndMaybe(a, b).requires(), set([a]))
    assert_equal(a.requires(), set([a]))

def test_highlight_daterange():
    from datetime import datetime

    schema = fields.Schema(id=fields.ID(unique=True, stored=True),
                           title=fields.TEXT(stored=True),
                           content=fields.TEXT(stored=True),
                           released=fields.DATETIME(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.update_document(
        id=u('1'),
        title=u('Life Aquatic'),
        content=u('A nautic film crew sets out to kill a gigantic shark.'),
        released=datetime(2004, 12, 25)
    )
    w.update_document(
        id=u('2'),
        title=u('Darjeeling Limited'),
        content=u('Three brothers meet in India for a life changing train journey.'),
        released=datetime(2007, 10, 27)
    )
    w.commit()

    s = ix.searcher()
    r = s.search(Term('content', u('train')), terms=True)
    assert_equal(len(r), 1)
    assert_equal(r[0]["id"], "2")
    assert_equal(r[0].highlights("content"), 'for a life changing <b class="match term0">train</b> journey')

    r = s.search(DateRange('released', datetime(2007, 1, 1), None))
    assert_equal(len(r), 1)
    assert_equal(r[0].highlights("content"), '')