Commits

Clint Howarth committed c94e4a1

partially repair notification

Comments (0)

Files changed (3)

speedrack/filer.py

         file_path = os.path.join(path, the_file)
         try:
             os.unlink(file_path)
+        # consuming errors
         except Exception, e:
             print e
     try:

speedrack/models.py

 # if spam, just answer with success/fail
 # returns string with message if notification
 # is needed, otherwise False
-def needs_notification(execution, prev_exec = None, spam = None):
+def needs_notification(execution,
+                       spam = None):
+
+    if spam:
+        if not execution.success():
+            return "failed"
+        else:
+            return "success"
+
+    if previous_executions and len(previous_executions) != 1:
+        for execution in previous_executions:
+            if execution.success():
+                return False
+
+    prev_exec = None
+    if previous_executions:
+        prev_exec = previous_executions[0]
 
     if (not execution.success()
-        and (spam or not prev_exec or prev_exec.success())):
+        and (not prev_exec or prev_exec.success())):
         return "failed"
 
     if (execution.success()
-        and (spam or (prev_exec and not prev_exec.success()))):
+        and (prev_exec and not prev_exec.success())):
         return "success"
 
     return False
         return None
 
     def find_previous_execution(self, timestamp):
-        '''Get previous execution to the given timestamp; None if
-        given cannot be found or previous timestamp not available'''
+        pevs = self.find_previous_executions(timestamp)
+        if pevs and len(pevs):
+            return pevs[0]
+        return None
+
+    def find_previous_executions(self, timestamp, count=1):
+        '''Get :count: previous executions to the given timestamp;
+        None if given timestamp cannot be found or previous timestamp
+        is not available'''
+
         dirs = self._get_execution_dirs()
+
+        if not dirs:
+            return None
+
         this_index = -1
+
         for i, x in enumerate(dirs):
             if os.path.split(x)[-1] == timestamp:
                 this_index = i
                 break
+
         if this_index in (-1, len(dirs)-1):
             return None
-        previous_timestamp = dirs[i+1]
-        return Execution(previous_timestamp)
+
+        previous_timestamps = []
+        if count == 1:
+            previous_timestamps = [dirs[i+1]]
+        elif len(dirs) - i <= count:
+            previous_timestamps = dirs[i+1:]
+        else:
+            previous_timestamps = dirs[i+1:i+1+count]
+
+        return [Execution(timestamp) for timestamp in previous_timestamps]
 
     def get_last_execution(self):
         '''Returns None if no executions found.'''
 
     def needs_notification(self, execution):
         _task = Task(execution.name, job_root_dir = execution.task_root)
-        prev_exec = _task.find_previous_execution(execution.timestamp)
-        return needs_notification(execution, prev_exec, self.spam)
+        previous_executions = _task.find_previous_execution(execution.timestamp)
+        return needs_notification(execution, previous_executions, self.spam)
 
     def run(self):
         '''execute task, ping notifiers, clean up old tasks'''
 
 def is_execution_success(execution,
                          default_fail_by_stderr,
-                         default_fail_by_retcode):
+                         default_fail_by_retcode,
+                         count = 1):
     # failing by retcode means either not finding a retcode OR finding
     # a retcode other than 0
     fail_by_stderr = default_fail_by_stderr

speedrack/tests/models_test.py

 
 # Note that unlike default Task execution sorting, this
 # is chronologically ascending
+# (correspond to testdata/sample_task)
 EXECUTIONS_GOOD = [
     "2012-05-03_18:40:44",
     "2012-05-03_18:41:44",
         e = self.t.get_last_execution()
         assert_equals(e.timestamp, EXECUTIONS_GOOD[-1])
 
+    def test_find_previous_executions_good_one(self):
+        prevs = self.t.find_previous_executions(EXECUTIONS_GOOD[1], count=1)
+        assert_equals(1, len(prevs))
+        assert_equals(EXECUTIONS_GOOD[0], prevs[0].timestamp)
+
+    def test_find_previous_executions_good_one_overcount(self):
+        prevs = self.t.find_previous_executions(EXECUTIONS_GOOD[1], count=10)
+        assert_equals(1, len(prevs))
+        assert_equals(EXECUTIONS_GOOD[0], prevs[0].timestamp)
+
+    def test_find_previous_executions_good_two(self):
+        prevs = self.t.find_previous_executions(EXECUTIONS_GOOD[2], count=2)
+        assert_equals(2, len(prevs))
+        timestamps = [x.timestamp for x in prevs]
+        assert_in(EXECUTIONS_GOOD[0], timestamps)
+        assert_in(EXECUTIONS_GOOD[1], timestamps)
+
+    def test_find_previous_executions_good_two_overcount(self):
+        prevs = self.t.find_previous_executions(EXECUTIONS_GOOD[2], count=10)
+        assert_equals(2, len(prevs))
+        timestamps = [x.timestamp for x in prevs]
+        assert_in(EXECUTIONS_GOOD[0], timestamps)
+        assert_in(EXECUTIONS_GOOD[1], timestamps)
+
+    def test_find_previous_executions_bad_first(self):
+        prevs = self.t.find_previous_executions(EXECUTIONS_GOOD[0])
+        assert_is_none(prevs)
+
+    def test_find_previous_executions_bad_nomatch(self):
+        prevs = self.t.find_previous_executions(EXECUTION_BAD)
+        assert_is_none(prevs)
+
 
 class TestExecution(object):
 
             patch.object(self.e0, 'success', return_value=True)):
 
             assert_false(
-                models.needs_notification(self.e0, prev_exec = None, spam = False))
+                models.needs_notification(self.e0, previous_executions = None, spam = False))
 
     def test_initial_success_no_prev_with_spam(self):
         with nested(
             patch.object(self.e0, 'success', return_value=True)):
 
             assert_equal("success",
-                models.needs_notification(self.e0, prev_exec = None, spam = True))
+                models.needs_notification(self.e0, previous_executions = None, spam = True))
 
     def test_initial_failure_no_prev_no_spam(self):
         with nested(
             patch.object(self.e0, 'success', return_value=False)):
 
             assert_equal("failed",
-                models.needs_notification(self.e0, prev_exec = None, spam = False))
+                models.needs_notification(self.e0, previous_executions = None, spam = False))
 
     def test_initial_failure_no_prev_with_spam(self):
         with nested(
             patch.object(self.e0, 'success', return_value=False)):
 
             assert_equal("failed",
-                models.needs_notification(self.e0, prev_exec = None, spam = True))
+                models.needs_notification(self.e0, previous_executions = None, spam = True))
 
     def test_failure_after_failure_no_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=False)):
 
             assert_false(
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = False))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = False))
 
     def test_failure_after_failure_with_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=False)):
 
             assert_equal("failed",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = True))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = True))
 
     def test_success_after_failure_no_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=False)):
 
             assert_equal("success",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = False))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = False))
 
     def test_success_after_failure_with_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=False)):
 
             assert_equal("success",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = True))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = True))
 
     def test_success_after_success_no_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=True)):
 
             assert_false(
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = False))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = False))
 
     def test_success_after_success_with_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=True)):
 
             assert_equal("success",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = True))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = True))
 
     def test_failure_after_success_no_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=True)):
 
             assert_equal("failed",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = False))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = False))
 
     def test_failure_after_success_with_spam(self):
         with nested(
             patch.object(self.e1, 'success', return_value=True)):
 
             assert_equal("failed",
-                models.needs_notification(self.e0, prev_exec = self.e1, spam = True))
+                models.needs_notification(self.e0, previous_executions = [self.e1], spam = True))
+
+
+class TestNeedsNotificationWithConsecutiveFailures(object):
+
+    def setup(self):
+        e0_path = os.path.join(TESTDATA_PATH,
+                               SAMPLE_TASK,
+                               EXECUTIONS_GOOD[0])
+        self.e0 = models.Execution(e0_path)
+        self.e1 = models.Execution(e0_path)
+
+    def test_two_consec_with_two_failures(self):
+        with nested(
+            patch.object(self.e0, 'success', return_value=False),
+            patch.object(self.e1, 'success', return_value=False)):
+
+            pass
+
+    def test_two_consec_with_one_failure(self):
+        with nested(
+            patch.object(self.e0, 'success', return_value=True),
+            patch.object(self.e1, 'success', return_value=False)):
+
+#            assert_equal("failed",
+#                         models.needs_notification(self.e0, previous_executions = [self.e1], spam = True))
+            pass
 
 
 class TestIsConfError(object):