Commits

Matt Chaput committed 6b726d9

- In the process of replacing the multiprocessing writers -- non-functional in this commit.
- Min/max/total field lengths now stored in lengths file instead of segment.
- Greatly simplified file indexing code.
- Simplified testing objects.
- Replaced filedb.pools with support.externalsort.
- Decoupled segments from generations, in anticipation of future work.
- Rewrote "separate spelling" code path.
- Replaced LRU decorator with simpler "double barrel" implementation (see the sketch after this list).
- Added create_temp() method to Storage.
- Docstring fixes.
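
The "double barrel" cache mentioned above is not part of the diff shown below. As a rough sketch of the general technique (the decorator name, the barrels list, and maxsize are illustrative, not Whoosh's actual code): two plain dicts stand in for a true LRU; lookups check the newer dict first, writes always go to it, and when it grows past a limit it is demoted to the older slot and a fresh dict takes its place, so recently used entries survive without any per-access bookkeeping.

    # Minimal sketch of a "double barrel" cache; not taken from this commit.
    def double_barrel_cache(maxsize=256):
        def decorator(func):
            barrels = [{}, {}]  # [newer, older]

            def caching_wrapper(*args):
                newer, older = barrels
                if args in newer:
                    return newer[args]
                if args in older:
                    # Promote a hit from the old barrel into the current one
                    newer[args] = older[args]
                    return newer[args]
                value = newer[args] = func(*args)
                if len(newer) >= maxsize:
                    # Rotate: the full barrel becomes the old one, start fresh
                    barrels[1] = newer
                    barrels[0] = {}
                return value

            return caching_wrapper
        return decorator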

  • Parent commit 9a2eb76

Files changed (17)

File src/whoosh/fields.py

         (i.e. the ``spelling`` attribute is False).
         """
 
-        if not self.spelling:
-            # If the field doesn't support spelling, it doesn't support
-            # separate spelling
-            return False
-        elif not self.indexed:
-            # The field is marked as supporting spelling, but isn't indexed, so
-            # we need to generate the spelling words separately from indexing
-            return True
-        elif self.analyzer.has_morph():
-            # The analyzer has morphological transformations (e.g. stemming),
-            # so the words that go into the word graph need to be generated
-            # separately without the transformations.
-            return True
-        else:
-            return False
+        return self.spelling and self.analyzer.has_morph()
 
     def spellable_words(self, value):
         """Returns an iterator of each unique word (in sorted order) in the

File src/whoosh/filedb/fileindex.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import re, sys, uuid
+import random, re, sys
 from time import time, sleep
-from threading import Lock
 
 from whoosh import __version__
-from whoosh.compat import pickle, integer_types, string_type, iteritems
+from whoosh.compat import pickle, integer_types, string_type, xrange
 from whoosh.fields import ensure_schema
 from whoosh.index import (Index, EmptyIndexError, IndexVersionError,
                           _DEF_INDEX_NAME)
 from whoosh.reading import EmptyReader, MultiReader
-from whoosh.store import Storage, LockError
+from whoosh.store import Storage
 from whoosh.system import _INT_SIZE, _FLOAT_SIZE, _LONG_SIZE
 
 
 _INDEX_VERSION = -110
 
 
-# TOC read/write functions
-
-def _toc_filename(indexname, gen):
-    return "_%s_%s.toc" % (indexname, gen)
-
-
-def _toc_pattern(indexname):
-    """Returns a regular expression object that matches TOC filenames.
-    name is the name of the index.
+class TOC(object):
+    """Object representing the state of the complete index after a commit.
     """
 
-    return re.compile("^_%s_([0-9]+).toc$" % indexname)
+    @classmethod
+    def _filename(cls, indexname, gen):
+        return "_%s_%s.toc" % (indexname, gen)
 
+    @classmethod
+    def _pattern(cls, indexname):
+        return re.compile("^_%s_([0-9]+).toc$" % indexname)
 
-def _segment_pattern(indexname):
-    """Returns a regular expression object that matches segment filenames.
-    name is the name of the index.
-    """
+    @classmethod
+    def _segment_pattern(cls, indexname):
+        return re.compile("(_%s_[0-9a-z]+)[.][a-z]+" % indexname)
 
-    return re.compile("(_%s_[0-9]+)\\..*" % indexname)
+    @classmethod
+    def _latest_generation(cls, storage, indexname):
+        pattern = cls._pattern(indexname)
 
+        mx = -1
+        for filename in storage:
+            m = pattern.match(filename)
+            if m:
+                mx = max(int(m.group(1)), mx)
+        return mx
 
-def _latest_generation(storage, indexname):
-    pattern = _toc_pattern(indexname)
+    @classmethod
+    def create(cls, storage, schema, indexname=_DEF_INDEX_NAME):
+        schema = ensure_schema(schema)
 
-    max = -1
-    for filename in storage:
-        m = pattern.match(filename)
-        if m:
-            num = int(m.group(1))
-            if num > max:
-                max = num
-    return max
+        # Clear existing files
+        prefix = "_%s_" % indexname
+        for filename in storage:
+            if filename.startswith(prefix):
+                storage.delete_file(filename)
 
+        # Write a TOC file with an empty list of segments
+        toc = cls(schema, [], 0)
+        toc.write(storage, indexname)
 
-def _create_index(storage, schema, indexname=_DEF_INDEX_NAME):
-    # Clear existing files
-    prefix = "_%s_" % indexname
-    for filename in storage:
-        if filename.startswith(prefix):
-            storage.delete_file(filename)
+    @classmethod
+    def read(cls, storage, indexname, gen=None, schema=None):
+        if gen is None:
+            gen = cls._latest_generation(storage, indexname)
+            if gen < 0:
+                raise EmptyIndexError("Index %r does not exist in %r"
+                                      % (indexname, storage))
 
-    schema = ensure_schema(schema)
-    # Write a TOC file with an empty list of segments
-    _write_toc(storage, schema, indexname, 0, 0, [])
+        # Read the content of this index from the .toc file.
+        tocfilename = cls._filename(indexname, gen)
+        stream = storage.open_file(tocfilename)
 
+        def check_size(name, target):
+            sz = stream.read_varint()
+            if sz != target:
+                raise IndexError("Index was created on different architecture:"
+                                 " saved %s = %s, this computer = %s"
+                                 % (name, sz, target))
 
-def _write_toc(storage, schema, indexname, gen, segment_counter, segments):
-    schema = ensure_schema(schema)
-    schema.clean()
+        check_size("int", _INT_SIZE)
+        check_size("long", _LONG_SIZE)
+        check_size("float", _FLOAT_SIZE)
 
-    # Use a temporary file for atomic write.
-    tocfilename = _toc_filename(indexname, gen)
-    tempfilename = '%s.%s' % (tocfilename, time())
-    stream = storage.create_file(tempfilename)
+        if not stream.read_int() == -12345:
+            raise IndexError("Number misread: byte order problem")
 
-    stream.write_varint(_INT_SIZE)
-    stream.write_varint(_LONG_SIZE)
-    stream.write_varint(_FLOAT_SIZE)
-    stream.write_int(-12345)
+        version = stream.read_int()
+        if version != _INDEX_VERSION:
+            raise IndexVersionError("Can't read format %s" % version, version)
+        release = (stream.read_varint(), stream.read_varint(),
+                   stream.read_varint())
 
-    stream.write_int(_INDEX_VERSION)
-    for num in __version__[:3]:
-        stream.write_varint(num)
+        # If the user supplied a schema object with the constructor, don't load
+        # the pickled schema from the saved index.
+        if schema:
+            stream.skip_string()
+        else:
+            schema = pickle.loads(stream.read_string())
+        schema = ensure_schema(schema)
 
-    stream.write_string(pickle.dumps(schema, -1))
-    stream.write_int(gen)
-    stream.write_int(segment_counter)
-    stream.write_pickle(segments)
-    stream.close()
+        # Generation
+        index_gen = stream.read_int()
+        assert gen == index_gen
 
-    # Rename temporary file to the proper filename
-    storage.rename_file(tempfilename, tocfilename, safe=True)
+        _ = stream.read_int()  # Unused
+        segments = stream.read_pickle()
 
+        stream.close()
+        return cls(schema, segments, gen, version=version, release=release)
 
-class Toc(object):
-    def __init__(self, **kwargs):
-        for name, value in iteritems(kwargs):
-            setattr(self, name, value)
+    def __init__(self, schema, segments, generation,
+                 version=_INDEX_VERSION, release=__version__):
+        self.schema = schema
+        self.segments = segments
+        self.generation = generation
+        self.version = version
+        self.release = release
 
+    def write(self, storage, indexname):
+        schema = ensure_schema(self.schema)
+        schema.clean()
 
-def _read_toc(storage, schema, indexname):
-    gen = _latest_generation(storage, indexname)
-    if gen < 0:
-        raise EmptyIndexError("Index %r does not exist in %r"
-                              % (indexname, storage))
+        # Use a temporary file for atomic write.
+        tocfilename = self._filename(indexname, self.generation)
+        tempfilename = '%s.%s' % (tocfilename, time())
+        stream = storage.create_file(tempfilename)
 
-    # Read the content of this index from the .toc file.
-    tocfilename = _toc_filename(indexname, gen)
-    stream = storage.open_file(tocfilename)
+        stream.write_varint(_INT_SIZE)
+        stream.write_varint(_LONG_SIZE)
+        stream.write_varint(_FLOAT_SIZE)
+        stream.write_int(-12345)
 
-    def check_size(name, target):
-        sz = stream.read_varint()
-        if sz != target:
-            raise IndexError("Index was created on different architecture:"
-                             " saved %s = %s, this computer = %s"
-                             % (name, sz, target))
+        stream.write_int(_INDEX_VERSION)
+        for num in __version__[:3]:
+            stream.write_varint(num)
 
-    check_size("int", _INT_SIZE)
-    check_size("long", _LONG_SIZE)
-    check_size("float", _FLOAT_SIZE)
+        stream.write_string(pickle.dumps(schema, -1))
+        stream.write_int(self.generation)
+        stream.write_int(0)  # Unused
+        stream.write_pickle(self.segments)
+        stream.close()
 
-    if not stream.read_int() == -12345:
-        raise IndexError("Number misread: byte order problem")
-
-    version = stream.read_int()
-    if version != _INDEX_VERSION:
-        raise IndexVersionError("Can't read format %s" % version, version)
-    release = (stream.read_varint(), stream.read_varint(),
-               stream.read_varint())
-
-    # If the user supplied a schema object with the constructor, don't load
-    # the pickled schema from the saved index.
-    if schema:
-        stream.skip_string()
-    else:
-        schema = pickle.loads(stream.read_string())
-    schema = ensure_schema(schema)
-
-    # Generation
-    index_gen = stream.read_int()
-    assert gen == index_gen
-
-    segment_counter = stream.read_int()
-    segments = stream.read_pickle()
-
-    stream.close()
-    return Toc(version=version, release=release, schema=schema,
-               segment_counter=segment_counter, segments=segments,
-               generation=gen)
-
-
-def _next_segment_name(self):
-    #Returns the name of the next segment in sequence.
-    if self.segment_num_lock is None:
-        self.segment_num_lock = Lock()
-
-    if self.segment_num_lock.acquire():
-        try:
-            self.segment_counter += 1
-            return
-        finally:
-            self.segment_num_lock.release()
-    else:
-        raise LockError
+        # Rename temporary file to the proper filename
+        storage.rename_file(tempfilename, tocfilename, safe=True)
 
 
 def _clean_files(storage, indexname, gen, segments):
     # open, they may not be deleted immediately (i.e. on Windows) but will
     # probably be deleted eventually by a later call to clean_files.
 
-    current_segment_names = set(s.name for s in segments)
+    current_segment_names = set(s.segment_id() for s in segments)
 
-    tocpattern = _toc_pattern(indexname)
-    segpattern = _segment_pattern(indexname)
+    tocpattern = TOC._pattern(indexname)
+    segpattern = TOC._segment_pattern(indexname)
 
     todelete = set()
     for filename in storage:
         try:
             storage.delete_file(filename)
         except OSError:
-            # Another process still has this file open
+            # Another process still has this file open, I guess
             pass
 
 
         self.indexname = indexname
 
         # Try reading the TOC to see if it's possible
-        _read_toc(self.storage, self._schema, self.indexname)
+        TOC.read(self.storage, self.indexname, schema=self._schema)
 
     def __repr__(self):
         return "%s(%r, %r)" % (self.__class__.__name__,
     # remove_field
 
     def latest_generation(self):
-        return _latest_generation(self.storage, self.indexname)
+        return TOC._latest_generation(self.storage, self.indexname)
 
     # refresh
     # up_to_date
 
     def last_modified(self):
         gen = self.latest_generation()
-        filename = _toc_filename(self.indexname, gen)
+        filename = TOC._filename(self.indexname, gen)
         return self.storage.file_modified(filename)
 
     def is_empty(self):
 
     # searcher
 
-    def writer(self, **kwargs):
-        from whoosh.filedb.filewriting import SegmentWriter
-        return SegmentWriter(self, **kwargs)
+    def writer(self, procs=1, **kwargs):
+        if procs > 1:
+            from whoosh.filedb.multiproc2 import MpWriter
+            return MpWriter(self, **kwargs)
+        else:
+            from whoosh.filedb.filewriting import SegmentWriter
+            return SegmentWriter(self, **kwargs)
 
     def lock(self, name):
         """Returns a lock object that you can try to call acquire() on to
         return self.storage.lock(self.indexname + "_" + name)
 
     def _read_toc(self):
-        return _read_toc(self.storage, self._schema, self.indexname)
+        return TOC.read(self.storage, self.indexname, schema=self._schema)
 
     def _segments(self):
         return self._read_toc().segments
 
     @classmethod
     def _reader(self, storage, schema, segments, generation, reuse=None):
+        # Returns a reader for the given segments, possibly reusing already
+        # opened readers
         from whoosh.filedb.filereading import SegmentReader
 
         reusable = {}
             # It removes any readers it reuses from the "reusable" dictionary,
             # so later we can close any readers left in the dictionary.
             def segreader(segment):
-                gen = segment.generation
-                if gen in reusable:
-                    r = reusable[gen]
-                    del reusable[gen]
+                segid = segment.segment_id()
+                if segid in reusable:
+                    r = reusable[segid]
+                    del reusable[segid]
                     return r
                 else:
-                    return SegmentReader(storage, schema, segment)
+                    return SegmentReader(storage, schema, segment,
+                                         generation=generation)
 
             if len(segments) == 1:
                 # This index has one segment, so return a SegmentReader object
                   "termposts": "pst",
                   "vectorindex": "vec",
                   "vectorposts": "vps"}
+    IDCHARS = "0123456789abcdefghijklmnopqrstuvwxyz"
 
-    generation = 0
+    @classmethod
+    def _random_id(cls, size=12):
+        return "".join(random.choice(cls.IDCHARS) for _ in xrange(size))
 
-    def __init__(self, name, generation, doccount, fieldlength_totals,
-                 fieldlength_mins, fieldlength_maxes, deleted=None):
+    def __init__(self, indexname, doccount, segid=None, deleted=None):
         """
         :param name: The name of the segment (the Index object computes this
             from its name and the generation).
         :param doccount: The maximum document number in the segment.
         :param term_count: Total count of all terms in all documents.
-        :param fieldlength_totals: A dictionary mapping field names to the
-            total number of terms in that field across all documents in the
-            segment.
-        :param fieldlength_mins: A dictionary mapping field names to the
-            minimum length of that field across all documents.
-        :param fieldlength_maxes: A dictionary mapping field names to the
-            maximum length of that field across all documents.
         :param deleted: A set of deleted document numbers, or None if no
             deleted documents exist in this segment.
         """
 
-        assert isinstance(name, string_type)
+        assert isinstance(indexname, string_type)
+        self.indexname = indexname
         assert isinstance(doccount, integer_types)
-        assert (fieldlength_totals is None
-                or isinstance(fieldlength_totals, dict))
-        assert fieldlength_maxes is None or isinstance(fieldlength_mins, dict)
-        assert fieldlength_maxes is None or isinstance(fieldlength_maxes, dict)
-
-        self.name = name
-        self.generation = generation
         self.doccount = doccount
-        self.fieldlength_totals = fieldlength_totals
-        self.fieldlength_mins = fieldlength_mins
-        self.fieldlength_maxes = fieldlength_maxes
+        self.segid = self._random_id() if segid is None else segid
         self.deleted = deleted
-        self.uuid = uuid.uuid4()
 
     def __repr__(self):
         return "<%s %r %s>" % (self.__class__.__name__, self.name,
-                               getattr(self, "uuid", ""))
+                               getattr(self, "segid", ""))
 
     def __getattr__(self, name):
         # Capture accesses to e.g. Segment.fieldlengths_filename and return
         # the appropriate filename
-        ext = "_filename"
-        if name.endswith(ext):
-            basename = name[:0 - len(ext)]
-            if basename in self.EXTENSIONS:
-                return self.make_filename(self.EXTENSIONS[basename])
+        part2 = "_filename"
+        if name.endswith(part2):
+            part1 = name[:0 - len(part2)]
+            if part1 in self.EXTENSIONS:
+                return self.make_filename(self.EXTENSIONS[part1])
 
         raise AttributeError(name)
 
-    def copy(self):
-        return Segment(self.name, self.generation, self.doccount,
-                       self.fieldlength_totals, self.fieldlength_mins,
-                       self.fieldlength_maxes, self.deleted)
+    def segment_id(self):
+        if hasattr(self, "name"):
+            # Old segment class
+            return self.name
+        else:
+            return "%s_%s" % (self.indexname, self.segid)
 
     def make_filename(self, ext):
-        return "%s.%s" % (self.name, ext)
-
-    @classmethod
-    def basename(cls, indexname, segment_number):
-        return "_%s_%s" % (indexname, segment_number)
+        return "%s.%s" % (self.segment_id(), ext)
 
     def doc_count_all(self):
         """
             return 0
         return len(self.deleted)
 
-    def field_length(self, fieldname, default=0):
-        """Returns the total number of terms in the given field across all
-        documents in this segment.
-        """
-        return self.fieldlength_totals.get(fieldname, default)
-
-    def min_field_length(self, fieldname, default=0):
-        """Returns the maximum length of the given field in any of the
-        documents in the segment.
-        """
-
-        return self.fieldlength_mins.get(fieldname, default)
-
-    def max_field_length(self, fieldname, default=0):
-        """Returns the maximum length of the given field in any of the
-        documents in the segment.
-        """
-        return self.fieldlength_maxes.get(fieldname, default)
-
     def delete_document(self, docnum, delete=True):
         """Deletes the given document number. The document is not actually
         removed from the index until it is optimized.
             return False
         return docnum in self.deleted
 
-    def __eq__(self, other):
-        return self.__class__ is type(other)
-
-    def __lt__(self, other):
-        return type(other) is self.__class__
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __gt__(self, other):
-        return not (self.__lt__(other) or self.__eq__(other))
-
-    def __le__(self, other):
-        return self.__eq__(other) or self.__lt__(other)
-
-    def __ge__(self, other):
-        return self.__eq__(other) or self.__gt__(other)
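
Taken together, the TOC class above replaces the old module-level _create_index / _write_toc / _read_toc helpers with classmethods plus an instance write(). A minimal usage sketch based only on the methods shown (the directory name and schema are made up; "MAIN" is Whoosh's default index name):

    from whoosh.fields import Schema, TEXT
    from whoosh.filedb.fileindex import TOC
    from whoosh.filedb.filestore import FileStorage

    storage = FileStorage("indexdir")              # assumes the directory exists
    schema = Schema(body=TEXT())

    TOC.create(storage, schema, indexname="MAIN")  # writes generation 0, no segments
    toc = TOC.read(storage, "MAIN")                # gen=None -> latest generation on disk
    print(toc.generation, len(toc.segments))       # 0 0

    # Committing a new generation is just constructing and writing a new TOC
    newtoc = TOC(toc.schema, toc.segments, toc.generation + 1)
    newtoc.write(storage, "MAIN")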

File src/whoosh/filedb/filereading.py

 from threading import Lock
 
 from whoosh.compat import iteritems, string_type, integer_types, xrange
+from whoosh.filedb.fileindex import Segment
 from whoosh.filedb.fieldcache import FieldCache, DefaultFieldCachingPolicy
 from whoosh.filedb.filepostings import FilePostingReader
 from whoosh.filedb.filetables import (TermIndexReader, StoredFieldReader,
-                                      LengthReader, TermVectorReader)
+                                      TermVectorReader, Lengths)
 from whoosh.matching import FilterMatcher, ListMatcher
 from whoosh.reading import IndexReader, TermNotFound
 from whoosh.support.dawg import DiskNode
 class SegmentReader(IndexReader):
     GZIP_CACHES = False
 
-    def __init__(self, storage, schema, segment):
+    def __init__(self, storage, schema, segment, generation=None):
         self.storage = storage
         self.schema = schema
         self.segment = segment
+        self._gen = generation
 
-        if hasattr(self.segment, "uuid"):
-            self.uuid_string = str(self.segment.uuid)
+        if hasattr(self.segment, "segment_id"):
+            self.segid = str(self.segment.segment_id())
         else:
-            import uuid
-            self.uuid_string = str(uuid.uuid4())
+            self.segid = Segment._random_id()
 
         # Term index
         tf = storage.open_file(segment.termsindex_filename)
         self.fieldlengths = None
         if self.schema.has_scorable_fields():
             flf = storage.open_file(segment.fieldlengths_filename)
-            self.fieldlengths = LengthReader(flf, segment.doc_count_all())
+            self.fieldlengths = Lengths.from_file(flf, segment.doc_count_all())
 
         # Copy info from underlying segment
         self._has_deletions = segment.has_deletions()
         assert self.dc == self.storedfields.length
 
         self.set_caching_policy()
-
         self.is_closed = False
-        self._sync_lock = Lock()
 
     def has_deletions(self):
         return self._has_deletions
         return self.segment.is_deleted(docnum)
 
     def generation(self):
-        return self.segment.generation
+        return self._gen
 
     def _open_vectors(self):
         if self.vectorindex:
                 yield sf(docnum)
 
     def field_length(self, fieldname):
-        return self.segment.field_length(fieldname)
+        return self.fieldlengths.field_length(fieldname)
 
     def min_field_length(self, fieldname):
-        return self.segment.min_field_length(fieldname)
+        return self.fieldlengths.min_field_length(fieldname)
 
     def max_field_length(self, fieldname):
-        return self.segment.max_field_length(fieldname)
+        return self.fieldlengths.max_field_length(fieldname)
 
     def doc_field_length(self, docnum, fieldname, default=0):
         if self.fieldlengths is None:
                 storage = self.storage
             elif not save:
                 storage = None
-            cp = DefaultFieldCachingPolicy(self.segment.name, storage=storage)
-
+            cp = DefaultFieldCachingPolicy(self.segment.segment_id(),
+                                           storage=storage)
         if type(cp) is type:
             cp = cp()
 
         self.caching_policy = cp
 
     def _fieldkey(self, fieldname):
-        return "%s/%s" % (self.uuid_string, fieldname)
+        return "%s/%s" % (self.segid, fieldname)
 
     def fieldcache(self, fieldname, save=SAVE_BY_DEFAULT):
         """Returns a :class:`whoosh.filedb.fieldcache.FieldCache` object for

File src/whoosh/filedb/filestore.py

             raise IOError("Directory %s does not exist" % path)
 
     def create_index(self, schema, indexname=_DEF_INDEX_NAME):
+        from whoosh.filedb.fileindex import TOC, FileIndex
+
         if self.readonly:
             raise ReadOnlyError
-
-        from whoosh.filedb.fileindex import _create_index, FileIndex
-        _create_index(self, schema, indexname)
+        TOC.create(self, schema, indexname)
         return FileIndex(self, schema, indexname)
 
     def open_index(self, indexname=_DEF_INDEX_NAME, schema=None):
         from whoosh.filedb.fileindex import FileIndex
+
         return FileIndex(self, schema=schema, indexname=indexname)
 
     def create_file(self, name, excl=False, mode="wb", **kwargs):

File src/whoosh/filedb/filetables.py

         return unpack_long(v)[0]
 
 
-class LengthWriter(object):
-    def __init__(self, dbfile, doccount, lengths=None):
-        self.dbfile = dbfile
+class Lengths(object):
+    def __init__(self, doccount=0, lengths=None, totals=None):
         self.doccount = doccount
-        if lengths is not None:
-            self.lengths = lengths
-        else:
-            self.lengths = {}
+        self.lengths = lengths if lengths is not None else {}
+        self.totals = totals if totals is not None else defaultdict(int)
+        self.mins = {}
+        self.maxes = {}
 
-    def add_all(self, items):
-        lengths = self.lengths
-        for docnum, fieldname, byte in items:
-            if byte:
-                if fieldname not in lengths:
-                    zeros = (0 for _ in xrange(self.doccount))
-                    lengths[fieldname] = array("B", zeros)
-                lengths[fieldname][docnum] = byte
-
-    def add(self, docnum, fieldname, byte):
-        lengths = self.lengths
-        if byte:
-            if fieldname not in lengths:
-                zeros = (0 for _ in xrange(self.doccount))
-                lengths[fieldname] = array("B", zeros)
-            lengths[fieldname][docnum] = byte
-
-    def reader(self):
-        return LengthReader(None, self.doccount, lengths=self.lengths)
-
-    def close(self):
-        self.dbfile.write_ushort(len(self.lengths))
-        for fieldname, arry in iteritems(self.lengths):
-            self.dbfile.write_string(fieldname.encode('utf-8'))
-            self.dbfile.write_array(arry)
-        self.dbfile.close()
-
-
-class LengthReader(object):
-    def __init__(self, dbfile, doccount, lengths=None):
-        self.doccount = doccount
-
-        if lengths is not None:
-            self.lengths = lengths
-        else:
-            self.lengths = {}
-            count = dbfile.read_ushort()
-            for _ in xrange(count):
-                fieldname = dbfile.read_string().decode('utf-8')
-                self.lengths[fieldname] = dbfile.read_array("B", self.doccount)
-            dbfile.close()
+    def _create_field(self, fieldname, docnum):
+        dc = max(self.doccount, docnum + 1)
+        self.lengths[fieldname] = array("B", (0 for _ in xrange(dc)))
 
     def __iter__(self):
         for fieldname in self.lengths.keys():
             for docnum, byte in enumerate(self.lengths[fieldname]):
                 yield docnum, fieldname, byte
 
+    @classmethod
+    def from_file(cls, dbfile, doccount):
+        lengths = {}
+        totals = {}
+
+        # Read header byte and version
+        if dbfile.read(1) != "\xFF":
+            # Old format starts directly with number of fields at byte 0
+            version = 0
+            dbfile.seek(0)
+        else:
+            version = dbfile.read_int()
+            dc = dbfile.read_uint()  # Number of documents saved
+            if doccount is None:
+                # This allows you to do Lengths.from_file(dbfile, None) to open
+                # the lengths file without knowing the doccount, which is
+                # sometimes useful.
+                doccount = dc
+            # Sanity check: make sure the doccount that was saved with the file
+            # equals what the caller thinks the doccount is now
+            assert dc == doccount
+
+        # Read number of fields
+        fieldcount = dbfile.read_ushort()
+        # Read each field name and array
+        for _ in xrange(fieldcount):
+            # Fieldname
+            fieldname = dbfile.read_string().decode('utf-8')
+            # Length byte array
+            arry = dbfile.read_array("B", doccount)
+            lengths[fieldname] = arry
+            # Total length
+            if version:
+                totals[fieldname] = dbfile.read_uint()
+            else:
+                # Old format didn't store totals in the length file, fake it
+                # by adding up the byte approximations
+                totals[fieldname] = sum(byte_to_length(b) for b in arry)
+
+        dbfile.close()
+        return cls(doccount, lengths, totals)
+
+    def field_length(self, fieldname):
+        return self.totals.get(fieldname, 0)
+
+    def min_field_length(self, fieldname):
+        if fieldname in self.mins:
+            return self.mins[fieldname]
+        mn = byte_to_length(min(b for b in self.lengths[fieldname]))
+        self.mins[fieldname] = mn
+        return mn
+
+    def max_field_length(self, fieldname):
+        if fieldname in self.maxes:
+            return self.maxes[fieldname]
+        mx = byte_to_length(max(b for b in self.lengths[fieldname]))
+        self.maxes[fieldname] = mx
+        return mx
+
+    def add_all(self, items):
+        lengths = self.lengths
+        totals = self.totals
+        for docnum, fieldname, length in items:
+            if length:
+                if fieldname not in lengths:
+                    self._create_field(fieldname, docnum)
+                byte = length_to_byte(length)
+                lengths[fieldname][docnum] = byte
+                totals[fieldname] += length
+
+    def add(self, docnum, fieldname, length):
+        lengths = self.lengths
+        if length:
+            if fieldname not in lengths:
+                self._create_field(fieldname, docnum)
+
+            arry = self.lengths[fieldname]
+            count = docnum + 1
+            if len(arry) < count:
+                for _ in xrange(count - len(arry)):
+                    arry.append(0)
+
+            byte = length_to_byte(length)
+            arry[docnum] = byte
+            self.totals[fieldname] += length
+
     def get(self, docnum, fieldname, default=0):
         lengths = self.lengths
         if fieldname not in lengths:
         byte = lengths[fieldname][docnum] or default
         return byte_to_length(byte)
 
+    def field_names(self):
+        return self.lengths.keys()
+
+    def to_file(self, dbfile, doccount):
+        # Pad out arrays to full length
+        for fieldname in self.lengths.keys():
+            arry = self.lengths[fieldname]
+            if len(arry) < doccount:
+                for _ in xrange(doccount - len(arry)):
+                    arry.append(0)
+
+        dbfile.write("\xFF")  # Header byte
+        dbfile.write_int(1)  # Format version number
+        dbfile.write_uint(doccount)  # Number of documents
+        dbfile.write_ushort(len(self.lengths))  # Number of fields
+
+        for fieldname, arry in iteritems(self.lengths):
+            dbfile.write_string(fieldname.encode('utf-8'))  # Fieldname
+            dbfile.write_array(arry)  # Length byte array
+            dbfile.write_uint(self.totals[fieldname])  # Total length
+        dbfile.close()
+
 
 _stored_pointer_struct = Struct("!qI")  # offset, length
 stored_pointer_size = _stored_pointer_struct.size
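
The Lengths object above accumulates per-document field lengths during writing (add / add_all), answers the totals and min/max queries that previously lived on the Segment, and round-trips itself through the versioned file format written by to_file. A rough usage sketch (the storage object and the file name here are assumptions for illustration):

    lengths = Lengths()
    lengths.add(0, "body", 7)      # docnum, fieldname, number of terms
    lengths.add(1, "body", 120)
    lengths.add(1, "title", 4)

    print(lengths.field_length("body"))      # 127: exact total, tracked separately
    print(lengths.max_field_length("body"))  # about 120, recovered from the one-byte code

    # Round-trip through a storage file
    f = storage.create_file("test.lengths")
    lengths.to_file(f, doccount=2)
    restored = Lengths.from_file(storage.open_file("test.lengths"), 2)
    print(restored.get(1, "body"))           # per-document length, byte-approximated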

File src/whoosh/filedb/filewriting.py

 # policies, either expressed or implied, of Matt Chaput.
 
 from __future__ import with_statement
+import os, tempfile
 from bisect import bisect_right
 from collections import defaultdict
 
 except ImportError:
     has_sqlite = False
 
-from whoosh.compat import integer_types, iteritems, text_type
+from whoosh.compat import integer_types, iteritems, text_type, next
 from whoosh.fields import UnknownFieldError
 from whoosh.filedb.fileindex import Segment
 from whoosh.filedb.filepostings import FilePostingWriter
 from whoosh.filedb.filetables import (TermIndexWriter, StoredFieldWriter,
-                                      TermVectorWriter)
-from whoosh.filedb.pools import TempfilePool, DiskSet
+                                      TermVectorWriter, Lengths)
 from whoosh.store import LockError
 from whoosh.support.dawg import DawgBuilder, flatten
 from whoosh.support.filelock import try_for
+from whoosh.support.externalsort import SortingPool
 from whoosh.util import fib
 from whoosh.writing import IndexWriter, IndexingError
 
     return []
 
 
-def MERGE_SQUARES(writer, segments):
-    """This is an alternative merge policy similar to Lucene's. It is less
-    optimal than the default MERGE_SMALL.
-    """
-
-    from whoosh.filedb.filereading import SegmentReader
-
-    sizedsegs = [(s.doc_count_all(), s) for s in segments]
-    tomerge = []
-    for size in (10, 100, 1000, 10000, 100000):
-        smaller = [seg for segsize, seg in sizedsegs
-                   if segsize < size - 1 and segsize >= size // 10]
-        if len(smaller) >= 10:
-            tomerge.extend(smaller)
-            for seg in smaller:
-                segments.remove(seg)
-
-    for seg in tomerge:
-        reader = SegmentReader(writer.storage, writer.schema, seg)
-        writer.add_reader(reader)
-        reader.close()
-
-    return segments
+#def MERGE_SQUARES(writer, segments):
+#    """This is an alternative merge policy similar to Lucene's. It is less
+#    optimal than the default MERGE_SMALL.
+#    """
+#
+#    from whoosh.filedb.filereading import SegmentReader
+#
+#    sizedsegs = [(s.doc_count_all(), s) for s in segments]
+#    tomerge = []
+#    for size in (10, 100, 1000, 10000, 100000):
+#        smaller = [seg for segsize, seg in sizedsegs
+#                   if segsize < size - 1 and segsize >= size // 10]
+#        if len(smaller) >= 10:
+#            tomerge.extend(smaller)
+#            for seg in smaller:
+#                segments.remove(seg)
+#
+#    for seg in tomerge:
+#        reader = SegmentReader(writer.storage, writer.schema, seg)
+#        writer.add_reader(reader)
+#        reader.close()
+#
+#    return segments
 
 
 # Writer object
 
 class SegmentWriter(IndexWriter):
-    def __init__(self, ix, poolclass=None, procs=0, blocklimit=128,
-                 timeout=0.0, delay=0.1, name=None, _lk=True, **poolargs):
-
+    def __init__(self, ix, poolclass=None, blocklimit=128, timeout=0.0,
+                 delay=0.1, _lk=True, poolsize=100000, docbase=0, **kwargs):
+        self.is_closed = False
         self.writelock = None
+        self._added = False
         if _lk:
             self.writelock = ix.lock("WRITELOCK")
             if not try_for(self.writelock.acquire, timeout=timeout,
                            delay=delay):
                 raise LockError
 
+        self.storage = storage = ix.storage
+        self.indexname = ix.indexname
+
         info = ix._read_toc()
         self.schema = info.schema
         self.segments = info.segments
-        self.storage = storage = ix.storage
-        self.indexname = ix.indexname
-        self.is_closed = False
-
-        self.blocklimit = blocklimit
-        self.segment_number = info.segment_counter + 1
-        self.generation = info.generation + 1
-
         self._doc_offsets = []
         base = 0
         for s in self.segments:
             self._doc_offsets.append(base)
             base += s.doc_count_all()
 
-        self.name = name or Segment.basename(self.indexname,
-                                             self.segment_number)
-        self.docnum = 0
-        self.fieldlength_totals = defaultdict(int)
-        self._added = False
-        self._unique_cache = {}
-
-        # Create a temporary segment to use its .*_filename attributes
-        segment = Segment(self.name, self.generation, 0, None, None, None)
+        self.blocklimit = blocklimit
+        self.generation = info.generation + 1
+        self.newsegment = Segment(self.indexname, 0)
+        self.docnum = self.docbase = docbase
 
         # Spelling
-        self.wordsets = {}
         self.dawg = None
         if any(field.spelling for field in self.schema):
-            self.dawgfile = storage.create_file(segment.dawg_filename)
+            self.dawgfile = storage.create_file(self.newsegment.dawg_filename)
             self.dawg = DawgBuilder(field_root=True)
 
         # Terms index
-        tf = storage.create_file(segment.termsindex_filename)
+        tf = storage.create_file(self.newsegment.termsindex_filename)
         ti = TermIndexWriter(tf)
         # Term postings file
-        pf = storage.create_file(segment.termposts_filename)
+        pf = storage.create_file(self.newsegment.termposts_filename)
         pw = FilePostingWriter(pf, blocklimit=blocklimit)
         # Terms writer
         self.termswriter = TermsWriter(self.schema, ti, pw, self.dawg)
 
         if self.schema.has_vectored_fields():
             # Vector index
-            vf = storage.create_file(segment.vectorindex_filename)
+            vf = storage.create_file(self.newsegment.vectorindex_filename)
             self.vectorindex = TermVectorWriter(vf)
 
             # Vector posting file
-            vpf = storage.create_file(segment.vectorposts_filename)
+            vpf = storage.create_file(self.newsegment.vectorposts_filename)
             self.vpostwriter = FilePostingWriter(vpf, stringids=True)
         else:
             self.vectorindex = None
             self.vpostwriter = None
 
         # Stored fields file
-        sf = storage.create_file(segment.storedfields_filename)
+        sf = storage.create_file(self.newsegment.storedfields_filename)
         self.storedfields = StoredFieldWriter(sf, self.schema.stored_names())
 
-        # Field lengths file
-        self.lengthfile = storage.create_file(segment.fieldlengths_filename)
+        # Field lengths
+        self.lengths = Lengths()
 
-        # Create the pool
-        if poolclass is None:
-            if procs > 1:
-                from whoosh.filedb.multiproc import MultiPool
-                poolclass = MultiPool
-            else:
-                poolclass = TempfilePool
-        self.pool = poolclass(self.schema, procs=procs, **poolargs)
+        # Create the posting pool
+        self.pool = SortingPool(maxsize=poolsize, prefix=self.indexname)
 
     def _check_state(self):
         if self.is_closed:
     def add_reader(self, reader):
         self._check_state()
         startdoc = self.docnum
+        lengths = self.lengths
 
         has_deletions = reader.has_deletions()
         if has_deletions:
 
         # Add stored documents, vectors, and field lengths
         for docnum in reader.all_doc_ids():
+            newdoc = self.docnum
             if (not has_deletions) or (not reader.is_deleted(docnum)):
                 d = dict(item for item
                          in iteritems(reader.stored_fields(docnum))
                 self.storedfields.append(d)
 
                 if has_deletions:
-                    docmap[docnum] = self.docnum
+                    docmap[docnum] = newdoc
 
                 for fieldname in reader.schema.scorable_names():
                     length = reader.doc_field_length(docnum, fieldname)
                     if length and fieldname in fieldnames:
-                        self.pool.add_field_length(self.docnum, fieldname,
-                                                   length)
+                        lengths.add(newdoc, fieldname, length)
 
                 for fieldname in reader.schema.vector_names():
                     if (fieldname in fieldnames
                         and reader.has_vector(docnum, fieldname)):
                         vpostreader = reader.vector(docnum, fieldname)
-                        self._add_vector_reader(self.docnum, fieldname,
-                                                vpostreader)
+                        self._add_vector_reader(newdoc, fieldname, vpostreader)
 
                 self.docnum += 1
 
-        # Add dawg contents to word sets for fields that require separate
-        # handling
-        for fieldname in self.schema.separate_spelling_names():
-            if reader.has_word_graph(fieldname):
-                graph = reader.word_graph(fieldname)
-                self.add_spell_words(fieldname, flatten(graph))
-
         # Add postings
         for fieldname, text in reader.all_terms():
             if fieldname in fieldnames:
                     else:
                         newdoc = startdoc + docnum
 
-                    self.pool.add_posting(fieldname, text, newdoc,
-                                          postreader.weight(), valuestring)
+                    self.pool.add((fieldname, text, newdoc,
+                                   postreader.weight(), valuestring))
                     postreader.next()
 
         self._added = True
     def add_document(self, **fields):
         self._check_state()
         schema = self.schema
+        lengths = self.lengths
+        add_post = self.pool.add
         docboost = self._doc_boost(fields)
 
         # Sort the keys
                                         % (name, schema))
 
         storedvalues = {}
-
         docnum = self.docnum
         for fieldname in fieldnames:
             value = fields.get(fieldname)
 
             if field.indexed:
                 fieldboost = self._field_boost(fields, fieldname, docboost)
-                self.pool.add_content(docnum, fieldname, field, value,
-                                      fieldboost)
+                length = 0
+                # TODO: Method for adding progressive field values, i.e.
+                # setting start_pos/start_char?
+                for w, freq, weight, valuestring in field.index(value):
+                    #assert w != ""
+                    weight *= fieldboost
+                    add_post((fieldname, w, docnum, weight, valuestring))
+                    length += freq
+
+                if field.scorable:
+                    lengths.add(docnum, fieldname, length)
 
             if field.separate_spelling():
-                # This field requires spelling words to be added in a separate
-                # step, instead of as part of indexing
-                self.add_spell_words(fieldname, field.spellable_words(value))
+                # For fields which use different tokens for spelling, insert
+                # fake postings for the spellable words
+                for w in field.spellable_words(value):
+                    add_post((fieldname + " ", w, None, None, None))
 
             vformat = field.vector
             if vformat:
         self.storedfields.append(storedvalues)
         self.docnum += 1
 
-    def add_spell_words(self, fieldname, words):
-        # Get or make a set for the words in this field
-        if fieldname not in self.wordsets:
-            self.wordsets[fieldname] = set()
-        wordset = self.wordsets[fieldname]
-
-        # If the in-memory set is getting big, replace it with an
-        # on-disk set
-        if has_sqlite and isinstance(wordset, set) and len(wordset) > 4096:
-            diskset = DiskSet(wordset)
-            self.wordsets[fieldname] = wordset = diskset
-
-        for word in words:
-            wordset.add(word)
-
-        self._added = True
-
-    def _add_wordsets(self):
-        dawg = self.dawg
-        for fieldname in self.wordsets:
-            ws = self.wordsets[fieldname]
-            ft = (fieldname,)
-
-            words = sorted(ws) if isinstance(ws, set) else iter(ws)
-            for text in words:
-                dawg.insert(ft + tuple(text))
-
-            if isinstance(ws, DiskSet):
-                ws.destroy()
-
     def _add_vector(self, docnum, fieldname, vlist):
         vpostwriter = self.vpostwriter
         offset = vpostwriter.start(self.schema[fieldname].vector)
         if self.vpostwriter:
             self.vpostwriter.close()
 
-    def _getsegment(self):
-        return Segment(self.name, self.generation, self.docnum,
-                       self.pool.fieldlength_totals(),
-                       self.pool.fieldlength_mins(),
-                       self.pool.fieldlength_maxes())
+    def doc_count(self):
+        return self.docnum - self.docbase
 
     def commit(self, mergetype=None, optimize=False, merge=True):
         """Finishes writing and saves all additions and changes to disk.
         :param merge: if False, do not merge small segments.
         """
 
+        from whoosh.filedb.fileindex import TOC, _clean_files
+
         self._check_state()
         try:
             if mergetype:
 
             # Call the merge policy function. The policy may choose to merge
             # other segments into this writer's pool
-            new_segments = mergetype(self, self.segments)
+            finalsegments = mergetype(self, self.segments)
 
             if self._added:
                 # Create a Segment object for the segment created by this
                 # writer
-                thissegment = self._getsegment()
+                newsegment = self.newsegment
+                newsegment.doccount = self.doc_count()
 
-                # Tell the pool we're finished adding information, it should
-                # add its accumulated data to the lengths, terms index, and
-                # posting files.
-                self.pool.finish(self.termswriter, self.docnum,
-                                 self.lengthfile)
-
+                # Copy the sorted pool postings to the terms index and posting
+                # files
+                self.termswriter.add_iter(self.pool.items(), self.lengths)
+                # Write out lengths file
+                lf = self.storage.create_file(newsegment.fieldlengths_filename)
+                self.lengths.to_file(lf, newsegment.doccount)
                 # Write out spelling files
                 if self.dawg:
-                    # Insert any wordsets we've accumulated into the word graph
-                    self._add_wordsets()
-                    # Write out the word graph
                     self.dawg.write(self.dawgfile)
 
                 # Add new segment to the list of remaining segments returned by
                 # the merge policy function
-                new_segments.append(thissegment)
+                finalsegments.append(newsegment)
             else:
                 self.pool.cleanup()
 
             # release the lock.
             self._close_all()
 
-            from whoosh.filedb.fileindex import _write_toc, _clean_files
-
-            _write_toc(self.storage, self.schema, self.indexname,
-                       self.generation, self.segment_number, new_segments)
+            # Write the TOC for the new generation
+            toc = TOC(self.schema, finalsegments, self.generation)
+            toc.write(self.storage, self.indexname)
 
             # Delete leftover files
             _clean_files(self.storage, self.indexname, self.generation,
-                         new_segments)
+                         finalsegments)
 
         finally:
             if self.writelock:
     def cancel(self):
         self._check_state()
         try:
-            self.pool.cancel()
+            self.pool.cleanup()
             self._close_all()
         finally:
             if self.writelock:
         assert isinstance(inlinelimit, integer_types)
         self.inlinelimit = inlinelimit
 
-        self.spelling = False
         self.lastfn = None
         self.lasttext = None
-        self.format = None
-        self.offset = None
 
-    def _new_term(self, fieldname, text):
-        # This method tests whether a new field/term has started in the stream
-        # of incoming postings, and if so performs appropriate work
+    def _finish_term(self, fn, text):
+        if fn is not None and fn[-1] != " " and text is not None:
+            terminfo = self.postwriter.finish(self.inlinelimit)
+            self.termsindex.add((fn, text), terminfo)
 
-        lastfn = self.lastfn or ''
-        lasttext = self.lasttext or ''
+    def add_iter(self, postiter, lengths):
+        dawg = self.dawg
+        postwriter = self.postwriter
+        lastfn = self.lastfn
+        lasttext = self.lasttext
+        spelling = False
+        fmt = None
 
-        if fieldname < lastfn or (fieldname == lastfn and text < lasttext):
-            raise Exception("Postings are out of order: %r:%s .. %r:%s" %
-                            (lastfn, lasttext, fieldname, text))
+        for fieldname, text, docnum, weight, valuestring in postiter:
+            if fieldname < lastfn or (fieldname == lastfn and text < lasttext):
+                raise Exception("Postings are out of order: %r:%s .. %r:%s" %
+                                (lastfn, lasttext, fieldname, text))
 
-        # Is the fieldname of this posting different from the last one?
-        if fieldname != lastfn:
-            # Store information we need about the new field
-            field = self.schema[fieldname]
-            self.format = field.format
-            self.spelling = field.spelling and not field.separate_spelling()
+            if (fieldname[-1] == " "
+                and (fieldname != lastfn or text != lasttext)):
+                # Spelling word placeholder
+                if fieldname != lastfn and not lastfn[-1] == " ":
+                    self._finish_term(lastfn, lasttext)
+                fn = fieldname[:-1]
+                dawg.insert((fn,) + tuple(text))
+            else:
+                # Is the fieldname of this posting different from the last one?
+                if fieldname != lastfn:
+                    # Store information we need about the new field
+                    field = self.schema[fieldname]
+                    fmt = field.format
+                    spelling = (field.spelling
+                                and not field.separate_spelling())
 
-        # Is the term of this posting different from the last one?
-        if fieldname != lastfn or text != lasttext:
-            # Finish up the last term before starting a new one
-            self._finish_term()
+                # Is the term of this posting different from the last one?
+                if fieldname != lastfn or text != lasttext:
+                    # Finish up the last term before starting a new one
+                    self._finish_term(lastfn, lasttext)
 
-            # If this field has spelling, add the term to the word graph
-            if self.spelling:
-                self.dawg.insert((fieldname,) + tuple(text))
+                    # If this field has spelling, add to the word graph
+                    if spelling:
+                        dawg.insert((fieldname,) + tuple(text))
 
-            # Set up internal state for the new term
-            self.offset = self.postwriter.start(self.format)
-            self.lasttext = text
-            self.lastfn = fieldname
+                    # Set up postwriter for a new term
+                    postwriter.start(fmt)
 
-    def _finish_term(self):
-        postwriter = self.postwriter
-        if self.lasttext is not None:
-            terminfo = postwriter.finish(self.inlinelimit)
-            self.termsindex.add((self.lastfn, self.lasttext), terminfo)
+                length = lengths.get(docnum, fieldname)
+                postwriter.write(docnum, weight, valuestring, length)
 
-    def add_postings(self, fieldname, text, matcher, getlen, offset=0,
-                     docmap=None):
-        self._new_term(fieldname, text)
-        postwrite = self.postwriter.write
-        while matcher.is_active():
-            docnum = matcher.id()
-            weight = matcher.weight()
-            valuestring = matcher.value()
-            if docmap:
-                newdoc = docmap[docnum]
-            else:
-                newdoc = offset + docnum
-            postwrite(newdoc, weight, valuestring, getlen(docnum, fieldname))
-            matcher.next()
+            lastfn = fieldname
+            lasttext = text
 
-    def add_iter(self, postiter, getlen, offset=0, docmap=None):
-        _new_term = self._new_term
-        postwrite = self.postwriter.write
-        for fieldname, text, docnum, weight, valuestring in postiter:
-            _new_term(fieldname, text)
-            if docmap:
-                newdoc = docmap[docnum]
-            else:
-                newdoc = offset + docnum
-            postwrite(newdoc, weight, valuestring, getlen(docnum, fieldname))
-
-    def add(self, fieldname, text, docnum, weight, valuestring, fieldlen):
-        self._new_term(fieldname, text)
-        self.postwriter.write(docnum, weight, valuestring, fieldlen)
+        self.lastfn = lastfn
+        self.lasttext = lasttext
 
     def close(self):
-        self._finish_term()
+        self._finish_term(self.lastfn, self.lasttext)
         self.termsindex.close()
         self.postwriter.close()
 
 
     if commit:
         writer.commit(merge=False)
+
+
+
+
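
The rewritten spelling path above drops the wordsets / DiskSet machinery: when a field needs separate spelling, its spellable words are pushed into the same SortingPool as ordinary postings, but under a sentinel fieldname with a trailing space, and TermsWriter.add_iter routes any key whose fieldname ends in " " into the word graph instead of the terms index. The sentinel relies only on plain string ordering; a tiny check (the data is made up):

    # Keys as they would come out of the sorted pool: real postings for "body"
    # first, then the spelling-only placeholders filed under "body " (note the
    # trailing space), which sort after every ("body", ...) key.
    keys = [("body", "cats"), ("body", "rendered"),
            ("body ", "cat"), ("body ", "render")]
    assert sorted(keys) == keys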

File src/whoosh/filedb/multiproc.py

 from multiprocessing import Process, Queue, cpu_count
 
 from whoosh.compat import dump, load, xrange, iteritems
-from whoosh.filedb.filetables import LengthWriter, LengthReader
+from whoosh.filedb.filetables import Lengths
 from whoosh.filedb.fileindex import Segment
 from whoosh.filedb.filewriting import SegmentWriter
 from whoosh.filedb.pools import (imerge, read_run, PoolBase, TempfilePool)
 from whoosh.writing import IndexWriter
 
 
-# Multiprocessing writer
-
-class SegmentWritingTask(Process):
-    def __init__(self, storage, indexname, segname, kwargs, jobqueue,
-                 resultqueue, firstjob=None):
-        Process.__init__(self)
-        self.storage = storage
-        self.indexname = indexname
-        self.segname = segname
-        self.kwargs = kwargs
-        self.jobqueue = jobqueue
-        self.resultqueue = resultqueue
-        self.firstjob = firstjob
-
-        self.segment = None
-        self.running = True
-
-    def _add_file(self, args):
-        writer = self.writer
-        filename, length = args
-        f = open(filename, "rb")
-        for _ in xrange(length):
-            writer.add_document(**load(f))
-        f.close()
-        os.remove(filename)
-
-    def run(self):
-        jobqueue = self.jobqueue
-        ix = self.storage.open_index(self.indexname)
-        writer = self.writer = SegmentWriter(ix, _lk=False, name=self.segname,
-                                             **self.kwargs)
-
-        if self.firstjob:
-            self._add_file(self.firstjob)
-
-        while self.running:
-            args = jobqueue.get()
-            if args is None:
-                break
-            self._add_file(args)
-
-        if not self.running:
-            writer.cancel()
-        else:
-            writer.pool.finish(writer.termswriter, writer.docnum,
-                               writer.lengthfile)
-            writer._close_all()
-            self.resultqueue.put(writer._getsegment())
-
-    def cancel(self):
-        self.running = False
-
-
-class MultiSegmentWriter(IndexWriter):
-    def __init__(self, ix, procs=None, batchsize=100, dir=None, **kwargs):
-        self.index = ix
-        self.procs = procs or cpu_count()
-        self.bufferlimit = batchsize
-        self.dir = dir
-        self.kwargs = kwargs
-        self.kwargs["dir"] = dir
-
-        self.segnames = []
-        self.tasks = []
-        self.jobqueue = Queue(self.procs * 4)
-        self.resultqueue = Queue()
-        self.docbuffer = []
-
-        self.writelock = ix.lock("WRITELOCK")
-        self.writelock.acquire()
-
-        info = ix._read_toc()
-        self.schema = info.schema
-        self.segment_number = info.segment_counter
-        self.generation = info.generation + 1
-        self.segments = info.segments
-        self.storage = ix.storage
-
-    def _new_task(self, firstjob):
-        ix = self.index
-        self.segment_number += 1
-        segmentname = Segment.basename(ix.indexname, self.segment_number)
-        task = SegmentWritingTask(ix.storage, ix.indexname, segmentname,
-                                  self.kwargs, self.jobqueue,
-                                  self.resultqueue, firstjob)
-        self.tasks.append(task)
-        task.start()
-        return task
-
-    def _enqueue(self):
-        doclist = self.docbuffer
-        fd, filename = tempfile.mkstemp(".doclist", dir=self.dir)
-        f = os.fdopen(fd, "wb")
-        for doc in doclist:
-            dump(doc, f, -1)
-        f.close()
-        args = (filename, len(doclist))
-
-        if len(self.tasks) < self.procs:
-            self._new_task(args)
-        else:
-            self.jobqueue.put(args)
-
-        self.docbuffer = []
-
-    def cancel(self):
-        try:
-            for task in self.tasks:
-                task.cancel()
-        finally:
-            self.writelock.release()
-
-    def add_document(self, **fields):
-        self.docbuffer.append(fields)
-        if len(self.docbuffer) >= self.bufferlimit:
-            self._enqueue()
-
-    def commit(self, **kwargs):
-        try:
-            # index the remaining stuff in self.docbuffer
-            self._enqueue()
-
-            for task in self.tasks:
-                self.jobqueue.put(None)
-
-            for task in self.tasks:
-                task.join()
-
-            for task in self.tasks:
-                taskseg = self.resultqueue.get()
-                assert isinstance(taskseg, Segment), type(taskseg)
-                self.segments.append(taskseg)
-
-            self.jobqueue.close()
-            self.resultqueue.close()
-
-            from whoosh.filedb.fileindex import _write_toc, _clean_files
-            _write_toc(self.storage, self.schema, self.index.indexname,
-                       self.generation, self.segment_number, self.segments)
-
-            # Delete leftover files
-            _clean_files(self.storage, self.index.indexname,
-                         self.generation, self.segments)
-        finally:
-            self.writelock.release()
-
-
-# Multiprocessing pool
-
-class PoolWritingTask(Process):
-    def __init__(self, schema, dir, jobqueue, resultqueue, limitmb,
-                 firstjob=None):
-        Process.__init__(self)
-        self.schema = schema
-        self.dir = dir
-        self.jobqueue = jobqueue
-        self.resultqueue = resultqueue
-        self.limitmb = limitmb
-        self.firstjob = firstjob
-
-    def _add_file(self, filename, length):
-        subpool = self.subpool
-        f = open(filename, "rb")
-        for _ in xrange(length):
-            code, args = load(f)
-            if code == 0:
-                subpool.add_content(*args)
-            elif code == 1:
-                subpool.add_posting(*args)
-            elif code == 2:
-                subpool.add_field_length(*args)
-        f.close()
-        os.remove(filename)
-
-    def run(self):
-        jobqueue = self.jobqueue
-        rqueue = self.resultqueue
-        subpool = self.subpool = TempfilePool(self.schema,
-                                              limitmb=self.limitmb,
-                                              dir=self.dir)
-
-        if self.firstjob:
-            self._add_file(*self.firstjob)
-
-        while True:
-            arg1, arg2 = jobqueue.get()
-            if arg1 is None:
-                doccount = arg2
-                break
-            else:
-                self._add_file(arg1, arg2)
-
-        lenfd, lenfilename = tempfile.mkstemp(".lengths", dir=subpool.dir)
-        lenf = os.fdopen(lenfd, "wb")
-        subpool._write_lengths(StructFile(lenf), doccount)
-        subpool.dump_run()
-        rqueue.put((subpool.runs, subpool.fieldlength_totals(),
-                    subpool.fieldlength_mins(), subpool.fieldlength_maxes(),
-                    lenfilename))
-
-
-class MultiPool(PoolBase):
-    def __init__(self, schema, dir=None, procs=2, limitmb=32, batchsize=100,
-                 **kw):
-        PoolBase.__init__(self, schema, dir=dir)
-        self._make_dir()
-
-        self.procs = procs
-        self.limitmb = limitmb
-        self.jobqueue = Queue(self.procs * 4)
-        self.resultqueue = Queue()
-        self.tasks = []
-        self.buffer = []
-        self.bufferlimit = batchsize
-
-    def _new_task(self, firstjob):
-        task = PoolWritingTask(self.schema, self.dir, self.jobqueue,
-                               self.resultqueue, self.limitmb,
-                               firstjob=firstjob)
-        self.tasks.append(task)
-        task.start()
-        return task
-
-    def _enqueue(self):
-        commandlist = self.buffer
-        fd, filename = tempfile.mkstemp(".commands", dir=self.dir)
-        f = os.fdopen(fd, "wb")
-        for command in commandlist:
-            dump(command, f, -1)
-        f.close()
-        args = (filename, len(commandlist))
-
-        if len(self.tasks) < self.procs:
-            self._new_task(args)
-        else:
-            self.jobqueue.put(args)
-
-        self.buffer = []
-
-    def _append(self, item):
-        self.buffer.append(item)
-        if len(self.buffer) > self.bufferlimit:
-            self._enqueue()
-
-    def add_content(self, *args):
-        self._append((0, args))
-
-    def add_posting(self, *args):
-        self.postingqueue.put((1, args))
-
-    def add_field_length(self, *args):
-        self.postingqueue.put((2, args))
-
-    def cancel(self):
-        for task in self.tasks:
-            task.terminate()
-        self.cleanup()
-
-    def cleanup(self):
-        self._clean_temp_dir()
-
-    def finish(self, termswriter, doccount, lengthfile):
-        if self.buffer:
-            self._enqueue()
-
-        _fieldlength_totals = self._fieldlength_totals
-        if not self.tasks:
-            return
-
-        jobqueue = self.jobqueue
-        rqueue = self.resultqueue
-
-        for task in self.tasks:
-            jobqueue.put((None, doccount))
-
-        for task in self.tasks:
-            task.join()
-
-        runs = []
-        lenfilenames = []
-        for task in self.tasks:
-            truns, flentotals, flenmins, flenmaxes, lenfilename = rqueue.get()
-            runs.extend(truns)
-            lenfilenames.append(lenfilename)
-            for fieldname, total in iteritems(flentotals):
-                _fieldlength_totals[fieldname] += total
-
-            for fieldname, length in iteritems(flenmins):
-                if length < self._fieldlength_maxes.get(fieldname, 9999999999):
-                    self._fieldlength_mins[fieldname] = length
-
-            for fieldname, length in flenmaxes.iteritems():
-                if length > self._fieldlength_maxes.get(fieldname, 0):
-                    self._fieldlength_maxes[fieldname] = length
-
-        jobqueue.close()
-        rqueue.close()
-
-        lw = LengthWriter(lengthfile, doccount)
-        for lenfilename in lenfilenames:
-            sublengths = LengthReader(StructFile(open(lenfilename, "rb")),
-                                      doccount)
-            lw.add_all(sublengths)
-            os.remove(lenfilename)
-        lw.close()
-        lengths = lw.reader()
-
-#        if len(runs) >= self.procs * 2:
-#            pool = Pool(self.procs)
-#            tempname = lambda: tempfile.mktemp(suffix=".run", dir=self.dir)
-#            while len(runs) >= self.procs * 2:
-#                runs2 = [(runs[i:i+4], tempname())
-#                         for i in xrange(0, len(runs), 4)]
-#                if len(runs) % 4:
-#                    last = runs2.pop()[0]
-#                    runs2[-1][0].extend(last)
-#                runs = pool.map(merge_runs, runs2)
-#            pool.close()
-
-        iterator = imerge([read_run(rname, count) for rname, count in runs])
-        total = sum(count for runname, count in runs)
-        termswriter.add_iter(iterator, lengths.get)
-        for runname, count in runs:
-            os.remove(runname)
-
-        self.cleanup()
+## Multiprocessing writer
+#
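+# A worker process that builds one segment: it opens the index, creates its
+# own SegmentWriter, and adds documents from pickled batch files pulled off
+# the job queue until it receives a None sentinel, then puts the finished
+# segment on the result queue.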
+#class SegmentWritingTask(Process):
+#    def __init__(self, storage, indexname, segname, kwargs, jobqueue,
+#                 resultqueue, firstjob=None):
+#        Process.__init__(self)
+#        self.storage = storage
+#        self.indexname = indexname
+#        self.segname = segname
+#        self.kwargs = kwargs
+#        self.jobqueue = jobqueue
+#        self.resultqueue = resultqueue
+#        self.firstjob = firstjob
+#
+#        self.segment = None
+#        self.running = True
+#
+#    def _add_file(self, args):
+#        writer = self.writer
+#        filename, length = args
+#        f = open(filename, "rb")
+#        for _ in xrange(length):
+#            writer.add_document(**load(f))
+#        f.close()
+#        os.remove(filename)
+#
+#    def run(self):
+#        jobqueue = self.jobqueue
+#        ix = self.storage.open_index(self.indexname)
+#        writer = self.writer = SegmentWriter(ix, _lk=False, name=self.segname,
+#                                             **self.kwargs)
+#
+#        if self.firstjob:
+#            self._add_file(self.firstjob)
+#
+#        while self.running:
+#            args = jobqueue.get()
+#            if args is None:
+#                break
+#            self._add_file(args)
+#
+#        if not self.running:
+#            writer.cancel()
+#        else:
+#            writer.pool.finish(writer.termswriter, writer.docnum,
+#                               writer.lengthfile)
+#            writer._close_all()
+#            self.resultqueue.put(writer._getsegment())
+#
+#    def cancel(self):
+#        self.running = False
+#
+#
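+# Buffers incoming documents, spills each batch to a temporary file, and hands
+# the batches to up to `procs` SegmentWritingTask worker processes; commit()
+# collects the resulting segments and writes a new TOC generation.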
+#class MultiSegmentWriter(IndexWriter):
+#    def __init__(self, ix, procs=None, batchsize=100, dir=None, **kwargs):
+#        self.index = ix
+#        self.procs = procs or cpu_count()
+#        self.bufferlimit = batchsize
+#        self.dir = dir
+#        self.kwargs = kwargs
+#        self.kwargs["dir"] = dir
+#
+#        self.segnames = []
+#        self.tasks = []
+#        self.jobqueue = Queue(self.procs * 4)
+#        self.resultqueue = Queue()
+#        self.docbuffer = []
+#
+#        self.writelock = ix.lock("WRITELOCK")
+#        self.writelock.acquire()
+#
+#        info = ix._read_toc()
+#        self.schema = info.schema
+#        self.segment_number = info.segment_counter
+#        self.generation = info.generation + 1
+#        self.segments = info.segments
+#        self.storage = ix.storage
+#
+#    def _new_task(self, firstjob):
+#        ix = self.index
+#        self.segment_number += 1
+#        segmentname = Segment.basename(ix.indexname, self.segment_number)
+#        task = SegmentWritingTask(ix.storage, ix.indexname, segmentname,
+#                                  self.kwargs, self.jobqueue,
+#                                  self.resultqueue, firstjob)
+#        self.tasks.append(task)
+#        task.start()
+#        return task
+#
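+#    # Pickle the buffered documents to a temporary file and either start a
+#    # new worker (up to self.procs) or queue the batch for an existing one.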
+#    def _enqueue(self):
+#        doclist = self.docbuffer
+#        fd, filename = tempfile.mkstemp(".doclist", dir=self.dir)
+#        f = os.fdopen(fd, "wb")
+#        for doc in doclist:
+#            dump(doc, f, -1)
+#        f.close()
+#        args = (filename, len(doclist))
+#
+#        if len(self.tasks) < self.procs:
+#            self._new_task(args)
+#        else:
+#            self.jobqueue.put(args)
+#
+#        self.docbuffer = []
+#
+#    def cancel(self):
+#        try:
+#            for task in self.tasks:
+#                task.cancel()
+#        finally:
+#            self.writelock.release()
+#
+#    def add_document(self, **fields):
+#        self.docbuffer.append(fields)
+#        if len(self.docbuffer) >= self.bufferlimit:
+#            self._enqueue()
+#
+#    def commit(self, **kwargs):
+#        try:
+#            # Index any documents remaining in self.docbuffer
+#            self._enqueue()
+#
+#            for task in self.tasks:
+#                self.jobqueue.put(None)
+#
+#            for task in self.tasks:
+#                task.join()
+#
+#            for task in self.tasks:
+#                taskseg = self.resultqueue.get()
+#                assert isinstance(taskseg, Segment), type(taskseg)
+#                self.segments.append(taskseg)
+#
+#            self.jobqueue.close()
+#            self.resultqueue.close()
+#
+#            from whoosh.filedb.fileindex import _write_toc, _clean_files
+#            _write_toc(self.storage, self.schema, self.index.indexname,
+#                       self.generation, self.segment_number, self.segments)
+#
+#            # Delete leftover files
+#            _clean_files(self.storage, self.index.indexname,
+#                         self.generation, self.segments)
+#        finally:
+#            self.writelock.release()
+#
+#
+## Multiprocessing pool
+#
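+# A worker process that replays buffered pool commands (add_content,
+# add_posting, add_field_length) into its own TempfilePool and returns the
+# pool's on-disk runs, field-length statistics, and lengths file.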
+#class PoolWritingTask(Process):
+#    def __init__(self, schema, dir, jobqueue, resultqueue, limitmb,
+#                 firstjob=None):
+#        Process.__init__(self)
+#        self.schema = schema
+#        self.dir = dir
+#        self.jobqueue = jobqueue
+#        self.resultqueue = resultqueue
+#        self.limitmb = limitmb
+#        self.firstjob = firstjob
+#
+#    def _add_file(self, filename, length):
+#        subpool = self.subpool
+#        f = open(filename, "rb")
+#        for _ in xrange(length):
+#            code, args = load(f)
+#            if code == 0:
+#                subpool.add_content(*args)
+#            elif code == 1:
+#                subpool.add_posting(*args)
+#            elif code == 2:
+#                subpool.add_field_length(*args)
+#        f.close()
+#        os.remove(filename)
+#
+#    def run(self):
+#        jobqueue = self.jobqueue
+#        rqueue = self.resultqueue
+#        subpool = self.subpool = TempfilePool(self.schema,
+#                                              limitmb=self.limitmb,
+#                                              dir=self.dir)
+#
+#        if self.firstjob:
+#            self._add_file(*self.firstjob)
+#
+#        while True:
+#            arg1, arg2 = jobqueue.get()
+#            if arg1 is None:
+#                doccount = arg2
+#                break
+#            else:
+#                self._add_file(arg1, arg2)
+#
+#        lenfd, lenfilename = tempfile.mkstemp(".lengths", dir=subpool.dir)
+#        lenf = os.fdopen(lenfd, "wb")
+#        subpool._write_lengths(StructFile(lenf), doccount)
+#        subpool.dump_run()
+#        rqueue.put((subpool.runs, subpool.fieldlength_totals(),
+#                    subpool.fieldlength_mins(), subpool.fieldlength_maxes(),
+#                    lenfilename))
+#
+#
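+# A pool implementation that batches add_* commands, pickles each batch to a
+# temporary file, and distributes the batches to PoolWritingTask worker
+# processes; finish() merges the workers' runs and length files and feeds the
+# merged postings to the terms writer.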
+#class MultiPool(PoolBase):
+#    def __init__(self, schema, dir=None, procs=2, limitmb=32, batchsize=100,
+#                 **kw):
+#        PoolBase.__init__(self, schema, dir=dir)
+#        self._make_dir()
+#
+#        self.procs = procs
+#        self.limitmb = limitmb
+#        self.jobqueue = Queue(self.procs * 4)
+#        self.resultqueue = Queue()
+#        self.tasks = []
+#        self.buffer = []
+#        self.bufferlimit = batchsize
+#
+#    def _new_task(self, firstjob):
+#        task = PoolWritingTask(self.schema, self.dir, self.jobqueue,
+#                               self.resultqueue, self.limitmb,
+#                               firstjob=firstjob)
+#        self.tasks.append(task)
+#        task.start()
+#        return task
+#
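+#    # Pickle the buffered (code, args) commands to a temporary file and
+#    # dispatch the batch to a new or existing worker process.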
+#    def _enqueue(self):
+#        commandlist = self.buffer
+#        fd, filename = tempfile.mkstemp(".commands", dir=self.dir)
+#        f = os.fdopen(fd, "wb")
+#        for command in commandlist:
+#            dump(command, f, -1)
+#        f.close()
+#        args = (filename, len(commandlist))
+#
+#        if len(self.tasks) < self.procs:
+#            self._new_task(args)
+#        else:
+#            self.jobqueue.put(args)
+#
+#        self.buffer = []
+#
+#    def _append(self, item):
+#        self.buffer.append(item)
+#        if len(self.buffer) > self.bufferlimit:
+#            self._enqueue()
+#
+#    def add_content(self, *args):
+#        self._append((0, args))
+#
+#    def add_posting(self, *args):
+#        # MultiPool has no postingqueue attribute; buffer postings through
+#        # _append like add_content so workers receive them as code 1.
+#        self._append((1, args))
+#
+#    def add_field_length(self, *args):
+#        self._append((2, args))
+#
+#    def cancel(self):
+#        for task in self.tasks:
+#            task.terminate()
+#        self.cleanup()
+#
+#    def cleanup(self):
+#        self._clean_temp_dir()
+#
+#    def finish(self, termswriter, doccount, lengthfile):
+#        if self.buffer:
+#            self._enqueue()
+#
+#        _fieldlength_totals = self._fieldlength_totals
+#        if not self.tasks:
+#            return
+#
+#        jobqueue = self.jobqueue
+#        rqueue = self.resultqueue
+#
+#        for task in self.tasks:
+#            jobqueue.put((None, doccount))
+#
+#        for task in self.tasks:
+#            task.join()