Commits

jordilin  committed dda877b

TriggerExecutor on its own thread to control the executions and avoid leaving
zombie processes lurking around. One execution at a time.

  • Participants
  • Parent commits 4d0a7e5

Comments (0)

Files changed (3)

File log4tailer/LogTailer.py

                 log.closeLog()
             if self.mailAction:
                 self.mailAction.quitSMTP()
+            for action in self.actions:
+                if hasattr(action, 'stop'):
+                    action.stop()
             print "\n"
             resume.report()
             print "Ended log4tailer, because colors are fun"

File log4tailer/notifications.py

 # Log4Tailer: A multicolored python tailer for log4J logs
-# Copyright (C) 2008 Jordi Carrillo Bosch
+# Copyright (C) 2010 Jordi Carrillo Bosch
 
 # This file is part of Log4Tailer Project.
 #
 from smtplib import *
 from log4tailer.TermColorCodes import TermColorCodes
 import subprocess
-
+import threading
+try:
+    import queue
+except ImportError:
+    import Queue as queue
 
 class Print(object):
     '''PrintAction: prints to stdout the 
                 self.timer.stopTimer()
                 self.count = 0
                 self.flagged = False
+                
+                
+class TriggerExecutor(threading.Thread):
+    """Triggers the trigger command, one 
+    trigger_command at a time, in its own thread.
+    """
+
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.queue = queue.Queue()
+        self.go = True
+
+    def landing(self, trigger_command):
+        """One command to be triggered. Enqueued 
+        to be executed when ready.
+        
+        :param trigger_command: command to be triggered.
+        """
+        self.queue.put(trigger_command)
+
+    def run(self):
+        while self.go:
+            trigger_command = self.queue.get()
+            if trigger_command == 'stop':
+                continue
+            try:
+                subprocess.call(trigger_command)
+            except Exception, err:
+                print err
+
+    def stop(self):
+        self.go = False
             
 class Executor(object):
     """Will execute a program if a certain condition is given"""
         self.full_trigger_active = False
         if self.PlaceHolders in self.executable:
             self.full_trigger_active = True
+        self.trigger_executor = TriggerExecutor()
+        self.started = False
 
     def _build_trigger(self, logtrace, logpath):
         if self.full_trigger_active:
             return
         logtrace, logpath = message.getPlainMessage()
         trigger = self._build_trigger(logtrace, logpath)
-        try:
-            subprocess.Popen(trigger)
-        except Exception, err:
-            # log4tailer should continue processing
-            print err
-        
+        if not self.started:
+            self.started = True
+            self.trigger_executor.start()
+        self.trigger_executor.landing(trigger)
+    
+    def stop(self):
+        if self.started:
+            self.trigger_executor.stop()
+            # unblock the queue
+            self.trigger_executor.landing("stop")
+            self.trigger_executor.join()
+            

File test/test_executor.py

         properties.parseProperties()
         executor = notifications.Executor(properties)
         self.assertEquals(['ls', '-l'], executor.executable)
+        executor.stop()
 
     def testShouldRaiseIfExecutorNotProvided(self):
         fh = open(CONFIG, 'w')
         properties.parseProperties()
         executor = notifications.Executor(properties)
         self.assertEqual(True, executor.full_trigger_active)
+        executor.stop()
 
     def testFullTriggerFalseBasedOnConfig(self):
         fh = open(CONFIG, 'w')
         properties.parseProperties()
         executor = notifications.Executor(properties)
         self.assertEqual(False, executor.full_trigger_active)
+        executor.stop()
 
     def testShouldNotifyWithFullTrigger(self):
         logcolor = LogColors()
         trigger = ['ls', '-l', trace, log.getLogPath() ]
         properties = Property(CONFIG)
         properties.parseProperties()
-        executor = notifications.Executor(properties)
         os_mock = self.mocker.replace('subprocess')
-        os_mock.Popen(trigger)
+        os_mock.call(trigger)
         self.mocker.result(True)
         self.mocker.replay()
+        executor = notifications.Executor(properties)
         message.parse(trace, log)
         executor.notify(message, log)
+        time.sleep(0.2)
+        executor.stop()
     
     def testShouldNotifyWithNoFullTrigger(self):
         logcolor = LogColors()
         trigger = executor._build_trigger(trace, logpath)
         self.assertEqual(['echo'], trigger)
         executor.notify(message, log)
+        executor.stop()
 
     def testShouldNotifyAndContinueIfExecutorFails(self):
         logcolor = LogColors()
         executor = notifications.Executor(properties)
         message.parse(trace, log)
         executor.notify(message, log)
+        time.sleep(0.2)
+        executor.stop()
         self.assertTrue(sys.stdout.captured)
 
     def testShouldContinueTailingIfExecutableTakesLongTime(self):
         executor.notify(message, log)
         finished = time.time()
         ellapsed = start - finished
+        time.sleep(0.2)
+        executor.stop()
         # executable.py sleeps for three seconds
-        self.assertTrue(ellapsed < 2)
+        self.assertTrue(ellapsed < 1)
 
     def testShouldNotExecuteIfLevelNotInPullers(self):
         logcolor = LogColors()
         executor = notifications.Executor(properties)
         message.parse(trace, log)
         executor.notify(message, log)
+        time.sleep(0.2)
+        executor.stop()
         self.assertFalse(sys.stdout.captured)
-        trace = "this is an warning log trace"
-        sys.stdout.captured = []
-        message.parse(trace, log)
-        executor.notify(message, log)
-        self.assertFalse(sys.stdout.captured)
-        trace = "this is an fatal log trace"
-        message.parse(trace, log)
-        executor.notify(message, log)
-        self.assertTrue(sys.stdout.captured)
-        
+       
     def tearDown(self):
         self.mocker.restore()
         self.mocker.verify()