Anonymous avatar Anonymous committed 5399d7d

Copied from ag-raetsch svn. Could not get history.

Comments (0)

Files changed (13)

+===================================================
+pythongrid - a high level front end to DRMAA-python
+===================================================
+
+.. contents::
+
+Introduction
+============
+The module pythongrid.py provides wrappers that simplify 
+submission and collection of jobs, in a more 'pythonic' fashion.
+It also uses the same interface for running jobs locally on the
+same machine using threads. This may be useful for debugging purposes,
+and may also be faster if the cluster is busy and the jobs are small.
+
+Installation
+============
+
+Dependencies
+------------
+
+- `DRMAA-python`_ (>=0.2.2)
+- `easysvm`_  (>=0.2) [optional, used in example]
+
+.. _`DRMAA-python`: http://gridengine.sunsource.net/howto/drmaa_python.html
+.. _`easysvm`: http://www.easysvm.org
+
+
+Install
+-------
+
+DRMAA-python is required. To check that it is available
+
+    >>> import DRMAA
+
+Untar the archive, and put pythongrid.py somewhere in $PYTHONPATH
+
+    $ tar xzvf pythongrid-0.1.tar.gz
+
+Edit pythongrid.py to set the following two variables at the top.
+
+    PYGRID = '/absolute/path/to/pythongrid.py'
+    TEMPDIR = '/absolute/path/to/inputoutputdir/'
+
+PYGRID is used on the cluster as a shell command.
+TEMPDIR should be readable and writable from both submission
+and cluster systems.
+
+Edit your environment such that the drmaa shared library for the grid
+engine is in the shared path. For example
+
+    export LD_LIBRARY_PATH=/usr/local/sge/lib/lx26-amd64/:$LD_LIBRARY_PATH
+
+To check that everything has installed properly
+
+    >>> import pythongrid
+
+
+
+Usage
+=====
+
+Four functions wrap the complexity:
+
+- submit_jobs takes a list of jobs and sends them to the cluster
+- get_status checks the jobs in a given session
+- collect_jobs takes a list of job ids and collects their results
+- process_jobs provides a shortcut that does both in series. It also
+  provides an option to run locally.
+
+Please look at the examples for usage details.
+
+To use a function within the module itself, one has to import the
+module explicitly from within the module. This is to resolve
+namespaces in Python.
+
+License
+=======
+
+GPLv3_
+
+.. _GPLv3: http://gplv3.fsf.org/
+
+All programs in this collection are free software: 
+you can redistribute them and/or modify
+them under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+Copyright 2008 Cheng Soon Ong and Christian Widmer
+TODO
+
+pre-0.3:
++ update to new DRMAA wrapper!
++ clearly document "submit and wait" and "submit and collect"
++ remove hard-coded env vars and paths
++ Add ETHJob
+
+post-0.3:
++ add apply_to_combination as helper
++ Unify method jobs, reconsider whether it makes sense to have this
++ Need a better way to set job and file names. Preferably human readable and unique.
++ Clear memory after pickling in the constructor, so reduce memory usage
++ validate member flags for KybJob
++ hold interface
+#!/usr/bin/env python
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Written (W) 2008-2009 Christian Widmer
+# Copyright (C) 2008-2009 Max-Planck-Society
+
+import sys
+import getopt
+import pythongrid
+from pythongrid import KybJob, Usage
+from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
+import time
+
+#needs to be imported, such that module name can be referred to explicitly
+import example
+
+
def makeJobs():
    """
    Creates a list of Jobs.

    One KybJob per entry of inputvec, each computing a factorial and
    each limited to 300M of virtual memory.

    @return: list of KybJob objects (not yet submitted)
    """

    inputvec = [[3], [5], [10], [15]]
    # NOTE(review): the message text starts with a stray 'print ' word
    print 'print computing the factorials of %s' % str(inputvec)
    jobs=[]

    # NOTE(review): loop variable 'input' shadows the builtin of the same name
    for input in inputvec:
        # We need to use the full identifier
        # such that the module name is explicit.
        job = KybJob(example.computeFactorial, input)
        job.h_vmem="300M"

        jobs.append(job)


    return jobs
+
+
def runExample():
    '''
    Demonstrate the three usage modes of pythongrid on the factorial
    jobs from makeJobs():

    1. local execution with multiprocessing ("Local Multithreading"),
    2. cluster execution, blocking until all results are back
       ("Submit and Wait"),
    3. cluster execution with explicit status polling and result
       collection ("Submit and Forget").
    '''

    print "====================================="
    print "======  Local Multithreading  ======="
    print "====================================="
    print ""
    print ""

    print "generating function jobs"

    functionJobs = makeJobs()

    print "output ret field in each job before multithreaded computation"
    for (i, job) in enumerate(functionJobs):
        print "Job #", i, "- ret: ", job.ret

    print ""
    print "executing jobs on local machine using 3 threads"
    if not pythongrid.multiprocessing_present:
        print 'multiprocessing not found, serial computation'
    print ""


    processedFunctionJobs = process_jobs(functionJobs, local=True, maxNumThreads=3)


    print "ret fields AFTER execution on local machine"
    for (i, job) in enumerate(processedFunctionJobs):
        print "Job #", i, "- ret: ", str(job.ret)[0:10]


    print ""
    print ""
    print "====================================="
    print "========   Submit and Wait   ========"
    print "====================================="
    print ""
    print ""

    # fresh jobs: the previous ones already carry results
    functionJobs = makeJobs()

    print "output ret field in each job before sending it onto the cluster"
    for (i, job) in enumerate(functionJobs):
        print "Job #", i, "- ret: ", job.ret

    print ""
    print "sending function jobs to cluster"
    print ""

    # default process_jobs: submit to the cluster and wait for results
    processedFunctionJobs = process_jobs(functionJobs)

    print "ret fields AFTER execution on cluster"
    for (i, job) in enumerate(processedFunctionJobs):
        print "Job #", i, "- ret: ", str(job.ret)[0:10]


    print ""
    print ""
    print "====================================="
    print "=======  Submit and Forget   ========"
    print "====================================="
    print ""
    print ""


    print 'demo session'
    myjobs = makeJobs()

    (sid, jobids) = submit_jobs(myjobs)

    # poll until get_status reports that every job is done
    print 'checking whether finished'
    while not get_status(sid, jobids):
        time.sleep(7)
    print 'collecting jobs'
    retjobs = collect_jobs(sid, jobids, myjobs)
    print "ret fields AFTER execution on cluster"
    for (i, job) in enumerate(retjobs):
        print "Job #", i, "- ret: ", str(job.ret)[0:10]

    print '--------------'
+
+
def computeFactorial(n):
    """
    Compute the factorial of n iteratively.

    @param n: non-negative integer (values < 2 yield 1, as before)
    @type n: int
    @return: n!
    @rtype: int
    """
    # range() instead of xrange(): identical behaviour on Python 2 and
    # keeps the function working on Python 3, where xrange was removed.
    ret = 1
    for i in range(2, n + 1):
        ret = ret * i

    return ret
+
+
def main(argv=None):
    """
    Command-line entry point: parse options (only -h/--help is
    recognised) and run the example.

    @param argv: argument list; defaults to sys.argv
    @type argv: list of strings
    @return: None on success, 2 on a usage error
    """
    if argv is None:
        argv = sys.argv

    try:
        try:
            # NOTE(review): opts/args are parsed but never used
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
            runExample()

        except getopt.error, msg:
            raise Usage(msg)

    except Usage, err:
        print >>sys.stderr, err.msg
        print >>sys.stderr, "for help use --help"

        return 2


if __name__ == "__main__":
    main()
+

example_env_vars.py

+#!/usr/bin/env python
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Written (W) 2008-2009 Christian Widmer
+# Copyright (C) 2008-2009 Max-Planck-Society
+
+import sys
+import getopt
+import time
+import os
+
+from pythongrid import KybJob, Usage
+from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
+
+#needs to be imported, such that module name can be referred to explicitly
+import example_advanced
+
+
+
def runExample():
    """
    Submit three show_env jobs that differ only in how their
    environment is set up:
      job 0 - environment inherited from the shell,
      job 1 - shell environment updated with MYENV1 (replace_env False),
      job 2 - environment replaced entirely by {MYENV1} (replace_env True).

    @return: the list of created jobs (results are printed, not returned)
    """

    inputvec = [["0"], ["1"], ["2"]]


    jobs = []

    # NOTE(review): loop variable 'input' shadows the builtin of the same name
    for input in inputvec:
        # We need to use the full identifier
        # such that the module name is explicit.
        job = KybJob(example_advanced.show_env, input)
        jobs.append(job)


    jobs[1].environment = {"MYENV1": "defined"}
    jobs[1].replace_env = False

    jobs[2].environment = {"MYENV1": "defined"}
    jobs[2].replace_env = True

    processedFunctionJobs = process_jobs(jobs)

    print "ret fields AFTER execution on cluster"
    for (i, job) in enumerate(processedFunctionJobs):
        print "Job #", i, "- ret: ", str(job.ret)

    # NOTE(review): returns the originally created jobs, not the
    # processed copies that hold the results
    return jobs
+
+
def show_env(n):
    """
    Return the values of the MYENV1 and MYENV2 environment variables.

    (The previous docstring incorrectly described this function as a
    factorial computation.)

    @param n: unused; kept so the job interface matches the other examples
    @return: tuple (value of MYENV1 or None, value of MYENV2 or None)
    @rtype: tuple of (string or None, string or None)
    """

    ret = (os.getenv("MYENV1"), os.getenv("MYENV2"))

    return ret
+
+
def main(argv=None):
    """
    Command-line entry point: parse options (only -h/--help is
    recognised) and run the environment-variable example.

    @param argv: argument list; defaults to sys.argv
    @type argv: list of strings
    @return: None on success, 2 on a usage error
    """
    if argv is None:
        argv = sys.argv

    try:
        try:
            # NOTE(review): opts/args are parsed but never used
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
            runExample()

        except getopt.error, msg:
            raise Usage(msg)

    except Usage, err:
        print >>sys.stderr, err.msg
        print >>sys.stderr, "for help use --help"

        return 2


if __name__ == "__main__":
    main()
+

example_multiproc.py

+#!/usr/bin/env python
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Written (W) 2008 Christian Widmer
+# Copyright (C) 2008 Max-Planck-Society
+
+import sys
+import getopt
+from pythongrid import KybJob, Usage
+from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
+import time
+
+#needs to be imported, such that module name can be referred to explicitly
+import example
+
+
def makeJobs():
    """
    Creates a list of Jobs.

    Four factorial jobs; only the first one gets an explicit h_vmem
    resource limit.

    @return: list of KybJob objects (not yet submitted)
    """
    inputvec = [[20], [30], [40], [50]]
    # NOTE(review): the message text starts with a stray 'print ' word
    print 'print computing the factorials of %s' % str(inputvec)
    jobs=[]
    j1 = KybJob(example.computeFactorial, inputvec[0])
    j1.h_vmem="300M"

    jobs.append(j1)

    # One needs to use the full identifier
    # such that the module name is explicit.
    jobs.append(KybJob(example.computeFactorial, inputvec[1]))
    jobs.append(KybJob(example.computeFactorial, inputvec[2]))
    jobs.append(KybJob(example.computeFactorial, inputvec[3]))

    return jobs
+
+
+def runExample():
+    print "====================================="
+    print "======= Local Multithreading ========"
+    print "====================================="
+    print ""
+    print ""
+
+    print "generating fuction jobs"
+
+    functionJobs = makeJobs()
+
+    print "output ret field in each job before multithreaded computation"
+    for (i, job) in enumerate(functionJobs):
+        print "Job #", i, "- ret: ", job.ret
+
+    print ""
+    print "executing jobs on local machine"
+    print ""
+
+    processedFunctionJobs = process_jobs(functionJobs, local=True, maxNumThreads=2)
+
+    print "ret fields AFTER execution on local machine"
+    for (i, job) in enumerate(processedFunctionJobs):
+        print "Job #", i, "- ret: ", job.ret
+
+
def computeFactorial(n):
    """
    Compute the factorial of n iteratively.

    @param n: non-negative integer (values < 2 yield 1, as before)
    @type n: int
    @return: n!
    @rtype: int
    """
    # range() instead of xrange(): identical behaviour on Python 2 and
    # keeps the function working on Python 3, where xrange was removed.
    ret = 1
    for i in range(2, n + 1):
        ret = ret * i

    return ret
+
+
def main(argv=None):
    """
    Command-line entry point: parse options (only -h/--help is
    recognised) and run the multiprocessing example.

    @param argv: argument list; defaults to sys.argv
    @type argv: list of strings
    @return: None on success, 2 on a usage error
    """
    if argv is None:
        argv = sys.argv

    try:
        try:
            # NOTE(review): opts/args are parsed but never used
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
            runExample()

        except getopt.error, msg:
            raise Usage(msg)

    except Usage, err:
        print >>sys.stderr, err.msg
        print >>sys.stderr, "for help use --help"

        return 2


if __name__ == "__main__":
    main()
    #sys.exit(main())

Binary file added.

Binary file added.

+#! /usr/bin/env python
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Written (W) 2008-2009 Christian Widmer
+# Written (W) 2008-2009 Cheng Soon Ong
+# Copyright (C) 2008-2009 Max-Planck-Society
+
+
+"""
+pythongrid provides a high level front end to DRMAA-python.
+This module provides wrappers that simplify submission and collection of jobs,
+in a more 'pythonic' fashion.
+"""
+
+#paths on cluster file system
+#PYTHONPATH = ["/fml/ag-raetsch/home/raetsch/svn/tools/python/", "/fml/ag-raetsch/home/raetsch/svn/tools/python/pythongrid/", '/fml/ag-raetsch/home/raetsch/mylibs/lib/python2.5/site-packages/', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/ParaParser', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/DynProg/', '/fml/ag-raetsch/share/software/mosek/5/tools/platform/linux64x86/bin', '/fml/ag-raetsch/home/fabio/site-packages', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/Genefinding', '/fml/ag-raetsch/share/software/lib/python2.5/site-packages']
+
+import sys
+import os
+import os.path
+import bz2
+import cPickle
+import getopt
+import time
+import random
+import traceback
+
+
+#paths on cluster file system
+PYTHONPATH = os.environ['PYTHONPATH'] #["/fml/ag-raetsch/home/raetsch/svn/tools/python/", "/fml/ag-raetsch/home/raetsch/svn/tools/python/pythongrid/", '/fml/ag-raetsch/home/raetsch/mylibs/lib/python2.5/site-packages/', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/ParaParser', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/DynProg/', '/fml/ag-raetsch/share/software/mosek/5/tools/platform/linux64x86/bin', '/fml/ag-raetsch/home/fabio/site-packages', '/fml/ag-raetsch/home/raetsch/projects/Git-QPalma/Genefinding', '/fml/ag-raetsch/share/software/lib/python2.5/site-packages']
+
+# location of pythongrid.py on cluster file system
+# TODO set this in configuration file
+PYGRID = "~/svn/tools/python/pythongrid/pythongrid.py"
+
+# define temp directories for the input and output variables
+# (must be writable from cluster)
+# TODO define separate client/server TEMPDIR
+# ag-raetsch
+TEMPDIR = "~/tmp/"
+
+
+# used for generating random filenames
+alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+
+PPATH=reduce(lambda x,y: x+':'+y, PYTHONPATH) ;
+print PPATH
+os.environ['PYTHONPATH'] = PPATH;
+
+sys.path.extend(PYTHONPATH)
+
+
+print "sys.path=" + str(sys.path) ;
+
+jp = os.path.join
+
+drmaa_present=True
+multiprocessing_present=True
+
+try:
+    import drmaa
+except ImportError, detail:
+    print "Error importing drmaa. Only local multi-threading supported."
+    print "Please check your installation."
+    print detail
+    drmaa_present=False
+
+try:
+    import multiprocessing
+except ImportError, detail:
+    print "Error importing multiprocessing. Local computing limited to one CPU."
+    print "Please install python2.6 or the backport of the multiprocessing package"
+    print detail
+    multiprocessing_present=False
+
class Job(object):
    """
    Central entity that wraps a function and its data. Basically,
    a job consists of a function, its argument list, its
    keyword list and a field "ret" which is filled, when
    the execute method gets called
    """

    def __init__(self, f, args, kwlist={}, cleanup=True):
        """
        constructor of Job

        @param f: a function, which should be executed.
        @type f: function
        @param args: argument list of function f
        @type args: list
        @param kwlist: dictionary of keyword arguments
        @type kwlist: dict
        @param cleanup: flag that determines the cleanup of input and log file
        @type cleanup: boolean
        @raise Exception: if the TEMPDIR directory does not exist

        NOTE(review): kwlist={} is a mutable default argument shared
        between calls; it is only read here, but callers should not
        mutate job.kwlist on a job constructed without an explicit kwlist.
        """
        self.f = f
        self.args = args
        self.kwlist = kwlist
        self.cleanup = cleanup
        self.ret = None                  # return value, filled by execute()
        self.inputfile = ""
        self.outputfile = ""
        self.nativeSpecification = ""
        self.exception = None            # set by execute() on failure
        self.environment = None          # optional dict of env vars for the job
        self.replace_env = False         # True: environment replaces the shell env

        outdir = os.path.expanduser(TEMPDIR)
        if not os.path.isdir(outdir):
            print '%s does not exist. Please create a directory' % outdir
            raise Exception()

        # TODO: ensure uniqueness of file names
        # random 8-character suffix; collisions are unlikely but not excluded
        self.name = 'pg'+''.join([random.choice(alphabet) for a in xrange(8)])
        self.inputfile = jp(outdir,self.name + "_in.bz2")
        self.outputfile = jp(outdir,self.name + "_out.bz2")
        self.jobid = ""


    def __repr_broken__(self):
        #TODO: fix representation
        # NOTE(review): the '%' operands below are not wrapped in a
        # tuple, so '%s...' % self.f has too few arguments and calling
        # this raises a TypeError -- hence it is parked under a
        # non-special name instead of __repr__.
        retstr1 = ('%s\nargs=%s\nkwlist=%s\nret=%s\ncleanup=%s' %
                   self.f, self.args, self.kwlist, self.ret, self.cleanup)
        retstr2 = ('\nnativeSpecification=%s\ninputfile=%s\noutputfile=%s\n' %
                   self.nativeSpecification, self.inputfile, self.outputfile)
        return retstr1 + retstr2

    def execute(self):
        """
        Executes function f with given arguments
        and writes return value to field ret.
        If an exception is encountered during execution, ret will
        remain empty and the exception will be written
        to the exception field of the Job object.
        (Removal of the input file is handled by run_job, not here.)
        """
        try:
            # apply() is Python-2 only; equivalent to
            # self.f(*self.args, **self.kwlist)
            self.ret = apply(self.f, self.args, self.kwlist)

        except Exception, e:

            print "exception encountered"
            print "type:", str(type(e))
            print "line number:", sys.exc_info()[2].tb_lineno
            print e
            traceback.print_exc(file=sys.stdout)
            print "========="
            self.exception = e
+
+
class KybJob(Job):
    """
    Specialization of generic Job that provides an interface to
    the system at MPI Biol. Cyber. Tuebingen.

    Each resource attribute listed in _RESOURCES is translated into a
    ``-l name=value`` clause of the SGE native specification whenever
    it is set to a non-empty string.
    """

    # resource attributes, in the order in which they appear in the
    # native specification string (same order as the original if-chain)
    _RESOURCES = ('h_vmem', 'arch', 'tmpfree', 'h_cpu', 'h_rt',
                  'express', 'matlab', 'simulink', 'compiler',
                  'imagetb', 'opttb', 'stattb', 'sigtb', 'cplex',
                  'nicetohave')

    def __init__(self, f, args, kwlist={}, cleanup=True):
        """
        constructor of KybJob

        @param f: a function, which should be executed.
        @type f: function
        @param args: argument list of function f
        @type args: list
        @param kwlist: dictionary of keyword arguments
        @type kwlist: dict
        @param cleanup: flag that determines the cleanup of input and log file
        @type cleanup: boolean
        """
        Job.__init__(self, f, args, kwlist, cleanup)
        # initialise every resource attribute to "" (i.e. unset)
        for attr in self._RESOURCES:
            setattr(self, attr, "")


    def getNativeSpecification(self):
        """
        define python-style getter

        @return: SGE native specification string built from the job
            name and all non-empty resource attributes
        @rtype: string
        """

        ret = ""

        if (self.name != ""):
            ret = ret + " -N " + str(self.name)
        # one "-l attr=value" clause per set resource attribute
        for attr in self._RESOURCES:
            value = getattr(self, attr)
            if (value != ""):
                ret = ret + " -l " + attr + "=" + str(value)

        return ret


    def setNativeSpecification(self, x):
        """
        define python-style setter
        @param x: nativeSpecification string to be set
        @type x: string
        """

        # NOTE(review): the stored value is never read back --
        # getNativeSpecification always rebuilds the string from the
        # resource attributes.  Kept for interface compatibility.
        self.__nativeSpecification = x

    nativeSpecification = property(getNativeSpecification,
                                   setNativeSpecification)
+
+
+
#only define this class if the multiprocessing module is present
if multiprocessing_present:

    class JobsProcess(multiprocessing.Process):
        """
        In case jobs are to be computed locally, a number of Jobs (possibly one)
        are assigned to one worker process.
        """

        def __init__(self, jobs):
            """
            Constructor
            @param jobs: list of jobs
            @type jobs: list of Job objects
            """

            self.jobs = jobs
            multiprocessing.Process.__init__(self)

        def run(self):
            """
            Executes each job in job list
            (runs in the child process once start() is called)
            """
            for job in self.jobs:
                job.execute()
+
+
+def _execute(job):
+    """Cannot pickle method instances, so fake a function.
+    Used by _process_jobs_locally"""
+    
+    return apply(job.f, job.args, job.kwlist)
+
+
+
def _process_jobs_locally(jobs, maxNumThreads=None):
    """
    Local execution using the package multiprocessing, if present

    Falls back to sequential in-process execution when the
    multiprocessing module is missing or maxNumThreads == 1.

    @param jobs: jobs to be executed
    @type jobs: list<Job>
    @param maxNumThreads: maximal number of worker processes; None lets
        multiprocessing.Pool choose the default
    @type maxNumThreads: int

    @return: list of jobs, each with return in job.ret
    @rtype: list<Job>
    """


    if (not multiprocessing_present or maxNumThreads == 1):
        #perform sequential computation
        for job in jobs:
            job.execute()
    else:
        #print multiprocessing.cpu_count()
        po = multiprocessing.Pool(maxNumThreads)
        result = po.map(_execute, jobs)
        # NOTE(review): _execute calls job.f directly, so an exception
        # raised in a worker propagates out of po.map here instead of
        # being stored in job.exception as Job.execute would do
        for ix,job in enumerate(jobs):
            job.ret = result[ix]

    return jobs
+
+
def submit_jobs(jobs):
    """
    Method used to send a list of jobs onto the cluster.

    Each job is pickled to its inputfile; a DRMAA job template is then
    created that runs PYGRID on that file, and the template is submitted.

    @param jobs: list of jobs to be executed
    @type jobs: list<Job>
    @return: tuple (session contact string, list of job ids)
    """

    s = drmaa.Session()
    s.initialize()
    jobids = []

    for job in jobs:
        save(job.inputfile, job)
        jt = s.createJobTemplate()

        #fetch only specific env vars from shell
        #shell_env = {"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
        #             "PYTHONPATH": os.getenv("PYTHONPATH"),
        #             "MOSEKLM_LICENSE_FILE": os.getenv("MOSEKLM_LICENSE_FILE"),
        #             }

        # fetch env vars from shell
        shell_env = os.environ

        if job.environment and job.replace_env:
            # only consider defined env vars
            jt.setEnvironment(job.environment)

        elif job.environment and not job.replace_env:
            # replace env var from shell with defined env vars
            # NOTE(review): env aliases os.environ here (no copy), so
            # update() also mutates this process's own environment
            env = shell_env
            env.update(job.environment)
            jt.setEnvironment(env)

        else:
            # only consider env vars from shell
            jt.setEnvironment(shell_env)

        jt.remoteCommand = os.path.expanduser(PYGRID)
        jt.args = [job.inputfile]
        jt.joinFiles = True
        jt.setNativeSpecification(job.nativeSpecification)
        jt.outputPath = ":" + os.path.expanduser(TEMPDIR)
        jt.errorPath = ":" + os.path.expanduser(TEMPDIR)

        jobid = s.runJob(jt)

        # set job fields that depend on the jobid assigned by grid engine
        job.jobid = jobid
        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobid)
        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobid)

        print 'Your job %s has been submitted with id %s' % (job.name, jobid)
        print "stdout:", log_stdout_fn
        print "stderr:", log_stderr_fn
        print ""

        #display tmp file size
        # print os.system("du -h " + invars)

        jobids.append(jobid)
        s.deleteJobTemplate(jt)

    # keep the contact string so a later session can re-attach
    sid = s.getContact()
    s.exit()

    return (sid, jobids)
+
+
def collect_jobs(sid, jobids, joblist, wait=False):
    """
    Collect the results from the jobids, returns a list of Jobs

    @param sid: session identifier
    @type sid: string returned by cluster
    @param jobids: list of job identifiers returned by the cluster
    @type jobids: list of strings
    @param joblist: the submitted Job objects, in the same order as jobids
    @type joblist: list<Job>
    @param wait: Wait for jobs to finish?
    @type wait: Boolean, defaults to False
    @return: jobs unpickled from their output files; jobs whose output
        could not be read are left out (an error is printed instead)
    """

    # sanity check: jobids and joblist must correspond position-wise
    for ix in xrange(len(jobids)):
        assert(jobids[ix] == joblist[ix].jobid)

    s = drmaa.Session()
    s.initialize(sid)

    if wait:
        drmaaWait = drmaa.Session.TIMEOUT_WAIT_FOREVER
    else:
        drmaaWait = drmaa.Session.TIMEOUT_NO_WAIT

    # block until all jobs are done (or return immediately when not waiting)
    s.synchronize(jobids, drmaaWait, True)
    print "success: all jobs finished"
    s.exit()

    #attempt to collect results
    retJobs = []
    for ix, job in enumerate(joblist):

        log_stdout_fn = (os.path.expanduser(TEMPDIR) + job.name + '.o' + jobids[ix])
        log_stderr_fn = (os.path.expanduser(TEMPDIR) + job.name + '.e' + jobids[ix])

        try:
            retJob = load(job.outputfile)
            assert(retJob.name == job.name)
            retJobs.append(retJob)

            #print exceptions
            if retJob.exception != None:
                print str(type(retJob.exception))
                print "Exception encountered in job with log file:"
                print log_stdout_fn
                print retJob.exception

            #remove files
            elif retJob.cleanup:

                print "cleaning up:", job.outputfile
                os.remove(job.outputfile)

                # NOTE(review): retJob cannot be None here (a None would
                # have failed the assert above), so this test is always true
                if retJob != None:

                    print "cleaning up:", log_stdout_fn
                    os.remove(log_stdout_fn)

                    print "cleaning up:", log_stderr_fn
                    # NOTE(review): stderr log removal is disabled, so
                    # these files accumulate in TEMPDIR
                    #os.remove(log_stderr_fn)


        except Exception, detail:
            print "error while unpickling file: " + job.outputfile
            print "this could caused by a problem with the cluster environment, imports or environment variables"
            print "check log files for more information: "
            print "stdout:", log_stdout_fn
            print "stderr:", log_stderr_fn

            print detail



    return retJobs
+
+
+def process_jobs(jobs, local=False, maxNumThreads=1):
+    """
+    Director function to decide whether to run on the cluster or locally
+    """
+    
+    if (not local and drmaa_present):
+        # Use submit_jobs and collect_jobs to run jobs and wait for the results.
+        (sid, jobids) = submit_jobs(jobs)
+        return collect_jobs(sid, jobids, jobs, wait=True)
+
+    elif (not local and not drmaa_present):
+        print 'Warning: import drmaa failed, computing locally'
+        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
+
+    else:
+        return  _process_jobs_locally(jobs, maxNumThreads=maxNumThreads)
+
+
def get_status(sid, jobids):
    """
    Get the status of all jobs in jobids.
    Returns True if all jobs are finished.

    There is some instability in determining job completion:
    a job the scheduler no longer knows about (InvalidJobError) is
    counted under the synthetic key -42 and treated as finished.

    @param sid: session contact string from submit_jobs
    @param jobids: list of job identifiers
    @return: True when every job is DONE or unknown to the scheduler
    """
    _decodestatus = {
        -42: 'sge and drmaa not in sync',
        drmaa.Session.UNDETERMINED: 'process status cannot be determined',
        drmaa.Session.QUEUED_ACTIVE: 'job is queued and active',
        drmaa.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
        drmaa.Session.USER_ON_HOLD: 'job is queued and in user hold',
        drmaa.Session.USER_SYSTEM_ON_HOLD: 'job is in user and system hold',
        drmaa.Session.RUNNING: 'job is running',
        drmaa.Session.SYSTEM_SUSPENDED: 'job is system suspended',
        drmaa.Session.USER_SUSPENDED: 'job is user suspended',
        drmaa.Session.DONE: 'job finished normally',
        drmaa.Session.FAILED: 'job finished, but failed',
        }

    s = drmaa.Session()
    s.initialize(sid)
    # count how many jobs are in each status
    status_summary = {}.fromkeys(_decodestatus, 0)
    for jobid in jobids:
        try:
            curstat = s.getJobProgramStatus(jobid)
        except drmaa.InvalidJobError, message:
            print message
            status_summary[-42] += 1
        else:
            status_summary[curstat] += 1

    print 'Status of %s at %s' % (sid, time.strftime('%d/%m/%Y - %H.%M:%S'))
    for curkey in status_summary.keys():
        if status_summary[curkey]>0:
            print '%s: %d' % (_decodestatus[curkey], status_summary[curkey])
    s.exit()

    # NOTE(review): FAILED jobs are not counted here, so a failed job
    # keeps get_status returning False indefinitely
    return ((status_summary[drmaa.Session.DONE]
             +status_summary[-42])==len(jobids))
+
+
+#####################################################################
+# Dealing with data
+#####################################################################
+
+
+def save(filename, myobj):
+    """
+    Save myobj to filename using pickle
+    """
+    try:
+        f = bz2.BZ2File(filename, 'wb')
+    except IOError, details:
+        sys.stderr.write('File ' + filename + ' cannot be written\n')
+        sys.stderr.write(details)
+        return
+
+    cPickle.dump(myobj, f, protocol=2)
+    f.close()
+
+
+def load(filename):
+    """
+    Load from filename using pickle
+    """
+    try:
+        f = bz2.BZ2File(filename, 'rb')
+    except IOError, details:
+        sys.stderr.write('File ' + filename + ' cannot be read\n')
+        sys.stderr.write(details)
+        return
+
+    myobj = cPickle.load(f)
+    f.close()
+    return myobj
+
+
+################################################################
+#      The following code will be executed on the cluster      #
+################################################################
+
+
def run_job(pickleFileName):
    """
    This is the code that is executed on the cluster side.
    Runs job which was pickled to a file called pickledFileName.

    The unpickled job is executed, its input file is removed (when its
    cleanup flag is set) and the finished job -- including the ret and
    exception fields -- is pickled to job.outputfile, where
    collect_jobs picks it up.

    @param pickleFileName: filename of pickled Job object
    @type pickleFileName: string
    """

    inPath = pickleFileName
    job = load(inPath)

    job.execute()

    #remove input file
    if job.cleanup:
        os.remove(job.inputfile)

    save(job.outputfile, job)
+
+
class Usage(Exception):
    """
    Exception used to signal a command-line usage error to the
    user-interface code in main().
    """

    def __init__(self, msg):
        """
        Store the message describing the usage error.

        @param msg: exception message
        @type msg: string
        """
        self.msg = msg
+
+
def main(argv=None):
    """
    Parse the command line inputs and call run_job

    This entry point is invoked on the cluster node with the pickled
    job file (written by submit_jobs) as the first positional argument.

    @param argv: list of arguments
    @type argv: list of strings
    @return: None on success, 2 on a usage error
    """


    if argv is None:
        argv = sys.argv

    try:
        try:
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
            # args[0] is the pickled Job file produced by submit_jobs
            run_job(args[0])
        except getopt.error, msg:
            raise Usage(msg)


    except Usage, err:
        print >>sys.stderr, err.msg
        print >>sys.stderr, "for help use --help"

        return 2

if __name__ == "__main__":
    main(sys.argv)
#!/bin/bash
# Build the pythongrid release tarball.
# Run from inside the pythongrid/ directory: it steps up to the parent,
# packs the selected files into pythongrid/pythongrid-0.2.tar.gz with
# paths rooted at pythongrid/, and returns to the original directory.
cd ../
tar cvzf pythongrid/pythongrid-0.2.tar.gz \
pythongrid/C_elegans_acc_gc.csv pythongrid/easysvm_example.py pythongrid/example.py \
pythongrid/pythongrid.py pythongrid/README pythongrid/TODO pythongrid/installing-easysvm.txt
cd pythongrid
+This is just to check that all the headers and libraries are in place for
+the C DRMAA installation.
+
+If compilation fails, check the following environment variables (or use -L and -I).
+CPATH
+LIBRARY_PATH
+LD_LIBRARY_PATH
+
+
+test_drmaa
+----------
+Check whether the installation is working.
+
+gcc -o test_drmaa test_drmaa.c -ldrmaa
+./test_drmaa
+
+The output is:
+DRMAA library was started successfully
+
+test_drmaa_run
+--------------
+Submit a small job (sleep 10).
+
+gcc -o test_drmaa_run test_drmaa_run.c -ldrmaa
+./test_drmaa_run
+
+The output is:
+DRMAA library was started successfully
+DRMAA job.
+Job <xxxxxx> is submitted to queue <xxxx>.
+Your job has been submitted with id xxxxxx
+DRMAA quit successfully
+
+
+test_drmaa.py
+-------------
+Submit a small job (sleep 10) in python.
+
+python test_drmaa.py
+
+The output is:
+DRMAA library v1.0
+DRM system used: XXXX
+DRMAA job.
+Job <xxxxxx> is submitted to queue <xxxx>.
+Job exited using the following resources:
+{'mem': 'xx', 'start_time': 'xx', 'n_hosts': 'xx', 'vmem': 'xx'}
+

test/test_drmaa.c

+/* Some simple code just to check that installation is working */
+
+#include <drmaa.h>
+
+int main (int argc, char **argv) {
+  char error[DRMAA_ERROR_STRING_BUFFER];
+  int errnum = 0;
+  
+  errnum = drmaa_init (NULL, error, DRMAA_ERROR_STRING_BUFFER);
+  
+  if (errnum != DRMAA_ERRNO_SUCCESS) {
+    fprintf (stderr, "Could not initialize the DRMAA library: %s\n", error);
+    return 1;
+  }
+  printf ("DRMAA library was started successfully\n");
+  
+  errnum = drmaa_exit (error, DRMAA_ERROR_STRING_BUFFER);
+  
+  if (errnum != DRMAA_ERRNO_SUCCESS) {
+    fprintf (stderr, "Could not shut down the DRMAA library: %s\n", error);
+    return 1;
+  }
+  printf ("DRMAA quit successfully\n");
+  return 0;
+}

test/test_drmaa.py

from drmaa import *

# Submit a small job (/bin/sleep 10) through DRMAA and report its fate.
session = Session()
print('DRMAA library v%s' % str(session.version))
# On LSF, it seems that session already initialized.
#session.initialize()
print('DRM system used: ' + session.drmsInfo)

# Describe the job: run /bin/sleep with a single argument of "10".
template = JobTemplate()
template.remoteCommand = '/bin/sleep'
template.args = ['10']
template.nativeSpecification = ''
job_id = session.runJob(template)
# Explicitly kill the job
#session.control(job_id, JobControlAction.TERMINATE)

# Block until the job finishes, however long that takes.
job_info = session.wait(job_id, Session.TIMEOUT_WAIT_FOREVER)

# Show all the returned information
# print job_info
if job_info.wasAborted:
    print('Job never ran')
if job_info.hasExited:
    print('Job exited using the following resources:')
    print(job_info.resourceUsage)
if job_info.hasSignal:
    print('Job was signalled with ' + job_info.terminatedSignal)
session.exit()
+                         
+

test/test_drmaa_run.c

+/* Start the drmaa library, submit a job (sleep 10), and quit. */
+
+#include <drmaa.h>
+
+int main (int argc, char **argv) {
+  char error[DRMAA_ERROR_STRING_BUFFER];
+  int errnum = 0;
+  drmaa_job_template_t *jt = NULL;
+
+  errnum = drmaa_init (NULL, error, DRMAA_ERROR_STRING_BUFFER);
+  
+  if (errnum != DRMAA_ERRNO_SUCCESS) {
+    fprintf (stderr, "Could not initialize the DRMAA library: %s\n", error);
+    return 1;
+  }
+  printf ("DRMAA library was started successfully\n");
+  
+  errnum = drmaa_allocate_job_template (&jt, error, DRMAA_ERROR_STRING_BUFFER);
+  
+  if (errnum != DRMAA_ERRNO_SUCCESS) {
+    fprintf (stderr, "Could not create job template: %s\n", error);
+  }
+  else {
+    errnum = drmaa_set_attribute (jt, DRMAA_REMOTE_COMMAND, "/bin/sleep", \
+				  error, DRMAA_ERROR_STRING_BUFFER);
+    
+    if (errnum != DRMAA_ERRNO_SUCCESS) {
+      fprintf (stderr, "Could not set attribute \"%s\": %s\n",
+	       DRMAA_REMOTE_COMMAND, error);
+    }
+    else {
+      const char *args[2] = {"10", NULL};
+      
+      errnum = drmaa_set_vector_attribute (jt, DRMAA_V_ARGV, args, error, \
+					   DRMAA_ERROR_STRING_BUFFER);
+    }
+    
+    if (errnum != DRMAA_ERRNO_SUCCESS) {
+      fprintf (stderr, "Could not set attribute \"%s\": %s\n",	\
+	       DRMAA_REMOTE_COMMAND, error);
+    }
+    else {
+      char jobid[DRMAA_JOBNAME_BUFFER];
+      
+      errnum = drmaa_run_job (jobid, DRMAA_JOBNAME_BUFFER, jt, error,	\
+			      DRMAA_ERROR_STRING_BUFFER);
+      
+      if (errnum != DRMAA_ERRNO_SUCCESS) {
+	fprintf (stderr, "Could not submit job: %s\n", error);
+      }
+      else {
+	printf ("Your job has been submitted with id %s\n", jobid);
+      }
+    } /* else */
+    
+    errnum = drmaa_delete_job_template (jt, error, DRMAA_ERROR_STRING_BUFFER);
+    
+    if (errnum != DRMAA_ERRNO_SUCCESS) {
+      fprintf (stderr, "Could not delete job template: %s\n", error);
+    }
+  } /* else */
+  
+  errnum = drmaa_exit (error, DRMAA_ERROR_STRING_BUFFER);
+  
+  if (errnum != DRMAA_ERRNO_SUCCESS) {
+	  fprintf (stderr, "Could not shut down the DRMAA library: %s\n", error);
+	  return 1;
+  }
+  printf ("DRMAA quit successfully\n");
+  return 0;
+}
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.