Commits

Anonymous committed 13c2190

Changes the append tests to run multiple tests. The number of tests can now be passed via the command line.

  • Participants
  • Parent commits 7d3f007

Comments (0)

Files changed (3)

distributed_version_control_system_test.py

 
 Ideas: Include Bazaar, too. 
 
+TODO: for multiple file change tests: backup_repo_git(), restore_repo_git(), backup_repo_hg(), restore_repo_hg()
+
 This was inspired by the tests done at 
-- http://sayspy.blogspot.com/2006/11/bazaar-vs-mercurial-unscientific.html
+http://sayspy.blogspot.com/2006/11/bazaar-vs-mercurial-unscientific.html
 """
 
-# First get the call function, so we can call the systems via the bash
-from subprocess import call
-
-# for convenience, we create a call, which uses the shell by default
-def shellcall(command): 
-    """Call a string on the shell."""
-    # print "shellcall", command
-    return call(command, shell=True)
-
 # also get some small useful functions and the general test function
-# We'll use many random tuples to create files with content
-from small_useful_functions import create_many_random_tuples
-# and the general test function and the mean to get and evaluate the data. 
+# We'll use many random tuples to create files with content, and shellcall to start shell processes. 
+from small_useful_functions import create_many_random_tuples, shellcall
+# and the general test function, multiple_tests and mean to run the tests and evaluate the data
 from general_test_function import general_test_function, multiple_tests, mean
 
-def test_dvcs(number_of_files=10000): 
+def test_dvcs(number_of_files=10000, number_of_tests=10): 
     """Test one dvcs.
     
     For this we 
     # Then we remove all files within the dir
     shellcall("rm -Rf *")
     
-    # For each of the tuples, we create a file with the first item as name and teh second as data
+    # For each of the tuples, we create a file with the first item as name and the second as data
     for i in data: 
         f = open(str(i[0]), "w")
         f.write(str(i[1]))
         f.close()
     
     # Now we initialize the repositories via test_init. 
-    results = multiple_tests(10, [init_git, init_hg])
-    # Add it to our test results
-    test_results["init"] = mean(results)
-    # And print it
+    print "init"
+    test_results["init"] = test_init(number_of_tests=number_of_tests)
+    
+    # Then we commit all files. 
+    print "initial_commit"
+    test_results["initial_commit"] = test_initial_commit(number_of_tests=number_of_tests)
+    
+    # Now we test the speed when appending a small amount of data.
+    print "commit_after_append_small"
+    test_results["commit_after_append_small"] = test_append_small(data, number_of_tests=number_of_tests)
+    
+    # Then the speed when appending many lines of data to each file.
+    print "commit_after_append_of_many_lines"
+    test_results["commit_after_append_of_many_lines"] = test_append_many_lines(data, number_of_tests=number_of_tests)
+    
+    # And finally the speed when appending one long line of data to each file.
+    print "commit_after_append_of_one_long_line"
+    test_results["commit_after_append_of_one_long_line"] = test_append_one_long_line(data, number_of_tests=number_of_tests)
+    
+    return test_results
+
+def test_init(number_of_tests=10):
+    """Test initializing a repository."""
+    # Now we initialize the repositories using the general test function. 
+    results = multiple_tests(number_of_tests, [init_git, init_hg])
+    # print the result
     print "Init results: [git, hg]", mean(results)
+    # And return it. 
+    return mean(results)
     
+def test_initial_commit(number_of_tests=10):
+    """Test the initial commit. Must come after test_init."""
     # Then we commit all files
-    results = multiple_tests(10, [init_add_and_commit_git, init_add_and_commit_hg])
-    # Add it to our test results
-    test_results["initial_commit"] = mean(results)
-    # And print it
+    results = multiple_tests(number_of_tests, [init_add_and_commit_git, init_add_and_commit_hg])
+
+    # print the result
     print "Initial commit results: [git, hg]", mean(results)
-    
+    # and return it. 
+    return mean(results)
+
+def test_append_small(data, number_of_tests=10):
+    """Append a short line to each file and commit."""
     # Now we need to append a small amount of data to the files: we just use the filename plus the same data. 
     for i in data: 
         f = open(str(i[0]), "a")
         f.write(str(i[0]) + str(i[1]))
         f.close()
-        
-    # We do only one test. TODO: Copy the repo, then move it back each time and commit. 
-    results = multiple_tests(1, [commit_git, commit_hg])
-    # Add it to our test results
-    test_results["commit_after_append"] = mean(results)
-    # And print it
+    # Now we use a test armored by restoring the repo before each run. 
+    # But first we need a backup. 
+    backup_repo_git()
+    backup_repo_hg()
+    results = armored_tests([commit_git, commit_hg], before=[restore_repo_git, restore_repo_hg], number_of_tests=number_of_tests)
+    # Print the result
     print "Commit results after append: [git, hg]", mean(results)
-    
-    # Now we test what happens if we add the same data to each file, but lots of it. 
+    # And return it
+    return mean(results)
+
+def test_append_many_lines(data, number_of_tests=10):
+    """Test the speed of Mercurial and git when appending many lines of data to each file."""
+    # We test what happens if we add the same data to each file, but lots of it. 
+    # First we append the whole data set to each file, each item on its own line, 100 times.
+    for i in data: 
+        f = open(str(i[0]), "a")
+        for rep in range(100): 
+            for j in data: 
+                f.write(str(j) + "\n")
+        f.close()
+    # Now we use a test armored by restoring the repo before each run. 
+    # But first we need a backup. 
+    backup_repo_git()
+    backup_repo_hg()
+    results = armored_tests([commit_git, commit_hg], before=[restore_repo_git, restore_repo_hg], number_of_tests=number_of_tests)
+    # Print our result
+    print "Commit results after append of many lines: [git, hg]", mean(results)
+    # And return it
+    return mean(results)
+
+def test_append_one_long_line(data, number_of_tests=10):
+    """Test the speed of Mercurial and git when appending one long line of data to each file."""
+    # We test what happens if we add the same data to each file, but lots of it. 
+    # First we append the whole data set, repeated 100 times, as one long line.
     for i in data: 
         f = open(str(i[0]), "a")
-        f.write(str(data))
+        f.write(str(data)*100)
         f.close()
-    
-    # We do only one test. TODO: Copy the repo, then move it back each time and commit. 
-    results = multiple_tests(1, [commit_git, commit_hg])
-    # Add it to our test results
-    test_results["commit_after_append_of_same_big_data"] = mean(results)
-    # And print it
+    # Now we use a test armored by restoring the repo before each run. 
+    # But first we need a backup. 
+    backup_repo_git()
+    backup_repo_hg()
+    results = armored_tests([commit_git, commit_hg], before=[restore_repo_git, restore_repo_hg], number_of_tests=number_of_tests)
+    # Print our result
     print "Commit results after append of one long line: [git, hg]", mean(results)
-    
-    
-    return test_results
+    # And return it
+    return mean(results)
 
-def init_git(): 
+def armored_tests(function_list, before=None, after=None, number_of_tests=10):
+    """Run many tests, each armored by a list of functions which is called before the main functions and another list which is called after them."""
+    # We need to do a bit of handiwork here. 
+    # First an empty list to which we will add the results. 
+    results = []
+    # Then we'll do the runs one after the other. 
+    for i in range(number_of_tests): 
+        # If we got before functions, we first execute them
+        if before is not None: 
+            for func in before: 
+                func()
+        # Now we run one test and add the results to our result list. 
+        results += multiple_tests(1, function_list)
+        # And if we got after functions, we run them now.
+        if after is not None: 
+            for func in after: 
+                func()
+    # Then we return the results
+    return results
+
+
+def init_git(*args): 
     """Initialize a git repository."""
     # remove an old repo
     shellcall("rm -Rf .git")
     # create a new one
     shellcall("git init")
 
-def init_add_and_commit_git(): 
+def init_add_and_commit_git(*args): 
     """Commit to the git repository."""
     # Init the repo
     init_git()
     # And commit
     commit_git()
 
-def commit_git(): 
+def commit_git(*args): 
     """Commit to the git repository."""
     # Add all files
     shellcall("git add *")
     # And commit
     shellcall("git commit -m 'test'")
     
+def backup_repo_git(*args):
+    """Create a backup of the git directory, named .git.bak ."""
+    # copy the repo to the backup dir. 
+    shellcall("cp -Rf .git .git.bak")
 
-def init_hg(): 
+def restore_repo_git(*args):
+    """Delete and then restore the git dir from backup. """
+    # Delete the repo
+    shellcall("rm -Rf .git")
+    # And copy it from the backup
+    shellcall("cp -Rf .git.bak .git")
+
+def init_hg(*args): 
     """Initialize a Mercurial repository."""
     # remove an old repository
     shellcall("rm -Rf .hg")
     # create a new one. 
     shellcall("hg init")
 
-def init_add_and_commit_hg(): 
+def init_add_and_commit_hg(*args): 
     """Commit to the Mercurial repository."""
     # Init the repo
     init_hg()
     # And commit
     commit_hg()
 
-def commit_hg(): 
+def commit_hg(*args): 
     """Commit to the Mercurial repository."""
     # And commit
     shellcall("hg commit -m 'test'")
+    
+def backup_repo_hg(*args):
+    """Create a backup of the Mercurial directory, named .hg.bak ."""
+    # copy the repo to the backup dir. 
+    shellcall("cp -Rf .hg .hg.bak")
+
+def restore_repo_hg(*args):
+    """Delete and then restore the Mercurial dir from backup. """
+    # First delete the repo
+    shellcall("rm -Rf .hg")
+    # Then copy it from the backup
+    shellcall("cp -Rf .hg.bak .hg")
 
 
 #### Self-Test ####
     from yaml import dump
     # Get the command line args to get the number of files to test
     from sys import argv
-    try: 
-        print dump(test_dvcs(number_of_files=int(argv[1])))
-    except: 
-        print "Please provide the number of files to test."
+    # If we got 2 additional command line arguments, the first is the number of files, the second the number of tests. 
+    if len(argv) == 3: 
+        output = dump(test_dvcs(number_of_files=int(argv[1]), number_of_tests=int(argv[2])))
+    # If we got 1 arg, we just pass it as the number of files. 
+    elif len(argv) == 2: 
+        output = dump(test_dvcs(number_of_files=int(argv[1])))
+    # If we got no additional args (or an unexpected number of them), we just start with the default values. 
+    else: 
+        output = dump(test_dvcs())
+    print "\n======\n"
+    print output
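
A minimal usage sketch for the updated script, not part of this commit: the first command line argument is the number of files, the optional second one the number of test runs. The module import and the concrete numbers are only illustrative, and since test_dvcs removes all files in its working directory first, it should be run from a scratch directory.

    # Shell invocation (illustrative values): 1000 files, 5 test runs each.
    #     python distributed_version_control_system_test.py 1000 5
    # Equivalent direct call when importing the module:
    from distributed_version_control_system_test import test_dvcs
    results = test_dvcs(number_of_files=1000, number_of_tests=5)
    print results["init"], results["commit_after_append_small"]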

general_test_function.py

 The general_test_function can be used to test the execution time of any number of functions. 
 """
 
-def general_test_function(functions, *args): 
+# We need the current time in seconds
+from time import time
+
+def general_test_function(functions, offset=None, offset_function=None, offset_function_args=None, arg_list=None, *args): 
     """Compare the execution speed of different functions in the list of functions. 
     
 @type functions: list of functions
 @param functions: [function1, function2, ...].
+@type offset: float or None
+@param offset: Constant offset additional to the time needed to call time()
+@type offset_function: Function or None
+@param offset_function: A function which gets called to determine an additional offset. 
+@type offset_function_args: List or None
+@param offset_function_args: The arguments of the offset function. TODO: At the moment only the first one is being used. 
 @type args: any
 @param args: The argument(s) to pass to each of the functions. All functions get all and the same argument(s)"""
-    # We need the current time in seconds
-    from time import time
 
+    # First calculate the offset from the passed offset function. 
+    time_offset = calculate_offset_from_function(offset_function, offset_function_args)
+    # Then add the general offset passed to the test function. 
+    if offset is not None: 
+        time_offset += offset
+    
+    # Get start and stop times of all functions and put them into a list of tuples: (time_needed, function) - time needed first, so we can sort easily. 
+    #: A list of functions with execution speed
+    execution_times = []
+    for i in functions: 
+        # Time the function needs, with the constant offset subtracted. 
+        time_needed = get_execution_time(i, *args) - time_offset
+        # append the tuple of execution time and function to the list of execution_speeds. 
+        execution_times.append((time_needed, i))
+    # Return the list of execution times, one tuple per function. 
+    return execution_times
+
+def check_offset_for_time_calculation():
+    """Check the time needed to call time() two times."""
     # First evaluate how long Python takes to get the time twice. We need to subtract this value from the result to remove a constant offset. 
     # Get the current time two times
     start_time = time()
     finish_time = time()
     # Now calculate the difference which is how long Python takes for getting the time. 
     #: The offset due to getting the time two times. This is less than 1*10e-5 s for me.
-    time_offset = finish_time - start_time
+    return finish_time - start_time
 
-    # Get start and stop times of all functions and put them into a list of tuples: (time_needed, function) - time needed first, so we can sort easily. 
-    #: A list of functions with execution speed
-    execution_times = []
-    for i in functions: 
-        # Start time of the function. 
-        start_time = time()
-        # Execute the function with the given argument
-        i(*args)
-        # Get the finish time
-        finish_time = time()
-        # Get the difference
-        time_needed = finish_time - start_time - time_offset
-        # append the tuple of execution time and function to the list of execution_speeds. 
-        execution_times.append((time_needed, i))
-    # Return the list of execution times, sorted by functions. 
-    return execution_times
+def get_execution_time(function, *args):
+    """Return the time needed for a function."""
+    # Start time of the function. 
+    start_time = time()
+    # Execute the function with the given argument
+    function(*args)
+    # Get the finish time
+    finish_time = time()
+    # Get the difference
+    time_needed = finish_time - start_time
+    return time_needed
+
+def calculate_offset_from_function(function=None, args=None):
+    """Calculate the offset from a function which represents the basic constant offset.
+    
+    TODO: Pass all arguments from the list to the function. 
+    """
+    # If a function is being passed, run it and calculate the time it needs. 
+    if function is not None: 
+        # If we get args, pass them to the function 
+        if args is not None: 
+            # TODO: The function can only take one arg at the moment. 
+            time_needed = get_execution_time(function, args[0])
+        else: 
+            # Call it without args, if we got none. 
+            time_needed = get_execution_time(function)
+        # return the time needed to call the function
+        return time_needed
+    else: 
+        # If we got no function, we just return the time needed for an empty call. 
+        return get_execution_time(just_pass)
+
+def just_pass(*args): 
+    """Do nothing. This is the trivial function. """
+    pass
 
 def multiple_tests(number_of_tests, functions, *args): 
     """run multiple tests."""
         mean_time[i] = float(mean_time[i]) / len(data)
     return mean_time # TODO: Calculate the standard deviation, so we know how certain the result is. 
 
-
+def call_with_arg_list(function, arg_list=None, *args): 
+    """Call a function with a list containing arguments which get expanded into positional arguments.
+    
+    Plans: 
+        - All tests should be started with this function. 
+        
+    To test: 
+        - Does this work with the general test function, which requires a list of functions as argument? 
+    """
+    # If we have a non-empty argument list, peel off its last item and recurse. 
+    if arg_list is not None and len(arg_list) > 0: 
+        return call_with_arg_list(function, arg_list[:-1], arg_list[-1], *args)
+    else: 
+        return function(*args)
+        
 
 
 ### Self-Test ###
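
A minimal sketch of how the extended general_test_function might be called, to illustrate the offset handling; the two timed functions below are made up for the example and are not part of the commit:

    # Two toy functions to compare.
    def count_small(): return sum(range(100))
    def count_big(): return sum(range(100000))
    # just_pass (defined above) serves as the offset function, so the cost of an
    # empty call is measured once and subtracted from each result.
    # The return value is a list of (time_needed, function) tuples, one per function.
    print general_test_function([count_small, count_big], offset_function=just_pass)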

small_useful_functions.py

     
     return many_tuples
 
+
+
+# To be able to call shell commands, we want our own small function: shellcall. 
+
+# First get the call function, so we can call the systems via the shell
+from subprocess import call
+
+# and, for convenience, we create a wrapper which uses the shell by default
+def shellcall(command): 
+    """Call a string on the shell."""
+    # print "shellcall", command
+    return call(command, shell=True)
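
Since shellcall just hands back the exit status from subprocess.call, callers can check whether a command worked; a small sketch (the command is only an example):

    # A return value of 0 means the shell command succeeded.
    if shellcall("hg --version > /dev/null") != 0:
        print "Mercurial does not seem to be available."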