1. Matt Chaput
  2. whoosh

Source

whoosh / src / whoosh / multiproc.py

# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

from __future__ import with_statement
import os
from multiprocessing import Process, Queue, cpu_count

from whoosh.compat import xrange, iteritems, pickle
from whoosh.codec import base
from whoosh.writing import PostingPool, SegmentWriter
from whoosh.externalsort import imerge
from whoosh.util import random_name


def finish_subsegment(writer, k=64):
    # Tell the pool to finish up the current file
    writer.pool.save()
    # Tell the pool to merge any and all runs in the pool until there
    # is only one run remaining. "k" is an optional parameter passed
    # from the parent which sets the maximum number of files to open
    # while reducing.
    writer.pool.reduce_to(1, k)

    # The filename of the single remaining run
    runname = writer.pool.runs[0]
    # The indexed field names
    fieldnames = writer.pool.fieldnames
    # The segment object (parent can use this to re-open the files created
    # by the sub-writer)
    segment = writer._partial_segment()

    return runname, fieldnames, segment


# Multiprocessing Writer

class SubWriterTask(Process):
    # This is a Process object that takes "jobs" off a job Queue, processes
    # them, and when it's done, puts a summary of its work on a results Queue

    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs,
                 multisegment):
        Process.__init__(self)
        self.storage = storage
        self.indexname = indexname
        self.jobqueue = jobqueue
        self.resultqueue = resultqueue
        self.kwargs = kwargs
        self.multisegment = multisegment
        self.running = True

    def run(self):
        # This is the main loop of the process. OK, so the way this works is
        # kind of brittle and stupid, but I had to figure out how to use the
        # multiprocessing module, work around bugs, and address performance
        # issues, so there is at least some reasoning behind some of this

        # The "parent" task farms individual documents out to the subtasks for
        # indexing. You could pickle the actual documents and put them in the
        # queue, but that is not very performant. Instead, we assume the tasks
        # share a filesystem and use that to pass the information around. The
        # parent task writes a certain number of documents to a file, then puts
        # the filename on the "job queue". A subtask gets the filename off the
        # queue and reads through the file processing the documents.

        jobqueue = self.jobqueue
        resultqueue = self.resultqueue
        multisegment = self.multisegment

        # Open a placeholder object representing the index
        ix = self.storage.open_index(self.indexname)
        # Open a writer for the index. The _lk=False parameter means to not try
        # to lock the index (the parent object that started me takes care of
        # locking the index)
        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)

        # If the parent task calls cancel() on me, it will set self.running to
        # False, so I'll notice the next time through the loop
        while self.running:
            # Take an object off the job queue
            jobinfo = jobqueue.get()
            # If the object is None, it means the parent task wants me to
            # finish up
            if jobinfo is None:
                break
            # The object from the queue is a tuple of (filename,
            # number_of_docs_in_file). Pass those two pieces of information as
            # arguments to _process_file().
            self._process_file(*jobinfo)

        if not self.running:
            # I was cancelled, so I'll cancel my underlying writer
            writer.cancel()
        else:
            if multisegment:
                # Actually finish the segment and return it with no run
                runname = None
                fieldnames = writer.pool.fieldnames
                segment = writer._finalize_segment()
            else:
                # Merge all runs in the writer's pool into one run, close the
                # segment, and return the run name and the segment
                k = self.kwargs.get("k", 64)
                runname, fieldnames, segment = finish_subsegment(writer, k)

            # Put the results (the run filename and the segment object) on the
            # result queue
            resultqueue.put((runname, fieldnames, segment), timeout=5)

    def _process_file(self, filename, doc_count):
        # This method processes a "job file" written out by the parent task. A
        # job file is a series of pickled (code, arguments) tuples. Currently
        # the only command codes is 0=add_document

        writer = self.writer
        tempstorage = writer.temp_storage()

        load = pickle.load
        with tempstorage.open_file(filename).raw_file() as f:
            for _ in xrange(doc_count):
                # Load the next pickled tuple from the file
                code, args = load(f)
                assert code == 0
                writer.add_document(**args)
        # Remove the job file
        tempstorage.delete_file(filename)

    def cancel(self):
        self.running = False


class MpWriter(SegmentWriter):
    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
                 multisegment=False, **kwargs):
        # This is the "main" writer that will aggregate the results created by
        # the sub-tasks
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        # The maximum number of documents in each job file submitted to the
        # sub-tasks
        self.batchsize = batchsize
        # You can use keyword arguments or the "subargs" argument to pass
        # keyword arguments to the sub-writers
        self.subargs = subargs if subargs else kwargs
        # If multisegment is True, don't merge the segments created by the
        # sub-writers, just add them directly to the TOC
        self.multisegment = multisegment

        # A list to hold the sub-task Process objects
        self.tasks = []
        # A queue to pass the filenames of job files to the sub-tasks
        self.jobqueue = Queue(self.procs * 4)
        # A queue to get back the final results of the sub-tasks
        self.resultqueue = Queue()
        # A buffer for documents before they are flushed to a job file
        self.docbuffer = []

        self._grouping = 0
        self._added_sub = False

    def _new_task(self):
        task = SubWriterTask(self.storage, self.indexname,
                             self.jobqueue, self.resultqueue, self.subargs,
                             self.multisegment)
        self.tasks.append(task)
        task.start()
        return task

    def _enqueue(self):
        # Flush the documents stored in self.docbuffer to a file and put the
        # filename on the job queue
        docbuffer = self.docbuffer
        dump = pickle.dump
        length = len(docbuffer)

        filename = "%s.doclist" % random_name()
        with self.temp_storage().create_file(filename).raw_file() as f:
            for item in docbuffer:
                dump(item, f, -1)

        if len(self.tasks) < self.procs:
            self._new_task()
        jobinfo = (filename, length)
        self.jobqueue.put(jobinfo)
        self.docbuffer = []

    def cancel(self):
        try:
            for task in self.tasks:
                task.cancel()
        finally:
            SegmentWriter.cancel(self)

    def start_group(self):
        self._grouping += 1

    def end_group(self):
        if not self._grouping:
            raise Exception("Unbalanced end_group")
        self._grouping -= 1

    def add_document(self, **fields):
        # Add the document to the docbuffer
        self.docbuffer.append((0, fields))
        # If the buffer is full, flush it to the job queue
        if not self._grouping and len(self.docbuffer) >= self.batchsize:
            self._enqueue()
        self._added_sub = True

    def _read_and_renumber_run(self, path, offset):
        # Note that SortingPool._read_run() automatically deletes the run file
        # when it's finished

        gen = self.pool._read_run(path)
        # If offset is 0, just return the items unchanged
        if not offset:
            return gen
        else:
            # Otherwise, add the offset to each docnum
            return ((fname, text, docnum + offset, weight, value)
                    for fname, text, docnum, weight, value in gen)

    def commit(self, mergetype=None, optimize=None, merge=None):
        if self._added_sub:
            # If documents have been added to sub-writers, use the parallel
            # merge commit code
            self._commit(mergetype, optimize, merge)
        else:
            # Otherwise, just do a regular-old commit
            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
                                 merge=merge)

    def _commit(self, mergetype, optimize, merge):
        # Index the remaining documents in the doc buffer
        if self.docbuffer:
            self._enqueue()
        # Tell the tasks to finish
        for task in self.tasks:
            self.jobqueue.put(None)

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # Wait for the subtasks to finish
        for task in self.tasks:
            task.join()

        # Pull a (run_file_name, fieldnames, segment) tuple off the result
        # queue for each sub-task, representing the final results of the task
        results = []
        for task in self.tasks:
            results.append(self.resultqueue.get(timeout=5))

        if self.multisegment:
            # If we're not merging the segments, we don't care about the runname
            # and fieldnames in the results... just pull out the segments and
            # add them to the list of final segments
            finalsegments += [s for _, _, s in results]
            if self._added:
                finalsegments.append(self._finalize_segment())
            else:
                self._close_segment()
            assert self.perdocwriter.is_closed
        else:
            # Merge the posting sources from the sub-writers and my
            # postings into this writer
            self._merge_subsegments(results, mergetype)
            self._close_segment()
            self._assemble_segment()
            finalsegments.append(self.get_segment())
            assert self.perdocwriter.is_closed

        self._commit_toc(finalsegments)
        self._finish()

    def _merge_subsegments(self, results, mergetype):
        schema = self.schema
        schemanames = set(schema.names())
        storage = self.storage
        codec = self.codec
        sources = []

        # If information was added to this writer the conventional (e.g.
        # through add_reader or merging segments), add it as an extra source
        if self._added:
            sources.append(self.pool.iter_postings())

        pdrs = []
        for runname, fieldnames, segment in results:
            fieldnames = set(fieldnames) | schemanames
            pdr = codec.per_document_reader(storage, segment)
            pdrs.append(pdr)
            basedoc = self.docnum
            docmap = self.write_per_doc(fieldnames, pdr)
            assert docmap is None

            items = self._read_and_renumber_run(runname, basedoc)
            sources.append(items)

        # Create a MultiLengths object combining the length files from the
        # subtask segments
        self.perdocwriter.close()
        pdrs.insert(0, self.per_document_reader())
        mpdr = base.MultiPerDocumentReader(pdrs)

        try:
            # Merge the iterators into the field writer
            self.fieldwriter.add_postings(schema, mpdr, imerge(sources))
        finally:
            mpdr.close()
        self._added = True


class SerialMpWriter(MpWriter):
    # A non-parallel version of the MpWriter for testing purposes

    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        self.batchsize = batchsize
        self.subargs = subargs if subargs else kwargs
        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
                      for _ in xrange(self.procs)]
        self.pointer = 0
        self._added_sub = False

    def add_document(self, **fields):
        self.tasks[self.pointer].add_document(**fields)
        self.pointer = (self.pointer + 1) % len(self.tasks)
        self._added_sub = True

    def _commit(self, mergetype, optimize, merge):
        # Pull a (run_file_name, segment) tuple off the result queue for each
        # sub-task, representing the final results of the task

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)
        results = []
        for writer in self.tasks:
            results.append(finish_subsegment(writer))

        self._merge_subsegments(results, mergetype)
        self._close_segment()
        self._assemble_segment()
        finalsegments.append(self.get_segment())

        self._commit_toc(finalsegments)
        self._finish()


# For compatibility with old multiproc module
class MultiSegmentWriter(MpWriter):
    def __init__(self, *args, **kwargs):
        MpWriter.__init__(self, *args, **kwargs)
        self.multisegment = True