Commits

Anonymous committed d83f0ec

bug 840305 - mozharness run_command() idle timeouts. r=catlee

  • Parent commits fcb5bdd

Files changed (6)
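
The new idle_timeout argument to run_command() is the core of this change. A minimal, hypothetical call site (inside a BaseScript subclass; the repository URL and values are illustrative, not taken from the commit):

    # Kill the child and log at ERROR if hg produces no output for 15 minutes.
    # idle_error_level defaults to WARNING per the new run_command() signature below.
    from mozharness.base.errors import HgErrorList
    from mozharness.base.log import ERROR

    status = self.run_command(
        ['hg', 'clone', 'https://hg.mozilla.org/build/mozharness', 'mozharness'],
        idle_timeout=15 * 60,      # seconds of silence tolerated before termination
        idle_error_level=ERROR,
        error_list=HgErrorList,
    )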

File mozharness/base/log.py

 import logging
 import os
 import sys
+import time
 import traceback
 
 # Define our own FATAL_LEVEL
     def fatal(self, message, exit_code=-1):
         self.log(message, level=FATAL, exit_code=exit_code)
 
+
 # OutputParser {{{1
 class OutputParser(LogMixin):
     """ Helper object to parse command output.
         self.num_pre_context_lines = 0
         self.num_post_context_lines = 0
         self.worst_log_level = INFO
+        self.last_log_time = None
 
     def parse_single_line(self, line):
+        self.last_log_time = time.time()
         for error_check in self.error_list:
             # TODO buffer for context_lines.
             match = False
     error,critical,fatal messages for us to count up at the end (aiming
     for 0).
     """
-    LEVELS = {DEBUG: logging.DEBUG,
-              INFO: logging.INFO,
-              WARNING: logging.WARNING,
-              ERROR: logging.ERROR,
-              CRITICAL: logging.CRITICAL,
-              FATAL: FATAL_LEVEL
-             }
+    LEVELS = {
+        DEBUG: logging.DEBUG,
+        INFO: logging.INFO,
+        WARNING: logging.WARNING,
+        ERROR: logging.ERROR,
+        CRITICAL: logging.CRITICAL,
+        FATAL: FATAL_LEVEL
+    }
 
     def __init__(self, log_level=INFO,
                  log_format='%(message)s',
                  logger_name='',
                  halt_on_failure=True,
                  append_to_log=False,
-                ):
+                 ):
         self.halt_on_failure = halt_on_failure,
         self.log_format = log_format
         self.log_date_format = log_date_format
     def init_message(self, name=None):
         if not name:
             name = self.__class__.__name__
-        self.log_message("%s online at %s in %s" % \
+        self.log_message("%s online at %s in %s" %
                          (name, datetime.now().strftime("%Y%m%d %H:%M:%S"),
                          os.getcwd()))
 
             self.log_files['raw'] = '%s_raw.log' % self.log_name
             self.add_file_handler(os.path.join(self.abs_log_dir,
                                                self.log_files['raw']),
-                                 log_format='%(message)s')
+                                  log_format='%(message)s')
 
     def _clear_handlers(self):
         """To prevent dups -- logging will preserve Handlers across
         self._clear_handlers()
 
     def add_console_handler(self, log_level=None, log_format=None,
-                          date_format=None):
+                            date_format=None):
         console_handler = logging.StreamHandler()
         console_handler.setLevel(self.get_logger_level(log_level))
         console_handler.setFormatter(self.get_log_formatter(log_format=log_format,
         self.all_handlers.append(console_handler)
 
     def add_file_handler(self, log_path, log_level=None, log_format=None,
-                       date_format=None):
+                         date_format=None):
         if not self.append_to_log and os.path.exists(log_path):
             os.remove(log_path)
         file_handler = logging.FileHandler(log_path)
             raise SystemExit(exit_code)
 
 
-
 # SimpleFileLogger {{{1
 class SimpleFileLogger(BaseLogger):
     """Create one logFile.  Possibly also output to
         self.add_file_handler(self.log_path)
 
 
-
-
 # MultiFileLogger {{{1
 class MultiFileLogger(BaseLogger):
     """Create a log per log level in log_dir.  Possibly also output to
                                       log_level=level)
 
 
-
 # __main__ {{{1
 
 if __name__ == '__main__':
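
The log.py changes above give OutputParser a last_log_time field that parse_single_line() refreshes on every parsed line; run_command() (below) compares it against idle_timeout. A standalone sketch of that behavior (illustrative only; constructor arguments mirror the call made in run_command()):

    import time
    from mozharness.base.log import OutputParser

    parser = OutputParser(config={}, log_obj=None, error_list=[])
    assert parser.last_log_time is None                   # nothing parsed yet
    parser.add_lines("first line of subprocess output\n")
    idle_for = time.time() - parser.last_log_time         # ~0; resets with each parsed line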

File mozharness/base/script.py

 import pprint
 import re
 import shutil
+import signal
 import subprocess
 import sys
+import threading
 import time
 import urllib2
 import urlparse
 except ImportError:
     import json
 
+
 from mozharness.base.config import BaseConfig
 from mozharness.base.log import SimpleFileLogger, MultiFileLogger, \
-    LogMixin, OutputParser, DEBUG, INFO, ERROR, FATAL
+    LogMixin, OutputParser, DEBUG, INFO, ERROR, WARNING, FATAL
+
+
+def _process_streaming_output(proc, parser):
+    """
+        Helper method: take streaming output from a subprocess
+        in a non-blocking fashion, so we can detect idle timeouts.
+
+        Based on http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
+        """
+    next_line = None
+    buf = ''
+    delay = 0
+    while True:
+        """ Read one character at a time in non-blocking fashion.
+            Detect newlines and add to parser.
+            """
+        out = proc.stdout.read(1)
+        if out == '' and proc.poll() is not None:
+            # End of process.
+            if buf:
+                parser.add_lines(buf)
+            break
+        if out != '':
+            # Output!
+            delay = 0
+            buf += out
+            if out == '\n':
+                next_line = buf
+                buf = ''
+        else:
+            # No output; slow down the loop
+            delay += 0.1
+            if delay > 1:
+                delay = 1
+            time.sleep(delay)
+        if not next_line:
+            continue
+        parser.add_lines(next_line)
+        next_line = None
+    proc.stdout.close()
 
 
 # ScriptMixin {{{1
             self.log("Unknown return_type type %s requested in query_exe!" % return_type, level=error_level)
         return exe
 
-    def run_command(self, command, cwd=None, error_list=None, parse_at_end=False,
+    def terminate_pid(self, pid, sig=None):
+        """
+            Kill a subprocess, compatible with Python 2.5
+            From http://stackoverflow.com/questions/1064335/in-python-2-5-how-do-i-kill-a-subprocess
+
+            """
+        self.info("Killing pid %s..." % str(pid))
+        if self._is_windows():
+            import ctypes
+            PROCESS_TERMINATE = 1
+            handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, pid)
+            ctypes.windll.kernel32.TerminateProcess(handle, -1)
+            ctypes.windll.kernel32.CloseHandle(handle)
+        else:
+            if sig is None:
+                sig = signal.SIGTERM
+            os.kill(pid, sig)
+
+    def run_command(self, command, cwd=None, error_list=None,
+                    idle_timeout=None, idle_error_level=WARNING,
                     halt_on_failure=False, success_codes=None,
                     env=None, return_type='status', throw_exception=False,
                     output_parser=None):
         """Run a command, with logging and error parsing.
 
-        TODO: parse_at_end, context_lines
-        TODO: retry_interval?
-        TODO: error_level_override?
-        TODO: Add a copy-pastable version of |command| if it's a list.
+        TODO: context_lines
         TODO: print env if set
 
         output_parser lets you provide an instance of your own OutputParser
          {'regex': re.compile('^Error:'), level=ERROR, contextLines='5:5'},
          {'substr': 'THE WORLD IS ENDING', level=FATAL, contextLines='20:'}
         ]
-        (context_lines isn't written yet)
         """
         if success_codes is None:
             success_codes = [0]
             self.log('caught OS error %s: %s while running %s' % (e.errno,
                      e.strerror, command), level=level)
             return -1
+
         if output_parser is None:
             parser = OutputParser(config=self.config, log_obj=self.log_obj,
                                   error_list=error_list)
         else:
             parser = output_parser
-        loop = True
-        while loop:
+        parser.last_log_time = time.time()
+
+        thread = threading.Thread(target=_process_streaming_output, args=(p, parser))
+        thread.daemon = True
+        thread.start()
+        while True:
             if p.poll() is not None:
-                """Avoid losing the final lines of the log?"""
-                loop = False
-            while True:
-                line = p.stdout.readline()
-                if not line:
-                    break
-                parser.add_lines(line)
+                if thread and thread.is_alive():
+                    thread.join()
+                break
+            t = time.time()
+            if idle_timeout and t - parser.last_log_time > idle_timeout:
+                if p.poll() is None:
+                    self.log("Process has passed max idle timeout of %d seconds." % idle_timeout,
+                             level=idle_error_level)
+                    self.terminate_pid(p.pid)
+                    time.sleep(1)
+                    if p.poll() is None:
+                        self.warning("It looks like the process may still be around; trying p.kill()")
+                        self.terminate_pid(p.pid, sig=signal.SIGKILL)
+                        time.sleep(1)
+                        try:
+                            p.kill()
+                            time.sleep(1)
+                        except:
+                            pass
+                        if p.poll() is None:
+                            self.log("Still not dead.  Giving up.", level=idle_error_level)
+                            # We could |p.wait(); if t and t.is_alive(): t.join()| here
+                            # However, blocking on something finishing when we're trying
+                            # to time out seems counter-intuitive
+                break
         return_level = INFO
         if p.returncode not in success_codes:
             return_level = ERROR
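
A condensed, self-contained sketch of the idle-timeout pattern run_command() now implements: a daemon thread consumes child output and refreshes a timestamp, while the main loop polls the child and terminates it once the timestamp goes stale. Names here are local to this sketch; the real code reads one character at a time via _process_streaming_output(), tracks idleness through parser.last_log_time, and terminate_pid() adds Windows support and SIGKILL escalation.

    import os
    import signal
    import subprocess
    import threading
    import time

    def run_with_idle_timeout(command, idle_timeout=10):
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        last_output = [time.time()]           # shared idle clock, seeded before polling

        def reader():
            # readline() blocks, but only this daemon thread blocks on it.
            for line in iter(proc.stdout.readline, ''):
                last_output[0] = time.time()  # any output resets the idle clock
            proc.stdout.close()

        t = threading.Thread(target=reader)
        t.daemon = True
        t.start()
        while proc.poll() is None:
            if time.time() - last_output[0] > idle_timeout:
                os.kill(proc.pid, signal.SIGTERM)   # POSIX only in this sketch
                break
            time.sleep(0.1)
        return proc.poll()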

File mozharness/base/vcs/mercurial.py

     def __init__(self, log_obj=None, config=None, vcs_config=None):
         super(MercurialVCS, self).__init__()
         self.can_share = None
+        self.idle_timeout = 15 * 60
         self.log_obj = log_obj
         if config:
             self.config = config
         #  revision: revision,
         #  ssh_username: ssh_username,
         #  ssh_key: ssh_key,
+        #  idle_timeout: idle timeout in seconds,
         # }
         self.vcs_config = vcs_config
         self.hg = [self.query_exe('hg')] + HG_OPTIONS
                 cmd.extend(['-b', branch])
 
         cmd.extend([repo, dest])
-        self.run_command(cmd, error_list=HgErrorList)
+        self.run_command(cmd, idle_timeout=self.idle_timeout, error_list=HgErrorList)
 
         if update_dest:
             return self.update(dest, branch, revision)
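
The mercurial.py hunk documents an idle_timeout key in vcs_config and sets a 15-minute default on the object; whether the key is read back out of vcs_config is not visible in this hunk. A hypothetical configuration (keys and values illustrative):

    vcs_config = {
        'repo': 'https://hg.mozilla.org/build/mozharness',
        'dest': 'build/mozharness',
        'revision': 'default',
        'idle_timeout': 15 * 60,   # seconds; matches the default set in __init__()
    }
    vcs = MercurialVCS(log_obj=log_obj, config=config, vcs_config=vcs_config)

In mozharness scripts this object is typically driven through ensure_repo_and_revision(), which ends up in clone()/pull() and therefore picks up the idle_timeout now passed to run_command() above.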

File requirements.txt

+# These packages are needed for mozharness unit tests.
+# Output from 'pip freeze'; we may be able to use other versions of the below packages.
+Cython==0.14.1
+Fabric==1.6.0
+coverage==3.6
+distribute==0.6.35
+dulwich==0.8.7
+hg-git==0.4.0
+logilab-astng==0.24.2
+logilab-common==0.59.0
+mercurial==2.5.3
+mock==1.0.1
+nose==1.2.1
+ordereddict==1.1
+paramiko==1.10.0
+pycrypto==2.6
+pyflakes==0.6.1
+pylint==0.27.0
+simplejson==2.1.1
+unittest2==0.5.1
+virtualenv==1.5.1
+wsgiref==0.1.2

File test/test_base_script.py

 import mock
 import os
 import re
+import sys
+import time
 import unittest
 
-
 import mozharness.base.errors as errors
 import mozharness.base.log as log
 from mozharness.base.log import DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL, IGNORE
         self.assertEqual(ret[0], args)
         self.assertEqual(ret[1], kwargs)
 
+
+# TestTimeout {{{1
+class TestTimeout(unittest.TestCase):
+    def setUp(self):
+        cleanup()
+        self.s = script.BaseScript(initial_config_file='test/test.json')
+
+    def tearDown(self):
+        # Close the logfile handles, or windows can't remove the logs
+        if hasattr(self, 's') and isinstance(self.s, object):
+            del(self.s)
+        cleanup()
+
+    def testNoTimeout(self):
+        status = self.s.run_command([sys.executable, '-c', 'import time; time.sleep(2)'], idle_timeout=5)
+        self.assertEqual(status, 0)
+
+    def testTimeout(self):
+        t = time.time()
+        status = self.s.run_command([sys.executable, '-c', 'import time; time.sleep(20)'], idle_timeout=2)
+        self.assertNotEqual(status, 0, "Timed-out (and killed) status is 0!")
+        self.assertTrue(time.time() - t < 5, "Idle timeout of 2 seconds took longer than 5 seconds to return!")
+
+    def testCommandLag(self):
+        t = time.time()
+        self.s.run_command(['echo'])
+        self.s.run_command(['echo'])
+        self.s.run_command(['echo'])
+        self.assertTrue(time.time() - t < 1, "Running three echos takes over a full second!")
+
+
 # main {{{1
 if __name__ == '__main__':
     unittest.main()

File (shell unit-test runner; filename not shown in this view)

 export PYTHONPATH=`env pwd`:$PYTHONPATH
 
 echo "### Running pyflakes"
-pyflakes $MOZHARNESS_PY_FILES $SCRIPTS_PY_FILES | grep -v "local variable 'url' is assigned to" | grep -v "redefinition of unused 'json'" | egrep -v "mozharness/mozilla/testing/mozpool\.py.*undefined name 'requests'"
+pyflakes $MOZHARNESS_PY_FILES $SCRIPTS_PY_FILES | grep -v "local variable 'url' is assigned to" | grep -v "redefinition of unused 'json'" | egrep -v "mozharness/mozilla/testing/mozpool\.py.*undefined name 'requests'" | grep -v "list comprehension redefines"
 
 echo "### Running pylint"
 pylint -E -e F -f parseable $MOZHARNESS_PY_FILES $SCRIPTS_PY_FILES 2>&1 | egrep -v '(No config file found, using default configuration|Instance of .* has no .* member|Unable to import .devicemanager|Undefined variable .DMError|Module .hashlib. has no .sha512. member)'
 
 rm -rf build logs
 if [ $OS_TYPE != 'windows' ] ; then
-  echo "### Testing non-networked unit tests"
+  echo "### Running unit tests"
   coverage run -a --branch $COVERAGE_ARGS $NOSETESTS test/test_*.py
   echo "### Running *.py [--list-actions]"
   for filename in $MOZHARNESS_PY_FILES; do