Source

python_tests / distributed_version_control_system_test.py

#!/usr/bin/env python
# encoding: utf-8

"""Test the speed of different distributed version control systems. 

First targets: git and Mercurial

Procedure: Call them through the shell and measure their speed with the general_test_function.

Ideas: Include bazaar, too. 

TODO: for multiple file change tests: backup_repo_git(), undo_git(), backup_repo_hg(), undo_hg()

This was inspired by the tests done at 
http://sayspy.blogspot.com/2006/11/bazaar-vs-mercurial-unscientific.html
"""

# Get some small useful helper functions:
# create_many_random_tuples supplies the names and content of the files, shellcall runs shell commands.
from small_useful_functions import create_many_random_tuples, shellcall
# and the general test function and the mean to get and evaluate the data
from general_test_function import general_test_function, multiple_tests, mean

def test_dvcs(number_of_files=10000, number_of_tests=10): 
    """Test one dvcs.
    
    For this we 
        - create a directory
        - initialize the repository
        - create many files with random content,  
        - commit the first version
        - append additional random data to each file and
        - commit the second version. 
    
    All this will be times. 
    """
    # First we need a dict to hold our results. 
    test_results = {} #: The results of our tests. 
    
    # Then we create the directory and then change into it
    from os import mkdir, chdir
    DIR="dir"
    try: 
        mkdir(DIR)
    except: pass
    chdir(DIR)
    
    # Now we get the names and data to write into the files. 
    data = create_many_random_tuples(number_of_files)
    
    # Then we remove all files within the dir
    shellcall("rm -Rf *")
    
    # For each of the tuples, we create a file with the first item as name and the second as data
    for i in data: 
        f = open(str(i[0]), "w")
        f.write(str(i[1]))
        f.close()
    
    # Now we initialize the repository over the general_test_function. 
    print "\n= init =\n"
    test_results["init"] = test_init(number_of_tests=number_of_tests)
    
    # Then we commit all files. 
    print "\n= initial_commit =\n"
    test_results["initial_commit"] = test_initial_commit(number_of_tests=number_of_tests)
    
    # Now we test the speed when appending a small amount of data.
    print "\n= commit_after_append_small =\n"
    test_results["commit_after_append_small"] = test_append_small(data, number_of_tests=number_of_tests)
    
    # And the speed when appending a large amount of data.
    print "\n= commit_after_append_of_many_lines =\n"
    test_results["commit_after_append_of_many_lines"] = test_append_many_lines(data, number_of_tests=number_of_tests)
    
    # And the speed when appending a large amount of data.
    print "\n= commit_after_append_of_one_long_line =\n"
    test_results["commit_after_append_of_one_long_line"] = test_append_one_long_line(data, number_of_tests=number_of_tests)
    
    return test_results

def test_init(number_of_tests=10):
    """Test initiating a repository."""
    # Now we initialize the repository over the general_test_function. 
    results = multiple_tests(number_of_tests, [init_git, init_hg])
    # print the result
    print "Init results: [git, hg]", mean(results)
    # And return it. 
    return mean(results)
    
def test_initial_commit(number_of_tests=10):
    """Test the initial commit. Must come after test_init."""
    # Then we commit all files
    results = multiple_tests(number_of_tests, [init_add_and_commit_git, init_add_and_commit_hg])

    # print the result
    print "Initial commit results: [git, hg]", mean(results)
    # and return it. 
    return mean(results)

def test_append_small(data, number_of_tests=10):
    """Append a short line to each file and commit."""
    # Now we need to append stuff to the files. We just use the filename + the same data . 
    for i in data: 
        f = open(str(i[0]), "a")
        f.write(str(i[0]) + str(i[1]))
        f.close()
    # Now we use a test armored by restoring the repo before each run. 
    # But first we need a backup. 
    backup_repo_git()
    backup_repo_hg()
    results = armored_tests([commit_git, commit_hg], before=[undo_git, undo_hg], number_of_tests=number_of_tests)
    # Print the result
    print "Commit results after append: [git, hg]", mean(results)
    # And return it
    return mean(results)

def test_append_many_lines(data, number_of_tests=10):
    """Test the speed of Mercurial and git when appending a large amount of data. """
    # We test what happens if we add the same data to each file, but lots of it. 
    # First we append the data, each item in its own line, 100 times.
    for i in data * 100: 
        f = open(str(i[0]), "a")
        for j in data: 
            f.write(str(j))
        f.close
    # Now we use a test armored by restoring the repo before each run. 
    # But first we need a backup. 
    backup_repo_git()
    backup_repo_hg()
    results = armored_tests([commit_git, commit_hg], before=[undo_git, undo_hg], number_of_tests=number_of_tests)
    # Print our result
    print "Commit results after append of same big data: [git, hg]", mean(results)
    # And return it
    return mean(results)

def test_append_one_long_line(data, number_of_tests=10):
    """Test the speed of Mercurial and git when appending a large amount of data. """
    # We test what happens if we add the same data to each file, but lots of it. 
    # First we append the data, 100 times long.
    for i in data: 
        f = open(str(i[0]), "a")
        f.write(str(data)*100)
        f.close
    # Now we use a test armored by restoring the repo before each run. 
    # But first we need a backup. 
    backup_repo_git()
    backup_repo_hg()
    results = armored_tests([commit_git, commit_hg], before=[undo_git, undo_hg], number_of_tests=number_of_tests)
    # Print our result
    print "Commit results after append of same big data: [git, hg]", mean(results)
    # And return it
    return mean(results)

def armored_tests(function_list, before=None, after=None, number_of_tests=10):
    """Run many tests, each armored by functions called before and after the tested functions.

    function_list -- the functions to test; handed to multiple_tests for a single test per run.
    before -- optional iterable of callables, each called without arguments ahead of every run.
    after -- optional iterable of callables, each called without arguments after every run.
    number_of_tests -- how many armored runs to do.

    Returns a list with the results of all runs, in the order of the runs.
    """
    # First an empty list to which we will add the results.
    results = []
    # Then we'll do the runs one after the other.
    for run in range(number_of_tests):
        # If we got before functions, we first execute them.
        if before is not None:
            for prepare in before:
                prepare()
        # Now we run one test of the functions we were given (not a hard coded pair)
        # and add the results to our result list.
        results += multiple_tests(1, function_list)
        # And if we got after functions, we run them now.
        if after is not None:
            for cleanup in after:
                cleanup()
    # Then we return the results
    return results


def init_git(*args):
    """Start over with a fresh git repository in the current directory.

    Any arguments are accepted and ignored.
    """
    # Throw away a repository left from before, then set up the new one.
    for command in ("rm -Rf .git", "git init"):
        shellcall(command)

def init_add_and_commit_git(*args):
    """Create a fresh git repository and commit into it.

    Any arguments are accepted and ignored.
    """
    # The order matters: the repository has to exist before we can commit.
    for step in (init_git, commit_git):
        step()

def commit_git(*args):
    """Run "git add *" and then commit with the message 'test'.

    Any arguments are accepted and ignored.
    """
    stage_everything = "git add *"
    record_commit = "git commit -m 'test'"
    shellcall(stage_everything)
    shellcall(record_commit)
    
def backup_repo_git(*args):
    """Copy the .git directory to .git.bak, replacing an older backup.

    Any arguments are accepted and ignored.
    """
    # An old backup has to go first, then the repo gets copied.
    for command in ("rm -Rf .git.bak", "cp -Rf .git .git.bak"):
        shellcall(command)

def undo_git(*args):
    """Delete and then restore the git dir from the backup .git.bak.

    This brings the repository back to the state it had when
    backup_repo_git was called, so the same change can be committed again.
    A plain "git reset" only resets the index and keeps the commit, so it
    cannot do that.

    If there is no .git.bak directory, nothing is deleted; we then only run
    "git reset", which is all this function did before.

    Any arguments are accepted and ignored.
    """
    from os.path import isdir
    if not isdir(".git.bak"):
        # Without a backup, deleting .git would lose the repository for good.
        shellcall("git reset")
        return
    # Remove the current repo
    shellcall("rm -Rf .git")
    # And put the backup in its place.
    shellcall("cp -Rf .git.bak .git")

def init_hg(*args):
    """Start over with a fresh Mercurial repository in the current directory.

    Any arguments are accepted and ignored.
    """
    drop_old_repository = "rm -Rf .hg"
    create_repository = "hg init"
    shellcall(drop_old_repository)
    shellcall(create_repository)

def init_add_and_commit_hg(*args):
    """Create a fresh Mercurial repository and commit into it.

    Any arguments are accepted and ignored.
    """
    # The order matters: the repository has to exist before we can commit.
    for step in (init_hg, commit_hg):
        step()

def commit_hg(*args):
    """Run "hg add *" and then commit with the message 'test'.

    Any arguments are accepted and ignored.
    """
    # Adding comes first, the commit second.
    for command in ("hg add *", "hg commit -m 'test'"):
        shellcall(command)
    
def backup_repo_hg(*args):
    """Copy the .hg directory to .hg.bak, replacing an older backup.

    Any arguments are accepted and ignored.
    """
    remove_old_backup = "rm -Rf .hg.bak"
    copy_repository = "cp -Rf .hg .hg.bak"
    shellcall(remove_old_backup)
    shellcall(copy_repository)

def undo_hg(*args):
    """Delete and then restore the hg dir from the backup .hg.bak.

    This brings the repository back to the state it had when
    backup_repo_hg was called, so the same change can be committed again.
    "hg rollback" instead undoes whatever transaction came last, which
    before the first armored run is a commit of an earlier test.

    If there is no .hg.bak directory, nothing is deleted; we then only run
    "hg rollback", which is all this function did before.

    Any arguments are accepted and ignored.
    """
    from os.path import isdir
    if not isdir(".hg.bak"):
        # Without a backup, deleting .hg would lose the repository for good.
        shellcall("hg rollback")
        return
    # Remove the current repo
    shellcall("rm -Rf .hg")
    # And put the backup in its place.
    shellcall("cp -Rf .hg.bak .hg")
    

#### Self-Test ####
if __name__ == "__main__": 
    from sys import argv
    # If we got 2 additional command line arguments, the first is the number of files, the second the number of tests. 
    if len(argv) == 3: 
        output = test_dvcs(number_of_files=int(argv[1]), number_of_tests=int(argv[2]))
    # If we got 1 arg, we just pass it as the number of files. 
    elif len(argv) == 2: 
        output = test_dvcs(number_of_files=int(argv[1]))
    # If we got no args, we just start it with default values. 
    elif len(argv) == 1: 
        output = test_dvcs()
    print "\n======\nStructure: [git, hg], time in seconds, mean value. \n"
    for i, j in output.items(): 
        print i, j
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.