Commits

Anonymous committed 6c4a54b

Initial publication: tested against localhost and a 25,000-host list at work.


Files changed (2)

+ README: classh
+
+
+ classh is the "Cluster Administrator's ssh" tool.  It is yet another wrapper
+ around ssh for running commands on a number of hosts concurrently, similar
+ to xCAT, pssh, Cluster ssh, and a gaggle of other utilities.
+
+ The astute reader will already be asking: *WHY* release ANOTHER such tool?
+
+ A few years ago I needed something like this and surveyed the tools available
+ at the time.  My requirements were:
+
+   * Must handle thousands, preferably tens of thousands of targets
+   * Must run a reasonable number (100s) of the jobs concurrently
+   * Must be able to capture the output, error messages, and exit values
+     for each job in a manner amenable to automated post-processing
+   * Must be reliable enough not to stall, hang, nor crash "no matter what"
+   * Must run on a standard Linux installation and be able to handle a
+     variety of standard UNIX targets with no special client/agent/daemon
+     software deployed (other than sshd).
+
+ At that time I also needed it to be capable of handling interactive
+ authentication (prompting for a password once, and automatically 
+ responding to ssh and sudo password prompts as necessary).
+
+ I wrote something in Python using os.fork(), os.execve(), Pexpect,
+ and the signal handling module (to handle SIGCHLD events).  It was a
+ relatively ugly hack which we only used internally for the purpose at hand.
+ Its output handling was crude: every child process simply wrote
+ $(hostname).{out,err,ret} files into a specified target/job results
+ directory.
+
+ However, it did the job and none of the other tools I reviewed at the
+ time met all of my requirements.  (It's possible that some of them
+ have these features, but have them poorly enough documented or
+ sufficiently inaccessible to a new user that I missed them.)
+
+ classh is a re-implementation of that concept.
+
+ Example:
+
+    classh 'hostname;date' host1 host2 host3 ...
+
+
+ ... will simply fire off up to 100 (default) subprocesses of ssh,
+ each running the 'hostname' and 'date' commands on their respective
+ hosts.
+
+ In this example, if there were more than 100 hosts listed, then after
+ 100 jobs were active classh would pause for a few tenths of a second,
+ poll its pool of jobs for any that have completed, print any of the
+ results, kill any jobs that stalled (5 minutes by default), and
+ replenish the job pool until all the jobs were completed.
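+ The replenishing job-pool pattern described above can be sketched in
+ miniature with purely local subprocesses (a simplified, hypothetical
+ helper; real classh wraps each command in ssh and also enforces
+ per-job timeouts):

```python
import subprocess
import time

def run_pool(commands, poolsize=3, poll_interval=0.05):
    # Run shell commands, keeping at most `poolsize` running at once.
    # Returns {command: (exit_status, stdout_bytes)}.
    pending = list(commands)
    pool = {}       # command -> Popen object
    results = {}
    while pending or pool:
        # Top off the pool with new jobs.
        while pending and len(pool) < poolsize:
            cmd = pending.pop()
            pool[cmd] = subprocess.Popen(cmd, shell=True,
                                         stdout=subprocess.PIPE)
        # Reap any jobs that have finished.
        for cmd, proc in list(pool.items()):
            if proc.poll() is not None:
                out, _ = proc.communicate()
                results[cmd] = (proc.returncode, out)
                del pool[cmd]
        time.sleep(poll_interval)
    return results
```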
+
+ By default the output from each job would look like:
+
+	 host2   0   (8.2)     
+	   Output: 
+
+	   host2.foo.xxx
+           Fri Nov 20 03:39:32 PST 2009
+
+	 host1   7   (2.6)     
+	   Error:
+	   date: not found
+
+	   Output:
+
+	   host1.foo.xxx
+
+ ... and so on.  (A number of alternative result displays are supported).
+
+ While classh defaults to incrementally printing results, it also captures
+ the output, error messages, exit value, and start and end times of each
+ job, to facilitate sorting, writing into separate files, etc.
+
+ For a more powerful example consider this:
+
+    classh -q -E ~/bin/remediate  -S '~/bin/nextstage someargs ...' \
+       'test -f /...' ./targets.txt
+
+
+ ... which will quietly (-q) run the command "test -f /..." on every
+ host listed in ./targets.txt, feed the name of each host that
+ reports an error into a process running ~/bin/remediate, and feed
+ all of the successful host names into another process running
+ "~/bin/nextstage" with "someargs ..." as arguments.
+
+ In other words, we can easily pipeline successful and exceptional results
+ from one classh job into other processes (including, obviously, other
+ classh commands).
+
+ The -S and -E options perform a bit of magic: '-' means classh's own
+ stdout (for normal shell pipeline handling); a directory will be taken
+ as a target for *.{out,err,ret} files (*.ret only for -E directories);
+ an executable (or any string containing a space and starting with an
+ executable filename) will be executed in a subshell (as described); and
+ a regular, writable file will be opened for appending.
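+ That -S/-E dispatch could be sketched like this (names invented here;
+ classh's actual logic may differ):

```python
import os

def classify_sink(spec):
    # Classify a -S/-E argument per the rules described above.
    # Returns 'stdout', 'directory', 'command', or 'file'.
    if spec == '-':
        return 'stdout'      # classh's own stdout, for shell pipelines
    if os.path.isdir(spec):
        return 'directory'   # target for *.{out,err,ret} files
    first_word = spec.split()[0]
    if os.path.isfile(first_word) and os.access(first_word, os.X_OK):
        return 'command'     # run in a subshell, fed the hostnames
    return 'file'            # a regular file, opened for appending
```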
+
+ Similarly, any trailing argument that looks like a filename
+ (contains a '/' character) will be treated as a list of host names
+ or host patterns.
+
+ There is further magic in the hostname handling.  Any argument
+ that doesn't look like a filename and that does contain [...]
+ expressions such as foo[0-10] or bar[3,2,12-23,40-44]baz will be
+ expanded into a list like: foo0 foo1 ... foo10 or bar3baz bar2baz
+ ... bar44baz.  By default the same sort of numeric range expansion
+ will be performed on each entry in a file as it's processed.
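+ The range expansion can be sketched roughly like this (a hypothetical
+ helper handling a single [...] expression per pattern; classh's own
+ expansion may be more general):

```python
import re

def expand_ranges(pattern):
    # Expand one [n,m,a-b,...] numeric range expression in a host pattern.
    # Patterns without brackets come back unchanged, as a one-item list.
    match = re.search(r'\[([0-9,-]+)\]', pattern)
    if not match:
        return [pattern]
    head = pattern[:match.start()]
    tail = pattern[match.end():]
    hosts = []
    for part in match.group(1).split(','):
        if '-' in part:
            low, high = part.split('-')
            for n in range(int(low), int(high) + 1):
                hosts.append('%s%d%s' % (head, n, tail))
        else:
            hosts.append('%s%s%s' % (head, part, tail))
    return hosts
```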
+
+ Features:
+
+   * Runs a configurable number of jobs in parallel
+   * Tested on tens of thousands of targets per job
+   * Records exit status, running time, output, and error messages separately
+   * Supports timeouts (and records them)
+   * Supports (optional) incremental results gathering/processing
+   * Feeds hostnames from successful and/or exceptional jobs into
+     their own files or processes
+   * Flexible host pattern expansion (foo[1-20,31,32,40-100]bar.xxx)
+   * Flexible options for saving output, errors, and exit values
+     (including pickling all results for import)
+   * Supports an interactive shell
+   * Importable as a Python module: use it to build more powerful scripts
+   * Basic functionality in one file using only Python 2.4 std libs.
+
+
+ Rhetorical Questions:
+
+   * Why not the multiprocessing module?
+   * Why not Twisted/conch?
+ 
+
+
+ Links to Related Packages:
+
+ * http://vxargs.sourceforge.net/
+ * http://pydsh.sourceforge.net/
+ * http://pussh.sourceforge.net/
+ * http://www.csm.ornl.gov/torc/C3/  ## Cluster Command&Control (Python)
+ * http://guichaz.free.fr/gsh/  ## Group shell
+ * http://www.lysator.liu.se/fsh/    ## Honorable mention
+ * http://jonas.bardinosen.dk/pywrat/
+ * http://www.cure.nom.fr/blog/archives/83-SRC-Simultaneous-Remote-Command.html
+ * http://freshmeat.net/projects/octopussh (leads to 404 errors)
+
+
+ * http://freshmeat.net/projects/pconsole
+ * http://tentakel.biskalar.de/
+ * http://web.taranis.org/shmux/     ## Also good links!
+ * http://taktuk.gforge.inria.fr/
+ * http://www.tuxrocks.com/Projects/p-run/
+ * http://omnitty.sourceforge.net/
+ * http://www.theether.org/pssh/
+ * http://www.lerp.com/~sic/mass/
+ * http://outflux.net/unix/software/gsh/
+ * http://watson-wilson.ca/blog/sshdo.html 
+ * http://xcat.sourceforge.net/
+ * http://cssh.sourceforge.net/
+ * http://clusterit.sourceforge.net/
+ * http://sourceforge.net/projects/distribulator/
+ * http://sourceforge.net/projects/clusterssh/develop
+ * http://code.google.com/p/csshx/
+ * http://sourceforge.net/projects/clusterm/
+ * http://sourceforge.net/projects/consh/
+ * http://sourceforge.net/projects/mpssh/
+ * http://sourceforge.net/projects/mrtools/
+ * http://sourceforge.net/projects/mussh/
+ * http://sourceforge.net/projects/pdsh/
+ * http://sourceforge.net/projects/remotecmd/
+ * http://sourceforge.net/projects/rover/files/
+
+ * http://www.stearns.org/fanout/README.html
+ * http://y3sy3s.lafibre.org/?q=node/6
+
+ Honorable Mention:
+ * http://open.controltier.org/wiki/ControlTier
+ * https://fedorahosted.org/func/
+ * http://expect.nist.gov/example/multixterm.man.html
+ * http://stromberg.dnsalias.org/~dstromberg/looper/
+ * http://mi.eng.cam.ac.uk/~er258/code/parallel.html
+
+ * http://www.noah.org/wiki/Pexpect
+ 
+#!/usr/bin/env python
+__author__   = '''James T. Dennis <answrguy@gmail.com>'''
+__version__  = '''0.01'''
+__homepage__ = '''http://'''
+
+from subprocess import Popen, PIPE
+from time import sleep, time
+import sys, os, signal
+## from signal import alarm
+
+
+def debug(msg):   # TODO: use logging package?
+    if 'DEBUG' in os.environ:
+        print >> sys.stderr, msg
+
+class SSHJobMan(object):
+    '''SSH Job Manager
+       Given a command and a list of targets (hostnames), concurrently
+       run the command on some number (poolsize) of the targets,
+       gathering all the results (output, error messages and exit values)
+       and handling any timeouts (jobtimeout)
+       
+       Maintains a dictionary of results, each item of which is a dictionary
+       containing the specified types of results.
+    '''
+
+
+    def __init__(self, hostlist=None, cmd=None, **opts):
+        '''Store the target list, command and options;
+           no jobs are started until start() is called.
+        '''
+        self.started = False
+        self.poolsize = 100
+        self.pool = dict()         # maps job key -> (host, process, start time)
+        self.results = dict()
+        self.jobtimeout = 300
+        self.ssh = [ '/usr/bin/ssh' ] # TODO: dynamically find this
+        self.ssh_args = [ '-oBatchMode=yes' ] # TODO: add other batch-friendly options?
+
+        if hostlist is None:
+            hostlist = []
+        self.targets = list(hostlist)
+        self.cmd = cmd
+
+        self.__dict__.update(opts)  # Ugly hack?
+        
+
+    def start(self):
+        '''Set instance started flag
+           Prime the pool with new jobs
+
+           This is deferred from initialization to
+           allow the caller to hook self.poll() into
+           any signal handling (SIGCHLD, SIGALRM), etc.
+        '''
+
+        debug ('Starting')
+        self.add_jobs()
+        self.started = time()
+        
+
+    def add_jobs(self):
+        '''Called from start and from poll to fill the pool with
+           subprocesses.  While the start method primes the pool, the
+           poll method keeps it topped off until all targets have been
+           handled.
+
+           Jobs are added to the pool and to the results with a unique
+           key.  The poll method removes completed jobs from the pool,
+           calling the gather method to save their results.  The key
+           is used to keep track of each job even if multiple jobs
+           go to the same target.  (If there are duplicate targets then
+           the additional keys will be of the form: hostname..XX)
+        '''
+
+        while self.targets and len(self.pool.keys()) < self.poolsize:
+            debug('adding jobs')
+            key   = self.targets.pop()
+            host  = key
+            if key in self.results:
+                x = 0 
+                while key + '..' + str(x) in self.results:  # Unique-ify a key
+                    x += 1
+                key = key + '..' + str(x) 
+            self.results[key] = {   # Placeholder + start time
+              'Job': host,
+              'Start': time() }
+            proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd],
+                         stdout=PIPE, stderr=PIPE, close_fds=True)
+            self.pool[key] = (host, proc, time()) # start time for timeouts
+            debug('... added job %s' % key)
+
+    def poll(self):
+        '''Scan pool for completed jobs, 
+           call gather() for any of those
+           remove completed jobs from pool
+           call add_jobs() to top off pool
+           return list of completed job keys
+        '''
+        
+        debug ('polling')
+        reaped = list() 
+        for key, job in self.pool.items():
+            (host, proc, starttime) = job
+            rv = proc.poll()
+            if rv is not None:
+                debug('job %s is done' % key)
+                self.gather(key, host, rv, proc)
+                del self.pool[key]
+                reaped.append(key)
+                continue   # already reaped; skip the timeout check
+            # Handle timeouts:
+            elapsed = time() - starttime
+            if elapsed > self.jobtimeout:
+                debug('job %s timed out after %s seconds' % (key, elapsed))
+                self.kill(proc)
+                self.gather(key, host, proc.wait(), proc)
+                del self.pool[key]
+                reaped.append(key)
+        debug ('reaped %s jobs' % len(reaped))
+        self.add_jobs()
+        return reaped
+
+    def gather(self, key, hostname, exitval, proc):
+        '''Gather results from a subprocess
+           These are stored as a dictionary of dictionaries
+        '''
+        debug ('gathering')
+        (out,err) = proc.communicate()
+        self.results[key] = {
+              'Job': hostname,
+              'Ret': exitval,
+              'Out': out,
+              'Err': err,
+              'Start': self.results[key]['Start'],
+              'End': time()
+              }
+            
+    def kill(self, proc):
+        '''Kill a subprocess which has exceeded its timeout
+           called by poll()
+        '''
+        debug('Killing %s' % proc.pid)
+        try:
+            os.kill(proc.pid, signal.SIGKILL)
+        except OSError:
+            debug('Ignoring os.kill OSError')
+
+    def done(self):
+        '''We're done when we have been started and
+           we have zero remaining jobs in the pool
+        '''
+        debug ('done called with %s remaining jobs' % len(self.pool.keys()))
+        return self.started and not len(self.pool.keys())
+
+
+def print_results(key, res):
+    errs = ' '.join(res['Err'].split('\n'))
+    outs = ' '.join(res['Out'].split('\n')) 
+    errs = errs[:60]
+    outs = outs[:60]
+
+    pfix = ''.join(["%-40s" % key, "%-5s" % res['Ret'],
+        "(%5.2f)" % (res['End'] - res['Start'])])
+    if res['Ret']:
+        print pfix, "\tErrors: ", errs
+    print pfix, "\tOutput: ", outs
+            
+def summarize_results(res):
+    success = 0
+    errs = 0
+    timeouts = 0
+    for i in res.values():
+        if    i['Ret'] == 0: success += 1
+        elif  i['Ret'] >  0: errs += 1
+        elif  i['Ret'] <  0: timeouts += 1
+    print "\n\n\tSuccessful: %s\tErrors: %s\tSignaled(timeouts): %s" \
+      % (success, errs, timeouts)
+
+
+def readfile(fname):
+    '''Read a list of hostnames, one per line, from a file'''
+    results = []
+    try:
+        f = open(fname)
+    except EnvironmentError, e:
+        print >> sys.stderr, "Error reading %s, %s" % (fname, e)
+        return results
+    for line in f:
+        results.append(line.rstrip('\n'))
+    f.close()
+    return results
+
+if __name__ == "__main__":
+    '''Stupid simple test code and wrapper:
+       first argument is command, rest are hosts
+       Dispatch jobs to hosts and incrementally print
+       results
+    '''
+    if len(sys.argv[1:]) < 2:
+        print >> sys.stderr, "Must specify a command and a list of hosts"
+        sys.exit(1)
+
+    cmd   = sys.argv[1]
+    hosts = list()
+
+    for arg in sys.argv[2:]:
+        if '/' in arg:
+            # it's a filename so:
+            hosts.extend(readfile(arg))
+        else:
+            ## TODO: expand host range expressions
+            hosts.append(arg)
+
+    print >> sys.stderr, "About to run '%s' on %s hosts...\n\n" \
+        % (cmd, len(hosts))
+
+
+    job = SSHJobMan(hosts, cmd)
+    job.start()
+
+    completed = None
+    while not job.done():
+        completed = job.poll()
+        if completed:
+            for each in completed:
+                print_results(each, job.results[each])
+        sleep(0.2)
+
+    summarize_results(job.results)
+