1. Jim Dennis
  2. classh

Commits

ji...@bitbucket.org  committed 2bcbdb0

Add option parsing and some exception handling

Added -n/--numjobs -t/--timeout --progress and --noexpand options
with show_progress() function, usage message, etc.

Noticed some exceptions that could be thrown by Popen and its .communicate
method with extremely short (less than 10 second) timeouts; wrapped those
calls in exception handling.

  • Participants
  • Parent commits 0ace773
  • Branches default
  • Tags 0.05

Comments (0)

Files changed (3)

File README

View file
  • Ignore whitespace
  the output, error messages, exit value, start and end times of each; to
  facilitate sorting, writing into separate files, etc.
 
+ When the --progress switch is used then classh will print progress to
+ stderr consisting of the following characters: .?~! (successful,
+ remote error, ssh error, and killed/timeout respectively).  In that case
+ the other incremental output is skipped by default.
+
+ The --timeout option can over-ride the default job timeout value (300
+ seconds).  (Note: extremely short timeout values --- less than 10 seonds
+ can cause strange errors, 0 or any negative number will disable timeout
+ handling completely).
+
  For a more powerful example consider this:
 
     classh -q -E ~/bin/remediate  -S '~/bin/nextstage someargs ...' \
  ... bar40baz.  By default the same sort of numeric range expansion
  will be performed on each entry in file as it's processed.
 
- 
-
+ The --noexpand option disables this expansion (in both arguments and
+ file contents).
 
  Features:
 

File classh

View file
  • Ignore whitespace
 __author__  = '''Jim Dennis <answrguy@gmail.com>'''
 __url__     = '''http://bitbucket.org/jimd/classh/'''
 __license___= '''PSF (Python Software Foundation)'''
-__version__ = 0.04
+__version__ = 0.05
 
 
+import sys, os, string, signal, re
+from time import sleep, time
 from subprocess import Popen, PIPE
-from time import sleep, time
-import sys, os, string, signal, re
-
 
 def debug(str):   # TODO: use logging package?
     if 'DEBUG' in os.environ and os.environ['DEBUG']==sys.argv[0]:
                     x += 1
                 key = key + ':' + str(x) 
 
-            job.proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
-	             stdout=PIPE,stderr=PIPE, close_fds=True)
+            try: 
+                job.proc = Popen(self.ssh + self.ssh_args + [host] 
+                           + [self.cmd], 
+                           stdout=PIPE,stderr=PIPE, close_fds=True)
+            except OSError, e:
+                ## Too many open files, or processes or whatever
+                ## Sleep briefly and retry
+                debug('Popen failure %s' % e.errno)
+                self.targets.insert(0, host)
+                sleep(0.1)
+                return 
 
             self.results[key] = 'Placeholder'
             self.pool[key] = job
                 debug('job %s is done' % key)
                 job.exitcode = rv 
                 self.gather(key, job)
-                del self.pool[key]
+                if key in self.pool:
+                    del self.pool[key]
+                else:  #TODO: only seen with extremely short timeouts
+                    debug('key %s not found in pool' % key)
                 reaped.append(key)
             # Handle timeouts:
-            elapsed = time() - job.started
-            if elapsed > self.jobtimeout:
-                debug('job %s timed out after %s seconds' % (key, elapsed))
-                self.kill(job.proc)
-                self.gather(key, job)
-                del self.pool[key]
-                reaped.append(key)
+            if self.jobtimeout > 0:
+                elapsed = time() - job.started
+                if elapsed > self.jobtimeout:
+                    debug('job %s timed out after %s seconds' % (key, elapsed))
+                    self.kill(job.proc)
+                    self.gather(key, job)
+                    if key in self.pool:
+                        del self.pool[key]
+                    else:  #TODO: only seen with extremely short timeouts
+                        debug('key %s not found in pool' % key)
+                    reaped.append(key)
         debug ('reaped %s jobs' % len(reaped))
         self.add_jobs()
         return reaped
 
     def gather(self, key, job):
         '''Gather results from a subprocess
-           These are stored as a dictionary of dictionaries
+           These are stored as a dictionary of jobs
         '''
         debug ('gathering')
-        (job.output, job.errors) = job.proc.communicate()
+        try: 
+            (job.output, job.errors) = job.proc.communicate()
+        except ValueError, e: # Popen error on short timeouts?
+            debug ('Popen.communicate exception')
+            (job.output, job.errors) = ('','')
+            job.exitcode = -1000  ##TODO: Popen:Exception
         job.stopped  = time()
         self.results[key] = job
             
     return results 
         
 
-def expand_range(s, r=None):
+def expand_range(s):
     '''Return a list of strings with expressions like [n-m,p,t] expanded.
 
        Examples:
        ['bar1foo', 'bar2foo', 'bar3foo']
     '''
 
-    debug("\ter(%s,%s)" % (s,r))
+    debug("\texpand_range(%s)" % (s))
     results = [] 
     rx = expr.search(s)  # range expression
     if not rx:
     if job.exitcode:
         print pfix, "\tErrors: ", errs[:40]
     print pfix, "\tOutput: ", outs[:40]
+
+def show_progress(key, job):
+    '''Print progress bar as incremental results
+       . for successful jobs
+       ? for remote errors returned
+       ! for timed out / killed jobs
+       ~ for (probable) ssh errors 
+    '''
+    if    job.exitcode  ==      0: res = '.'
+    elif  job.exitcode  ==    255: res = '~'
+    elif  job.exitcode  ==  -1000: res = '#' # See TODO: Popen:Exception
+    elif  job.exitcode   >      0: res = '?'
+    elif  job.exitcode   <      0: res = '!'
+    sys.stderr.write(res)   # avoid trailing spaciness from print statement
             
 def summarize_results(res):
     success  = 0
     for line in f:
         if line.endswith('\n'):
             line = line[:-1].strip()
-        results.extend(expand_range(line))
+        results.extend(line)
     return results
 
+usage = "usage %prog [switches] cmd targethost [targethost2 ...]" 
+def parse_options():
+    from optparse import OptionParser
+    parser = OptionParser(usage=usage)
+
+    parser.add_option(
+       '-n', '--numjobs', dest='numjobs', 
+       action='store', type='int', default=100,
+       help="Maximum number of concurrent subprocesses (size of job pool)")
+
+    parser.add_option(
+       '-t', '--timeout', dest='timeout', 
+       action='store', type='int', default=300,
+       help="Maximum number of seconds a job may run before being killed")
+
+    parser.add_option(
+       '--progress', dest='progressbar', 
+       action='store_true', default=False,
+       help="Show progress as . (successful) ? (remote error) ! (timed out)")
+
+    parser.add_option(
+       '--noexpand', dest='noexpand', 
+       action='store_true', default=False,
+       help="Disable expansion of hostname ranges")
+       
+    opts, args = parser.parse_args()
+    return (parser, opts, args)
+
 if __name__ == "__main__":
     '''Stupid simple test code and wrapper:
-       first argument is command, rest are hosts
-       Dispatch jobs to hosts and incrementally print
-       results
+       First argument is command, rest are hosts.
+       Dispatch jobs to hosts and incrementally print results
     '''
+
     start = time()
-    if len(sys.argv[1:]) < 2:
-        print >> sys.stderr, "Must specify a command and a list of hosts"
-        sys.exit(1)
+    (parser, options, args) = parse_options()
 
-    cmd   = sys.argv[1]
+    if len(args) < 2:
+        parser.error("Must specify a command and a list of hosts")
+
+    cmd   = args[0]
     hosts = list()
 
-    for arg in sys.argv[2:]:
+    handle_completed = print_results
+    if options.progressbar:
+        handle_completed = show_progress
+    
+    for arg in args[1:]:
         if '/' in arg:
             # it's a filename so:
-            hosts.extend(readfile(arg))
+            if options.noexpand:
+                hosts.extend(readfile(arg))
+            else:
+                for each in readfile(arg):
+                    hosts.extend(expand_range(each))
         else:
-            ## TODO: expand host range expressions
-            hosts.extend(expand_range(arg))
+            if options.noexpand:
+                hosts.append(arg)
+            else:
+                hosts.extend(expand_range(arg))
 
     print >> sys.stderr, "About to run '%s' on %s hosts...\n\n" \
         % (cmd, len(hosts))
 
 
-    job = SSHJobMan(hosts, cmd)
+    job = SSHJobMan(hosts, cmd, 
+        poolsize=options.numjobs,
+        jobtimeout=options.timeout)
+
     job.start()
     
     completed = None
         sleep(0.2)
         if completed:
             for each in completed:
-                print_results(each, job.results[each])
+                handle_completed(each, job.results[each])
         
     summarize_results(job.results)
     print "Completed in %s seconds" % (time() - start)

File classh.py

View file
  • Ignore whitespace
 __author__  = '''Jim Dennis <answrguy@gmail.com>'''
 __url__     = '''http://bitbucket.org/jimd/classh/'''
 __license___= '''PSF (Python Software Foundation)'''
-__version__ = 0.04
+__version__ = 0.05
 
 
+import sys, os, string, signal, re
+from time import sleep, time
 from subprocess import Popen, PIPE
-from time import sleep, time
-import sys, os, string, signal, re
-
 
 def debug(str):   # TODO: use logging package?
     if 'DEBUG' in os.environ and os.environ['DEBUG']==sys.argv[0]:
                     x += 1
                 key = key + ':' + str(x) 
 
-            job.proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
-	             stdout=PIPE,stderr=PIPE, close_fds=True)
+            try: 
+                job.proc = Popen(self.ssh + self.ssh_args + [host] 
+                           + [self.cmd], 
+                           stdout=PIPE,stderr=PIPE, close_fds=True)
+            except OSError, e:
+                ## Too many open files, or processes or whatever
+                ## Sleep briefly and retry
+                debug('Popen failure %s' % e.errno)
+                self.targets.insert(0, host)
+                sleep(0.1)
+                return 
 
             self.results[key] = 'Placeholder'
             self.pool[key] = job
                 debug('job %s is done' % key)
                 job.exitcode = rv 
                 self.gather(key, job)
-                del self.pool[key]
+                if key in self.pool:
+                    del self.pool[key]
+                else:  #TODO: only seen with extremely short timeouts
+                    debug('key %s not found in pool' % key)
                 reaped.append(key)
             # Handle timeouts:
-            elapsed = time() - job.started
-            if elapsed > self.jobtimeout:
-                debug('job %s timed out after %s seconds' % (key, elapsed))
-                self.kill(job.proc)
-                self.gather(key, job)
-                del self.pool[key]
-                reaped.append(key)
+            if self.jobtimeout > 0:
+                elapsed = time() - job.started
+                if elapsed > self.jobtimeout:
+                    debug('job %s timed out after %s seconds' % (key, elapsed))
+                    self.kill(job.proc)
+                    self.gather(key, job)
+                    if key in self.pool:
+                        del self.pool[key]
+                    else:  #TODO: only seen with extremely short timeouts
+                        debug('key %s not found in pool' % key)
+                    reaped.append(key)
         debug ('reaped %s jobs' % len(reaped))
         self.add_jobs()
         return reaped
 
     def gather(self, key, job):
         '''Gather results from a subprocess
-           These are stored as a dictionary of dictionaries
+           These are stored as a dictionary of jobs
         '''
         debug ('gathering')
-        (job.output, job.errors) = job.proc.communicate()
+        try: 
+            (job.output, job.errors) = job.proc.communicate()
+        except ValueError, e: # Popen error on short timeouts?
+            debug ('Popen.communicate exception')
+            (job.output, job.errors) = ('','')
+            job.exitcode = -1000  ##TODO: Popen:Exception
         job.stopped  = time()
         self.results[key] = job
             
     return results 
         
 
-def expand_range(s, r=None):
+def expand_range(s):
     '''Return a list of strings with expressions like [n-m,p,t] expanded.
 
        Examples:
        ['bar1foo', 'bar2foo', 'bar3foo']
     '''
 
-    debug("\ter(%s,%s)" % (s,r))
+    debug("\texpand_range(%s)" % (s))
     results = [] 
     rx = expr.search(s)  # range expression
     if not rx:
     if job.exitcode:
         print pfix, "\tErrors: ", errs[:40]
     print pfix, "\tOutput: ", outs[:40]
+
+def show_progress(key, job):
+    '''Print progress bar as incremental results
+       . for successful jobs
+       ? for remote errors returned
+       ! for timed out / killed jobs
+       ~ for (probable) ssh errors 
+    '''
+    if    job.exitcode  ==      0: res = '.'
+    elif  job.exitcode  ==    255: res = '~'
+    elif  job.exitcode  ==  -1000: res = '#' # See TODO: Popen:Exception
+    elif  job.exitcode   >      0: res = '?'
+    elif  job.exitcode   <      0: res = '!'
+    sys.stderr.write(res)   # avoid trailing spaciness from print statement
             
 def summarize_results(res):
     success  = 0
     for line in f:
         if line.endswith('\n'):
             line = line[:-1].strip()
-        results.extend(expand_range(line))
+        results.extend(line)
     return results
 
+usage = "usage %prog [switches] cmd targethost [targethost2 ...]" 
+def parse_options():
+    from optparse import OptionParser
+    parser = OptionParser(usage=usage)
+
+    parser.add_option(
+       '-n', '--numjobs', dest='numjobs', 
+       action='store', type='int', default=100,
+       help="Maximum number of concurrent subprocesses (size of job pool)")
+
+    parser.add_option(
+       '-t', '--timeout', dest='timeout', 
+       action='store', type='int', default=300,
+       help="Maximum number of seconds a job may run before being killed")
+
+    parser.add_option(
+       '--progress', dest='progressbar', 
+       action='store_true', default=False,
+       help="Show progress as . (successful) ? (remote error) ! (timed out)")
+
+    parser.add_option(
+       '--noexpand', dest='noexpand', 
+       action='store_true', default=False,
+       help="Disable expansion of hostname ranges")
+       
+    opts, args = parser.parse_args()
+    return (parser, opts, args)
+
 if __name__ == "__main__":
     '''Stupid simple test code and wrapper:
-       first argument is command, rest are hosts
-       Dispatch jobs to hosts and incrementally print
-       results
+       First argument is command, rest are hosts.
+       Dispatch jobs to hosts and incrementally print results
     '''
+
     start = time()
-    if len(sys.argv[1:]) < 2:
-        print >> sys.stderr, "Must specify a command and a list of hosts"
-        sys.exit(1)
+    (parser, options, args) = parse_options()
 
-    cmd   = sys.argv[1]
+    if len(args) < 2:
+        parser.error("Must specify a command and a list of hosts")
+
+    cmd   = args[0]
     hosts = list()
 
-    for arg in sys.argv[2:]:
+    handle_completed = print_results
+    if options.progressbar:
+        handle_completed = show_progress
+    
+    for arg in args[1:]:
         if '/' in arg:
             # it's a filename so:
-            hosts.extend(readfile(arg))
+            if options.noexpand:
+                hosts.extend(readfile(arg))
+            else:
+                for each in readfile(arg):
+                    hosts.extend(expand_range(each))
         else:
-            ## TODO: expand host range expressions
-            hosts.extend(expand_range(arg))
+            if options.noexpand:
+                hosts.append(arg)
+            else:
+                hosts.extend(expand_range(arg))
 
     print >> sys.stderr, "About to run '%s' on %s hosts...\n\n" \
         % (cmd, len(hosts))
 
 
-    job = SSHJobMan(hosts, cmd)
+    job = SSHJobMan(hosts, cmd, 
+        poolsize=options.numjobs,
+        jobtimeout=options.timeout)
+
     job.start()
     
     completed = None
         sleep(0.2)
         if completed:
             for each in completed:
-                print_results(each, job.results[each])
+                handle_completed(each, job.results[each])
         
     summarize_results(job.results)
     print "Completed in %s seconds" % (time() - start)