Commits

Fabian Topfstedt committed 1960eeb

First running version

  • Participants

Comments (0)

Files changed (8)

+Fabian Topfstedt <topfstedt@schneevonmorgen.com>
+The MIT License
+
+Copyright (c) 2011 Fabian Topfstedt, schnee von morgen webTV GmbH
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+FileChunkIO represents a chunk of an OS-level file containing bytes data
+
+Requires Python 2.6+

File __init__.py

Empty file added.

File filechunkio/__init__.py

+from filechunkio import FileChunkIO
+
+
+__version__ = VERSION = 1.5

File filechunkio/filechunkio.py

+import io
+import os
+
+
+SEEK_SET = getattr(io, 'SEEK_SET', 0)
+SEEK_CUR = getattr(io, 'SEEK_CUR', 1)
+SEEK_END = getattr(io, 'SEEK_END', 2)
+
+
+class FileChunkIO(io.FileIO):
+    """
+    A class that allows you reading only a chunk of a file.
+    """
+    def __init__(self, name, mode='r', closefd=True, offset=0, bytes=None,
+        *args, **kwargs):
+        """
+        Open a file chunk. The mode can only be 'r' for reading. Offset
+        is the amount of bytes that the chunks starts after the real file's
+        first byte. Bytes defines the amount of bytes the chunk has, which you
+        can set to None to include the last byte of the real file.
+        """
+        if not mode.startswith('r'):
+            raise ValueError("Mode string must begin with 'r'")
+        self.offset = offset
+        self.bytes = bytes
+        if bytes is None:
+            self.bytes = os.stat(name).st_size - self.offset
+        super(FileChunkIO, self).__init__(name, mode, closefd, *args, **kwargs)
+        self.seek(0)
+
+    def seek(self, offset, whence=SEEK_SET):
+        """
+        Move to a new chunk position.
+        """
+        if whence == SEEK_SET:
+            super(FileChunkIO, self).seek(self.offset + offset)
+        elif whence == SEEK_CUR:
+            self.seek(self.tell() + offset)
+        elif whence == SEEK_END:
+            self.seek(self.bytes + offset)
+
+    def tell(self):
+        """
+        Current file position.
+        """
+        return super(FileChunkIO, self).tell() - self.offset
+
+    def read(self, n=-1):
+        """
+        Read and return at most n bytes.
+        """
+        if n >= 0:
+            max_n = self.bytes - self.tell()
+            n = min([n, max_n])
+            return super(FileChunkIO, self).read(n)
+        else:
+            return self.readall()
+
+    def readall(self):
+        """
+        Read all data from the chunk.
+        """
+        return self.read(self.bytes - self.tell())
+
+    def readinto(self, b):
+        """
+        Same as RawIOBase.readinto().
+        """
+        data = self.read(len(b))
+        n = len(data)
+        try:
+            b[:n] = data
+        except TypeError as err:
+            import array
+            if not isinstance(b, array.array):
+                raise err
+            b[:n] = array.array(b'b', data)
+        return n

File filechunkio/tests.py

+import io
+import os
+import tempfile
+import unittest
+
+from filechunkio import FileChunkIO
+from filechunkio import SEEK_CUR
+from filechunkio import SEEK_END
+
+
+class FileChunkIOTest(unittest.TestCase):
+
+    def setUp(self):
+        self.tf = tempfile.mkstemp()[1]
+
+        with open(self.tf, 'w') as f:
+            f.write('123456789%s' % os.linesep)
+            f.write('234567891%s' % os.linesep)
+            f.write('345678912%s' % os.linesep)
+            f.write('456789123%s' % os.linesep)
+            f.write('56789')
+
+    def tearDown(self):
+        if os.path.exists(self.tf):
+            os.remove(self.tf)
+
+    def test_open_write_raises_valueerror(self):
+        self.assertRaises(ValueError, FileChunkIO, self.tf, 'w')
+
+    def test_init_autosets_bytes(self):
+        with FileChunkIO(self.tf) as c:
+            self.assertEquals(c.bytes, 45)
+
+    def test_init_autosets_bytes_and_respects_offset(self):
+        with FileChunkIO(self.tf, offset=1) as c:
+            self.assertEquals(c.bytes, 44)
+
+    def test_init_seeks_to_offset(self):
+        with FileChunkIO(self.tf, offset=1) as c:
+            self.assertEquals(c.tell(), 0)
+            self.assertEquals(c.read(1), '2')
+
+    def test_seek_respects_offset(self):
+        with FileChunkIO(self.tf, offset=1) as c:
+            c.seek(1)
+            self.assertEquals(c.read(1), '3')
+
+    def test_seek_cur(self):
+        with FileChunkIO(self.tf, offset=20, bytes=10) as c:
+            c.seek(5)
+            c.seek(-3, SEEK_CUR)
+            self.assertEquals(c.tell(), 2)
+
+    def test_seek_end(self):
+        with FileChunkIO(self.tf, offset=10, bytes=10) as c:
+            c.seek(-5, SEEK_END)
+            self.assertEquals(c.read(3), '789')
+
+    def test_tell_respects_offset(self):
+        with FileChunkIO(self.tf, offset=1) as c:
+            self.assertEquals(c.tell(), 0)
+            self.assertEquals(c.read(1), '2')
+
+    def test_read_with_minus_one_calls_readall(self):
+        with FileChunkIO(self.tf) as c:
+            def mocked_readall(*args, **kwargs):
+                self._readall_called = True
+            c.readall = mocked_readall
+
+            c.read(-1)
+            self.assertTrue(self._readall_called)
+
+    def test_read_respects_offset_and_bytes(self):
+        with FileChunkIO(self.tf, offset=1, bytes=3) as c:
+            self.assertEquals(c.read(), '234')
+
+    def test_readinto(self):
+        with FileChunkIO(self.tf, offset=1, bytes=2) as c:
+            n = 3
+            b = bytearray(n.__index__())
+            c.readinto(b)
+            self.assertEquals(b, b'23\x00')
+
+    def test_readline(self):
+        with FileChunkIO(self.tf, offset=1, bytes=20) as c:
+            lines = []
+            while True:
+                line = c.readline()
+                if not line:
+                    break
+                lines.append(line)
+            self.assertEquals(lines, ['23456789\n', '234567891\n', '3'])
+
+    def test_readlines(self):
+        with FileChunkIO(self.tf, offset=1, bytes=15) as c:
+            self.assertEquals(c.readlines(), ['23456789\n', '234567'])
+
+    def test_next(self):
+        with FileChunkIO(self.tf, offset=1, bytes=20) as c:
+            lines = []
+            while True:
+                try:
+                    lines.append(c.next())
+                except StopIteration:
+                    break
+            self.assertEquals(lines, ['23456789\n', '234567891\n', '3'])
+
+
+if __name__ == '__main__':
+    unittest.main()
+#!/usr/bin/env python
+from distutils.core import setup
+
+from filechunkio import __version__
+
+
+setup(
+    name="filechunkio",
+    version=unicode(__version__),
+    description="FileChunkIO represents a chunk of an OS-level file "\
+        "containing bytes data",
+    long_description=open("README", 'r').read(),
+    author="Fabian Topfstedt",
+    author_email="topfstedt@schneevonmorgen.com",
+    url="http://bitbucket.org/fabian/filechunkio",
+    license="MIT license",
+    packages=["filechunkio"],
+)