Matt Chaput / whoosh

Commits

Matt Chaput committed 8255b65

More work on MpWriter implementation.

  • Parent commit: 5eeadbe
  • Branch: default


Files changed (8)

File src/whoosh/codec/base.py

 
 
 class MultiLengths(LengthsReader):
-    def __init__(self, lengths):
+    def __init__(self, lengths, offset=0):
         self.lengths = lengths
         self.doc_offsets = []
         self._count = 0
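
The only visible change here is the new offset parameter, but MultiLengths is what _merge_subsegments() in multiproc2.py (below) uses to combine the per-sub-segment length readers into a single object via base.MultiLengths(lenreaders). A rough sketch of the idea, not the actual code in base.py (the explicit doccounts argument is an assumption added for illustration):

    from bisect import bisect_right

    class CombinedLengths(object):
        # Hypothetical illustration only. The point is what doc_offsets is
        # for: each wrapped reader keeps its own local document numbering,
        # and the combined reader renumbers them into one contiguous range
        # starting at `offset`.
        def __init__(self, lengths, doccounts, offset=0):
            self.lengths = lengths
            self.doc_offsets = []          # base docnum of each wrapped reader
            self._count = offset
            for dc in doccounts:
                self.doc_offsets.append(self._count)
                self._count += dc

        def doc_field_length(self, docnum, fieldname):
            # Find the reader that covers this global docnum...
            x = bisect_right(self.doc_offsets, docnum) - 1
            # ...and translate the docnum back into that reader's local space
            local = docnum - self.doc_offsets[x]
            return self.lengths[x].doc_field_length(local, fieldname)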

File src/whoosh/codec/whoosh2.py

         self.directory.append(pack_stored_pointer(f.tell(), len(vstring)))
         f.write(vstring)
 
+    def add_reader(self, sfreader):
+        add = self.add
+        for vdict in sfreader:
+            add(vdict)
+
     def close(self):
         f = self.dbfile
         dirpos = f.tell()
         dbfile.seek(0)
         dirpos = dbfile.read_long()
         self.length = dbfile.read_uint()
+        self.basepos = dbfile.tell()
 
         dbfile.seek(dirpos)
 
     def close(self):
         self.dbfile.close()
 
+    def __iter__(self):
+        dbfile = self.dbfile
+        names = self.names
+        lengths = array("I")
+
+        dbfile.seek(self.directory_offset)
+        for i in xrange(self.length):
+            dbfile.seek(_LONG_SIZE, 1)
+            lengths.append(dbfile.read_uint())
+
+        dbfile.seek(self.basepos)
+        for length in lengths:
+            vlist = loads(dbfile.read(length) + b("."))
+            vdict = dict((names[i], vlist[i]) for i in xrange(len(vlist))
+                     if vlist[i] is not None)
+            yield vdict
+
     def __getitem__(self, num):
         if num > self.length - 1:
             raise IndexError("Tried to get document %s, file has %s"

File src/whoosh/filedb/fileindex.py

 
     def writer(self, procs=1, **kwargs):
         if procs > 1:
-            from whoosh.filedb.multiproc import MpWriter
+            from whoosh.filedb.multiproc2 import MpWriter
             return MpWriter(self, **kwargs)
         else:
             from whoosh.filedb.filewriting import SegmentWriter
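
With this one-line change, asking an index for a writer with procs > 1 now returns the new multiproc2.MpWriter instead of the old multiproc module's writer. A usage sketch (the directory name and schema are placeholders, and this assumes the in-progress MpWriter path commits cleanly):

    import os
    from whoosh import fields, index

    schema = fields.Schema(title=fields.TEXT(stored=True), body=fields.TEXT)
    if not os.path.exists("indexdir"):
        os.mkdir("indexdir")
    ix = index.create_in("indexdir", schema)

    # procs=1 (the default) still returns a plain SegmentWriter;
    # procs > 1 goes through whoosh.filedb.multiproc2.MpWriter
    w = ix.writer(procs=4)
    w.add_document(title=u"hello", body=u"there")
    w.commit()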

File src/whoosh/filedb/filewriting.py

         return self.pool.iter_postings()
 
     def add_postings(self, lengths, items, startdoc, docmap):
-        # fieldname, text, docnum, weight, valuestring
+        # items = (fieldname, text, docnum, weight, valuestring) ...
         schema = self.schema
+        # Make a generator to strip out deleted fields and renumber the docs
+        # before passing them down to the field writer
         def gen():
             for fieldname, text, docnum, weight, valuestring in items:
                 if fieldname not in schema:
                     continue
                 if docmap:
                     newdoc = docmap[docnum]
                 else:
                     newdoc = startdoc + docnum
                 yield (fieldname, text, newdoc, weight, valuestring)
+
         self.fieldwriter.add_postings(schema, lengths, gen())
 
     def _make_docmap(self, reader, newdoc):
                 items = field.index(value)
                 # Only store the length if the field is marked scorable
                 scorable = field.scorable
-                length = 0
                 # Add the terms to the pool
                 for text, freq, weight, valuestring in items:
                     #assert w != ""
         self._added = True
         self.docnum += 1
 
+    def doc_count(self):
+        return self.docnum - self.docbase
+
+    def get_segment(self):
+        newsegment = self.newsegment
+        newsegment.doccount = self.doc_count()
+        return newsegment
+
+    def partial_segment(self):
+        self._check_state()
+        self._close_all()
+        return self.get_segment()
+
     def _close_all(self):
         self.is_closed = True
         self.perdocwriter.close()
         self.fieldwriter.close()
         self.storage.close()
 
-    def doc_count(self):
-        return self.docnum - self.docbase
+    def _finish_toc(self, newsegment, segments):
+        if self._added and self.compound:
+            # Assemble the segment files into a compound file
+            newsegment.create_compound_file(self.storage)
+            newsegment.compound = True
+
+        segments = segments
+        # Write a new TOC with the new segment list (and delete old files)
+        self.codec.commit_toc(self.storage, self.indexname, self.schema,
+                              segments + [newsegment], self.generation)
+
+    def _release_lock(self):
+        if self.writelock:
+            self.writelock.release()
 
     def commit(self, mergetype=None, optimize=False, merge=True):
         """Finishes writing and saves all additions and changes to disk.
 
             if self._added:
                 # Update the new segment with the current doc count
-                newsegment = self.newsegment
-                newsegment.doccount = self.doc_count()
+                newsegment = self.get_segment()
 
                 # Output the sorted pool postings to the terms index and
                 # posting files
                 lengths = self.perdocwriter.lengths_reader()
                 self.fieldwriter.add_postings(schema, lengths,
                                               self.pool.iter_postings())
-
-                # Add the new segment to the list of remaining segments
-                # returned by the merge policy function
-                finalsegments.append(newsegment)
             else:
                 self.pool.cleanup()
 
             # Close all files
             self._close_all()
-
-            if self._added and self.compound:
-                # Assemble the segment files into a compound file
-                newsegment.create_compound_file(storage)
-                newsegment.compound = True
-
-            # Write a new TOC with the new segment list (and delete old files)
-            self.codec.commit_toc(storage, self.indexname, schema,
-                                  finalsegments, self.generation)
+            # Write the new TOC
+            self._finish_toc(newsegment, finalsegments)
         finally:
-            if self.writelock:
-                self.writelock.release()
+            self._release_lock()
 
     def cancel(self):
         self._check_state()
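
The point of pulling doc_count(), get_segment(), partial_segment(), _finish_toc(), and _release_lock() out of commit() is that the multiprocess writer below can reuse the same pieces in its own commit sequence. Roughly, the single-process path now reads like this sketch (the merge-policy step and the nothing-was-added branch are omitted for brevity):

    def commit_outline(writer, finalsegments):
        # Simplified outline of SegmentWriter.commit() after this refactor
        try:
            newsegment = writer.get_segment()        # segment with its final doc count
            lengths = writer.perdocwriter.lengths_reader()
            writer.fieldwriter.add_postings(writer.schema, lengths,
                                            writer.pool.iter_postings())
            writer._close_all()                      # close per-doc/field writers and storage
            # _finish_toc() optionally assembles the compound file, then
            # writes a TOC listing finalsegments + [newsegment]
            writer._finish_toc(newsegment, finalsegments)
        finally:
            writer._release_lock()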

File src/whoosh/filedb/multiproc2.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import marshal, os, tempfile
+
+import os, tempfile
 from multiprocessing import Process, Queue, cpu_count
 
 from whoosh.compat import xrange, iteritems, pickle
-from whoosh.filedb.filetables import Lengths
+from whoosh.codec import base
 from whoosh.filedb.filewriting import SegmentWriter
-from whoosh.support.externalsort import imerge
-from whoosh.writing import IndexWriter
+from whoosh.support.externalsort import imerge, SortingPool
+
+
+def finish_subsegment(writer, k=64):
+    # Tell the pool to finish up the current file
+    writer.pool.save()
+    # Tell the pool to merge any and all runs in the pool until there
+    # is only one run remaining. "k" is an optional parameter passed
+    # from the parent which sets the maximum number of files to open
+    # while reducing.
+    writer.pool.reduce_to(1, k)
+
+    # The filename of the single remaining run
+    runname = writer.pool.runs[0]
+    # The segment ID (parent can use this to re-open the files created
+    # by my sub-writer)
+    segment = writer.partial_segment()
+
+    return runname, segment
 
 
 # Multiprocessing Writer
 
 class SubWriterTask(Process):
+    # This is a Process object that takes "jobs" off a job Queue, processes
+    # them, and when it's done, puts a summary of its work on a results Queue
+
     def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs):
         Process.__init__(self)
         self.storage = storage
         self.kwargs = kwargs
         self.running = True
 
-    def _process_file(self, filename, length):
+    def run(self):
+        # This is the main loop of the process. OK, so the way this works is
+        # kind of brittle and stupid, but I had to figure out how to use the
+        # multiprocessing module, work around bugs, and address performance
+        # issues, so there is at least some reasoning behind some of this
+
+        # The "parent" task farms individual documents out to the subtasks for
+        # indexing. You could pickle the actual documents and put them in the
+        # queue, but that is not very performant. Instead, we assume the tasks
+        # share a filesystem and use that to pass the information around. The
+        # parent task writes a certain number of documents to a file, then puts
+        # the filename on the "job queue". A subtask gets the filename off the
+        # queue and reads through the file processing the documents.
+
+        jobqueue = self.jobqueue
+        resultqueue = self.resultqueue
+
+        # Open a placeholder object representing the index
+        ix = self.storage.open_index(self.indexname)
+        # Open a writer for the index. The _lk=False parameter means to not try
+        # to lock the index (the parent object that started me takes care of
+        # locking the index)
+        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)
+
+        # If the parent task calls cancel() on me, it will set self.running to
+        # False, so I'll notice the next time through the loop
+        while self.running:
+            # Take an object off the job queue
+            jobinfo = jobqueue.get()
+            # If the object is None, it means the parent task wants me to
+            # finish up
+            if jobinfo is None:
+                break
+            # The object from the queue is a tuple of (filename,
+            # number_of_docs_in_file). Pass those two pieces of information as
+            # arguments to _process_file().
+            self._process_file(*jobinfo)
+
+        if not self.running:
+            # I was cancelled, so I'll cancel my underlying writer
+            writer.cancel()
+        else:
+            runname, segment = finish_subsegment(writer,
+                                                 self.kwargs.get("k", 64))
+            # Put the results (the run filename and the segment object) on the
+            # result queue
+            resultqueue.put((runname, segment), timeout=5)
+
+    def _process_file(self, filename, doc_count):
+        # This method processes a "job file" written out by the parent task. A
+        # job file is a series of pickled (code, arguments) tuples. Currently
+        # the only two command codes are 0=add_document, and 1=update_document
+
         writer = self.writer
         load = pickle.load
         f = open(filename, "rb")
-        for _ in xrange(length):
+        for _ in xrange(doc_count):
+            # Load the next pickled tuple from the file
             code, args = load(f)
             if code == 0:
                 writer.add_document(**args)
             elif code == 1:
                 writer.update_document(**args)
         f.close()
+        # Remove the job file
         os.remove(filename)
 
-    def run(self):
-        jobqueue = self.jobqueue
-        resultqueue = self.resultqueue
-        ix = self.storage.open_index(self.indexname)
-        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)
-
-        while self.running:
-            jobinfo = jobqueue.get()
-            if jobinfo is None:
-                break
-            self._process_file(*jobinfo)
-
-        if not self.running:
-            writer.cancel()
-        else:
-            writer.pool.save()
-            writer.pool.reduce_to(1, self.kwargs.get("k", 64))
-            runname = writer.pool.runs[0]
-            doccount = writer.doc_count()
-            lenname, lenfile = self.storage.create_temp()
-            writer.lengths.to_file(lenfile, doccount)
-            resultqueue.put((runname, doccount, lenname), timeout=5)
-
     def cancel(self):
         self.running = False
 
 
-class MpWriter(IndexWriter):
-    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
-                 combine=True, ** kwargs):
-        self.index = ix
-        self.writer = self.index.writer(**kwargs)
+class MpWriter(SegmentWriter):
+    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
+        # This is the "main" writer that will aggregate the results created by
+        # the sub-tasks
+        SegmentWriter.__init__(self, ix, **kwargs)
+
         self.procs = procs or cpu_count()
+        # The maximum number of documents in each job file submitted to the
+        # sub-tasks
         self.batchsize = batchsize
+        # You can use keyword arguments or the "subargs" argument to pass
+        # keyword arguments to the sub-writers
         self.subargs = subargs if subargs else kwargs
-        self.combine = combine
 
+        # A list to hold the sub-task Process objects
         self.tasks = []
+        # A queue to pass the filenames of job files to the sub-tasks
         self.jobqueue = Queue(self.procs * 4)
+        # A queue to get back the final results of the sub-tasks
         self.resultqueue = Queue()
+        # A buffer for documents before they are flushed to a job file
         self.docbuffer = []
-        self._to_delete = set()
+
+        self._count = 0
 
     def _new_task(self):
         task = SubWriterTask(self.index.storage, self.index.indexname,
                              self.jobqueue, self.resultqueue, self.subargs)
         task.start()
         return task
 
-    def delete_document(self, docnum):
-        self.writer.delete_document(docnum)
-
     def _enqueue(self):
+        # Flush the documents stored in self.docbuffer to a file and put the
+        # filename on the job queue
         docbuffer = self.docbuffer
         dump = pickle.dump
         length = len(docbuffer)
             for task in self.tasks:
                 task.cancel()
         finally:
-            self.writer.cancel()
+            SegmentWriter.cancel(self)
 
-    def add_document(self, **fields):
-        self.docbuffer.append((0, fields))
+    def _put_command(self, code, args):
+        # Add the document to the docbuffer
+        self.docbuffer.append((code, args))
+        # If the buffer is full, flush it to the job queue
         if len(self.docbuffer) >= self.batchsize:
             self._enqueue()
 
+    def add_document(self, **fields):
+        self._put_command(0, fields)
+
     def update_document(self, **fields):
-        self.docbuffer.append((1, fields))
-        if len(self.docbuffer) >= self.batchsize:
-            self._enqueue()
+        self._put_command(1, fields)
 
     def _read_and_renumber_run(self, path, offset):
-        load = marshal.load
-        f = open(path, "rb")
-        try:
-            while True:
-                fname, text, docnum, weight, value = load(f)
-                yield (fname, text, docnum + offset, weight, value)
-        except EOFError:
-            return
-        finally:
-            f.close()
-            os.remove(path)
+        # Note that SortingPool._read_run() automatically deletes the run file
+        # when it's finished
+
+        gen = SortingPool._read_run(path)
+        # If offset is 0, just return the items unchanged
+        if not offset:
+            return gen
+        else:
+            # Otherwise, add the offset to each docnum
+            return ((fname, text, docnum + offset, weight, value)
+                    for fname, text, docnum, weight, value in gen)
 
     def commit(self, **kwargs):
-        writer = self.writer
-        pool = writer.pool
+        try:
+            # Index the remaining documents in the doc buffer
+            self._enqueue()
+            # Tell the tasks to finish
+            for task in self.tasks:
+                self.jobqueue.put(None)
+            # Wait for the tasks to finish
+            for task in self.tasks:
+                task.join()
 
-        # Index the remaining documents in the doc buffer
-        self._enqueue()
-        # Tell the tasks to finish
-        for task in self.tasks:
-            self.jobqueue.put(None)
-        # Wait for the tasks to finish
-        for task in self.tasks:
-            task.join()
-        # Get the results
-        results = []
-        for task in self.tasks:
-            # runname, doccount, lenname
-            results.append(self.resultqueue.get(timeout=5))
+            # Pull a (run_file_name, segment) tuple off the result queue for each
+            # sub-task, representing the final results of the task
+            results = []
+            for task in self.tasks:
+                results.append(self.resultqueue.get(timeout=5))
+            self._merge_subsegments(results)
+            self._close_all()
+            self._finish_toc(self.get_segment(), self.segments)
+        finally:
+            self._release_lock()
 
-        if results:
-            for runname, doccount, lenname in results:
-                f = writer.storage.open_file(lenname)
-                lengths = Lengths.from_file(f, doccount)
-                writer.lengths.add_other(lengths)
-                writer.storage.delete_file(lenname)
+    def _merge_subsegments(self, results):
+        schema = self.schema
+        storage = self.storage
+        pool = self.pool
+        codec = self.codec
 
-            base = results[0][1]
-            runreaders = [pool._read_run(results[0][0])]
-            for runname, doccount, lenname in results[1:]:
-                rr = self._read_and_renumber_run(runname, base)
-                runreaders.append(rr)
-                base += doccount
-            writer.termswriter.add_iter(imerge(runreaders), writer.lengths)
+        # Merge per-document information
+        pdw = self.perdocwriter
+        # Names of fields that store term vectors
+        vnames = set(schema.vector_names())
+        print "-self.docnum=", self.docnum
+        basedoc = self.docnum
+        # A list to remember field length readers for each sub-segment (we'll
+        # re-use them below)
+        lenreaders = []
+        for _, segment in results:
+            # Create a field length reader for the sub-segment
+            lenreader = codec.lengths_reader(storage, segment)
+            # Remember it in the list for later
+            lenreaders.append(lenreader)
+            # Vector reader for the sub-segment
+            vreader = codec.vector_reader(storage, segment)
+            # Stored field reader for the sub-segment
+            sfreader = codec.stored_fields_reader(storage, segment)
+            # Iterating on the stored field reader yields a dictionary of
+            # stored fields for *every* document in the segment (even if the
+            # document has no stored fields it should yield {})
+            for i, fs in enumerate(sfreader):
+                # Add the base doc count to the sub-segment doc num
+                pdw.start_doc(basedoc + i)
+                # Call add_field to store the field values and lengths
+                for fieldname, value in iteritems(fs):
+                    pdw.add_field(fieldname, schema[fieldname], value,
+                                  lenreader.doc_field_length(i, fieldname))
+                # Copy over the vectors. TODO: would be much faster to bulk-
+                # copy the postings
+                for fieldname in vnames:
+                    if (i, fieldname) in vreader:
+                        field = schema[fieldname]
+                        vmatcher = vreader.matcher(i, fieldname, field.vector)
+                        pdw.add_vector_matcher(fieldname, field, vmatcher)
+                pdw.finish_doc()
+            basedoc += segment.doccount
 
+        # Create a list of iterators from the run filenames
+        basedoc = self.docnum
+        sources = []
+        for runname, segment in results:
+            items = self._read_and_renumber_run(runname, basedoc)
+            sources.append(items)
+            basedoc += segment.doccount
 
+        # Create a MultiLengths object combining the length files from the
+        # subtask segments
+        mlens = base.MultiLengths(lenreaders)
+        # Merge the iterators into the field writer
+        self.fieldwriter.add_postings(schema, mlens, imerge(sources))
 
+        print "self.docnum=", self.docnum, "basedoc=", basedoc
+        self.docnum = basedoc
 
 
+class SerialMpWriter(MpWriter):
+    # A non-parallel version of the MpWriter for testing purposes
 
+    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
+        SegmentWriter.__init__(self, ix, **kwargs)
 
+        self.procs = procs or cpu_count()
+        self.batchsize = batchsize
+        self.subargs = subargs if subargs else kwargs
+        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
+                      for _ in xrange(self.procs)]
+        self.pointer = 0
 
+    def add_document(self, **fields):
+        self.tasks[self.pointer].add_document(**fields)
+        self.pointer = (self.pointer + 1) % len(self.tasks)
+
+    def update_document(self, **fields):
+        self.tasks[self.pointer].update_document(**fields)
+        self.pointer = (self.pointer + 1) % len(self.tasks)
+
+    def commit(self, **kwargs):
+        # Pull a (run_file_name, segment) tuple off the result queue for each
+        # sub-task, representing the final results of the task
+        try:
+            results = []
+            for writer in self.tasks:
+                results.append(finish_subsegment(writer))
+            self._merge_subsegments(results)
+            self._close_all()
+            self._finish_toc(self.get_segment(), self.segments)
+        finally:
+            self._release_lock()
+
+
+
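
The comments in run() and _process_file() describe the whole parent/sub-task handshake: the parent pickles (code, fields) commands into a temp file on a filesystem shared with the sub-tasks, puts (filename, doc_count) on the job queue, and each sub-task replays the file; a None on the queue tells a sub-task to finish up and report its results on the result queue. A stand-alone sketch of just that mechanism (names like make_job_file are hypothetical, and the real classes obviously do the indexing work as well):

    import os, pickle, tempfile
    from multiprocessing import Process, Queue

    ADD, UPDATE = 0, 1      # the two command codes used in job files

    def make_job_file(commands):
        # Parent side: dump a batch of (code, fields) tuples to a temp file
        # and return (filename, number_of_commands) to put on the job queue
        fd, path = tempfile.mkstemp(suffix=".jobs")
        f = os.fdopen(fd, "wb")
        for code, fields in commands:
            pickle.dump((code, fields), f, -1)
        f.close()
        return path, len(commands)

    def replay_job_file(path, count, handler):
        # Sub-task side: read back exactly `count` pickled commands, dispatch
        # them, then delete the throwaway job file
        f = open(path, "rb")
        for _ in range(count):
            code, fields = pickle.load(f)
            handler(code, fields)
        f.close()
        os.remove(path)

    def worker(jobqueue, resultqueue):
        indexed = []
        while True:
            jobinfo = jobqueue.get()
            if jobinfo is None:              # None == "finish up" signal
                break
            replay_job_file(jobinfo[0], jobinfo[1],
                            lambda code, fields: indexed.append(fields))
        resultqueue.put(len(indexed))

    if __name__ == "__main__":
        jobqueue, resultqueue = Queue(), Queue()
        task = Process(target=worker, args=(jobqueue, resultqueue))
        task.start()
        jobqueue.put(make_job_file([(ADD, {"a": u"hello"}),
                                    (ADD, {"a": u"there"})]))
        jobqueue.put(None)
        task.join()
        print(resultqueue.get())             # -> 2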

File tests/test_codecs.py

         dw.add_field("b", fieldobj, "there", 1)
         dw.finish_doc()
 
-        dw.start_doc(0)
+        dw.start_doc(1)
         dw.add_field("a", fieldobj, "one", 1)
         dw.add_field("b", fieldobj, "two", 1)
+        dw.add_field("c", fieldobj, "three", 1)
         dw.finish_doc()
 
-        dw.start_doc(0)
+        dw.start_doc(2)
+        dw.finish_doc()
+
+        dw.start_doc(3)
         dw.add_field("a", fieldobj, "alfa", 1)
         dw.add_field("b", fieldobj, "bravo", 1)
         dw.finish_doc()
+
         dw.close()
 
         dr = codec.stored_fields_reader(st, seg)
         assert_equal(dr[0], {"a": "hello", "b": "there"})
-        assert_equal(dr[2], {"a": "alfa", "b": "bravo"})
-        assert_equal(dr[1], {"a": "one", "b": "two"})
+        # Note: access out of order
+        assert_equal(dr[3], {"a": "alfa", "b": "bravo"})
+        assert_equal(dr[1], {"a": "one", "b": "two", "c": "three"})
+        dr.close()
+
+        dr = codec.stored_fields_reader(st, seg)
+        sfs = list(dr)
+        assert_equal(sfs, [{"a": "hello", "b": "there"},
+                           {"a": "one", "b": "two", "c": "three"},
+                           {},
+                           {"a": "alfa", "b": "bravo"},
+                           ])
         dr.close()
 
 

File tests/test_mpwriter.py

+from __future__ import with_statement
+import random
+
+from nose.tools import assert_equal  # @UnresolvedImport
+
+from whoosh import fields
+from whoosh.compat import u
+from whoosh.filedb.multiproc2 import MpWriter, SerialMpWriter
+from whoosh.support.testing import TempIndex
+from whoosh.util import now, permutations
+
+
+def test_serial():
+    schema = fields.Schema(a=fields.TEXT(stored=True, spelling=True,
+                                         vector=True))
+    domain = ["".join(ls) for ls in permutations(u("abcdef"))]
+    scrambled = domain[:]
+    random.shuffle(scrambled)
+
+    with TempIndex(schema) as ix:
+        t = now()
+        with SerialMpWriter(ix, procs=3) as w:
+            for ls in scrambled:
+                w.add_document(a="".join(ls))
+        print now() - t
+
+        with ix.searcher() as s:
+            assert_equal(list(s.lexicon("a")), domain)
+            assert_equal(s.doc_count_all(), 720)
+
+
+
+
+
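
The new test only drives SerialMpWriter, the single-process stand-in; the parallel writer takes the same calls. A sketch of the equivalent MpWriter test body (same schema and domain as above), assuming the multiprocess path is far enough along to commit:

    def test_parallel():
        schema = fields.Schema(a=fields.TEXT(stored=True, spelling=True,
                                             vector=True))
        domain = ["".join(ls) for ls in permutations(u("abcdef"))]
        scrambled = domain[:]
        random.shuffle(scrambled)

        with TempIndex(schema) as ix:
            with MpWriter(ix, procs=3) as w:
                for ls in scrambled:
                    w.add_document(a="".join(ls))

            with ix.searcher() as s:
                assert_equal(list(s.lexicon("a")), domain)
                assert_equal(s.doc_count_all(), 720)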

File tests/test_parsing.py

-from nose.tools import assert_equal  #@UnresolvedImport
+from nose.tools import assert_equal  # @UnresolvedImport
 
 from whoosh import analysis, fields, query
 from whoosh.compat import u, text_type
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin()])
     assert_equal(repr(p.tag("hello there amiga")), "<AndGroup <None:'hello'>, < >, <None:'there'>, < >, <None:'amiga'>>")
 
+
 def test_singlequotes():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.SingleQuotePlugin()])
     assert_equal(repr(p.process("a 'b c' d")), "<AndGroup <None:'a'>, <None:'b c'>, <None:'d'>>")
 
+
 def test_prefix():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.PrefixPlugin()])
     assert_equal(repr(p.process("a b* c")), "<AndGroup <None:'a'>, <None:'b'*>, <None:'c'>>")
 
+
 def test_wild():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.WildcardPlugin()])
     assert_equal(repr(p.process("a b*c? d")), "<AndGroup <None:'a'>, <None:Wild 'b*c?'>, <None:'d'>>")
 
+
 def test_range():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.RangePlugin()])
     assert_equal(repr(p.process("[to c] d")), "<AndGroup <None:[None 'c']>, <None:'d'>>")
     assert_equal(repr(p.process("[to]")), "<AndGroup <None:[None None]>>")
 
+
 def test_sq_range():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.SingleQuotePlugin(),
                                         plugins.RangePlugin()])
     assert_equal(repr(p.process("['a b' to ']']")), "<AndGroup <None:['a b' ']']>>")
 
+
 def test_phrase():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.PhrasePlugin()])
     assert_equal(repr(p.process('"b c" d')), "<AndGroup <None:PhraseNode 'b c'~1>, <None:'d'>>")
     assert_equal(repr(p.process('"b c"')), "<AndGroup <None:PhraseNode 'b c'~1>>")
 
+
 def test_groups():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.GroupPlugin()])
     ns = p.process("a ((b c) d) e")
     assert_equal(repr(ns), "<AndGroup <None:'a'>, <AndGroup <AndGroup <None:'b'>, <None:'c'>>, <None:'d'>>, <None:'e'>>")
 
+
 def test_fieldnames():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.FieldsPlugin(),
     assert_equal(repr(ns), "<AndGroup <'a':'b'>, <None:'c'>, <AndGroup <'d':'e'>, <AndGroup <'f':'g'>, <'f':'h'>>>, <None:'i'>, <None:'j:'>>")
     assert_equal(repr(p.process("a:b:")), "<AndGroup <'a':'b:'>>")
 
+
 def test_operators():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.OperatorsPlugin()])
     ns = p.process("a OR b")
     assert_equal(repr(ns), "<AndGroup <OrGroup <None:'a'>, <None:'b'>>>")
 
+
 def test_boost():
     p = default.QueryParser("t", None, [plugins.WhitespacePlugin(),
                                         plugins.GroupPlugin(),
     q = qp.parse(u(""))
     assert_equal(q, query.NullQuery)
 
+
 def test_fields():
     s = fields.Schema(content=fields.TEXT, title=fields.TEXT, id=fields.ID)
     qp = default.QueryParser("content", s)
     assert_equal(q.fieldname, "title")
     assert_equal(q.text, "test")
 
+
 def test_multifield():
     schema = fields.Schema(content=fields.TEXT, title=fields.TEXT,
                            cat=fields.KEYWORD, date=fields.DATETIME)
     q = qp.parse(qs)
     assert_equal(text_type(q), "((x:a OR y:a) AND (((x:b OR y:b) AND (x:c OR y:c) AND cat:d) OR ((x:b OR y:b) AND (x:c OR y:c) AND cat:e)))")
 
+
 def test_fieldname_chars():
     s = fields.Schema(abc123=fields.TEXT, nisbah=fields.KEYWORD)
     qp = default.QueryParser("content", s)
     q = qp.parse(u("abc123 (xyz:123 OR qrs)"))
     assert_equal(text_type(q), "(content:abc123 AND (abc123:123 OR content:qrs))")
 
+
 def test_colonspace():
     s = fields.Schema(content=fields.TEXT, url=fields.ID)
     qp = default.QueryParser("content", s)
     assert_equal(q.fieldname, "foo")
     assert_equal(q.text, "blah:")
 
+
 def test_andor():
     qp = default.QueryParser("a", None)
     q = qp.parse("a AND b OR c AND d OR e AND f")
 
     assert_equal(qp.parse("OR"), query.Term("a", "OR"))
 
+
 def test_andnot():
     qp = default.QueryParser("content", None)
     q = qp.parse(u("this ANDNOT that"))
     assert_equal(q.__class__, query.AndNot)
     assert_equal(text_type(q), "((content:a AND content:b) ANDNOT content:c)")
 
+
 def test_boost_query():
     qp = default.QueryParser("content", None)
     q = qp.parse(u("this^3 fn:that^0.5 5.67 hi^5x"))
     assert_equal(q[1].boost, 2.5)
     assert_equal(q[2].text, "^3")
 
+
 def test_boosts():
     qp = default.QueryParser("t", None)
     q = qp.parse("alfa ((bravo^2)^3)^4 charlie")
     assert_equal(q.__unicode__(), "(t:alfa AND t:bravo^24.0 AND t:charlie)")
 
+
 def test_wildcard1():
     qp = default.QueryParser("content", None)
     q = qp.parse(u("hello *the?e* ?star*s? test"))
     assert_equal(q[3].__class__, query.Term)
     assert_equal(q[3].text, "test")
 
+
 def test_wildcard2():
     qp = default.QueryParser("content", None)
     q = qp.parse(u("*the?e*"))
     assert_equal(q.__class__, query.Wildcard)
     assert_equal(q.text, "*the?e*")
 
+
 def test_parse_fieldname_underscores():
     s = fields.Schema(my_name=fields.ID(stored=True), my_value=fields.TEXT)
     qp = default.QueryParser("my_value", schema=s)
     assert_equal(q.fieldname, "my_name")
     assert_equal(q.text, "Green")
 
+
 def test_endstar():
     qp = default.QueryParser("text", None)
     q = qp.parse(u("word*"))
     assert_equal(q[0].__class__, query.Prefix)
     assert_equal(q[0].text, "first")
 
+
 def test_singlequotes_query():
     qp = default.QueryParser("text", None)
     q = qp.parse("hell's hot 'i stab at thee'")
 #        assert q.__class__, query.Wildcard)
 #        assert q.text, u("ab\u005c*"))
 
+
 def test_phrase_phrase():
     qp = default.QueryParser("content", None)
     q = qp.parse('"alfa bravo" "charlie delta echo"^2.2 test:"foxtrot golf"')
     assert_equal(q[2].words, ["foxtrot", "golf"])
     assert_equal(q[2].fieldname, "test")
 
+
 def test_weird_characters():
     qp = default.QueryParser("content", None)
     q = qp.parse(u(".abcd@gmail.com"))
     assert_equal(q.__class__, query.Wildcard)
     assert_equal(q.text, "?")
 
+
 def test_euro_chars():
     schema = fields.Schema(text=fields.TEXT)
     qp = default.QueryParser("text", schema)
     assert_equal(q.__class__, query.Term)
     assert_equal(q.text, u("stra\xdfe"))
 
+
 def test_star():
     schema = fields.Schema(text=fields.TEXT(stored=True))
     qp = default.QueryParser("text", schema)
     assert_equal(q.__class__, query.Wildcard)
     assert_equal(q.text, "*q")
 
+
 def test_star_field():
     schema = fields.Schema(text=fields.TEXT)
     qp = default.QueryParser("text", schema)
     assert_equal(q.fieldname, "text")
     assert_equal(q.text, "test")
 
+
 def test_range_query():
     schema = fields.Schema(name=fields.ID(stored=True), text=fields.TEXT(stored=True))
     qp = default.QueryParser("text", schema)
     assert_equal(q.start, "d")
     assert_equal(q.fieldname, "name")
 
+
 def test_numeric_range():
     schema = fields.Schema(id=fields.STORED, number=fields.NUMERIC)
     qp = default.QueryParser("number", schema)
     assert_equal(q.start, teststart)
     assert_equal(q.end, testend)
 
+
 def test_regressions():
     qp = default.QueryParser("f", None)
 
     q = qp.parse(u('22" BX'))
     assert_equal(q, query.And([query.Term("f", '22"'), query.Term("f", "BX")]))
 
+
 def test_empty_ranges():
     schema = fields.Schema(name=fields.TEXT, num=fields.NUMERIC,
                            date=fields.DATETIME)
         q = qp.parse(u("%s:[to]") % fname)
         assert_equal(q.__class__, query.Every)
 
+
 def test_empty_numeric_range():
     schema = fields.Schema(id=fields.ID, num=fields.NUMERIC)
     qp = default.QueryParser("num", schema)
     assert_equal(q.start, None)
     assert_equal(q.end, None)
 
+
 def test_numrange_multi():
     schema = fields.Schema(text=fields.TEXT, start=fields.NUMERIC, end=fields.NUMERIC)
     qp = default.QueryParser("text", schema)
     assert_equal(q[1].start, 2011)
     assert_equal(q[1].end, 2012)
 
+
 def test_nonexistant_fieldnames():
     # Need an analyzer that won't mangle a URL
     a = analysis.SimpleAnalyzer("\\S+")
     assert_equal(q[1].fieldname, "text")
     assert_equal(q[1].text, "http://localhost/")
 
+
 def test_stopped():
     schema = fields.Schema(text=fields.TEXT)
     qp = default.QueryParser("text", schema)
     q = qp.parse(u("a b"))
     assert_equal(q, query.NullQuery)
 
+
 def test_analyzing_terms():
     schema = fields.Schema(text=fields.TEXT(analyzer=analysis.StemmingAnalyzer()))
     qp = default.QueryParser("text", schema)
     assert_equal(q.__class__, query.Term)
     assert_equal(q.text, "index")
 
+
 def test_simple_parsing():
     parser = default.SimpleParser("x", None)
     q = parser.parse(u("alfa bravo charlie delta"))
     q = parser.parse(u("- alfa +bravo + delta"))
     assert_equal(text_type(q), "((x:bravo AND x:delta) ANDNOT x:alfa)")
 
+
 def test_dismax():
     parser = default.DisMaxParser({"body": 0.8, "title": 2.5}, None)
     q = parser.parse(u("alfa bravo charlie"))
     q = parser.parse(u("alfa -bravo +charlie"))
     assert_equal(text_type(q), "((DisMax(body:charlie^0.8 title:charlie^2.5) ANDMAYBE DisMax(body:alfa^0.8 title:alfa^2.5)) ANDNOT DisMax(body:bravo^0.8 title:bravo^2.5))")
 
+
 def test_many_clauses():
     qs = "1" + (" OR 1" * 1000)
 
     parser = default.QueryParser("content", None)
     parser.parse(qs)
 
+
 def test_roundtrip():
     parser = default.QueryParser("a", None)
     q = parser.parse(u("a OR ((b AND c AND d AND e) OR f OR g) ANDNOT h"))
     assert_equal(text_type(q), "((a:a OR (a:b AND a:c AND a:d AND a:e) OR a:f OR a:g) ANDNOT a:h)")
 
+
 def test_ngrams():
     schema = fields.Schema(grams=fields.NGRAM)
     parser = default.QueryParser('grams', schema)
     assert_equal(len(q), 8)
     assert_equal([sq.text for sq in q], ["hell", "ello", "llo ", "lo t", "o th", " the", "ther", "here"])
 
+
 def test_ngramwords():
     schema = fields.Schema(grams=fields.NGRAMWORDS(queryor=True))
     parser = default.QueryParser('grams', schema)
     assert_equal(q[0][1].text, "ello")
     assert_equal(q[1].text, "tom")
 
+
 def test_multitoken_default():
     textfield = fields.TEXT()
     assert textfield.multitoken_query == "default"
     assert_equal(q[1].__class__, query.Term)
     assert_equal(q[1].text, "bacon")
 
+
 def test_multitoken_or():
     textfield = fields.TEXT()
     textfield.multitoken_query = "or"
     assert_equal(q[1].__class__, query.Term)
     assert_equal(q[1].text, "bacon")
 
+
 def test_multitoken_phrase():
     textfield = fields.TEXT()
     textfield.multitoken_query = "phrase"
     q = parser.parse(qstring)
     assert_equal(q.__class__, query.Phrase)
 
+
 def test_singlequote_multitoken():
     schema = fields.Schema(text=fields.TEXT(multitoken_query="or"))
     parser = default.QueryParser("text", schema)
     q = parser.parse(u("'foo bar'"))  # single quotes
     assert_equal(q.__unicode__(), "(text:foo OR text:bar)")
 
+
 def test_operator_queries():
     qp = default.QueryParser("f", None)
 
 #    q = p.parse("(a ANDMAYBE b) ANDMAYBE (c ANDMAYBE d)")
 #    assert_equal(text_type(q), "((f:a ANDMAYBE f:b) ANDMAYBE (f:c ANDMAYBE f:d))")
 
+
 def test_not_assoc():
     qp = default.QueryParser("text", None)
     q = qp.parse(u("a AND NOT b OR c"))
     q = qp.parse(u("a NOT (b OR c)"))
     assert_equal(text_type(q), "(text:a AND NOT (text:b OR text:c))")
 
+
 def test_fieldname_space():
     qp = default.QueryParser("a", None)
     q = qp.parse("Man Ray: a retrospective")
     assert_equal(text_type(q), "(a:Man AND a:Ray: AND a:a AND a:retrospective)")
 
+
 def test_fieldname_fieldname():
     qp = default.QueryParser("a", None)
     q = qp.parse("a:b:")
     assert_equal(q, query.Term("a", "b:"))
 
+
 def test_paren_fieldname():
     schema = fields.Schema(kind=fields.ID, content=fields.TEXT)
 
     q = qp.parse(u("kind:(1d565 OR 7c584) AND (stuff)"))
     assert_equal(text_type(q), "((kind:1d565 OR kind:7c584) AND content:stuff)")
 
+
 def test_star_paren():
     qp = default.QueryParser("content", None)
     q = qp.parse(u("(*john*) AND (title:blog)"))
     assert_equal(q[0].text, "*john*")
     assert_equal(q[1].text, "blog")
 
+
 def test_dash():
     ana = analysis.StandardAnalyzer("[ \t\r\n()*?]+")
     schema = fields.Schema(title=fields.TEXT(analyzer=ana),