Anonymous committed 73593d3

Adding author and author_email data to setup.py

Comments (0)

Files changed (2)

classh.py

-#!/usr/bin/env python
-##############################################################################
-'''Provide an easy way to concurrently run ssh jobs on a large number of 
-   targets, gather output, error messages and results from each and handle 
-   timeouts.
-'''
-
-__author__='''Jim Dennis <answrguy@gmail.com>'''
-__url__   ='''http://bitbucket.org/jimd/classh/'''
-__license__='''BSD'''
-
-
-from subprocess import Popen, PIPE
-from time import sleep, time
-import sys, os, string, signal
-## from signal import alarm
-
-
-def debug(msg):   # TODO: use logging package?
-    if 'DEBUG' in os.environ:
-        print >> sys.stderr, msg
-
-class SSHJobMan(object):
-    '''SSH Job Manager
-       Given a command and a list of targets (hostnames), concurrently
-       run the command on some number (poolsize) of the targets,
-       gathering all the results (output, error messages and exit values)
-       and handling any timeouts (jobtimeout)
-       
-       Maintains a dictionary of results, each item of which is a dictionary
-       containing the specified types of results.
-    '''
-
-
-    def __init__(self, hostlist=None, cmd=None, **opts):
-        '''Record the target list, command and any option overrides.
-           Actual job creation is deferred until start() is called.
-        '''
-        self.started = False
-        self.poolsize = 50
-        self.pool = dict()         # maps job key -> (host, process, start time)
-        self.results = dict()  
-        self.jobtimeout = 300
-        self.ssh = [ '/usr/bin/ssh' ] # TODO: dynamically find this
-        self.ssh_args = [ '-oBatchMode=yes'] #TODO: add other batch friendly options?
-
-        self.targets = hostlist[:]
-        self.cmd = cmd 
-
-        self.__dict__.update(opts)  # Ugly hack?
-        
-
-    def start(self):
-        '''Set instance started flag
-           Prime the pool with new jobs
-
-           This is deferred from initialization to
-           allow the caller to hook self.poll() into
-           any signal handling (SIGCHLD, SIGALRM), etc.
-        '''
-
-        debug ('Starting')
-        self.add_jobs()
-        self.started = time()
-        
-
-    def add_jobs(self):
-        '''Called from start and from poll to fill the pool with
-           subprocesses.  While the start method primes the pool, 
-           the poll method keeps it topped off until all targets have been
-           handled.
-
-           Jobs are added to the pool and to the results with a unique
-           key.  The poll method removes completed jobs from the pool
-           calling the gather method to save their results.  The key
-           is used to keep track of each job even if multiple jobs
-           go to the same target.  (If there are duplicate targets, then
-           the additional keys will be of the form: hostname:XX)
-        '''
-
-        while self.targets and len(self.pool.keys()) < self.poolsize:
-            debug('adding jobs')
-            key   = self.targets.pop()
-            host  = key
-            if key in self.results:
-                x = 0 
-                while key + ':' + str(x) in self.results:  # Unique-ify a key
-                    x += 1
-                key = key + ':' + str(x) 
-            self.results[key] = {   # Placeholder  + start time
-              'Job': host,
-              'Start': time() }
-            proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd],
-                         stdout=PIPE, stderr=PIPE, close_fds=True)
-            self.pool[key] = (host, proc, time()) # start time for timeouts
-            debug('... added job %s' % key)
-
-    def poll(self):
-        '''Scan pool for completed jobs, 
-           call gather() for any of those
-           remove completed jobs from pool
-           call add_jobs() to top off pool
-           return list of completed job keys
-        '''
-        
-        debug ('polling')
-        reaped = list() 
-        for key, job in self.pool.items():
-            (host, proc, starttime) = job
-            rv = proc.poll()
-            if rv is not None:
-                debug('job %s is done' % key)
-                self.gather(key, host, rv, proc)
-                del self.pool[key]
-                reaped.append(key)
-                continue   # don't let the timeout check reap this key twice
-            # Handle timeouts:
-            elapsed = time() - starttime
-            if elapsed > self.jobtimeout:
-                debug('job %s timed out after %s seconds' % (key, elapsed))
-                self.kill(proc)
-                self.gather(key, host, proc.poll(), proc)
-                del self.pool[key]
-                reaped.append(key)
-        debug ('reaped %s jobs' % len(reaped))
-        self.add_jobs()
-        return reaped
-
-    def gather(self, key, hostname, exitval, proc):
-        '''Gather results from a subprocess
-           These are stored as a dictionary of dictionaries
-        '''
-        debug ('gathering')
-        (out,err) = proc.communicate()
-        self.results[key] = {
-              'Job': hostname,
-              'Ret': exitval,
-              'Out': out,
-              'Err': err,
-              'Start': self.results[key]['Start'],
-              'End': time()
-              }
-            
-    def kill(self, proc):
-        '''Kill a subprocess which has exceeded its timeout
-           called by poll()
-        '''
-        debug('Killing %s' % proc.pid)
-        try:
-            os.kill(proc.pid, signal.SIGTERM)
-        except OSError:
-            debug('Trying SIGKILL!')
-            try:
-                os.kill(proc.pid, signal.SIGKILL)
-            except OSError:
-                debug('Ignoring os.kill OSError')
-
-    def done(self):
-        '''We're done when we have been started and
-           we have zero remaining jobs in the pool
-        '''
-        debug ('done called with %s remaining jobs' % len(self.pool.keys()))
-        return self.started and not len(self.pool.keys())
-
-
-# Translation table used by print_results() to replace every character
-# outside string.printable[:-5] (non-printables plus \t \n \r \x0b \x0c) with '.'
-nonprintable = string.translate(string.maketrans('', ''),
-                                string.maketrans('', ''),
-                                string.printable[:-5])
-filter = string.maketrans(nonprintable, '.' * len(nonprintable))
-
-
-def print_results(key, res):
-    errs = ' '.join(res['Err'].split('\n'))
-    outs = ' '.join(res['Out'].split('\n')) 
-    errs = errs[:60].translate(filter)
-    outs = outs[:60].translate(filter)
-
-    pfix = ''.join(["%-48s" % key, "%-5s" % res['Ret'], 
-        "(%s)" % ("%0.2f" % (res['End'] - res['Start']))])
-    if res['Ret']:
-        print pfix, "\tErrors: ", errs[:40]
-    print pfix, "\tOutput: ", outs[:40]
-            
-def summarize_results(res):
-    success = 0
-    errs = 0
-    timeouts = 0
-    for i in res.values():
-        if    i['Ret'] == 0: success += 1
-        elif  i['Ret'] >  0: errs += 1
-        elif  i['Ret'] <  0: timeouts += 1
-    print "\n\n\tSuccessful: %s\tErrors: %s\tSignaled(timeouts): %s" \
-      % (success, errs, timeouts)
-
-
-def readfile(fname):
-    results = []
-    try: 
-        f = open(fname)
-    except EnvironmentError, e:
-        print >> sys.stderr, "Error reading %s, %s" % (fname, e)
-        return results
-    for line in f:
-        if line.endswith('\n'):
-            line = line[:-1].strip()
-        results.append(line)
-    return results
-
-if __name__ == "__main__":
-    '''Stupid simple test code and wrapper:
-       first argument is command, rest are hosts
-       Dispatch jobs to hosts and incrementally print
-       results
-    '''
-    start = time()
-    if len(sys.argv[1:]) < 2:
-        print >> sys.stderr, "Must specify a command and a list of hosts"
-        sys.exit(1)
-
-    cmd   = sys.argv[1]
-    hosts = list()
-
-    for arg in sys.argv[2:]:
-        if '/' in arg:
-            # it's a filename so:
-            hosts.extend(readfile(arg))
-        else:
-            ## TODO: expand host range expressions
-            hosts.append(arg)
-
-    print >> sys.stderr, "About to run '%s' on %s hosts...\n\n" \
-        % (cmd, len(hosts))
-
-
-    job = SSHJobMan(hosts, cmd)
-    job.start()
-
-    completed = None
-    while not job.done():
-        completed = job.poll()
-        sleep(0.2)
-        if completed:
-            for each in completed:
-                print_results(each, job.results[each])
-        
-    summarize_results(job.results)
-    print "Completed in %s seconds" % (time() - start)
-       
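
The start() docstring above notes that job creation is deferred so a caller can hook self.poll() into its own signal handling (SIGCHLD, SIGALRM). A minimal sketch of that pattern, in the same Python 2 style as the module, assuming classh.py is importable as classh and using placeholder hosts and a placeholder command:

# Hypothetical SIGALRM-driven driver for SSHJobMan -- a sketch, not part of
# the repository.  The host names and the 'uptime' command are placeholders.
import signal
import classh

job = classh.SSHJobMan(['host1', 'host2', 'host3'], 'uptime')

def on_alarm(signum, frame):
    '''Reap finished jobs, print their results and re-arm the timer'''
    for key in job.poll():
        classh.print_results(key, job.results[key])
    signal.alarm(1)

signal.signal(signal.SIGALRM, on_alarm)
job.start()
signal.alarm(1)
while not job.done():
    signal.pause()            # sleep until the next SIGALRM wakes us
signal.alarm(0)               # cancel any pending timer
classh.summarize_results(job.results)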
setup.py

 setup(
     name='classh',
     version='0.01',
+    author='James T. Dennis',
+    author_email='answrguy@gmail.com',
     description="Cluster Administrators' ssh Wrapper",
     url='http://bitbucket.org/jimd/classh/',
     packages=['classh']
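
The hunk above shows only the fields around the change. For context, a complete setup.py consistent with this diff might look like the sketch below; the distutils import is an assumption, since the commit does not show the top of the file and the project could equally use setuptools.

# Hypothetical complete setup.py -- only the setup() fields shown in the
# commit are taken from the repository; the import line is assumed.
from distutils.core import setup

setup(
    name='classh',
    version='0.01',
    author='James T. Dennis',
    author_email='answrguy@gmail.com',
    description="Cluster Administrators' ssh Wrapper",
    url='http://bitbucket.org/jimd/classh/',
    packages=['classh'],
)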