Snippets

Difan Zhang Python script to extract compressed IMG file produced by HD-COPY

Created by Difan Zhang last modified by unrecognized user

Why hdcopy.py?

Of course, you could use DOSIMG.zip. However, that only runs on Windows / DOS.

How to use?

You will need Python 2.7, gflags and google-apputils (the script imports `google.apputils.app`). If you don't like gflags, tweak the code a bit; it should not be that hard.

What's HDCOPY format like?

Read this awesome article: http://techlog.jeffcai.com/?p=24.

Two small corrections:

1. The sector map length is 164, not 168. This might be a miscalculation.
2. The RLE data structure is described incorrectly. It is RLEMARK CHAR REPEAT. Refer to the source code.

Why are you doing that?

For fun. I run https://software-archive.tifan.la/archive/, a Chinese MS-DOS software archive site. All files are in HD-COPY format, which means it's very hard to extract on modern platforms. I plan to extract all HD-COPY IMGs into standard floppy images in the near future, so it's necessary to have something that could programmatically extract everything. I tried a few tools on Windows, but they're very awkward to use. Plus, it's simply good to have an open source tool to read disk archives.

License?

Apache 2.0.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# vim:set shiftwidth=2 tabstop=2 expandtab textwidth=79:
import binascii
import struct
import logging
import gflags
from google.apputils import app

# Command-line flags. They are read by main() and DiskImage.dump().
FLAGS = gflags.FLAGS

gflags.DEFINE_string('image', None, 'Disk image file name.')
gflags.DEFINE_string('output', None, 'Output image file name.')
# sector_size is used arithmetically (compared with 0 and with len(data), and
# used as a repeat count), so it has to be parsed as an integer. Declared as a
# string flag, any value given on the command line would arrive as text and
# break those comparisons. 0 means "learn the size from the first data block".
gflags.DEFINE_integer('sector_size', 0, 'Sector size.', lower_bound=0)
gflags.DEFINE_enum('version', '1.7', ['1.7', '2.0'], 'HD-COPY Disk Version.')
logging.basicConfig(format='[%(asctime)s] %(message)s', level=logging.INFO)


class EOFException(Exception):
    """Signals that the image ended where another data block was expected."""

class DiskImage(object):
    """Reader that expands a compressed HD-COPY image into a raw image.

    The layout handled here is: an optional version-specific prefix, one byte
    holding the ``tracks`` value, one byte holding ``sectors_per_track``, a
    164-byte track map (non-zero means "a data block is stored for this
    entry"), then one RLE-compressed data block per non-zero map entry.

    The instance owns an open file handle (``self.f``); call ``close()`` or
    use the object as a context manager to release it.
    """

    # Number of entries in the track map that follows the two header bytes.
    TRACK_MAP_LENGTH = 164
    # Number of map entries rendered per row by format_track_mask().
    TRACK_MAP_ROW = 82

    def __init__(self, filename, version):
        """Open ``filename`` for reading.

        Args:
            filename: path of the HD-COPY image.
            version: '1.7' or '2.0'; selects how many leading bytes to skip.

        Raises:
            IOError / OSError: if the file cannot be opened.
        """
        self.filename = filename
        self.version = version

        # Version 2.0 images carry 14 leading bytes that extraction does not
        # need; they are simply skipped.
        self.initial_offset = 14 if version == '2.0' else 0
        self.f = open(self.filename, 'rb')

        self.metadata = {
            'tracks': -1,
            'sectors_per_track': -1,
            'track_mask': []
        }

    def close(self):
        """Close the underlying image file."""
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def format_track_mask(self, track_mask):
        """Render the track map as two rows of block characters.

        Non-zero entries are drawn as a full block (U+2588), zero entries as
        a light shade (U+2591). The first 82 entries form the first row, the
        remainder the second.
        """
        cells = [u'\u2588' if used else u'\u2591' for used in track_mask]
        row = self.TRACK_MAP_ROW
        return '\n'.join([''.join(cells[:row]), ''.join(cells[row:])])

    def dump(self, output=None, sector_size=None):
        """Decode the whole image and write the raw result to ``output``.

        Args:
            output: destination path; defaults to ``FLAGS.output``.
            sector_size: size in bytes of one decoded data block; defaults to
                ``FLAGS.sector_size``. 0 means "take it from the first block
                that decodes successfully".

        Raises:
            ValueError: on a truncated header, trailing data after the last
                expected block, blocks of differing decoded size, or when the
                block size is unknown and zero-filling is required.
        """
        if output is None:
            output = FLAGS.output
        if sector_size is None:
            sector_size = FLAGS.sector_size
        # Tolerate a textual value (e.g. from a string-typed flag).
        sector_size = int(sector_size or 0)

        self.f.seek(self.initial_offset)
        header = self.f.read(2 + self.TRACK_MAP_LENGTH)
        if len(header) != 2 + self.TRACK_MAP_LENGTH:
            raise ValueError('Truncated image header.')
        fields = struct.unpack('%dB' % len(header), header)
        tracks, sectors_per_track = fields[:2]
        track_mask = fields[2:]
        self.metadata.update({
            'tracks': tracks,
            'sectors_per_track': sectors_per_track,
            'track_mask': track_mask
        })
        logging.info('Disk image has %d tracks and %d sectors per track.',
                     tracks, sectors_per_track)
        logging.info('Track map\n%s', self.format_track_mask(track_mask))

        # Decode every block first. Entries that must be zero-filled are kept
        # as None so the fill can be sized once the block size is known, even
        # when an unused entry precedes the first stored block.
        total = len(track_mask)
        blocks = []
        missing = []
        for index, used in enumerate(track_mask):
            if not used:
                blocks.append(None)
                continue
            try:
                data = self.parse_next_block()
            except EOFException:
                missing.append(index)
                blocks.append(None)
                continue

            if sector_size == 0:
                sector_size = len(data)
            elif sector_size != len(data):
                logging.error('Inconsistent sector size: prev=%d, curr=%d',
                              sector_size, len(data))
                raise ValueError('Inconsistent sector size.')
            blocks.append(data)

        # Every map entry has been consumed; anything left is unexpected.
        trailing = self.f.read()
        if trailing:
            logging.error('Block %d/%d: data found.', total, total)
            logging.info('===\n%s\n===',
                         binascii.hexlify(trailing).decode('ascii'))
            raise ValueError('Inconsistent data map.')

        if sector_size == 0 and any(block is None for block in blocks):
            raise ValueError('Invalid sector size.')

        for index in missing:
            logging.error('Block %d/%d: no data found. '
                          'Writing %d \\0\'s instead.', index,
                          total, sector_size)

        filler = bytearray(sector_size)
        floppy = bytearray()
        for block in blocks:
            floppy.extend(filler if block is None else block)
        logging.info('Extracted %d bytes of data (%.1f KiB), '
                     'sector size is %d bytes.',
                     len(floppy), len(floppy) * 1./1024, sector_size)

        calculated_disk_size = total * sector_size
        if len(floppy) != calculated_disk_size:
            logging.error('Disk length does not match. Should be %d.',
                          calculated_disk_size)
            raise ValueError('Disk length does not match.')
        with open(output, 'wb') as outf:
            outf.write(bytes(floppy))

    def parse_next_block(self):
        """Read and RLE-decode the next data block from ``self.f``.

        A block is a 2-byte little-endian length, a 1-byte RLE marker, and
        ``length - 1`` payload bytes. Inside the payload the sequence
        ``MARKER CHAR REPEAT`` expands to ``CHAR`` repeated ``REPEAT`` times;
        every other byte is copied through unchanged.

        Returns:
            bytearray with the decoded bytes.

        Raises:
            EOFException: if the file is already at its end.
            ValueError: if the header, payload or an RLE run is truncated.
        """
        header = self.f.read(3)
        if not header:
            raise EOFException()
        if len(header) != 3:
            raise ValueError('Truncated block header.')
        # Explicit little-endian so the result does not depend on the host.
        length, rle_marker = struct.unpack('<HB', header)
        logging.debug('Data block at 0x%X: length=0x%X, rle_marker=0x%X',
                      self.f.tell() - 3, length, rle_marker)
        # The length is counted from offset 0x02 of the block: it covers the
        # marker byte and the payload, but not the two length bytes themselves.
        payload_length = length - 1
        if payload_length < 0:
            raise ValueError('Incorrect sector length.')
        payload = bytearray(self.f.read(payload_length))
        if len(payload) != payload_length:
            raise ValueError('Incorrect sector length.')

        # RLE decode
        decoded = bytearray()
        pos = 0
        end = len(payload)
        while pos < end:
            value = payload[pos]
            if value != rle_marker:
                decoded.append(value)
                pos += 1
                continue
            if pos + 2 >= end:
                raise ValueError('Truncated RLE run at payload offset %d.'
                                 % pos)
            decoded.extend(bytearray([payload[pos + 1]]) * payload[pos + 2])
            pos += 3
        return decoded

def main(unused_argv):
  """Extract the image named by --image into --output.

  When --output is not given, the result is written next to the input as
  '<image>_extracted.img'.

  Raises:
    app.UsageError: if --image was not supplied.
  """
  if FLAGS.image is None:
    # Without this check the default output name below would fail with an
    # unhelpful TypeError (None + str).
    raise app.UsageError('--image is required.')
  if FLAGS.output is None:
    FLAGS.output = FLAGS.image + '_extracted.img'
  logging.info('Disk image %s.', FLAGS.image)
  image = DiskImage(FLAGS.image, FLAGS.version)
  try:
    image.dump()
  finally:
    # The constructor opens the input file; release it whatever happens.
    image.f.close()

# Script entry point: hand control to google-apputils' app.run().
# NOTE(review): presumably app.run() parses the command-line flags and then
# calls main() -- confirm against the google-apputils documentation.
if __name__ == '__main__':
  app.run()

Comments (0)