Commits

Anonymous committed 08f9a86

hachoir-parser/archive: add new ZlibData parser to parse raw Deflate streams

  • Participants
  • Parent commits a8daa17

Comments (0)

Files changed (2)

File hachoir-parser/hachoir_parser/archive/__init__.py

 from hachoir_parser.archive.sevenzip import SevenZipParser
 from hachoir_parser.archive.mar import MarFile
 from hachoir_parser.archive.mozilla_ar import MozillaArchive
+from hachoir_parser.archive.zlib import ZlibData

File hachoir-parser/hachoir_parser/archive/zlib.py

+"""Detailed ZLIB parser
+
+Author: Robert Xiao
+Creation date: July 9 2007
+
+"""
+
+from hachoir_parser import Parser
+from hachoir_core.field import (Bit, Bits, Field, Int16, UInt32,
+    Enum, FieldSet, GenericFieldSet,
+    PaddingBits, ParserError, RawBytes)
+from hachoir_core.endian import LITTLE_ENDIAN
+from hachoir_core.text_handler import textHandler, hexadecimal
+from hachoir_core.tools import paddingSize, alignValue
+
+def extend_data(data, length, offset):
+    """Extend data using a length and an offset."""
+    if length >= offset:
+        new_data = data[-offset:] * (alignValue(length, offset) // offset)
+        return data + new_data[:length]
+    else:
+        return data + data[-offset:-offset+length]
+
+def build_tree(lengths):
+    """Build a Huffman tree from a list of lengths.
+       The ith entry of the input list is the length of the Huffman code corresponding to
+       integer i, or 0 if the integer i is unused."""
+    max_length = max(lengths) + 1
+    bit_counts = [0]*max_length
+    next_code = [0]*max_length
+    tree = {}
+    for i in lengths:
+        if i:
+            bit_counts[i] += 1
+    code = 0
+    for i in xrange(1, len(bit_counts)):
+        next_code[i] = code = (code + bit_counts[i-1]) << 1
+    for i, ln in enumerate(lengths):
+        if ln:
+            tree[(ln, next_code[ln])] = i
+            next_code[ln] += 1
+    return tree
+
+class HuffmanCode(Field):
+    """Huffman code. Uses tree parameter as the Huffman tree."""
+    def __init__(self, parent, name, tree, description=None):
+        Field.__init__(self, parent, name, 0, description)
+
+        endian = self.parent.endian
+        stream = self.parent.stream
+        addr = self.absolute_address
+
+        value = 0
+        while (self.size, value) not in tree:
+            bit = stream.readBits(addr, 1, endian)
+            value <<= 1
+            value += bit
+            self._size += 1
+            addr += 1
+        self.huffvalue = value
+        self.realvalue = tree[(self.size, value)]
+    def createValue(self):
+        return self.huffvalue
+
+class DeflateBlock(FieldSet):
+    # code: (min, max, extrabits)
+    LENGTH_SYMBOLS = {257:(3,3,0),
+                      258:(4,4,0),
+                      259:(5,5,0),
+                      260:(6,6,0),
+                      261:(7,7,0),
+                      262:(8,8,0),
+                      263:(9,9,0),
+                      264:(10,10,0),
+                      265:(11,12,1),
+                      266:(13,14,1),
+                      267:(15,16,1),
+                      268:(17,18,1),
+                      269:(19,22,2),
+                      270:(23,26,2),
+                      271:(27,30,2),
+                      272:(31,34,2),
+                      273:(35,42,3),
+                      274:(43,50,3),
+                      275:(51,58,3),
+                      276:(59,66,3),
+                      277:(67,82,4),
+                      278:(83,98,4),
+                      279:(99,114,4),
+                      280:(115,130,4),
+                      281:(131,162,5),
+                      282:(163,194,5),
+                      283:(195,226,5),
+                      284:(227,257,5),
+                      285:(258,258,0)
+                      }
+    DISTANCE_SYMBOLS = {0:(1,1,0),
+                        1:(2,2,0),
+                        2:(3,3,0),
+                        3:(4,4,0),
+                        4:(5,6,1),
+                        5:(7,8,1),
+                        6:(9,12,2),
+                        7:(13,16,2),
+                        8:(17,24,3),
+                        9:(25,32,3),
+                        10:(33,48,4),
+                        11:(49,64,4),
+                        12:(65,96,5),
+                        13:(97,128,5),
+                        14:(129,192,6),
+                        15:(193,256,6),
+                        16:(257,384,7),
+                        17:(385,512,7),
+                        18:(513,768,8),
+                        19:(769,1024,8),
+                        20:(1025,1536,9),
+                        21:(1537,2048,9),
+                        22:(2049,3072,10),
+                        23:(3073,4096,10),
+                        24:(4097,6144,11),
+                        25:(6145,8192,11),
+                        26:(8193,12288,12),
+                        27:(12289,16384,12),
+                        28:(16385,24576,13),
+                        29:(24577,32768,13),
+                        }
+    CODE_LENGTH_ORDER = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15]
+    def __init__(self, parent, name, uncomp_data="", *args, **kwargs):
+        FieldSet.__init__(self, parent, name, *args, **kwargs)
+        self.uncomp_data = uncomp_data
+        
+    def createFields(self):
+        yield Bit(self, "final", "Is this the final block?") # BFINAL
+        yield Enum(Bits(self, "compression_type", 2), # BTYPE
+                   {0:"None", 1:"Fixed Huffman", 2:"Dynamic Huffman", 3:"Reserved"})
+        if self["compression_type"].value == 0: # no compression
+            padding = paddingSize(self.current_size + self.absolute_address, 8) # align on byte boundary
+            if padding:
+                yield PaddingBits(self, "padding[]", padding)
+            yield Int16(self, "len")
+            yield Int16(self, "nlen", "One's complement of len")
+            if self["len"].value != ~self["nlen"].value:
+                raise ParserError("len must be equal to the one's complement of nlen!")
+            if self["len"].value: # null stored blocks produced by some encoders (e.g. PIL)
+                yield RawBytes(self, "data", self["len"].value, "Uncompressed data")
+            return
+        elif self["compression_type"].value == 1: # Fixed Huffman
+            length_tree = {} # (size, huffman code): value
+            distance_tree = {}
+            for i in xrange(144):
+                length_tree[(8, i+48)] = i
+            for i in xrange(144, 256):
+                length_tree[(9, i+256)] = i
+            for i in xrange(256, 280):
+                length_tree[(7, i-256)] = i
+            for i in xrange(280, 288):
+                length_tree[(8, i-88)] = i
+            for i in xrange(32):
+                distance_tree[(5, i)] = i
+        elif self["compression_type"].value == 2: # Dynamic Huffman
+            yield Bits(self, "huff_num_length_codes", 5, "Number of Literal/Length Codes, minus 257")
+            yield Bits(self, "huff_num_distance_codes", 5, "Number of Distance Codes, minus 1")
+            yield Bits(self, "huff_num_code_length_codes", 4, "Number of Code Length Codes, minus 4")
+            code_length_code_lengths = [0]*19 # confusing variable name...
+            for i in self.CODE_LENGTH_ORDER[:self["huff_num_code_length_codes"].value+4]:
+                field = Bits(self, "huff_code_length_code[%i]" % i, 3, "Code lengths for the code length alphabet")
+                yield field
+                code_length_code_lengths[i] = field.value
+            code_length_tree = build_tree(code_length_code_lengths)
+            length_code_lengths = []
+            distance_code_lengths = []
+            for numcodes, name, lengths in (
+                (self["huff_num_length_codes"].value + 257, "length", length_code_lengths),
+                (self["huff_num_distance_codes"].value + 1, "distance", distance_code_lengths)):
+                while len(lengths) < numcodes:
+                    field = HuffmanCode(self, "huff_%s_code[]" % name, code_length_tree)
+                    value = field.realvalue
+                    if value < 16:
+                        prev_value = value
+                        field._description = "Literal Code Length %i (Huffman Code %i)" % (value, field.value)
+                        yield field
+                        lengths.append(value)
+                    else:
+                        info = {16: (3,6,2),
+                                17: (3,10,3),
+                                18: (11,138,7)}[value]
+                        if value == 16:
+                            repvalue = prev_value
+                        else:
+                            repvalue = 0
+                        field._description = "Repeat Code %i, Repeating value (%i) %i to %i times (Huffman Code %i)" % (value, repvalue, info[0], info[1], field.value)
+                        yield field
+                        extrafield = Bits(self, "huff_%s_code_extra[%s" % (name, field.name.split('[')[1]), info[2])
+                        num_repeats = extrafield.value+info[0]
+                        extrafield._description = "Repeat Extra Bits (%i), total repeats %i"%(extrafield.value, num_repeats)
+                        yield extrafield
+                        lengths += [repvalue]*num_repeats
+            length_tree = build_tree(length_code_lengths)
+            distance_tree = build_tree(distance_code_lengths)
+        else:
+            raise ParserError("Unsupported compression type 3!")
+        while True:
+            field = HuffmanCode(self, "length_code[]", length_tree)
+            value = field.realvalue
+            if value < 256:
+                field._description = "Literal Code %r (Huffman Code %i)" % (chr(value), field.value)
+                yield field
+                self.uncomp_data += chr(value)
+            if value == 256:
+                field._description = "Block Terminator Code (256) (Huffman Code %i)" % field.value
+                yield field
+                break
+            elif value > 256:
+                info = self.LENGTH_SYMBOLS[value]
+                if info[2] == 0:
+                    field._description = "Length Code %i, Value %i (Huffman Code %i)" % (value, info[0], field.value)
+                    length = info[0]
+                    yield field
+                else:
+                    field._description = "Length Code %i, Values %i to %i (Huffman Code %i)" % (value, info[0], info[1], field.value)
+                    yield field
+                    extrafield = Bits(self, "length_extra[%s" % field.name.split('[')[1], info[2])
+                    length = extrafield.value + info[0]
+                    extrafield._description = "Length Extra Bits (%i), total length %i"%(extrafield.value, length)
+                    yield extrafield
+                field = HuffmanCode(self, "distance_code[]", distance_tree)
+                value = field.realvalue
+                info = self.DISTANCE_SYMBOLS[value]
+                if info[2] == 0:
+                    field._description = "Distance Code %i, Value %i (Huffman Code %i)" % (value, info[0], field.value)
+                    distance = info[0]
+                    yield field
+                else:
+                    field._description = "Distance Code %i, Values %i to %i (Huffman Code %i)" % (value, info[0], info[1], field.value)
+                    yield field
+                    extrafield = Bits(self, "distance_extra[%s" % field.name.split('[')[1], info[2])
+                    distance = extrafield.value + info[0]
+                    extrafield._description = "Distance Extra Bits (%i), total length %i"%(extrafield.value, distance)
+                    yield extrafield
+                self.uncomp_data = extend_data(self.uncomp_data, length, distance)
+
+class DeflateData(GenericFieldSet):
+    endian = LITTLE_ENDIAN
+    def createFields(self):
+        uncomp_data = ""
+        blk=DeflateBlock(self, "compressed_block[]", uncomp_data)
+        yield blk
+        uncomp_data = blk.uncomp_data
+        while not blk["final"].value:
+            blk=DeflateBlock(self, "compressed_block[]", uncomp_data)
+            yield blk
+            uncomp_data = blk.uncomp_data
+        padding = paddingSize(self.current_size + self.absolute_address, 8) # align on byte boundary
+        if padding:
+            yield PaddingBits(self, "padding[]", padding)
+        self.uncompressed_data = uncomp_data
+
+class ZlibData(Parser):
+    PARSER_TAGS = {
+        "id": "zlib",
+        "category": "archive",
+        "file_ext": ("zlib",),
+        "min_size": 8*8,
+        "description": "ZLIB Data",
+    }
+    endian = LITTLE_ENDIAN
+
+    def validate(self):
+        if self["compression_method"].value != 8:
+            return "Incorrect compression method"
+        if ((self["compression_info"].value << 12) +
+            (self["compression_method"].value << 8) +
+            (self["flag_compression_level"].value << 6) +
+            (self["flag_dictionary_present"].value << 5) +
+            (self["flag_check_bits"].value)) % 31 != 0:
+            return "Invalid flag check value"
+        return True
+
+    def createFields(self):
+        yield Enum(Bits(self, "compression_method", 4), {8:"deflate", 15:"reserved"}) # CM
+        yield Bits(self, "compression_info", 4, "base-2 log of the window size") # CINFO
+        yield Bits(self, "flag_check_bits", 5) # FCHECK
+        yield Bit(self, "flag_dictionary_present") # FDICT
+        yield Enum(Bits(self, "flag_compression_level", 2), # FLEVEL
+                   {0:"Fastest", 1:"Fast", 2:"Default", 3:"Maximum, Slowest"})
+        if self["flag_dictionary_present"].value:
+            yield textHandler(UInt32(self, "dict_checksum", "ADLER32 checksum of dictionary information"), hexadecimal)
+        yield DeflateData(self, "data", self.stream, description = "Compressed Data")
+        yield textHandler(UInt32(self, "data_checksum", "ADLER32 checksum of compressed data"), hexadecimal)
+
+def zlib_inflate(stream, wbits=None, prevdata=""):
+    if wbits is None or wbits >= 0:
+        return ZlibData(stream)["data"].uncompressed_data
+    else:
+        data = DeflateData(None, "root", stream, "", stream.askSize(None))
+        for unused in data:
+            pass
+        return data.uncompressed_data