+# -*- coding: cp1252 -*-
+ Author: Ange Albertini, extended by Alexander Hanel
+ Date Modified: 2016/08/07
+ This script is a modification of Ange Albertini's aplib code. This
+ script adds the optional aplib header and allows data to be compressed
+ or decompressed via the command line.
+usage: aplib2.py [-h] [-d] [-c] [-a] -i INPUT -o OUTPUT
+aplib compression and decompression
+ -h, --help show this help message and exit
+ -d, --decompress decompresses aplib data
+ -c, --compress compresses aplib data
+ -a, --add-header add Aplib header to compressed data
+ -i INPUT, --input INPUT
+ input file to be compressed or decompressed
+ -o OUTPUT, --output OUTPUT
+ output file to be compressed or decompressed
+Offset(h) 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
+00000000 41 50 33 32 18 00 00 00 6A 2D 00 00 C3 5B 3C CC AP32....j-...[<.
+00000010 00 64 00 00 3A F8 81 21 4D 38 5A 90 38 03 66 02 .d..:.!M8Z.8.f.
+Offset(h) 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
+00000000 4D 38 5A 90 38 03 66 02 04 09 71 FF 81 B8 C2 91 M8Z.8.f...q....‘
+00000010 01 40 C2 15 C6 80 09 1C 0E 1F BA F8 00 B4 09 CD ................
+ * tag: 4 bytes, 41 50 33 32 or "AP32"
+ * header_size: 4 bytes // offset to the compressed data
+ * packed_size: 4 bytes // size of the compressed data
+ * packed_crc: 4 bytes // crc of the compressed data
+ * original_size: 4 bytes // original size of the uncompressed data
+ * original_crc: 4 bytes // crc of the uncompress data
+Source: See Aplib's spack.asm
+from optparse import OptionParser
+def find_longest_match(s, sub):
+ """returns the number of byte to look backward and the length of byte to copy)"""
+ pos = dic.rfind(word, 0, limit + 1)
+ while l < len(sub) - 1:
+ pos = dic.rfind(word, 0, limit + 1)
+def int2lebin(value, size):
+ """ouputs value in binary, as little-endian"""
+ result = result + chr((value >> (8 * i)) & 0xFF )
+def modifystring(s, sub, offset):
+ """overwrites 'sub' at 'offset' of 's'"""
+ return s[:offset] + sub + s[offset + len(sub):]
+ """return the bit length of an integer"""
+ """bit machine for variable-sized auto-reloading tag compression"""
+ def __init__(self, tagsize):
+ """tagsize is the number of bytes that takes the tag"""
+ self.__tagsize = tagsize
+ self.__maxbit = (self.__tagsize * 8) - 1
+ self.__isfirsttag = True
+ """builds an output string of what's currently compressed:
+ currently output bit + current tag content"""
+ tagstr = int2lebin(self.__tag, self.__tagsize)
+ return modifystring(self.out, tagstr, self.__tagoffset)
+ def write_bit(self, value):
+ """writes a bit, make space for the tag if necessary"""
+ self.__isfirsttag = False
+ self.out = self.getdata()
+ self.__tagoffset = len(self.out)
+ self.out += "".join(["\x00"] * self.__tagsize)
+ self.__curbit = self.__maxbit
+ self.__tag |= (1 << self.__curbit)
+ def write_bitstring(self, s):
+ """write a string of bits"""
+ self.write_bit(0 if c == "0" else 1)
+ def write_byte(self, b):
+ """writes a char or a number"""
+ assert len(b) == 1 if isinstance(b, str) else 0 <= b <= 255
+ self.out += b[0:1] if isinstance(b, str) else chr(b)
+ def write_fixednumber(self, value, nbbit):
+ """write a value on a fixed range of bits"""
+ for i in xrange(nbbit - 1, -1, -1):
+ self.write_bit( (value >> i) & 1)
+ def write_variablenumber(self, value):
+ length = getbinlen(value) - 2 # the highest bit is 1
+ self.write_bit(value & (1 << length))
+ for i in xrange(length - 1, -1, -1):
+ self.write_bit(value & (1 << i))
+class _bits_decompress():
+ """bit machine for variable-sized auto-reloading tag decompression"""
+ def __init__(self, data, tagsize):
+ self.__tagsize = tagsize
+ """return the current byte offset"""
+ """read next bit from the stream, reloads the tag if necessary"""
+ self.__curbit = (self.__tagsize * 8) - 1
+ self.__tag = ord(self.read_byte())
+ for i in xrange(self.__tagsize - 1):
+ self.__tag += ord(self.read_byte()) << (8 * (i + 1))
+ bit = (self.__tag >> ((self.__tagsize * 8) - 1)) & 0x01
+ return self.__offset == len(self.__in) and self.__curbit == 1
+ """read next byte from the stream"""
+ if type(self.__in) == str:
+ result = self.__in[self.__offset]
+ elif type(self.__in) == file:
+ result = self.__in.read(1)
+ def read_fixednumber(self, nbbit, init=0):
+ """reads a fixed bit-length number"""
+ for i in xrange(nbbit):
+ result = (result << 1) + self.read_bit()
+ def read_variablenumber(self):
+ """return a variable bit-length number x, x >= 2
+ reads a bit until the next bit in the pair is not set"""
+ result = (result << 1) + self.read_bit()
+ result = (result << 1) + self.read_bit()
+ def read_setbits(self, max_, set_=1):
+ """read bits as long as their set or a maximum is reached"""
+ while result < max_ and self.read_bit() == set_:
+ def back_copy(self, offset, length=1):
+ for i in xrange(length):
+ self.out += self.out[-offset]
+ def read_literal(self, value=None):
+ self.out += self.read_byte()
+aPLib, LZSS based lossless compression algorithm
+Jorgen Ibsen U{http://www.ibsensoftware.com}
+def lengthdelta(offset):
+ if offset < 0x80 or 0x7D00 <= offset:
+class aplib_compress(_bits_compress):
+ aplib compression is based on lz77
+ def __init__(self, data, length=None):
+ _bits_compress.__init__(self, 1)
+ self.__length = length if length is not None else len(data)
+ def __literal(self, marker=True):
+ self.write_byte(self.__in[self.__offset])
+ def __block(self, offset, length):
+ self.write_bitstring("10")
+ # if the last operations were literal or single byte
+ # and the offset is unchanged since the last block copy
+ # we can just store a 'null' offset and the length
+ if self.__pair and self.__lastoffset == offset:
+ self.write_variablenumber(2) # 2-
+ self.write_variablenumber(length)
+ high = (offset >> 8) + 2
+ self.write_variablenumber(high)
+ self.write_variablenumber(length - lengthdelta(offset))
+ self.__offset += length
+ self.__lastoffset = offset
+ def __shortblock(self, offset, length):
+ assert 2 <= length <= 3
+ assert 0 < offset <= 127
+ self.write_bitstring("110")
+ b = (offset << 1 ) + (length - 2)
+ self.__offset += length
+ self.__lastoffset = offset
+ def __singlebyte(self, offset):
+ assert 0 <= offset < 16
+ self.write_bitstring("111")
+ self.write_fixednumber(offset, 4)
+ self.write_bitstring("110")
+ self.write_byte(chr(0))
+ while self.__offset < self.__length:
+ offset, length = find_longest_match(self.__in[:self.__offset],
+ self.__in[self.__offset:])
+ c = self.__in[self.__offset]
+ elif length == 1 and 0 <= offset < 16:
+ self.__singlebyte(offset)
+ elif 2 <= length <= 3 and 0 < offset <= 127:
+ self.__shortblock(offset, length)
+ elif 3 <= length and 2 <= offset:
+ self.__block(offset, length)
+ #raise ValueError("no parsing found", offset, length)
+class aplib_decompress(_bits_decompress):
+ def __init__(self, data):
+ _bits_decompress.__init__(self, data, tagsize=1)
+ self.__pair = True # paired sequence
+ b = self.read_variablenumber() # 2-
+ if b == 2 and self.__pair : # reuse the same offset
+ offset = self.__lastoffset
+ length = self.read_variablenumber() # 2-
+ offset = (high << 8) + ord(self.read_byte())
+ length = self.read_variablenumber() # 2-
+ length += lengthdelta(offset)
+ self.__lastoffset = offset
+ self.back_copy(offset, length)
+ def __shortblock(self):
+ b = ord(self.read_byte())
+ length = 2 + (b & 0x01) # 2-3
+ offset = b >> 1 # 1-127
+ self.back_copy(offset, length)
+ self.__lastoffset = offset
+ def __singlebyte(self):
+ offset = self.read_fixednumber(4) # 0-15
+ self.read_literal('\x00')
+ """returns decompressed buffer and consumed bytes counter"""
+ if self.__functions[self.read_setbits(3)]():
+def create_header(original_data, compressed_data):
+ HEADER_SIZE = struct.pack("<I", 0x18)
+ header = HEADER_TAG + HEADER_SIZE
+ header += struct.pack("<I", len(compressed_data))
+ header += struct.pack("<I", crc32(compressed_data) & 0xFFFFFFFF)
+ header += struct.pack("<I", len(original_data))
+ header += struct.pack("<I", crc32(original_data) & 0xFFFFFFFF)
+ return header + compressed_data
+ parser = argparse.ArgumentParser(description='aplib compression and decompression ')
+ parser.add_argument("-d", "--decompress", action="store_true", dest="decompress", help="decompresses aplib data")
+ parser.add_argument("-c", "--compress", action="store_true", dest="compress", help="compresses aplib data")
+ parser.add_argument("-a", "--add-header", action="store_true", dest="header", help="add Aplib header to compressed data")
+ parser.add_argument("-i", "--input", required=True, type=argparse.FileType('rb'), dest="input", help="input file to be compressed or decompressed")
+ parser.add_argument("-o", "--output", required=True, type=argparse.FileType('wb'), dest="output", help="output file to be compressed or decompressed")
+ args = parser.parse_args()
+ if args.compress is False and args.decompress is False:
+ print "ERROR: Please choose -c --compress or -d --decompress"
+ compressed_data = args.input.read()
+ if "AP32" in compressed_data[:4]:
+ compressed_data = compressed_data[24:]
+ decompressed_data = aplib_decompress(compressed_data).do()
+ print "ERROR: Decompression Failed"
+ args.output.write(decompressed_data)
+ data_2_compress = args.input.read()
+ compressed_data = aplib_compress(data_2_compress).do()
+ print "ERROR: Compression Failed"
+ compressed_data = create_header(data_2_compress, compressed_data)
+ args.output.write(compressed_data)