# Source: whoosh/src/whoosh/filedb/compound.py

# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

import errno, sys
from threading import Lock
from shutil import copyfileobj

try:
    import mmap
except ImportError:
    mmap = None

from whoosh.compat import BytesIO, memoryview_
from whoosh.filedb.structfile import BufferFile, StructFile
from whoosh.filedb.filestore import FileStorage
from whoosh.system import emptybytes
from whoosh.util import random_name


class CompoundStorage(FileStorage):
    """Read-only storage exposing the files packed inside one compound file.

    Layout, as read by ``__init__`` and written by ``write_dir``: at
    ``basepos`` a long holding the directory offset and an int holding the
    directory length, then the raw file contents, then (at the directory
    offset) a pickled directory dict followed by a pickled options dict.
    Each directory entry maps a file name to a dict with ``"offset"`` and
    ``"length"`` keys, plus ``"modified"`` when written by ``assemble``.
    """

    readonly = True

    def __init__(self, dbfile, use_mmap=True, basepos=0):
        """Read the directory from ``dbfile`` and prepare to serve sub-files.

        :param dbfile: an open file object providing ``seek``, ``read_long``,
            ``read_int`` and ``read_pickle``.
        :param use_mmap: if true (and the ``mmap`` module is available and
            the file has a ``fileno``), try to memory-map the whole file.
        :param basepos: position in ``dbfile`` where the compound header
            starts.
        """

        self._file = dbfile
        # Keep a display name for __repr__ now, because self._file is set to
        # None below when memory-mapping succeeds. Which attribute carries
        # the name depends on the file class, so try both (may be None).
        self._name = (getattr(dbfile, "name", None)
                      or getattr(dbfile, "_name", None))
        self._file.seek(basepos)

        self._diroffset = self._file.read_long()
        self._dirlength = self._file.read_int()
        self._file.seek(self._diroffset)
        # NOTE: the directory and options are unpickled from the file, so
        # only open compound files that come from a trusted source.
        self._dir = self._file.read_pickle()
        self._options = self._file.read_pickle()
        self._locks = {}
        self._source = None

        if mmap and use_mmap and hasattr(self._file, "fileno"):
            # Try to open the entire segment as a memory-mapped object
            try:
                fileno = self._file.fileno()
                self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
            except (mmap.error, OSError):
                e = sys.exc_info()[1]
                # If we got an error because there wasn't enough memory to
                # open the map, ignore it and fall through, we'll just use the
                # (slower) "sub-file" implementation
                if e.errno == errno.ENOMEM:
                    pass
                else:
                    raise
            else:
                # If that worked, we can close the file handle we were given
                self._file.close()
                self._file = None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__,
                              getattr(self, "_name", None))

    def close(self):
        """Release the memory map and/or the underlying file.

        Calling this more than once is harmless.
        """

        # Compare against None rather than using truthiness: bool() on an
        # mmap object goes through len(), which raises once it is closed.
        source, self._source = self._source, None
        if source is not None:
            try:
                source.close()
            except BufferError:
                # Buffers handed out by open_file() still reference the map,
                # so it cannot be closed yet; our reference is dropped above.
                pass

        dbfile, self._file = self._file, None
        if dbfile is not None:
            dbfile.close()

    def range(self, name):
        """Return the ``(offset, length)`` of the named file.

        :raises NameError: if ``name`` is not in the directory.
        """

        try:
            fileinfo = self._dir[name]
        except KeyError:
            raise NameError("Unknown file %r" % (name,))
        return (fileinfo["offset"], fileinfo["length"])

    def open_file(self, name, *args, **kwargs):
        """Return a readable file object over the named file's bytes.

        Extra positional and keyword arguments are accepted and ignored.

        :raises NameError: if ``name`` is not in the directory.
        :raises ValueError: if the storage has been closed.
        """

        offset, length = self.range(name)
        if self._source is None and self._file is None:
            raise ValueError("Storage is closed")

        if self._source is not None:
            # Create a memoryview/buffer from the mmap
            buf = memoryview_(self._source, offset, length)
            f = BufferFile(buf, name=name)
        elif hasattr(self._file, "subset"):
            f = self._file.subset(offset, length, name=name)
        else:
            f = StructFile(SubFile(self._file, offset, length), name=name)
        return f

    def list(self):
        """Return a list of the file names in the directory."""

        return list(self._dir.keys())

    def file_exists(self, name):
        """Return True if ``name`` is in the directory."""

        return name in self._dir

    def file_length(self, name):
        """Return the stored length of the named file.

        :raises KeyError: if ``name`` is not in the directory.
        """

        info = self._dir[name]
        return info["length"]

    def file_modified(self, name):
        """Return the stored modification value of the named file.

        :raises KeyError: if ``name`` is not in the directory, or its entry
            has no ``"modified"`` key.
        """

        info = self._dir[name]
        return info["modified"]

    def lock(self, name):
        """Return the ``threading.Lock`` associated with ``name``, creating
        it on first use.
        """

        # A single setdefault() avoids the separate check-then-insert, which
        # could hand two callers different locks for the same name.
        return self._locks.setdefault(name, Lock())

    @staticmethod
    def assemble(dbfile, store, names, **options):
        """Pack the named files from ``store`` into ``dbfile`` as a compound
        file, then write the directory (which also closes ``dbfile``).

        :param dbfile: the output file, positioned where the compound data
            should start.
        :param store: storage providing ``file_length``, ``file_modified``
            and ``open_file`` for each name.
        :param names: non-empty collection of file names to include.
        :param options: stored in the compound file alongside the directory.
        :raises Exception: if a name ends with ``.toc`` or ``.seg``.
        """

        assert names, names

        # Reject unsupported names before anything is written to dbfile.
        for name in names:
            if name.endswith((".toc", ".seg")):
                raise Exception(name)

        directory = {}
        basepos = dbfile.tell()
        # Placeholders, overwritten by write_dir() once the real values are
        # known.
        dbfile.write_long(0)  # Directory position
        dbfile.write_int(0)  # Directory length

        # Copy the files into the compound file
        for name in names:
            offset = dbfile.tell()
            length = store.file_length(name)
            modified = store.file_modified(name)
            directory[name] = {"offset": offset, "length": length,
                               "modified": modified}
            f = store.open_file(name)
            try:
                copyfileobj(f, dbfile)
            finally:
                # Don't leak the source file if the copy fails
                f.close()

        CompoundStorage.write_dir(dbfile, basepos, directory, options)

    @staticmethod
    def write_dir(dbfile, basepos, directory, options=None):
        """Append the directory and options to ``dbfile``, fill in the
        header at ``basepos``, and close ``dbfile``.

        :param dbfile: the output file, positioned at the end of the data.
        :param basepos: position of the header placeholders.
        :param directory: dict mapping file names to their info dicts.
        :param options: optional dict stored after the directory; a falsy
            value is stored as an empty dict.
        """

        options = options or {}

        dirpos = dbfile.tell()  # Remember the start of the directory
        dbfile.write_pickle(directory)  # Write the directory
        dbfile.write_pickle(options)
        endpos = dbfile.tell()  # Remember the end of the directory
        dbfile.flush()
        dbfile.seek(basepos)  # Seek back to the start
        dbfile.write_long(dirpos)  # Directory position
        dbfile.write_int(endpos - dirpos)  # Directory length

        dbfile.close()


class SubFile(object):
    """Read-only, file-like view of a byte range inside a parent file.

    Positions reported by ``tell`` and accepted by ``seek`` are relative to
    the start of the range. Every read first seeks the parent file, so the
    parent's own position is not preserved.
    """

    def __init__(self, parentfile, offset, length, name=None):
        """
        :param parentfile: file object providing ``seek``, ``read`` and
            ``readline``.
        :param offset: position in the parent where this range starts.
        :param length: number of bytes in the range.
        :param name: optional name, exposed as the ``name`` attribute.
        """

        self._file = parentfile
        self._offset = offset
        self._length = length
        self._end = offset + length
        self._pos = 0

        self.name = name
        self.closed = False

    def close(self):
        """Mark this view as closed; the parent file is left open."""

        self.closed = True

    def subset(self, position, length, name=None):
        """Return a new ``SubFile`` covering ``length`` bytes starting at
        ``position`` within this one.

        :param name: name for the new view; defaults to this view's name.
        :raises AssertionError: if the requested range is not contained in
            this view.
        """

        start = self._offset + position
        end = start + length
        name = name or self.name
        # Both ends of the new window must lie inside this window.
        assert self._offset <= start <= self._end
        assert self._offset <= end <= self._end
        return SubFile(self._file, start, length, name=name)

    def read(self, size=None):
        """Read up to ``size`` bytes, never past the end of the range.

        ``None`` or a negative ``size`` reads everything that remains.
        """

        remaining = self._length - self._pos
        if size is None or size < 0:
            size = remaining
        else:
            size = min(size, remaining)

        if size <= 0:
            return emptybytes

        self._file.seek(self._offset + self._pos)
        data = self._file.read(size)
        # Advance by what was actually returned, in case the parent came up
        # short.
        self._pos += len(data)
        return data

    def readline(self):
        """Read one line from the parent, truncated at the end of the
        range.
        """

        # Clamp to zero: a negative limit would make the slice below drop
        # bytes from the end of the line instead of returning nothing.
        maxsize = max(self._length - self._pos, 0)
        self._file.seek(self._offset + self._pos)
        data = self._file.readline()
        if len(data) > maxsize:
            data = data[:maxsize]
        self._pos += len(data)
        return data

    def seek(self, where, whence=0):
        """Move the read position.

        :param where: offset, interpreted according to ``whence``.
        :param whence: 0 = from the start of the range, 1 = from the current
            position, 2 = from the end of the range (so ``where`` is
            normally zero or negative, as with regular file objects).
        :raises ValueError: if ``whence`` is not 0, 1 or 2.
        """

        if whence == 0:  # Absolute
            pos = where
        elif whence == 1:  # Relative
            pos = self._pos + where
        elif whence == 2:  # From end
            pos = self._length + where
        else:
            raise ValueError

        self._pos = pos

    def tell(self):
        """Return the current position relative to the start of the range."""

        return self._pos


class CompoundWriter(object):
    """Collects several logical files into one shared temporary file, then
    writes them out either as a single compound file or as separate files.

    Each logical file is a ``SubStream`` that buffers small writes in memory
    and spills larger runs into the shared temp file, recording where each
    run landed.
    """

    def __init__(self, tempstorage, buffersize=32 * 1024):
        """
        :param tempstorage: storage providing ``create_file`` and
            ``delete_file``; used for the shared temp file.
        :param buffersize: per-stream in-memory buffer threshold in bytes.
        """

        assert isinstance(buffersize, int)
        self._tempstorage = tempstorage
        self._tempname = "%s.ctmp" % random_name()
        self._temp = tempstorage.create_file(self._tempname, mode="w+b")
        self._buffersize = buffersize
        self._streams = {}

    def create_file(self, name):
        """Start a new logical file called ``name`` and return a
        ``StructFile`` that writes into it.
        """

        ss = self.SubStream(self._temp, self._buffersize)
        self._streams[name] = ss
        return StructFile(ss)

    @staticmethod
    def _block_reader(temp, substream):
        # Returns a generator function yielding the stream's blocks in
        # order. The stream is bound as an argument so the function does not
        # depend on the caller's loop variable.
        def gen():
            for f, offset, length in substream.blocks:
                # A block source of None means the shared temp file
                if f is None:
                    f = temp
                f.seek(offset)
                yield f.read(length)

        return gen

    def _readback(self):
        """Yield ``(name, blocks)`` for each logical file, where calling
        ``blocks()`` yields that file's contents as byte strings.

        The blocks must be consumed before advancing to the next item: when
        iteration finishes (or the generator is closed early) the temp file
        is closed and deleted.
        """

        temp = self._temp
        try:
            for name, substream in self._streams.items():
                substream.close()
                yield (name, self._block_reader(temp, substream))
        finally:
            # Clean up even if the consumer fails part-way through
            temp.close()
            self._tempstorage.delete_file(self._tempname)

    def save_as_compound(self, dbfile):
        """Write all logical files into ``dbfile`` in compound format.

        ``CompoundStorage.write_dir`` closes ``dbfile`` when done. The
        directory entries written here carry only ``"offset"`` and
        ``"length"``.
        """

        basepos = dbfile.tell()
        # Placeholders, filled in by write_dir()
        dbfile.write_long(0)  # Directory offset
        dbfile.write_int(0)  # Directory length

        directory = {}
        for name, blocks in self._readback():
            filestart = dbfile.tell()
            for block in blocks():
                dbfile.write(block)
            directory[name] = {"offset": filestart,
                               "length": dbfile.tell() - filestart}

        CompoundStorage.write_dir(dbfile, basepos, directory)

    def save_as_files(self, storage, name_fn):
        """Write each logical file as its own file in ``storage``.

        :param storage: storage providing ``create_file``.
        :param name_fn: function mapping a logical name to the file name to
            create.
        """

        for name, blocks in self._readback():
            f = storage.create_file(name_fn(name))
            try:
                for block in blocks():
                    f.write(block)
            finally:
                f.close()

    class SubStream(object):
        """Write-only stream for one logical file.

        ``blocks`` is a list of ``(source, offset, length)`` tuples in write
        order. A ``source`` of ``None`` means the shared temp file;
        otherwise it is the in-memory buffer holding the unflushed tail.
        """

        def __init__(self, dbfile, buffersize):
            self._dbfile = dbfile
            self._buffersize = buffersize
            self._buffer = BytesIO()
            self._closed = False
            self.blocks = []

        def tell(self):
            """Return the number of bytes written to this stream so far."""

            flushed = sum(block[2] for block in self.blocks)
            if self._closed:
                # close() already recorded the buffered tail as a block
                return flushed
            return flushed + self._buffer.tell()

        def write(self, inbytes):
            """Append ``inbytes``, spilling to the temp file once the
            buffered total reaches the buffer size.
            """

            bio = self._buffer
            buflen = bio.tell()
            length = buflen + len(inbytes)
            if length >= self._buffersize:
                offset = self._dbfile.tell()
                # The buffer is rewound, not truncated, after each spill, so
                # only its first buflen bytes are current data.
                self._dbfile.write(bio.getvalue()[:buflen])
                self._dbfile.write(inbytes)

                self.blocks.append((None, offset, length))
                self._buffer.seek(0)
            else:
                bio.write(inbytes)

        def close(self):
            """Record any buffered tail as the final block.

            Safe to call more than once; without the guard a second call
            would append the tail again and duplicate its bytes on readback.
            """

            if self._closed:
                return
            self._closed = True

            bio = self._buffer
            length = bio.tell()
            if length:
                self.blocks.append((bio, 0, length))