Commits

Matt Chaput committed fa7dace

Basic implementation of lockless writing.

  • Parent commits 811bcca
  • Branches mpwrite

Files changed (14)
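
This branch replaces the single, write-locked TOC with per-commit revision files: every writer generates a random 12-byte id, writes its own "<indexname>.<id>.toc" file, and readers combine the segment lists of all revision files they find. A minimal usage sketch, adapted from tests/test_lockless.py in this changeset:

    from whoosh import fields
    from whoosh.compat import u
    from whoosh.filedb.filestore import RamStorage

    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(text=u("alfa bravo charlie delta alfa bravo"))
    w.commit(merge=False)   # writes a new <indexname>.<id>.toc revision

    r = ix.reader()         # merges the segments of every revision file
    assert r.doc_count_all() == 1
    assert r.frequency("text", "alfa") == 2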

src/whoosh/filedb/fileindex.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import re, sys
-from base64 import b64encode, b64decode
+import copy, re, sys
 from datetime import datetime
 from os import urandom
 from time import time, sleep
 from whoosh.reading import EmptyReader, MultiReader
 from whoosh.store import Storage, LockError
 from whoosh.system import _INT_SIZE, _FLOAT_SIZE, _LONG_SIZE
+from whoosh.util import b64encode, b64decode
 
 
 _INDEX_VERSION = -110
 
 
-# TOC read/write functions
-
-ROOT_ID = "\x00" * 16
-
+def _make_id():
+    return urandom(12)
 
 class Revision(object):
-    def __init__(self, indexname, id, schema, parentids=None, segments=None,
+    def __init__(self, indexname, id, schema, segments=None,
                  release=None, created=None):
         self.indexname = indexname
-        self.id = id if id is not None else self._make_id()
+        self.id = id
         self.schema = schema
-        self.parentids = parentids or ()
         self.segments = segments or ()
         self.release = release
         self.created = created
     
     @staticmethod
-    def _make_id():
-        return urandom(12)
-    
-    @staticmethod
     def _filename(indexname, id):
-        i = b64encode(id, "-_")
+        i = b64encode(id)
         return "%s.%s.toc" % (indexname, i)
     
     @staticmethod
     def regex(indexname):
-        pat = r"^%s\.(?P<id>.{12})\.toc$" % indexname
+        pat = r"^%s\.(?P<id>.{16})\.toc$" % indexname
         return re.compile(pat)
     
     @classmethod
-    def create(cls, storage, indexname, schema, parentids=None, segments=None):
-        rev = cls(indexname, cls._make_id(), schema, parentids, segments)
+    def create(cls, storage, indexname, id, schema, segments=None):
+        rev = cls(indexname, id, schema, segments)
         rev.store(storage)
         return rev
     
             raise IndexVersionError("Can't read format %s" % version, version)
         # Load Whoosh version that created this TOC
         release = stream.read_pickle()
-        # Read the list of parent IDs
-        parentids = stream.read_pickle()
         # Check that the filename and internal ID match
-        _id = stream.read(16)
+        _id = stream.read(12)
         if _id != id:
             raise Exception("ID in %s is %s" % (fname, b64encode(_id)))
         # Creation date
         # Load the segments
         segments = stream.read_pickle()
         stream.close()
-        return cls(indexname, id, schema, parentids, segments, release, created)
+        return cls(indexname, id, schema, segments, release, created)
     
     @classmethod
     def find_all(cls, storage, indexname):
     
     @classmethod
     def load_all(cls, storage, indexname, schema=None, suppress=False):
+        revs = []
         for id in cls.find_all(storage, indexname):
             try:
-                yield cls.load(storage, indexname, id, schema=schema)
+                revs.append(cls.load(storage, indexname, id, schema=schema))
             except OSError:
                 if not suppress:
                     raise
+        return sorted(revs, key=lambda x: x.created)
     
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.filename())
         stream.write_int(_INDEX_VERSION)
         stream.write_pickle(__version__)
         # Write self
-        stream.write_pickle(tuple(self.parentids))
         stream.write(self.id)
         stream.write_pickle(datetime.utcnow())
         stream.write_string(pickle.dumps(self.schema, -1))
                   "vectorindex": "vec",
                   "vectorposts": "vps"}
     
-    def __init__(self, indexname, id=None, doccount=0, fieldlength_totals=None,
+    def __init__(self, indexname, id, doccount=0, fieldlength_totals=None,
                  fieldlength_mins=None, fieldlength_maxes=None, deleted=None):
         self.indexname = indexname
-        self.id = id or self._make_id()
+        self.id = id
         self.doccount = doccount
         self.fieldlength_totals = fieldlength_totals or {}
         self.fieldlength_mins = fieldlength_mins or {}
         self.fieldlength_maxes = fieldlength_maxes or {}
         self.deleted = deleted
         
-    @staticmethod
-    def _make_id():
-        return urandom(12)
-    
     @classmethod
     def _idstring(cls, segid):
         return b64encode(segid)
         return "%s.%s" % (cls._basename(indexname, segid), ext)
     
     def __repr__(self):
-        return "<%s %s>" % (self.__class__.__name__, b64encode(self.id))
+        r = "<%s %s %d" % (self.__class__.__name__, b64encode(self.id), self.doccount)
+        if self.deleted:
+            r += " " + ",".join(str(docnum) for docnum in sorted(self.deleted))
+        r += ">"
+        return r
 
     def __getattr__(self, name):
         # Capture accesses to e.g. Segment.fieldlengths_filename and return
         """
         :returns: the number of (undeleted) documents in this segment.
         """
+        
         return self.doccount - self.deleted_count()
 
     def has_deletions(self):
         return docnum in self.deleted
 
 
-def _leaf_revs(storage, indexname, parentids=None):
-    parentids = parentids or set()
-    revs = list(Revision.load_all(storage, indexname, suppress=True))
-    for rev in revs:
-        parentids.update(rev.parentids)
-    return [rev for rev in revs if rev.id not in parentids]
-
-
 def _create_index(storage, schema, indexname=_DEF_INDEX_NAME):
     # Clear existing files
     prefix = "%s." % indexname
     
     # Create and store the root revision
     schema = ensure_schema(schema)
-    return Revision.create(storage, indexname, schema)
+    return Revision.create(storage, indexname, _make_id(), schema)
 
 
+def segments_from_revs(revs):
+    segs = []
+    for rev in revs:
+        segs.extend(rev.segments)
+    return merge_segments(segs)
 
-    
-    
+
+def merge_segments(segments):
+    segs = []
+    byid = {}
+    # Merge the list of segments in each revision
+    for seg in segments:
+        if seg.id in byid:
+            first = byid[seg.id]
+            # Segment objects with the same id are identical except the
+            # set of deleted documents... merge that
+            if seg.deleted:
+                if first.deleted is None:
+                    first.deleted = set(seg.deleted)
+                else:
+                    first.deleted.update(seg.deleted)
+        else:
+            # Make a copy (including a copy of the deleted set) so we can
+            # modify the deleted attr without having to worry if the caller
+            # expected the list of revs to remain unchanged
+            seg = copy.copy(seg)
+            if seg.deleted is not None:
+                seg.deleted = set(seg.deleted)
+            byid[seg.id] = seg
+            segs.append(seg)
+    return segs
+
 
 # Index placeholder object
 
         self.indexname = indexname
         self._schema = schema
         
-        # Try reading the TOC to see if it's possible
-        #self._revision()
+        if len(self._revisions()) == 0:
+            raise EmptyIndexError
 
     def __repr__(self):
         return "%s(%r, %r)" % (self.__class__.__name__,
     def close(self):
         pass
 
-    def _leaf_revisions(self):
-        return _leaf_revs(self.storage, self.indexname)
-
+    def _revisions(self):
+        return list(Revision.load_all(self.storage, self.indexname, suppress=True))
+    
     def _segments(self):
-        segs = {}
-        for rev in self._leaf_revisions():
-            for seg in rev.segments:
-                if seg.id in segs:
-                    raise Exception
-                segs[seg.id] = seg
-        return list(segs.values())
+        return segments_from_revs(self._revisions())
 
     # add_field
     # remove_field
     
+    @staticmethod
+    def _comp_id(revs):
+        if len(revs) == 1:
+            return revs[0].id
+        else:
+            return tuple(sorted(rev.id for rev in revs))
+    
     def latest_generation(self):
-        return tuple(rev.id for rev in self._leaf_revisions())
+        return self._comp_id(self._revisions())
     
     # refresh
     # up_to_date
     
     def last_modified(self):
-        return max(rev.created for rev in self._leaf_revisions())
+        return max(rev.created for rev in self._revisions())
 
     def is_empty(self):
-        return sum(len(rev.segments) for rev in self._leaf_revisions()) == 0
+        return sum(len(rev.segments) for rev in self._revisions()) == 0
     
     def optimize(self):
         w = self.writer()
     def schema(self):
         return (self._schema
                 or merge_schemas([rev.schema for rev
-                                  in self._leaf_revisions()]))
+                                  in self._revisions()]))
 
-    @classmethod
-    def _reader(self, storage, schema, segments, reuse=None):
+    def _read_segments(self, schema, segments, reuse=None):
         from whoosh.filedb.filereading import SegmentReader
         
         reusable = {}
             # It removes any readers it reuses from the "reusable" dictionary,
             # so later we can close any readers left in the dictionary.
             def segreader(segment):
-                gen = segment.generation
-                if gen in reusable:
-                    r = reusable[gen]
-                    del reusable[gen]
+                segid = segment.id
+                if segid in reusable:
+                    r = reusable[segid]
+                    del reusable[segid]
                     return r
                 else:
-                    return SegmentReader(storage, schema, segment)
+                    return SegmentReader(self.storage, schema, segment)
             
             if len(segments) == 1:
                 # This index has one segment, so return a SegmentReader object
                 # MultiReader
                 
                 readers = [segreader(segment) for segment in segments]
-                return MultiReader(readers)
+                gen = tuple(r.generation() for r in readers)
+                return MultiReader(readers, generation=gen)
         finally:
             for r in reusable.values():
                 r.close()
             # Read the information from the TOC file
             try:
                 segments = self._segments()
-                return self._reader(self.storage, self.schema, segments,
-                                    reuse=reuse)
+                return self._read_segments(self.schema, segments, reuse=reuse)
             except IOError:
                 # Presume that we got a "file not found error" because a writer
                 # deleted one of the files just as we were trying to open it,
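
To make the revision model concrete, here is a small illustration (not part of the patch) of how merge_segments() combines entries for the same segment id coming from different revisions: one copy is kept and the deleted sets are unioned. When several revisions are live, latest_generation() likewise reports the composite id built by _comp_id(), that is, a single revision id or a sorted tuple of revision ids rather than an integer generation counter.

    from whoosh.filedb.fileindex import Segment, merge_segments, _make_id

    segid = _make_id()
    # Two revisions may list the same segment with different deletions
    a = Segment("MAIN", segid, doccount=10, deleted=set([1]))
    b = Segment("MAIN", segid, doccount=10, deleted=set([2, 3]))

    merged = merge_segments([a, b])
    assert len(merged) == 1
    assert merged[0].deleted == set([1, 2, 3])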

src/whoosh/filedb/filereading.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-from base64 import b64encode
 from bisect import bisect_left
 from heapq import nlargest, nsmallest
 from threading import Lock

src/whoosh/filedb/filewriting.py

 
 from whoosh.compat import integer_types, iteritems, text_type
 from whoosh.fields import merge_schemas, UnknownFieldError
-from whoosh.filedb.fileindex import Revision, Segment
+from whoosh.filedb.fileindex import (Revision, Segment, segments_from_revs,
+                                     _make_id)
 from whoosh.filedb.filepostings import FilePostingWriter
 from whoosh.filedb.filetables import (TermIndexWriter, StoredFieldWriter,
                                       TermVectorWriter)
 # Writer object
 
 class SegmentWriter(IndexWriter):
-    def __init__(self, ix, poolclass=None, procs=0, blocklimit=128,
+    def __init__(self, ix, id=None, poolclass=None, procs=0, blocklimit=128,
                  timeout=0.0, delay=0.1, **poolargs):
+        self._ix = ix
         self.mergelock = ix.lock("mergelock")
         self.storage = storage = ix.storage
         self.indexname = ix.indexname
-        self.id = Segment._make_id()
-        self.revs = list(ix._leaf_revisions())
+        self.id = id if id is not None else _make_id()
+        self.revs = list(ix._revisions())
         self.schema = ix._schema or merge_schemas([rev.schema for rev in self.revs])
         self.is_closed = False
         self.blocklimit = blocklimit
                 poolclass = TempfilePool
         self.pool = poolclass(self.schema, procs=procs, **poolargs)
         
-        self.segments = []
+        self.segments = segments_from_revs(self.revs)
         self._doc_offsets = []
         docbase = 0
-        for rev in self.revs:
-            for seg in rev.segments:
-                self.segments.append(seg)
-                self._doc_offsets.append(docbase)
-                docbase += seg.doc_count_all()
+        for seg in self.segments:
+            self._doc_offsets.append(docbase)
+            docbase += seg.doc_count_all()
         
         # Filenames
         newsegment = self._getsegment()
-        dawgname = newsegment.dawg_filename
-        termsname = newsegment.termsindex_filename
-        postsname = newsegment.termposts_filename
-        vectname = newsegment.vectorindex_filename
-        vpostsname = newsegment.vectorposts_filename
-        storedname = newsegment.storedfields_filename
-        lengthsname = newsegment.fieldlengths_filename
+        self.dawgname = newsegment.dawg_filename
+        self.termsname = newsegment.termsindex_filename
+        self.postsname = newsegment.termposts_filename
+        self.vectname = newsegment.vectorindex_filename
+        self.vpostsname = newsegment.vectorposts_filename
+        self.storedname = newsegment.storedfields_filename
+        self.lengthsname = newsegment.fieldlengths_filename
         
         # Create files
-        self.lengthfile = storage.create_file(lengthsname)
-        self.storedfields = StoredFieldWriter(storage.create_file(storedname),
+        self.lengthfile = storage.create_file(self.lengthsname)
+        self.storedfields = StoredFieldWriter(storage.create_file(self.storedname),
                                               self.schema.stored_names())
         # Terms writer
         self.wordsets = {}
         self.dawg = None
         if any(field.spelling for field in self.schema):
-            self.dawgfile = storage.create_file(dawgname)
+            self.dawgfile = storage.create_file(self.dawgname)
             self.dawg = DawgBuilder(field_root=True)
-        ti = TermIndexWriter(storage.create_file(termsname))
-        pw = FilePostingWriter(storage.create_file(postsname), blocklimit=blocklimit)
+        ti = TermIndexWriter(storage.create_file(self.termsname))
+        pw = FilePostingWriter(storage.create_file(self.postsname),
+                               blocklimit=blocklimit)
         self.termswriter = TermsWriter(self.schema, ti, pw, self.dawg)
         
         # Vectors
         self.vectorindex = self.vpostwriter = None
         if self.schema.has_vectored_fields():
             # Vector index
-            vf = storage.create_file(vectname)
+            vf = storage.create_file(self.vectname)
             self.vectorindex = TermVectorWriter(vf)
             
             # Vector posting file
-            vpf = storage.create_file(vpostsname)
+            vpf = storage.create_file(self.vpostsname)
             self.vpostwriter = FilePostingWriter(vpf, stringids=True)
     
     def _getsegment(self):
 
     def delete_document(self, docnum, delete=True):
         self._check_state()
-        if docnum >= sum(seg.doccount for seg in self.segments):
-            raise IndexingError("No document ID %r in this index" % docnum)
+        maxdoc = sum(seg.doccount for seg in self.segments)
+        if docnum >= maxdoc:
+            raise IndexingError("No document ID %r in this index (%d)" % (docnum, maxdoc))
         segment, segdocnum = self._segment_and_docnum(docnum)
         segment.delete_document(segdocnum, delete=delete)
 
 
     def reader(self, reuse=None):
         self._check_state()
-        from whoosh.filedb.fileindex import FileIndex
-        
-        return FileIndex._reader(self.storage, self.schema, self.segments,
-                                 reuse=reuse)
+        return self._ix._read_segments(self.schema, self.segments, reuse=reuse)
     
     def add_reader(self, reader):
         self._check_state()
         
         self.vectorindex.add((docnum, fieldname), offset)
     
-    def _close_all(self):
+    def _close_all(self, delete=False):
         self.is_closed = True
-        
         self.termswriter.close()
         self.storedfields.close()
         if not self.lengthfile.is_closed:
         if self.vpostwriter:
             self.vpostwriter.close()
     
+    def _delete_files(self):
+        storage = self.storage
+        def rem(name):
+            if storage.file_exists(name):
+                storage.delete_file(name)
+        rem(self.dawgname)
+        rem(self.termsname)
+        rem(self.postsname)
+        rem(self.vectname)
+        rem(self.vpostsname)
+        rem(self.storedname)
+        rem(self.lengthsname)
+    
     def commit(self, mergetype=None, optimize=False, merge=True):
         """Finishes writing and saves all additions and changes to disk.
         
         """
         
         self._check_state()
-        new_segments = []
+        segments = self.segments
         if merge:
             # Try to acquire the merge lock
             to, delay = self.timeout, self.delay
                         mergetype = MERGE_SMALL
                     
                     # Check that the segments we started with still exist
-                    segments = [seg for seg in self.segments
+                    segments = [seg for seg in segments
                                 if seg.exists_in(self.storage)]
                     # Remember the IDs of the pre-merged segments
                     orig_ids = set(seg.id for seg in segments)
                     # Call the merge policy function. The policy may choose to
                     # merge other segments into this writer's pool
-                    new_segments = mergetype(self, self.segments)
+                    new_segments = mergetype(self, segments)
                     # Find which segments got merged
                     merged_ids = orig_ids - set(seg.id for seg in new_segments)
                     
                         rev.delete_files(self.storage)
                     for seg in segments:
                         if seg.id in merged_ids:
-                            seg.delete_files()
+                            seg.delete_files(self.storage)
+                    segments = new_segments
                 finally:
                     self.mergelock.release()
         
             # Create a Segment object for the segment created by this
             # writer and add it to the list of new segments
             thissegment = self._getsegment()
-            new_segments.append(thissegment)
+            segments.append(thissegment)
             
             # Close all files
             self._close_all()
-            
-            # Write the new revision
-            parentids = [rev.id for rev in self.revs]
-            Revision.create(self.storage, self.indexname, self.schema,
-                            parentids, new_segments)
         else:
             self.pool.cleanup()
         
+        # Write new revision
+        return Revision.create(self.storage, self.indexname, self.id,
+                               self.schema, segments)
+        
     def cancel(self):
         self._check_state()
         self.pool.cancel()
         self._close_all()
+        self._delete_files()
 
 
 class TermsWriter(object):
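
Because a SegmentWriter now only touches the merge lock when it actually merges, two writers opened on the same index can both commit, each writing its own revision that names only the segments it knew about at creation time plus the one it created. A hedged sketch of the intended behavior (not from the patch), assuming merge=False so neither writer needs the merge lock:

    from whoosh import fields
    from whoosh.compat import u
    from whoosh.filedb.filestore import RamStorage

    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w1 = ix.writer()
    w2 = ix.writer()        # no exclusive write lock is taken here
    w1.add_document(text=u("alfa bravo"))
    w2.add_document(text=u("charlie delta"))
    w1.commit(merge=False)  # revision listing w1's new segment
    w2.commit(merge=False)  # independent revision with w2's segment

    r = ix.reader()         # sees the union of both revisions
    assert r.doc_count_all() == 2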

src/whoosh/filedb/multiproc.py

 # Multiprocessing writer
 
 class SegmentWritingTask(Process):
-    def __init__(self, storage, indexname, segname, kwargs, jobqueue,
-                 resultqueue, firstjob=None):
+    def __init__(self, storage, indexname, kwargs, jobqueue, firstjob=None):
         Process.__init__(self)
         self.storage = storage
         self.indexname = indexname
-        self.segname = segname
         self.kwargs = kwargs
         self.jobqueue = jobqueue
-        self.resultqueue = resultqueue
         self.firstjob = firstjob
-        
-        self.segment = None
         self.running = True
     
     def _add_file(self, args):
     def run(self):
         jobqueue = self.jobqueue
         ix = self.storage.open_index(self.indexname)
-        writer = self.writer = SegmentWriter(ix, _lk=False, name=self.segname,
-                                             **self.kwargs)
+        writer = self.writer = SegmentWriter(ix, **self.kwargs)
         
         if self.firstjob:
             self._add_file(self.firstjob)
         if not self.running:
             writer.cancel()
         else:
-            writer.pool.finish(writer.termswriter, writer.docnum,
-                               writer.lengthfile)
-            writer._close_all()
-            self.resultqueue.put(writer._getsegment())
+            writer.commit(merge=False)
     
     def cancel(self):
         self.running = False
         self.kwargs = kwargs
         self.kwargs["dir"] = dir
         
-        self.segnames = []
         self.tasks = []
         self.jobqueue = Queue(self.procs * 4)
-        self.resultqueue = Queue()
         self.docbuffer = []
         
-        self.writelock = ix.lock("WRITELOCK")
-        self.writelock.acquire()
-        
-        info = ix._read_toc()
-        self.schema = info.schema
-        self.segment_number = info.segment_counter
-        self.generation = info.generation + 1
-        self.segments = info.segments
+        self.schema = ix.schema
         self.storage = ix.storage
         
     def _new_task(self, firstjob):
         ix = self.index
-        self.segment_number += 1
-        segmentname = Segment.basename(ix.indexname, self.segment_number)
-        task = SegmentWritingTask(ix.storage, ix.indexname, segmentname,
-                                  self.kwargs, self.jobqueue,
-                                  self.resultqueue, firstjob)
+        task = SegmentWritingTask(ix.storage, ix.indexname, self.kwargs,
+                                  self.jobqueue, firstjob)
         self.tasks.append(task)
         task.start()
         return task
         self.docbuffer = []
     
     def cancel(self):
-        try:
-            for task in self.tasks:
-                task.cancel()
-        finally:
-            self.writelock.release()
+        for task in self.tasks:
+            task.cancel()
     
     def add_document(self, **fields):
         self.docbuffer.append(fields)
             self._enqueue()
     
     def commit(self, **kwargs):
-        try:
-            # index the remaining stuff in self.docbuffer
-            self._enqueue()
-
-            for task in self.tasks:
-                self.jobqueue.put(None)
-            
-            for task in self.tasks:
-                task.join()
-            
-            for task in self.tasks:
-                taskseg = self.resultqueue.get()
-                assert isinstance(taskseg, Segment), type(taskseg)
-                self.segments.append(taskseg)
-            
-            self.jobqueue.close()
-            self.resultqueue.close()
-            
-            from whoosh.filedb.fileindex import _write_toc, _clean_files
-            _write_toc(self.storage, self.schema, self.index.indexname,
-                       self.generation, self.segment_number, self.segments)
-            
-            # Delete leftover files
-            _clean_files(self.storage, self.index.indexname,
-                         self.generation, self.segments)
-        finally:
-            self.writelock.release()
+        # index the remaining stuff in self.docbuffer
+        self._enqueue()
+        # Add sentries to the job queue
+        for task in self.tasks:
+            self.jobqueue.put(None)
+        # Wait for the tasks to finish
+        for task in self.tasks:
+            task.join()
+        # Clean up
+        self.jobqueue.close()
 
 
 # Multiprocessing pool

src/whoosh/filedb/structfile.py

 from whoosh.compat import integer_types, b
 from whoosh.system import (_INT_SIZE, _SHORT_SIZE, _FLOAT_SIZE, _LONG_SIZE,
                            pack_sbyte, pack_ushort, pack_int, pack_uint,
-                           pack_long, pack_float,
-                           unpack_sbyte, unpack_ushort, unpack_int,
-                           unpack_uint, unpack_long, unpack_float, IS_LITTLE)
+                           pack_long, pack_float, unpack_sbyte, unpack_ushort,
+                           unpack_int, unpack_uint, unpack_long, unpack_float,
+                           IS_LITTLE)
 from whoosh.util import (varint, read_varint, signed_varint,
                          decode_signed_varint, float_to_byte, byte_to_float)
 

src/whoosh/reading.py

     """Do not instantiate this object directly. Instead use Index.reader().
     """
 
-    def is_atomic(self):
-        return False
-
     def __init__(self, readers, generation=-1):
         self.readers = readers
         self._gen = generation
         offset = self.doc_offsets[segmentnum]
         return segmentnum, docnum - offset
 
+    def is_atomic(self):
+        return False
+
     def add_reader(self, reader):
         self.readers.append(reader)
         self.doc_offsets.append(self.base)

src/whoosh/util.py

 """
 
 from __future__ import with_statement
-import codecs, random, re, sys, time
+import base64, codecs, re, sys, time
 from array import array
 from bisect import insort, bisect_left
 from copy import copy
         return funcobj
 
 
+def b64encode(data):
+    return base64.b64encode(data, "-_")
+
+
+def b64decode(data):
+    return base64.b64decode(data, "-_")
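
The new helpers wrap the standard base64 functions with the "-_" alternate characters (the URL- and filename-safe alphabet), so encoded ids can be embedded directly in filenames such as "<indexname>.<id>.toc". A quick check of the arithmetic behind the TOC filename pattern (not part of the patch): a 12-byte id always encodes to 16 characters, which is why Revision.regex() matches a 16-character id group.

    import base64
    from os import urandom

    # urlsafe_b64encode applies the same "-_" substitution that the
    # helpers above pass as altchars
    encoded = base64.urlsafe_b64encode(urandom(12))
    assert len(encoded) == 16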
 
 
 

tests/test_indexing.py

 from whoosh import fields, query
 from whoosh.compat import u, xrange, text_type, PY3
 from whoosh.filedb.filestore import RamStorage
-from whoosh.filedb.filewriting import NO_MERGE
 from whoosh.util import length_to_byte, byte_to_length, permutations
 from whoosh.writing import IndexingError
 from whoosh.support.testing import skip_if_unavailable, TempIndex, skip_if
     assert_equal(dr.max_field_length("f2"), 7)
     
 def test_merged_lengths():
-    s = fields.Schema(f1=fields.KEYWORD(stored=True, scorable=True),
+    s = fields.Schema(id=fields.ID,
+                      f1=fields.KEYWORD(stored=True, scorable=True),
                       f2=fields.KEYWORD(stored=True, scorable=True))
     with TempIndex(s, "mergedlengths") as ix:
         w = ix.writer()
-        w.add_document(f1=u("A B C"), f2=u("X"))
-        w.add_document(f1=u("B C D E"), f2=u("Y Z"))
+        w.add_document(id=u("1"), f1=u("A B C"), f2=u("X"))
+        w.add_document(id=u("2"), f1=u("B C D E"), f2=u("Y Z"))
         w.commit()
         
         w = ix.writer()
-        w.add_document(f1=u("A"), f2=u("B C D E X Y"))
-        w.add_document(f1=u("B C"), f2=u("X"))
-        w.commit(NO_MERGE)
+        w.add_document(id=u("3"), f1=u("A"), f2=u("B C D E X Y"))
+        w.add_document(id=u("4"), f1=u("B C"), f2=u("X"))
+        w.commit(merge=False)
         
         w = ix.writer()
-        w.add_document(f1=u("A B X Y Z"), f2=u("B C"))
-        w.add_document(f1=u("Y X"), f2=u("A B"))
-        w.commit(NO_MERGE)
+        w.add_document(id=u("5"), f1=u("A B X Y Z"), f2=u("B C"))
+        w.add_document(id=u("6"), f1=u("Y X"), f2=u("A B"))
+        w.commit(merge=False)
         
-        with ix.reader() as dr:
-            assert_equal(dr.stored_fields(0)["f1"], u("A B C"))
-            assert_equal(dr.doc_field_length(0, "f1"), 3)
-            assert_equal(dr.doc_field_length(2, "f2"), 6)
-            assert_equal(dr.doc_field_length(4, "f1"), 5)
+        with ix.searcher() as s:
+            docnum = s.document_number(id=u("1"))
+            assert_equal(s.stored_fields(docnum)["f1"], "A B C")
+            assert_equal(s.doc_field_length(docnum, "f1"), 3)
+            docnum = s.document_number(id=u("3"))
+            assert_equal(s.doc_field_length(docnum, "f2"), 6)
+            docnum = s.document_number(id=u("5"))
+            assert_equal(s.doc_field_length(docnum, "f1"), 5)
     
 def test_frequency_keyword():
     s = fields.Schema(content=fields.KEYWORD)

tests/test_lockless.py

+from __future__ import with_statement
+
+from nose.tools import assert_equal, assert_raises  #@UnresolvedImport
+
+from whoosh import analysis, highlight, fields, qparser, query
+from whoosh.compat import u
+from whoosh.filedb.filestore import RamStorage
+
+
+def test_writer():
+    schema = fields.Schema(text=fields.TEXT)
+    ix = RamStorage().create_index(schema)
+    w = ix.writer()
+    w.add_document(text=u("alfa bravo charlie delta alfa bravo"))
+    w.commit(merge=False)
+    r = ix.reader()
+    assert_equal(r.doc_count_all(), 1)
+    assert_equal(r.frequency("text", "alfa"), 2)
+

tests/test_reading.py

 
 from nose.tools import assert_equal  #@UnresolvedImport
 
-from whoosh import analysis, fields, formats, reading
+from whoosh import analysis, fields, formats, query, reading
 from whoosh.compat import u, xrange
 from whoosh.filedb.filereading import SegmentReader
 from whoosh.filedb.filestore import RamStorage
     w = ix.writer()
     w.add_document(f1 = u("A B"), f2 = u("1 2"), f3 = u("X Y"))
     w.commit(merge=False)
-    
     return ix
 
 def _stats(r):
             th.join()
 
 def test_doc_count():
-    schema = fields.Schema(id=fields.NUMERIC)
+    schema = fields.Schema(id=fields.ID(stored=True))
     ix = RamStorage().create_index(schema)
     w = ix.writer()
     for i in xrange(10):
-        w.add_document(id=i)
+        w.add_document(id=u("%s") % i)
     w.commit()
     
     r = ix.reader()
     assert_equal(r.doc_count_all(), 10)
     
     w = ix.writer()
-    w.delete_document(2)
-    w.delete_document(4)
-    w.delete_document(6)
-    w.delete_document(8)
+    w.delete_by_query(query.Term("id", "2"))
+    w.delete_by_query(query.Term("id", "4"))
+    w.delete_by_query(query.Term("id", "6"))
+    w.delete_by_query(query.Term("id", "8"))
     w.commit()
     
     r = ix.reader()
     
     w = ix.writer()
     for i in xrange(10, 15):
-        w.add_document(id=i)
+        w.add_document(id=u("%s") % i)
     w.commit(merge=False)
     
     r = ix.reader()
     assert_equal(r.doc_count_all(), 15)
     
     w = ix.writer()
-    w.delete_document(10)
-    w.delete_document(12)
-    w.delete_document(14)
+    w.delete_by_query(query.Term("id", "10"))
+    w.delete_by_query(query.Term("id", "12"))
+    w.delete_by_query(query.Term("id", "14"))
     w.commit(merge=False)
     
     r = ix.reader()

tests/test_searching.py

     schema = fields.Schema(id=fields.ID(stored=True))
     st = RamStorage()
     ix = st.create_index(schema)
+    print "    ix=", repr(ix.latest_generation())
     
     w = ix.writer()
     w.add_document(id=u("1"))
     w.add_document(id=u("2"))
     w.commit()
     
+    print "    ix=", repr(ix.latest_generation())
     s = ix.searcher()
+    print "reader=", repr(s.ixreader.generation())
+    print "    ix=", repr(ix.latest_generation())
     assert s.up_to_date()
     
     w = ix.writer()

tests/test_sorting.py

             with ix.searcher() as s:
                 results = s.search(query.Every(), groupedby="tag")
                 groups = results.groups("tag")
-                assert (sorted(groups.items())
-                        == [(u('one'), [0, 6]),
-                            (u('three'), [1, 3, 7, 8]),
-                            (u('two'), [2, 4, 5])])
+                target = [(u('one'), [0, 6]), (u('three'), [1, 3, 7, 8]),
+                          (u('two'), [2, 4, 5])]
+                assert_equal(sorted(groups.items()), target)
     
     check(make_single_index)
     check(make_multi_index)

tests/test_spans.py

     
     domain = ("alfa", "bravo", "charlie", "delta")
     
+    total = 0
     for _ in xrange(3):
         w = ix.writer()
         for ls in permutations(domain):
             w.add_document(content=u(" ").join(ls))
+            total += 1
         w.commit(merge=False)
     
+    print ix.storage.list()
+    print [(seg, seg.doccount) for seg in ix._segments()]
+    
+    assert_equal(ix.doc_count_all(), total)
     w = ix.writer()
     w.delete_document(5)
     w.delete_document(10)

tests/test_writing.py

         w.add_document(a=u("charlie"))
         w.commit()
 
-        tr = TermIndexReader(ix.storage.open_file("_readinline_1.trm"))
+        trms = [fname for fname in ix.storage.list() if fname.endswith(".trm")]
+        assert_equal(len(trms), 1)
+        filename = trms[0]
+
+        tr = TermIndexReader(ix.storage.open_file(filename))
         for i, (_, terminfo) in enumerate(tr.items()):
             assert_equal(terminfo.postings[0], (i,))
             assert_equal(terminfo.postings[1], (1.0,))