Commits

Sam Skillman committed 269d51f

Attempting to add local storage of results.

Comments (0)

Files changed (1)

yt/utilities/answer_testing/framework.py

 from yt.mods import *
 from yt.data_objects.static_output import StaticOutput
 import cPickle
+import shelve
 
 from yt.utilities.logger import disable_stream_logging
 from yt.utilities.command_line import get_yt_version
             help="The name we'll call this set of tests")
         parser.add_option("--answer-store", dest="store_results",
             default=False, action="store_true")
+        parser.add_option("--local-store", dest="store_local_results",
+            default=False)
 
     def configure(self, options, conf):
         super(AnswerTesting, self).configure(options, conf)
             # Now we grab from our S3 store
             if options.compare_name == "latest":
                 options.compare_name = _latest
+        if options.store_local_results:
             AnswerTestingTest.reference_storage = \
-                AnswerTestOpener(options.compare_name)
+                self.storage = AnswerTestLocalStorage(options.compare_name)
+        else:
+            AnswerTestingTest.reference_storage = \
+                self.storage = AnswerTestCloudStorage(options.compare_name)
+
         self.answer_name = options.this_name
         self.store_results = options.store_results
+        self.store_local_results = options.store_local_results
         global run_big_data
         run_big_data = options.big_data
 
     def finalize(self, result):
-        # This is where we dump our result storage up to Amazon, if we are able
-        # to.
         if self.store_results is False: return
-        import boto
-        from boto.s3.key import Key
-        c = boto.connect_s3()
-        bucket = c.get_bucket("yt-answer-tests")
-        for pf_name in self.result_storage:
-            rs = cPickle.dumps(self.result_storage[pf_name])
-            tk = bucket.get_key("%s_%s" % (self.answer_name, pf_name)) 
-            if tk is not None: tk.delete()
-            k = Key(bucket)
-            k.key = "%s_%s" % (self.answer_name, pf_name)
-            k.set_contents_from_string(rs)
-            k.set_acl("public-read")
+        self.storage.dump(self.result_storage, result)        
 
-class AnswerTestOpener(object):
+class AnswerTestStorage(object):
     def __init__(self, reference_name):
         self.reference_name = reference_name
         self.cache = {}
+    def dump(self, result_storage, result):
+        pass
+    def get(self, pf_name, default=None):
+        pass
 
+class AnswerTestCloudStorage(AnswerTestStorage):
     def get(self, pf_name, default = None):
         if pf_name in self.cache: return self.cache[pf_name]
         url = _url_path % (self.reference_name, pf_name)
         self.cache[pf_name] = rv
         return rv
 
+    def dump(self, result_storage, result):
+        # This is where we dump our result storage up to Amazon, if we are able
+        # to.
+        import boto
+        from boto.s3.key import Key
+        c = boto.connect_s3()
+        bucket = c.get_bucket("yt-answer-tests")
+        for pf_name in result_storage:
+            rs = cPickle.dumps(result_storage[pf_name])
+            tk = bucket.get_key("%s_%s" % (self.reference_name, pf_name)) 
+            if tk is not None: tk.delete()
+            k = Key(bucket)
+            k.key = "%s_%s" % (self.reference_name, pf_name)
+            k.set_contents_from_string(rs)
+            k.set_acl("public-read")
+
+class AnswerTestLocalStorage(AnswerTestStorage):
+    def __init__(self, reference_name):
+        self.reference_name = reference_name
+        self.cache = {}
+
+    def dump(self, result_storage, result):
+        # Store data using shelve
+        if self.store_results is False: return
+        ds = shelve.open(self.reference_name, protocol=-1)
+        for pf_name in result_storage:
+            answer_name = "%s_%s" % (self.reference_name, pf_name)
+            if name in ds:
+                mylog.info("Overwriting %s", answer_name)
+            ds[answer_name] = result_storage[pf_name]
+        ds.close()
+
+    def get(self, pf_name, default=None):
+        # Read data using shelve
+        answer_name = "%s_%s" % (self.reference_name, pf_name)
+        ds = shelve.open(self.reference_name, protocol=-1)
+        try:
+            result = ds[name]
+        except KeyError:
+            result = default
+        ds.close()
+        return result
+
 @contextlib.contextmanager
 def temp_cwd(cwd):
     oldcwd = os.getcwd()
 
 class AnswerTestingTest(object):
     reference_storage = None
+    result_storage = None
     prefix = ""
     def __init__(self, pf_fn):
         self.pf = data_dir_load(pf_fn)
         nv = self.run()
         if self.reference_storage is not None:
             dd = self.reference_storage.get(self.storage_name)
-            if dd is None: raise YTNoOldAnswer()
+            if dd is None: raise YTNoOldAnswer(self.storage_name)
             ov = dd[self.description]
             self.compare(nv, ov)
         else: