pythongrid / pythongrid.py


pythongrid.py

 import pythongrid
 from os.path import join as jp
 
-import pdb
-
 # Define any extra python paths
 PYTHONPATH = []
 if len(PYTHONPATH) > 0:
-    PPATH=reduce(lambda x,y: x+':'+y, PYTHONPATH) ;
+    PPATH = reduce(lambda x, y: x+':'+y, PYTHONPATH)
     print PPATH
-    os.environ['PYTHONPATH'] = PPATH;
+    os.environ['PYTHONPATH'] = PPATH
     sys.path.extend(PYTHONPATH)
-    print "sys.path=" + str(sys.path) ;
+    print "sys.path=" + str(sys.path)
 
 # used for generating random filenames
 alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
 
 # Check for drmaa and multiprocessing
-drmaa_present=True
-multiprocessing_present=True
+drmaa_present = True
+multiprocessing_present = True
 
 try:
     import drmaa
 except ImportError, detail:
     print "Error importing drmaa. Only local multi-threading supported."
     print "Please check your installation."
     print detail
-    drmaa_present=False
+    drmaa_present = False
 
 try:
     import multiprocessing
 except ImportError, detail:
     print "Error importing multiprocessing. Local computing limited to one CPU."
-    print "Please install python2.6 or the backport of the multiprocessing package"
+    print "Please install python2.6"
+    print "or the backport of the multiprocessing package"
     print detail
-    multiprocessing_present=False
+    multiprocessing_present = False
 
 class Job(object):
     """
     Wraps a function and its arguments; they are run
     when the execute method gets called.
     """
 
-    def __init__(self, f, args, kwlist={}, cleanup=True):
+    def __init__(self, func, args, kwlist=None, cleanup=True):
         """
         constructor of Job
 
-        @param f: a function, which should be executed.
-        @type f: function
+        @param func: a function, which should be executed.
+        @type func: function
-        @param args: argument list of function f
+        @param args: argument list of function func
         @type args: list
         @param kwlist: dictionary of keyword arguments
         @param cleanup: flag that determines the cleanup of input and log file
         @type cleanup: boolean
         """
-        self.f = f
+        self.func = func
         self.args = args
-        self.kwlist = kwlist
+        self.kwlist = kwlist if kwlist is not None else {}
         self.cleanup = cleanup
         self.ret = None
         self.inputfile = ""
         self.outputfile = ""
-        self.nativeSpecification = ""
+        self.native_specification = ""
         self.exception = None
         self.environment = None
         self.replace_env = False
 
         # TODO: ensure uniqueness of file names
         self.name = 'pg'+''.join([random.choice(alphabet) for a in xrange(8)])
-        self.inputfile = jp(outdir,self.name + "_in.bz2")
-        self.outputfile = jp(outdir,self.name + "_out.bz2")
+        self.inputfile = jp(outdir, self.name + "_in.bz2")
+        self.outputfile = jp(outdir, self.name + "_out.bz2")
         self.jobid = ""
 
 
-    def __repr_broken__(self):
-        #TODO: fix representation
+    def __repr__(self):
         retstr1 = ('%s\nargs=%s\nkwlist=%s\nret=%s\ncleanup=%s' %
-                   self.f, self.args, self.kwlist, self.ret, self.cleanup)
-        retstr2 = ('\nnativeSpecification=%s\ninputfile=%s\noutputfile=%s\n' %
-                   self.nativeSpecification, self.inputfile, self.outputfile)
+                   (self.func, self.args, self.kwlist, self.ret, self.cleanup))
+        retstr2 = ('\nnative_specification=%s\ninputfile=%s\noutputfile=%s\n' %
+                   (self.native_specification, self.inputfile, self.outputfile))
         return retstr1 + retstr2
 
     def execute(self):
         """
         Executes the function with the stored arguments and keyword
         arguments and keeps the result in self.ret.
         Input data is removed after execution to save space.
         """
         try:
-            self.ret = apply(self.f, self.args, self.kwlist)
+            self.ret = self.func(*self.args, **self.kwlist)
 
-        except Exception, e:
+        except Exception, error:
 
             print "exception encountered"
-            print "type:", str(type(e))
+            print "type:", str(type(error))
             print "line number:", sys.exc_info()[2].tb_lineno
-            print e
+            print error
             traceback.print_exc(file=sys.stdout)
             print "========="
-            self.exception = e
+            self.exception = error
 
 
 
 
 
 
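
For reference, a minimal usage sketch of the reworked Job class. It assumes pythongrid is importable and the module-level outdir is configured; `add` is a hypothetical example function, not part of this module.

    from pythongrid import Job

    def add(x, y):
        return x + y

    job = Job(add, [1, 2])
    job.execute()     # calls add(1, 2)
    print job.ret     # 3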
-def _process_jobs_locally(jobs, maxNumThreads=None):
+def _process_jobs_locally(jobs, max_num_threads=None):
     """
     Local execution using the package multiprocessing, if present
     
     @param jobs: jobs to be executed
     @type jobs: list<Job>
-    @param maxNumThreads: maximal number of threads
-    @type maxNumThreads: int
+    @param max_num_threads: maximum number of threads
+    @type max_num_threads: int
     
     @return: list of jobs, each with return in job.ret
     @rtype: list<Job>
     """
     
     
-    if (not multiprocessing_present or maxNumThreads == 1):
+    if (not multiprocessing_present or max_num_threads == 1):
         #perform sequential computation
         for job in jobs:
             job.execute()
     else:
         #print multiprocessing.cpu_count()
-        po = multiprocessing.Pool(maxNumThreads)
+        po = multiprocessing.Pool(max_num_threads)
         result = po.map(_execute, jobs)
-        for ix,job in enumerate(jobs):
+        for ix, job in enumerate(jobs):
             job.ret = result[ix]
             
     return jobs
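
The `_execute` helper handed to `po.map` above is not part of this diff; a sketch consistent with how its result is used (each worker's return value lands in `job.ret`):

    def _execute(job):
        # run one job in a worker process and return the result explicitly,
        # since Pool.map collects return values rather than mutated arguments
        job.execute()
        return job.ret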
         #             }
 
         # fetch env vars from shell        
-        shell_env = os.environ
+        #shell_env = os.environ
 
         #if job.environment and job.replace_env:
         #    # only consider defined env vars
         jt.remoteCommand = os.path.expanduser(PYGRID)
         jt.args = [job.inputfile]
         jt.joinFiles = True
-        #jt.setNativeSpecification(job.nativeSpecification)
-        #jt.nativeSpecification = job.nativeSpecification
+        #jt.setNativeSpecification(job.native_specification)
+        #jt.native_specification = job.native_specification
         #jt.outputPath = ":" + os.path.expanduser(TEMPDIR)
         #jt.errorPath = ":" + os.path.expanduser(TEMPDIR)
 
     s = sid
 
     if wait:
-        drmaaWait = drmaa.Session.TIMEOUT_WAIT_FOREVER
+        drmaa_wait = drmaa.Session.TIMEOUT_WAIT_FOREVER
     else:
-        drmaaWait = drmaa.Session.TIMEOUT_NO_WAIT
+        drmaa_wait = drmaa.Session.TIMEOUT_NO_WAIT
 
-    s.synchronize(jobids, drmaaWait, True)
+    s.synchronize(jobids, drmaa_wait, True)
     print "success: all jobs finished"
     s.exit()
 
     #attempt to collect results
-    retJobs = []
+    ret_jobs = []
     for ix, job in enumerate(joblist):
-        
         log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobids[ix])
         log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobids[ix])
         
         try:
-            retJob = load(job.outputfile)
-            assert(retJob.name == job.name)
-            retJobs.append(retJob)
+            ret_job = load(job.outputfile)
+            assert ret_job.name == job.name
+            ret_jobs.append(ret_job)
 
             #print exceptions
-            if retJob.exception != None:
-                print str(type(retJob.exception))
+            if ret_job.exception is not None:
+                print str(type(ret_job.exception))
                 print "Exception encountered in job with log file:"
                 print log_stdout_fn
-                print retJob.exception
+                print ret_job.exception
 
             #remove files
-            elif retJob.cleanup:
+            elif ret_job.cleanup:
 
                 print "cleaning up:", job.outputfile
                 os.remove(job.outputfile)
 
-                if retJob != None:
+                if ret_job is not None:
 
                     print "cleaning up:", log_stdout_fn
                     os.remove(log_stdout_fn)
                     #os.remove(log_stderr_fn)
 
 
-        except Exception, detail:
+        except Exception, error:
             print "error while unpickling file: " + job.outputfile
-            print "this could caused by a problem with the cluster environment, imports or environment variables"
+            print "this could caused by a problem with"
+            print "the cluster environment, imports or environment variables"
             print "check log files for more information: "
             print "stdout:", log_stdout_fn
             print "stderr:", log_stderr_fn
             
-            print detail
+            print error
 
 
 
-    return retJobs
+    return ret_jobs
 
 
-def process_jobs(jobs, local=False, maxNumThreads=1):
+def process_jobs(jobs, local=False, max_num_threads=1):
     """
     Director function to decide whether to run on the cluster or locally
     """
 
     elif (not local and not drmaa_present):
         print 'Warning: import drmaa failed, computing locally'
-        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
+        return _process_jobs_locally(jobs, max_num_threads=max_num_threads)
 
     else:
-        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
+        return _process_jobs_locally(jobs, max_num_threads=max_num_threads)
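
A hedged sketch of the local code path after the renaming; `square` is a hypothetical example function.

    def square(x):
        return x * x

    jobs = [Job(square, [i]) for i in xrange(4)]
    jobs = process_jobs(jobs, local=True, max_num_threads=2)
    print [job.ret for job in jobs]    # [0, 1, 4, 9]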
 
 
 def get_status(sid, jobids):
 
     print 'Status of jobs at %s' % (time.strftime('%d/%m/%Y - %H.%M:%S'))
     for curkey in status_summary.keys():
-        if status_summary[curkey]>0:
+        if status_summary[curkey] > 0:
             print '%s: %d' % (_decodestatus[curkey], status_summary[curkey])
     #s.exit()
 
 def save(filename, myobj):
     """
     Save myobj to filename using pickle
     """
     try:
-        f = bz2.BZ2File(filename, 'wb')
+        handle = bz2.BZ2File(filename, 'wb')
     except IOError, details:
         sys.stderr.write('File ' + filename + ' cannot be written\n')
-        sys.stderr.write(details)
+        sys.stderr.write(str(details))
         return
 
-    cPickle.dump(myobj, f, protocol=2)
-    f.close()
+    cPickle.dump(myobj, handle, protocol=2)
+    handle.close()
 
 
 def load(filename):
     """
     Load from filename using pickle
     """
     try:
-        f = bz2.BZ2File(filename, 'rb')
+        handle = bz2.BZ2File(filename, 'rb')
     except IOError, details:
         sys.stderr.write('File ' + filename + ' cannot be read\n')
-        sys.stderr.write(details)
+        sys.stderr.write(str(details))
         return
 
-    myobj = cPickle.load(f)
-    f.close()
+    myobj = cPickle.load(handle)
+    handle.close()
     return myobj
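
A small round-trip sketch of the two helpers; the path is an arbitrary example.

    save('/tmp/pg_demo.bz2', {'answer': 42})
    print load('/tmp/pg_demo.bz2')    # {'answer': 42}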
 
 ################################################################
             break
     return func.__get__(obj, cls)
 
-copy_reg.pickle(types.MethodType, pythongrid._pickle_method, pythongrid._unpickle_method)
+copy_reg.pickle(types.MethodType, pythongrid._pickle_method,
+                pythongrid._unpickle_method)
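
The registration above lets bound methods be pickled, so a Job can wrap an instance method; `Counter` is a hypothetical example class.

    class Counter(object):
        def __init__(self):
            self.total = 0

        def add(self, k):
            self.total += k
            return self.total

    c = Counter()
    job = Job(c.add, [5])    # without the copy_reg call, pickling c.add would fail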
 
 
 
 ################################################################
 
 
-def run_job(pickleFileName):
+def run_job(pickle_filename):
     """
     This is the code that is executed on the cluster side.
     Runs a job that was pickled to a file called pickle_filename.
 
-    @param pickleFileName: filename of pickled Job object
-    @type pickleFileName: string
+    @param pickle_filename: filename of pickled Job object
+    @type pickle_filename: string
     """
 
-    inPath = pickleFileName
-    job = load(inPath)
+    job = load(pickle_filename)
 
     job.execute()
 
 
 
     except Usage, err:
-        print >>sys.stderr, err.msg
-        print >>sys.stderr, "for help use --help"
+        print >> sys.stderr, err.msg
+        print >> sys.stderr, "for help use --help"
 
         return 2
 
     the system at MPI Biol. Cyber. Tuebingen. This is an SGE system.
     """
 
-    def __init__(self, f, args, kwlist={}, cleanup=True):
+    def __init__(self, f, args, kwlist=None, cleanup=True):
         """
         constructor of KybJob
         """
         self.nicetohave = ""
 
 
-    def getNativeSpecification(self):
+    def get_native_specification(self):
         """
         define python-style getter
         """
         return ret
 
 
-    def setNativeSpecification(self, x):
+    def set_native_specification(self, x):
         """
         define python-style setter
-        @param x: nativeSpecification string to be set
+        @param x: native_specification string to be set
         @type x: string
         """
 
-        self.__nativeSpecification = x
+        self.__native_specification = x
 
-    nativeSpecification = property(getNativeSpecification,
-                                   setNativeSpecification)
+    native_specification = property(get_native_specification,
+                                    set_native_specification)
 
 
 class LSFJob(Job):
     Specialization of generic Job that provides an interface to
     the system at ETH Zurich. This is an LSF system.
     """
-    def __init__(self, f, args, kwlist={}, cleanup=True):
+    def __init__(self, f, args, kwlist=None, cleanup=True):
         Job.__init__(self, f, args, kwlist, cleanup)
 
-        # -W HH:MM     Wall-clock time required by the job. Can also be expressed in minutes.
+        # -W HH:MM     Wall-clock time required by the job.
+        #              Can also be expressed in minutes.
         self.email_address = None
 
     @property
-    def nativeSpecification(self):
+    def native_specification(self):
         """
         define python-style getter
         """
 
         ret = ""
         if self.wall_time_h and self.wall_time_m:
-            ret += " -W %02d:%02d" % (self.wall_time_h,self.wall_time_m)
+            ret += " -W %02d:%02d" % (self.wall_time_h, self.wall_time_m)
         if self.ncpu:
             ret += " -n %d" % self.ncpu
         if self.outfile:
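
Given the formatting above, a job asking for a 1h30m wall clock and 4 CPUs yields a native specification containing " -W 01:30 -n 4"; a hedged sketch, with `square` as in the earlier example:

    job = LSFJob(square, [3])
    job.wall_time_h, job.wall_time_m = 1, 30
    job.ncpu = 4
    print job.native_specification    # " -W 01:30 -n 4" plus any output flag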