# whoosh/tests/test_queries.py

from nose.tools import assert_equal, assert_not_equal  #@UnresolvedImport

import copy

from whoosh import fields, query
from whoosh.compat import u
from whoosh.filedb.filestore import RamStorage
from whoosh.qparser import QueryParser
from whoosh.query import (And, AndMaybe, ConstantScoreQuery, Every, DateRange,
                          DisjunctionMax, FuzzyTerm, Not, NullQuery,
                          NumericRange, Or, Phrase, Prefix, Require, Term,
                          TermRange, Variations, Wildcard)
from whoosh.spans import SpanContains, SpanFirst, SpanNear, SpanNot, SpanOr


def test_all_terms():
    """Check ``all_terms`` on a parsed query with and without phrase words."""
    parser = QueryParser("a", None)
    parsed = parser.parse(u('hello b:there c:"my friend"'))

    # With phrases=False only the two plain terms are expected.
    plain_only = sorted(parsed.all_terms(phrases=False))
    assert_equal(plain_only, [("a", "hello"), ("b", "there")])

    # With phrases=True the words of the quoted phrase are expected as well.
    with_phrases = sorted(parsed.all_terms(phrases=True))
    assert_equal(with_phrases, [("a", "hello"), ("b", "there"),
                                ("c", "friend"), ("c", "my")])

def test_existing_terms():
    """Check that ``existing_terms`` reports only terms found in the index.

    Two documents are indexed, then a query mixing indexed words, an
    unindexed word and a phrase is inspected three ways: without phrase
    words, with the default arguments, and with ``reverse=True`` (which is
    expected to collect the query terms that are *not* in the index into
    the caller-supplied set).
    """
    s = fields.Schema(key=fields.ID, value=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    w.add_document(key=u("a"), value=u("alfa bravo charlie delta echo"))
    w.add_document(key=u("b"), value=u("foxtrot golf hotel india juliet"))
    w.commit()

    r = ix.reader()
    # Release the reader even when one of the assertions fails.
    try:
        q = QueryParser("value", None).parse(u('alfa hotel tango "sierra bravo"'))

        # Phrase words are left out when phrases=False.
        ts = q.existing_terms(r, phrases=False)
        assert_equal(sorted(ts), [("value", "alfa"), ("value", "hotel")])

        # By default the indexed phrase word "bravo" is reported too.
        ts = q.existing_terms(r)
        assert_equal(sorted(ts), [("value", "alfa"), ("value", "bravo"), ("value", "hotel")])

        # reverse=True fills the given set with the terms missing from the index.
        ts = set()
        q.existing_terms(r, ts, reverse=True)
        assert_equal(sorted(ts), [("value", "sierra"), ("value", "tango")])
    finally:
        r.close()

def test_wildcard_existing_terms():
    """Check ``existing_terms`` on multi-term queries with and without ``expand``.

    For a prefix/wildcard query, a term range and a ``Variations`` query the
    test expects an empty set unless ``expand=True`` is passed, in which case
    the matching indexed words are expected.
    """
    s = fields.Schema(key=fields.ID, value=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    # NOTE(review): both documents use key "a"; the key field is not read by
    # any assertion below, so this looks incidental -- confirm.
    w.add_document(key=u("a"), value=u("alfa bravo bear charlie delta"))
    w.add_document(key=u("a"), value=u("boggle echo render rendering renders"))
    w.commit()

    r = ix.reader()
    # Release the reader even when one of the assertions fails.
    try:
        qp = QueryParser("value", ix.schema)

        def words(terms):
            """Return the sorted term texts joined by spaces.

            Every term is asserted to belong to the "value" field.
            """
            z = []
            for t in terms:
                assert t[0] == "value"
                z.append(t[1])
            return " ".join(sorted(z))

        q = qp.parse(u("b*"))
        ts = q.existing_terms(r)
        assert_equal(ts, set())
        ts = q.existing_terms(r, expand=True)
        assert_equal(words(ts), "bear boggle bravo")

        q = qp.parse(u("[a TO f]"))
        ts = q.existing_terms(r)
        assert_equal(ts, set())
        ts = q.existing_terms(r, expand=True)
        assert_equal(words(ts), "alfa bear boggle bravo charlie delta echo")

        q = query.Variations("value", "render")
        ts = q.existing_terms(r, expand=False)
        assert_equal(ts, set())
        ts = q.existing_terms(r, expand=True)
        assert_equal(words(ts), "render rendering renders")
    finally:
        r.close()

def test_replace():
    """Check that ``replace`` swaps the text of matching terms in a query tree."""
    original = And([Or([Term("a", "b"), Term("b", "c")], boost=1.2),
                    Variations("a", "b", boost=2.0)])
    replaced = original.replace("a", "b", "BB")

    # Only the ("a", "b") terms are expected to change; Term("b", "c") and
    # the boosts are expected to stay as they were.
    assert_equal(replaced,
                 And([Or([Term("a", "BB"), Term("b", "c")], boost=1.2),
                      Variations("a", "BB", boost=2.0)]))

def test_apply():
    """Check that ``apply`` lets a visitor rewrite the leaves of a query tree."""
    leaf_types = (Term, Variations, FuzzyTerm)

    def upcase(node):
        # Non-leaf queries recurse through apply(); leaves are upper-cased
        # in place and handed back.
        if not isinstance(node, leaf_types):
            return node.apply(upcase)
        node.text = node.text.upper()
        return node

    tree = And([Not(Term("a", u("b"))),
                Variations("a", u("c")),
                Not(FuzzyTerm("a", u("d")))])
    upcased = upcase(tree)
    assert_equal(upcased,
                 And([Not(Term("a", u("B"))),
                      Variations("a", u("C")),
                      Not(FuzzyTerm("a", u("D")))]))

    def to_variations(node):
        # Swap each Term for a Variations query on the same field and text.
        if isinstance(node, Term):
            return Variations(node.fieldname, node.text)
        return node.apply(to_variations)

    tree = And([Term("f", "alfa"),
                Or([Term("f", "bravo"), Not(Term("f", "charlie"))])])
    converted = to_variations(tree)
    assert_equal(converted,
                 And([Variations('f', 'alfa'),
                      Or([Variations('f', 'bravo'),
                          Not(Variations('f', 'charlie'))])]))

def test_accept():
    """Check that ``accept`` applies a visitor to nested and top-level phrases."""
    def double_phrase_boost(node):
        # Every query is returned; only Phrase queries get their boost doubled.
        if isinstance(node, Phrase):
            node.boost *= 2.0
        return node

    # Phrases nested at different depths of a compound query.
    compound = And([Term("a", u("b")),
                    Or([Term("c", u("d")), Phrase("a", [u("e"), u("f")])]),
                    Phrase("a", [u("g"), u("h")], boost=0.25)])
    visited = compound.accept(double_phrase_boost)
    assert_equal(visited,
                 And([Term("a", u("b")),
                      Or([Term("c", u("d")),
                          Phrase("a", [u("e"), u("f")], boost=2.0)]),
                      Phrase("a", [u("g"), u("h")], boost=0.5)]))

    # A phrase that is itself the root of the query.
    lone_phrase = Phrase("a", [u("b"), u("c")], boost=2.5)
    visited = lone_phrase.accept(double_phrase_boost)
    assert_equal(visited, Phrase("a", [u("b"), u("c")], boost=5.0))

def test_simplify():
    """Check that ``simplify`` expands a prefix query against the index.

    The ``Prefix("v", "b")`` query is expected to become an ``Or`` of the
    indexed words starting with "b", each carrying the prefix's boost, while
    the plain ``Term`` next to it is expected to be left unchanged.
    """
    s = fields.Schema(k=fields.ID, v=fields.TEXT)
    ix = RamStorage().create_index(s)

    w = ix.writer()
    w.add_document(k=u("1"), v=u("aardvark apple allan alfa bear bee"))
    w.add_document(k=u("2"), v=u("brie glue geewhiz goop julia"))
    w.commit()

    r = ix.reader()
    # Release the reader even when the assertion fails.
    try:
        q1 = And([Prefix("v", "b", boost=2.0), Term("v", "juliet")])
        q2 = And([Or([Term('v', u('bear'), boost=2.0), Term('v', u('bee'), boost=2.0),
                      Term('v', u('brie'), boost=2.0)]), Term('v', 'juliet')])
        assert_equal(q1.simplify(r), q2)
    finally:
        r.close()

def test_merge_ranges():
    """Check how ``normalize`` combines ranges and ``Every`` in And/Or queries."""
    # Two half-open term ranges under And -> one closed range.
    half_open = And([TermRange("f1", u("a"), None),
                     TermRange("f1", None, u("z"))])
    assert_equal(half_open.normalize(), TermRange("f1", u("a"), u("z")))

    # This pair of numeric ranges is expected to come back unchanged.
    numeric_pair = And([NumericRange("f1", None, u("aaaaa")),
                        NumericRange("f1", u("zzzzz"), None)])
    assert_equal(numeric_pair.normalize(), numeric_pair)

    # One range inside the other under And -> the a..z range is expected.
    nested = And([TermRange("f1", u("a"), u("z")),
                  TermRange("f1", "b", "x")])
    assert_equal(nested.normalize(), TermRange("f1", u("a"), u("z")))

    # Overlapping ranges under And -> the f..m range is expected.
    overlap_and = And([TermRange("f1", u("a"), u("m")),
                       TermRange("f1", u("f"), u("q"))])
    assert_equal(overlap_and.normalize(), TermRange("f1", u("f"), u("m")))

    # Overlapping ranges under Or -> the a..q range is expected.
    overlap_or = Or([TermRange("f1", u("a"), u("m")),
                     TermRange("f1", u("f"), u("q"))])
    assert_equal(overlap_or.normalize(), TermRange("f1", u("a"), u("q")))

    # Two overlapping half-open ranges under Or -> Every on the field.
    covering = Or([TermRange("f1", u("m"), None),
                   TermRange("f1", None, u("n"))])
    assert_equal(covering.normalize(), Every("f1"))

    # The remaining cases all expect Every("f1") as the normalized form.
    every_and_terms = And([Every("f1"), Term("f1", "a"), Variations("f1", "b")])
    assert_equal(every_and_terms.normalize(), Every("f1"))

    term_or_covering = Or([Term("f1", u("q")),
                           TermRange("f1", u("m"), None),
                           TermRange("f1", None, u("n"))])
    assert_equal(term_or_covering.normalize(), Every("f1"))

    or_and_every = And([Or([Term("f1", u("a")), Term("f1", u("b"))]),
                        Every("f1")])
    assert_equal(or_and_every.normalize(), Every("f1"))

    deeply_nested_every = And([Term("f1", u("a")),
                               And([Or([Every("f1")])])])
    assert_equal(deeply_nested_every.normalize(), Every("f1"))

def test_normalize_compound():
    """Check that a deeply nested, repetitive Or tree normalizes to one Or."""
    def leaf_or():
        # The two-term Or that sits at the bottom of every branch.
        return Or([Term("a", u("a")), Term("a", u("b"))])

    def nested_or(depth):
        # Each level wraps three copies of the level below in an Or.
        if depth == 0:
            return leaf_or()
        return Or([nested_or(depth - 1) for _ in range(3)])

    normalized = nested_or(7).normalize()
    assert_equal(normalized, Or([Term("a", u("a")), Term("a", u("b"))]))

def test_duplicates():
    """Check that ``normalize`` drops duplicate subqueries."""
    # Identical terms collapse to the single term.
    twin_terms = And([Term("a", u("b")), Term("a", u("b"))])
    assert_equal(twin_terms.normalize(), Term("a", u("b")))

    # Identical prefixes collapse to the single prefix.
    twin_prefixes = And([Prefix("a", u("b")), Prefix("a", u("b"))])
    assert_equal(twin_prefixes.normalize(), Prefix("a", u("b")))

    # A duplicate inside a nested And is removed as well.
    nested_dup = And([Variations("a", u("b")),
                      And([Variations("a", u("b")), Term("a", u("b"))])])
    assert_equal(nested_dup.normalize(),
                 And([Variations("a", u("b")), Term("a", u("b"))]))

    # Same text but different query type or boost: expected to stay as is.
    distinct = And([Term("a", u("b")),
                    Prefix("a", u("b")),
                    Term("a", u("b"), boost=1.1)])
    assert_equal(distinct.normalize(), distinct)

    # Wildcard without * or ? normalizes to Term
    plain_wildcards = And([Wildcard("a", u("b")),
                           And([Wildcard("a", u("b")), Term("a", u("b"))])])
    assert_equal(plain_wildcards.normalize(), Term("a", u("b")))


# TODO: FIX THIS
def test_query_copy_hash():
    """Check deep-copy equality, hash equality and inequality of queries.

    For each pair, the first query must equal its own deep copy and hash to
    the same value as that copy, and must not equal the second query.
    """
    def check(subject, other):
        clone = copy.deepcopy(subject)
        assert_equal(subject, clone)
        assert_equal(hash(subject), hash(clone))
        assert_not_equal(subject, other)

    # Term and compound queries
    check(Term("a", u("b"), boost=1.1),
          Term("a", u("b"), boost=1.5))
    check(And([Term("a", u("b")), Term("c", u("d"))], boost=1.1),
          And([Term("a", u("b")), Term("c", u("d"))], boost=1.5))
    check(Or([Term("a", u("b"), boost=1.1), Term("c", u("d"))]),
          Or([Term("a", u("b"), boost=1.8), Term("c", u("d"))], boost=1.5))
    check(DisjunctionMax([Term("a", u("b"), boost=1.8), Term("c", u("d"))]),
          DisjunctionMax([Term("a", u("b"), boost=1.1), Term("c", u("d"))], boost=1.5))
    check(Not(Term("a", u("b"), boost=1.1)),
          Not(Term("a", u("b"), boost=1.5)))

    # Multi-term queries
    check(Prefix("a", u("b"), boost=1.1),
          Prefix("a", u("b"), boost=1.5))
    check(Wildcard("a", u("b*x?"), boost=1.1),
          Wildcard("a", u("b*x?"), boost=1.5))
    check(FuzzyTerm("a", u("b"), constantscore=True),
          FuzzyTerm("a", u("b"), constantscore=False))
    check(FuzzyTerm("a", u("b"), boost=1.1),
          FuzzyTerm("a", u("b"), boost=1.5))

    # Ranges
    check(TermRange("a", u("b"), u("c")),
          TermRange("a", u("b"), u("d")))
    check(TermRange("a", None, u("c")),
          TermRange("a", None, None))
    check(TermRange("a", u("b"), u("c"), boost=1.1),
          TermRange("a", u("b"), u("c"), boost=1.5))
    check(TermRange("a", u("b"), u("c"), constantscore=True),
          TermRange("a", u("b"), u("c"), constantscore=False))
    check(NumericRange("a", 1, 5),
          NumericRange("a", 1, 6))
    check(NumericRange("a", None, 5),
          NumericRange("a", None, None))
    check(NumericRange("a", 3, 6, boost=1.1),
          NumericRange("a", 3, 6, boost=1.5))
    check(NumericRange("a", 3, 6, constantscore=True),
          NumericRange("a", 3, 6, constantscore=False))
    # No case yet for: DateRange

    # Variations and phrases
    check(Variations("a", u("render")),
          Variations("a", u("renders")))
    check(Variations("a", u("render"), boost=1.1),
          Variations("a", u("renders"), boost=1.5))
    check(Phrase("a", [u("b"), u("c"), u("d")]),
          Phrase("a", [u("b"), u("c"), u("e")]))
    check(Phrase("a", [u("b"), u("c"), u("d")], boost=1.1),
          Phrase("a", [u("b"), u("c"), u("d")], boost=1.5))
    check(Phrase("a", [u("b"), u("c"), u("d")], slop=1),
          Phrase("a", [u("b"), u("c"), u("d")], slop=2))
    # No case yet for: Ordered

    # Every, NullQuery and wrapper queries
    check(Every(),
          Every("a"))
    check(Every("a"),
          Every("b"))
    check(Every("a", boost=1.1),
          Every("a", boost=1.5))
    check(NullQuery,
          Term("a", u("b")))
    check(ConstantScoreQuery(Term("a", u("b"))),
          ConstantScoreQuery(Term("a", u("c"))))
    check(ConstantScoreQuery(Term("a", u("b")), score=2.0),
          ConstantScoreQuery(Term("a", u("c")), score=2.1))
    check(Require(Term("a", u("b")), Term("c", u("d"))),
          Require(Term("a", u("b"), boost=1.1), Term("c", u("d"))))
    # No further cases yet for: Require, AndMaybe, AndNot, Otherwise

    # Span queries
    check(SpanFirst(Term("a", u("b")), limit=1),
          SpanFirst(Term("a", u("b")), limit=2))
    check(SpanNear(Term("a", u("b")), Term("c", u("d"))),
          SpanNear(Term("a", u("b")), Term("c", u("e"))))
    check(SpanNear(Term("a", u("b")), Term("c", u("d")), slop=1),
          SpanNear(Term("a", u("b")), Term("c", u("d")), slop=2))
    check(SpanNear(Term("a", u("b")), Term("c", u("d")), mindist=1),
          SpanNear(Term("a", u("b")), Term("c", u("d")), mindist=2))
    check(SpanNear(Term("a", u("b")), Term("c", u("d")), ordered=True),
          SpanNear(Term("a", u("b")), Term("c", u("d")), ordered=False))
    check(SpanNot(Term("a", u("b")), Term("a", u("c"))),
          SpanNot(Term("a", u("b")), Term("a", u("d"))))
    check(SpanOr([Term("a", u("b")), Term("a", u("c")), Term("a", u("d"))]),
          SpanOr([Term("a", u("b")), Term("a", u("c")), Term("a", u("e"))]))
    check(SpanContains(Term("a", u("b")), Term("a", u("c"))),
          SpanContains(Term("a", u("b")), Term("a", u("d"))))
    # No case yet for: SpanBefore, SpanCondition

def test_requires():
    """Check the set of required subqueries reported by ``requires``."""
    first = Term("f", u("a"))
    second = Term("f", u("b"))

    # And requires both operands; Or requires none.
    required_by_and = And([first, second]).requires()
    assert_equal(required_by_and, set([first, second]))
    required_by_or = Or([first, second]).requires()
    assert_equal(required_by_or, set())

    # AndMaybe requires only its first operand; a term requires itself.
    required_by_andmaybe = AndMaybe(first, second).requires()
    assert_equal(required_by_andmaybe, set([first]))
    required_by_term = first.requires()
    assert_equal(required_by_term, set([first]))

def test_highlight_daterange():
    """Check highlighting after a term search and after a date-range search.

    A term search run with ``terms=True`` is expected to highlight the
    matched word in the stored content, while a ``DateRange`` search is
    expected to find the document but produce an empty highlight string for
    the content field.
    """
    from datetime import datetime

    schema = fields.Schema(id=fields.ID(unique=True, stored=True),
                           title=fields.TEXT(stored=True),
                           content=fields.TEXT(stored=True),
                           released=fields.DATETIME(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.update_document(
        id=u('1'),
        title=u('Life Aquatic'),
        content=u('A nautic film crew sets out to kill a gigantic shark.'),
        released=datetime(2004, 12, 25)
    )
    w.update_document(
        id=u('2'),
        title=u('Darjeeling Limited'),
        content=u('Three brothers meet in India for a life changing train journey.'),
        released=datetime(2007, 10, 27)
    )
    w.commit()

    s = ix.searcher()
    # Close the searcher even when one of the assertions fails.
    try:
        r = s.search(Term('content', u('train')), terms=True)
        assert_equal(len(r), 1)
        assert_equal(r[0]["id"], "2")
        assert_equal(r[0].highlights("content"), 'for a life changing <b class="match term0">train</b> journey')

        # Only the 2007 document falls inside the open-ended date range.
        r = s.search(DateRange('released', datetime(2007, 1, 1), None))
        assert_equal(len(r), 1)
        assert_equal(r[0].highlights("content"), '')
    finally:
        s.close()
# NOTE(review): the lines below look like code-browser UI hints captured
# together with the file rather than part of the test module -- confirm and
# remove.
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.