# nx-web / nxweb / data / dkvs / __init__.py

import riak
import time

from nxweb.data.dkvs.exception import InvalidId


# Keyword arguments passed to riak.RiakClient by make_riak_client().
# Stays None until initialize() stores the connection settings.
_server = None


def initialize(**settings):
    """Store the Riak connection settings used by the rest of this module.

    Args:
        **settings: Must contain the keys ``'host'`` and ``'port'``. A
            ``host`` of ``None`` is left out of the stored settings, so no
            ``host`` argument is passed on to ``riak.RiakClient``.

    Raises:
        KeyError: If ``'host'`` or ``'port'`` is missing from ``settings``.
    """
    global _server

    host, port = settings['host'], settings['port']

    connection = {'port': port}
    if host is not None:
        connection['host'] = host
    _server = connection


def make_riak_client():
    """Return a new ``riak.RiakClient`` built from the stored settings.

    The settings are the ones recorded by ``initialize()``; a fresh client
    object is constructed on every call.
    """
    connection_kwargs = _server
    return riak.RiakClient(**connection_kwargs)


def validate_id(ident):
    """Reject identifiers that contain the key separator ``+``.

    Args:
        ident: The object id to check.

    Raises:
        InvalidId: If ``ident`` contains the symbol ``+``.
    """
    if '+' not in ident:
        return

    message = 'Object Id `{0}` can not contain the symbol `+`'.format(ident)
    raise InvalidId(message)


def compose_key(*ids):
    """Join several ids into a single key separated by ``+``.

    Every id is checked with ``validate_id`` before anything is joined, so
    an id that already contains ``+`` is rejected.

    Args:
        *ids: The id strings to combine, in order.

    Returns:
        The ids joined with ``+`` (an empty string when no ids are given).
    """
    separator = '+'

    for part in ids:
        validate_id(part)

    return separator.join(ids)


def split_key(key):
    """Split a composed key back into its parts.

    Args:
        key: A key string whose parts are separated by ``+``.

    Returns:
        The list of parts obtained by splitting ``key`` on ``+``.
    """
    parts = key.split('+')
    return parts


def _make_bucket_getter(bucket_name):
    """Build a zero-argument function that returns the named bucket.

    The returned function calls ``make_riak_client()`` each time it is
    invoked and asks that client for ``bucket_name``.

    Args:
        bucket_name: Name of the bucket the getter should return.

    Returns:
        A callable taking no arguments.
    """
    def get_bucket():
        client = make_riak_client()
        return client.bucket(bucket_name)

    return get_bucket


# Names of all buckets this module knows about.
BUCKETS = {
    'user',
    'program_metadata',
    'program_binary',
    'task_metadata',
    'task_input_data_chunk',
    'task_output_data_chunk',
    'pubkey',
    'attribute',
}

# Maps each bucket name in BUCKETS to a zero-argument function returning it.
BUCKET_GETTERS = dict(
    (bucket_name, _make_bucket_getter(bucket_name))
    for bucket_name in BUCKETS
)

def delete_all_buckets():
    """Delete every key from every bucket listed in ``BUCKETS``.

    Buckets are processed in sorted name order so the progress output is
    reproducible between runs (``BUCKETS`` is a set and has no fixed order).
    """
    for bucket in sorted(BUCKETS):
        # Parenthesised single-argument print is valid on Python 2 and 3.
        print("Deleting " + repr(bucket))
        delete_all_keys(bucket)

def _fetch_bucket_keys(host, port, bucket_name):
    """Return the keys of ``bucket_name`` as listed by Riak's HTTP interface.

    Args:
        host: Host name of the Riak node.
        port: Port used to build the HTTP URL.
        bucket_name: Name of the bucket whose keys are listed.

    Returns:
        The list found under ``'keys'`` in the JSON response, or an empty
        list when the response carries no keys.
    """
    import io
    import json
    import pycurl

    url = "http://%s:%s/riak/%s?keys=true&props=false" % (host, port, bucket_name)

    body = io.BytesIO()
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.WRITEFUNCTION, body.write)
        curl.setopt(pycurl.FOLLOWLOCATION, 1)
        curl.setopt(pycurl.MAXREDIRS, 5)
        curl.perform()
    finally:
        # Always release the handle, even when the transfer fails.
        curl.close()

    data = json.loads(body.getvalue().decode('utf-8'))
    return data.get('keys') or []


def delete_all_keys(bucket_name, sleeptime=2, num_pause=100):
    """Delete every object stored in the bucket ``bucket_name``.

    The key list is fetched over HTTP from the host/port stored by
    ``initialize()`` (host defaults to ``"localhost"`` when none was
    stored); each key is then fetched and deleted through the riak client.

    NOTE: the original author's hint for repeated crashes was to run
    ``ulimit -n 4096`` first to raise the limit of open files.

    Args:
        bucket_name: A name present in ``BUCKET_GETTERS``.
        sleeptime: Seconds to sleep at each pause.
        num_pause: Pause after every ``num_pause`` deletions; a value of
            ``0`` (or any falsy value) disables pausing.

    Raises:
        KeyError: If ``bucket_name`` is not a known bucket.
    """
    # Index instead of .get() so an unknown bucket fails with a clear
    # KeyError rather than "'NoneType' object is not callable".
    bucket = BUCKET_GETTERS[bucket_name]()

    host = _server.get('host')
    if host is None:
        host = "localhost"
    port = _server.get('port')

    keys = _fetch_bucket_keys(host, port, bucket_name)

    length = len(keys)
    print("Deleting %s Items" % length)
    for done, key in enumerate(keys):
        # The figure is the percentage completed before this key; floor
        # division keeps it an integer on both Python 2 and Python 3.
        print("Deleting from bucket %s; %s Done" % (bucket_name, (done * 100) // length))
        if num_pause and (done + 1) % num_pause == 0:
            time.sleep(sleeptime)
        bucket.get(key).delete()


def get_all_keys(bucket_name):
    """Return the result of a map/reduce job that emits every object's key.

    A ``riak.RiakMapReduce`` query is run over ``bucket_name`` with a
    JavaScript map phase returning ``[v.key]`` for each object.

    Args:
        bucket_name: Name of the bucket to query.

    Returns:
        Whatever ``query.run()`` returns for that job.
    """
    client = make_riak_client()
    job = riak.RiakMapReduce(client)

    # JavaScript map phase: emit the key of each object.
    map_source = """function(v) {
        return [v.key];
    }"""

    job.add(bucket_name)
    job.map(map_source)
    return job.run()
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.