Commits

Matt Chaput  committed 5e885af

- More work on new multiprocessing writer
- Removed max-WOL statistic.

  • Parent commits 1034522

Files changed (3)

File src/whoosh/filedb/filetables.py

 from struct import Struct, pack
 
 from whoosh.compat import (loads, dumps, long_type, xrange, iteritems,
-                           b, text_type)
+                           itervalues, b, text_type)
 from whoosh.reading import TermInfo
 from whoosh.system import (_INT_SIZE, _LONG_SIZE, _FLOAT_SIZE, pack_ushort,
                            pack_long, unpack_ushort, unpack_long)
         dbfile.close()
         return cls(doccount, lengths, totals)
 
+    def doc_count(self):
+        if not self.lengths:
+            return 0
+        return max(len(arry) for arry in itervalues(self.lengths))
+
     def field_length(self, fieldname):
         return self.totals.get(fieldname, 0)
 
     def min_field_length(self, fieldname):
         if fieldname in self.mins:
             return self.mins[fieldname]
+        if not self.lengths:
+            return 0
         mn = byte_to_length(min(b for b in self.lengths[fieldname]))
         self.mins[fieldname] = mn
         return mn
     def max_field_length(self, fieldname):
         if fieldname in self.maxes:
             return self.maxes[fieldname]
+        if not self.lengths:
+            return 0
         mx = byte_to_length(max(b for b in self.lengths[fieldname]))
         self.maxes[fieldname] = mx
         return mx
             arry[docnum] = byte
             self.totals[fieldname] += length
 
+    def add_other(self, other):
+        lengths = self.lengths
+        totals = self.totals
+        doccount = self.doc_count()
+        for fname in other.lengths:
+            if fname not in lengths:
+                lengths[fname] = array("B")
+        self._pad_arrays(doccount)
+
+        for fname in other.lengths:
+            lengths[fname].extend(other.lengths[fname])
+        self._pad_arrays(self.doc_count())
+
+        for fname in other.totals:
+            totals[fname] += other.totals[fname]
+
     def get(self, docnum, fieldname, default=0):
         lengths = self.lengths
         if fieldname not in lengths:
     def field_names(self):
         return self.lengths.keys()
 
-    def to_file(self, dbfile, doccount):
+    def _pad_arrays(self, doccount):
         # Pad out arrays to full length
         for fieldname in self.lengths.keys():
             arry = self.lengths[fieldname]
                 for _ in xrange(doccount - len(arry)):
                     arry.append(0)
 
+    def to_file(self, dbfile, doccount):
+        self._pad_arrays(doccount)
+
         dbfile.write("\xFF")  # Header byte
         dbfile.write_int(1)  # Format version number
         dbfile.write_uint(doccount)  # Number of documents
 
 
 class FileTermInfo(TermInfo):
-    # Freq, Doc freq, min len, max length, max weight, max WOL, min ID, max ID
+    # Freq, Doc freq, min len, max length, max weight, unused, min ID, max ID
     struct = Struct("!fIBBffII")
 
     def __init__(self, weight=0.0, docfreq=0, minlength=None, maxlength=0,
-                 maxweight=0.0, maxwol=0.0, minid=None, maxid=None,
-                 postings=None):
+                 maxweight=0.0, minid=None, maxid=None, postings=None):
         self._weight = weight
         self._df = docfreq
         self._minlength = minlength  # (as byte)
         self._maxlength = maxlength  # (as byte)
         self._maxweight = maxweight
-        self._maxwol = maxwol
         self._minid = minid
         self._maxid = maxid
         self.postings = postings
         self._maxlength = max(self._maxlength, xl)
 
         self._maxweight = max(self._maxweight, block.max_weight())
-        self._maxwol = max(self._maxwol, block.max_wol())
 
         if self._minid is None:
             self._minid = block.ids[0]
 
         # Pack the term info into bytes
         st = self.struct.pack(self._weight, self._df, ml, xl,
-                              self._maxweight, self._maxwol, mid, xid)
+                              self._maxweight, 0, mid, xid)
 
         if isinstance(self.postings, tuple):
             # Postings are inlined - dump them using the pickle protocol
         hbyte = ord(s[0:1])
         if hbyte < 2:
             st = cls.struct
-            # Freq, Doc freq, min len, max len, max w, max WOL, min ID, max ID
-            f, df, ml, xl, xw, xwol, mid, xid = st.unpack(s[1:st.size + 1])
+            # Freq, Doc freq, min len, max len, max w, unused, min ID, max ID
+            f, df, ml, xl, xw, _, mid, xid = st.unpack(s[1:st.size + 1])
             mid = None if mid == NO_ID else mid
             xid = None if xid == NO_ID else xid
             # Postings
             ml = 1
             xl = 106374
             xw = 999999999
-            xwol = 999999999
             mid = -1
             xid = -1
 
-        return cls(f, df, ml, xl, xw, xwol, mid, xid, p)
+        return cls(f, df, ml, xl, xw, mid, xid, p)
 
     @classmethod
     def read_weight(cls, dbfile, datapos):
         weightspos = datapos + 1 + _FLOAT_SIZE + _INT_SIZE + 2
         return dbfile.get_float(weightspos)
 
-    @classmethod
-    def read_max_wol(cls, dbfile, datapos):
-        weightspos = datapos + 1 + _FLOAT_SIZE + _INT_SIZE + 2
-        return dbfile.get_float(weightspos + _FLOAT_SIZE)

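For reference, the new Lengths.add_other above merges a sub-writer's per-field length arrays into the parent writer's: missing fields get an empty array("B"), every array is padded with zeros up to the current document count so document numbers stay aligned, the other writer's bytes are appended, the arrays are padded again so fields the other writer lacked keep up, and the field totals are summed. A minimal standalone sketch of that merge, using plain dicts of array("B") rather than the real Lengths class:

    from array import array

    def merge_lengths(lengths, totals, other_lengths, other_totals):
        # documents already recorded in the target = longest field array
        doccount = max(len(a) for a in lengths.values()) if lengths else 0

        # create missing fields, then pad every array to the current doc count
        for fname in other_lengths:
            lengths.setdefault(fname, array("B"))
        for arry in lengths.values():
            arry.extend([0] * (doccount - len(arry)))

        # append the other writer's per-document length bytes
        for fname, arry in other_lengths.items():
            lengths[fname].extend(arry)

        # pad again so fields the other writer did not have stay aligned
        newcount = max(len(a) for a in lengths.values()) if lengths else 0
        for arry in lengths.values():
            arry.extend([0] * (newcount - len(arry)))

        # field totals just add up
        for fname, total in other_totals.items():
            totals[fname] = totals.get(fname, 0) + total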
File src/whoosh/filedb/filewriting.py

 #    return segments
 
 
+class PostingPool(SortingPool):
+    # Subclass whoosh.support.externalsort.SortingPool to use knowledge of
+    # postings to set run size in bytes instead of items
+
+    def __init__(self, limitmb=128, **kwargs):
+        SortingPool.__init__(self, **kwargs)
+        self.limit = limitmb * 1024 * 1024
+        self.currentsize = 0
+
+    def add(self, item):
+        # item = (fieldname, text, docnum, weight, valuestring)
+        size = (28 + 4 * 5  # tuple = 28 + 4 * length
+                + 21 + len(item[0])  # fieldname = str = 21 + length
+                + 26 + len(item[1]) * 2  # text = unicode = 26 + 2 * length
+                + 18  # docnum = long = 18
+                + 16  # weight = float = 16
+                + 21 + len(item[4]))  # valuestring
+        self.currentsize += size
+        if self.currentsize > self.limit:
+            self.save()
+        self.current.append(item)  # keep the posting in the current in-memory run
+
+    def save(self):
+        SortingPool.save(self)
+        self.currentsize = 0
+
+
 # Writer object
 
 class SegmentWriter(IndexWriter):
     def __init__(self, ix, poolclass=None, blocklimit=128, timeout=0.0,
-                 delay=0.1, _lk=True, poolsize=100000, docbase=0, **kwargs):
+                 delay=0.1, _lk=True, limitmb=128, docbase=0, **kwargs):
         self.is_closed = False
         self.writelock = None
         self._added = False
         self.lengths = Lengths()
 
         # Create the posting pool
-        self.pool = SortingPool(maxsize=poolsize, prefix=self.indexname)
+        self.pool = PostingPool(limitmb=limitmb,
+                                prefix="whoosh_%s_" % self.indexname)
 
     def _check_state(self):
         if self.is_closed:
 
         self.termswriter.close()
         self.storedfields.close()
-        if not self.lengthfile.is_closed:
-            self.lengthfile.close()
         if self.vectorindex:
             self.vectorindex.close()
         if self.vpostwriter:
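
The byte-size bookkeeping in PostingPool.add above replaces the old fixed item count (poolsize) with a memory budget (limitmb): each posting tuple is charged the rough per-object costs noted in the comments, and a run is flushed to disk once the running total passes limitmb megabytes. A worked example of that estimate for a hypothetical posting (the item values here are invented, and it assumes a non-None valuestring):

    def estimate_posting_size(item):
        # mirrors the per-object estimates used in PostingPool.add
        fieldname, text, docnum, weight, valuestring = item
        return (28 + 4 * 5                # the 5-tuple itself
                + 21 + len(fieldname)     # fieldname (byte string)
                + 26 + 2 * len(text)      # text (unicode)
                + 18                      # docnum
                + 16                      # weight
                + 21 + len(valuestring))  # encoded posting value

    print(estimate_posting_size(("content", u"hello", 42, 1.0, "\x00\x01")))  # -> 169

So with the default limitmb=128, a run is saved after roughly 128 MB worth of such tuples rather than after a fixed number of items.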

File src/whoosh/filedb/multiproc2.py

 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of Matt Chaput.
 
-import os, tempfile
+import marshal, os, tempfile
 from multiprocessing import Process, Queue, cpu_count
 
-from whoosh.compat import xrange, iteritems, dump, load
+from whoosh.compat import xrange, iteritems, pickle
+from whoosh.filedb.filetables import Lengths
 from whoosh.filedb.filewriting import SegmentWriter
+from whoosh.support.externalsort import imerge
 from whoosh.writing import IndexWriter
 
 
 
     def _process_file(self, filename, length):
         writer = self.writer
+        load = pickle.load
         f = open(filename, "rb")
         for _ in xrange(length):
             code, args = load(f)
 
 
 class MpWriter(IndexWriter):
-    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
+    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
+                 combine=True, **kwargs):
         self.index = ix
         self.writer = self.index.writer(**kwargs)
         self.procs = procs or cpu_count()
         self.batchsize = batchsize
         self.subargs = subargs if subargs else kwargs
+        self.combine = combine
 
         self.tasks = []
         self.jobqueue = Queue(self.procs * 4)
 
     def _enqueue(self):
         docbuffer = self.docbuffer
+        dump = pickle.dump
         length = len(docbuffer)
         fd, filename = tempfile.mkstemp(".doclist")
         f = os.fdopen(fd, "wb")
         if len(self.docbuffer) >= self.batchsize:
             self._enqueue()
 
+    def _read_and_renumber_run(self, path, offset):
+        load = marshal.load
+        f = open(path, "rb")
+        try:
+            while True:
+                fname, text, docnum, weight, value = load(f)
+                yield (fname, text, docnum + offset, weight, value)
+        except EOFError:
+            return
+        finally:
+            f.close()
+            os.remove(path)
+
     def commit(self, **kwargs):
+        writer = self.writer
+        pool = writer.pool
+
         # Index the remaining documents in the doc buffer
         self._enqueue()
         # Tell the tasks to finish
         # Get the results
         results = []
         for task in self.tasks:
+            # runname, doccount, lenname
             results.append(self.resultqueue.get(timeout=5))
 
+        if results:
+            print("Combining results")
+            from whoosh.util import now
+            t = now()
+            for runname, doccount, lenname in results:
+                f = writer.storage.open_file(lenname)
+                lengths = Lengths.from_file(f, doccount)
+                writer.lengths.add_other(lengths)
+                writer.storage.delete_file(lenname)
 
+            base = results[0][1]
+            runreaders = [pool._read_run(results[0][0])]
+            for runname, doccount, lenname in results[1:]:
+                rr = self._read_and_renumber_run(runname, base)
+                runreaders.append(rr)
+                base += doccount
+            writer.termswriter.add_iter(imerge(runreaders), writer.lengths)
+            print("Combining took %s" % (now() - t))
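
The combine step above treats each sub-writer's run as an already-sorted stream of (fieldname, text, docnum, weight, value) tuples: the first run is read as-is, every later run has its document numbers shifted by the running document count, and imerge lazily merges the sorted streams before they are handed to writer.termswriter.add_iter. A toy sketch of that renumber-and-merge step, using heapq.merge as a stand-in for whoosh.support.externalsort.imerge and in-memory lists instead of run files:

    from heapq import merge  # stand-in for imerge; real runs live in temp files

    def renumber(run, offset):
        # same idea as _read_and_renumber_run, minus the marshal file handling
        for fname, text, docnum, weight, value in run:
            yield (fname, text, docnum + offset, weight, value)

    # two sub-writers that each numbered their own documents 0 and 1
    run_a = [("content", u"alpha", 0, 1.0, ""), ("content", u"beta", 1, 1.0, "")]
    run_b = [("content", u"alpha", 0, 2.0, ""), ("content", u"gamma", 1, 1.0, "")]

    merged = list(merge(iter(run_a), renumber(run_b, 2)))
    # -> alpha@0, alpha@2, beta@1, gamma@3, still sorted by (fieldname, text, docnum)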
 
 
 
 
 
+
+
+