# Source
#
# whoosh / tests / test_reading.py
#
# Full commit
from __future__ import with_statement
import random, threading, time

from nose.tools import assert_equal  # @UnresolvedImport

from whoosh import analysis, fields, formats, reading
from whoosh.compat import u, xrange
from whoosh.filedb.filereading import SegmentReader
from whoosh.filedb.filestore import RamStorage
from whoosh.ramindex import RamIndex
from whoosh.support.testing import TempIndex


def _create_index():
    """Create an empty index in a fresh ``RamStorage``.

    The schema has three KEYWORD fields: ``f1`` is built with
    ``stored=True``, while ``f2`` and ``f3`` are given as the bare
    ``fields.KEYWORD`` class.
    """
    schema = fields.Schema(
        f1=fields.KEYWORD(stored=True),
        f2=fields.KEYWORD,
        f3=fields.KEYWORD,
    )
    return RamStorage().create_index(schema)


def _one_segment_index():
    """Return a new index whose five documents are written in one commit.

    ``test_readers`` expects the resulting index to have exactly one segment.
    """
    # One (f1, f2, f3) row per document, in insertion order.
    rows = (
        ("A B C", "1 2 3", "X Y Z"),
        ("D E F", "4 5 6", "Q R S"),
        ("A E C", "1 4 6", "X Q S"),
        ("A A A", "2 3 5", "Y R Z"),
        ("A B", "1 2", "X Y"),
    )

    index = _create_index()
    writer = index.writer()
    for f1, f2, f3 in rows:
        writer.add_document(f1=u(f1), f2=u(f2), f3=u(f3))
    writer.commit()
    return index


def _multi_segment_index():
    """Return a new index holding five documents spread over three commits.

    The first commit uses the default arguments; the second and third pass
    ``merge=False``. ``test_readers`` expects the result to have three
    segments.
    """
    # Each entry: (rows written by one writer, keyword args for its commit).
    batches = (
        ((("A B C", "1 2 3", "X Y Z"),
          ("D E F", "4 5 6", "Q R S")), {}),
        ((("A E C", "1 4 6", "X Q S"),
          ("A A A", "2 3 5", "Y R Z")), {"merge": False}),
        ((("A B", "1 2", "X Y"),), {"merge": False}),
    )

    index = _create_index()
    for rows, commit_args in batches:
        writer = index.writer()
        for f1, f2, f3 in rows:
            writer.add_document(f1=u(f1), f2=u(f2), f3=u(f3))
        writer.commit(**commit_args)
    return index


def _stats(r):
    """Flatten ``r`` into ``(fieldname, text, doc_frequency, weight)`` tuples.

    :param r: an iterable yielding ``((fieldname, text), terminfo)`` pairs,
        where ``terminfo`` provides ``doc_frequency()`` and ``weight()``.
    :returns: a list with one tuple per pair, in iteration order.
    """
    rows = []
    for (fieldname, termtext), info in r:
        rows.append((fieldname, termtext, info.doc_frequency(),
                     info.weight()))
    return rows


def _fstats(r):
    """Flatten ``r`` into ``(text, doc_frequency, weight)`` tuples.

    :param r: an iterable yielding ``(text, terminfo)`` pairs, where
        ``terminfo`` provides ``doc_frequency()`` and ``weight()``.
    :returns: a list with one tuple per pair, in iteration order.
    """
    rows = []
    for termtext, info in r:
        rows.append((termtext, info.doc_frequency(), info.weight()))
    return rows


def test_readers():
    """Check stored fields and term statistics on both index layouts.

    The same expectations are applied to the one-segment index and to the
    three-segment index, which hold the same five documents.
    """
    # field name -> (text, doc frequency, weight) for every expected term.
    per_field = (
        ("f1", (('A', 4, 6), ('B', 2, 2), ('C', 2, 2),
                ('D', 1, 1), ('E', 2, 2), ('F', 1, 1))),
        ("f2", (('1', 3, 3), ('2', 3, 3), ('3', 2, 2),
                ('4', 2, 2), ('5', 2, 2), ('6', 2, 2))),
        ("f3", (('Q', 2, 2), ('R', 2, 2), ('S', 2, 2),
                ('X', 3, 3), ('Y', 3, 3), ('Z', 2, 2))),
    )
    expected_terms = sorted((fname, u(text), docfreq, weight)
                            for fname, rows in per_field
                            for text, docfreq, weight in rows)

    # Only f1 is a stored field, so it is the only key in each stored dict.
    expected_stored = [{"f1": value}
                       for value in ("A B C", "D E F", "A E C", "A A A",
                                     "A B")]

    def verify(index):
        reader = index.reader()
        assert_equal(list(reader.all_stored_fields()), expected_stored)
        assert_equal(sorted(_stats(reader)), expected_terms)

    for build, segment_count in ((_one_segment_index, 1),
                                 (_multi_segment_index, 3)):
        index = build()
        assert_equal(len(index._segments()), segment_count)
        verify(index)


def test_term_inspection():
    """Exercise the reader's term-inspection methods on a two-document index.

    Covers ``lexicon``, ``expand_prefix``, ``all_terms``, ``iter_field``,
    ``most_frequent_terms`` and ``most_distinctive_terms``.
    """
    index = RamStorage().create_index(
        fields.Schema(title=fields.TEXT(stored=True), content=fields.TEXT))

    documents = (
        ("My document", "AA AA BB BB CC AA AA AA BB BB CC DD EE EE"),
        ("My other document", "AA AB BB CC EE EE AX AX DD"),
    )
    w = index.writer()
    for title, content in documents:
        w.add_document(title=u(title), content=u(content))
    w.commit()

    r = index.reader()

    content_terms = [u(t) for t in ('aa', 'ab', 'ax', 'bb', 'cc', 'dd', 'ee')]
    assert_equal(list(r.lexicon("content")), content_terms)

    a_terms = [u('aa'), u('ab'), u('ax')]
    assert_equal(list(r.expand_prefix("content", "a")), a_terms)

    every_term = set(('content', t) for t in content_terms)
    every_term.update(('title', u(t)) for t in ('document', 'my', 'other'))
    assert_equal(set(r.all_terms()), every_term)

    # (text, doc_freq, index_freq)
    all_stats = [(u('aa'), 2, 6), (u('ab'), 1, 1), (u('ax'), 1, 2),
                 (u('bb'), 2, 5), (u('cc'), 2, 3), (u('dd'), 2, 2),
                 (u('ee'), 2, 4)]
    assert_equal(_fstats(r.iter_field("content")), all_stats)

    # With prefix="c" the expected rows still include 'dd' and 'ee'.
    stats_from_c = [(u('cc'), 2, 3), (u('dd'), 2, 2), (u('ee'), 2, 4)]
    assert_equal(_fstats(r.iter_field("content", prefix="c")), stats_from_c)

    top_terms = [(6, u('aa')), (5, u('bb')), (4, u('ee')), (3, u('cc')),
                 (2, u('dd'))]
    assert_equal(list(r.most_frequent_terms("content")), top_terms)

    top_a_terms = [(6, u('aa')), (2, u('ax')), (1, u('ab'))]
    assert_equal(list(r.most_frequent_terms("content", prefix="a")),
                 top_a_terms)

    distinctive = [(1.3862943611198906, u('ax')),
                   (0.6931471805599453, u('ab')),
                   (0.0, u('ee'))]
    assert_equal(list(r.most_distinctive_terms("content", 3)), distinctive)


def test_vector_postings():
    """Read back the term vector of a single document as weights.

    The ``content`` field is created with ``vector=formats.Positions()``.
    The expected vector does not contain "the".
    """
    schema = fields.Schema(id=fields.ID(stored=True, unique=True),
                           content=fields.TEXT(vector=formats.Positions()))
    index = RamStorage().create_index(schema)

    sentence = u('the quick brown fox jumped over the ' +
                 'lazy dogs')
    w = index.writer()
    w.add_document(id=u('1'), content=sentence)
    w.commit()

    reader = index.reader()
    expected = [(u(word), 1.0)
                for word in ('brown', 'dogs', 'fox', 'jumped', 'lazy',
                             'over', 'quick')]
    # Vector of document 0, field "content", interpreted as weights.
    vector = list(reader.vector_as("weight", 0, "content"))
    assert_equal(vector, expected)


def test_stored_fields():
    """Check stored-field lookup by document number and by field value.

    Field ``c`` is a KEYWORD field declared without ``stored=True``, and the
    expected dictionaries contain only ``a``, ``b`` and ``d``.
    """
    schema = fields.Schema(a=fields.ID(stored=True), b=fields.STORED,
                           c=fields.KEYWORD, d=fields.TEXT(stored=True))
    index = RamStorage().create_index(schema)

    # One (a, b, c, d) row per document; b is passed through without u().
    rows = (
        ("1", "a", "zulu", "Alfa"),
        ("2", "b", "yankee", "Bravo"),
        ("3", "c", "xray", "Charlie"),
    )
    w = index.writer()
    for a, b, c, d in rows:
        w.add_document(a=u(a), b=b, c=u(c), d=u(d))
    w.commit()

    def stored(rownum):
        a, b, _, d = rows[rownum]
        return {"a": u(a), "b": b, "d": u(d)}

    with index.searcher() as searcher:
        # Lookup by document number.
        assert_equal(searcher.stored_fields(0), stored(0))
        assert_equal(searcher.stored_fields(2), stored(2))

        # Lookup by the value of field "a".
        assert_equal(searcher.document(a=u("1")), stored(0))
        assert_equal(searcher.document(a=u("2")), stored(1))


def test_stored_fields2():
    """Check stored names and stored values on a larger schema.

    Verifies ``schema.stored_names()``, then writes three documents, closes
    and reopens the index from the same storage, and checks the stored
    values of the document whose path is "/main".
    """
    schema = fields.Schema(content=fields.TEXT(stored=True),
                           title=fields.TEXT(stored=True),
                           summary=fields.STORED,
                           path=fields.ID(stored=True),
                           helpid=fields.KEYWORD,
                           parent=fields.KEYWORD,
                           context=fields.KEYWORD(stored=True),
                           type=fields.KEYWORD(stored=True),
                           status=fields.KEYWORD(stored=True),
                           superclass=fields.KEYWORD(stored=True),
                           exampleFor=fields.KEYWORD(stored=True),
                           chapter=fields.KEYWORD(stored=True),
                           replaces=fields.KEYWORD,
                           time=fields.STORED,
                           methods=fields.STORED,
                           exampleFile=fields.STORED,
                           )

    expected_names = ["chapter", "content", "context", "exampleFile",
                      "exampleFor", "methods", "path", "status", "summary",
                      "superclass", "time", "title", "type"]
    assert_equal(expected_names, schema.stored_names())

    storage = RamStorage()
    index = storage.create_index(schema)

    # One (content, title, summary, path) row per document.
    rows = (
        ("Content of this document.", "This is the title",
         "This is the summary", "/main"),
        ("Second document.", "Second title",
         "Summary numero due", "/second"),
        ("Third document.", "Title 3",
         "Summary treo", "/san"),
    )
    w = index.writer()
    for content, title, summary, path in rows:
        w.add_document(content=u(content), title=u(title),
                       summary=u(summary), path=u(path))
    w.commit()
    index.close()

    # Reopen from the same storage.
    index = storage.open_index()
    with index.searcher() as searcher:
        doc = searcher.document(path="/main")
        assert doc is not None
        # Values in key order: content, path, summary, title.
        values = [doc[key] for key in sorted(doc.keys())]
        assert (values
                == ["Content of this document.", "/main",
                    "This is the summary", "This is the title"])

    index.close()


def test_first_id():
    """Check ``first_id`` on a single-commit index and on a ``MultiReader``.

    In both cases the document number returned for a path must map back to
    the stored fields of that same path.
    """
    schema = fields.Schema(path=fields.ID(stored=True))

    # Case 1: all three documents written in one commit.
    index = RamStorage().create_index(schema)
    w = index.writer()
    for path in ("/a", "/b", "/c"):
        w.add_document(path=u(path))
    w.commit()

    reader = index.reader()
    docnum = reader.first_id("path", u("/b"))
    assert_equal(reader.stored_fields(docnum), {"path": "/b"})

    # Case 2: three commits with merge=False, three documents each.
    index = RamStorage().create_index(schema)
    for batch in (("/a", "/b", "/c"),
                  ("/d", "/e", "/f"),
                  ("/g", "/h", "/i")):
        w = index.writer()
        for path in batch:
            w.add_document(path=u(path))
        w.commit(merge=False)

    reader = index.reader()
    assert_equal(reader.__class__, reading.MultiReader)
    docnum = reader.first_id("path", u("/e"))
    assert_equal(reader.stored_fields(docnum), {"path": "/e"})


class RecoverReader(threading.Thread):
    """Thread that opens and immediately closes a reader on ``ix`` 200 times.

    An exception raised inside ``run`` does not propagate to the thread that
    calls ``join``, so any failure is recorded in ``self.error`` (the
    exception instance, or ``None`` if the loop completed) for the test to
    inspect afterwards.
    """

    def __init__(self, ix):
        threading.Thread.__init__(self)
        self.ix = ix
        self.error = None

    def run(self):
        import sys

        try:
            for _ in xrange(200):
                r = self.ix.reader()
                r.close()
        except Exception:
            # sys.exc_info() instead of "except ... as" keeps the syntax
            # valid on the older interpreters this file still targets.
            self.error = sys.exc_info()[1]


class RecoverWriter(threading.Thread):
    """Thread that commits 20 single-document writes to ``ix``.

    Each document's ``text`` value is a list of four distinct words sampled
    from ``domain``, and the thread sleeps 0.05 seconds between commits. As
    with ``RecoverReader``, a failure is recorded in ``self.error`` rather
    than being lost when the thread exits.
    """

    # Word pool the documents are sampled from.
    domain = u("alfa bravo charlie deleta echo foxtrot golf hotel india")
    domain = domain.split()

    def __init__(self, ix):
        threading.Thread.__init__(self)
        self.ix = ix
        self.error = None

    def run(self):
        import sys

        try:
            for _ in xrange(20):
                w = self.ix.writer()
                w.add_document(text=random.sample(self.domain, 4))
                w.commit()
                time.sleep(0.05)
        except Exception:
            self.error = sys.exc_info()[1]


def test_delete_recovery():
    """Run a writer thread and a reader thread concurrently on one index.

    The test fails if either thread raised: the recorded exception is
    re-raised here in the main thread, where the test runner can see it.
    """
    schema = fields.Schema(text=fields.TEXT)
    with TempIndex(schema, "delrecover") as ix:
        rw = RecoverWriter(ix)
        rr = RecoverReader(ix)
        rw.start()
        rr.start()
        rw.join()
        rr.join()

    # Re-raise outside the ``with`` block so the temporary index is cleaned
    # up first and the context manager cannot interfere with the failure.
    for th in (rw, rr):
        if th.error is not None:
            raise th.error


def test_nonexclusive_read():
    """Check that several threads can read the same index concurrently.

    Five documents are written in five separate ``merge=False`` commits, then
    five threads each open a reader five times and compare the "text"
    lexicon with the expected terms.

    An exception raised in a worker thread (including a failed
    ``assert_equal``) does not propagate to the thread that joins it, so
    each worker records its failure in ``errors`` and the first one is
    re-raised in the main thread once all workers have finished.
    """
    import sys

    schema = fields.Schema(text=fields.TEXT)
    errors = []  # exception instances collected from the worker threads

    with TempIndex(schema, "readlock") as ix:
        for num in u("one two three four five").split():
            w = ix.writer()
            w.add_document(text=u("Test document %s") % num)
            w.commit(merge=False)

        def fn():
            try:
                for _ in xrange(5):
                    r = ix.reader()
                    try:
                        assert_equal(list(r.lexicon("text")),
                                     ["document", "five", "four", "one",
                                      "test", "three", "two"])
                    finally:
                        # Close the reader even when the comparison fails.
                        r.close()
            except Exception:
                # sys.exc_info() instead of "except ... as" keeps the syntax
                # valid on the older interpreters this file still targets.
                errors.append(sys.exc_info()[1])

        ths = [threading.Thread(target=fn) for _ in xrange(5)]
        for th in ths:
            th.start()
        for th in ths:
            th.join()

    # Re-raise after the temporary index has been cleaned up.
    if errors:
        raise errors[0]


def test_doc_count():
    """Track ``doc_count`` and ``doc_count_all`` across adds and deletes.

    After every commit a new reader is opened and both counts are compared
    with the expected values; the final check runs after ``optimize()``.
    """
    index = RamStorage().create_index(fields.Schema(id=fields.NUMERIC))

    def check(expected_count, expected_count_all):
        reader = index.reader()
        assert_equal(reader.doc_count(), expected_count)
        assert_equal(reader.doc_count_all(), expected_count_all)

    # Ten documents in a single commit.
    w = index.writer()
    for n in xrange(10):
        w.add_document(id=n)
    w.commit()
    check(10, 10)

    # Delete four of them by document number.
    w = index.writer()
    for docnum in (2, 4, 6, 8):
        w.delete_document(docnum)
    w.commit()
    check(6, 10)

    # Five more documents, committed with merge=False.
    w = index.writer()
    for n in xrange(10, 15):
        w.add_document(id=n)
    w.commit(merge=False)
    check(11, 15)

    # Delete three of the newly added documents.
    w = index.writer()
    for docnum in (10, 12, 14):
        w.delete_document(docnum)
    w.commit(merge=False)
    check(8, 15)

    # After optimize() both counts are expected to be equal.
    index.optimize()
    check(8, 8)


def test_reader_subclasses():
    """Run ``check_abstract_methods`` on each reader implementation.

    Every implementation is checked against ``reading.IndexReader``.
    """
    from whoosh.support.testing import check_abstract_methods

    implementations = (SegmentReader, reading.MultiReader,
                       reading.EmptyReader, RamIndex)
    for impl in implementations:
        check_abstract_methods(reading.IndexReader, impl)