Commits

Matt Chaput committed 1034522

Forgot to add externalsort module to last commit.

Comments (0)

Files changed (1)

src/whoosh/support/externalsort.py

+# Copyright 2011 Matt Chaput. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+#    1. Redistributions of source code must retain the above copyright notice,
+#       this list of conditions and the following disclaimer.
+#
+#    2. Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in the
+#       documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
+# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and documentation are
+# those of the authors and should not be interpreted as representing official
+# policies, either expressed or implied, of Matt Chaput.
+
+"""
+This module implements a general external merge sort for Python objects.
+"""
+
+from __future__ import with_statement
+
+import os, tempfile
+from heapq import heapify, heappop, heapreplace
+from marshal import load, dump, dumps
+
+from whoosh.compat import xrange
+
+
+try:
+    from heapq import merge
+    def imerge(iterables):
+        return merge(*iterables)
+except ImportError:
+    def imerge(iterables):
+        _hpop, _hreplace, _Stop = (heappop, heapreplace, StopIteration)
+        h = []
+        h_append = h.append
+        for itnum, it in enumerate(map(iter, iterables)):
+            try:
+                nx = it.next
+                h_append([nx(), itnum, nx])
+            except _Stop:
+                pass
+        heapify(h)
+
+        while 1:
+            try:
+                while 1:
+                    v, itnum, nx = s = h[0]
+                    yield v
+                    s[0] = nx()
+                    _hreplace(h, s)
+            except _Stop:
+                _hpop(h)
+            except IndexError:
+                return
+
+
+class SortingPool(object):
+    """This object implements a general K-way external merge sort for Python
+    objects.
+    
+    >>> pool = MergePool()
+    >>> # Add an unlimited number of items in any order
+    >>> for item in my_items:
+    ...     pool.add(item)
+    ...
+    >>> # Get the items back in sorted order
+    >>> for item in pool.items():
+    ...     print item
+    
+    This class uses the `marshal` module to write the items to temporary files,
+    so you can only sort marshal-able types (generally: numbers, strings,
+    tuples, lists, and dicts).
+    """
+
+    filenamechars = "abcdefghijklmnopqrstuvwxyz_1234567890"
+
+    def __init__(self, maxsize=1000000, tempdir=None, prefix="", suffix=".run"):
+        """
+        :param maxsize: the maximum number of items to keep in memory at once.
+        :param tempdir: the path of a directory to use for temporary file
+            storage. The default is to use the system's temp directory.
+        :param prefix: a prefix to add to temporary filenames.
+        :param suffix: a suffix to add to temporary filenames.
+        """
+
+        self.tempdir = tempdir
+        if maxsize < 1:
+            raise ValueError("maxsize=%s must be >= 1" % maxsize)
+        self.maxsize = maxsize
+        self.prefix = prefix
+        self.suffix = suffix
+        # Current run queue
+        self.current = []
+        # List of run filenames
+        self.runs = []
+
+    def _new_run(self):
+        fd, path = tempfile.mkstemp(prefix=self.prefix, suffix=self.suffix,
+                                    dir=self.tempdir)
+        f = os.fdopen(fd, "wb")
+        return path, f
+
+    @staticmethod
+    def _read_run(path):
+        f = open(path, "rb")
+        try:
+            while True:
+                yield load(f)
+        except EOFError:
+            return
+        finally:
+            f.close()
+            os.remove(path)
+
+    @classmethod
+    def _merge_runs(cls, paths):
+        iters = [cls._read_run(path) for path in paths]
+        for item in imerge(iters):
+            yield item
+
+    def add(self, item):
+        """Adds `item` to the pool to be sorted.
+        """
+
+        if len(self.current) >= self.maxsize:
+            self.save()
+        self.current.append(item)
+
+    def _write_run(self, f, items):
+        for item in items:
+            dump(item, f)
+        f.close()
+
+    def _add_run(self, filename):
+        self.runs.append(filename)
+
+    def save(self):
+        current = self.current
+        if current:
+            current.sort()
+            path, f = self._new_run()
+            self._write_run(f, current)
+            self._add_run(path)
+            self.current = []
+
+    def cleanup(self):
+        for path in self.runs:
+            try:
+                os.remove(path)
+            except OSError:
+                pass
+
+    def reduce_to(self, target, k):
+        # Reduce the number of runs to "target" by merging "k" runs at a time
+
+        if k < 2:
+            raise ValueError("k=%s must be > 2" % k)
+        if target < 1:
+            raise ValueError("target=%s must be >= 1" % target)
+        runs = self.runs
+        while len(runs) > target:
+            newpath, f = self._new_run()
+            # Take k runs off the end of the run list
+            tomerge = []
+            while runs and len(tomerge) < k:
+                tomerge.append(runs.pop())
+            # Merge them into a new run and add it at the start of the list
+            self._write_run(f, self._merge_runs(tomerge))
+            runs.insert(0, newpath)
+
+    def items(self, maxfiles=128):
+        """Returns a sorted list or iterator of the items in the pool.
+        
+        :param maxfiles: maximum number of files to open at once.
+        """
+
+        if maxfiles < 2:
+            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)
+
+        if not self.runs:
+            # We never wrote a run to disk, so just sort the queue in memory
+            # and return that
+            return sorted(self.current)
+        # Write a new run with the leftover items in the queue
+        self.save()
+
+        # If we have more runs than allowed open files, merge some of the runs
+        if maxfiles < len(self.runs):
+            self.reduce_to(maxfiles, maxfiles)
+
+        # Take all the runs off the run list and merge them
+        runs = self.runs
+        self.runs = []  # Minor detail, makes this object reusable
+        return self._merge_runs(runs)
+
+
+def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
+    """Sorts the given items using an external merge sort.
+    
+    :param tempdir: the path of a directory to use for temporary file
+        storage. The default is to use the system's temp directory.
+    :param maxsize: the maximum number of items to keep in memory at once.
+    :param maxfiles: maximum number of files to open at once.
+    """
+
+    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
+    for item in items:
+        p.add(item)
+    return p.items(maxfiles=maxfiles)
+
+
+