Commits

Thomas Waldmann  committed 1c69c4b

remove pycdb code + tests, not used any more

  • Participants
  • Parent commits 1dbbcb8
  • Branches storage-ng

Comments (0)

Files changed (2)

File MoinMoin/util/_tests/test_pycdb.py

-# -*- coding: utf-8 -*-
-# Copyright: 2011 Prashant Kumar <contactprashantat AT gmail DOT com>
-# License: GNU GPL v2 (or any later version), see LICENSE.txt for details.
-
-"""
-MoinMoin - MoinMoin.util.pycdb Tests
-"""
-
-import os
-import shutil, tempfile
-
-import pytest
-from MoinMoin.util import pycdb
-
-class TestCDBMaker(object):
-    """ Test: util.pycdb.CDBMaker """
-
-    def setup_method(self, method):
-        self.test_dir = tempfile.mkdtemp('', 'test_cdb')
-        self.test_tmpname = os.path.join(self.test_dir, "test_tmpfile")
-        self.test_cdbname = os.path.join(self.test_dir, "test_cdbfile")
-        self.CDBMaker_obj = pycdb.CDBMaker(self.test_cdbname, self.test_tmpname)
-
-    def teardown_method(self, method):
-        shutil.rmtree(self.test_dir)
-
-    def test_add(self):
-        result = os.listdir(self.test_dir)
-        result1 = self.CDBMaker_obj.__len__()
-        expected = ['test_tmpfile']
-        assert result == expected
-
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value - ', 'v_value')
-        self.CDBMaker_obj._fp = open(self.test_tmpname, 'r')
-        # seek to 2048 since self._pos was assigned to 2048 initially.
-        self.CDBMaker_obj._fp.seek(2048)
-        # read the contents i.e. newly added contents
-        result = self.CDBMaker_obj._fp.read()
-        expected = '\x0b\x00\x00\x00\x07\x00\x00\x00 k_value - v_value'
-        assert result == expected
-
-    def test_finish(self):
-        # add contents to cdb_file
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value - ', 'v_value')
-        # remove tmpfile
-        self.CDBMaker_obj.finish()
-        result = os.listdir(self.test_dir)
-        expected = ['test_cdbfile']
-        assert result == expected
-
-class TestCDBReader(object):
-    """ Test: util.pycdb.CDBReader """
-
-    def setup_method(self, method):
-        self.test_dir = tempfile.mkdtemp('', 'test_cdb')
-        self.test_tmpname = os.path.join(self.test_dir, "test_tmpfile")
-        self.test_cdbname = os.path.join(self.test_dir, "test_cdbfile")
-        self.CDBMaker_obj = pycdb.CDBMaker(self.test_cdbname, self.test_tmpname)
-        # add k and v
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value - ', 'v_value')
-
-    def teardown_method(self, method):
-        shutil.rmtree(self.test_dir)
-
-    def test_get(self):
-        # remove tmpfile
-        self.CDBMaker_obj.finish()
-
-        CDBReader_obj = pycdb.CDBReader(self.test_cdbname)
-        result = CDBReader_obj.get(' k_value - ', failed=None)
-        expected = 'v_value'
-        assert result == expected
-
-        # invalid key
-        result = CDBReader_obj.get('invalid_key', failed='no_such_value')
-        expected = 'no_such_value'
-        assert result == expected
-
-    def test_keys(self):
-        """ test all key realated functions """
-        # add next value
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value_next - ', 'v_value_next')
-        # remove tmpfile
-        self.CDBMaker_obj.finish()
-
-        CDBReader_obj = pycdb.CDBReader(self.test_cdbname)
-        # test: has_key
-        result = CDBReader_obj.has_key(' k_value - ')
-        assert result
-        # test: invalidkey
-        result = CDBReader_obj.has_key('not_present')
-        assert not result
-
-        # test: firstkey
-        result = CDBReader_obj.firstkey()
-        expected = ' k_value - '
-        assert result == expected
-
-        # test: nextkey
-        result = CDBReader_obj.nextkey()
-        expected = ' k_value_next - '
-        assert result == expected
-
-        # test: iterkeys
-        test_keys = CDBReader_obj.iterkeys()
-        result = []
-        [result.append(key) for key in test_keys]
-        expected = [' k_value - ', ' k_value_next - ']
-        assert expected == result
-
-    def test_itervalues(self):
-        # add next value
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value_next - ', 'v_value_next')
-        # remove tmpfile
-        self.CDBMaker_obj.finish()
-
-        CDBReader_obj = pycdb.CDBReader(self.test_cdbname)
-        test_values = CDBReader_obj.itervalues()
-        result = []
-        [result.append(value) for value in test_values]
-        expected = ['v_value', 'v_value_next']
-        assert expected == result
-
-    def test_iteritems(self):
-        # add next value
-        self.CDBMaker_obj = self.CDBMaker_obj.add(' k_value_next - ', 'v_value_next')
-        # remove tmpfile
-        self.CDBMaker_obj.finish()
-
-        CDBReader_obj = pycdb.CDBReader(self.test_cdbname)
-        test_items = CDBReader_obj.iteritems()
-        result = []
-        [result.append(item) for item in test_items]
-        expected = [(' k_value - ', 'v_value'), (' k_value_next - ', 'v_value_next')]
-        assert expected == result
-

File MoinMoin/util/pycdb.py

-# Copyright: Yusuke Shinyama (author)
-# Copyright: Johannes Berg (coding style fixes and tcdb removal)
-# License: Public Domain
-
-"""
-pycdb.py - Python implementation of cdb
-"""
-
-
-import os
-from struct import pack, unpack
-from array import array
-
-
-def cdbhash(s, n=0L):
-    """calc hash value with a given key"""
-    return reduce(lambda h, c: ((h * 33) ^ ord(c)) & 0xffffffffL, s, n + 5381L)
-
-if pack('=i', 1) == pack('>i', 1):
-    def decode(x):
-        a = array('I', x)
-        a.byteswap()
-        return a
-    def encode(a):
-        a.byteswap()
-        return a.tostring()
-else:
-    def decode(x):
-        a = array('I', x)
-        return a
-    def encode(a):
-        return a.tostring()
-
-
-def cdbiter(fp, eod):
-    kloc = 2048
-    while kloc < eod:
-        fp.seek(kloc)
-        (klen, vlen) = unpack('<II', fp.read(8))
-        k = fp.read(klen)
-        v = fp.read(vlen)
-        kloc += 8 + klen + vlen
-        yield (k, v)
-    fp.close()
-
-
-class CDBReader:
-    def __init__(self, cdbname, docache=1):
-        self.name = cdbname
-        self._fp = file(cdbname, 'rb')
-        hash0 = decode(self._fp.read(2048))
-        self._hash0 = [(hash0[i], hash0[i+1]) for i in xrange(0, 512, 2)]
-        self._hash1 = [None ] * 256
-        self._eod = hash0[0]
-        self._docache = docache
-        self._cache = {}
-        self._keyiter = None
-        self._eachiter = None
-
-    def close(self):
-        if self._fp:
-            self._fp.close()
-            self._fp = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type, value, traceback):
-        self.close()
-
-    def __getstate__(self):
-        raise TypeError
-
-    def __setstate__(self, dict):
-        raise TypeError
-
-    def __getitem__(self, k):
-        k = str(k)
-        if k in self._cache:
-            return self._cache[k]
-        h = cdbhash(k)
-        h1 = h & 0xff
-        (pos_bucket, ncells) = self._hash0[h1]
-        if ncells == 0:
-            raise KeyError(k)
-        hs = self._hash1[h1]
-        if hs is None:
-            self._fp.seek(pos_bucket)
-            hs = decode(self._fp.read(ncells * 8))
-            self._hash1[h1] = hs
-        i = ((h >> 8) % ncells) * 2
-        n = ncells * 2
-        for _ in xrange(ncells):
-            p1 = hs[i + 1]
-            if p1 == 0: raise KeyError(k)
-            if hs[i] == h:
-                self._fp.seek(p1)
-                (klen, vlen) = unpack('<II', self._fp.read(8))
-                k1 = self._fp.read(klen)
-                if k1 == k:
-                    v1 = self._fp.read(vlen)
-                    if self._docache:
-                        self._cache[k] = v1
-                    return v1
-            i = (i + 2) % n
-        raise KeyError(k)
-
-    def get(self, k, failed=None):
-        try:
-            return self.__getitem__(k)
-        except KeyError:
-            return failed
-
-    def has_key(self, k):
-        try:
-            self.__getitem__(k)
-            return True
-        except KeyError:
-            return False
-
-    def __contains__(self, k):
-        return self.has_key(k)
-
-    def firstkey(self):
-        self._keyiter = None
-        return self.nextkey()
-
-    def nextkey(self):
-        if not self._keyiter:
-            self._keyiter = (k for (k, v) in cdbiter(self._fp, self._eod))
-        try:
-            return self._keyiter.next()
-        except StopIteration:
-            return None
-
-    def each(self):
-        if not self._eachiter:
-            self._eachiter = cdbiter(self._fp, self._eod)
-        try:
-            return self._eachiter.next()
-        except StopIteration:
-            return None
-
-    def iterkeys(self):
-        return (k for (k, v) in cdbiter(self._fp, self._eod))
-
-    def itervalues(self):
-        return (v for (k, v) in cdbiter(self._fp, self._eod))
-
-    def iteritems(self):
-        return cdbiter(self._fp, self._eod)
-
-
-class CDBMaker:
-    def __init__(self, cdbname, tmpname):
-        self.fn = cdbname
-        self.fntmp = tmpname
-        self.numentries = 0
-        self._fp = file(tmpname, 'wb')
-        self._pos = 2048
-        self._bucket = [array('I') for _ in xrange(256)]
-
-    def close(self):
-        # you don't need to call this if you called finish()
-        if self._fp:
-            self._fp.close()
-            self._fp = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type, value, traceback):
-        self.finish()
-
-    def __len__(self):
-        return self.numentries
-
-    def __getstate__(self):
-        raise TypeError
-
-    def __setstate__(self, dict):
-        raise TypeError
-
-    def add(self, k, v):
-        (k, v) = (str(k), str(v))
-        (klen, vlen) = (len(k), len(v))
-        self._fp.seek(self._pos)
-        self._fp.write(pack('<II', klen, vlen))
-        self._fp.write(k)
-        self._fp.write(v)
-        h = cdbhash(k)
-        b = self._bucket[h % 256]
-        b.append(h)
-        b.append(self._pos)
-        # sizeof(keylen)+sizeof(datalen)+sizeof(key)+sizeof(data)
-        self._pos += 8 + klen + vlen
-        self.numentries += 1
-        return self
-
-    def finish(self):
-        self._fp.seek(self._pos)
-        pos_hash = self._pos
-        # write hashes
-        for b1 in self._bucket:
-            if not b1: continue
-            blen = len(b1)
-            a = array('I', [0] * blen * 2)
-            for j in xrange(0, blen, 2):
-                (h, p) = (b1[j], b1[j+1])
-                i = ((h >> 8) % blen) * 2
-                while a[i + 1]:
-                    i = (i + 2) % len(a)
-                a[i] = h
-                a[i + 1] = p
-            self._fp.write(encode(a))
-        # write header
-        self._fp.seek(0)
-        a = array('I')
-        for b1 in self._bucket:
-            a.append(pos_hash)
-            a.append(len(b1))
-            pos_hash += len(b1) * 8
-        self._fp.write(encode(a))
-        self.close()
-        os.rename(self.fntmp, self.fn)
-
-cdbmake = CDBMaker
-init = CDBReader