Commits

Anonymous committed 6e0cd48

Fixing up tests

  • Participants
  • Parent commits 88ee706

Comments (0)

Files changed (4)

File dragoman/lib/httpcache.py

 	def delete(self, key):
 		cached_key = self.safe(key)
 		if self.db.has_key(cached_key):
-			del self.db[cached_key]
+			self.db.__delitem__(cached_key)
 			self.db.sync()
 
 

File dragoman/lib/storage.py

         return LanguageList.get(lang)
 
     def __init__(self, catalog, lang):
+        self.extra_keys = [
+            'extracted_comment', 'reference', 'flag',
+            'previous_untranslated_string',
+        ]
         self.db = get_db()
         self.lang = lang
         self.catalog = catalog
             self.import_po(env.path(defaults[self.name]))
         else:
             self.import_po(env.path(defaults['default']))
-            
         
     def import_po(self, po):
         '''Imports a po file on the filesystem into the current catalog'''
-        extra_keys = [
-            'extracted_comment', 'reference', 'flag',
-            'previous_untranslated_string',
-        ]
         po = GettextCatalog(po, None)
         for m in po.messages:
-            extra = [getattr(po, k) for k in extra_keys if hasattr(po, k)]
-            extra = dict(zip(extra_keys, extra))
+            extra = [getattr(po, k) for k in self.extra_keys if hasattr(po, k)]
+            extra = dict(zip(self.extra_keys, extra))
             self.add(m['msgid'], m['msgstr'], extra)

File dragoman/lib/test/test_httpcache.py

 import os
 import py.test
-from dragoman.lib.httpcache import BsddbCache
+from dragoman.lib import httpcache
+from dingus import DontCare, returner, Dingus
+from dragoman.lib.testing import DingusTestCase, mock, restore
 from hashlib import md5
 
 __here__ = os.path.dirname(os.path.abspath(__file__))
 
-class TestHttpCache(object):
+class TestHttpCache(DingusTestCase(httpcache.BsddbCache)):
+	def test_init(self):
+		cache = httpcache.BsddbCache('foo')
+		assert cache.safe
+		assert httpcache.bsddb.calls('btopen', ('foo'))
+
+	def test_get(self):
+		safe = Dingus(return_value='safe foo')
+		cache = httpcache.BsddbCache('foo', safe=safe)
+		result = cache.get('bar')
+		assert safe.calls('()', ('bar'))
+		assert cache.db.calls('has_key', ('safe foo'))
+		assert cache.db.calls('__getitem__', ('safe foo'))
+
+	def test_set(self):
+		safe = Dingus(return_value='safe bar')
+		cache = httpcache.BsddbCache('foo', safe=safe)
+		cache.set('bar', 'baz')
+		args = ('safe bar', 'baz')
+		assert cache.db.calls('__setitem__').once().args == args
+		assert cache.db.calls('sync')
+
+	def test_delete(self):
+		safe = Dingus(return_value='safe bar')
+		cache = httpcache.BsddbCache('foo', safe=safe)
+		cache.delete('bar')
+		assert cache.db.calls('has_key', ('safe bar'))
+		# force an explicit __delitem__ call instead of del db[key]
+		assert cache.db.calls('__delitem__', ('safe bar'))
+		assert cache.db.calls('sync')
+		
+
+class TestHttpCacheBsdDB(object):
 	def setup_class(self):
 		dbfile = os.path.join(__here__, 'httpcache.db')
 		if os.path.exists(dbfile):
 			os.remove(dbfile)
-		self.cache = BsddbCache(dbfile)
+		self.cache = httpcache.BsddbCache(dbfile)
 
 	def safe(self, key):
 		return '%s,%s' % (key, md5(key).hexdigest())

File dragoman/lib/test/test_storage.py

 
 class TestLanguageList(DingusTestCase(storage.LanguageList)):
 
-    def test_get(self):
-        storage.get_db.reset()
+    def test_successfull_get(self):
+        expected = {'abbrev': 'foo',
+                    'name': 'bar'}
+        db = Dingus(find_one__returns=expected)
+        storage.get_db = returner({storage.LanguageList.collection: db})
         result = storage.LanguageList.get('foo')
-        db = LikeDingus()
-        result = db()['config.languages'].find_one({'abbrev': 'foo'})
-        result['name']
-        assert result.like(storage.get_db)
+        assert result == expected['name']
 
-    def test_add(self):
-        storage.get_db.reset()
-        expected_db = LikeDingus()
-        expected_db.like(storage.get_db)
-        
-        langs = storage.LanguageList()
-        langs.add('en_PIGLATIN', 'English Piglatin')
+    def test_failed_get(self):
+        db = Dingus(find_one__returns=None)
+        storage.get_db = returner({storage.LanguageList.collection: db})
+        result = storage.LanguageList.get('foo')
+        assert result == None
+
+    def test_add_when_not_there(self):
         data = {
             'abbrev': 'en_PIGLATIN',
             'name': 'English Piglatin',
         }
-        expected_db()['config.languages'].find(data).count()
-        assert expected_db.like(storage.get_db)
 
-    def test_delete(self):
-        storage.get_db.reset()
+        # will return 0 so we add
+        langs = storage.LanguageList()
+        db = Dingus(find__returns=Dingus(count__returns=0))
+        langs.db = {storage.LanguageList.collection: db}
+        langs.add(data['abbrev'], data['name'])
+        assert db.calls('find', (data))
+        assert db.calls('insert', (data))
+
+    def test_add_when_already_exists(self):
+        data = {
+            'abbrev': 'en_PIGLATIN',
+            'name': 'English Piglatin',
+        }
+
+        # will return 1 so we have a no-op
+        db = Dingus(find__returns=Dingus(count__returns=1))
+        storage.get_db = returner({storage.LanguageList.collection: db})
+        langs = storage.LanguageList()
+        langs.add('en_PIGLATIN', 'English Piglatin')
+        assert not db.calls('insert', (data,))
+
+    def test_delete_when_exists(self):
+        db = Dingus(find__returns=['foo_obj'])
+        storage.get_db = returner({storage.LanguageList.collection: db})
         langs = storage.LanguageList()
         langs.delete('foo')
-        db = LikeDingus()
-        db = db()['config.languages']
-        db.find({'abbrev': 'foo'}).count()
-        db.remove(DontCare)
-        assert db.like(storage.get_db)
+        assert db.calls('find', ({'abbrev': 'foo'}))
+        assert db.calls('remove', ('foo_obj'))
 
 
 class MockMongoCatalogList(Dingus):
     def teardown_class(self):
         restore('get_db', storage)
     
-    def test_catalogs(self):
+    def test_catalogs_classmethod(self):
         cl = storage.CatalogList()
         # verifies the order of the languages keys
-
         assert cl.catalogs == defaultdict(list, {
             'bar': ['piglatin'],
             'foo': ['es', 'piglatin'],
         assert sorted(cl.catalogs.keys()) == ['bar', 'baz']
 
     def test_add(self):
-        mock('Catalog', Dingus('mock_catalog'), storage)
+        mock_catalog = Dingus()
+        catalog_cls = Dingus(return_value=mock_catalog)
+
+        # replace the class with my dingus
+        mock('Catalog', catalog_cls, storage)
+
         cl = storage.CatalogList()
         result = cl.add('test_catalog_list')
 
-        # make sure we created the catalog in the DB with a placeholder
-        # of piglatin
-        catalog = LikeDingus()
-        catalog = catalog('test_catalog_list', storage.PLACEHOLDER)
-        catalog.initialize()
-        assert catalog.like(result)
+        # pprint(catalog_cls.calls('__call__', ('test_catalog_list', 'piglatin')))
+        # I'd like to make this assertion but there is a bug in dingus for this
+        # assert catalog_cls.calls('()', ('test_catalog_list', storage.PLACEHOLDER))
+        assert mock_catalog.calls('initialize')
         restore('Catalog', storage)
 
-
-
 class MockMongoCatalog(Dingus):
     pass
 
 
     def test_percent_decorator(self):
         assert self.return_unquoted_value('Hello%20World') == 'Hello World'
-        
 
-class TestCatalog(object):
-    def setup_class(self):
-        self.mock_db = returner(MockMongoCatalog())
-        mock('get_db', self.mock_db, storage)
 
-    def teardown_class(self):
-        restore('get_db', storage)
-    
-    def test_constructor(self):
+def noop(f):
+    def dec(*args, **kw):
+        return f(*args, **kw)
+    return dec
+
+
+class TestCatalog(DingusTestCase(storage.Catalog)):
+
+    def test_init_creates_index(self):
+        db = Dingus(find__returns=Dingus(return_value=5))
+        storage.get_db = returner({'foo.es': db})
+
         c = storage.Catalog('foo', 'es')
-        assert c
         assert c._collection == 'foo.es'
+        assert db.calls('find')
+        assert db.calls('ensure_index', ('phrase'))
 
-    def test_add_phrase(self):
+    def test_init_skips_index_when_no_docs(self):
+        find_result = Dingus(count__returns=0)
+        db = Dingus(find__returns=find_result)
+        storage.get_db = returner({'foo.es': db})
+
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
+        assert c._collection == 'foo.es'
+        assert db.calls('find')
+        assert not db.calls('ensure_index', ('phrase'))
 
-        # make sure there isn't an original
-        c._get = lambda x: False
-		
-        # adds uses an percent encoded phrase
-        c.add('Hello%20World', 'Hola Globo')
+    def test_add_phrase_with_insert(self):
+        data = {
+            'phrase': DontCare(),
+            'translation': 'a dingus',
+            'encoding': 'utf-8',
+        }
 
-        db = LikeDingus()
+        db = Dingus()
+        storage.get_db = returner({'foo.es': db})
+        storage.percent_encoding = noop
+        c = storage.Catalog('foo', 'es')
+        # we make sure there isn't an existing doc in order to force
+        # an insert
+        c._get = Dingus(return_value=False) 
+        c.add('un dingus', data['translation'])
+        assert db.calls('insert')
 
-        # finds matches before adding
-        docs = db[c._collection].find_one({'phrase': 'Hello World'})
+    def test_add_phrase_with_update(self):
+        data = {
+            'phrase': DontCare(),
+            'translation': 'a dingus',
+            'encoding': 'utf-8',
+        }
 
-        # update the original or insert
-        orig = docs.pop()
-        orig.update(DontCare)
-        assert orig.like(c.db)
+        db = Dingus()
+        storage.get_db = returner({'foo.es': db})
+        storage.percent_encoding = noop
+        c = storage.Catalog('foo', 'es')
+        # we make sure there isn't an existing doc in order to force
+        # an insert
+        c._get = Dingus(return_value=Dingus())
+        c._update = Dingus()
+        c.add('un dingus', data['translation'])
+        assert not db.calls('insert')
+        assert c._update.calls('()').once().args[1]['translation'] == data['translation']
+        assert c._update.calls('()').once().args[1]['encoding'] == data['encoding'] 
 
-    def test_get_phrase(self):
+    def test_public_phrase_update(self):
+        db = Dingus()
+        priv_update = Dingus()
+        storage.Catalog._update = priv_update
+        storage.percent_encoding = noop
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
-        c.get('Hello%20World')
-        db = LikeDingus()
+        c.db = {'foo.es': db}        
+        c.update('foo', 'bar')
+        assert priv_update.calls('()').once().args[1] == 'bar'
 
-        # find the doc and return the translation
-        docs = list(db[c._collection].find({'phrase': 'Hello World'}))
-        doc = docs.pop()
-        doc.get(u'translation')
-        assert doc.like(c.db)
-
-    def test_update_phrase(self):
+    def test_public_get(self):
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
-        doc_id = '4b72ff387621054e21000507' # sample
-        doc = {
-            'phrase': 'foo',
-            'translations': 'bar',
-        }
-        c.update(doc_id, doc)
-
-        # use the mongodb _id to update the value
-        db = LikeDingus()
-        query = {'_id': ObjectId(doc_id)}
-        db[c._collection].update(query, {'$set': doc})
-        assert db.like(c.db)
+        doc = Dingus()
+        c._get = Dingus(return_value=doc)
+        val = c.get('foo')
+        assert doc.calls('get', ('translation'))
 
     def test_phrases_property(self):
+        docs = Dingus('result_docs')
+        db = Dingus(find__returns=docs)
+        imap = storage.imap
+        storage.get_db = returner({'foo.es': db})
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
-        phrases = c.phrases
+        p = c.phrases
+        # make sure we're sorting on the phrase, imapping the sorted
+        # result with our safe_doc
+        assert db.calls('find', ({'phrase': {'$ne': ''}}))
+        assert docs.calls('sort', ('phrase'))
+        assert imap.calls('()').once().args[0] == c._safe_doc
 
-        db = LikeDingus()
-        # finds the non-empty phrases, sort on phrases
-        db[c._collection].find({'phrase': {'$ne': ''}}).sort('phrase')
-        assert db.like(c.db)
+    def test_find(self):
+        db = Dingus(find__returns='find_returns_foo')
+        imap = storage.imap
+        c = storage.Catalog('foo', 'es')
+        c.db = {'foo.es': db}
+        c.find('bar')
+        pprint(db.calls())
+        assert db.calls('find', ('bar'))
+        assert imap.calls('()').once().args[0] == c._safe_doc
+        assert imap.calls('()').once().args[1] == 'find_returns_foo'
 
-    def test_raw_find(self):
+    def test_batch(self):
+        json_doc = {
+            'foo': [
+                { 'phrase': 'hello',
+                  'translation': 'hola',
+                  'bar': 'baz' },
+                { 'phrase': 'bye',
+                  'translation': 'luego' }
+            ]
+        }
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
-        q = {'foo': 'bar'}
-        result = c.find(q)
-        assert hasattr(result, '__iter__')
-        db = LikeDingus()
-        db[c._collection].find(q)
-        assert db.like(c.db)
+        c.add = Dingus()
+        c.batch(json_doc)
+        assert c.add('()', ('hello', 'hola'), {'bar': 'baz'})
+        assert c.add('()', ('bye', 'luego'))
+        
+    def test_delete(self):
+        db = Dingus()
+        c = storage.Catalog('foo', 'es')
+        c.db = db
+        c.delete()
+        assert db.calls('drop_collection', ('foo.es'))
 
+    def test_initialize_with_specific_catalog(self):
+        defaults = {'foo': 'path/to/default/po'}
+        env = Dingus(get__returns=defaults,
+                     path__returns='full path')
+        storage.env = env
+        c = storage.Catalog('foo', 'es')
+        c.import_po = Dingus()
+        c.initialize()
+        pprint(env.calls())
+        # This means we got the defaults from the env and use the path
+        # method on the env to get a full path
+        assert c.import_po.calls('()', ('full path'))
 
-    def test_delete(self):
+    def test_initialize_with_default_catalog(self):
+        defaults = {'default': 'path/to/default/po'}
+        env = Dingus(get__returns=defaults,
+                     path__returns='full path')
+        storage.env = env
         c = storage.Catalog('foo', 'es')
-        c.db.reset()
-        c.delete()
-        assert c.db.calls('drop_collection', (c._collection))
+        c.import_po = Dingus()
+        c.initialize()
+        pprint(env.calls())
+        # This means we got the defaults from the env and use the path
+        # method on the env to get a full path
+        assert c.import_po.calls('()', ('full path'))
+
+    def test_import_po(self):
+        gtc = returner(Dingus(
+            ext_key='baz',
+            messages=[
+                {'msgid': 'foo', 'msgstr': 'bar'}
+            ]
+        ))
+        storage.GettextCatalog = gtc
+        add = Dingus()
+        storage.Catalog.add = add
+        c = storage.Catalog('foo', 'es')
+        c.extra_keys = ['ext_key']
+        c.import_po('foo')
+        assert add.calls('()').once().args == ('foo', 'bar', {'ext_key': 'baz'})
+