Commits

Matt Chaput committed 54c0271

Fixed segment merges not merging per-doc info for glob fields
(the merge code previously took field names only from the schema, so
dynamically created glob field names were skipped). Fixes issue #343.

Added IndexReader.indexed_field_names() and PostingPool.fieldnames to support the fix:
union the indexed field names with the schema names to get the full set of names to merge.
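
A minimal sketch of that union (the helper name below is illustrative and not
part of the commit; the real changes are in the _merge_subsegments hunk under
src/whoosh/multiproc.py and the add_reader hunk under src/whoosh/writing.py below):

    def fieldnames_to_merge(schema, reader):
        # Field names the incoming segment actually indexed, kept only if the
        # schema recognizes them (a glob field matches dynamically created names)
        indexed = set(name for name in reader.indexed_field_names()
                      if name in schema)
        # Union with the statically declared names so per-doc info (lengths,
        # stored fields, columns) for glob-created fields is merged too
        return set(schema.names()) | indexed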

  • Parent commit 2c5c77e

Files changed (9)

File src/whoosh/codec/base.py

     def matcher(self, fieldname, text, format_, scorer=None):
         raise NotImplementedError
 
+    @abstractmethod
+    def indexed_field_names(self):
+        raise NotImplementedError
+
     def close(self):
         pass
 

File src/whoosh/codec/memory.py

         ids, weights, values = zip(*items)
         return ListMatcher(ids, weights, values, format_, scorer=scorer)
 
+    def indexed_field_names(self):
+        return self._invindex.keys()
+
     def close(self):
         pass
 

File src/whoosh/codec/plaintext.py

         fieldname, btext = term
         return self._find_term(fieldname, btext)
 
+    def indexed_field_names(self):
+        return self._iter_fields()
+
     def terms(self):
         for fieldname in self._iter_fields():
             for btext in self._iter_btexts():

File src/whoosh/codec/whoosh2.py

 class W2TermsReader(PostingIndexBase):
     # Implements whoosh.codec.base.TermsReader
 
+    def indexed_field_names(self):
+        return self.fieldmap.keys()
+
     def terms(self):
         return self.keys()
 

File src/whoosh/codec/whoosh3.py

     def __contains__(self, term):
         return self._keycoder(*term) in self._tindex
 
+    def indexed_field_names(self):
+        return self._fieldmap.keys()
+
     def terms(self):
         keydecoder = self._keydecoder
         return (keydecoder(keybytes) for keybytes in self._tindex.keys())

File src/whoosh/multiproc.py

 
     # The filename of the single remaining run
     runname = writer.pool.runs[0]
+    # The indexed field names
+    fieldnames = writer.pool.fieldnames
     # The segment object (parent can use this to re-open the files created
     # by the sub-writer)
     segment = writer._partial_segment()
 
-    return runname, segment
+    return runname, fieldnames, segment
 
 
 # Multiprocessing Writer
             if multisegment:
                 # Actually finish the segment and return it with no run
                 runname = None
+                fieldnames = writer.pool.fieldnames
                 segment = writer._finalize_segment()
             else:
                 # Merge all runs in the writer's pool into one run, close the
                 # segment, and return the run name and the segment
                 k = self.kwargs.get("k", 64)
-                runname, segment = finish_subsegment(writer, k)
+                runname, fieldnames, segment = finish_subsegment(writer, k)
 
             # Put the results (the run filename and the segment object) on the
             # result queue
-            resultqueue.put((runname, segment), timeout=5)
+            resultqueue.put((runname, fieldnames, segment), timeout=5)
 
     def _process_file(self, filename, doc_count):
         # This method processes a "job file" written out by the parent task. A
         for task in self.tasks:
             task.join()
 
-        # Pull a (run_file_name, segment) tuple off the result queue for
-        # each sub-task, representing the final results of the task
+        # Pull a (run_file_name, fieldnames, segment) tuple off the result
+        # queue for each sub-task, representing the final results of the task
         results = []
         for task in self.tasks:
             results.append(self.resultqueue.get(timeout=5))
 
         if self.multisegment:
-            finalsegments += [s for _, s in results]
+            # If we're not merging the segments, we don't care about the runname
+            # and fieldnames in the results... just pull out the segments and
+            # add them to the list of final segments
+            finalsegments += [s for _, _, s in results]
             if self._added:
                 finalsegments.append(self._finalize_segment())
             else:
         self._finish()
 
     def _merge_subsegments(self, results, mergetype):
+        schema = self.schema
+        schemanames = set(schema.names())
         storage = self.storage
         codec = self.codec
         sources = []
             sources.append(self.pool.iter_postings())
 
         pdrs = []
-        for runname, segment in results:
+        for runname, fieldnames, segment in results:
+            fieldnames = set(fieldnames) | schemanames
             pdr = codec.per_document_reader(storage, segment)
             pdrs.append(pdr)
             basedoc = self.docnum
-            docmap = self.write_per_doc(pdr)
+            docmap = self.write_per_doc(fieldnames, pdr)
             assert docmap is None
 
             items = self._read_and_renumber_run(runname, basedoc)
 
         try:
             # Merge the iterators into the field writer
-            self.fieldwriter.add_postings(self.schema, mpdr, imerge(sources))
+            self.fieldwriter.add_postings(schema, mpdr, imerge(sources))
         finally:
             mpdr.close()
         self._added = True

File src/whoosh/reading.py

         return None
 
     @abstractmethod
+    def indexed_field_names(self):
+        """Returns an iterable of strings representing the names of the indexed
+        fields. This may include additional names not explicitly listed in the
+        Schema if you use "glob" fields.
+        """
+
+        raise NotImplementedError
+
+    @abstractmethod
     def all_terms(self):
         """Yields (fieldname, text) tuples for every term in the index.
         """
         if self.schema[fieldname].format is None:
             raise TermNotFound("Field %r is not indexed" % fieldname)
 
+    def indexed_field_names(self):
+        return self._terms.indexed_field_names()
+
     def all_terms(self):
         if self.is_closed:
             raise ReaderClosed
     def __iter__(self):
         return iter([])
 
+    def indexed_field_names(self):
+        return []
+
     def all_terms(self):
         return iter([])
 
             # Yield the term
             yield term
 
+    def indexed_field_names(self):
+        names = set()
+        for r in self.reader():
+            names.update(r.indexed_field_names())
+        return iter(names)
+
     def all_terms(self):
         return self._merge_terms([r.all_terms() for r in self.readers])
 

File src/whoosh/writing.py

         self.segment = segment
         self.limit = limitmb * 1024 * 1024
         self.currentsize = 0
+        self.fieldnames = set()
 
     def _new_run(self):
         path = "%s.run" % random_name()
         assert isinstance(item[1], bytes_type), "tbytes=%r" % item[1]
         if item[4] is not None:
             assert isinstance(item[4], bytes_type), "vbytes=%r" % item[4]
+        self.fieldnames.add(item[0])
         size = (28 + 4 * 5  # tuple = 28 + 4 * length
                 + 21 + len(item[0])  # fieldname = str = 21 + length
                 + 26 + len(item[1]) * 2  # text = unicode = 26 + 2 * length
         items = self._process_posts(items, startdoc, docmap)
         self.fieldwriter.add_postings(self.schema, lengths, items)
 
-    def write_per_doc(self, reader):
+    def write_per_doc(self, fieldnames, reader):
+        # Very bad hack: reader should be an IndexReader, but may be a
+        # PerDocumentReader if this is called from multiproc, where the code
+        # tries to be efficient by merging per-doc and terms separately.
+        # TODO: fix this!
+
         schema = self.schema
-
         if reader.has_deletions():
             docmap = {}
         else:
             docmap = None
 
         pdw = self.perdocwriter
-
         # Open all column readers
         cols = {}
-        for fieldname, fieldobj in schema.items():
+        for fieldname in fieldnames:
+            fieldobj = schema[fieldname]
             coltype = fieldobj.column_type
             if coltype and reader.has_column(fieldname):
                 creader = reader.column_reader(fieldname, coltype)
                 docmap[docnum] = self.docnum
 
             pdw.start_doc(self.docnum)
-            for fieldname, fieldobj in schema.items():
+            for fieldname in fieldnames:
+                fieldobj = schema[fieldname]
                 length = reader.doc_field_length(docnum, fieldname)
                 pdw.add_field(fieldname, fieldobj,
                               stored.get(fieldname), length)
     def add_reader(self, reader):
         self._check_state()
         basedoc = self.docnum
-        docmap = self.write_per_doc(reader)
+        ndxnames = set(fname for fname in reader.indexed_field_names()
+                       if fname in self.schema)
+        fieldnames = set(self.schema.names()) | ndxnames
+
+        docmap = self.write_per_doc(fieldnames, reader)
         self.add_postings_to_pool(reader, basedoc, docmap)
         self._added = True
 

File tests/test_indexing.py

         assert [hit["id"] for hit in r] == [1, 0, 3, 2]
 
 
+def test_globfield_length_merge():
+    # Issue 343
+
+    schema = fields.Schema(title=fields.TEXT(stored=True),
+                           path=fields.ID(stored=True))
+    schema.add("*_text", fields.TEXT, glob=True)
+
+    with TempIndex(schema, "globlenmerge") as ix:
+        with ix.writer() as w:
+            w.add_document(title=u"First document", path=u"/a",
+                           content_text=u"This is the first document we've added!")
 
+        with ix.writer() as w:
+            w.add_document(title=u"Second document", path=u"/b",
+                           content_text=u"The second document is even more interesting!")
+
+        with ix.searcher() as s:
+            docnum = s.document_number(path="/a")
+            assert s.doc_field_length(docnum, "content_text") is not None
+
+            qp = qparser.QueryParser("content", schema)
+            q = qp.parse("content_text:document")
+            r = s.search(q)
+            paths = sorted(hit["path"] for hit in r)
+            assert paths == ["/a", "/b"]
+
+