pythongrid.py

 
 
 """
-pythongrid provides a high level front end to DRMAA-python.
+pythongrid provides a high-level front end to drmaa-python.
 This module provides wrappers that simplify submission and collection of jobs,
 in a more 'pythonic' fashion.
 """
 
-#paths on cluster file system
-#PYTHONPATH = ["/fml/ag-raetsch/home/raetsch/svn/tools/python/", "/fml/ag-raetsch/home/raetsch/svn/tools/python/pythongrid/", '/fml/ag-raetsch/home/raetsch/mylibs/lib/python2.5/site-packages/', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/ParaParser', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/DynProg/', '/fml/ag-raetsch/share/software/mosek/5/tools/platform/linux64x86/bin', '/fml/ag-raetsch/home/fabio/site-packages', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/Genefinding', '/fml/ag-raetsch/share/software/lib/python2.5/site-packages']
+# location of pythongrid.py on cluster file system
+# TODO set this in configuration file
+# PYGRID = "/absolute/path/to/pythongrid/pythongrid.py"
+PYGRID = "/cluster/home/infk/cong/pythongrid/pythongrid.py"
+if PYGRID == "/absolute/path/to/pythongrid/pythongrid.py":
+    print "Warning: variable PYGRID seems not to be set"
+
+# define temp directories for the input and output variables
+# (must be writable from cluster)
+# TODO define separate client/server TEMPDIR
+# TEMPDIR = "/absolute/path/to/inputoutputdir/"
+TEMPDIR = "/cluster/home/infk/cong/tmp/"
+if TEMPDIR == "/absolute/path/to/inputoutputdir/":
+    print "Warning: variable TEMPDIR seems not to be set"
+
 
 import sys
 import os
 import time
 import random
 import traceback
+import os.path
+from os.path import join as jp
 
+import pdb
 
-#paths on cluster file system
-PYTHONPATH = os.environ['PYTHONPATH'] #["/fml/ag-raetsch/home/raetsch/svn/tools/python/", "/fml/ag-raetsch/home/raetsch/svn/tools/python/pythongrid/", '/fml/ag-raetsch/home/raetsch/mylibs/lib/python2.5/site-packages/', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/ParaParser', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/DynProg/', '/fml/ag-raetsch/share/software/mosek/5/tools/platform/linux64x86/bin', '/fml/ag-raetsch/home/fabio/site-packages', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/Genefinding', '/fml/ag-raetsch/share/software/lib/python2.5/site-packages']
-
-# location of pythongrid.py on cluster file system
-# TODO set this in configuration file
-PYGRID = "~/svn/tools/python/pythongrid/pythongrid.py"
-
-# define temp directories for the input and output variables
-# (must be writable from cluster)
-# TODO define separate client/server TEMPDIR
-# ag-raetsch
-TEMPDIR = "~/tmp/"
-
+# Define any extra python paths
+PYTHONPATH = []
+if PYTHONPATH:
+    PPATH = ':'.join(PYTHONPATH)
+    print PPATH
+    os.environ['PYTHONPATH'] = PPATH
+    sys.path.extend(PYTHONPATH)
+    print "sys.path=" + str(sys.path)
 
 # used for generating random filenames
 alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
 
-PPATH=reduce(lambda x,y: x+':'+y, PYTHONPATH) ;
-print PPATH
-os.environ['PYTHONPATH'] = PPATH;
-
-sys.path.extend(PYTHONPATH)
-
-
-print "sys.path=" + str(sys.path) ;
-
-jp = os.path.join
-
+# Check for drmaa and multiprocessing
 drmaa_present=True
 multiprocessing_present=True
 
             self.exception = e
 
 
+
+#only define this class if the multiprocessing module is present
+if multiprocessing_present:
+
+    class JobsProcess(multiprocessing.Process):
+        """
+        In case jobs are to be computed locally, a number of Jobs (possibly one)
+        are assigned to one process.
+        """
+
+        def __init__(self, jobs):
+            """
+            Constructor
+            @param jobs: list of jobs
+            @type jobs: list of Job objects
+            """
+
+            self.jobs = jobs
+            multiprocessing.Process.__init__(self)
+
+        def run(self):
+            """
+            Executes each job in job list
+            """
+            for job in self.jobs:
+                job.execute()
+
+
+def _execute(job):
+    """Cannot pickle method instances, so fake a function.
+    Used by _process_jobs_locally"""
+    
+    return job.f(*job.args, **job.kwlist)
+
+
+
+def _process_jobs_locally(jobs, maxNumThreads=None):
+    """
+    Local execution using the package multiprocessing, if present
+    
+    @param jobs: jobs to be executed
+    @type jobs: list<Job>
+    @param maxNumThreads: maximal number of threads
+    @type maxNumThreads: int
+    
+    @return: list of jobs, each with return in job.ret
+    @rtype: list<Job>
+    """
+    
+    
+    if (not multiprocessing_present or maxNumThreads == 1):
+        #perform sequential computation
+        for job in jobs:
+            job.execute()
+    else:
+        #print multiprocessing.cpu_count()
+        po = multiprocessing.Pool(maxNumThreads)
+        result = po.map(_execute, jobs)
+        for ix,job in enumerate(jobs):
+            job.ret = result[ix]
+            
+    return jobs
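+
+# Editor's note: multiprocessing.Pool.map pickles the jobs it distributes, so
+# job.f must be a picklable, module-level function in the parallel branch.
+# A hypothetical local run:
+#
+#   def square(x):
+#       return x * x
+#
+#   jobs = _process_jobs_locally([Job(square, [i]) for i in range(3)],
+#                                maxNumThreads=2)
+#   # [job.ret for job in jobs] -> [0, 1, 4]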
+
+
+def submit_jobs(jobs):
+    """
+    Submit a list of jobs to the cluster.
+    @param jobs: list of jobs to be executed
+    @type jobs: list<Job>
+    """
+
+    s = drmaa.Session()
+    s.initialize()
+    jobids = []
+
+    for job in jobs:
+        save(job.inputfile, job)
+        jt = s.createJobTemplate()
+
+        #fetch only specific env vars from shell
+        #shell_env = {"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
+        #             "PYTHONPATH": os.getenv("PYTHONPATH"),
+        #             "MOSEKLM_LICENSE_FILE": os.getenv("MOSEKLM_LICENSE_FILE"),
+        #             }
+
+        # fetch env vars from shell        
+        shell_env = os.environ
+
+        #if job.environment and job.replace_env:
+        #    # only consider defined env vars
+        #    #jt.setEnvironment(job.environment)
+        #    jt.jobEnvironment = job.environment
+
+        #elif job.environment and not job.replace_env:
+        #    # replace env var from shell with defined env vars
+        #    env = shell_env
+        #    env.update(job.environment)
+        #    #jt.setEnvironment(env)
+        #    jt.jobEnvironment = env
+
+        #else:
+        #    # only consider env vars from shell
+        #    #jt.setEnvironment(shell_env)
+        #    jt.jobEnvironment = shell_env
+
+        jt.remoteCommand = os.path.expanduser(PYGRID)
+        jt.args = [job.inputfile]
+        jt.joinFiles = True
+        #jt.setNativeSpecification(job.nativeSpecification)
+        #jt.nativeSpecification = job.nativeSpecification
+        #jt.outputPath = ":" + os.path.expanduser(TEMPDIR)
+        #jt.errorPath = ":" + os.path.expanduser(TEMPDIR)
+
+        jobid = s.runJob(jt)
+
+        # set job fields that depend on the jobid assigned by grid engine
+        job.jobid = jobid
+        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobid)
+        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobid)
+
+        print 'Your job %s has been submitted with id %s' % (job.name, jobid)
+        print "stdout:", log_stdout_fn
+        print "stderr:", log_stderr_fn
+        print ""
+
+        #display tmp file size
+        # print os.system("du -h " + invars)
+
+        jobids.append(jobid)
+        s.deleteJobTemplate(jt)
+
+    #sid = s.getContact()
+
+    return (s, jobids)
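+
+# Editor's note: as written, submit_jobs returns the live drmaa session rather
+# than a contact string, and collect_jobs uses that session object directly:
+#
+#   (s, jobids) = submit_jobs(jobs)
+#   jobs = collect_jobs(s, jobids, jobs, wait=True)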
+
+
+def collect_jobs(sid, jobids, joblist, wait=False):
+    """
+    Collect the results from the jobids, returns a list of Jobs
+
+    @param sid: drmaa session, as returned by submit_jobs
+    @type sid: drmaa.Session object
+    @param jobids: list of job identifiers returned by the cluster
+    @type jobids: list of strings
+    @param wait: Wait for jobs to finish?
+    @type wait: Boolean, defaults to False
+    """
+
+    for ix in xrange(len(jobids)):
+        assert(jobids[ix] == joblist[ix].jobid)
+
+    #s = drmaa.Session()
+    #s.initialize(sid)
+    s = sid
+
+    if wait:
+        drmaaWait = drmaa.Session.TIMEOUT_WAIT_FOREVER
+    else:
+        drmaaWait = drmaa.Session.TIMEOUT_NO_WAIT
+
+    s.synchronize(jobids, drmaaWait, True)
+    print "success: all jobs finished"
+    s.exit()
+    os.system('ls -lhtr ' + os.path.expanduser(TEMPDIR))
+
+    #attempt to collect results
+    retJobs = []
+    for ix, job in enumerate(joblist):
+        
+        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobids[ix])
+        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobids[ix])
+        
+        try:
+            retJob = load(job.outputfile)
+            assert(retJob.name == job.name)
+            retJobs.append(retJob)
+
+            #print exceptions
+            if retJob.exception != None:
+                print str(type(retJob.exception))
+                print "Exception encountered in job with log file:"
+                print log_stdout_fn
+                print retJob.exception
+
+            #remove files
+            elif retJob.cleanup:
+
+                print "cleaning up:", job.outputfile
+                os.remove(job.outputfile)
+
+                if retJob != None:
+
+                    print "cleaning up:", log_stdout_fn
+                    os.remove(log_stdout_fn)
+
+                    #print "cleaning up:", log_stderr_fn
+                    #os.remove(log_stderr_fn)
+
+
+        except Exception, detail:
+            print "error while unpickling file: " + job.outputfile
+            print "this could caused by a problem with the cluster environment, imports or environment variables"
+            print "check log files for more information: "
+            print "stdout:", log_stdout_fn
+            print "stderr:", log_stderr_fn
+            
+            print detail
+
+
+
+    return retJobs
+
+
+def process_jobs(jobs, local=False, maxNumThreads=1):
+    """
+    Director function to decide whether to run on the cluster or locally
+    """
+    
+    if (not local and drmaa_present):
+        # Use submit_jobs and collect_jobs to run jobs and wait for the results.
+        (sid, jobids) = submit_jobs(jobs)
+        return collect_jobs(sid, jobids, jobs, wait=True)
+
+    elif (not local and not drmaa_present):
+        print 'Warning: import drmaa failed, computing locally'
+        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
+
+    else:
+        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
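+
+# Editor's usage sketch: process_jobs is the public entry point; it falls back
+# to local computation whenever drmaa is unavailable:
+#
+#   jobs = process_jobs(jobs)                               # cluster if possible
+#   jobs = process_jobs(jobs, local=True, maxNumThreads=4)  # force local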
+
+
+def get_status(sid, jobids):
+    """
+    Get the status of all jobs in jobids.
+    Returns True if all jobs are finished.
+
+    There is some instability in determining job completion
+    """
+    _decodestatus = {
+        -42: 'sge and drmaa not in sync',
+        drmaa.Session.UNDETERMINED: 'process status cannot be determined',
+        drmaa.Session.QUEUED_ACTIVE: 'job is queued and active',
+        drmaa.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
+        drmaa.Session.USER_ON_HOLD: 'job is queued and in user hold',
+        drmaa.Session.USER_SYSTEM_ON_HOLD: 'job is in user and system hold',
+        drmaa.Session.RUNNING: 'job is running',
+        drmaa.Session.SYSTEM_SUSPENDED: 'job is system suspended',
+        drmaa.Session.USER_SUSPENDED: 'job is user suspended',
+        drmaa.Session.DONE: 'job finished normally',
+        drmaa.Session.FAILED: 'job finished, but failed',
+        }
+
+    s = drmaa.Session()
+    s.initialize(sid)
+    status_summary = {}.fromkeys(_decodestatus, 0)
+    for jobid in jobids:
+        try:
+            curstat = s.getJobProgramStatus(jobid)
+        except drmaa.InvalidJobError, message:
+            print message
+            status_summary[-42] += 1
+        else:
+            status_summary[curstat] += 1
+
+    print 'Status of %s at %s' % (sid, time.strftime('%d/%m/%Y - %H.%M:%S'))
+    for curkey in status_summary.keys():
+        if status_summary[curkey]>0:
+            print '%s: %d' % (_decodestatus[curkey], status_summary[curkey])
+    s.exit()
+
+    return ((status_summary[drmaa.Session.DONE]
+             +status_summary[-42])==len(jobids))
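+
+# Editor's sketch of a polling loop built on get_status. Note that sid must be
+# a drmaa contact string (e.g. from Session.getContact()), not the session
+# object that submit_jobs returns:
+#
+#   while not get_status(sid, jobids):
+#       time.sleep(10)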
+
+
+#####################################################################
+# Dealing with data
+#####################################################################
+
+
+def save(filename, myobj):
+    """
+    Save myobj to filename using pickle
+    """
+    try:
+        f = bz2.BZ2File(filename, 'wb')
+    except IOError, details:
+        sys.stderr.write('File ' + filename + ' cannot be written\n')
+        sys.stderr.write(str(details) + '\n')
+        return
+
+    cPickle.dump(myobj, f, protocol=2)
+    f.close()
+
+
+def load(filename):
+    """
+    Load from filename using pickle
+    """
+    try:
+        f = bz2.BZ2File(filename, 'rb')
+    except IOError, details:
+        sys.stderr.write('File ' + filename + ' cannot be read\n')
+        sys.stderr.write(str(details) + '\n')
+        return
+
+    myobj = cPickle.load(f)
+    f.close()
+    return myobj
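+
+# Editor's round-trip sketch: save/load exchange bz2-compressed pickles, so
+# anything passed through them must be picklable. The path is hypothetical:
+#
+#   save("/tmp/example.pkl.bz2", {"a": 1})
+#   assert load("/tmp/example.pkl.bz2") == {"a": 1}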
+
+
+################################################################
+#      The following code will be executed on the cluster      #
+################################################################
+
+
+def run_job(pickleFileName):
+    """
+    This is the code that is executed on the cluster side.
+    Runs the job that was pickled to the file named pickleFileName.
+
+    @param pickleFileName: filename of pickled Job object
+    @type pickleFileName: string
+    """
+
+    inPath = pickleFileName
+    job = load(inPath)
+
+    job.execute()
+
+    #remove input file
+    if job.cleanup:
+        os.remove(job.inputfile)
+
+    save(job.outputfile, job)
+
+
+class Usage(Exception):
+    """
+    Simple Exception for cmd-line user-interface.
+    """
+
+    def __init__(self, msg):
+        """
+        Constructor of simple Exception.
+
+        @param msg: exception message
+        @type msg: string
+        """
+
+        self.msg = msg
+
+
+def main(argv=None):
+    """
+    Parse the command line inputs and call run_job
+
+    @param argv: list of arguments
+    @type argv: list of strings
+    """
+
+
+    if argv is None:
+        argv = sys.argv
+
+    try:
+        try:
+            opts, args = getopt.getopt(argv[1:], "h", ["help"])
+            run_job(args[0])
+        except getopt.error, msg:
+            raise Usage(msg)
+
+
+    except Usage, err:
+        print >>sys.stderr, err.msg
+        print >>sys.stderr, "for help use --help"
+
+        return 2
+
 class KybJob(Job):
     """
     Specialization of generic Job that provides an interface to
-    the system at MPI Biol. Cyber. Tuebingen.
+    the system at MPI Biol. Cyber. Tuebingen. This is an SGE system.
     """
 
     def __init__(self, f, args, kwlist={}, cleanup=True):
                                    setNativeSpecification)
 
 
+class LSFJob(Job):
+    """
+    Specialization of generic Job that provides an interface to
+    the system at ETH Zurich. This is an LSF system.
+    """
+    def __init__(self, f, args, kwlist={}, cleanup=True):
+        Job.__init__(self, f, args, kwlist, cleanup)
 
-#only define this class if the multiprocessing module is present
-if multiprocessing_present:
+        # -W HH:MM     Wall-clock time required by the job. Can also be expressed in minutes.
+        self.wall_time_h = None
+        self.wall_time_m = None
+        
+        # -n N         Number of processors required by the job.
+        self.ncpu = None
+        
+        # -o outfile   Append the job's output (stdout) to outfile.
+        #              The keyword "%J" is interpreted as the job's numerical ID.
+        self.outfile = None
+        
+        # -e errfile   Append the job's error (stderr) to errfile.
+        #              By default, stderr is merged with stdout.
+        self.errfile = None
+        
+        # -oo outfile  Write the job's output (stdout) to outfile,
+        #              overwriting it if it already exists. [LSF 6.1 and above]
+        self.ooutfile = None
+        
+        # -eo errfile  Write the job's error (stderr) to errfile,
+        #              overwriting it if it already exists. [LSF 6.1 and above]
+        self.oerrfile = None
+        
+        # -I           Run the job interactively. Input/output are redirected from/to your terminal.
+        # Not supported.
+        
+        # -J jobname   Assign a (not necessarily unique) name to the job.
+        #              Used to define job chains.
+        #              To avoid confusion with numerical job IDs,
+        #              jobname should contain at least one letter.
+        self.jobname = None
+        
+        # -w "depcond" Wait (do not start the job) until the specified
+        #              dependency condition is satisfied.
+        #              For example: "done(jobID)", "ended(jobname)". Quotes are recommended.
+        self.wait = None
+        
+        # -B / -N      Send an e-mail to the job's owner when the job begins / ends.
+        self.email_begin = None
+        self.email_end = None
+        
+        # -u user      Send e-mail to user instead of the job's owner.
+        #              The recipient's address must be inside the ETH domain.
+        #              The firewall blocks e-mail sent to other addresses.
+        self.email_address = None
 
-    class JobsProcess(multiprocessing.Process):
+    @property
+    def nativeSpecification(self):
         """
-        In case jobs are to be computed locally, a number of Jobs (possibly one)
-        are assinged to one thread.
+        Python-style getter: assemble the LSF (bsub) native
+        specification string from the options set on this job.
         """
 
-        def __init__(self, jobs):
-            """
-            Constructor
-            @param jobs: list of jobs
-            @type jobs: list of Job objects
-            """
+        ret = ""
+        if self.wall_time_h is not None and self.wall_time_m is not None:
+            ret += " -W %02d:%02d" % (self.wall_time_h, self.wall_time_m)
+        if self.ncpu:
+            ret += " -n %d" % self.ncpu
+        if self.outfile:
+            ret += " -o %s" % self.outfile
+        if self.errfile:
+            ret += " -e %s" % self.errfile
+        if self.ooutfile:
+            ret += " -oo %s" % self.ooutfile
+        if self.oerrfile:
+            ret += " -oe %s" % self.oerrfile
+        if self.jobname:
+            ret += " -J %s" % self.jobname
+        if self.wait:
+            ret += " -w %s" % self.wait
+        if self.email_begin:
+            ret += " -B"
+        if self.email_end:
+            ret += " -N"
+        if self.email_address:
+            ret += " -u %s" % self.email_address
 
-            self.jobs = jobs
-            multiprocessing.Process.__init__(self)
-
-        def run(self):
-            """
-            Executes each job in job list
-            """
-            for job in self.jobs:
-                job.execute()
-
-
-def _execute(job):
-    """Cannot pickle method instances, so fake a function.
-    Used by _process_jobs_locally"""
-    
-    return apply(job.f, job.args, job.kwlist)
-
-
-
-def _process_jobs_locally(jobs, maxNumThreads=None):
-    """
-    Local execution using the package multiprocessing, if present
-    
-    @param jobs: jobs to be executed
-    @type jobs: list<Job>
-    @param maxNumThreads: maximal number of threads
-    @type maxNumThreads: int
-    
-    @return: list of jobs, each with return in job.ret
-    @rtype: list<Job>
-    """
-    
-    
-    if (not multiprocessing_present or maxNumThreads == 1):
-        #perform sequential computation
-        for job in jobs:
-            job.execute()
-    else:
-        #print multiprocessing.cpu_count()
-        po = multiprocessing.Pool(maxNumThreads)
-        result = po.map(_execute, jobs)
-        for ix,job in enumerate(jobs):
-            job.ret = result[ix]
-            
-    return jobs
-
-
-def submit_jobs(jobs):
-    """
-    Method used to send a list of jobs onto the cluster.
-    @param jobs: list of jobs to be executed
-    @type jobs: list<Job>
-    """
-
-    s = drmaa.Session()
-    s.initialize()
-    jobids = []
-
-    for job in jobs:
-        save(job.inputfile, job)
-        jt = s.createJobTemplate()
-
-        #fetch only specific env vars from shell
-        #shell_env = {"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
-        #             "PYTHONPATH": os.getenv("PYTHONPATH"),
-        #             "MOSEKLM_LICENSE_FILE": os.getenv("MOSEKLM_LICENSE_FILE"),
-        #             }
-
-        # fetch env vars from shell        
-        shell_env = os.environ
-
-        if job.environment and job.replace_env:
-            # only consider defined env vars
-            jt.setEnvironment(job.environment)
-
-        elif job.environment and not job.replace_env:
-            # replace env var from shell with defined env vars
-            env = shell_env
-            env.update(job.environment)
-            jt.setEnvironment(env)
-
-        else:
-            # only consider env vars from shell
-            jt.setEnvironment(shell_env)
-
-        jt.remoteCommand = os.path.expanduser(PYGRID)
-        jt.args = [job.inputfile]
-        jt.joinFiles = True
-        jt.setNativeSpecification(job.nativeSpecification)
-        jt.outputPath = ":" + os.path.expanduser(TEMPDIR)
-        jt.errorPath = ":" + os.path.expanduser(TEMPDIR)
-
-        jobid = s.runJob(jt)
-
-        # set job fields that depend on the jobid assigned by grid engine
-        job.jobid = jobid
-        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobid)
-        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobid)
-
-        print 'Your job %s has been submitted with id %s' % (job.name, jobid)
-        print "stdout:", log_stdout_fn
-        print "stderr:", log_stderr_fn
-        print ""
-
-        #display tmp file size
-        # print os.system("du -h " + invars)
-
-        jobids.append(jobid)
-        s.deleteJobTemplate(jt)
-
-    sid = s.getContact()
-    s.exit()
-
-    return (sid, jobids)
-
-
-def collect_jobs(sid, jobids, joblist, wait=False):
-    """
-    Collect the results from the jobids, returns a list of Jobs
-
-    @param sid: session identifier
-    @type sid: string returned by cluster
-    @param jobids: list of job identifiers returned by the cluster
-    @type jobids: list of strings
-    @param wait: Wait for jobs to finish?
-    @type wait: Boolean, defaults to False
-    """
-
-    for ix in xrange(len(jobids)):
-        assert(jobids[ix] == joblist[ix].jobid)
-
-    s = drmaa.Session()
-    s.initialize(sid)
-
-    if wait:
-        drmaaWait = drmaa.Session.TIMEOUT_WAIT_FOREVER
-    else:
-        drmaaWait = drmaa.Session.TIMEOUT_NO_WAIT
-
-    s.synchronize(jobids, drmaaWait, True)
-    print "success: all jobs finished"
-    s.exit()
-
-    #attempt to collect results
-    retJobs = []
-    for ix, job in enumerate(joblist):
-        
-        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobids[ix])
-        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobids[ix])
-        
-        try:
-            retJob = load(job.outputfile)
-            assert(retJob.name == job.name)
-            retJobs.append(retJob)
-
-            #print exceptions
-            if retJob.exception != None:
-                print str(type(retJob.exception))
-                print "Exception encountered in job with log file:"
-                print log_stdout_fn
-                print retJob.exception
-
-            #remove files
-            elif retJob.cleanup:
-
-                print "cleaning up:", job.outputfile
-                os.remove(job.outputfile)
-
-                if retJob != None:
-
-                    print "cleaning up:", log_stdout_fn
-                    os.remove(log_stdout_fn)
-
-                    print "cleaning up:", log_stderr_fn
-                    #os.remove(log_stderr_fn)
-
-
-        except Exception, detail:
-            print "error while unpickling file: " + job.outputfile
-            print "this could caused by a problem with the cluster environment, imports or environment variables"
-            print "check log files for more information: "
-            print "stdout:", log_stdout_fn
-            print "stderr:", log_stderr_fn
-            
-            print detail
-
-
-
-    return retJobs
-
-
-def process_jobs(jobs, local=False, maxNumThreads=1):
-    """
-    Director function to decide whether to run on the cluster or locally
-    """
-    
-    if (not local and drmaa_present):
-        # Use submit_jobs and collect_jobs to run jobs and wait for the results.
-        (sid, jobids) = submit_jobs(jobs)
-        return collect_jobs(sid, jobids, jobs, wait=True)
-
-    elif (not local and not drmaa_present):
-        print 'Warning: import drmaa failed, computing locally'
-        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
-
-    else:
-        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
-
-
-def get_status(sid, jobids):
-    """
-    Get the status of all jobs in jobids.
-    Returns True if all jobs are finished.
-
-    There is some instability in determining job completion
-    """
-    _decodestatus = {
-        -42: 'sge and drmaa not in sync',
-        drmaa.Session.UNDETERMINED: 'process status cannot be determined',
-        drmaa.Session.QUEUED_ACTIVE: 'job is queued and active',
-        drmaa.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
-        drmaa.Session.USER_ON_HOLD: 'job is queued and in user hold',
-        drmaa.Session.USER_SYSTEM_ON_HOLD: 'job is in user and system hold',
-        drmaa.Session.RUNNING: 'job is running',
-        drmaa.Session.SYSTEM_SUSPENDED: 'job is system suspended',
-        drmaa.Session.USER_SUSPENDED: 'job is user suspended',
-        drmaa.Session.DONE: 'job finished normally',
-        drmaa.Session.FAILED: 'job finished, but failed',
-        }
-
-    s = drmaa.Session()
-    s.initialize(sid)
-    status_summary = {}.fromkeys(_decodestatus, 0)
-    for jobid in jobids:
-        try:
-            curstat = s.getJobProgramStatus(jobid)
-        except drmaa.InvalidJobError, message:
-            print message
-            status_summary[-42] += 1
-        else:
-            status_summary[curstat] += 1
-
-    print 'Status of %s at %s' % (sid, time.strftime('%d/%m/%Y - %H.%M:%S'))
-    for curkey in status_summary.keys():
-        if status_summary[curkey]>0:
-            print '%s: %d' % (_decodestatus[curkey], status_summary[curkey])
-    s.exit()
-
-    return ((status_summary[drmaa.Session.DONE]
-             +status_summary[-42])==len(jobids))
-
-
-#####################################################################
-# Dealing with data
-#####################################################################
-
-
-def save(filename, myobj):
-    """
-    Save myobj to filename using pickle
-    """
-    try:
-        f = bz2.BZ2File(filename, 'wb')
-    except IOError, details:
-        sys.stderr.write('File ' + filename + ' cannot be written\n')
-        sys.stderr.write(details)
-        return
-
-    cPickle.dump(myobj, f, protocol=2)
-    f.close()
-
-
-def load(filename):
-    """
-    Load from filename using pickle
-    """
-    try:
-        f = bz2.BZ2File(filename, 'rb')
-    except IOError, details:
-        sys.stderr.write('File ' + filename + ' cannot be read\n')
-        sys.stderr.write(details)
-        return
-
-    myobj = cPickle.load(f)
-    f.close()
-    return myobj
-
-
-################################################################
-#      The following code will be executed on the cluster      #
-################################################################
-
-
-def run_job(pickleFileName):
-    """
-    This is the code that is executed on the cluster side.
-    Runs job which was pickled to a file called pickledFileName.
-
-    @param pickleFileName: filename of pickled Job object
-    @type pickleFileName: string
-    """
-
-    inPath = pickleFileName
-    job = load(inPath)
-
-    job.execute()
-
-    #remove input file
-    if job.cleanup:
-        os.remove(job.inputfile)
-
-    save(job.outputfile, job)
-
-
-class Usage(Exception):
-    """
-    Simple Exception for cmd-line user-interface.
-    """
-
-    def __init__(self, msg):
-        """
-        Constructor of simple Exception.
-
-        @param msg: exception message
-        @type msg: string
-        """
-
-        self.msg = msg
-
-
-def main(argv=None):
-    """
-    Parse the command line inputs and call run_job
-
-    @param argv: list of arguments
-    @type argv: list of strings
-    """
-
-
-    if argv is None:
-        argv = sys.argv
-
-    try:
-        try:
-            opts, args = getopt.getopt(argv[1:], "h", ["help"])
-            run_job(args[0])
-        except getopt.error, msg:
-            raise Usage(msg)
-
-
-    except Usage, err:
-        print >>sys.stderr, err.msg
-        print >>sys.stderr, "for help use --help"
-
-        return 2
+        return ret
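+
+# Editor's sketch: assembling the bsub options string for a hypothetical job
+# (f and x stand for any function and argument):
+#
+#   job = LSFJob(f, [x])
+#   job.wall_time_h, job.wall_time_m = 1, 30
+#   job.ncpu = 4
+#   job.jobname = "myjob"
+#   print job.nativeSpecification   # -> " -W 01:30 -n 4 -J myjob"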
 
 if __name__ == "__main__":
     main(sys.argv)