Snippets

Kunzhipeng: a caching solution implemented with an SSDB backend

Created by Qi Peng
__doc__ = """
ssdb_cache provides a dictionary-like interface with an SSDB (http://ssdb.io/) backend.
It uses pickle to store Python objects and strings.
Multithreading is supported.

By qi@site-digger.com 西安鲲之鹏网络信息技术有限公司
"""
import datetime
# pip install pyssdb --upgrade
import pyssdb
try:
    import cPickle as pickle
except ImportError:
    import pickle

class CACHE:
    """Stores and retrieves persistent data through a dict-like interface
    Data is stored on disk using SSDB

    cache_name/filename: 
        The name of Hashmap, use this name "filename" to be compatible with PersistentDict in webscraping library
    expires: 
        A timedelta object of how old data can be before expires. By default is set to None to disable. 
    host:
        The SSDB server host
    port:
        The SSDB server port
    """
    def __init__(self, cache_name=None, filename=None, expires=None, host='127.0.0.1', port=8888):
        """initialize a new CACHE
        """
        self.cache_name = filename or cache_name or 'cache'
        self.expires = expires
        self.ssdb_host = host
        self.ssdb_port = port
        self.ssdb_client = pyssdb.Client(host=self.ssdb_host, port=self.ssdb_port)

    def __copy__(self):
        """make a copy of current cache settings
        """
        return CACHE(cache_name=self.cache_name, expires=self.expires, host=self.ssdb_host, port=self.ssdb_port)

    def __contains__(self, key):
        """check the database to see if a key exists
        """
        return self.ssdb_client.hexists(self.cache_name, key)

    def __getitem__(self, key):
        """return the value of the specified key or raise KeyError if not found
        """
        value_meta_update = self.ssdb_client.hget(self.cache_name, key)
        if value_meta_update is not None:
            value_meta_update = self.deserialize(value_meta_update)
            if self.is_fresh(value_meta_update.get('updated')):
                return value_meta_update.get('value')
            else:
                raise KeyError("Key `%s' is stale" % key)
        else:
            raise KeyError("Key `%s' does not exist" % key)

    def __delitem__(self, key):
        """remove the specifed value from the database
        """
        self.ssdb_client.hdel(self.cache_name, key)

    def __setitem__(self, key, value):
        """set the value of the specified key
        """
        value_meta_update = {'value': value, 'meta': None, 'updated': datetime.datetime.now()}
        self.ssdb_client.hset(self.cache_name, key, self.serialize(value_meta_update))
        
    def __len__(self):
        """get the number of elements in CACHE
        """
        return self.ssdb_client.hsize(self.cache_name)        

    def serialize(self, value):
        """convert object to a pickled string to save in the db
        """
        return pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
    
    def deserialize(self, value):
        """convert pickled string from database back into an object
        """
        return pickle.loads(value)

    def get(self, key, default=None):
        """Return the value stored at key, or default if the key is missing or stale
        """
        if key:
            value_meta_update = self.ssdb_client.hget(self.cache_name, key)
            if value_meta_update is not None:
                value_meta_update = self.deserialize(value_meta_update)
                if self.is_fresh(value_meta_update.get('updated')):
                    return value_meta_update.get('value')
        return default

    def meta(self, key, value=None):
        """Get / set meta for this value

        if value is passed then set the meta attribute for this key
        if not then get the existing meta data for this key
        """
        if value is None:
            # want to get meta
            value_meta_update = self.ssdb_client.hget(self.cache_name, key)
            if value_meta_update is not None:
                return self.deserialize(value_meta_update).get('meta')
            else:
                raise KeyError("Key `%s' does not exist" % key)
        else:
            # want to set meta
            value_meta_update = self.ssdb_client.hget(self.cache_name, key)
            if value_meta_update is not None:
                value_meta_update = self.deserialize(value_meta_update)
                value_meta_update['meta'] = value
                value_meta_update['updated'] = datetime.datetime.now()
                self.ssdb_client.hset(self.cache_name, key, self.serialize(value_meta_update))

    def clear(self):
        """Clear all cached data in this collecion
        """
        self.ssdb_client.hclear(self.cache_name)
        
    def is_fresh(self, t):
        """returns whether this datetime has expired
        """
        return self.expires is None or datetime.datetime.now() - t < self.expires 
    
if __name__ == '__main__':
    cache = CACHE(cache_name='test')
    print('Name' in cache)
    cache['Name'] = 'Peng Qi'
    print(cache['Name'])
    print(cache.get('Name'))
    cache.meta('Name', {'Age': 29})
    print(cache.meta('Name'))
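    # A minimal expiry sketch (not in the original demo; the cache name and
    # timing below are illustrative): entries older than `expires` make
    # __getitem__ raise KeyError, while `in` still reports them because
    # __contains__ only checks hexists, not freshness.
    import time
    short_cache = CACHE(cache_name='test_expiry', expires=datetime.timedelta(seconds=1))
    short_cache['Name'] = 'Peng Qi'
    time.sleep(2)
    print('Name' in short_cache)       # still present in the hashmap
    try:
        print(short_cache['Name'])
    except KeyError as e:
        print(e)                       # reports that `Name' is stale
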
# coding: utf-8
# ssdb_cache_test.py

import time
import threading
import ssdb_cache

def test_one_thread_write():
    """Single-thread write test
    """
    print('Single-thread write test')
    html = '0123456789' * 1024
    cache = ssdb_cache.CACHE()
    start = time.time()
    print('Start time: %s, size of each document: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), len(html)))
    for i in range(1000000):
        cache['page-%s' % i] = html
    print('End time: %s, seconds passed: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), time.time() - start))

def test_one_thread_read():
    """Single-thread read test
    """
    print('Single-thread read test')
    cache = ssdb_cache.CACHE()
    start = time.time()
    print('Start time: %s' % time.strftime('%Y-%m-%d %H:%M:%S'))
    for i in range(1000000):
        html = cache['page-%s' % i]
    print('End time: %s, seconds passed: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), time.time() - start))

def test_multiple_threads_write():
    """10-thread write test
    """
    print('10-thread write test')
    html = '0123456789' * 1024
    start = time.time()
    cache = ssdb_cache.CACHE()
    def worker(index):
        for i in range(1, 100001):
            cache['page-%s-%s' % (index, i)] = html
        print(index, i)
    print('Start time: %s, size of each document: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), len(html)))
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    # Start the threads one by one
    for thread in threads:
        thread.start()
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    print('End time: %s, seconds passed: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), time.time() - start))

def test_multiple_threads_read():
    """10-thread read test
    """
    print('10-thread read test')
    start = time.time()
    cache = ssdb_cache.CACHE()
    def worker(index):
        for i in range(1, 100001):
            try:
                html = cache['page-%s-%s' % (index, i)]
            except KeyError:
                pass
        print(index, i)
    print('Start time: %s' % time.strftime('%Y-%m-%d %H:%M:%S'))
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    # Start the threads one by one
    for thread in threads:
        thread.start()
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    print('End time: %s, seconds passed: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), time.time() - start))
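
# A hedged sketch (not part of the original tests; the function name below is
# illustrative): the tests above share one CACHE, and therefore one
# pyssdb.Client, across all threads. If you would rather give each thread its
# own connection, copy.copy() goes through CACHE.__copy__ and builds a fresh
# client with the same settings.
import copy

def test_multiple_threads_write_with_copies():
    """10-thread write test using one CACHE copy (one connection) per thread
    """
    html = '0123456789' * 1024
    shared = ssdb_cache.CACHE()
    def worker(index):
        cache = copy.copy(shared)  # independent client, same host/port/hashmap
        for i in range(1, 100001):
            cache['page-copy-%s-%s' % (index, i)] = html
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()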
    
if __name__ == '__main__':
    test_one_thread_write()
    test_one_thread_read()
    test_multiple_threads_write()
    test_multiple_threads_read()
