Commits

Matt Chaput committed 811bcca

Started implementing lockless writes.

Files changed (10)

src/whoosh/fields.py

         raise FieldConfigurationError("%r is not a Schema" % schema)
     return schema
     
-    
-    
 
+def merge_fielddict(d1, d2):
+    keyset = set(d1.keys()) | set(d2.keys())
+    out = {}
+    for name in keyset:
+        field1 = d1.get(name)
+        field2 = d2.get(name)
+        if field1 and field2 and field1 != field2:
+            raise Exception("Inconsistent field %r: %r != %r" % (name, field1, field2))
+        out[name] = field1 or field2
+    return out
+
+
+def merge_schema(s1, s2):
+    schema = Schema()
+    schema._fields = merge_fielddict(s1._fields, s2._fields)
+    schema._dyn_fields = merge_fielddict(s1._dyn_fields, s2._dyn_fields)
+    return schema
+
+
+def merge_schemas(schemas):
+    schema = schemas[0]
+    for i in xrange(1, len(schemas)):
+        schema = merge_schema(schema, schemas[i])
+    return schema
+
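A quick usage sketch of the new merge helpers (not part of the commit; the field names are hypothetical):

    from whoosh.fields import Schema, TEXT, ID, merge_schemas

    s1 = Schema(id=ID(stored=True), title=TEXT)
    s2 = Schema(id=ID(stored=True), body=TEXT)

    merged = merge_schemas([s1, s2])
    print(merged.names())  # the merged schema has fields "body", "id", "title"

    # Defining the same field name with different field types in the two
    # schemas raises the "Inconsistent field" exception from merge_fielddict.

Identical field definitions merge cleanly because whoosh field types compare by value, so two separately constructed ID(stored=True) fields count as the same field.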

src/whoosh/filedb/fieldcache.py

         """
         :param basename: a prefix for filenames. This is usually the name of
             the reader's segment.
-        :param storage: a custom :class:`whoosh.store.Storage` object to use
-            for saving field caches. If this is ``None``, this object will not
+        :param storage: a :class:`whoosh.store.Storage` object to use for
+            saving field caches. If this is ``None``, this object will not
             save caches to disk.
         :param gzip_caches: if True, field caches saved to disk by this object
             will be compressed. Loading compressed caches is very slow, so you

src/whoosh/filedb/fileindex.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import re, sys, uuid
+import re, sys
+from base64 import b64encode, b64decode
+from datetime import datetime
+from os import urandom
 from time import time, sleep
-from threading import Lock
 
 from whoosh import __version__
 from whoosh.compat import pickle, integer_types, string_type, iteritems
-from whoosh.fields import ensure_schema
+from whoosh.fields import ensure_schema, merge_schemas
 from whoosh.index import Index, EmptyIndexError, IndexVersionError, _DEF_INDEX_NAME
 from whoosh.reading import EmptyReader, MultiReader
 from whoosh.store import Storage, LockError
 
 # TOC read/write functions
 
-def _toc_filename(indexname, gen):
-    return "_%s_%s.toc" % (indexname, gen)
+ROOT_ID = "\x00" * 16
 
 
-def _toc_pattern(indexname):
-    """Returns a regular expression object that matches TOC filenames.
-    name is the name of the index.
-    """
-
-    return re.compile("^_%s_([0-9]+).toc$" % indexname)
-
-
-def _segment_pattern(indexname):
-    """Returns a regular expression object that matches segment filenames.
-    name is the name of the index.
-    """
-
-    return re.compile("(_%s_[0-9]+)\\..*" % indexname)
-
-
-def _latest_generation(storage, indexname):
-    pattern = _toc_pattern(indexname)
-
-    max = -1
-    for filename in storage:
-        m = pattern.match(filename)
-        if m:
-            num = int(m.group(1))
-            if num > max:
-                max = num
-    return max
-
-
-def _create_index(storage, schema, indexname=_DEF_INDEX_NAME):
-    # Clear existing files
-    prefix = "_%s_" % indexname
-    for filename in storage:
-        if filename.startswith(prefix):
-            storage.delete_file(filename)
+class Revision(object):
+    def __init__(self, indexname, id, schema, parentids=None, segments=None,
+                 release=None, created=None):
+        self.indexname = indexname
+        self.id = id if id is not None else self._make_id()
+        self.schema = schema
+        self.parentids = parentids or ()
+        self.segments = segments or ()
+        self.release = release
+        self.created = created
     
-    schema = ensure_schema(schema)
-    # Write a TOC file with an empty list of segments
-    _write_toc(storage, schema, indexname, 0, 0, [])
-
-
-def _write_toc(storage, schema, indexname, gen, segment_counter, segments):
-    schema = ensure_schema(schema)
-    schema.clean()
-
-    # Use a temporary file for atomic write.
-    tocfilename = _toc_filename(indexname, gen)
-    tempfilename = '%s.%s' % (tocfilename, time())
-    stream = storage.create_file(tempfilename)
-
-    stream.write_varint(_INT_SIZE)
-    stream.write_varint(_LONG_SIZE)
-    stream.write_varint(_FLOAT_SIZE)
-    stream.write_int(-12345)
-
-    stream.write_int(_INDEX_VERSION)
-    for num in __version__[:3]:
-        stream.write_varint(num)
-
-    stream.write_string(pickle.dumps(schema, -1))
-    stream.write_int(gen)
-    stream.write_int(segment_counter)
-    stream.write_pickle(segments)
-    stream.close()
-
-    # Rename temporary file to the proper filename
-    storage.rename_file(tempfilename, tocfilename, safe=True)
-
-
-class Toc(object):
-    def __init__(self, **kwargs):
-        for name, value in iteritems(kwargs):
-            setattr(self, name, value)
+    @staticmethod
+    def _make_id():
+        return urandom(12)
+    
+    @staticmethod
+    def _filename(indexname, id):
+        i = b64encode(id, "-_")
+        return "%s.%s.toc" % (indexname, i)
+    
+    @staticmethod
+    def regex(indexname):
+        # A 12-byte ID encodes to 16 base64 characters in the filename
+        pat = r"^%s\.(?P<id>.{16})\.toc$" % indexname
+        return re.compile(pat)
+    
+    @classmethod
+    def create(cls, storage, indexname, schema, parentids=None, segments=None):
+        rev = cls(indexname, cls._make_id(), schema, parentids, segments)
+        rev.store(storage)
+        return rev
+    
+    @classmethod
+    def load(cls, storage, indexname, id, schema=None):
+        fname = cls._filename(indexname, id)
+        stream = storage.open_file(fname)
+    
+        # Check size of data types
+        def check_size(name, target):
+            sz = stream.read_varint()
+            if sz != target:
+                raise IndexError("Index was created on different architecture:"
+                                 " saved %s=%s, this computer=%s" % (name, sz, target))
+        check_size("int", _INT_SIZE)
+        check_size("long", _LONG_SIZE)
+        check_size("float", _FLOAT_SIZE)
         
-
-def _read_toc(storage, schema, indexname):
-    gen = _latest_generation(storage, indexname)
-    if gen < 0:
-        raise EmptyIndexError("Index %r does not exist in %r" % (indexname, storage))
+        # Check byte order
+        if not stream.read_int() == -12345:
+            raise IndexError("Number misread: byte order problem?")
+        
+        # Check format version data
+        version = stream.read_int()
+        if version != _INDEX_VERSION:
+            raise IndexVersionError("Can't read format %s" % version, version)
+        # Load Whoosh version that created this TOC
+        release = stream.read_pickle()
+        # Read the list of parent IDs
+        parentids = stream.read_pickle()
+        # Check that the filename and internal ID match
+        _id = stream.read(12)
+        if _id != id:
+            raise Exception("ID in %s is %s" % (fname, b64encode(_id)))
+        # Creation date
+        created = stream.read_pickle()
+        # If a schema was supplied, use it instead of reading the one on disk
+        if schema:
+            stream.skip_string()
+        else:
+            schema = pickle.loads(stream.read_string())
+        # Load the segments
+        segments = stream.read_pickle()
+        stream.close()
+        return cls(indexname, id, schema, parentids, segments, release, created)
     
-    # Read the content of this index from the .toc file.
-    tocfilename = _toc_filename(indexname, gen)
-    stream = storage.open_file(tocfilename)
-
-    def check_size(name, target):
-        sz = stream.read_varint()
-        if sz != target:
-            raise IndexError("Index was created on different architecture:"
-                             " saved %s = %s, this computer = %s" % (name, sz, target))
-
-    check_size("int", _INT_SIZE)
-    check_size("long", _LONG_SIZE)
-    check_size("float", _FLOAT_SIZE)
-
-    if not stream.read_int() == -12345:
-        raise IndexError("Number misread: byte order problem")
-
-    version = stream.read_int()
-    if version != _INDEX_VERSION:
-        raise IndexVersionError("Can't read format %s" % version, version)
-    release = (stream.read_varint(), stream.read_varint(), stream.read_varint())
+    @classmethod
+    def find_all(cls, storage, indexname):
+        regex = cls.regex(indexname)
+        for fname in storage:
+            m = regex.match(fname)
+            if m:
+                yield b64decode(m.group("id"), "-_")
     
-    # If the user supplied a schema object with the constructor, don't load
-    # the pickled schema from the saved index.
-    if schema:
-        stream.skip_string()
-    else:
-        schema = pickle.loads(stream.read_string())
-    schema = ensure_schema(schema)
+    @classmethod
+    def load_all(cls, storage, indexname, schema=None, suppress=False):
+        for id in cls.find_all(storage, indexname):
+            try:
+                yield cls.load(storage, indexname, id, schema=schema)
+            except (IOError, OSError):
+                if not suppress:
+                    raise
     
-    # Generation
-    index_gen = stream.read_int()
-    assert gen == index_gen
+    def __repr__(self):
+        return "<%s %s>" % (self.__class__.__name__, self.filename())
     
-    segment_counter = stream.read_int()
-    segments = stream.read_pickle()
+    def filename(self):
+        return self._filename(self.indexname, self.id)
     
-    stream.close()
-    return Toc(version=version, release=release, schema=schema,
-               segment_counter=segment_counter, segments=segments,
-               generation=gen)
-
-
-def _next_segment_name(self):
-    #Returns the name of the next segment in sequence.
-    if self.segment_num_lock is None:
-        self.segment_num_lock = Lock()
+    def store(self, storage):
+        schema = ensure_schema(self.schema)
+        schema.clean()
+        
+        tocfilename = self.filename()
+        tempfilename = '%s.%s' % (tocfilename, time())
+        stream = storage.create_file(tempfilename)
+        
+        # Write the sizes of datatypes to check system compatibility
+        stream.write_varint(_INT_SIZE)
+        stream.write_varint(_LONG_SIZE)
+        stream.write_varint(_FLOAT_SIZE)
+        # Write a dummy value to check byte order
+        stream.write_int(-12345)
+        # Write the index format version and Whoosh version
+        stream.write_int(_INDEX_VERSION)
+        stream.write_pickle(__version__)
+        # Write self
+        stream.write_pickle(tuple(self.parentids))
+        stream.write(self.id)
+        stream.write_pickle(datetime.utcnow())
+        stream.write_string(pickle.dumps(self.schema, -1))
+        stream.write_pickle(self.segments)
+        stream.close()
+        
+        # Rename temporary file to the proper filename
+        storage.rename_file(tempfilename, tocfilename, safe=True)
     
-    if self.segment_num_lock.acquire():
+    def delete_files(self, storage, suppress=True):
         try:
-            self.segment_counter += 1
-            return 
-        finally:
-            self.segment_num_lock.release()
-    else:
-        raise LockError
-
-
-def _clean_files(storage, indexname, gen, segments):
-    # Attempts to remove unused index files (called when a new generation
-    # is created). If existing Index and/or reader objects have the files
-    # open, they may not be deleted immediately (i.e. on Windows) but will
-    # probably be deleted eventually by a later call to clean_files.
-
-    current_segment_names = set(s.name for s in segments)
-
-    tocpattern = _toc_pattern(indexname)
-    segpattern = _segment_pattern(indexname)
-
-    todelete = set()
-    for filename in storage:
-        tocm = tocpattern.match(filename)
-        segm = segpattern.match(filename)
-        if tocm:
-            if int(tocm.group(1)) != gen:
-                todelete.add(filename)
-        elif segm:
-            name = segm.group(1)
-            if name not in current_segment_names:
-                todelete.add(filename)
-    
-    for filename in todelete:
-        try:
-            storage.delete_file(filename)
+            storage.delete_file(self.filename())
         except OSError:
-            # Another process still has this file open
-            pass
-
-
-# Index placeholder object
-
-class FileIndex(Index):
-    def __init__(self, storage, schema=None, indexname=_DEF_INDEX_NAME):
-        if not isinstance(storage, Storage):
-            raise ValueError("%r is not a Storage object" % storage)
-        if not isinstance(indexname, string_type):
-            raise ValueError("indexname %r is not a string" % indexname)
-        
-        if schema:
-            schema = ensure_schema(schema)
-        
-        self.storage = storage
-        self._schema = schema
-        self.indexname = indexname
-        
-        # Try reading the TOC to see if it's possible
-        _read_toc(self.storage, self._schema, self.indexname)
-
-    def __repr__(self):
-        return "%s(%r, %r)" % (self.__class__.__name__,
-                               self.storage, self.indexname)
-
-    def close(self):
-        pass
-
-    # add_field
-    # remove_field
-    
-    def latest_generation(self):
-        return _latest_generation(self.storage, self.indexname)
-    
-    # refresh
-    # up_to_date
-    
-    def last_modified(self):
-        gen = self.latest_generation()
-        filename = _toc_filename(self.indexname, gen)
-        return self.storage.file_modified(filename)
-
-    def is_empty(self):
-        return len(self._read_toc().segments) == 0
-    
-    def optimize(self):
-        w = self.writer()
-        w.commit(optimize=True)
-
-    # searcher
-    
-    def writer(self, **kwargs):
-        from whoosh.filedb.filewriting import SegmentWriter
-        return SegmentWriter(self, **kwargs)
-
-    def lock(self, name):
-        """Returns a lock object that you can try to call acquire() on to
-        lock the index.
-        """
-        
-        return self.storage.lock(self.indexname + "_" + name)
-
-    def _read_toc(self):
-        return _read_toc(self.storage, self._schema, self.indexname)
-
-    def _segments(self):
-        return self._read_toc().segments
-    
-    def _current_schema(self):
-        return self._read_toc().schema
-    
-    @property
-    def schema(self):
-        return self._current_schema()
-
-    @classmethod
-    def _reader(self, storage, schema, segments, generation, reuse=None):
-        from whoosh.filedb.filereading import SegmentReader
-        
-        reusable = {}
-        try:
-            if len(segments) == 0:
-                # This index has no segments! Return an EmptyReader object,
-                # which simply returns empty or zero to every method
-                return EmptyReader(schema)
-            
-            if reuse:
-                # Put all atomic readers in a dictionary keyed by their
-                # generation, so we can re-use them if them if possible
-                readers = [r for r, _ in reuse.leaf_readers()]
-                reusable = dict((r.generation(), r) for r in readers)
-            
-            # Make a function to open readers, which reuses reusable readers.
-            # It removes any readers it reuses from the "reusable" dictionary,
-            # so later we can close any readers left in the dictionary.
-            def segreader(segment):
-                gen = segment.generation
-                if gen in reusable:
-                    r = reusable[gen]
-                    del reusable[gen]
-                    return r
-                else:
-                    return SegmentReader(storage, schema, segment)
-            
-            if len(segments) == 1:
-                # This index has one segment, so return a SegmentReader object
-                # for the segment
-                return segreader(segments[0])
-            else:
-                # This index has multiple segments, so create a list of
-                # SegmentReaders for the segments, then composite them with a
-                # MultiReader
-                
-                readers = [segreader(segment) for segment in segments]
-                return MultiReader(readers, generation=generation)
-        finally:
-            for r in reusable.values():
-                r.close()
-
-    def reader(self, reuse=None):
-        retries = 10
-        while retries > 0:
-            # Read the information from the TOC file
-            try:
-                info = self._read_toc()
-                return self._reader(self.storage, info.schema, info.segments,
-                                    info.generation, reuse=reuse)
-            except IOError:
-                # Presume that we got a "file not found error" because a writer
-                # deleted one of the files just as we were trying to open it,
-                # and so retry a few times before actually raising the
-                # exception
-                e = sys.exc_info()[1]
-                retries -= 1
-                if retries <= 0:
-                    raise e
-                sleep(0.05)
+            if not suppress:
+                raise
 
 
 class Segment(object):
-    """Do not instantiate this object directly. It is used by the Index object
-    to hold information about a segment. A list of objects of this class are
-    pickled as part of the TOC file.
-    
-    The TOC file stores a minimal amount of information -- mostly a list of
-    Segment objects. Segments are the real reverse indexes. Having multiple
-    segments allows quick incremental indexing: just create a new segment for
-    the new documents, and have the index overlay the new segment over previous
-    ones for purposes of reading/search. "Optimizing" the index combines the
-    contents of existing segments into one (removing any deleted documents
-    along the way).
-    """
-
     EXTENSIONS = {"dawg": "dag",
                   "fieldlengths": "fln",
                   "storedfields": "sto",
                   "vectorindex": "vec",
                   "vectorposts": "vps"}
     
-    generation = 0
+    def __init__(self, indexname, id=None, doccount=0, fieldlength_totals=None,
+                 fieldlength_mins=None, fieldlength_maxes=None, deleted=None):
+        self.indexname = indexname
+        self.id = id or self._make_id()
+        self.doccount = doccount
+        self.fieldlength_totals = fieldlength_totals or {}
+        self.fieldlength_mins = fieldlength_mins or {}
+        self.fieldlength_maxes = fieldlength_maxes or {}
+        self.deleted = deleted
+        
+    @staticmethod
+    def _make_id():
+        return urandom(12)
     
-    def __init__(self, name, generation, doccount, fieldlength_totals,
-                 fieldlength_mins, fieldlength_maxes, deleted=None):
-        """
-        :param name: The name of the segment (the Index object computes this
-            from its name and the generation).
-        :param doccount: The maximum document number in the segment.
-        :param term_count: Total count of all terms in all documents.
-        :param fieldlength_totals: A dictionary mapping field names to the
-            total number of terms in that field across all documents in the
-            segment.
-        :param fieldlength_mins: A dictionary mapping field names to the
-            minimum length of that field across all documents.
-        :param fieldlength_maxes: A dictionary mapping field names to the
-            maximum length of that field across all documents.
-        :param deleted: A set of deleted document numbers, or None if no
-            deleted documents exist in this segment.
-        """
-
-        assert isinstance(name, string_type)
-        assert isinstance(doccount, integer_types)
-        assert fieldlength_totals is None or isinstance(fieldlength_totals, dict), "fl_totals=%r" % fieldlength_totals
-        assert fieldlength_maxes is None or isinstance(fieldlength_mins, dict), "fl_mins=%r" % fieldlength_maxes
-        assert fieldlength_maxes is None or isinstance(fieldlength_maxes, dict), "fl_maxes=%r" % fieldlength_maxes
-        
-        self.name = name
-        self.generation = generation
-        self.doccount = doccount
-        self.fieldlength_totals = fieldlength_totals
-        self.fieldlength_mins = fieldlength_mins
-        self.fieldlength_maxes = fieldlength_maxes
-        self.deleted = deleted
-        self.uuid = uuid.uuid4()
-        
+    @classmethod
+    def _idstring(cls, segid):
+        # Use the URL-safe alphabet so the encoded ID is filename-safe
+        return b64encode(segid, "-_")
+    
+    @classmethod
+    def _basename(cls, indexname, segid):
+        return "%s.%s" % (indexname, cls._idstring(segid))
+    
+    @classmethod
+    def _make_filename(cls, indexname, segid, ext):
+        return "%s.%s" % (cls._basename(indexname, segid), ext)
+    
     def __repr__(self):
-        return "<%s %r %s>" % (self.__class__.__name__, self.name,
-                               getattr(self, "uuid", ""))
+        return "<%s %s>" % (self.__class__.__name__, b64encode(self.id))
 
     def __getattr__(self, name):
         # Capture accesses to e.g. Segment.fieldlengths_filename and return
                 return self.make_filename(self.EXTENSIONS[basename])
         
         raise AttributeError(name)
-
-    def copy(self):
-        return Segment(self.name, self.generation, self.doccount,
-                       self.fieldlength_totals, self.fieldlength_mins,
-                       self.fieldlength_maxes, self.deleted)
+    
+    def exists_in(self, storage):
+        return any(storage.file_exists(self.make_filename(ext))
+                   for ext in self.EXTENSIONS.values())
+    
+    def delete_files(self, storage, suppress=True):
+        for ext in self.EXTENSIONS.values():
+            fname = self.make_filename(ext)
+            if storage.file_exists(fname):
+                try:
+                    storage.delete_file(fname)
+                except OSError:
+                    if not suppress:
+                        raise
+    
+    def basename(self):
+        return self._basename(self.indexname, self.id)
 
     def make_filename(self, ext):
-        return "%s.%s" % (self.name, ext)
-
-    @classmethod
-    def basename(cls, indexname, segment_number):
-        return "_%s_%s" % (indexname, segment_number)
-
+        return self._make_filename(self.indexname, self.id, ext)
+    
     def doc_count_all(self):
         """
         :returns: the total number of documents, DELETED OR UNDELETED, in this
             return False
         return docnum in self.deleted
 
-    def __eq__(self, other):
-        return self.__class__ is type(other)
 
-    def __lt__(self, other):
-        return type(other) is self.__class__
+def _leaf_revs(storage, indexname, parentids=None):
+    parentids = parentids or set()
+    revs = list(Revision.load_all(storage, indexname, suppress=True))
+    for rev in revs:
+        parentids.update(rev.parentids)
+    return [rev for rev in revs if rev.id not in parentids]
 
-    def __ne__(self, other):
-        return not self.__eq__(other)
 
-    def __gt__(self, other):
-        return not (self.__lt__(other) or self.__eq__(other))
+def _create_index(storage, schema, indexname=_DEF_INDEX_NAME):
+    # Clear existing files
+    prefix = "%s." % indexname
+    for filename in storage:
+        if filename.startswith(prefix):
+            storage.delete_file(filename)
+    
+    # Create and store the root revision
+    schema = ensure_schema(schema)
+    return Revision.create(storage, indexname, schema)
 
-    def __le__(self, other):
-        return self.__eq__(other) or self.__lt__(other)
 
-    def __ge__(self, other):
-        return self.__eq__(other) or self.__gt__(other)
 
+    
+    
 
+# Index placeholder object
 
+class FileIndex(Index):
+    def __init__(self, storage, schema=None, indexname=_DEF_INDEX_NAME):
+        if not isinstance(storage, Storage):
+            raise ValueError("%r is not a Storage object" % storage)
+        if not isinstance(indexname, string_type):
+            raise ValueError("indexname %r is not a string" % indexname)
+        
+        if schema:
+            schema = ensure_schema(schema)
+        
+        self.storage = storage
+        self.indexname = indexname
+        self._schema = schema
+        
+        # Try reading the TOC to see if it's possible
+        #self._revision()
 
+    def __repr__(self):
+        return "%s(%r, %r)" % (self.__class__.__name__,
+                               self.storage, self.indexname)
 
+    def close(self):
+        pass
 
+    def _leaf_revisions(self):
+        return _leaf_revs(self.storage, self.indexname)
 
+    def _segments(self):
+        segs = {}
+        for rev in self._leaf_revisions():
+            for seg in rev.segments:
+                if seg.id in segs:
+                    raise Exception("Duplicate segment ID %r" % seg.id)
+                segs[seg.id] = seg
+        return list(segs.values())
 
+    # add_field
+    # remove_field
+    
+    def latest_generation(self):
+        return tuple(rev.id for rev in self._leaf_revisions())
+    
+    # refresh
+    # up_to_date
+    
+    def last_modified(self):
+        return max(rev.created for rev in self._leaf_revisions())
 
+    def is_empty(self):
+        return sum(len(rev.segments) for rev in self._leaf_revisions()) == 0
+    
+    def optimize(self):
+        w = self.writer()
+        w.commit(optimize=True)
 
+    # searcher
+    
+    def writer(self, **kwargs):
+        from whoosh.filedb.filewriting import SegmentWriter
+        return SegmentWriter(self, **kwargs)
 
+    def lock(self, name):
+        """Returns a lock object that you can try to call acquire() on to
+        lock the index.
+        """
+        
+        return self.storage.lock(self.indexname + "_" + name)
 
+    @property
+    def schema(self):
+        return (self._schema
+                or merge_schemas([rev.schema for rev
+                                  in self._leaf_revisions()]))
 
+    @classmethod
+    def _reader(self, storage, schema, segments, reuse=None):
+        from whoosh.filedb.filereading import SegmentReader
+        
+        reusable = {}
+        try:
+            if len(segments) == 0:
+                # This index has no segments! Return an EmptyReader object,
+                # which simply returns empty or zero to every method
+                return EmptyReader(schema)
+            
+            if reuse:
+                # Put all atomic readers in a dictionary keyed by their
+                # segment ID, so we can re-use them if possible
+                readers = [r for r, _ in reuse.leaf_readers()]
+                reusable = dict((r.generation(), r) for r in readers)
+            
+            # Make a function to open readers, which reuses reusable readers.
+            # It removes any readers it reuses from the "reusable" dictionary,
+            # so later we can close any readers left in the dictionary.
+            def segreader(segment):
+                segid = segment.id
+                if segid in reusable:
+                    r = reusable[segid]
+                    del reusable[segid]
+                    return r
+                else:
+                    return SegmentReader(storage, schema, segment)
+            
+            if len(segments) == 1:
+                # This index has one segment, so return a SegmentReader object
+                # for the segment
+                return segreader(segments[0])
+            else:
+                # This index has multiple segments, so create a list of
+                # SegmentReaders for the segments, then composite them with a
+                # MultiReader
+                
+                readers = [segreader(segment) for segment in segments]
+                return MultiReader(readers)
+        finally:
+            for r in reusable.values():
+                r.close()
 
+    def reader(self, reuse=None):
+        retries = 10
+        while retries > 0:
+            # Read the information from the TOC file
+            try:
+                segments = self._segments()
+                return self._reader(self.storage, self.schema, segments,
+                                    reuse=reuse)
+            except IOError:
+                # Presume that we got a "file not found error" because a writer
+                # deleted one of the files just as we were trying to open it,
+                # and so retry a few times before actually raising the
+                # exception
+                e = sys.exc_info()[1]
+                retries -= 1
+                if retries <= 0:
+                    raise e
+                sleep(0.05)
 
 
 
 
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
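To make the lockless TOC model concrete, here is a small sketch (not part of the commit) of the leaf computation in _leaf_revs: every commit writes a new <indexname>.<id>.toc revision recording the IDs of the revisions it replaces, and a revision stays "live" only until a later revision names it as a parent.

    # Hypothetical revision graph: r1 is the root; r2 and r3 were committed
    # concurrently on top of it, so both list r1 as a parent.
    revs = [("r1", ()), ("r2", ("r1",)), ("r3", ("r1",))]

    parentids = set()
    for _, parents in revs:
        parentids.update(parents)

    leaves = [rid for rid, _ in revs if rid not in parentids]
    assert leaves == ["r2", "r3"]
    # A reader merges the segments of all leaf revisions, and their schemas
    # are combined with merge_schemas().

Because each writer only ever creates new TOC and segment files under fresh random IDs, no global write lock is needed; only merging, which deletes files, takes the "mergelock".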

src/whoosh/filedb/filereading.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
+from base64 import b64encode
 from bisect import bisect_left
 from heapq import nlargest, nsmallest
 from threading import Lock
         self.schema = schema
         self.segment = segment
         
-        if hasattr(self.segment, "uuid"):
-            self.uuid_string = str(self.segment.uuid)
-        else:
-            import uuid
-            self.uuid_string = str(uuid.uuid4())
-        
         # Term index
         tf = storage.open_file(segment.termsindex_filename)
         self.termsindex = TermIndexReader(tf)
         return self.segment.is_deleted(docnum)
 
     def generation(self):
-        return self.segment.generation
+        return self.segment.id
 
     def _open_vectors(self):
         if self.vectorindex:
                 storage = self.storage
             else:
                 storage = None
-            cp = DefaultFieldCachingPolicy(self.segment.name, storage=storage)
+            cp = DefaultFieldCachingPolicy(self.segment.basename(), storage=storage)
         
         if type(cp) is type:
             cp = cp()
         self.caching_policy = cp
 
     def _fieldkey(self, fieldname):
-        return "%s/%s" % (self.uuid_string, fieldname)
+        return "%s/%s" % (self.segment.id, fieldname)
 
     def fieldcache(self, fieldname, save=SAVE_BY_DEFAULT):
         """Returns a :class:`whoosh.filedb.fieldcache.FieldCache` object for

src/whoosh/filedb/filewriting.py

     has_sqlite = False
 
 from whoosh.compat import integer_types, iteritems, text_type
-from whoosh.fields import UnknownFieldError
-from whoosh.filedb.fileindex import Segment
+from whoosh.fields import merge_schemas, UnknownFieldError
+from whoosh.filedb.fileindex import Revision, Segment
 from whoosh.filedb.filepostings import FilePostingWriter
 from whoosh.filedb.filetables import (TermIndexWriter, StoredFieldWriter,
                                       TermVectorWriter)
     """
 
     from whoosh.filedb.filereading import SegmentReader
+    
     newsegments = []
     sorted_segment_list = sorted((s.doc_count_all(), s) for s in segments)
     total_docs = 0
 
 class SegmentWriter(IndexWriter):
     def __init__(self, ix, poolclass=None, procs=0, blocklimit=128,
-                 timeout=0.0, delay=0.1, name=None, _lk=True, **poolargs):
-        
-        self.writelock = None
-        if _lk:
-            self.writelock = ix.lock("WRITELOCK")
-            if not try_for(self.writelock.acquire, timeout=timeout, delay=delay):
-                raise LockError
-        
-        info = ix._read_toc()
-        self.schema = info.schema
-        #self.ssnames = set(self.schema.special_spelling_names())
-        self.segments = info.segments
-        self.storage = ix.storage
+                 timeout=0.0, delay=0.1, **poolargs):
+        self.mergelock = ix.lock("mergelock")
+        self.storage = storage = ix.storage
         self.indexname = ix.indexname
+        self.id = Segment._make_id()
+        self.revs = list(ix._leaf_revisions())
+        self.schema = ix._schema or merge_schemas([rev.schema for rev in self.revs])
         self.is_closed = False
-        
         self.blocklimit = blocklimit
-        self.segment_number = info.segment_counter + 1
-        self.generation = info.generation + 1
-        
-        self._doc_offsets = []
-        base = 0
-        for s in self.segments:
-            self._doc_offsets.append(base)
-            base += s.doc_count_all()
-        
-        self.name = name or Segment.basename(self.indexname, self.segment_number)
+        self.timeout = timeout
+        self.delay = delay
         self.docnum = 0
         self.fieldlength_totals = defaultdict(int)
         self._added = False
-        self._unique_cache = {}
-    
-        # Create a temporary segment to use its .*_filename attributes
-        segment = Segment(self.name, self.generation, 0, None, None, None)
         
-        # Spelling
-        self.wordsets = {}
-        self.dawg = None
-        if any(field.spelling for field in self.schema):
-            self.dawgfile = self.storage.create_file(segment.dawg_filename)
-            self.dawg = DawgBuilder(field_root=True)
-        
-        # Terms index
-        tf = self.storage.create_file(segment.termsindex_filename)
-        ti = TermIndexWriter(tf)
-        # Term postings file
-        pf = self.storage.create_file(segment.termposts_filename)
-        pw = FilePostingWriter(pf, blocklimit=blocklimit)
-        # Terms writer
-        self.termswriter = TermsWriter(self.schema, ti, pw, self.dawg)
-        
-        if self.schema.has_vectored_fields():
-            # Vector index
-            vf = self.storage.create_file(segment.vectorindex_filename)
-            self.vectorindex = TermVectorWriter(vf)
-            
-            # Vector posting file
-            vpf = self.storage.create_file(segment.vectorposts_filename)
-            self.vpostwriter = FilePostingWriter(vpf, stringids=True)
-        else:
-            self.vectorindex = None
-            self.vpostwriter = None
-        
-        # Stored fields file
-        sf = self.storage.create_file(segment.storedfields_filename)
-        self.storedfields = StoredFieldWriter(sf, self.schema.stored_names())
-        
-        # Field lengths file
-        self.lengthfile = self.storage.create_file(segment.fieldlengths_filename)
-        
-        # Create the pool
+        # Pool
         if poolclass is None:
             if procs > 1:
                 from whoosh.filedb.multiproc import MultiPool
             else:
                 poolclass = TempfilePool
         self.pool = poolclass(self.schema, procs=procs, **poolargs)
+        
+        self.segments = []
+        self._doc_offsets = []
+        docbase = 0
+        for rev in self.revs:
+            for seg in rev.segments:
+                self.segments.append(seg)
+                self._doc_offsets.append(docbase)
+                docbase += seg.doc_count_all()
+        
+        # Filenames
+        newsegment = self._getsegment()
+        dawgname = newsegment.dawg_filename
+        termsname = newsegment.termsindex_filename
+        postsname = newsegment.termposts_filename
+        vectname = newsegment.vectorindex_filename
+        vpostsname = newsegment.vectorposts_filename
+        storedname = newsegment.storedfields_filename
+        lengthsname = newsegment.fieldlengths_filename
+        
+        # Create files
+        self.lengthfile = storage.create_file(lengthsname)
+        self.storedfields = StoredFieldWriter(storage.create_file(storedname),
+                                              self.schema.stored_names())
+        # Terms writer
+        self.wordsets = {}
+        self.dawg = None
+        if any(field.spelling for field in self.schema):
+            self.dawgfile = storage.create_file(dawgname)
+            self.dawg = DawgBuilder(field_root=True)
+        ti = TermIndexWriter(storage.create_file(termsname))
+        pw = FilePostingWriter(storage.create_file(postsname), blocklimit=blocklimit)
+        self.termswriter = TermsWriter(self.schema, ti, pw, self.dawg)
+        
+        # Vectors
+        self.vectorindex = self.vpostwriter = None
+        if self.schema.has_vectored_fields():
+            # Vector index
+            vf = storage.create_file(vectname)
+            self.vectorindex = TermVectorWriter(vf)
+            
+            # Vector posting file
+            vpf = storage.create_file(vpostsname)
+            self.vpostwriter = FilePostingWriter(vpf, stringids=True)
+    
+    def _getsegment(self):
+        return Segment(self.indexname, self.id, self.docnum,
+                       self.pool.fieldlength_totals(),
+                       self.pool.fieldlength_mins(),
+                       self.pool.fieldlength_maxes())
     
     def _check_state(self):
         if self.is_closed:
         from whoosh.filedb.fileindex import FileIndex
         
         return FileIndex._reader(self.storage, self.schema, self.segments,
-                                 self.generation, reuse=reuse)
+                                 reuse=reuse)
     
     def add_reader(self, reader):
         self._check_state()
         if self.vpostwriter:
             self.vpostwriter.close()
     
-    def _getsegment(self):
-        return Segment(self.name, self.generation, self.docnum,
-                       self.pool.fieldlength_totals(),
-                       self.pool.fieldlength_mins(),
-                       self.pool.fieldlength_maxes())
-    
     def commit(self, mergetype=None, optimize=False, merge=True):
         """Finishes writing and saves all additions and changes to disk.
         
         """
         
         self._check_state()
-        try:
-            if mergetype:
-                pass
-            elif optimize:
-                mergetype = OPTIMIZE
-            elif not merge:
-                mergetype = NO_MERGE
-            else:
-                mergetype = MERGE_SMALL
+        new_segments = []
+        if merge:
+            # Try to acquire the merge lock
+            to, delay = self.timeout, self.delay
+            gotlock = try_for(self.mergelock.acquire, timeout=to, delay=delay)
+            if gotlock:
+                try:
+                    if mergetype:
+                        pass
+                    elif optimize:
+                        mergetype = OPTIMIZE
+                    else:
+                        mergetype = MERGE_SMALL
+                    
+                    # Check that the segments we started with still exist
+                    segments = [seg for seg in self.segments
+                                if seg.exists_in(self.storage)]
+                    # Remember the IDs of the pre-merged segments
+                    orig_ids = set(seg.id for seg in segments)
+                    # Call the merge policy function. The policy may choose to
+                    # merge other segments into this writer's pool
+                    new_segments = mergetype(self, segments)
+                    # Find which segments got merged
+                    merged_ids = orig_ids - set(seg.id for seg in new_segments)
+                    
+                    # Delete leftover files
+                    for rev in self.revs:
+                        rev.delete_files(self.storage)
+                    for seg in segments:
+                        if seg.id in merged_ids:
+                            seg.delete_files(self.storage)
+                finally:
+                    self.mergelock.release()
+        
+        if self._added:
+            # Tell the pool we're finished adding information, it should
+            # add its accumulated data to the lengths, terms index, and
+            # posting files.
+            self.pool.finish(self.termswriter, self.docnum, self.lengthfile)
             
-            # Call the merge policy function. The policy may choose to merge
-            # other segments into this writer's pool
-            new_segments = mergetype(self, self.segments)
+            # Write out spelling files
+            if self.dawg:
+                # Insert any wordsets we've accumulated into the word graph
+                self._add_wordsets()
+                # Write out the word graph
+                self.dawg.write(self.dawgfile)
             
-            if self._added:
-                # Create a Segment object for the segment created by this
-                # writer
-                thissegment = self._getsegment()
-                
-                # Tell the pool we're finished adding information, it should
-                # add its accumulated data to the lengths, terms index, and
-                # posting files.
-                self.pool.finish(self.termswriter, self.docnum, self.lengthfile)
+            # Create a Segment object for the segment created by this
+            # writer and add it to the list of new segments
+            thissegment = self._getsegment()
+            new_segments.append(thissegment)
             
-                # Write out spelling files
-                if self.dawg:
-                    # Insert any wordsets we've accumulated into the word graph
-                    self._add_wordsets()
-                    # Write out the word graph
-                    self.dawg.write(self.dawgfile)
-            
-                # Add new segment to the list of remaining segments returned by
-                # the merge policy function
-                new_segments.append(thissegment)
-            else:
-                self.pool.cleanup()
-            
-            # Close all files, write a new TOC with the new segment list, and
-            # release the lock.
+            # Close all files
             self._close_all()
             
-            from whoosh.filedb.fileindex import _write_toc, _clean_files
-            
-            _write_toc(self.storage, self.schema, self.indexname,
-                       self.generation, self.segment_number, new_segments)
-            
-            # Delete leftover files
-            _clean_files(self.storage, self.indexname, self.generation,
-                         new_segments)
-        
-        finally:
-            if self.writelock:
-                self.writelock.release()
+            # Write the new revision
+            parentids = [rev.id for rev in self.revs]
+            Revision.create(self.storage, self.indexname, self.schema,
+                            parentids, new_segments)
+        else:
+            self.pool.cleanup()
         
     def cancel(self):
         self._check_state()
-        try:
-            self.pool.cancel()
-            self._close_all()
-        finally:
-            if self.writelock:
-                self.writelock.release()
+        self.pool.cancel()
+        self._close_all()
 
 
 class TermsWriter(object):
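A hypothetical end-to-end sketch of what the new commit path allows (untested against this work-in-progress commit; assumes an existing FileIndex ix whose schema has a TEXT field named "content"): two writers opened on the same index no longer compete for a WRITELOCK, because each writes its own segment files and its own TOC revision.

    w1 = ix.writer()
    w2 = ix.writer()
    w1.add_document(content=u"first")
    w2.add_document(content=u"second")
    w1.commit(merge=False)
    w2.commit(merge=False)
    # The index now has two leaf revisions with one segment each;
    # ix.reader() composites them with a MultiReader.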

src/whoosh/index.py

         
     def latest_generation(self):
         """Returns the generation number of the latest generation of this
-        index, or -1 if the backend doesn't support versioning.
+        index, or None if the backend doesn't support versioning.
         """
-        return -1
+        
+        return None
     
     def refresh(self):
         """Returns a new Index object representing the latest generation
         
         :returns: :class:`Index`
         """
+        
         return self
     
     def up_to_date(self):
         
         :param rtype: bool
         """
+        
         return True
     
     def last_modified(self):
-        """Returns the last modified time of the index, or -1 if the backend
+        """Returns the last modified time of the index, or None if the backend
         doesn't support last-modified times.
         """
-        return - 1
+        
+        return None
     
     def is_empty(self):
         """Returns True if this index is empty (that is, it has never had any
         
         :param rtype: bool
         """
+        
         raise NotImplementedError
     
     def optimize(self):
         """Optimizes this index, if necessary.
         """
+        
         pass
     
     def doc_count_all(self):

src/whoosh/support/base85.py

+"""
+This module contains generic base85 encoding and decoding functions. The
+whoosh.support.numeric module contains faster variants for encoding and
+decoding integers.
+
+Modified from:
+http://paste.lisp.org/display/72815
+"""
+
+import struct
+
+# Instead of using the character set from the ascii85 algorithm, I put the
+# characters in order so that the encoded text sorts properly (my life would be
+# a lot easier if they had just done that from the start)
+b85chars = "!$%&*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ^_abcdefghijklmnopqrstuvwxyz{|}~"
+b85dec = {}
+for i in range(len(b85chars)):
+    b85dec[b85chars[i]] = i
+
+
+def b85encode(text, pad=False):
+    l = len(text)
+    r = l % 4
+    if r:
+        text += '\0' * (4 - r)
+    longs = len(text) >> 2
+    out = []
+    words = struct.unpack('>' + 'L' * longs, text[0:longs*4])
+    for word in words:
+        rems = [0, 0, 0, 0, 0]
+        for i in range(4, -1, -1):
+            rems[i] = b85chars[word % 85]
+            word //= 85
+        out.extend(rems)
+
+    out = ''.join(out)
+    if pad:
+        return out
+
+    # Trim padding
+    olen = l % 4
+    if olen:
+        olen += 1
+    olen += l // 4 * 5
+    return out[0:olen]
+
+
+def b85decode(text):
+    l = len(text)
+    out = []
+    for i in range(0, len(text), 5):
+        chunk = text[i:i+5]
+        acc = 0
+        for j in range(len(chunk)):
+            try:
+                acc = acc * 85 + b85dec[chunk[j]]
+            except KeyError:
+                raise TypeError('Bad base85 character at byte %d' % (i + j))
+        if acc > 4294967295:
+            raise OverflowError('Base85 overflow in hunk starting at byte %d' % i)
+        out.append(acc)
+
+    # Pad final chunk if necessary
+    cl = l % 5
+    if cl:
+        acc *= 85 ** (5 - cl)
+        if cl > 1:
+            acc += 0xffffff >> (cl - 2) * 8
+        out[-1] = acc
+
+    out = struct.pack('>' + 'L' * ((l + 4) // 5), *out)
+    if cl:
+        out = out[:-(5 - cl)]
+
+    return out
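A short round-trip sketch for the generic codec (not part of the commit; Python 2 style byte strings, matching the module above):

    import struct
    from whoosh.support.base85 import b85encode, b85decode

    packed = struct.pack(">L", 1000)   # a big-endian 32-bit value (4 bytes)
    enc = b85encode(packed)            # 5 ASCII characters
    assert b85decode(enc) == packed

    # Because the alphabet is listed in ASCII order, encodings of equal-width
    # values sort the same way as the values themselves:
    assert b85encode(struct.pack(">L", 2)) < b85encode(struct.pack(">L", 3))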

src/whoosh/support/numeric.py

 from array import array
 
 from whoosh.compat import long_type, xrange, PY3
+from whoosh.support.base85 import b85chars, b85dec
+
 
 _istruct = struct.Struct(">i")
 _qstruct = struct.Struct(">q")
 _qpack, _qunpack = _qstruct.pack, _qstruct.unpack
 _dpack, _dunpack = _dstruct.pack, _dstruct.unpack
 
-_max_sortable_int = long_type(4294967295)
-_max_sortable_long = long_type(18446744073709551615)
+_max_sortable_int = 4294967295
+_max_sortable_long = 18446744073709551615
 
 
 # Functions for converting numbers to and from sortable representations
 
 # Functions for encoding numeric values as sequences of 7-bit ascii characters
 
-# Instead of using the character set from the ascii85 algorithm, I put the
-# characters in order so that the encoded text sorts properly (my life would be
-# a lot easier if they had just done that from the start)
-_b85chars = "!$%&*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ^_abcdefghijklmnopqrstuvwxyz{|}~"
-_b85dec = {}
-for i in range(len(_b85chars)):
-    _b85dec[_b85chars[i]] = i
-
-
 def to_base85(x, islong=False):
     "Encodes the given integer using base 85."
     
     size = 10 if islong else 5
     rems = ""
-    for i in xrange(size):
-        rems = _b85chars[x % 85] + rems
+    for _ in xrange(size):
+        rems = b85chars[x % 85] + rems
         x //= 85
     return rems
 
 
     acc = 0
     for c in text:
-        acc = acc * 85 + _b85dec[c]
+        acc = acc * 85 + b85dec[c]
     return acc
 
 
         x |= char
     x -= (1 << shift) - 1
     return int(x)
+
+
+
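For the fixed-width integer helpers kept in this module, a small sketch (not part of the commit):

    from whoosh.support.numeric import to_base85, from_base85

    a = to_base85(100)      # always 5 characters for the 32-bit variant
    b = to_base85(200)
    assert from_base85(a) == 100
    assert a < b            # string order follows numeric order
    assert len(to_base85(100, islong=True)) == 10   # 10 characters for 64-bit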

src/whoosh/util.py

 """
 
 from __future__ import with_statement
-import codecs
-import re
-import sys
-import time
+import codecs, random, re, sys, time
 from array import array
 from bisect import insort, bisect_left
 from copy import copy

tests/test_flexible.py

         w.remove_field("city")
         w.commit()
 
-        ixschema = ix._current_schema()
+        ixschema = ix.schema
         assert_equal(ixschema.names(), ["id"])
         assert_equal(ixschema.stored_names(), ["id"])