Source

whoosh / tests / test_nested.py

from __future__ import with_statement

from whoosh import fields, query, sorting
from whoosh.compat import u
from whoosh.filedb.filestore import RamStorage


def test_nested_parent():
    schema = fields.Schema(name=fields.ID(stored=True), type=fields.ID,
                           part=fields.ID, price=fields.NUMERIC)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(name=u("iPad"), type=u("product"))
            w.add_document(part=u("screen"), price=100)
            w.add_document(part=u("battery"), price=50)
            w.add_document(part=u("case"), price=20)

        with w.group():
            w.add_document(name=u("iPhone"), type=u("product"))
            w.add_document(part=u("screen"), price=60)
            w.add_document(part=u("battery"), price=30)
            w.add_document(part=u("case"), price=10)

        with w.group():
            w.add_document(name=u("Mac mini"), type=u("product"))
            w.add_document(part=u("hard drive"), price=50)
            w.add_document(part=u("case"), price=50)

    with ix.searcher() as s:
        price = s.schema["price"]

        pq = query.Term("type", "product")
        cq = query.Term("price", 50)
        q = query.NestedParent(pq, cq)

        r = s.search(q)
        assert sorted([hit["name"] for hit in r]) == ["Mac mini", "iPad"]


def test_scoring():
    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(kind=u("class"), name=u("Index"))
            w.add_document(kind=u("method"), name=u("add document"))
            w.add_document(kind=u("method"), name=u("add reader"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Accumulator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("get result"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Calculator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("add all"))
            w.add_document(kind=u("method"), name=u("add some"))
            w.add_document(kind=u("method"), name=u("multiply"))
            w.add_document(kind=u("method"), name=u("close"))

    with ix.searcher() as s:
        q = query.NestedParent(query.Term("kind", "class"),
                               query.Term("name", "add"))
        r = s.search(q)
        assert [hit["name"] for hit in r] == ["Calculator", "Index", "Accumulator"]


def test_missing():
    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(kind=u("class"), name=u("Index"))
            w.add_document(kind=u("method"), name=u("add document"))
            w.add_document(kind=u("method"), name=u("add reader"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Accumulator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("get result"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Calculator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("add all"))
            w.add_document(kind=u("method"), name=u("add some"))
            w.add_document(kind=u("method"), name=u("multiply"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Deleter"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("delete"))

    with ix.searcher() as s:
        q = query.NestedParent(query.Term("kind", "class"),
                               query.Term("name", "add"))

        r = s.search(q)
        assert [hit["name"] for hit in r] == ["Calculator", "Index", "Accumulator", "Deleter"]

    with ix.writer() as w:
        w.delete_by_term("name", "Accumulator")
        w.delete_by_term("name", "Calculator")

    with ix.searcher() as s:
        pq = query.Term("kind", "class")
        assert len(list(pq.docs(s))) == 2
        q = query.NestedParent(pq, query.Term("name", "add"))
        r = s.search(q)
        assert [hit["name"] for hit in r] == ["Index", "Deleter"]


def test_nested_delete():
    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(kind=u("class"), name=u("Index"))
            w.add_document(kind=u("method"), name=u("add document"))
            w.add_document(kind=u("method"), name=u("add reader"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Accumulator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("get result"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Calculator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("add all"))
            w.add_document(kind=u("method"), name=u("add some"))
            w.add_document(kind=u("method"), name=u("multiply"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Deleter"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("delete"))

    # Delete "Accumulator" class
    with ix.writer() as w:
        q = query.NestedParent(query.Term("kind", "class"),
                               query.Term("name", "Accumulator"))
        w.delete_by_query(q)

    # Check that Accumulator AND ITS METHODS are deleted
    with ix.searcher() as s:
        r = s.search(query.Term("kind", "class"))
        assert sorted(hit["name"] for hit in r) == ["Calculator", "Deleter", "Index"]

        names = [fs["name"] for _, fs in s.iter_docs()]
        assert names == ["Index", "add document", "add reader", "close",
                         "Calculator", "add", "add all", "add some",
                         "multiply", "close", "Deleter", "add", "delete"]

    # Delete any class with a close method
    with ix.writer() as w:
        q = query.NestedParent(query.Term("kind", "class"),
                               query.Term("name", "close"))
        w.delete_by_query(q)

    # Check the CLASSES AND METHODS are gone
    with ix.searcher() as s:
        names = [fs["name"] for _, fs in s.iter_docs()]
        assert names == ["Deleter", "add", "delete"]


def test_all_parents_deleted():
    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(kind=u("class"), name=u("Index"))
            w.add_document(kind=u("method"), name=u("add document"))
            w.add_document(kind=u("method"), name=u("add reader"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Accumulator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("get result"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Calculator"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("add all"))
            w.add_document(kind=u("method"), name=u("add some"))
            w.add_document(kind=u("method"), name=u("multiply"))
            w.add_document(kind=u("method"), name=u("close"))
        with w.group():
            w.add_document(kind=u("class"), name=u("Deleter"))
            w.add_document(kind=u("method"), name=u("add"))
            w.add_document(kind=u("method"), name=u("delete"))

    with ix.writer() as w:
        w.delete_by_term("name", "Index")
        w.delete_by_term("name", "Accumulator")
        w.delete_by_term("name", "Calculator")
        w.delete_by_term("name", "Deleter")

    with ix.searcher() as s:
        q = query.NestedParent(query.Term("kind", "class"),
                               query.Term("name", "add"))
        r = s.search(q)
        assert r.is_empty()


def test_everything_is_a_parent():
    schema = fields.Schema(id=fields.STORED, kind=fields.ID,
                           name=fields.ID(stored=True))
    k = u("alfa")
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, kind=k, name=u("one"))
        w.add_document(id=1, kind=k, name=u("two"))
        w.add_document(id=2, kind=k, name=u("three"))
        w.add_document(id=3, kind=k, name=u("four"))
        w.add_document(id=4, kind=k, name=u("one"))
        w.add_document(id=5, kind=k, name=u("two"))
        w.add_document(id=6, kind=k, name=u("three"))
        w.add_document(id=7, kind=k, name=u("four"))
        w.add_document(id=8, kind=k, name=u("one"))
        w.add_document(id=9, kind=k, name=u("two"))
        w.add_document(id=10, kind=k, name=u("three"))
        w.add_document(id=11, kind=k, name=u("four"))

    with ix.searcher() as s:
        pq = query.Term("kind", k)
        cq = query.Or([query.Term("name", "two"), query.Term("name", "four")])
        q = query.NestedParent(pq, cq)
        r = s.search(q)
        assert [hit["id"] for hit in r] == [1, 3, 5, 7, 9, 11]


def test_no_parents():
    schema = fields.Schema(id=fields.STORED, kind=fields.ID,
                           name=fields.ID(stored=True))
    k = u("alfa")
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, kind=k, name=u("one"))
        w.add_document(id=1, kind=k, name=u("two"))
        w.add_document(id=2, kind=k, name=u("three"))
        w.add_document(id=3, kind=k, name=u("four"))
        w.add_document(id=4, kind=k, name=u("one"))
        w.add_document(id=5, kind=k, name=u("two"))
        w.add_document(id=6, kind=k, name=u("three"))
        w.add_document(id=7, kind=k, name=u("four"))
        w.add_document(id=8, kind=k, name=u("one"))
        w.add_document(id=9, kind=k, name=u("two"))
        w.add_document(id=10, kind=k, name=u("three"))
        w.add_document(id=11, kind=k, name=u("four"))

    with ix.searcher() as s:
        pq = query.Term("kind", "bravo")
        cq = query.Or([query.Term("name", "two"), query.Term("name", "four")])
        q = query.NestedParent(pq, cq)
        r = s.search(q)
        assert r.is_empty()


def test_nested_children():
    schema = fields.Schema(t=fields.ID(stored=True),
                           track=fields.NUMERIC(stored=True),
                           album_name=fields.TEXT(stored=True),
                           song_name=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        with w.group():
            w.add_document(t=u("album"), album_name=u("alfa bravo charlie"))
            w.add_document(t=u("track"), track=1,
                           song_name=u("delta echo foxtrot"))
            w.add_document(t=u("track"), track=2,
                           song_name=u("golf hotel india"))
            w.add_document(t=u("track"), track=3,
                           song_name=u("juliet kilo lima"))
        with w.group():
            w.add_document(t=u("album"), album_name=u("mike november oskar"))
            w.add_document(t=u("track"), track=1,
                           song_name=u("papa quebec romeo"))
            w.add_document(t=u("track"), track=2,
                           song_name=u("sierra tango ultra"))
            w.add_document(t=u("track"), track=3,
                           song_name=u("victor whiskey xray"))
        with w.group():
            w.add_document(t=u("album"), album_name=u("yankee zulu one"))
            w.add_document(t=u("track"), track=1,
                           song_name=u("two three four"))
            w.add_document(t=u("track"), track=2,
                           song_name=u("five six seven"))
            w.add_document(t=u("track"), track=3,
                           song_name=u("eight nine ten"))

    with ix.searcher() as s:
        pq = query.Term("t", "album")
        aq = query.Term("album_name", "november")

        r = s.search(query.NestedChildren(pq, pq), limit=None)
        assert len(r) == 9
        assert [str(hit["t"]) for hit in r] == ["track"] * 9

        ncq = query.NestedChildren(pq, aq)
        assert list(ncq.docs(s)) == [5, 6, 7]
        r = s.search(ncq, limit=None)
        assert len(r) == 3
        assert [str(hit["song_name"]) for hit in r] == ["papa quebec romeo",
                                                        "sierra tango ultra",
                                                        "victor whiskey xray"]

        zq = query.NestedChildren(pq, query.Term("album_name", "zulu"))
        f = sorting.StoredFieldFacet("song_name")
        r = s.search(zq, sortedby=f)
        assert [hit["track"] for hit in r] == [3, 2, 1]
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.