Commits

Priit Laes committed a6cc86d Merge

Merged with master

Files changed (67)

docs/source/api/query.rst

 
 .. automodule:: whoosh.query
 
-See also :mod:`whoosh.qparser` which contains code for parsing user queries into query objects.
+See also :mod:`whoosh.qparser` which contains code for parsing user queries
+into query objects.
 
 Base classes
 ============
 
-The following abstract base classes are subclassed to create the the "real" query operations.
+The following abstract base classes are subclassed to create the "real"
+query operations.
 
 .. autoclass:: Query
     :members:
 .. autoclass:: TermRange
 .. autoclass:: NumericRange
 .. autoclass:: DateRange
+.. autoclass:: Nested
 .. autoclass:: Every
 .. autoclass:: NullQuery
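
The :mod:`whoosh.qparser` cross-reference above is the usual entry point for
turning user input into these query objects. A minimal sketch, assuming an
index in a placeholder ``indexdir`` directory with a ``content`` field (names
not taken from this changeset)::

    from whoosh.index import open_dir
    from whoosh.qparser import QueryParser

    ix = open_dir("indexdir")
    parser = QueryParser("content", schema=ix.schema)
    query = parser.parse(u"hello world")  # a whoosh.query.Query instance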
 

docs/source/batch.rst

 *maximum* memory (in megabytes) the writer will use for the indexing pool. The
 higher the number, the faster indexing will be.
 
-The default value of ``32`` is actually pretty low, considering many people
+The default value of ``128`` is actually somewhat low, considering many people
 have multiple gigabytes of RAM these days. Setting it higher can speed up
 indexing considerably::
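
    # A minimal sketch of raising the limit; "indexdir" is a placeholder for
    # an existing index directory, not something defined in this changeset.
    from whoosh import index

    ix = index.open_dir("indexdir")
    writer = ix.writer(limitmb=256)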
 
 =======================
 
 The ``procs`` parameter to :meth:`whoosh.index.Index.writer` controls the
-number of processors the writer will use for the indexing pool (via the
+number of processors the writer will use for indexing (via the
 ``multiprocessing`` module)::
 
     from whoosh import index
 amount of memory used by *each process*, so the actual memory used will be
 ``limitmb * procs``::
 
-    # Each process will use a limit of 128 MB, for a total of 512 MB
+    # Each process will use a limit of 128 MB, so the total will be 512 MB
     writer = ix.writer(procs=4, limitmb=128)
 
 
-MultiSegmentWriter
-==================
+The ``multisegment`` parameter
+==============================
 
-The ``procs`` parameter causes the default ``FileWriter`` to use multiple
-processors to build the pool, but then still uses a single process to merge
-the pool into a segment.
+The ``procs`` parameter causes the default writer to use multiple processors to
+do much of the indexing, but then still uses a single process to merge the pool
+of each sub-writer into a single segment.
 
-You can get much better indexing speed using the MultiSegmentWriter, which
-instead of a building the pool in parallel uses entirely separate parallel
-writers. The drawback is that instead of creating a single new segment,
-``MultiSegmentWriter`` creates a number of new segments equal to the number of
-processes you you use. For example, if you use ``procs=4``, the writer will
-create four new segments.
-
-So, while ``MultiSegmentWriter`` is much faster than a normal writer, you should
-only use it for large batch indexing jobs (and perhaps only for indexing from
-scratch). It should not be the only method you use for indexing, because
-otherwise the number of segments will increase forever!
-
-To use a ``MultiSegmentWriter``, construct it directly, with your Index as the
-first argument::
+You can get much better indexing speed by also using the ``multisegment=True``
+keyword argument, which, instead of merging the results of each sub-writer,
+simply has each sub-writer write out its own new segment::
 
     from whoosh import index
-    from whoosh.filedb.multiproc import MultiSegmentWriter
-    
+
     ix = index.open_dir("indexdir")
-    writer = MultiSegmentWriter(ix, procs=4, limitmb=128)
+    writer = ix.writer(procs=4, multisegment=True)
 
+The drawback is that instead
+of creating a single new segment, this option creates a number of new segments
+**at least** equal to the number of processes you use.
 
-Benchmarks
-==========
+For example, if you use ``procs=4``, the writer will create four new segments.
+(If you merge old segments or call ``add_reader`` on the parent writer, the
+parent writer will also write a segment, meaning you'll get five new segments.)
 
-As a single data point purely to illustrate the possible relative differences
-between single processing, a multiprocessing pool, and ``MultiSegmentWriter``,
-here are the indexing times for the ``benchmarks/enron.py``, indexing over 1 GB
-of text in over 500 000 email messages, using the three different methods on a
-Windows machine with ``limitmb=128``::
+So, while ``multisegment=True`` is much faster than a normal writer, you should
+only use it for large batch indexing jobs (or perhaps only for indexing from
+scratch). It should not be the only method you use for indexing, because
+otherwise the number of segments will tend to increase forever!
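
If you do use ``multisegment=True`` for a big batch job, you can merge the
accumulated segments later with an optimize pass. A minimal sketch, assuming a
placeholder ``indexdir`` directory::

    from whoosh import index

    ix = index.open_dir("indexdir")
    # Merge all existing segments down to a single segment. This runs a
    # normal single-process merge, so it can take a while on a large index.
    ix.optimize()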
 
-    Default Writer     procs=1 : 49m
-    Default Writer     procs=4 : 32m
-    MultiSegmentWriter procs=4 : 13m
 
 
 
 
+

docs/source/recipes.rst

 
 As an alternative, you might display the *estimated* total hits::
 
+    found = results.scored_length()
     if results.has_exact_length():
         print("Scored", found, "of exactly", len(results), "documents")
     else:

docs/source/releases/2_0.rst

 Whoosh 2.x release notes
 ========================
 
+Whoosh 2.4
+==========
+
+* By default, Whoosh now assembles the individual files of a segment into a
+  single file when committing. This has a small performance penalty but solves
+  a problem where Whoosh can keep too many files open. Whoosh is also now
+  smarter about using mmap.
+
+* Added functionality to index and search hierarchical documents. See
+  :doc:`/nested`.
+
+* Rewrote the Directed Acyclic Word Graph implementation (used in spell
+  checking) to be faster and more space-efficient. Word graph files created by
+  previous versions will be ignored, meaning that spell checking may become
+  slower unless/until you replace the old segments (for example, by
+  optimizing).
+
+* Rewrote multiprocessing indexing to be faster and simpler. You can now
+  do ``myindex.writer(procs=n)`` to get a multiprocessing writer, or
+  ``myindex.writer(procs=n, multisegment=True)`` to get a multiprocessing
+  writer that leaves behind multiple segments, like the old MultiSegmentWriter.
+  (``MultiSegmentWriter`` is still available for backwards compatibility as a
+  thin wrapper around the new class.)
+
+* When creating ``Term`` query objects for special fields (e.g. NUMERIC or
+  BOOLEAN), you can now pass a value of the field's type instead of a string
+  as the second argument, for example ``Term("num", 20)`` or
+  ``Term("bool", True)``. (This change may cause problems when interacting
+  with functions that expect query objects to be purely textual, such as
+  spell checking.)
+
+* All writing to and reading from on-disk indexes is now done through "codec"
+  objects. This architecture should make it easier to add optional or
+  experimental features, and maintain backwards compatibility.
+
+* Fixes issues #137, #206, #213, #215, #219, #223, #226, #230, #233, #238
+  and other bugs.
+
+
 Whoosh 2.3.2
 ============
 

docs/source/searching.rst

 documents. You can use it to access the stored fields of each hit document, to
 display to the user.
 
->>> # How many documents matched?
->>> len(results)
-27
->>> # How many scored and sorted documents in this Results object?
->>> # This will be less than len() if you used the limit keyword argument.
->>> results.scored_length()
-10
 >>> # Show the best hit's stored fields
 >>> results[0]
 {"title": u"Hello World in Python", "path": u"/a/b/c"}
 >>> results[0:2]
 [{"title": u"Hello World in Python", "path": u"/a/b/c"}, {"title": u"Foo", "path": u"/bar"}]
 
+By default, ``Searcher.search(myquery)`` limits the number of scored hits to
+10, so the number of scored hits in the ``Results`` object may be less than
+the number of matching documents in the index.
+
+>>> # How many documents in the entire index would have matched?
+>>> len(results)
+27
+>>> # How many scored and sorted documents in this Results object?
+>>> # This will often be less than len() if the number of hits was limited
+>>> # (the default).
+>>> results.scored_length()
+10
+
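If you need more scored hits than the default, pass a different ``limit`` to
``search()``; ``limit=None`` scores every matching document. A short sketch
(``searcher`` is an assumed :class:`~whoosh.searching.Searcher` instance,
``myquery`` as above)::

    # Score up to 100 documents
    results = searcher.search(myquery, limit=100)

    # Score every matching document; this can be slow for queries that match
    # a very large number of documents
    results = searcher.search(myquery, limit=None)
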
+Calling ``len(Results)`` runs a fast (unscored) version of the query again to
+figure out the total number of matching documents. This is usually very fast
+but for large indexes it can cause a noticeable delay. If you want to avoid
+this delay, you can use the
+:meth:`~whoosh.searching.Results.has_exact_length`,
+:meth:`~whoosh.searching.Results.estimated_length`,
+and :meth:`~whoosh.searching.Results.estimated_min_length` methods to estimate
+the number of matching documents without calling ``len()``::
+
+    found = results.scored_length()
+    if results.has_exact_length():
+        print("Scored", found, "of exactly", len(results), "documents")
+    else:
+        low = results.estimated_min_length()
+        high = results.estimated_length()
+    
+        print("Scored", found, "of between", low, "and", "high", "documents")
+
 
 Scoring and sorting
 ===================

src/whoosh/codec/legacy.py

         string = self.postfile.read(self.idslen)
         self.ids = deminimize_ids(self.idcode, self.count, string,
                                   compression=self.cmp)
+        return self.ids
 
     def read_weights(self):
         if self.wtslen == 0:

src/whoosh/codec/whoosh2.py

         self.stored = StoredFieldWriter(sffile)
         self.storedfields = None
 
-        self.flfile = segment.create_file(storage, W2Codec.LENGTHS_EXT)
         self.lengths = InMemoryLengths()
 
         # We'll wait to create the vector files until someone actually tries
         if self.storedfields is not None:
             self.stored.add(self.storedfields)
         self.stored.close()
-        self.lengths.to_file(self.flfile, self.doccount)
+        flfile = self.segment.create_file(self.storage, W2Codec.LENGTHS_EXT)
+        self.lengths.to_file(flfile, self.doccount)
         if self.vindex:
             self.vindex.close()
             self.vpostfile.close()

src/whoosh/compat.py

     xrange = xrange
     zip_ = zip
 
-    def memoryview_(source, offset, length):
-        return buffer(source, offset, length)
+    def memoryview_(source, offset=None, length=None):
+        if offset:
+            return buffer(source, offset, length)
+        else:
+            return buffer(source)
 
 else:
     PY3 = True
     xrange = range
     zip_ = lambda * args: list(zip(*args))
 
-    def memoryview_(source, offset, length):
-        return memoryview(source)[offset:offset + length]
+    def memoryview_(source, offset=None, length=None):
+        mv = memoryview(source)
+        if offset:
+            return mv[offset:offset + length]
+        else:
+            return mv
 

src/whoosh/fields.py

         return x
 
     def to_text(self, x, shift=0):
-        return self._to_text(self.prepare_number(x), shift=shift)
+        return self._to_text(self.prepare_number(x), shift=shift,
+                             signed=self.signed)
 
     def from_text(self, t):
-        x = self._from_text(t)
+        x = self._from_text(t, signed=self.signed)
         return self.unprepare_number(x)
 
     def process_text(self, text, **kwargs):

src/whoosh/filedb/fileindex.py

 
     @classmethod
     def _segment_pattern(cls, indexname):
-        return re.compile("_(%s_[0-9a-z]+)[.][a-z]+" % indexname)
+        return re.compile("(%s_[0-9a-z]+)[.][a-z]+" % indexname)
 
     @classmethod
     def _latest_generation(cls, storage, indexname):
 
     def writer(self, procs=1, **kwargs):
         if procs > 1:
-            from whoosh.filedb.multiproc2 import MpWriter
-            return MpWriter(self, **kwargs)
+            from whoosh.filedb.multiproc import MpWriter
+            return MpWriter(self, procs=procs, **kwargs)
         else:
             from whoosh.filedb.filewriting import SegmentWriter
             return SegmentWriter(self, **kwargs)

src/whoosh/filedb/filewriting.py

         self.perdocwriter = codec.per_document_writer(self.storage, newsegment)
         self.fieldwriter = codec.field_writer(self.storage, newsegment)
 
+    def __repr__(self):
+        return "<%s %r>" % (self.__class__.__name__, self.newsegment)
+
     def _setup_doc_offsets(self):
         self._doc_offsets = []
         base = 0
         newsegment.doccount = self.doc_count()
         return newsegment
 
-    def partial_segment(self):
-        self._check_state()
-        self.perdocwriter.close()
-        self.fieldwriter.close()
-        return self.get_segment()
-
-    def _finish(self, segments=None):
-        self.perdocwriter.close()
-        self.fieldwriter.close()
-        self.pool.cleanup()
-
-        if segments:
-            if self._added and self.compound:
-                # Assemble the segment files into a compound file
-                newsegment = segments[-1]
-                newsegment.create_compound_file(self.storage)
-                newsegment.compound = True
-
-            # Write a new TOC with the new segment list (and delete old files)
-            self.codec.commit_toc(self.storage, self.indexname, self.schema,
-                                  segments, self.generation)
-
-        self.is_closed = True
-        self.storage.close()
-
-    def _release_lock(self):
-        if self.writelock:
-            self.writelock.release()
-
     def _merge_segments(self, mergetype, optimize, merge):
         if mergetype:
             pass
         # other segments into this writer's pool
         return mergetype(self, self.segments)
 
+    def _flush_segment(self):
+        lengths = self.perdocwriter.lengths_reader()
+        postings = self.pool.iter_postings()
+        self.fieldwriter.add_postings(self.schema, lengths, postings)
+
+    def _close_segment(self):
+        self.perdocwriter.close()
+        self.fieldwriter.close()
+        self.pool.cleanup()
+
+    def _assemble_segment(self):
+        if self.compound:
+            # Assemble the segment files into a compound file
+            newsegment = self.get_segment()
+            newsegment.create_compound_file(self.storage)
+            newsegment.compound = True
+
+    def _commit_toc(self, segments):
+        # Write a new TOC with the new segment list (and delete old files)
+        self.codec.commit_toc(self.storage, self.indexname, self.schema,
+                              segments, self.generation)
+
+    def _finish(self):
+        if self.writelock:
+            self.writelock.release()
+        self.is_closed = True
+        #self.storage.close()
+
+    def _partial_segment(self):
+        # For use by a parent multiprocessing writer: Closes out the segment
+        # but leaves the pool files intact so the parent can access them
+        self._check_state()
+        self.perdocwriter.close()
+        self.fieldwriter.close()
+        # Don't call self.pool.cleanup()! We want to grab the pool files.
+        return self.get_segment()
+
     def commit(self, mergetype=None, optimize=False, merge=True):
         """Finishes writing and saves all additions and changes to disk.
         
         """
 
         self._check_state()
-        schema = self.schema
         try:
+            # Merge old segments if necessary
             finalsegments = self._merge_segments(mergetype, optimize, merge)
             if self._added:
-                # Update the new segment with the current doc count
-                newsegment = self.get_segment()
-
-                # Output the sorted pool postings to the terms index and
-                # posting files
-                lengths = self.perdocwriter.lengths_reader()
-                self.fieldwriter.add_postings(schema, lengths,
-                                              self.pool.iter_postings())
+                # Finish writing segment
+                self._flush_segment()
+                # Close segment files
+                self._close_segment()
+                # Assemble compound segment if necessary
+                self._assemble_segment()
 
                 # Add the new segment to the list of remaining segments
                 # returned by the merge policy function
-                finalsegments.append(newsegment)
-            # Close all files
-            self._finish(finalsegments)
+                finalsegments.append(self.get_segment())
+            else:
+                # Close segment files
+                self._close_segment()
+            # Write TOC
+            self._commit_toc(finalsegments)
         finally:
-            self._release_lock()
+            # Final cleanup
+            self._finish()
 
     def cancel(self):
         self._check_state()
-        try:
-            self._finish()
-        finally:
-            if self.writelock:
-                self.writelock.release()
+        self._close_segment()
+        self._finish()
 
 
 # Retroactively add spelling files to an existing index

src/whoosh/filedb/multiproc.py

-# Copyright 2010 Matt Chaput. All rights reserved.
+# Copyright 2011 Matt Chaput. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import os
-import tempfile
+
+import os, tempfile
 from multiprocessing import Process, Queue, cpu_count
 
-from whoosh.compat import dump, load, xrange, iteritems
-from whoosh.filedb.filetables import LengthWriter, LengthReader
-from whoosh.filedb.fileindex import Segment
-from whoosh.filedb.filewriting import SegmentWriter
-from whoosh.filedb.pools import (imerge, read_run, PoolBase, TempfilePool)
-from whoosh.filedb.structfile import StructFile
-from whoosh.writing import IndexWriter
+from whoosh.compat import xrange, iteritems, pickle
+from whoosh.codec import base
+from whoosh.filedb.filewriting import PostingPool, SegmentWriter
+from whoosh.support.externalsort import imerge
 
 
-# Multiprocessing writer
+def finish_subsegment(writer, k=64):
+    # Tell the pool to finish up the current file
+    writer.pool.save()
+    # Tell the pool to merge any and all runs in the pool until there
+    # is only one run remaining. "k" is an optional parameter passed
+    # from the parent which sets the maximum number of files to open
+    # while reducing.
+    writer.pool.reduce_to(1, k)
 
-class SegmentWritingTask(Process):
-    def __init__(self, storage, indexname, segname, kwargs, jobqueue,
-                 resultqueue, firstjob=None):
+    # The filename of the single remaining run
+    runname = writer.pool.runs[0]
+    # The segment ID (parent can use this to re-open the files created
+    # by the sub-writer)
+    segment = writer._partial_segment()
+
+    return runname, segment
+
+
+# Multiprocessing Writer
+
+class SubWriterTask(Process):
+    # This is a Process object that takes "jobs" off a job Queue, processes
+    # them, and when it's done, puts a summary of its work on a results Queue
+
+    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs,
+                 multisegment):
         Process.__init__(self)
         self.storage = storage
         self.indexname = indexname
-        self.segname = segname
-        self.kwargs = kwargs
         self.jobqueue = jobqueue
         self.resultqueue = resultqueue
-        self.firstjob = firstjob
-
-        self.segment = None
+        self.kwargs = kwargs
+        self.multisegment = multisegment
         self.running = True
 
-    def _add_file(self, args):
-        writer = self.writer
-        filename, length = args
-        f = open(filename, "rb")
-        for _ in xrange(length):
-            writer.add_document(**load(f))
-        f.close()
-        os.remove(filename)
+    def run(self):
+        # This is the main loop of the process. OK, so the way this works is
+        # kind of brittle and stupid, but I had to figure out how to use the
+        # multiprocessing module, work around bugs, and address performance
+        # issues, so there is at least some reasoning behind some of this
 
-    def run(self):
+        # The "parent" task farms individual documents out to the subtasks for
+        # indexing. You could pickle the actual documents and put them in the
+        # queue, but that is not very performant. Instead, we assume the tasks
+        # share a filesystem and use that to pass the information around. The
+        # parent task writes a certain number of documents to a file, then puts
+        # the filename on the "job queue". A subtask gets the filename off the
+        # queue and reads through the file processing the documents.
+
         jobqueue = self.jobqueue
+        resultqueue = self.resultqueue
+        multisegment = self.multisegment
+
+        # Open a placeholder object representing the index
         ix = self.storage.open_index(self.indexname)
-        writer = self.writer = SegmentWriter(ix, _lk=False, name=self.segname,
-                                             **self.kwargs)
+        # Open a writer for the index. The _lk=False parameter means to not try
+        # to lock the index (the parent object that started me takes care of
+        # locking the index)
+        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)
 
-        if self.firstjob:
-            self._add_file(self.firstjob)
-
+        # If the parent task calls cancel() on me, it will set self.running to
+        # False, so I'll notice the next time through the loop
         while self.running:
-            args = jobqueue.get()
-            if args is None:
+            # Take an object off the job queue
+            jobinfo = jobqueue.get()
+            # If the object is None, it means the parent task wants me to
+            # finish up
+            if jobinfo is None:
                 break
-            self._add_file(args)
+            # The object from the queue is a tuple of (filename,
+            # number_of_docs_in_file). Pass those two pieces of information as
+            # arguments to _process_file().
+            self._process_file(*jobinfo)
 
         if not self.running:
+            # I was cancelled, so I'll cancel my underlying writer
             writer.cancel()
         else:
-            writer.pool.finish(writer.termswriter, writer.docnum,
-                               writer.lengthfile)
-            writer._close_all()
-            self.resultqueue.put(writer._getsegment())
+            if multisegment:
+                # Actually finish the segment and return it with no run
+                runname = None
+                writer._flush_segment()
+                writer._close_segment()
+                writer._assemble_segment()
+                segment = writer.get_segment()
+            else:
+                # Merge all runs in the writer's pool into one run, close the
+                # segment, and return the run name and the segment
+                k = self.kwargs.get("k", 64)
+                runname, segment = finish_subsegment(writer, k)
+
+            # Put the results (the run filename and the segment object) on the
+            # result queue
+            resultqueue.put((runname, segment), timeout=5)
+
+    def _process_file(self, filename, doc_count):
+        # This method processes a "job file" written out by the parent task. A
+        # job file is a series of pickled (code, arguments) tuples. Currently
+        # the only command code is 0=add_document
+
+        writer = self.writer
+        load = pickle.load
+        f = open(filename, "rb")
+        for _ in xrange(doc_count):
+            # Load the next pickled tuple from the file
+            code, args = load(f)
+            assert code == 0
+            writer.add_document(**args)
+        f.close()
+        # Remove the job file
+        os.remove(filename)
 
     def cancel(self):
         self.running = False
 
 
-class MultiSegmentWriter(IndexWriter):
-    def __init__(self, ix, procs=None, batchsize=100, dir=None, **kwargs):
-        self.index = ix
+class MpWriter(SegmentWriter):
+    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
+                 multisegment=False, **kwargs):
+        # This is the "main" writer that will aggregate the results created by
+        # the sub-tasks
+        SegmentWriter.__init__(self, ix, **kwargs)
+
         self.procs = procs or cpu_count()
-        self.bufferlimit = batchsize
-        self.dir = dir
-        self.kwargs = kwargs
-        self.kwargs["dir"] = dir
+        # The maximum number of documents in each job file submitted to the
+        # sub-tasks
+        self.batchsize = batchsize
+        # You can use keyword arguments or the "subargs" argument to pass
+        # keyword arguments to the sub-writers
+        self.subargs = subargs if subargs else kwargs
+        # If multisegment is True, don't merge the segments created by the
+        # sub-writers, just add them directly to the TOC
+        self.multisegment = multisegment
 
-        self.segnames = []
+        # A list to hold the sub-task Process objects
         self.tasks = []
+        # A queue to pass the filenames of job files to the sub-tasks
         self.jobqueue = Queue(self.procs * 4)
+        # A queue to get back the final results of the sub-tasks
         self.resultqueue = Queue()
+        # A buffer for documents before they are flushed to a job file
         self.docbuffer = []
 
-        self.writelock = ix.lock("WRITELOCK")
-        self.writelock.acquire()
+        self._grouping = 0
+        self._added_sub = False
 
-        info = ix._read_toc()
-        self.schema = info.schema
-        self.segment_number = info.segment_counter
-        self.generation = info.generation + 1
-        self.segments = info.segments
-        self.storage = ix.storage
-
-    def _new_task(self, firstjob):
-        ix = self.index
-        self.segment_number += 1
-        segmentname = Segment.basename(ix.indexname, self.segment_number)
-        task = SegmentWritingTask(ix.storage, ix.indexname, segmentname,
-                                  self.kwargs, self.jobqueue,
-                                  self.resultqueue, firstjob)
+    def _new_task(self):
+        task = SubWriterTask(self.storage, self.indexname,
+                             self.jobqueue, self.resultqueue, self.subargs,
+                             self.multisegment)
         self.tasks.append(task)
         task.start()
         return task
 
     def _enqueue(self):
-        doclist = self.docbuffer
-        fd, filename = tempfile.mkstemp(".doclist", dir=self.dir)
+        # Flush the documents stored in self.docbuffer to a file and put the
+        # filename on the job queue
+        docbuffer = self.docbuffer
+        dump = pickle.dump
+        length = len(docbuffer)
+        fd, filename = tempfile.mkstemp(".doclist")
         f = os.fdopen(fd, "wb")
-        for doc in doclist:
-            dump(doc, f, -1)
-        f.close()
-        args = (filename, len(doclist))
+        for item in docbuffer:
+            dump(item, f, -1)
+        f.close()
 
         if len(self.tasks) < self.procs:
-            self._new_task(args)
-        else:
-            self.jobqueue.put(args)
-
+            self._new_task()
+        jobinfo = (filename, length)
+        self.jobqueue.put(jobinfo)
         self.docbuffer = []
 
     def cancel(self):
             for task in self.tasks:
                 task.cancel()
         finally:
-            self.writelock.release()
+            SegmentWriter.cancel(self)
+
+    def start_group(self):
+        self._grouping += 1
+
+    def end_group(self):
+        if not self._grouping:
+            raise Exception("Unbalanced end_group")
+        self._grouping -= 1
 
     def add_document(self, **fields):
-        self.docbuffer.append(fields)
-        if len(self.docbuffer) >= self.bufferlimit:
+        # Add the document to the docbuffer
+        self.docbuffer.append((0, fields))
+        # If the buffer is full, flush it to the job queue
+        if not self._grouping and len(self.docbuffer) >= self.batchsize:
             self._enqueue()
+        self._added_sub = True
 
-    def commit(self, **kwargs):
+    def _read_and_renumber_run(self, path, offset):
+        # Note that PostingPool._read_run() automatically deletes the run file
+        # when it's finished
+
+        gen = PostingPool._read_run(path)
+        # If offset is 0, just return the items unchanged
+        if not offset:
+            return gen
+        else:
+            # Otherwise, add the offset to each docnum
+            return ((fname, text, docnum + offset, weight, value)
+                    for fname, text, docnum, weight, value in gen)
+
+    def commit(self, mergetype=None, optimize=False, merge=True):
+        if self._added_sub:
+            # If documents have been added to sub-writers, use the parallel
+            # merge commit code
+            self._commit(mergetype, optimize, merge)
+        else:
+            # Otherwise, just do a regular-old commit
+            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
+                                 merge=merge)
+
+    def _commit(self, mergetype, optimize, merge):
         try:
-            # index the remaining stuff in self.docbuffer
+            # Index the remaining documents in the doc buffer
             self._enqueue()
-
+            # Tell the tasks to finish
             for task in self.tasks:
                 self.jobqueue.put(None)
 
+            # Merge existing segments
+            finalsegments = self._merge_segments(mergetype, optimize, merge)
+
+            # Wait for the subtasks to finish
             for task in self.tasks:
                 task.join()
 
+            # Pull a (run_file_name, segment) tuple off the result queue for
+            # each sub-task, representing the final results of the task
+            results = []
             for task in self.tasks:
-                taskseg = self.resultqueue.get()
-                assert isinstance(taskseg, Segment), type(taskseg)
-                self.segments.append(taskseg)
+                results.append(self.resultqueue.get(timeout=5))
 
-            self.jobqueue.close()
-            self.resultqueue.close()
+            if self.multisegment:
+                finalsegments += [s for _, s in results]
+                if self._added:
+                    self._flush_segment()
+                    self._close_segment()
+                    self._assemble_segment()
+                    finalsegments.append(self.get_segment())
+                else:
+                    self._close_segment()
+            else:
+                # Merge the posting sources from the sub-writers and my
+                # postings into this writer
+                self._merge_subsegments(results, mergetype, optimize, merge)
+                self._close_segment()
+                self._assemble_segment()
+                finalsegments.append(self.get_segment())
+            self._commit_toc(finalsegments)
+        finally:
+            self._finish()
 
-            from whoosh.filedb.fileindex import _write_toc, _clean_files
-            _write_toc(self.storage, self.schema, self.index.indexname,
-                       self.generation, self.segment_number, self.segments)
+    def _merge_subsegments(self, results, mergetype, optimize, merge):
+        schema = self.schema
+        storage = self.storage
+        codec = self.codec
+        fieldnames = list(schema.names())
 
-            # Delete leftover files
-            _clean_files(self.storage, self.index.indexname,
-                         self.generation, self.segments)
+        # Merge per-document information
+        pdw = self.perdocwriter
+        # Names of fields that store term vectors
+        vnames = set(schema.vector_names())
+        basedoc = self.docnum
+        # A list to remember field length readers for each sub-segment (we'll
+        # re-use them below)
+        lenreaders = [pdw.lengths_reader()]
+
+        for _, segment in results:
+            # Create a field length reader for the sub-segment
+            lenreader = codec.lengths_reader(storage, segment)
+            # Remember it in the list for later
+            lenreaders.append(lenreader)
+            # Vector reader for the sub-segment
+            vreader = None
+            if schema.has_vectored_fields():
+                vreader = codec.vector_reader(storage, segment)
+            # Stored field reader for the sub-segment
+            sfreader = codec.stored_fields_reader(storage, segment)
+            # Iterating on the stored field reader yields a dictionary of
+            # stored fields for *every* document in the segment (even if the
+            # document has no stored fields it should yield {})
+            for i, fs in enumerate(sfreader):
+                # Add the base doc count to the sub-segment doc num
+                pdw.start_doc(basedoc + i)
+                # Call add_field to store the field values and lengths
+                for fieldname in fieldnames:
+                    value = fs.get(fieldname)
+                    length = lenreader.doc_field_length(i, fieldname)
+                    pdw.add_field(fieldname, schema[fieldname], value, length)
+                # Copy over the vectors. TODO: would be much faster to bulk-
+                # copy the postings
+                if vreader:
+                    for fieldname in vnames:
+                        if (i, fieldname) in vreader:
+                            field = schema[fieldname]
+                            vformat = field.vector
+                            vmatcher = vreader.matcher(i, fieldname, vformat)
+                            pdw.add_vector_matcher(fieldname, field, vmatcher)
+                pdw.finish_doc()
+            basedoc += segment.doccount
+
+        # If information was added to this writer the conventional way (e.g.
+        # through add_reader or merging segments), add it as an extra source
+        if self._added:
+            sources = [self.pool.iter_postings()]
+        else:
+            sources = []
+        # Add iterators from the run filenames
+        basedoc = self.docnum
+        for runname, segment in results:
+            items = self._read_and_renumber_run(runname, basedoc)
+            sources.append(items)
+            basedoc += segment.doccount
+
+        # Create a MultiLengths object combining the length files from the
+        # subtask segments
+        mlens = base.MultiLengths(lenreaders)
+        # Merge the iterators into the field writer
+        self.fieldwriter.add_postings(schema, mlens, imerge(sources))
+        self.docnum = basedoc
+        self._added = True
+
+
+class SerialMpWriter(MpWriter):
+    # A non-parallel version of the MpWriter for testing purposes
+
+    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
+        SegmentWriter.__init__(self, ix, **kwargs)
+
+        self.procs = procs or cpu_count()
+        self.batchsize = batchsize
+        self.subargs = subargs if subargs else kwargs
+        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
+                      for _ in xrange(self.procs)]
+        self.pointer = 0
+        self._added_sub = False
+
+    def add_document(self, **fields):
+        self.tasks[self.pointer].add_document(**fields)
+        self.pointer = (self.pointer + 1) % len(self.tasks)
+        self._added_sub = True
+
+    def _commit(self, mergetype, optimize, merge):
+        # Pull a (run_file_name, segment) tuple off the result queue for each
+        # sub-task, representing the final results of the task
+        try:
+            # Merge existing segments
+            finalsegments = self._merge_segments(mergetype, optimize, merge)
+            results = []
+            for writer in self.tasks:
+                results.append(finish_subsegment(writer))
+            self._merge_subsegments(results, mergetype, optimize, merge)
+            self._close_segment()
+            self._assemble_segment()
+            finalsegments.append(self.get_segment())
+            self._commit_toc(finalsegments)
         finally:
-            self.writelock.release()
+            self._finish()
 
 
-# Multiprocessing pool
+# For compatibility with old multiproc module
+class MultiSegmentWriter(MpWriter):
+    def __init__(self, *args, **kwargs):
+        MpWriter.__init__(self, *args, **kwargs)
+        self.multisegment = True
 
-class PoolWritingTask(Process):
-    def __init__(self, schema, dir, jobqueue, resultqueue, limitmb,
-                 firstjob=None):
-        Process.__init__(self)
-        self.schema = schema
-        self.dir = dir
-        self.jobqueue = jobqueue
-        self.resultqueue = resultqueue
-        self.limitmb = limitmb
-        self.firstjob = firstjob
-
-    def _add_file(self, filename, length):
-        subpool = self.subpool
-        f = open(filename, "rb")
-        for _ in xrange(length):
-            code, args = load(f)
-            if code == 0:
-                subpool.add_content(*args)
-            elif code == 1:
-                subpool.add_posting(*args)
-            elif code == 2:
-                subpool.add_field_length(*args)
-        f.close()
-        os.remove(filename)
-
-    def run(self):
-        jobqueue = self.jobqueue
-        rqueue = self.resultqueue
-        subpool = self.subpool = TempfilePool(self.schema,
-                                              limitmb=self.limitmb,
-                                              dir=self.dir)
-
-        if self.firstjob:
-            self._add_file(*self.firstjob)
-
-        while True:
-            arg1, arg2 = jobqueue.get()
-            if arg1 is None:
-                doccount = arg2
-                break
-            else:
-                self._add_file(arg1, arg2)
-
-        lenfd, lenfilename = tempfile.mkstemp(".lengths", dir=subpool.dir)
-        lenf = os.fdopen(lenfd, "wb")
-        subpool._write_lengths(StructFile(lenf), doccount)
-        subpool.dump_run()
-        rqueue.put((subpool.runs, subpool.fieldlength_totals(),
-                    subpool.fieldlength_mins(), subpool.fieldlength_maxes(),
-                    lenfilename))
-
-
-class MultiPool(PoolBase):
-    def __init__(self, schema, dir=None, procs=2, limitmb=32, batchsize=100,
-                 **kw):
-        PoolBase.__init__(self, schema, dir=dir)
-        self._make_dir()
-
-        self.procs = procs
-        self.limitmb = limitmb
-        self.jobqueue = Queue(self.procs * 4)
-        self.resultqueue = Queue()
-        self.tasks = []
-        self.buffer = []
-        self.bufferlimit = batchsize
-
-    def _new_task(self, firstjob):
-        task = PoolWritingTask(self.schema, self.dir, self.jobqueue,
-                               self.resultqueue, self.limitmb,
-                               firstjob=firstjob)
-        self.tasks.append(task)
-        task.start()
-        return task
-
-    def _enqueue(self):
-        commandlist = self.buffer
-        fd, filename = tempfile.mkstemp(".commands", dir=self.dir)
-        f = os.fdopen(fd, "wb")
-        for command in commandlist:
-            dump(command, f, -1)
-        f.close()
-        args = (filename, len(commandlist))
-
-        if len(self.tasks) < self.procs:
-            self._new_task(args)
-        else:
-            self.jobqueue.put(args)
-
-        self.buffer = []
-
-    def _append(self, item):
-        self.buffer.append(item)
-        if len(self.buffer) > self.bufferlimit:
-            self._enqueue()
-
-    def add_content(self, *args):
-        self._append((0, args))
-
-    def add_posting(self, *args):
-        self._append((1, args))
-
-    def add_field_length(self, *args):
-        self._append((2, args))
-
-    def cancel(self):
-        for task in self.tasks:
-            task.terminate()
-        self.cleanup()
-
-    def cleanup(self):
-        self._clean_temp_dir()
-
-    def finish(self, termswriter, doccount, lengthfile):
-        if self.buffer:
-            self._enqueue()
-
-        _fieldlength_totals = self._fieldlength_totals
-        if not self.tasks:
-            return
-
-        jobqueue = self.jobqueue
-        rqueue = self.resultqueue
-
-        for task in self.tasks:
-            jobqueue.put((None, doccount))
-
-        for task in self.tasks:
-            task.join()
-
-        runs = []
-        lenfilenames = []
-        for task in self.tasks:
-            truns, flentotals, flenmins, flenmaxes, lenfilename = rqueue.get()
-            runs.extend(truns)
-            lenfilenames.append(lenfilename)
-            for fieldname, total in iteritems(flentotals):
-                _fieldlength_totals[fieldname] += total
-
-            for fieldname, length in iteritems(flenmins):
-                if length < self._fieldlength_maxes.get(fieldname, 9999999999):
-                    self._fieldlength_mins[fieldname] = length
-
-            for fieldname, length in flenmaxes.iteritems():
-                if length > self._fieldlength_maxes.get(fieldname, 0):
-                    self._fieldlength_maxes[fieldname] = length
-
-        jobqueue.close()
-        rqueue.close()
-
-        lw = LengthWriter(lengthfile, doccount)
-        for lenfilename in lenfilenames:
-            sublengths = LengthReader(StructFile(open(lenfilename, "rb")),
-                                      doccount)
-            lw.add_all(sublengths)
-            os.remove(lenfilename)
-        lw.close()
-        lengths = lw.reader()
-
-#        if len(runs) >= self.procs * 2:
-#            pool = Pool(self.procs)
-#            tempname = lambda: tempfile.mktemp(suffix=".run", dir=self.dir)
-#            while len(runs) >= self.procs * 2:
-#                runs2 = [(runs[i:i+4], tempname())
-#                         for i in xrange(0, len(runs), 4)]
-#                if len(runs) % 4:
-#                    last = runs2.pop()[0]
-#                    runs2[-1][0].extend(last)
-#                runs = pool.map(merge_runs, runs2)
-#            pool.close()
-
-        iterator = imerge([read_run(rname, count) for rname, count in runs])
-        total = sum(count for runname, count in runs)
-        termswriter.add_iter(iterator, lengths.get)
-        for runname, count in runs:
-            os.remove(runname)
-
-        self.cleanup()

src/whoosh/filedb/multiproc2.py

-# Copyright 2011 Matt Chaput. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-#    1. Redistributions of source code must retain the above copyright notice,
-#       this list of conditions and the following disclaimer.
-#
-#    2. Redistributions in binary form must reproduce the above copyright
-#       notice, this list of conditions and the following disclaimer in the
-#       documentation and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
-# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
-# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
-# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and documentation are
-# those of the authors and should not be interpreted as representing official
-# policies, either expressed or implied, of Matt Chaput.
-
-
-import os, tempfile
-from multiprocessing import Process, Queue, cpu_count
-
-from whoosh.compat import xrange, iteritems, pickle
-from whoosh.codec import base
-from whoosh.filedb.filewriting import SegmentWriter
-from whoosh.support.externalsort import imerge, SortingPool
-
-
-def finish_subsegment(writer, k=64):
-    # Tell the pool to finish up the current file
-    writer.pool.save()
-    # Tell the pool to merge any and all runs in the pool until there
-    # is only one run remaining. "k" is an optional parameter passed
-    # from the parent which sets the maximum number of files to open
-    # while reducing.
-    writer.pool.reduce_to(1, k)
-
-    # The filename of the single remaining run
-    runname = writer.pool.runs[0]
-    # The segment ID (parent can use this to re-open the files created
-    # by my sub-writer)
-    segment = writer.partial_segment()
-
-    return runname, segment
-
-
-# Multiprocessing Writer
-
-class SubWriterTask(Process):
-    # This is a Process object that takes "jobs" off a job Queue, processes
-    # them, and when it's done, puts a summary of its work on a results Queue
-
-    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs):
-        Process.__init__(self)
-        self.storage = storage
-        self.indexname = indexname
-        self.jobqueue = jobqueue
-        self.resultqueue = resultqueue
-        self.kwargs = kwargs
-        self.running = True
-
-    def run(self):
-        # This is the main loop of the process. OK, so the way this works is
-        # kind of brittle and stupid, but I had to figure out how to use the
-        # multiprocessing module, work around bugs, and address performance
-        # issues, so there is at least some reasoning behind some of this
-
-        # The "parent" task farms individual documents out to the subtasks for
-        # indexing. You could pickle the actual documents and put them in the
-        # queue, but that is not very performant. Instead, we assume the tasks
-        # share a filesystem and use that to pass the information around. The
-        # parent task writes a certain number of documents to a file, then puts
-        # the filename on the "job queue". A subtask gets the filename off the
-        # queue and reads through the file processing the documents.
-
-        jobqueue = self.jobqueue
-        resultqueue = self.resultqueue
-
-        # Open a placeholder object representing the index
-        ix = self.storage.open_index(self.indexname)
-        # Open a writer for the index. The _lk=False parameter means to not try
-        # to lock the index (the parent object that started me takes care of
-        # locking the index)
-        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)
-
-        # If the parent task calls cancel() on me, it will set self.running to
-        # False, so I'll notice the next time through the loop
-        while self.running:
-            # Take an object off the job queue
-            jobinfo = jobqueue.get()
-            # If the object is None, it means the parent task wants me to
-            # finish up
-            if jobinfo is None:
-                break
-            # The object from the queue is a tuple of (filename,
-            # number_of_docs_in_file). Pass those two pieces of information as
-            # arguments to _process_file().
-            self._process_file(*jobinfo)
-
-        if not self.running:
-            # I was cancelled, so I'll cancel my underlying writer
-            writer.cancel()
-        else:
-            runname, segment = finish_subsegment(writer,
-                                                 self.kwargs.get("k", 64))
-            # Put the results (the run filename and the segment object) on the
-            # result queue
-            resultqueue.put((runname, segment), timeout=5)
-
-    def _process_file(self, filename, doc_count):
-        # This method processes a "job file" written out by the parent task. A
-        # job file is a series of pickled (code, arguments) tuples. Currently
-        # the only command codes is 0=add_document
-
-        writer = self.writer
-        load = pickle.load
-        f = open(filename, "rb")
-        for _ in xrange(doc_count):
-            # Load the next pickled tuple from the file
-            code, args = load(f)
-            assert code == 0
-            writer.add_document(**args)
-        f.close()
-        # Remove the job file
-        os.remove(filename)
-
-    def cancel(self):
-        self.running = False
-
-
-class MpWriter(SegmentWriter):
-    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
-        # This is the "main" writer that will aggregate the results created by
-        # the sub-tasks
-        SegmentWriter.__init__(self, ix, **kwargs)
-
-        self.procs = procs or cpu_count()
-        # The maximum number of documents in each job file submitted to the
-        # sub-tasks
-        self.batchsize = batchsize
-        # You can use keyword arguments or the "subargs" argument to pass
-        # keyword arguments to the sub-writers
-        self.subargs = subargs if subargs else kwargs
-
-        # A list to hold the sub-task Process objects
-        self.tasks = []
-        # A queue to pass the filenames of job files to the sub-tasks
-        self.jobqueue = Queue(self.procs * 4)
-        # A queue to get back the final results of the sub-tasks
-        self.resultqueue = Queue()
-        # A buffer for documents before they are flushed to a job file
-        self.docbuffer = []
-
-        self._added_sub = False
-
-    def _new_task(self):
-        task = SubWriterTask(self.storage, self.indexname,
-                             self.jobqueue, self.resultqueue, self.subargs)
-        self.tasks.append(task)
-        task.start()
-        return task
-
-    def _enqueue(self):
-        # Flush the documents stored in self.docbuffer to a file and put the
-        # filename on the job queue
-        docbuffer = self.docbuffer
-        dump = pickle.dump
-        length = len(docbuffer)
-        fd, filename = tempfile.mkstemp(".doclist")
-        f = os.fdopen(fd, "wb")
-        for item in docbuffer:
-            dump(item, f, -1)
-
-        if len(self.tasks) < self.procs:
-            self._new_task()
-        jobinfo = (filename, length)
-        self.jobqueue.put(jobinfo)
-        self.docbuffer = []
-
-    def cancel(self):
-        try:
-            for task in self.tasks:
-                task.cancel()
-        finally:
-            SegmentWriter.cancel(self)
-
-    def add_document(self, **fields):
-        # Add the document to the docbuffer
-        self.docbuffer.append((0, fields))
-        # If the buffer is full, flush it to the job queue
-        if len(self.docbuffer) >= self.batchsize:
-            self._enqueue()
-        self._added_sub = True
-
-    def _read_and_renumber_run(self, path, offset):
-        # Note that SortingPool._read_run() automatically deletes the run file
-        # when it's finished
-
-        gen = SortingPool._read_run(path)
-        # If offset is 0, just return the items unchanged
-        if not offset:
-            return gen
-        else:
-            # Otherwise, add the offset to each docnum
-            return ((fname, text, docnum + offset, weight, value)
-                    for fname, text, docnum, weight, value in gen)
-
-    def commit(self, mergetype=None, optimize=False, merge=True):
-        if self._added_sub:
-            self._commit(mergetype, optimize, merge)
-        else:
-            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
-                                 merge=merge)
-
-    def _commit(self, mergetype, optimize, merge):
-        try:
-            # Index the remaining documents in the doc buffer
-            self._enqueue()
-            # Tell the tasks to finish
-            for task in self.tasks:
-                self.jobqueue.put(None)
-
-            # Merge existing segments
-            segments = self._merge_segments(mergetype, optimize, merge)
-            sources = []
-            if self._added:
-                # If information was added to this writer the conventional
-                # (e.g. through add_reader or merging segments), add it as an
-                # extra source
-                sources.append(self.pool.iter_postings())
-
-            # Wait for the subtasks to finish
-            for task in self.tasks:
-                task.join()
-
-            # Pull a (run_file_name, segment) tuple off the result queue for
-            # each sub-task, representing the final results of the task
-            results = []
-            for task in self.tasks:
-                results.append(self.resultqueue.get(timeout=5))
-            self._merge_subsegments(results, sources, mergetype, optimize,
-                                    merge)
-            self._finish(segments + [self.get_segment()])
-        finally:
-            self._release_lock()
-
-    def _merge_subsegments(self, results, sources, mergetype, optimize, merge):
-        schema = self.schema
-        storage = self.storage
-        codec = self.codec
-        fieldnames = list(schema.names())
-
-        # Merge per-document information
-        pdw = self.perdocwriter
-        # Names of fields that store term vectors
-        vnames = set(schema.vector_names())
-        basedoc = self.docnum
-        # A list to remember field length readers for each sub-segment (we'll
-        # re-use them below)
-        lenreaders = [pdw.lengths_reader()]
-
-        for _, segment in results:
-            # Create a field length reader for the sub-segment
-            lenreader = codec.lengths_reader(storage, segment)
-            # Remember it in the list for later
-            lenreaders.append(lenreader)
-            # Vector reader for the sub-segment
-            vreader = codec.vector_reader(storage, segment)
-            # Stored field reader for the sub-segment
-            sfreader = codec.stored_fields_reader(storage, segment)
-            # Iterating on the stored field reader yields a dictionary of
-            # stored fields for *every* document in the segment (even if the
-            # document has no stored fields it should yield {})
-            for i, fs in enumerate(sfreader):
-                # Add the base doc count to the sub-segment doc num
-                pdw.start_doc(basedoc + i)
-                # Call add_field to store the field values and lengths
-                for fieldname in fieldnames:
-                    value = fs.get(fieldname)
-                    length = lenreader.doc_field_length(i, fieldname)
-                    pdw.add_field(fieldname, schema[fieldname], value, length)
-                # Copy over the vectors. TODO: would be much faster to bulk-
-                # copy the postings
-                for fieldname in vnames:
-                    if (i, fieldname) in vreader:
-                        field = schema[fieldname]
-                        vmatcher = vreader.matcher(i, fieldname, field.vector)
-                        pdw.add_vector_matcher(fieldname, field, vmatcher)
-                pdw.finish_doc()
-            basedoc += segment.doccount
-
-        # Create a list of iterators from the run filenames
-        basedoc = self.docnum
-        for runname, segment in results:
-            items = self._read_and_renumber_run(runname, basedoc)
-            sources.append(items)
-            basedoc += segment.doccount
-
-        # Create a MultiLengths object combining the length files from the
-        # subtask segments
-        mlens = base.MultiLengths(lenreaders)
-        # Merge the iterators into the field writer
-        self.fieldwriter.add_postings(schema, mlens, imerge(sources))
-        self.docnum = basedoc
-
-
-class SerialMpWriter(MpWriter):
-    # A non-parallel version of the MpWriter for testing purposes
-
-    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
-        SegmentWriter.__init__(self, ix, **kwargs)
-
-        self.procs = procs or cpu_count()
-        self.batchsize = batchsize
-        self.subargs = subargs if subargs else kwargs
-        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
-                      for _ in xrange(self.procs)]
-        self.pointer = 0
-        self._added_sub = False
-
-    def add_document(self, **fields):
-        self.tasks[self.pointer].add_document(**fields)
-        self.pointer = (self.pointer + 1) % len(self.tasks)
-        self._added_sub = True
-
-    def _commit(self, mergetype, optimize, merge):
-        # Pull a (run_file_name, segment) tuple off the result queue for each
-        # sub-task, representing the final results of the task
-        try:
-            # Merge existing segments
-            segments = self._merge_segments(mergetype, optimize, merge)
-            sources = []
-            if self._added:
-                # If information was added to this writer conventionally
-                # (e.g. through add_reader or merging segments), add it as an
-                # extra source
-                sources.append(self.pool.iter_postings())
-
-            results = []
-            for writer in self.tasks:
-                results.append(finish_subsegment(writer))
-
-            self._merge_subsegments(results, sources, mergetype, optimize,
-                                    merge)
-            self._finish(segments + [self.get_segment()])
-        finally:
-            self._release_lock()
-
-
-

src/whoosh/formats.py

     def decode_weight(self, valuestring):
         return self.field_boost
 
+    def combine(self, vs):
+        return ''
+
 
 class Frequency(Format):
     """Stores frequency information for each posting.
         freq = unpack_uint(valuestring)[0]
         return freq * self.field_boost
 
+    def combine(self, vs):
+        return pack_uint(sum(self.decode_frequency(v) for v in vs))
+
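
The new ``combine()`` methods merge several encoded posting values for the same
term into a single value. A minimal, hedged sketch of the ``Frequency`` case
(the inputs are built by hand with ``pack_uint`` from ``whoosh.system``, and the
round-trip assumes the methods behave as written above)::

    from whoosh.formats import Frequency
    from whoosh.system import pack_uint

    fmt = Frequency()

    # Two encoded frequency values for the same term, e.g. from postings
    # that are being merged; combine() sums the frequencies and re-encodes
    # the total
    merged = fmt.combine([pack_uint(2), pack_uint(3)])
    print(fmt.decode_frequency(merged))  # 5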
 
 class Positions(Format):
     """Stores position information in each posting, to allow phrase searching
             weights[t.text] += t.boost
 
         for w, poslist in iteritems(poses):
-            deltas = []
-            base = 0
-            for pos in poslist:
-                deltas.append(pos - base)
-                base = pos
-            value = pack_uint(len(deltas)) + dumps(deltas, -1)[2:-1]
+            value = self.encode(poslist)
             yield (w, len(poslist), weights[w] * fb, value)
 
+    def encode(self, poslist):
+        deltas = []
+        base = 0
+        for pos in poslist:
+            deltas.append(pos - base)
+            base = pos
+        return pack_uint(len(deltas)) + dumps(deltas, -1)[2:-1]
+
     def decode_positions(self, valuestring):
         codes = loads(valuestring[_INT_SIZE:] + b("."))
         position = 0
     def decode_position_boosts(self, valuestring):
         return [(pos, 1) for pos in self.decode_positions(valuestring)]
 
+    def combine(self, vs):
+        s = set()
+        for v in vs:
+            s.update(self.decode_positions(v))
+        return self.encode(sorted(s))
+
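
A hedged round-trip sketch of the new ``encode()``/``combine()`` pair on
``Positions`` (assuming the methods behave as written above)::

    from whoosh.formats import Positions

    fmt = Positions()
    v1 = fmt.encode([1, 5, 9])
    v2 = fmt.encode([2, 5, 12])

    # combine() takes the union of the position lists and re-encodes it
    merged = fmt.combine([v1, v2])
    print(fmt.decode_positions(merged))  # [1, 2, 5, 9, 12]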
 
 class Characters(Positions):
     """Stores token position and character start and end information for each
             weights[t.text] += t.boost
 
         for w, poslist in iteritems(seen):
-            deltas = []
-            posbase = 0
-            charbase = 0
-            for pos, startchar, endchar in poslist:
-                deltas.append((pos - posbase, startchar - charbase,
-                               endchar - startchar))
-                posbase = pos
-                charbase = endchar
-                value = pack_uint(len(deltas)) + dumps(deltas, -1)[2:-1]
+            value = self.encode(poslist)
             yield (w, len(poslist), weights[w] * fb, value)
 
+    def encode(self, poslist):
+        deltas = []
+        posbase = 0
+        charbase = 0
+        for pos, startchar, endchar in poslist:
+            deltas.append((pos - posbase, startchar - charbase,
+                           endchar - startchar))
+            posbase = pos
+            charbase = endchar
+        return pack_uint(len(deltas)) + dumps(deltas, -1)[2:-1]
+
     def decode_characters(self, valuestring):
         codes = loads(valuestring[_INT_SIZE:] + b("."))
         position = 0
             posns.append(position)
         return posns
 
+    def combine(self, vs):
+        s = {}
+        for v in vs:
+            for pos, sc, ec in self.decode_characters(v):
+                if pos in s:
+                    old_sc, old_ec = s[pos]
+                    s[pos] = (min(sc, old_sc), max(ec, old_ec))
+                else:
+                    s[pos] = (sc, ec)
+        poses = [(pos, s[pos][0], s[pos][1]) for pos in sorted(s.keys())]
+        return self.encode(poses)
+
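
``Characters.combine()`` merges by position, keeping the widest character range
seen for each position. A hedged sketch (assuming the methods behave as written
above)::

    from whoosh.formats import Characters

    fmt = Characters()
    v1 = fmt.encode([(0, 0, 4), (2, 10, 15)])  # (pos, startchar, endchar)
    v2 = fmt.encode([(2, 9, 16)])

    # Position 2 appears in both values, so its character range is widened
    merged = fmt.combine([v1, v2])
    print(fmt.decode_characters(merged))  # [(0, 0, 4), (2, 9, 16)]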
 
 class PositionBoosts(Positions):
     """A format that stores positions and per-position boost information
             seen[t.text].append((pos, boost))
 
         for w, poses in iteritems(seen):
-            codes = []
-            base = 0
-            summedboost = 0
-            for pos, boost in poses:
-                summedboost += boost
-                codes.append((pos - base, boost))
-                base = pos
-            value = (pack_uint(len(poses)) + pack_float(summedboost)
-                     + dumps(codes, -1)[2:-1])
+            value = self.encode(poses)
             yield (w, len(poses), sum(p[1] for p in poses) * fb, value)
 
+    def encode(self, poses):
+        codes = []
+        base = 0
+        summedboost = 0
+        for pos, boost in poses:
+            summedboost += boost
+            codes.append((pos - base, boost))
+            base = pos
+        return (pack_uint(len(poses)) + pack_float(summedboost)
+                + dumps(codes, -1)[2:-1])
+
     def decode_position_boosts(self, valuestring):
         codes = loads(valuestring[_INT_SIZE + _FLOAT_SIZE:] + b("."))
         position = 0
         summedboost = unpack_float(v[_INT_SIZE:_INT_SIZE + _FLOAT_SIZE])[0]
         return summedboost * self.field_boost
 
+    def combine(self, vs):
+        s = defaultdict(float)
+        for v in vs:
+            for pos, boost in self.decode_position_boosts(v):
+                s[pos] += boost
+        return self.encode(sorted(s.items()))
+
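
For ``PositionBoosts``, combining values sums the boosts recorded at the same
position. A hedged sketch (assuming the methods behave as written above)::

    from whoosh.formats import PositionBoosts

    fmt = PositionBoosts()
    v1 = fmt.encode([(1, 1.0), (3, 2.0)])  # (pos, boost)
    v2 = fmt.encode([(3, 0.5)])

    merged = fmt.combine([v1, v2])
    print(fmt.decode_position_boosts(merged))  # [(1, 1.0), (3, 2.5)]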
 
 class CharacterBoosts(Characters):
     """A format that stores positions, character start and end, and
     """
 
     def word_values(self, value, analyzer, **kwargs):
-        fb = self.field_boost
         seen = defaultdict(list)
 
         kwargs["positions"] = True
             seen[t.text].append((t.pos, t.startchar, t.endchar, t.boost))
 
         for w, poses in iteritems(seen):
-            # posns_chars_boosts = [(pos, startchar, endchar, boost), ...]
-            codes = []
-            posbase = 0
-            charbase = 0
-            summedboost = 0
-            for pos, startchar, endchar, boost in poses:
-                codes.append((pos - posbase, startchar - charbase,
-                              endchar - startchar, boost))
-                posbase = pos
-                charbase = endchar
-                summedboost += boost
+            value, summedboost = self.encode(poses)
+            yield (w, len(poses), summedboost, value)
 
-            value = (pack_uint(len(poses)) + pack_float(summedboost * fb)
-                     + dumps(codes, -1)[2:-1])
+    def encode(self, poses):
+        fb = self.field_boost
+        # posns_chars_boosts = [(pos, startchar, endchar, boost), ...]
+        codes = []
+        posbase = 0
+        charbase = 0
+        summedboost = 0
+        for pos, startchar, endchar, boost in poses:
+            codes.append((pos - posbase, startchar - charbase,
+                          endchar - startchar, boost))
+            posbase = pos
+            charbase = endchar
+            summedboost += boost
 
-            yield (w, len(poses), summedboost * fb, value)
+        return ((pack_uint(len(poses)) + pack_float(summedboost * fb)
+                 + dumps(codes, -1)[2:-1]), summedboost)
 
     def decode_character_boosts(self, valuestring):
         codes = loads(valuestring[_INT_SIZE + _FLOAT_SIZE:] + b("."))
         return [(pos, boost) for pos, _, _, boost
                 in self.decode_character_boosts(valuestring)]
 
+    def combine(self, vs):
+        s = {}
+        for v in vs:
+            for pos, sc, ec, boost in self.decode_character_boosts(v):
+                if pos in s:
+                    old_sc, old_ec, old_boost = s[pos]
+                    s[pos] = (min(sc, old_sc), max(ec, old_ec),
+                              old_boost + boost)
+                else:
+                    s[pos] = (sc, ec, boost)
+        poses = [(pos, sc, ec, boost) for pos, (sc, ec, boost)
+                 in sorted(s.items())]
+        return self.encode(poses)[0]  # encode() returns value, summedboost
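
``CharacterBoosts.combine()`` does both at once: a repeated position gets the
widest character range and the sum of the boosts. Note that this ``encode()``
returns a ``(value, summedboost)`` pair, so the hedged sketch below (assuming
the methods behave as written above) unpacks it::

    from whoosh.formats import CharacterBoosts

    fmt = CharacterBoosts()
    v1, _ = fmt.encode([(0, 0, 4, 1.0), (2, 10, 15, 1.0)])
    v2, _ = fmt.encode([(2, 9, 16, 0.5)])

    # (pos, startchar, endchar, boost); position 2 is merged from both values
    merged = fmt.combine([v1, v2])
    print(fmt.decode_character_boosts(merged))
    # [(0, 0, 4, 1.0), (2, 9, 16, 1.5)]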
 
 
 
 
 
 
+
+

src/whoosh/matching.py

-# Copyright 2010 Matt Chaput. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-#    1. Redistributions of source code must retain the above copyright notice,
-#       this list of conditions and the following disclaimer.
-#
-#    2. Redistributions in binary form must reproduce the above copyright
-#       notice, this list of conditions and the following disclaimer in the
-#       documentation and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
-# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
-# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
-# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and documentation are
-# those of the authors and should not be interpreted as representing official
-# policies, either expressed or implied, of Matt Chaput.
-
-"""
-This module contains "matcher" classes. Matchers deal with posting lists. The
-most basic matcher, which reads the list of postings for a term, will be
-provided by the backend implementation (for example,
-:class:`whoosh.filedb.filepostings.FilePostingReader`). The classes in this
-module provide additional functionality, such as combining the results of two
-matchers, or modifying the results of a matcher.
-
-You do not need to deal with the classes in this module unless you need to
-write your own Matcher implementation to provide some new functionality. These
-classes are not instantiated by the user. They are usually created by a
-:class:`~whoosh.query.Query` object's :meth:`~whoosh.query.Query.matcher()`
-method, which returns the appropriate matcher to implement the query (for
-example, the :class:`~whoosh.query.Or` query's
-:meth:`~whoosh.query.Or.matcher()` method returns a
-:py:class:`~whoosh.matching.UnionMatcher` object).
-
-Certain backends support "quality" optimizations. These backends have the
-ability to skip ahead if they know the current block of postings can't
-contribute to the top N documents. If the matcher tree and backend support
-these optimizations, the matcher's :meth:`Matcher.supports_block_quality()`
-method will return ``True``.
-"""
-
-import sys
-from itertools import repeat
-
-from whoosh.compat import izip, xrange
-from whoosh.util import abstractmethod
-
-
-class ReadTooFar(Exception):
-    """Raised when :meth:`~whoosh.matching.Matcher.next()` or
-    :meth:`~whoosh.matching.Matcher.skip_to()` are called on an inactive
-    matcher.
-    """
-
-
-class NoQualityAvailable(Exception):
-    """Raised when quality methods are called on a matcher that does not
-    support block quality optimizations.
-    """
-
-
-# Matchers
-
-class Matcher(object):
-    """Base class for all matchers.
-    """
-
-    @abstractmethod
-    def is_active(self):
-        """Returns True if this matcher is still "active", that is, it has not
-        yet reached the end of the posting list.
-        """
-
-        raise NotImplementedError
-
-    @abstractmethod
-    def reset(self):
-        """Returns to the start of the posting list.
-
-        Note that reset() may not do what you expect after you call
-        :meth:`Matcher.replace()`, since this can mean calling reset() not on
-        the original matcher, but on an optimized replacement.
-        """
-
-        raise NotImplementedError
-
-    def term(self):
-        """Returns a ``("fieldname", "termtext")`` tuple for the term this matcher
-        matches, or None if this matcher is not a term matcher.
-        """
-
-        return None
-
-    def term_matchers(self):
-        """Returns an iterator of term matchers in this tree.
-        """
-
-        if self.term() is not None:
-            yield self
-        else:
-            for cm in self.children():
-                for m in cm.term_matchers():
-                    yield m
-
-    def matching_terms(self, id=None):
-        """Returns an iterator of ``("fieldname", "termtext")`` tuples for the
-        **currently matching** term matchers in this tree.
-        """
-
-        if not self.is_active():
-            return
-
-        if id is None:
-            id = self.id()
-        elif id != self.id():
-            return
-
-        t = self.term()
-        if t is None:
-            for c in self.children():
-                for t in c.matching_terms(id):
-                    yield t
-        else:
-            yield t
-
-    def children(self):
-        """Returns an (possibly empty) list of the submatchers of this
-        matcher.
-        """
-
-        return []
-
-    def replace(self, minquality=0):
-        """Returns a possibly-simplified version of this matcher. For example,
-        if one of the children of a UnionMatcher is no longer active, calling
-        this method on the UnionMatcher will return the other child.
-        """
-
-        return self
-
-    @abstractmethod
-    def copy(self):
-        """Returns a copy of this matcher.
-        """
-
-        raise NotImplementedError
-
-    def depth(self):
-        """Returns the depth of the tree under this matcher, or 0 if this
-        matcher does not have any children.
-        """
-
-        return 0
-
-    def supports_block_quality(self):
-        """Returns True if this matcher supports the use of ``quality`` and
-        ``block_quality``.
-        """
-
-        return False
-
-    def block_quality(self):
-        """Returns a quality measurement of the current block of postings,
-        according to the current weighting algorithm. Raises
-        ``NoQualityAvailable`` if the matcher or weighting do not support
-        quality measurements.
-        """
-
-        raise NoQualityAvailable(self.__class__)
-
-    @abstractmethod
-    def id(self):
-        """Returns the ID of the current posting.
-        """
-
-        raise NotImplementedError
-
-    def all_ids(self):
-        """Returns a generator of all IDs in the matcher.
-
-        What this method returns for a matcher that has already read some
-        postings (whether it only yields the remaining postings or all postings
-        from the beginning) is undefined, so it's best to only use this method
-        on fresh matchers.
-        """
-
-        i = 0
-        while self.is_active():
-            yield self.id()
-            self.next()
-            i += 1
-            if i == 10:
-                self = self.replace()
-                i = 0
-
-    def all_items(self):
-        """Returns a generator of all (ID, encoded value) pairs in the matcher.
-
-        What this method returns for a matcher that has already read some
-        postings (whether it only yields the remaining postings or all postings
-        from the beginning) is undefined, so it's best to only use this method
-        on fresh matchers.
-        """
-
-        i = 0
-        while self.is_active():
-            yield (self.id(), self.value())
-            self.next()
-            i += 1
-            if i == 10:
-                self = self.replace()
-                i = 0
-
-    def items_as(self, astype):
-        """Returns a generator of all (ID, decoded value) pairs in the matcher.
-
-        What this method returns for a matcher that has already read some
-        postings (whether it only yields the remaining postings or all postings
-        from the beginning) is undefined, so it's best to only use this method
-        on fresh matchers.
-        """
-
-        while self.is_active():
-            yield (self.id(), self.value_as(astype))
-            self.next()
-
-    @abstractmethod
-    def value(self):
-        """Returns the encoded value of the current posting.
-        """
-
-        raise NotImplementedError
-
-    @abstractmethod
-    def supports(self, astype):
-        """Returns True if the field's format supports the named data type,
-        for example 'frequency' or 'characters'.
-        """
-
-        raise NotImplementedError("supports not implemented in %s"
-                                  % self.__class__)
-
-    @abstractmethod
-    def value_as(self, astype):
-        """Returns the value(s) of the current posting as the given type.
-        """
-
-        raise NotImplementedError("value_as not implemented in %s"
-                                  % self.__class__)
-
-    def spans(self):
-        """Returns a list of :class:`whoosh.spans.Span` objects for the matches
-        in this document. Raises an exception if the field being searched does
-        not store positions.
-        """
-
-        from whoosh.spans import Span
-        if self.supports("characters"):
-            return [Span(pos, startchar=startchar, endchar=endchar)
-                    for pos, startchar, endchar in self.value_as("characters")]
-        elif self.supports("positions"):
-            return [Span(pos) for pos in self.value_as("positions")]
-        else:
-            raise Exception("Field does not support spans")
-
-    def skip_to(self, id):
-        """Moves this matcher to the first posting with an ID equal to or
-        greater than the given ID.
-        """
-
-        while self.is_active() and self.id() < id:
-            self.next()
-
-    def skip_to_quality(self, minquality):
-        """Moves this matcher to the next block with greater than the given
-        minimum quality value.
-        """
-
-        raise NotImplementedError(self.__class__.__name__)
-
-    @abstractmethod
-    def next(self):
-        """Moves this matcher to the next posting.
-        """
-
-        raise NotImplementedError(self.__class__.__name__)
-
-    def weight(self):
-        """Returns the weight of the current posting.
-        """
-
-        return self.value_as("weight")
-
-    @abstractmethod
-    def score(self):
-        """Returns the score of the current posting.
-        """
-
-        raise NotImplementedError(self.__class__.__name__)
-
-    def __eq__(self, other):
-        return self.__class__ is type(other)
-
-    def __lt__(self, other):
-        return type(other) is self.__class__
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __gt__(self, other):
-        return not (self.__lt__(other) or self.__eq__(other))
-
-    def __le__(self, other):
-        return self.__eq__(other) or self.__lt__(other)
-
-    def __ge__(self, other):
-        return self.__eq__(other) or self.__gt__(other)
-
-
-class NullMatcherClass(Matcher):
-    """Matcher with no postings which is never active.
-    """
-
-    def __call__(self):
-        return self
-
-    def supports_block_quality(self):
-        return True
-
-    def block_quality(self):
-        return 0
-
-    def skip_to_quality(self, minquality):