Source

rsync_backup / test_rsync_backup.py

Full commit
import os
import shutil
import tempfile
from os.path import join, exists

from rsync_backup import backup, rsync, dirshift


def rf(fname):
    """Return the complete content of the file *fname*, read in binary mode."""
    with open(fname, "rb") as handle:
        data = handle.read()
    return data

def wf(fname, content):
    """Write content into a file, replacing whatever the file held before.

    The file is opened in binary mode.  Text content is encoded as UTF-8
    first, because a binary-mode file on Python 3 rejects text with a
    TypeError; any other content is handed to write() unchanged.
    """
    # type(u"") is the text type on both Python 2 (unicode) and Python 3 (str),
    # so byte strings are never re-encoded.
    if isinstance(content, type(u"")):
        content = content.encode("utf-8")
    with open(fname, "wb") as f:
        f.write(content)

def test_flat():
    """rsync() of a flat directory returns 0 and reproduces both files in dst."""
    src = tempfile.mkdtemp()
    dst = tempfile.mkdtemp()
    try:
        # Bytes literals: rf() reads in binary mode, so on Python 3 the
        # comparison must be bytes against bytes.
        wf(join(src, "1"), b"eins")
        wf(join(src, "2"), b"zwei")
        # NOTE(review): the meaning of the third argument is defined by
        # rsync() itself; it is passed through exactly as before.
        rc = rsync(src, dst, src)
        assert rc == 0
        assert rf(join(dst, "1")) == b"eins"
        assert rf(join(dst, "2")) == b"zwei"
    finally:
        # mkdtemp() leaves removal to the caller; do not litter the temp dir.
        shutil.rmtree(src, ignore_errors=True)
        shutil.rmtree(dst, ignore_errors=True)


def test_recursive():
    """rsync() returns 0 and also reproduces a file inside a sub-directory."""
    src = tempfile.mkdtemp()
    dst = tempfile.mkdtemp()
    try:
        # Bytes literals: rf() reads in binary mode, so on Python 3 the
        # comparison must be bytes against bytes.
        wf(join(src, "1"), b"eins")
        src_sub = join(src, "sub")
        os.mkdir(src_sub)
        wf(join(src_sub, "2"), b"zwei")
        rc = rsync(src, dst, src)
        assert rc == 0
        assert rf(join(dst, "1")) == b"eins"
        assert rf(join(dst, "sub", "2")) == b"zwei"
    finally:
        # mkdtemp() leaves removal to the caller; do not litter the temp dir.
        shutil.rmtree(src, ignore_errors=True)
        shutil.rmtree(dst, ignore_errors=True)

def test_dirshift():
    """dirshift() moves every numbered directory up by one and drops the last.

    Directories "0" .. "N-1" each hold a marker file named after the
    directory.  After the shift, directory i is expected to hold marker i-1,
    "0" must still exist, and no directory "N" may have been created.
    """
    tmp = tempfile.mkdtemp()
    N = 10
    try:
        for i in range(N):  # 0..N-1
            os.mkdir(join(tmp, str(i)))
            # Empty bytes, since wf() writes to a binary-mode file.
            wf(join(tmp, str(i), str(i)), b"")
        dirshift(tmp, N, "")
        assert exists(join(tmp, "0"))
        for i in range(1, N):  # 1..N-1
            assert exists(join(tmp, str(i), str(i - 1)))
        assert not exists(join(tmp, str(N)))
    finally:
        # mkdtemp() leaves removal to the caller; do not litter the temp dir.
        shutil.rmtree(tmp, ignore_errors=True)

def test_backup():
    """Run backup() repeatedly and check the numbered snapshots under dst.

    Snapshot "0" is expected to mirror the current content of src after every
    run.  The link-count assertions suggest that unchanged files are shared
    between snapshots through hard links -- confirm against backup().
    """
    src = tempfile.mkdtemp()
    dst = tempfile.mkdtemp()
    N = 3
    try:
        # An empty source must produce an empty snapshot "0".
        backup(src, dst, N)
        assert os.listdir(join(dst, "0")) == []

        # Two new files show up in the newest snapshot.  Bytes literals are
        # used because rf()/wf() operate on binary-mode files.
        wf(join(src, "1"), b"eins")
        wf(join(src, "2"), b"zwei")
        backup(src, dst, N)
        assert sorted(os.listdir(join(dst, "0"))) == ["1", "2"]
        assert rf(join(dst, "0", "1")) == b"eins"
        assert rf(join(dst, "0", "2")) == b"zwei"

        # Changing one file: the newest snapshot carries the new content.
        wf(join(src, "1"), b"einseins")
        backup(src, dst, N)
        assert sorted(os.listdir(join(dst, "0"))) == ["1", "2"]
        assert rf(join(dst, "0", "1")) == b"einseins"
        assert rf(join(dst, "0", "2")) == b"zwei"
        # The modified file is expected to be a fresh copy (one link), the
        # untouched file to have two links (presumably shared with the
        # previous snapshot).
        st = os.stat(join(dst, "0", "1"))
        assert st.st_nlink == 1
        st = os.stat(join(dst, "0", "2"))
        assert st.st_nlink == 2

        # Deleting a source file removes it from the newest snapshot only.
        os.remove(join(src, "2"))
        backup(src, dst, N)
        assert sorted(os.listdir(join(dst, "0"))) == ["1"]
        assert rf(join(dst, "0", "1")) == b"einseins"
        # The deleted file is still present in snapshot "1" with two links.
        st = os.stat(join(dst, "1", "2"))
        assert st.st_nlink == 2
    finally:
        # mkdtemp() leaves removal to the caller; do not litter the temp dir.
        shutil.rmtree(src, ignore_errors=True)
        shutil.rmtree(dst, ignore_errors=True)