Commits

Matt Chaput committed 9f5e774

Reduced iterations in various tests to speed up test suite.

Comments (0)

Files changed (10)

tests/test_columns.py

     domain = {}
     for _ in xrange(100):
         name = randstring(random.randint(5, 10))
-        value = randstring(10000)
+        value = randstring(2500)
         domain[name] = value
 
     outfiles = dict((name, BytesIO(value)) for name, value in domain.items())
 
     with TempStorage() as st:
-        msw = compound.CompoundWriter(st, buffersize=4096)
+        msw = compound.CompoundWriter(st, buffersize=1024)
         mfiles = {}
         for name in domain:
             mfiles[name] = msw.create_file(name)
         f = st.create_file("test")
         cw = col.writer(f)
         for i in xrange(size):
-            cw.add(i, str(i).encode("latin1"))
+            cw.add(i, hex(i).encode("latin1"))
         cw.finish(size)
         length = f.tell()
         f.close()
             v = cr[i]
             # Column ignores additional unique values after 65535
             if i <= 65535 - 1:
-                assert v == str(i).encode("latin1")
+                assert v == hex(i).encode("latin1")
             else:
                 assert v == b('')
         f.close()

tests/test_dawg.py

 
 def test_random():
     def randstring():
-        length = random.randint(1, 10)
+        length = random.randint(1, 5)
         a = array("B", (random.randint(0, 255) for _ in xrange(length)))
         return array_tobytes(a)
-    keys = sorted(randstring() for _ in xrange(1000))
+    keys = sorted(randstring() for _ in xrange(100))
 
     with TempStorage() as st:
         gwrite(keys, st)

tests/test_indexing.py

         assert not ix.is_empty()
 
 
-def _check_writer(name, writer_fn):
+def test_simple_indexing():
     schema = fields.Schema(text=fields.TEXT, id=fields.STORED)
     domain = (u("alfa"), u("bravo"), u("charlie"), u("delta"), u("echo"),
               u("foxtrot"), u("golf"), u("hotel"), u("india"), u("juliet"),
               u("kilo"), u("lima"), u("mike"), u("november"))
     docs = defaultdict(list)
-    with TempIndex(schema, name) as ix:
-        w = writer_fn(ix)
-        for i in xrange(1000):
-            smp = random.sample(domain, 5)
-            for word in smp:
-                docs[word].append(i)
-            w.add_document(text=u(" ").join(smp), id=i)
-        w.commit()
+    with TempIndex(schema, "simple") as ix:
+        with ix.writer() as w:
+            for i in xrange(100):
+                smp = random.sample(domain, 5)
+                for word in smp:
+                    docs[word].append(i)
+                w.add_document(text=u(" ").join(smp), id=i)
 
         with ix.searcher() as s:
             for word in domain:
                 assert rset == docs[word]
 
 
-def test_simple_indexing():
-    _check_writer("simplew", lambda ix: ix.writer())
-
-
 def test_integrity():
     s = fields.Schema(name=fields.TEXT, value=fields.TEXT)
     st = RamStorage()
     schema = fields.Schema(num=fields.NUMERIC(unique=True, stored=True),
                            text=fields.ID(stored=True))
     with TempIndex(schema, "updatenum") as ix:
-        nums = list(range(10)) * 3
+        nums = list(range(5)) * 3
         random.shuffle(nums)
         for num in nums:
             with ix.writer() as w:
         with ix.searcher() as s:
             results = [d["text"] for _, d in s.iter_docs()]
             results = " ".join(sorted(results))
-            assert results == "0 1 2 3 4 5 6 7 8 9"
+            assert results == "0 1 2 3 4"
 
 
 def test_reindex():

tests/test_mpwriter.py

     docs = []
     # A ring buffer for creating string values
     buf = deque()
-    for ls in permutations(u("abcdef")):
+    for ls in permutations(u("abcd")):
         word = "".join(ls)
         # Remember this word is in the index (to check lexicon)
         words.append(word)

tests/test_queries.py

         else:
             return Or([nq(level - 1), nq(level - 1), nq(level - 1)])
 
-    q = nq(7)
+    q = nq(5)
     q = q.normalize()
     assert q == Or([Term("a", u("a")), Term("a", u("b"))])
 

tests/test_reading.py

         self.ix = ix
 
     def run(self):
-        for _ in xrange(200):
+        for _ in xrange(50):
             r = self.ix.reader()
             r.close()
 
         self.ix = ix
 
     def run(self):
-        for _ in xrange(20):
+        for _ in xrange(10):
             w = self.ix.writer()
             w.add_document(text=random.sample(self.domain, 4))
             w.commit()
-            time.sleep(0.05)
+            time.sleep(0.01)
 
 
 def test_delete_recovery():

tests/test_searching.py

 
 
 def test_ors():
-    domain = u("alfa bravo charlie delta echo foxtrot").split()
+    domain = u("alfa bravo charlie delta").split()
     s = fields.Schema(num=fields.STORED, text=fields.TEXT)
     st = RamStorage()
     ix = st.create_index(s)
 
 
 def test_open_numeric_ranges():
-    domain = range(0, 10000, 7)
+    domain = range(0, 1000, 7)
 
     schema = fields.Schema(num=fields.NUMERIC(stored=True))
     ix = RamStorage().create_index(schema)
         r = [hit["num"] for hit in s.search(q, limit=None)]
         assert r == [n for n in domain if n >= 100]
 
-        q = qp.parse("[to 5000]")
+        q = qp.parse("[to 500]")
         r = [hit["num"] for hit in s.search(q, limit=None)]
-        assert r == [n for n in domain if n <= 5000]
+        assert r == [n for n in domain if n <= 500]
 
 
 def test_open_date_ranges():
         r = s.search(query.Term("text", "bravo"), filter=fq)
         assert [d["id"] for d in r] == [1, 2, 5, 7, ]
 
-
 def test_timelimit():
     schema = fields.Schema(text=fields.TEXT)
     ix = RamStorage().create_index(schema)

tests/test_sorting.py

         q = query.Term("ev", u("a"))
 
     correct = [d["id"] for d in sorted(docs, key=key, reverse=reverse)][:limit]
+    schema = get_schema()
 
     for fn in (make_single_index, make_multi_index):
-        with TempIndex(get_schema()) as ix:
-            fn(ix)
-            with ix.searcher() as s:
-                r = s.search(q, sortedby=sortedby, limit=limit,
-                             reverse=reverse)
-                rids = [d["id"] for d in r]
-                assert rids == correct
+        ix = RamStorage().create_index(schema)
+        fn(ix)
+        with ix.searcher() as s:
+            r = s.search(q, sortedby=sortedby, limit=limit,
+                         reverse=reverse)
+            rids = [d["id"] for d in r]
+            assert rids == correct
 
 
 def test_sortedby():

tests/test_tables.py

 
 
 def test_random_access():
-    times = 10000
+    times = 1000
     with TempStorage("orderedhash") as st:
         hw = HashWriter(st.create_file("test.hsh"))
         hw.add_all((b("%08x" % x), b(str(x))) for x in xrange(times))

tests/test_writing.py

 def test_buffered():
     schema = fields.Schema(id=fields.ID, text=fields.TEXT)
     with TempIndex(schema, "buffered") as ix:
-        domain = (u("alfa"), u("bravo"), u("charlie"), u("delta"), u("echo"),
-                  u("foxtrot"), u("golf"), u("hotel"), u("india"))
+        domain = u("alfa bravo charlie delta echo foxtrot golf hotel india")
+        domain = domain.split()
 
         w = writing.BufferedWriter(ix, period=None, limit=10,
                                    commitargs={"merge": False})
-        for i in xrange(100):
+        for i in xrange(20):
             w.add_document(id=text_type(i),
                            text=u(" ").join(random.sample(domain, 5)))
-        time.sleep(0.5)
+        time.sleep(0.1)
         w.close()
 
-        assert len(ix._segments()) == 10
+        assert len(ix._segments()) == 2
 
 
 def test_buffered_search():
     with TempIndex(schema, "buffthreads") as ix:
         class SimWriter(threading.Thread):
             def run(self):
-                for _ in xrange(10):
+                for _ in xrange(5):
                     w.update_document(name=random.choice(domain))
                     time.sleep(random.uniform(0.01, 0.1))
 
         w = writing.BufferedWriter(ix, limit=10)
-        threads = [SimWriter() for _ in xrange(10)]
+        threads = [SimWriter() for _ in xrange(5)]
         for thread in threads:
             thread.start()
         for thread in threads: