Commits

Anonymous committed 0ace773

Refactored SSHJobMan.results into dictionary of JobRecord objects

Added a JobRecord class and refactored all code to use it; the syntax is
thus much cleaner for accessing job.output, job.exitcode, and so on.

Set default poolsize back to 100; eliminated copy/slice of targets
(it was unnecessary at best).

  • Participants
  • Parent commits fc419c7

Comments (0)

Files changed (2)

 
 __author__  = '''Jim Dennis <answrguy@gmail.com>'''
 __url__     = '''http://bitbucket.org/jimd/classh/'''
-__version__ = 0.03
 __license___= '''PSF (Python Software Foundation)'''
+__version__ = 0.04
 
 
 from subprocess import Popen, PIPE
     if 'DEBUG' in os.environ and os.environ['DEBUG']==sys.argv[0]:
         print >> sys.stderr, str
 
+class JobRecord(object):
+    def __init__(self, target):
+        self.target   = target
+        self.proc     = None      
+        self.exitcode = None 
+        self.output   = None
+        self.errors   = None
+        self.started  = time()
+        self.stopped  = None
+
 class SSHJobMan(object):
     '''SSH Job Manager
        Given a command and a list of targets (hostnames), concurrently
        containing the specified types fo results.
     '''
 
-
     def __init__(self, hostlist=None, cmd=None, **opts):
         '''
         '''
         self.started = False
-        self.poolsize = 50
-        self.pool = dict()         # for maintain host/process data
+        self.poolsize = 100
+        self.jobtimeout = 300
+
+        self.pool    = dict()         # for maintaining host/process data
         self.results = dict()  
-        self.jobtimeout = 300
-        self.ssh = [ '/usr/bin/ssh' ] # TODO: dynamically find this
-        self.ssh_args = [ '-oBatchMode=yes'] #TODO: add other batch friendly options?
+        self.ssh = [ '/usr/bin/ssh' ]        #TODO: dynamically find this
+        self.ssh_args = [ '-oBatchMode=yes'] #TODO: add other batch options?
 
-        self.targets = hostlist[:]
+        self.targets = hostlist
         self.cmd = cmd 
 
         self.__dict__.update(opts)  # Ugly hack?
 
         while self.targets and len(self.pool.keys()) < self.poolsize:
             debug('adding jobs')
-            key   = self.targets.pop()
-            host  = key
+            host = self.targets.pop()
+            job = JobRecord(host)
+            key = host
             if key in self.results:
                 x = 0 
                 while key + ':' + str(x) in self.results:  # Unique-ify a key
                     x += 1
                 key = key + ':' + str(x) 
-            self.results[key] = {   # Placeholder  + start time
-              'Job:': host,
-              'Start': time() }
-            proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
+
+            job.proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
 	             stdout=PIPE,stderr=PIPE, close_fds=True)
-            self.pool[key] = (host, proc, time()) # start time for timeouts
+
+            self.results[key] = 'Placeholder'
+            self.pool[key] = job
             debug('... added job %s' % key)
 
     def poll(self):
         debug ('polling')
         reaped = list() 
         for key, job in self.pool.items():
-            (host, proc, starttime) = job
-            rv = proc.poll()
+            rv = job.proc.poll()
             if rv is not None:
                 debug('job %s is done' % key)
-                self.gather(key, host, rv, proc)
+                job.exitcode = rv 
+                self.gather(key, job)
                 del self.pool[key]
                 reaped.append(key)
             # Handle timeouts:
-            elapsed = time() - starttime
+            elapsed = time() - job.started
             if elapsed > self.jobtimeout:
                 debug('job %s timed out after %s seconds' % (key, elapsed))
-                self.kill(proc)
-                self.gather(key, host, proc.poll(), proc)
+                self.kill(job.proc)
+                self.gather(key, job)
                 del self.pool[key]
                 reaped.append(key)
         debug ('reaped %s jobs' % len(reaped))
         self.add_jobs()
         return reaped
 
-    def gather(self, key, hostname, exitval, proc):
+    def gather(self, key, job):
         '''Gather results from a subprocess
            These are stored as a dictionary of dictionaries
         '''
         debug ('gathering')
-        (out,err) = proc.communicate()
-        self.results[key] = {
-              'Job': hostname,
-              'Ret': exitval,
-              'Out': out,
-              'Err': err,
-              'Start': self.results[key]['Start'],
-              'End': time()
-              }
+        (job.output, job.errors) = job.proc.communicate()
+        job.stopped  = time()
+        self.results[key] = job
             
     def kill(self, proc):
         '''Kill a subprocess which has exceeded its timeout
            string.maketrans('',''),string.printable[:-5])))
 
 
-def print_results(key, res):
-    errs = ' '.join(res['Err'].split('\n'))
-    outs = ' '.join(res['Out'].split('\n')) 
+def print_results(key, job):
+    errs = ' '.join(job.errors.split('\n'))
+    outs = ' '.join(job.output.split('\n')) 
     errs = errs[:60].translate(filter)
     outs = outs[:60].translate(filter)
 
-    pfix = ''.join(["%-48s" % key, "%-5s" % res['Ret'], 
-        "(%s)" % ("%0.2f" % (res['End'] - res['Start']))])
-    if res['Ret']:
+    pfix = ''.join(["%-48s" % key, "%-5s" % job.exitcode, 
+        "(%s)" % ("%0.2f" % (job.stopped - job.started))])
+    if job.exitcode:
         print pfix, "\tErrors: ", errs[:40]
     print pfix, "\tOutput: ", outs[:40]
             
 def summarize_results(res):
-    success = 0
-    errs = 0
+    success  = 0
+    errs     = 0
     timeouts = 0
     for i in res.values():
-        if    i['Ret'] == 0: success += 1
-        elif  i['Ret'] >  0: errs += 1
-        elif  i['Ret'] <  0: timeouts += 1
+        if    i.exitcode  ==  0: success  += 1
+        elif  i.exitcode   >  0: errs     += 1
+        elif  i.exitcode   <  0: timeouts += 1
     print "\n\n\tSuccessful: %s\tErrors: %s\tSignaled(timeouts): %s" \
       % (success, errs, timeouts)
 
-
 def readfile(fname):
     results = []
     try: 
 
     job = SSHJobMan(hosts, cmd)
     job.start()
-
+    
     completed = None
     while not job.done():
         completed = job.poll()
 
 __author__  = '''Jim Dennis <answrguy@gmail.com>'''
 __url__     = '''http://bitbucket.org/jimd/classh/'''
-__version__ = 0.03
 __license___= '''PSF (Python Software Foundation)'''
+__version__ = 0.04
 
 
 from subprocess import Popen, PIPE
     if 'DEBUG' in os.environ and os.environ['DEBUG']==sys.argv[0]:
         print >> sys.stderr, str
 
+class JobRecord(object):
+    def __init__(self, target):
+        self.target   = target
+        self.proc     = None      
+        self.exitcode = None 
+        self.output   = None
+        self.errors   = None
+        self.started  = time()
+        self.stopped  = None
+
 class SSHJobMan(object):
     '''SSH Job Manager
        Given a command and a list of targets (hostnames), concurrently
        containing the specified types fo results.
     '''
 
-
     def __init__(self, hostlist=None, cmd=None, **opts):
         '''
         '''
         self.started = False
-        self.poolsize = 50
-        self.pool = dict()         # for maintain host/process data
+        self.poolsize = 100
+        self.jobtimeout = 300
+
+        self.pool    = dict()         # for maintaining host/process data
         self.results = dict()  
-        self.jobtimeout = 300
-        self.ssh = [ '/usr/bin/ssh' ] # TODO: dynamically find this
-        self.ssh_args = [ '-oBatchMode=yes'] #TODO: add other batch friendly options?
+        self.ssh = [ '/usr/bin/ssh' ]        #TODO: dynamically find this
+        self.ssh_args = [ '-oBatchMode=yes'] #TODO: add other batch options?
 
-        self.targets = hostlist[:]
+        self.targets = hostlist
         self.cmd = cmd 
 
         self.__dict__.update(opts)  # Ugly hack?
 
         while self.targets and len(self.pool.keys()) < self.poolsize:
             debug('adding jobs')
-            key   = self.targets.pop()
-            host  = key
+            host = self.targets.pop()
+            job = JobRecord(host)
+            key = host
             if key in self.results:
                 x = 0 
                 while key + ':' + str(x) in self.results:  # Unique-ify a key
                     x += 1
                 key = key + ':' + str(x) 
-            self.results[key] = {   # Placeholder  + start time
-              'Job:': host,
-              'Start': time() }
-            proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
+
+            job.proc = Popen(self.ssh + self.ssh_args + [host] + [self.cmd], 
 	             stdout=PIPE,stderr=PIPE, close_fds=True)
-            self.pool[key] = (host, proc, time()) # start time for timeouts
+
+            self.results[key] = 'Placeholder'
+            self.pool[key] = job
             debug('... added job %s' % key)
 
     def poll(self):
         debug ('polling')
         reaped = list() 
         for key, job in self.pool.items():
-            (host, proc, starttime) = job
-            rv = proc.poll()
+            rv = job.proc.poll()
             if rv is not None:
                 debug('job %s is done' % key)
-                self.gather(key, host, rv, proc)
+                job.exitcode = rv 
+                self.gather(key, job)
                 del self.pool[key]
                 reaped.append(key)
             # Handle timeouts:
-            elapsed = time() - starttime
+            elapsed = time() - job.started
             if elapsed > self.jobtimeout:
                 debug('job %s timed out after %s seconds' % (key, elapsed))
-                self.kill(proc)
-                self.gather(key, host, proc.poll(), proc)
+                self.kill(job.proc)
+                self.gather(key, job)
                 del self.pool[key]
                 reaped.append(key)
         debug ('reaped %s jobs' % len(reaped))
         self.add_jobs()
         return reaped
 
-    def gather(self, key, hostname, exitval, proc):
+    def gather(self, key, job):
         '''Gather results from a subprocess
            These are stored as a dictionary of dictionaries
         '''
         debug ('gathering')
-        (out,err) = proc.communicate()
-        self.results[key] = {
-              'Job': hostname,
-              'Ret': exitval,
-              'Out': out,
-              'Err': err,
-              'Start': self.results[key]['Start'],
-              'End': time()
-              }
+        (job.output, job.errors) = job.proc.communicate()
+        job.stopped  = time()
+        self.results[key] = job
             
     def kill(self, proc):
         '''Kill a subprocess which has exceeded its timeout
            string.maketrans('',''),string.printable[:-5])))
 
 
-def print_results(key, res):
-    errs = ' '.join(res['Err'].split('\n'))
-    outs = ' '.join(res['Out'].split('\n')) 
+def print_results(key, job):
+    errs = ' '.join(job.errors.split('\n'))
+    outs = ' '.join(job.output.split('\n')) 
     errs = errs[:60].translate(filter)
     outs = outs[:60].translate(filter)
 
-    pfix = ''.join(["%-48s" % key, "%-5s" % res['Ret'], 
-        "(%s)" % ("%0.2f" % (res['End'] - res['Start']))])
-    if res['Ret']:
+    pfix = ''.join(["%-48s" % key, "%-5s" % job.exitcode, 
+        "(%s)" % ("%0.2f" % (job.stopped - job.started))])
+    if job.exitcode:
         print pfix, "\tErrors: ", errs[:40]
     print pfix, "\tOutput: ", outs[:40]
             
 def summarize_results(res):
-    success = 0
-    errs = 0
+    success  = 0
+    errs     = 0
     timeouts = 0
     for i in res.values():
-        if    i['Ret'] == 0: success += 1
-        elif  i['Ret'] >  0: errs += 1
-        elif  i['Ret'] <  0: timeouts += 1
+        if    i.exitcode  ==  0: success  += 1
+        elif  i.exitcode   >  0: errs     += 1
+        elif  i.exitcode   <  0: timeouts += 1
     print "\n\n\tSuccessful: %s\tErrors: %s\tSignaled(timeouts): %s" \
       % (success, errs, timeouts)
 
-
 def readfile(fname):
     results = []
     try: 
 
     job = SSHJobMan(hosts, cmd)
     job.start()
-
+    
     completed = None
     while not job.done():
         completed = job.poll()