Commits

Tetsuya Morimoto committed 19d3955

refactored core component tests

Comments (0)

Files changed (3)

src/traccron/core.py

         Read configuration and apply it
         """
         # stop existing ticker if any
-        self.env.log.debug("applying config")
+        self.env.log.debug('applying config')
         if Core.current_ticker is not None:
-            self.env.log.debug("stop existing ticker")
+            self.env.log.debug('stop existing ticker')
             Core.current_ticker.cancel(wait=wait)
 
         if self.getCronConf().get_ticker_enabled():
-            self.env.log.debug("ticker is enabled")
+            self.env.log.debug('ticker is enabled')
             # try to execute task a first time
             # because we don't want to wait for interval to elapse
             self.check_task()
-            Core.current_ticker = Ticker(self.env, self.getCronConf().get_ticker_interval(), self.check_task)
+            interval = self.getCronConf().get_ticker_interval()
+            Core.current_ticker = Ticker(self.env, interval, self.check_task)
         else:
-            self.env.log.debug("ticker is disabled")
+            self.env.log.debug('ticker is disabled')
 
     def getCronConf(self):
         """
 
 class WebUi(IAdminPanelProvider, ITemplateProvider, IRequestHandler):
     """
-    Class that deal with Web stuff. It is the both the controller and the page builder.
+    Class that deal with Web stuff.
+    It is the both the controller and the page builder.
     """
     def __init__(self, core):
         self.env = core.env
         elif req.path_info == '/traccron/cron_history':
             return self._displayHistoryView(req)
         else:
-            self.env.log.warn("Trac Cron Plugin was unable to handle %s" % req.path_info)
-            add_warning(req, "The request was not handled by trac cron plugin")
+            msg = 'Trac Cron Plugin was unable to handle %s' % req.path_info
+            self.env.log.warn(msg)
+            add_warning(req, 'The request was not handled by trac cron plugin')
             req.redirect(req.href.admin('traccron', 'cron_admin'))
 
     # internal method
                 endTime = localtime(end)
                 date = datetime.fromtimestamp(start, utc)
                 execution = {
-                    "timestamp": start,
-                    "datetime": date,
-                    "dateuid": self._to_utimestamp(date),
-                    "date": "%d-%d-%d" % (startTime.tm_mon, startTime.tm_mday, startTime.tm_year),
-                    "task": task.getId(),
-                    "start": "%02d h %02d" % (startTime.tm_hour, startTime.tm_min),
-                    "end": "%02d h %02d" % (endTime.tm_hour, endTime.tm_min),
-                    "success": success
+                    'timestamp': start,
+                    'datetime': date,
+                    'dateuid': self._to_utimestamp(date),
+                    'date': '%d-%d-%d' % (startTime.tm_mon, startTime.tm_mday,
+                                          startTime.tm_year),
+                    'task': task.getId(),
+                    'start': '%02d h %02d' % (startTime.tm_hour,
+                                              startTime.tm_min),
+                    'end': '%02d h %02d' % (endTime.tm_hour, endTime.tm_min),
+                    'success': success
                 }
                 _history.append(execution)
 
             for notice in notices:
                 add_notice(req, notice)
         except Exception, e:
-            self.env.log.error('Error writing to trac.ini: %s', exception_to_unicode(e))
+            msg = 'Error writing to trac.ini: %s', exception_to_unicode(e)
+            self.env.log.error(msg)
             add_warning(req, _('Error writing to trac.ini, make sure it is '
                                'writable by the web server. Your changes have '
                                'not been saved.'))
 
     def _displaySettingView(self, data={}):
+        conf = self.cronconf  # just an alias
         data.update({
-            self.cronconf.TICKER_ENABLED_KEY: self.cronconf.get_ticker_enabled(),
-            self.cronconf.TICKER_INTERVAL_KEY: self.cronconf.get_ticker_interval()
+            conf.TICKER_ENABLED_KEY: conf.get_ticker_enabled(),
+            conf.TICKER_INTERVAL_KEY: conf.get_ticker_interval()
         })
         task_list = []
         for task in self.cron_task_list:
             task_data = {}
-            task_data['enabled'] = self.cronconf.is_task_enabled(task)
+            task_data['enabled'] = conf.is_task_enabled(task)
             task_data['id'] = task.getId()
             task_data['description'] = task.getDescription()
             all_schedule_value = {}
             for schedule in self.all_schedule_type:
-                value = self.cronconf.get_schedule_value(task, schedule)
+                value = conf.get_schedule_value(task, schedule)
                 if value is None:
-                    value = ""
-                task_enabled = self.cronconf.is_schedule_enabled(task, schedule)
-                task_arg = self.cronconf.get_schedule_arg(task, schedule)
+                    value = ''
+                task_enabled = conf.is_schedule_enabled(task, schedule)
+                task_arg = conf.get_schedule_arg(task, schedule)
                 if task_arg is None:
-                    task_arg = ""
+                    task_arg = ''
                 all_schedule_value[schedule.getId()] = {
-                    "value": value,
-                    "hint": schedule.getHint(),
-                    "enabled": task_enabled,
-                    "arg": task_arg
+                    'value': value,
+                    'hint': schedule.getHint(),
+                    'enabled': task_enabled,
+                    'arg': task_arg
                 }
 
             task_data['schedule_list'] = all_schedule_value
                 'description': listener.getDescription()
             }
             listener_list.append(listener_data)
-        data["listener_list"] = listener_list
+        data['listener_list'] = listener_list
         return 'cron_listener.html', data
 
     def _saveSettings(self, req, category, page):

tests/test_core.py

 from utils import has_log_message
 
 
-def test_getCronConf(env, component):
+def pytest_funcarg__core(request, component):
+    core = component['core']
+    return core
+
+
+def test_apply_config_with_disabled_ticker(core, caplog):
+    caplog.setLevel(logging.DEBUG, logger=core.env.log.name)
+    core.apply_config()  # in case ticker is disabled
+    expected_messages = [
+        'applying config',
+        'ticker is disabled',
+    ]
+    assert has_log_message(caplog, expected_messages)
+
+
+def test_apply_config_with_enabled_ticker(core, caplog):
+    from traccron.core import Core, Ticker
+    caplog.setLevel(logging.DEBUG, logger=core.env.log.name)
+    core.getCronConf().set_ticker_enabled(True)
+    core.apply_config()  # in case ticker is enabled
+    try:
+        assert isinstance(Core.current_ticker, Ticker)
+        prev_timer_id = id(Core.current_ticker.timer)
+        core.apply_config(wait=True)
+        assert prev_timer_id != id(Core.current_ticker.timer)
+        expected_messages = [
+            'applying config',
+            'ticker is enabled',
+            'check existing task',
+            'check task:',
+            'looking for schedule of type:',
+            'no matching schedule found',
+            'nothing to do:',
+            'create new ticker',
+            'new ticker started',
+            'stop existing ticker',
+            'ticker is enabled',
+        ]
+        assert has_log_message(caplog, expected_messages)
+    finally:
+        Core.current_ticker.cancel(wait=True)
+
+
+def test_getCronConf(core):
     from traccron.core import CronConfig
-    core = component['core']
     cron_conf = core.getCronConf()
     assert isinstance(cron_conf, CronConfig)
 
-    # ticker enabled
-    assert not cron_conf.get_ticker_enabled()
+    # ticker status
     cron_conf.set_ticker_enabled(True)
-    assert cron_conf.get_ticker_enabled()
+    assert True is cron_conf.get_ticker_enabled()
+    cron_conf.set_ticker_enabled(False)
+    assert False is cron_conf.get_ticker_enabled()
 
     # ticker interval
     assert 1 == cron_conf.get_ticker_interval()
     assert 60 == cron_conf.get_ticker_interval()
 
 
-def test_getTaskList(env, component):
+def test_getTaskList(core):
     from traccron.task import AutoPostponeTask
     from traccron.task import HeartBeatTask
     from traccron.task import SleepingTicketReminderTask
     from traccron.task import UnreachableMilestoneTask
-    core = component['core']
     task_list = core.getTaskList()
     task_classes = [AutoPostponeTask, HeartBeatTask,
                     SleepingTicketReminderTask, UnreachableMilestoneTask]
     assert _isinstances(task_list, task_classes)
 
 
-def test_getSupportedScheduleType(env, component):
+def test_getSupportedScheduleType(core):
     from traccron.scheduler import CronScheduler
     from traccron.scheduler import DailyScheduler
     from traccron.scheduler import HourlyScheduler
     from traccron.scheduler import MonthlyScheduler
     from traccron.scheduler import WeeklyScheduler
-    core = component['core']
     schedule_type = core.getSupportedScheduleType()
     schedule_classes = [CronScheduler, DailyScheduler, HourlyScheduler,
                         MonthlyScheduler, WeeklyScheduler]
     assert _isinstances(schedule_type, schedule_classes)
 
 
-def test_getHistoryList(env, component):
+def test_getHistoryList(core):
     from traccron.history import MemoryHistoryStore
-    core = component['core']
     history_list = core.getHistoryList()
     history_classes = [MemoryHistoryStore]
     assert _isinstances(history_list, history_classes)
         assert 0 == len(hist.history)
 
 
-def test_getTaskListnerList(env, component):
+def test_getTaskListnerList(core):
     from traccron.listener import HistoryTaskEvent
     from traccron.listener import NotificationEmailTaskEvent
-    core = component['core']
     listener_list = core.getTaskListnerList()
     listener_classes = [HistoryTaskEvent, NotificationEmailTaskEvent]
     assert _isinstances(listener_list, listener_classes)
 
 
-def test_check_task(env, component, caplog):
+def test_check_task(env, core, caplog):
     caplog.setLevel(logging.DEBUG, logger=env.log.name)
-    core = component['core']
     core.check_task()
     expected_messages = [
         'check existing task',
     assert has_log_message(caplog, expected_messages)
 
 
-def test_run_task(env, component, caplog):
+def test_run_task(env, core, component, caplog):
     caplog.setLevel(logging.DEBUG, logger=env.log.name)
-    core = component['core']
     core.runTask(component['heart_beat_task'])
     expected_messages = [
         'executing task: heart_beat',

tests/test_ticker.py

 import pytest
 from utils import has_log_message
 
-from traccron.core import Ticker
-
 
 def pytest_funcarg__ticker(request, env):
+    from traccron.core import Ticker
     ticker = Ticker(env, 1, lambda: 'wake up')
     return ticker