Snippets

Alexander Hanel Aplib Python

Created by Alexander Hanel

File aplib2.py Added

  • Ignore whitespace
  • Hide word diff
+#!/usr/bin/env python
+# -*- coding: cp1252 -*-
+
+
+"""
+    File Name: aplib.py
+    Author: Ange Albertini, extended by Alexander Hanel
+    Date Created: Unknown
+    Date Modified: 2016/08/07
+    Python Version: 2.7
+    License: BSD License
+"""
+
+
+"""
+Details:
+    This script is a modification of Ange Albertini's aplib code. This
+    script adds the optional aplib header and allows data to be compressed
+    or decompressed via the command line. 
+
+usage: aplib2.py [-h] [-d] [-c] [-a] -i INPUT -o OUTPUT
+
+aplib compression and decompression
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -d, --decompress      decompresses aplib data
+  -c, --compress        compresses aplib data
+  -a, --add-header      add Aplib header to compressed data
+  -i INPUT, --input INPUT
+                        input file to be compressed or decompressed
+  -o OUTPUT, --output OUTPUT
+                        output file to be compressed or decompressed
+
+APLIB Header Details
+
+aplib header
+Offset(h) 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
+
+00000000  41 50 33 32 18 00 00 00 6A 2D 00 00 C3 5B 3C CC  AP32....j-...[<.
+00000010  00 64 00 00 3A F8 81 21 4D 38 5A 90 38 03 66 02  .d..:.!M8Z.8.f.
+
+no header 
+Offset(h) 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
+
+00000000  4D 38 5A 90 38 03 66 02 04 09 71 FF 81 B8 C2 91  M8Z.8.f...q....‘
+00000010  01 40 C2 15 C6 80 09 1C 0E 1F BA F8 00 B4 09 CD  ................
+
+Aplib header notes 
+ * tag: 4 bytes, 41 50 33 32 or "AP32"
+ * header_size: 4 bytes // offset to the compressed data 
+ * packed_size: 4 bytes // size of the compressed data
+ * packed_crc: 4 bytes  // crc of the compressed data 
+ * original_size: 4 bytes // original size of the uncompressed data 
+ * original_crc: 4 bytes  // crc of the uncompress data
+
+Source: See Aplib's spack.asm
+"""
+
+import argparse
+import struct
+from zlib import crc32
+from optparse import OptionParser
+
+
+def find_longest_match(s, sub):
+    """returns the number of byte to look backward and the length of byte to copy)"""
+    if sub == "":
+        return 0, 0
+    limit = len(s)
+    dic = s[:]
+    l = 0
+    offset = 0
+    length = 0
+    first = 0
+    word = ""
+
+    word += sub[l]
+    pos = dic.rfind(word, 0, limit + 1)
+    if pos == -1:
+        return offset, length
+
+    offset = limit - pos
+    length = len(word)
+    dic += sub[l]
+
+    while l < len(sub) - 1:
+        l += 1
+        word += sub[l]
+
+        pos = dic.rfind(word, 0, limit + 1)
+        if pos == -1:
+            return offset, length
+        offset = limit - pos
+        length = len(word)
+        dic += sub[l]
+    return offset, length
+
+
+def int2lebin(value, size):
+    """ouputs value in binary, as little-endian"""
+    result = ""
+    for i in xrange(size):
+        result = result + chr((value >> (8 * i)) & 0xFF )
+    return result
+
+def modifystring(s, sub, offset):
+    """overwrites 'sub' at 'offset' of 's'"""
+    return s[:offset] + sub + s[offset + len(sub):]
+
+def getbinlen(value):
+    """return the bit length of an integer"""
+    result = 0
+    if value == 0:
+        return 1
+    while value != 0:
+        value >>= 1
+        result += 1
+    return result
+
+
+class _bits_compress():
+    """bit machine for variable-sized auto-reloading tag compression"""
+    def __init__(self, tagsize):
+        """tagsize is the number of bytes that takes the tag"""
+        self.out = ""
+
+        self.__tagsize = tagsize
+        self.__tag = 0
+        self.__tagoffset = -1
+        self.__maxbit = (self.__tagsize * 8) - 1
+        self.__curbit = 0
+        self.__isfirsttag = True
+
+
+    def getdata(self):
+        """builds an output string of what's currently compressed:
+        currently output bit + current tag content"""
+        tagstr = int2lebin(self.__tag, self.__tagsize)
+        return modifystring(self.out, tagstr, self.__tagoffset)
+
+    def write_bit(self, value):
+        """writes a bit, make space for the tag if necessary"""
+        if self.__curbit != 0:
+            self.__curbit -= 1
+        else:
+            if self.__isfirsttag:
+                self.__isfirsttag = False
+            else:
+                self.out = self.getdata()
+            self.__tagoffset = len(self.out)
+            self.out += "".join(["\x00"] * self.__tagsize)
+            self.__curbit = self.__maxbit
+            self.__tag = 0
+
+        if value:
+            self.__tag |= (1 << self.__curbit)
+        return
+
+    def write_bitstring(self, s):
+        """write a string of bits"""
+        for c in s:
+            self.write_bit(0 if c == "0" else 1)
+        return
+
+    def write_byte(self, b):
+        """writes a char or a number"""
+        assert len(b) == 1 if isinstance(b, str) else 0 <= b <= 255
+        self.out += b[0:1] if isinstance(b, str) else chr(b)
+        return
+
+    def write_fixednumber(self, value, nbbit):
+        """write a value on a fixed range of bits"""
+        for i in xrange(nbbit - 1, -1, -1):
+            self.write_bit( (value >> i) & 1)
+        return
+
+    def write_variablenumber(self, value):
+        assert value >= 2
+
+        length = getbinlen(value) - 2 # the highest bit is 1
+        self.write_bit(value & (1 << length))
+        for i in xrange(length - 1, -1, -1):
+            self.write_bit(1)
+            self.write_bit(value & (1 << i))
+        self.write_bit(0)
+        return
+
+class _bits_decompress():
+    """bit machine for variable-sized auto-reloading tag decompression"""
+    def __init__(self, data, tagsize):
+        self.__curbit = 0
+        self.__offset = 0
+        self.__tag = None
+        self.__tagsize = tagsize
+        self.__in = data
+        self.out = ""
+
+    def getoffset(self):
+        """return the current byte offset"""
+        return self.__offset
+
+
+    def read_bit(self):
+        """read next bit from the stream, reloads the tag if necessary"""
+        if self.__curbit != 0:
+            self.__curbit -= 1
+        else:
+            self.__curbit = (self.__tagsize * 8) - 1
+            self.__tag = ord(self.read_byte())
+            for i in xrange(self.__tagsize - 1):
+                self.__tag += ord(self.read_byte()) << (8 * (i + 1))
+
+        bit = (self.__tag  >> ((self.__tagsize * 8) - 1)) & 0x01
+        self.__tag <<= 1
+        return bit
+
+
+    def is_end(self):
+        return self.__offset == len(self.__in) and self.__curbit == 1
+
+
+    def read_byte(self):
+        """read next byte from the stream"""
+        if type(self.__in) == str:
+            result = self.__in[self.__offset]
+        elif type(self.__in) == file:
+            result = self.__in.read(1)
+        self.__offset += 1
+        return result
+
+
+    def read_fixednumber(self, nbbit, init=0):
+        """reads a fixed bit-length number"""
+        result = init
+        for i in xrange(nbbit):
+            result = (result << 1)  + self.read_bit()
+        return result
+
+
+    def read_variablenumber(self):
+        """return a variable bit-length number x, x >= 2
+
+        reads a bit until the next bit in the pair is not set"""
+        result = 1
+        result = (result << 1) + self.read_bit()
+        while self.read_bit():
+            result = (result << 1) + self.read_bit()
+        return result
+
+
+    def read_setbits(self, max_, set_=1):
+        """read bits as long as their set or a maximum is reached"""
+        result = 0
+        while result < max_ and self.read_bit() == set_:
+            result += 1
+        return result
+
+
+    def back_copy(self, offset, length=1):
+        for i in xrange(length):
+            self.out += self.out[-offset]
+        return
+
+
+    def read_literal(self, value=None):
+        if value is None:
+            self.out += self.read_byte()
+        else:
+            self.out += value
+        return False
+
+
+"""
+aPLib, LZSS based lossless compression algorithm
+
+Jorgen Ibsen U{http://www.ibsensoftware.com}
+"""
+
+def lengthdelta(offset):
+    if offset < 0x80 or 0x7D00 <= offset:
+        return 2
+    elif 0x500 <= offset:
+        return 1
+    return 0
+
+
+class aplib_compress(_bits_compress):
+    """
+    aplib compression is based on lz77
+    """
+    def __init__(self, data, length=None):
+        _bits_compress.__init__(self, 1)
+        self.__in = data
+        self.__length = length if length is not None else len(data)
+        self.__offset = 0
+        self.__lastoffset = 0
+        self.__pair = True
+        return
+
+
+    def __literal(self, marker=True):
+        if marker:
+            self.write_bit(0)
+        self.write_byte(self.__in[self.__offset])
+        self.__offset += 1
+        self.__pair = True
+        return
+
+
+    def __block(self, offset, length):
+        assert offset >= 2
+        self.write_bitstring("10")
+
+        # if the last operations were literal or single byte
+        # and the offset is unchanged since the last block copy
+        # we can just store a 'null' offset and the length
+        if self.__pair and self.__lastoffset == offset:
+            self.write_variablenumber(2)    # 2-
+            self.write_variablenumber(length)
+        else:
+            high = (offset >> 8) + 2
+            if self.__pair:
+                high += 1
+            self.write_variablenumber(high)
+            low = offset & 0xFF
+            self.write_byte(low)
+            self.write_variablenumber(length - lengthdelta(offset))
+        self.__offset += length
+        self.__lastoffset = offset
+        self.__pair = False
+        return
+
+
+    def __shortblock(self, offset, length):
+        assert 2 <= length <= 3
+        assert 0 < offset <= 127
+        self.write_bitstring("110")
+        b = (offset << 1 ) + (length - 2)
+        self.write_byte(b)
+        self.__offset += length
+        self.__lastoffset = offset
+        self.__pair = False
+        return
+
+
+    def __singlebyte(self, offset):
+        assert 0 <= offset < 16
+        self.write_bitstring("111")
+        self.write_fixednumber(offset, 4)
+        self.__offset += 1
+        self.__pair = True
+        return
+
+
+    def __end(self):
+        self.write_bitstring("110")
+        self.write_byte(chr(0))
+        return
+
+
+    def do(self):
+        self.__literal(False)
+        while self.__offset < self.__length:
+            offset, length = find_longest_match(self.__in[:self.__offset],
+                self.__in[self.__offset:])
+            if length == 0:
+                c = self.__in[self.__offset]
+                if c == "\x00":
+                    self.__singlebyte(0)
+                else:
+                    self.__literal()
+            elif length == 1 and 0 <= offset < 16:
+                self.__singlebyte(offset)
+            elif 2 <= length <= 3 and 0 < offset <= 127:
+                self.__shortblock(offset, length)
+            elif 3 <= length and 2 <= offset:
+                self.__block(offset, length)
+            else:
+                self.__literal()
+                #raise ValueError("no parsing found", offset, length)
+        self.__end()
+        return self.getdata()
+
+
+class aplib_decompress(_bits_decompress):
+    def __init__(self, data):
+        _bits_decompress.__init__(self, data, tagsize=1)
+        self.__pair = True    # paired sequence
+        self.__lastoffset = 0
+        self.__functions = [
+            self.__literal,
+            self.__block,
+            self.__shortblock,
+            self.__singlebyte]
+        return
+
+
+    def __literal(self):
+        self.read_literal()
+        self.__pair = True
+        return False
+
+
+    def __block(self):
+        b = self.read_variablenumber()    # 2-
+        if b == 2 and self.__pair :    # reuse the same offset
+            offset = self.__lastoffset
+            length = self.read_variablenumber()    # 2-
+        else:
+            high = b - 2    # 0-
+            if self.__pair:
+                high -= 1
+            offset = (high << 8) + ord(self.read_byte())
+            length = self.read_variablenumber()    # 2-
+            length += lengthdelta(offset)
+        self.__lastoffset = offset
+        self.back_copy(offset, length)
+        self.__pair = False
+        return False
+
+
+    def __shortblock(self):
+        b = ord(self.read_byte())
+        if b <= 1:    # likely 0
+            return True
+        length = 2 + (b & 0x01)    # 2-3
+        offset = b >> 1    # 1-127
+        self.back_copy(offset, length)
+        self.__lastoffset = offset
+        self.__pair = False
+        return False
+
+
+    def __singlebyte(self):
+        offset = self.read_fixednumber(4) # 0-15
+        if offset:
+            self.back_copy(offset)
+        else:
+            self.read_literal('\x00')
+        self.__pair = True
+        return False
+
+    def do(self):
+        """returns decompressed buffer and consumed bytes counter"""
+        self.read_literal()
+        while True:
+            if self.__functions[self.read_setbits(3)]():
+                break
+        return self.out
+
+
+def create_header(original_data, compressed_data):
+    HEADER_TAG = "AP32"
+    HEADER_SIZE = struct.pack("<I", 0x18)
+    header = HEADER_TAG + HEADER_SIZE
+    header += struct.pack("<I", len(compressed_data))
+    header += struct.pack("<I", crc32(compressed_data) & 0xFFFFFFFF)
+    header += struct.pack("<I", len(original_data))
+    header += struct.pack("<I", crc32(original_data) & 0xFFFFFFFF)
+    return header + compressed_data
+
+
+def run():
+    parser = argparse.ArgumentParser(description='aplib compression and decompression ')
+    parser.add_argument("-d", "--decompress", action="store_true", dest="decompress", help="decompresses aplib data")
+    parser.add_argument("-c", "--compress", action="store_true", dest="compress", help="compresses aplib data")
+    parser.add_argument("-a", "--add-header", action="store_true", dest="header", help="add Aplib header to compressed data")
+    parser.add_argument("-i", "--input", required=True, type=argparse.FileType('rb'), dest="input", help="input file to be compressed or decompressed")
+    parser.add_argument("-o", "--output", required=True, type=argparse.FileType('wb'), dest="output", help="output file to be compressed or decompressed")
+    args = parser.parse_args()
+    if args.compress is False and args.decompress is False:
+        print "ERROR: Please choose  -c --compress or  -d --decompress"
+        parser.print_help()
+        return
+    if args.decompress:
+        compressed_data = args.input.read()
+        if "AP32" in compressed_data[:4]:
+            compressed_data = compressed_data[24:]
+        try:
+            decompressed_data = aplib_decompress(compressed_data).do()
+        except:
+            print "ERROR: Decompression Failed"
+            return
+        args.output.write(decompressed_data)
+        args.output.close()
+
+    if args.compress:
+        data_2_compress = args.input.read()
+        try: 
+            compressed_data = aplib_compress(data_2_compress).do()
+        except:
+            print "ERROR: Compression Failed"
+   
+        if args.header:
+            compressed_data = create_header(data_2_compress, compressed_data)             
+        args.output.write(compressed_data)
+        args.output.close()
+        
+run()
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.