Commits

Erik Schweller  committed 6c7fd22

make some test utils and add a test, bump version to .1

  • Participants
  • Parent commits f09c685

Comments (0)

Files changed (4)

File dedupe_copy/test/__init__.py

Empty file added.

File dedupe_copy/test/test_copy.py

+"""Tests around copy operations"""
+
+
+from functools import partial
+import os
+import unittest
+
+import utils
+
+from dedupe_copy import dedupe_copy
+
+
# Baseline invocation of run_dupe_copy with every option explicitly defaulted;
# individual tests override only the keyword arguments they care about.
do_copy = partial(dedupe_copy.run_dupe_copy, read_from_path=None,
    extensions=None, read_manifest_data=None, manifest_out_path=None,
    path_rules=None, copy_to_path=None, ignore_old_collisions=False,
    ignored_patterns=None, csv_report_path=None, walk_threads=4,
    read_threads=8, copy_threads=8, convert_manifest_paths_to='',
    convert_manifest_paths_from='', no_walk=False)
+
+
class TestCopySystem(unittest.TestCase):
    """Test system level copy of files"""

    def setUp(self):
        """Create temporary directory and test data"""
        self.temp_dir = utils.make_temp_dir('copy_sys')
        # file_data is a list of [path, md5, mtime] entries used to verify
        # both the originals and the copies later
        self.file_data = utils.make_file_tree(self.temp_dir, file_count=10,
            extensions=None)

    def tearDown(self):
        """Remove temporary directory and all test files"""
        utils.remove_dir(self.temp_dir)

    def test_copy_no_change_no_dupes(self):
        """Test copying of small tree to same structure - only tests the file
        layout
        """
        copy_to_path = os.path.join(self.temp_dir, 'tree_copy')
        # perform the copy
        do_copy(read_from_path=self.temp_dir, copy_to_path=copy_to_path,
            path_rules=['*:no_change'])
        # the abspath is preserved when copying into a new target
        # NOTE(review): os.path.join discards its first argument when the
        # second is absolute (self.temp_dir is, via mkdtemp), so on POSIX this
        # evaluates to self.temp_dir itself and the copy check below becomes a
        # re-check of the originals — confirm against how run_dupe_copy
        # actually lays out the target tree
        copy_to_path = os.path.join(copy_to_path, self.temp_dir)
        # verify we didn't alter the existing data
        result, notes = utils.verify_files(self.file_data)
        self.assertTrue(result, 'Altered original files: {0}'.format(notes))
        # verify the copied data
        for file_info in self.file_data:
            file_info[0] = file_info[0].replace(self.temp_dir, copy_to_path, 1)
        result, notes = utils.verify_files(self.file_data)
        self.assertTrue(result, 'Failed to copy files: {0}'.format(notes))
+
+
# Allow running this test module directly: python test_copy.py
if __name__ == '__main__':
    unittest.main()

File dedupe_copy/test/utils.py

+"""Utilities to support tests"""
+
+
+from collections import deque
+import hashlib
+import os
+import random
+import shutil
+import string
+import tempfile
+
+
+RANDOM_DATA = 'Thisdatamaybefarfromrandom,butithasnoduplicatebigrAms.'
+
+
def make_temp_dir(description='test_temp'):
    """Create a temporary directory and return its absolute path.

    :param description: appended as a suffix to the directory name so
        leftover test artifacts are identifiable when debugging
    """
    abs_path = tempfile.mkdtemp(suffix=description)
    # parenthesized single-argument print works identically on py2 and py3
    print('Made temporary directory: {0}'.format(abs_path))
    return abs_path
+
+
def remove_dir(root):
    """Recursively delete directory ``root``; a no-op when root is None.

    Raises OSError (via shutil.rmtree) if the path does not exist.
    """
    if root is None:
        return
    shutil.rmtree(root)
    # parenthesized single-argument print works identically on py2 and py3
    print('Removed temporary directory: {0}'.format(root))
+
+
def write_file(src, seed, size=1000, initial=None):
    """Write a file that is reproduce-able given size, seed, and initial data

    :param src: Path to file that will be created / truncated and re-written
    :param seed: integer up to len of RANDOM_DATA - 1 (larger seeds only
        weaken the uniqueness guarantee, they do not fail)
    :param size: file size in bytes
    :param initial: string to write to the file first (truncated to size)

    Returns a tuple of (checksum, mtime)
    """
    written = 0
    check = hashlib.md5()
    data_chunk_size = len(RANDOM_DATA)
    if seed >= data_chunk_size:
        print('Warning: data uniqueness not guaranteed')
    # rotate the pattern by seed so different seeds give different content
    data = deque(RANDOM_DATA)
    data.rotate(seed)
    data = ''.join(data)
    dirs = os.path.dirname(src)
    if dirs and not os.path.exists(dirs):
        try:
            os.makedirs(dirs)
        except OSError:
            pass  # might be a threading race if making lots in threads
    with open(src, 'wb') as fh:
        if initial:
            head = initial[:size]
            raw = head.encode('ascii')
            fh.write(raw)
            # bug fix: count only the bytes actually written; the original
            # added len(initial) even though initial was truncated to size
            written += len(head)
            check.update(raw)
        while written < size:
            if written + data_chunk_size <= size:
                chunk = data
            else:
                chunk = data[:size - written]
            raw = chunk.encode('ascii')
            fh.write(raw)
            check.update(raw)
            written += len(chunk)
    return check.hexdigest(), os.path.getmtime(src)
+
+
def get_random_file_name(root='', prefix=None, name_len=10, extensions=None):
    """Return a random file name. If extensions is supplied, one will be
    chosen from the list. Will try to only return new names. If a root is
    supplied, a full path to the file will be formed.

    :param root: optional directory joined onto the generated name
    :param prefix: optional fixed prefix, counted against name_len
    :param name_len: total length of the name (excluding any extension)
    :param extensions: optional list of extensions to pick from at random
    """
    while True:
        parts = []
        # bug fix: use a local for the remaining length; the original
        # mutated name_len each pass through the while loop, so a name
        # collision shrank every subsequent candidate toward the bare prefix
        random_len = name_len
        if prefix:
            parts.append(prefix)
            random_len -= len(prefix)
        for _ in range(max(random_len, 0)):
            parts.append(random.choice(string.ascii_letters))
        if extensions:
            parts.append(random.choice(extensions))
        name = ''.join(parts)
        full_path = os.path.join(root, name)
        if not os.path.exists(full_path):
            return full_path
+
+
def get_random_dir_path(root, max_depth=4, existing_dirs=None,
    new_dir_chance=.3, new_path_only=False):
    """Return a path of 0..max_depth random directory components under root.

    :param root: base directory for the generated path
    :param max_depth: maximum number of nested components to generate
    :param existing_dirs: set of previously generated component names; grown
        in place so repeated calls can share directories
    :param new_dir_chance: NOTE(review): despite the name, the condition
        below RE-USES an existing component when random() <= new_dir_chance
        (i.e. ~30% reuse, ~70% new by default) — confirm intended semantics
    :param new_path_only: when True, loop until the path does not exist
    """
    if existing_dirs is None:
        existing_dirs = set()
    while True:
        dir_count = random.randint(0, max_depth)
        dirs = [root]
        for _ in range(dir_count):
            if existing_dirs and random.random() <= new_dir_chance:
                dirs.append(random.choice(list(existing_dirs)))
            else:
                dirs.append(get_random_file_name())
            existing_dirs.add(dirs[-1])
        dirs = os.path.join(*dirs)
        if not new_path_only or not os.path.exists(dirs):
            return dirs
+
+
def make_file_tree(root, file_count=10, extensions=None):
    """Create a tree of files with various extensions off of root,
    returns a list of lists such as [[item, hash, mtime], [item, hash, mtime]]
    """
    if not extensions:
        extensions = ['.mov', '.mp3', '.png', '.jpg']
    file_list = []
    # this set is grown by the get_random_dir_path function
    existing_dirs = set()
    for i in range(file_count):
        # get_random_file_name already returns the full destination path
        # (its root argument includes our root); the original re-joined root
        # onto it, which double-prefixed root whenever root was relative
        src = get_random_file_name(root=get_random_dir_path(root,
            existing_dirs=existing_dirs), extensions=extensions)
        # seed the file with the loop index so every file's content differs
        check, mtime = write_file(src, 0, size=1000, initial=str(i))
        file_list.append([src, check, mtime])
    return file_list
+
+
def get_hash(src):
    """Return the md5 hex digest of the file at ``src``, read in chunks."""
    digest = hashlib.md5()
    with open(src, 'rb') as handle:
        # iterate 64000-byte chunks until read() returns the empty sentinel
        for block in iter(lambda: handle.read(64000), b''):
            digest.update(block)
    return digest.hexdigest()
+
+
def verify_files(file_list):
    """Inspect a list of the form [[item, hash, mtime], [item, hash, mtime]]
    and return a tuple of (True|False, Message). True in the tuple indicates
    a match, if False, the message will contain the mismatches.
    """
    print('Verifying {0} items'.format(len(file_list)))
    success = True
    message = []
    for src, check, mtime in file_list:
        try:
            new_check = get_hash(src)
            new_mtime = os.path.getmtime(src)
            if check != new_check:
                success = False
                # bug fix: labels were swapped — the freshly computed value
                # is the Actual, the recorded value is the Expected
                message.append("Hash mismatch on {src}. Actual: {act} "
                    "Expected: {exp}".format(act=new_check, exp=check,
                        src=src))
            if mtime != new_mtime:
                success = False
                message.append("Mtime mismatch on {src}. Actual: {act} "
                    "Expected: {exp}".format(act=new_mtime, exp=mtime,
                        src=src))
        except Exception as err:
            # best-effort: record unreadable files rather than aborting
            msg = 'Failed to read {0}: {1}'.format(src, err)
            message.append(msg)
            print(msg)
            success = False
    return success, '\n'.join(message)
 
 setup(
     name='DedupeCopy',
-    version='0.1dev',
+    version='0.1',
     author='Erik Schweller',
     author_email='othererik@gmail.com',
-    packages=['dedupe_copy', ],
+    packages=['dedupe_copy', 'dedupe_copy.test', ],
     url='http://pypi.python.org/pypi/DedupeCopy/',
     download_url='http://www.bitbucket.org/othererik/dedupe_copy',
     scripts=['dedupe_copy/dedupe_copy.py', ],