1. Dhananjay Nene
  2. pybuckets-old

Commits

Dhananjay Nene  committed baccadd

Initial checkin of testcases (still need some refactoring)

  • Participants
  • Parent commits ebbf2ab
  • Branches default

Comments (0)

Files changed (3)

File data/files/python.png

  • Ignore whitespace
Added
New image

File data/files/ubuntu.png

  • Ignore whitespace
Added
New image

File test/test.py

View file
  • Ignore whitespace
+from pybuckets.pybuckets import BucketServer
+from unittest import TestCase
+import pybuckets.botos3
+import pybuckets.rscloudfiles
+import pybuckets.local
+import pybuckets.mongodb
+import pybuckets.pyb_couchdb
+import datetime
+import json
+import filecmp
+import os
+from conf import *
+
+##############################################################
+#   NOTE :
+#   You will need a separate conf.py in the same directory as
+#   test.py with the following variables defined
+#
+##############################################################
+# aws_access_key_id = ...
+# aws_secret_access_key = ...
+# bucket_name = ...
+#
+#
+# cf_username=...
+# cf_api_key=...
+##############################################################
+
+class TestBuckets(TestCase):
+    
+    def test_one(self):            
+#        bucket_server = BucketServer.get_server('botos3',aws_access_key_id= aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
+#        bucket_server = BucketServer.get_server('cloudfiles',username= cf_username,api_key=cf_api_key)
+        bucket_server = BucketServer.get_server('localfs',path='../data/localfs')
+
+        for current_bucket_name in bucket_server.keys() :
+            print 'Exists bucket %s' % current_bucket_name
+            
+        if not bucket_name in bucket_server.keys() : bucket_server.create_bucket(bucket_name)
+        
+        self.assertEquals(bucket_name in bucket_server.keys(),True)
+        
+        bucket = bucket_server[bucket_name]
+        
+        for key in bucket :
+            print 'Deleting existing bucket key %s' % key
+            del bucket[key]
+        keys_added = []
+            
+        self.assertEquals(bucket.keys(),[])
+        
+        if bucket_server.supports_mime_type('text/plain') :
+            bucket['foo'] = 'bar'
+            self.assertEquals(bucket['foo'],'bar')
+            keys_added.append('foo')
+            
+        if bucket_server.supports_mime_type('image/png') :
+            src, target = (('../data/%s/python.png' % dir) 
+                           for dir in ('files','tmp'))
+            bucket('python_logo') << src
+            bucket('python_logo') >> target
+            self.assertEquals(filecmp.cmp(src,target),True)
+            os.remove(target)
+            keys_added.append('python_logo')
+
+            src, target = (('../data/%s/ubuntu.png' % dir) 
+                           for dir in ('files','tmp'))
+            with open(src,'r') as f:
+                bucket('ubuntu_logo') << f 
+            with open(target,'w') as f:
+                bucket('ubuntu_logo') >> f 
+            self.assertEquals(filecmp.cmp(src,target),True)
+            os.remove(target)
+            keys_added.append('ubuntu_logo')
+        
+        if bucket_server.supports_mime_type('application/json') :
+            json_str = json.dumps([{'hello':'world','foo':'bar'},'baz',['lorem','ipsum']])
+            bucket['json_str'] = json_str
+            self.assertEquals(bucket['json_str'],json_str)
+            keys_added.append('json_str')
+
+        self.assertEquals(sorted(keys_added),sorted(bucket.keys()))
+        
+        for key in bucket.keys() :
+            del bucket[key]
+            
+        self.assertEquals(len(bucket.keys()),0)
+
+
+    def testTwo(self):
+#        bucket_server = BucketServer.get_server('mongodb',host='localhost',port=27017,database_name='testdb')
+        bucket_server = BucketServer.get_server('couchdb',url='http://localhost:5984')
+        for current_bucket_name in bucket_server.keys() :
+            print 'Exists bucket %s' % current_bucket_name
+            
+        if not bucket_name in bucket_server.keys() : bucket_server.create_bucket(bucket_name)
+        self.assertEquals(bucket_name in bucket_server.keys(),True)
+        
+        bucket = bucket_server[bucket_name]
+        
+        for key in bucket :
+            print 'Deleting existing bucket key %s' % key
+            del bucket[key]
+        keys_added = []
+
+        self.assertEquals(len(bucket.keys()),0)
+        
+        if bucket_server.supports_mime_type('text/plain') :
+            bucket['foo'] = 'bar'
+            self.assertEquals(bucket['foo'],'bar')
+            keys_added.append('foo')
+            
+        if bucket_server.supports_mime_type('application/json') :
+            obj = {'hello':'world','foo':'bar'}
+            json_str = json.dumps([{'hello':'world','foo':'bar'},'baz',['lorem','ipsum']])
+            bucket['json_str'] = json.dumps(obj)
+            self.assertEquals(bucket['json_str'],obj)
+            keys_added.append('json_str')
+
+        self.assertEquals(sorted(keys_added),sorted(bucket.keys()))
+        
+       
+