#!/usr/bin/env python3
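"""fdb_io: file input/output for the freenet-spider-database.

Each keypath gets one YAML document stream under the sites folder and one
under the states folder; all KSK keys share a single "KSK" file pair.
"""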

import yaml
from os.path import join
from os import makedirs, listdir

def key_to_paths(key,
                 sites_dir="sites",
                 states_dir="states"):
    """Get the sites and states paths for a given key.

    >>> key_to_paths("USK@123456/foo/bar")
    ('sites/USK@123456', 'states/USK@123456.state')
    """
    # str.split always returns at least one element, so this also works
    # for keys without a "/".
    keypath = key.split("/")[0]

    # ksk keys are in their own file.
    if keypath.startswith("KSK"):
        keypath = "KSK"

    return join(sites_dir, keypath), join(states_dir, keypath + ".state")
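
# Example: every KSK key collapses into the shared "KSK" bucket, so all
# KSK-addressed entries land in the same pair of files:
#
#     key_to_paths("KSK@gpl.txt")   -> ('sites/KSK', 'states/KSK.state')
#     key_to_paths("KSK@index.txt") -> ('sites/KSK', 'states/KSK.state')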


def key_to_subkey(key, latest_revision=False):
    """Split off the subkey from a key.

    >>> key_to_subkey("KSK@gpl.txt")
    'KSK@gpl.txt'
    >>> key_to_subkey("USK@123/foo/2/bar")
    '/foo/2/bar'
    >>> key_to_subkey("USK@123/foo/2/bar", latest_revision=True)
    '/foo/-1/bar'
    """
    if key.startswith("KSK"):
        return key
    parts = key.split("/")
    if latest_revision:
        # Replace the revision number (the third path element) with -1,
        # which stands for the latest revision.
        return "/" + "/".join(parts[1:2]) + "/-1/" + "/".join(parts[3:])
    return "/" + "/".join(parts[1:])


def get_dataset_for_key(key,
                        sites_dir="sites",
                        states_dir="states"):
    """Get the data for the given key.

    >>> _test_clean_dirs()
    >>> sites, states = get_dataset_for_key("KSK@gpl.txt", sites_dir="test_sites", states_dir="test_states")
    >>> list(sites), list(states)
    ([], [])
    """

    sites_path, states_path = key_to_paths(key, sites_dir, states_dir)
   
    # sites data
    try: 
        with open(sites_path) as f: 
            yaml_data = f.read()
        sites = yaml.safe_load_all(yaml_data)
    except IOError:
        sites = (i for i in []) # empty generator

    # states data
    try: 
        with open(states_path) as f: 
            yaml_data = f.read()
        states = yaml.safe_load_all(yaml_data)
    except IOError:
        states = (i for i in []) # empty generator
    
    return sites, states
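
# Example: both returned values are lazy YAML document streams (generators
# from yaml.safe_load_all), so they can only be iterated once; call list()
# on them if the documents are needed more than once.
#
#     sites, states = get_dataset_for_key("USK@123456/foo/bar")
#     sites = list(sites)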


def put_dataset_for_key(key, sites, states,
                        sites_dir="sites",
                        states_dir="states"):
    """Store the data for a given key.

    >>> _test_create_test_dirs()
    >>> sites = [{1: 2}, {5:6}, {'path': 'yum'}]
    >>> states = [{5:7}]
    >>> put_dataset_for_key("KSK@foo.bar", sites, states, sites_dir="test_sites", states_dir="test_states")
    >>> si, st = get_dataset_for_key("KSK@foo.bar", sites_dir="test_sites", states_dir="test_states")
    >>> list(si) == sites
    True
    >>> list(st) == states
    True
    """

    sites_path, states_path = key_to_paths(key, sites_dir, states_dir)

    try:
        with open(sites_path, "w") as f:
            f.write(yaml.safe_dump_all(sites, default_flow_style=False))
    except IOError as err:
        raise IOError("Can’t write to the sites datafile. Please check if the sites folder exists: " + sites_dir) from err

    try:
        with open(states_path, "w") as f:
            f.write(yaml.safe_dump_all(states, default_flow_style=False))
    except IOError as err:
        raise IOError("Can’t write to the states datafile. Please check if the states folder exists: " + states_dir) from err


def get_all_datasets(sites_dir="sites",
                     states_dir="states"):
    """Store the data for a given key.

    >>> _test_clean_dirs()
    >>> _test_create_test_dirs()
    >>> get_all_datasets(sites_dir="test_sites", states_dir="test_states")
    ({}, {})
    >>> sites = [{1: 2}, {5:6}, {'path': 'yum'}]
    >>> states = [{5:7}]
    >>> put_dataset_for_key("USK@123/foo/bar", sites, states, sites_dir="test_sites", states_dir="test_states")
    >>> si, st = get_all_datasets(sites_dir="test_sites", states_dir="test_states")
    >>> si.keys(), st.keys()
    (dict_keys(['USK@123']), dict_keys(['USK@123.state']))
    """
    datasets_sites = {}
    datasets_states = {}
    for filename in listdir(sites_dir):
        sites_path = join(sites_dir, filename)
        # sites data
        try:
            with open(sites_path) as f:
                yaml_data = f.read()
            sites = yaml.safe_load_all(yaml_data)
        except IOError:
            sites = (i for i in [])  # empty generator
        datasets_sites[filename] = sites

    for filename in listdir(states_dir):
        states_path = join(states_dir, filename)
        # states data
        try:
            with open(states_path) as f:
                yaml_data = f.read()
            states = yaml.safe_load_all(yaml_data)
        except IOError:
            states = (i for i in [])  # empty generator
        datasets_states[filename] = states

    return datasets_sites, datasets_states
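
# Example: the returned dicts are keyed by file name (the keypath for sites,
# keypath + ".state" for states), and each value is a lazy YAML document
# stream:
#
#     all_sites, all_states = get_all_datasets()
#     for name, documents in all_sites.items():
#         for site in documents:
#             print(name, site)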


def _test_clean_dirs():
    """Cleanup the test folders before the tests."""
    from shutil import rmtree
    rmtree("test_sites", ignore_errors=True)
    rmtree("test_states", ignore_errors=True)

def _test_create_test_dirs():
    """Create folders for testing."""
    makedirs("test_sites", exist_ok=True)
    makedirs("test_states", exist_ok=True)

def _test():
    from doctest import testmod
    testmod()

if __name__ == "__main__":
    from sys import argv
    if "--test" in argv: 
        _test()
        exit()