Commits

Tetsuya Morimoto committed 7930129

added SleepingTicketReminderTask component tests and refactoring

  • Participants
  • Parent commits 5225262

Comments (0)

Files changed (4)

File src/traccron/task.py

 ##             O U T    O F    T H E    B O X    T A S K
 ##
 ###############################################################################
+from datetime import datetime, timedelta
 from time import time, localtime
+
 from trac.ticket.model import Ticket
 from trac.core import Component, implements
 from trac.notification import NotifyEmail
+from trac.util.datefmt import utc, to_utimestamp
 from trac.web.chrome import ITemplateProvider
 from traccron.api import ICronTask
 from traccron.core import CronConfig
         return self.__doc__
 
 
+class SleepingTicketNotification(NotifyEmail):
+
+    template_name = 'sleeping_ticket_template.txt'
+
+    def __init__(self, env):
+        NotifyEmail.__init__(self, env)
+
+    def get_recipients(self, owner):
+        return ([owner], [])
+
+    def remind(self, tiketsByOwner, delay):
+        """
+        Send a digest mail to ticket owner to remind him of those
+        sleeping tickets
+        """
+        for owner in tiketsByOwner.keys():
+            # prepare the data for the email content generation
+            self.data.update({
+                'ticket_count': len(tiketsByOwner[owner]),
+                'delay': delay
+            })
+            NotifyEmail.notify(self, owner, 'Sleeping ticket notification')
+
+    def send(self, torcpts, ccrcpts):
+        return NotifyEmail.send(self, torcpts, ccrcpts)
+
+
+class OrphanedTicketNotification(NotifyEmail):
+
+    template_name = 'orphaned_ticket_template.txt'
+
+    def __init__(self, env):
+        NotifyEmail.__init__(self, env)
+
+    def get_recipients(self, reporter):
+        return ([reporter], [])
+
+    def remind(self, tiketsByReporter, delay):
+        """
+        Send a digest mail to the reporter to remind them
+        of those orphaned tickets
+        """
+        for reporter in tiketsByReporter.keys():
+            # prepare the data for the email content generation
+            self.data.update({
+                'ticket_count': len(tiketsByReporter[reporter]),
+                'delay': delay
+            })
+            NotifyEmail.notify(self, reporter, 'orphaned ticket notification')
+
+    def send(self, torcpts, ccrcpts):
+        return NotifyEmail.send(self, torcpts, ccrcpts)
+
+
 class SleepingTicketReminderTask(Component, ICronTask, ITemplateProvider):
     """
     Remind user about sleeping ticket they are assigned to.
 
     implements(ICronTask, ITemplateProvider)
 
+    select_assigned_ticket = """
+        SELECT t.id , t.owner
+        FROM ticket t, ticket_change tc
+        WHERE t.id = tc.ticket
+        AND t.status in ('new','assigned','accepted')
+        AND (SELECT MAX(tc2.time)
+             FROM ticket_change tc2
+             WHERE tc2.ticket=tc.ticket) < %s
+        GROUP BY t.id
+    """
+
+    select_orphaned_ticket = """
+       SELECT t.id, t.reporter
+       FROM ticket t
+       WHERE t.id not in (SELECT tc.ticket
+                          FROM ticket_change tc
+                          WHERE tc.ticket=t.id)
+       AND t.time < %s
+       AND t.status = 'new'
+    """
+
     def get_htdocs_dirs(self):
         return []
 
         from pkg_resources import resource_filename
         return [resource_filename(__name__, 'templates')]
 
+    def _find_sleeping_ticket(self, sql, delay, dico, msg):
+        db = self.env.get_db_cnx()
+        cursor = db.cursor()
+        delay_time = datetime.now(utc) - timedelta(days=delay)
+        cursor.execute(sql, (to_utimestamp(delay_time),))
+        for ticket, recipient in cursor:
+            self.env.log.info(msg % (ticket, recipient, delay))
+            if recipient in dico:
+                dico[recipient].append(ticket)
+            else:
+                dico[recipient] = [ticket]
+
+    def remind_assigned_ticket(self, delay):
+        dico = {}
+        msg = 'warning sleeping ticket %d assigned to %s ' \
+              'but is inactive since more than %d day'
+        self._find_sleeping_ticket(self.select_assigned_ticket,
+                                   delay, dico, msg)
+        SleepingTicketNotification(self.env).remind(dico, delay)
+
+    def remind_orphaned_ticket(self, delay):
+        dico = {}
+        msg = 'warning ticket %d is new but orphaned: %s, %d'
+        self._find_sleeping_ticket(self.select_orphaned_ticket,
+                                   delay, dico, msg)
+        OrphanedTicketNotification(self.env).remind(dico, delay)
+
     def wake_up(self, *args):
         delay = 3
         if len(args) > 0:
             delay = int(args[0])
 
-        class SleepingTicketNotification(NotifyEmail):
-
-            template_name = "sleeping_ticket_template.txt"
-
-            def __init__(self, env):
-                NotifyEmail.__init__(self, env)
-
-            def get_recipients(self, owner):
-                return ([owner], [])
-
-            def remind(self, tiketsByOwner):
-                """
-                Send a digest mail to ticket owner to remind him of those
-                sleeping tickets
-                """
-                for owner in tiketsByOwner.keys():
-                    # prepare the data for the email content generation
-                    self.data.update({
-                        "ticket_count": len(tiketsByOwner[owner]),
-                        "delay": delay
-                    })
-                    NotifyEmail.notify(self, owner, "Sleeping ticket notification")
-
-            def send(self, torcpts, ccrcpts):
-                return NotifyEmail.send(self, torcpts, ccrcpts)
-
-        class OrphanedTicketNotification(NotifyEmail):
-
-            template_name = "orphaned_ticket_template.txt"
-
-            def __init__(self, env):
-                NotifyEmail.__init__(self, env)
-
-            def get_recipients(self, reporter):
-                return ([reporter], [])
-
-            def remind(self, tiketsByReporter):
-                """
-                Send a digest mail to the reporter to remind them
-                of those orphaned tickets
-                """
-                for reporter in tiketsByReporter.keys():
-                    # prepare the data for the email content generation
-                    self.data.update({
-                        "ticket_count": len(tiketsByReporter[owner]),
-                        "delay": delay
-                    })
-                    NotifyEmail.notify(self, reporter, "orphaned ticket notification")
-
-            def send(self, torcpts, ccrcpts):
-                return NotifyEmail.send(self, torcpts, ccrcpts)
-
         # look for ticket assigned but not touched since more that the delay
-        db = self.env.get_db_cnx()
-        cursor = db.cursor()
-        # assigned ticket
-        cursor.execute("""
-                SELECT t.id , t.owner  FROM ticket t, ticket_change tc
-                WHERE  t.id = tc.ticket
-                AND    t.status in ('new','assigned','accepted')
-                AND    (SELECT MAX(tc2.time) FROM ticket_change tc2 WHERE tc2.ticket=tc.ticket)  < %s GROUP BY t.id
-            """, (time() - delay * 24 * 60 * 60,))
-        dico = {}
-        for ticket, owner in cursor:
-            self.env.log.info("warning ticket %d assigned to %s but is inactive since more than %d day" % (ticket, owner, delay))
-            if dico.has_key(owner):
-                dico[owner].append(ticket)
-            else:
-                dico[owner] = [ticket]
-        SleepingTicketNotification(self.env).remind(dico)
-        # orphaned ticket
-        cursor.execute("""
-               SELECT t.id, t.reporter  FROM  ticket t
-               WHERE  t.id not in (select tc.ticket FROM ticket_change tc WHERE tc.ticket=t.id)
-               AND t.time < %s AND t.status = 'new'
-            """, (time() - delay * 24 * 60 * 60,))
-        dico = {}
-        for ticket, reporter in cursor:
-            self.env.log.info("warning ticket %d is new but orphaned" % (ticket,))
-            if dico.has_key(reporter):
-                dico[reporter].append(ticket)
-            else:
-                dico[reporter] = [ticket]
-        OrphanedTicketNotification(self.env).remind(dico)
+        self.remind_assigned_ticket(delay)
+        self.remind_orphaned_ticket(delay)
 
     def getId(self):
-        return "sleeping_ticket"
+        return 'sleeping_ticket'
 
     def getDescription(self):
         return self.__doc__

File tests/test_heart_beat_task.py

     heart_beat_task = component['heart_beat_task']
     return heart_beat_task
 
+
 def test_heart_beat_task_getId(heart_beat_task):
     assert 'heart_beat' == heart_beat_task.getId()
 

File tests/test_sleeping_ticket_reminder_task.py

+# -*- coding: utf-8 -*-
+import logging
+from datetime import datetime, timedelta
+
+import pytest
+from utils import create_orphaned_ticket, create_ticket, create_tickets
+from utils import has_log_message
+
+from trac.util.datefmt import utc
+
+
+def pytest_funcarg__sleeping_ticket_reminder_task(request, component):
+    sleeping_ticket_reminder_task = component['sleeping_ticket_reminder_task']
+    return sleeping_ticket_reminder_task
+
+
+def test_sleeping_ticket_reminder_task_getId(sleeping_ticket_reminder_task):
+    assert 'sleeping_ticket' == sleeping_ticket_reminder_task.getId()
+
+
+def test_sleeping_ticket_reminder_task_wake_up(sleeping_ticket_reminder_task,
+                                               caplog):
+    env = sleeping_ticket_reminder_task.env
+    caplog.setLevel(logging.DEBUG, logger=env.log.name)
+
+    # create tickets
+    when = datetime.now(utc) - timedelta(days=5)
+    t1 = create_tickets(env, 3, owner='sleeper1', when=when)
+    o1 = create_orphaned_ticket(env, when=when)
+
+    sleeping_ticket_reminder_task.wake_up()
+    expected_messages = [
+        'action controllers for ticket workflow',
+        'warning sleeping ticket',
+        'warning ticket',
+    ]
+    assert has_log_message(caplog, expected_messages)
+
+
+@pytest.mark.parametrize('args', [
+    (2,),
+    (7, 'test'),
+])
+def test_sleeping_ticket_reminder_task_wake_up_args(
+        sleeping_ticket_reminder_task, caplog, args):
+    env = sleeping_ticket_reminder_task.env
+    caplog.setLevel(logging.DEBUG, logger=env.log.name)
+
+    # create tickets
+    ticket_subtract_days = 5
+    when = datetime.now(utc) - timedelta(days=ticket_subtract_days)
+    t1 = create_tickets(env, 3, owner='sleeper1', when=when)
+    o1 = create_orphaned_ticket(env, when=when)
+
+    sleeping_ticket_reminder_task.wake_up(*args)
+    expected_messages = ['action controllers for ticket workflow']
+    if args[0] <= ticket_subtract_days:
+        expected_messages.extend([
+            'warning sleeping ticket',
+            'warning ticket',
+        ])
+    assert has_log_message(caplog, expected_messages)

File tests/utils.py

 # -*- coding: utf-8 -*-
 import sys
-from itertools import ifilterfalse
+import time
+
+from trac.ticket.model import Ticket
 
 
 def has_log_message(caplog, expected_messages, strict=True):
         return lambda obj: tuple(attrgetter(attr)(obj) for attr in args)
     else:  # version >= 2.5
         return attrgetter(*args)
+
+
+def create_ticket(env, owner='user1', when=None):
+    t = Ticket(env)
+    t['summary'] = 'test'
+    t['description'] = 'ticket for test'
+    t['owner'] = owner
+    t.insert(when=when)
+    assert t.exists
+    t['status'] = 'new'
+    assert t.save_changes(when=when)
+    return t
+
+def create_tickets(env, num, owner='user1', when=None):
+    return [create_ticket(env, owner, when) for _ in range(num)]
+
+def create_orphaned_ticket(env, reporter='user1', when=None):
+    t = Ticket(env)
+    t['summary'] = 'orphaned test'
+    t['description'] = 'orphaned ticket for test'
+    t['reporter'] = reporter
+    t.insert(when=when)
+    assert t.exists
+    db = env.get_db_cnx()
+    cursor = db.cursor()
+    cursor.execute("update ticket set status='new' where id=%s", (t.id,))
+    return t