Commits

Anonymous committed 0977b59

Updated to work with only one session. Not clear whether this is a bug in DRMAA system that doesnt allow to reconnect to old sessions.

Comments (0)

Files changed (2)

 import example
 
 
+def computeFactorial(n):
+    """
+    computes factorial of n
+    """
+    ret=1
+    for i in range(n):
+        ret=ret*(i+1)
+
+    return ret
+
 def makeJobs():
     """
     Creates a list of Jobs.
     """
-    
     inputvec = [[3], [5], [10], [15]]
     print 'print computing the factorials of %s' % str(inputvec)
     jobs=[]
     for input in inputvec:
         # We need to use the full identifier
         # such that the module name is explicit.
-        job = Job(example.computeFactorial, input, cleanup=False) 
+        job = Job(example.computeFactorial, input, cleanup=True) 
         jobs.append(job)
 
     return jobs
 
-
-def runExample():
-    """
-    execute example    
-    """ 
+def local_multithreading():
+    """Run jobs on local machine"""
     print "===  Local Multithreading  ==="
     functionJobs = makeJobs()
 
     for (i, job) in enumerate(processedFunctionJobs):
         print "Job #", i, "- ret: ", str(job.ret)[0:10]
     
-    
+def submit_and_wait():
+    """Submit jobs to the cluster and wait for jobs to complete."""
     print "===  Submit and Wait  ==="
     functionJobs = makeJobs()
 
     for (i, job) in enumerate(processedFunctionJobs):
         print "Job #", i, "- ret: ", str(job.ret)[0:10]
 
-    return
-
+def submit_and_forget():
+    """Submit jobs to cluster, and check them later."""
     print "===  Submit and Forget  ==="
     myjobs = makeJobs()
 
     for (i, job) in enumerate(retjobs):
         print "Job #", i, "- ret: ", str(job.ret)[0:10]
 
-
-def computeFactorial(n):
+def runExample():
     """
-    computes factorial of n
-    """
-    ret=1
-    for i in xrange(n):
-        ret=ret*(i+1)
-
-    return ret
+    execute example    
+    """ 
+    local_multithreading()
+    submit_and_wait()
+    #submit_and_forget()
 
 
 def main(argv=None):
 # location of pythongrid.py on cluster file system
 # TODO set this in configuration file
 # PYGRID = "/absolute/path/to/pythongrid/pythongrid.py"
-PYGRID = "/cluster/home/infk/cong/pythongrid/pythongrid.py"
+#PYGRID = "/cluster/home/infk/cong/pythongrid/pythongrid.py"
 if PYGRID == "/absolute/path/to/pythongrid/pythongrid.py":
     print "Warning: variable PYGRID seems not to be set"
 
 # (must be writable from cluster)
 # TODO define separate client/server TEMPDIR
 # TEMPDIR = "/absolute/path/to/inputoutputdir/"
-TEMPDIR = "/cluster/home/infk/cong/tmp/"
+#TEMPDIR = "/cluster/home/infk/cong/tmp/"
 if TEMPDIR == "/absolute/path/to/inputoutputdir/":
     print "Warning: variable TEMPDIR seems not to be set"
     
     s.synchronize(jobids, drmaaWait, True)
     print "success: all jobs finished"
     s.exit()
-    os.system('ls -lhtr ~/tmp/')
 
     #attempt to collect results
     retJobs = []
     """
     _decodestatus = {
         -42: 'sge and drmaa not in sync',
-        drmaa.Session.UNDETERMINED: 'process status cannot be determined',
-        drmaa.Session.QUEUED_ACTIVE: 'job is queued and active',
-        drmaa.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
-        drmaa.Session.USER_ON_HOLD: 'job is queued and in user hold',
-        drmaa.Session.USER_SYSTEM_ON_HOLD: 'job is in user and system hold',
-        drmaa.Session.RUNNING: 'job is running',
-        drmaa.Session.SYSTEM_SUSPENDED: 'job is system suspended',
-        drmaa.Session.USER_SUSPENDED: 'job is user suspended',
-        drmaa.Session.DONE: 'job finished normally',
-        drmaa.Session.FAILED: 'job finished, but failed',
+        drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
+        drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
+        drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
+        drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
+        drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is in user and system hold',
+        drmaa.JobState.RUNNING: 'job is running',
+        drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
+        drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
+        drmaa.JobState.DONE: 'job finished normally',
+        drmaa.JobState.FAILED: 'job finished, but failed',
         }
 
-    s = drmaa.Session()
-    s.initialize(sid)
+    #s = drmaa.Session()
+    #s.initialize(sid)
+    s = sid
     status_summary = {}.fromkeys(_decodestatus, 0)
     for jobid in jobids:
         try:
-            curstat = s.getJobProgramStatus(jobid)
-        except drmaa.InvalidJobError, message:
+            curstat = s.jobStatus(jobid)
+        except drmaa.errors.InvalidJobException, message:
             print message
             status_summary[-42] += 1
         else:
             status_summary[curstat] += 1
 
-    print 'Status of %s at %s' % (sid, time.strftime('%d/%m/%Y - %H.%M:%S'))
+    print 'Status of jobs at %s' % (time.strftime('%d/%m/%Y - %H.%M:%S'))
     for curkey in status_summary.keys():
         if status_summary[curkey]>0:
             print '%s: %d' % (_decodestatus[curkey], status_summary[curkey])
-    s.exit()
+    #s.exit()
 
-    return ((status_summary[drmaa.Session.DONE]
+    return ((status_summary[drmaa.JobState.DONE]
              +status_summary[-42])==len(jobids))
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.