Commits

Tetsuya Morimoto  committed 6632752

added ticker tests

  • Participants
  • Parent commits efd76dd

Comments (0)

Files changed (4)

File src/traccron/core.py

         wait : if True the current thread wait until running task finished.
         Default is False
         """
-        self.env.log.debug("create new ticker")
+        self.env.log.debug('create new ticker')
         if self.timer is not None:
             self.timer.cancel()
             if (wait):
             _delay = self.interval * 60
         self.timer = Timer(_delay, self.wake_up)
         self.timer.start()
-        self.env.log.debug("new ticker started")
+        self.env.log.debug('new ticker started')
 
     def wake_up(self):
         """
         Wake up this ticker. This ticker will call the callback function then
         create a new timer to wake it up again
         """
-        self.env.log.debug("ticker wake up")
+        self.env.log.debug('ticker wake up')
         in_hurry = True
         while (in_hurry):
             wake_up_time = datetime(*datetime.now().timetuple()[:6])
 
             now = datetime(*datetime.now().timetuple()[:6])
             next_wake_up_time = wake_up_time + timedelta(minutes=self.interval)
-            self.env.log.debug("last wake up time %s" % str(wake_up_time))
-            self.env.log.debug("next wake up time %s" % str(next_wake_up_time))
-            self.env.log.debug("current time %s" % str(now))
+            self.env.log.debug('last wake up time %s' % wake_up_time)
+            self.env.log.debug('next wake up time %s' % next_wake_up_time)
+            self.env.log.debug('current time %s' % now)
 
             if now < next_wake_up_time:
                 # calculate amount of second to wait in second
                 seconds_before_next_wake_up = (next_wake_up_time - now).seconds
-                # need to adjust it to ensure to skip this last wake up  minute window
+                # need to adjust it to ensure to skip this last wake up
+                # minute window
                 if next_wake_up_time.second == 0:
                     # slide up 1 second
                     seconds_before_next_wake_up += 1
                     # slid down 1 second
                     seconds_before_next_wake_up -= 1
 
-                self.env.log.debug("adjusted wait %s secondes" % seconds_before_next_wake_up)
+                msg = 'adjusted wait %s secondes'
+                self.env.log.debug(msg % seconds_before_next_wake_up)
                 self.create_new_timer(delay=seconds_before_next_wake_up)
                 in_hurry = False
             else:
                 # the next wake up is over,
                 seconds_before_next_wake_up = (now - next_wake_up_time).seconds
-                self.env.log.warn("task processing duration overtake ticker interval\n"
-                                  "next wake up is over since %d seconds " % seconds_before_next_wake_up)
+                msg = 'task processing duration overtake ticker interval, '\
+                      'next wake up is over since %d seconds'
+                self.env.log.warn(msg % seconds_before_next_wake_up)
 
     def cancel(self, wait=False):
         self.timer.cancel()

File tests/test_core.py

 import time
 
 import pytest
+from utils import has_log_message
 
 
 def test_getCronConf(env, component):
         'check existing task',
         'check task:',
         'nothing to do:',
-        'schedule is disabled:',
-        'task is disabled:',
         'looking for schedule of type:',
-        'task is scheduled:',
         'no matching schedule found',
     ]
-    assert _has_log_message(caplog, expected_messages)
+    assert has_log_message(caplog, expected_messages)
 
 
 def test_run_task(env, component, caplog):
         'executing task: heart_beat',
         'Heart beat: boom boom !!!',
         'task execution result is SUCCESS',
-        'task execution result is FAILURE,',
         'notifying task event...',
         'no recipient for task event, aborting',
         'task is finished: heart_beat',
     ]
-    assert _has_log_message(caplog, expected_messages)
+    assert has_log_message(caplog, expected_messages)
 
 
 """
         else:
             assert False, 'unknown object: %s' % obj
     return True
-
-def _has_log_message(caplog, expected_messages):
-    for log in caplog.records():
-        for msg in expected_messages:
-            if log.message.startswith(msg):
-                break
-        else:
-            assert False, 'unknown message: %s' % log.message
-    return True

File tests/test_ticker.py

+# -*- coding: utf-8 -*-
+import logging
+import time
+
+import pytest
+from utils import has_log_message
+
+from traccron.core import Ticker
+
+
+def pytest_funcarg__ticker(request, env):
+    ticker = Ticker(env, 1, lambda: 'wake up')
+    return ticker
+
+
+def test_basic_ticker_action(ticker):
+    try:
+        assert ticker.timer.isAlive()
+        assert 60 == ticker.timer.interval
+        assert 'wake up' == ticker.callback()
+
+        # stop timer, but still alive
+        ticker.cancel()
+        assert ticker.timer.isAlive()
+    finally:
+        # stop timer thread
+        ticker.cancel(wait=True)
+        assert not ticker.timer.isAlive()
+
+
+def test_create_new_timer(ticker):
+    try:
+        assert ticker.timer.isAlive()
+        prev_timer_id = id(ticker.timer)
+        ticker.create_new_timer(wait=True, delay=10)
+        assert prev_timer_id != id(ticker.timer)
+        assert 10 == ticker.timer.interval
+    finally:
+        ticker.cancel(wait=True)
+
+
+def test_wake_up(ticker, caplog):
+    caplog.setLevel(logging.DEBUG, logger=ticker.env.log.name)
+    try:
+        ticker.callback = lambda: ticker.env.log.debug('only logging')
+        ticker.create_new_timer(wait=True, delay=1)
+        time.sleep(2)
+        expected_messages = [
+            'create new ticker',
+            'new ticker started',
+            'ticker wake up',
+            'only logging',
+            'last wake up time ',
+            'next wake up time ',
+            'current time ',
+            'adjusted wait 60 secondes',
+            'create new ticker',
+            'new ticker started',
+        ]
+        assert has_log_message(caplog, expected_messages)
+    finally:
+        ticker.cancel(wait=True)

File tests/utils.py

+# -*- coding: utf-8 -*-
+from itertools import ifilterfalse
+
+
+def has_log_message(caplog, expected_messages, strict=True):
+    log_status = dict((msg, False) for msg in expected_messages)
+    for log in caplog.records():
+        for msg in expected_messages:
+            if log.message.startswith(msg):
+                log_status[msg] = True
+                break
+        else:
+            assert False, 'unknown message: "%s"' % log.message
+
+    if strict:
+        for msg, is_logged in log_status.items():
+            assert is_logged, 'unlogged message: "%s"' % msg
+    return True