Commits

Brad Chapman committed ac7142a Merge

Pull from galaxy-dist: Dec 20, 2012 distribution

Files changed (313)

 Galaxy requires Python 2.5, 2.6 or 2.7. To check your python version, run:
 
 % python -V
-Python 2.4.4
+Python 2.7.3
 
 Start Galaxy:
 

contrib/nagios/check_galaxy.py

 via the check_galaxy.sh script in Galaxy's cron/ directory.
 """
 
-import socket, sys, os, time, tempfile, filecmp, htmllib, formatter, getopt
+import socket, sys, os, time, tempfile, filecmp, htmllib, formatter, getopt, json
 from user import home
 
 import warnings
 username = args[1]
 password = args[2]
 
-if server.endswith(".g2.bx.psu.edu"):
-    if debug:
-        print "Checking a PSU Galaxy server, using maint file"
-    maint = "/errordocument/502/%s/maint" % args[0].split('.', 1)[0]
-else:
-    maint = None
-
 new_history = False
 for o, a in opts:
     if o == "-n":
 
     def __init__(self):
         self.server = server
-        self.maint = maint
         self.tool = None
         self.tool_opts = None
-        self.id = None
-        self.status = None
+        self._hda_id = None
+        self._hda_state = None
+        self._history_id = None
         self.check_file = None
-        self.hid = None
         self.cookie_jar = os.path.join( var_dir, "cookie_jar" )
         dprint("cookie jar path: %s" % self.cookie_jar)
         if not os.access(self.cookie_jar, os.R_OK):
     def reset(self):
         self.tool = None
         self.tool_opts = None
-        self.id = None
-        self.status = None
+        self._hda_id = None
+        self._hda_state = None
+        self._history_id = None
         self.check_file = None
-        self.delete_datasets()
-        self.get("/root/history")
-        p = didParser()
-        p.feed(tc.browser.get_html())
-        if len(p.dids) > 0:
-            print "Remaining datasets ids:", " ".join( p.dids )
-            raise Exception, "History still contains datasets after attempting to delete them"
         if new_history:
             self.get("/history/delete_current")
             tc.save_cookies(self.cookie_jar)
+        self.delete_datasets()
 
     def check_redir(self, url):
         try:
             dprint( "%s is not returning redirect (302): %s" % (url, e) )
             code = tc.browser.get_code()
             if code == 502:
-                is_maint = self.check_maint()
-                if is_maint:
-                    dprint( "Galaxy is down, but a maint file was found, so not sending alert" )
-                    sys.exit(0)
-                else:
-                    print "Galaxy is down (code 502)"
-                    sys.exit(1)
-            return(False)
-
-    # checks for a maint file
-    def check_maint(self):
-        if self.maint is None:
-            #dprint( "Warning: unable to check maint file for %s" % self.server )
-            return(False)
-        try:
-            self.get(self.maint)
-            return(True)
-        except twill.errors.TwillAssertionError, e:
-            return(False)
+                print "Galaxy is down (code 502)"
+                sys.exit(1)
+            return False
 
     def login(self, user, pw):
         self.get("/user/login")
         tc.submit("runtool_btn")
         tc.code(200)
 
+    @property
+    def history_id(self):
+        if self._history_id is None:
+            self.get('/api/histories')
+            self._history_id = json.loads(tc.browser.get_html())[0]['id']
+        return self._history_id
+
+    @property
+    def history_contents(self):
+        self.get('/api/histories/%s/contents' % self.history_id)
+        return json.loads(tc.browser.get_html())
+
+    @property
+    def hda_id(self):
+        if self._hda_id is None:
+            self.set_top_hda()
+        return self._hda_id
+
+    @property
+    def hda_state(self):
+        if self._hda_state is None:
+            self.set_top_hda()
+        return self._hda_state
+
+    def set_top_hda(self):
+        self.get(self.history_contents[-1]['url'])
+        hda = json.loads(tc.browser.get_html())
+        self._hda_id = hda['id']
+        self._hda_state = hda['state']
+
+    @property
+    def undeleted_hdas(self):
+        rval = []
+        for item in self.history_contents:
+            self.get(item['url'])
+            hda = json.loads(tc.browser.get_html())
+            if hda['deleted'] == False:
+                rval.append(hda)
+        return rval
+
+    @property
+    def history_state(self):
+        self.get('/api/histories/%s' % self.history_id)
+        return json.loads(tc.browser.get_html())['state']
+
+    @property
+    def history_state_terminal(self):
+        if self.history_state not in ['queued', 'running', 'paused']:
+            return True
+        return False
+
     def wait(self):
         sleep_amount = 1
         count = 0
         maxiter = 16
         while count < maxiter:
             count += 1
-            self.get("/root/history")
-            page = tc.browser.get_html()
-            if page.find( '<!-- running: do not change this comment, used by TwillTestCase.wait -->' ) > -1:
+            if not self.history_state_terminal:
                 time.sleep( sleep_amount )
                 sleep_amount += 1
             else:
         if count == maxiter:
             raise Exception, "Tool never finished"
 
-    def check_status(self):
-        self.get("/root/history")
-        p = historyParser()
-        p.feed(tc.browser.get_html())
-        if p.status != "ok":
-            self.get("/datasets/%s/stderr" % p.id)
+    def check_state(self):
+        if self.hda_state != "ok":
+            self.get("/datasets/%s/stderr" % self.hda_id)
             print tc.browser.get_html()
-            raise Exception, "HDA %s NOT OK: %s" % (p.id, p.status)
-        self.id = p.id
-        self.status = p.status
-        #return((p.id, p.status))
+            raise Exception, "HDA %s NOT OK: %s" % (self.hda_id, self.hda_state)
 
     def diff(self):
-        self.get("/datasets/%s/display?to_ext=%s" % (self.id, self.tool_opts.get('out_format', 'fasta')))
+        self.get("/datasets/%s/display?to_ext=%s" % (self.hda_id, self.tool_opts.get('out_format', 'fasta')))
         data = tc.browser.get_html()
         tmp = tempfile.mkstemp()
         dprint("tmp file: %s" % tmp[1])
             os.remove(tmp[1])
 
     def delete_datasets(self):
-        self.get("/root/history")
-        p = didParser()
-        p.feed(tc.browser.get_html())
-        dids = p.dids
-        for did in dids:
-            self.get("/datasets/%s/delete" % did)
+        for hda in self.undeleted_hdas:
+            self.get('/datasets/%s/delete' % hda['id'])
+        hdas = [hda['id'] for hda in self.undeleted_hdas]
+        if hdas:
+            print "Remaining datasets ids:", " ".join(hdas)
+            raise Exception, "History still contains datasets after attempting to delete them"
 
     def check_if_logged_in(self):
         self.get("/user?cntrller=user")
             elif data == "User with that email already exists":
                 self.already_exists = True
 
-class historyParser(htmllib.HTMLParser):
-    def __init__(self):
-        htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
-        self.status = None
-        self.id = None
-    def start_div(self, attrs):
-        # find the top history item
-        for i in attrs:
-            if i[0] == "class" and i[1].startswith("historyItemWrapper historyItem historyItem-"):
-                self.status = i[1].rsplit("historyItemWrapper historyItem historyItem-", 1)[1]
-                dprint("status: %s" % self.status)
-            if i[0] == "id" and i[1].startswith("historyItem-"):
-                self.id = i[1].rsplit("historyItem-", 1)[1]
-                dprint("id: %s" % self.id)
-        if self.status is not None:
-            self.reset()
-
-class didParser(htmllib.HTMLParser):
-    def __init__(self):
-        htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
-        self.dids = []
-    def start_div(self, attrs):
-        for i in attrs:
-            if i[0] == "id" and i[1].startswith("historyItemContainer-"):
-                self.dids.append( i[1].rsplit("historyItemContainer-", 1)[1] )
-                dprint("got a dataset id: %s" % self.dids[-1])
-
 class loggedinParser(htmllib.HTMLParser):
     def __init__(self):
         htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
 
         b.runtool()
         b.wait()
-        b.check_status()
+        b.check_state()
         b.diff()
         b.delete_datasets()
 
-        # by this point, everything else has succeeded.  there should be no maint.
-        is_maint = b.check_maint()
-        if is_maint:
-            print "Galaxy is up and fully functional, but a maint file is in place."
-            sys.exit(1)
-
     print "OK"
     sys.exit(0)
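
The rewritten check drives Galaxy's JSON history API (/api/histories and its contents) instead of scraping /root/history with htmllib parsers, and the new properties cache the history and HDA ids between calls. A rough standalone sketch of the same traversal outside twill, assuming a reachable server and a user API key passed as the key parameter (GALAXY, API_KEY, and api_get below are illustrative, not part of the script):

    import json, urllib2

    GALAXY = 'http://localhost:8080'    # hypothetical server address
    API_KEY = 'changeme'                # hypothetical user API key

    def api_get( path ):
        # every endpoint used by the check returns JSON
        return json.load( urllib2.urlopen( '%s%s?key=%s' % ( GALAXY, path, API_KEY ) ) )

    history_id = api_get( '/api/histories' )[0]['id']
    contents = api_get( '/api/histories/%s/contents' % history_id )
    hda = api_get( contents[-1]['url'] )    # newest HDA, as in set_top_hda()
    print hda['id'], hda['state']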

datatypes_conf.xml.sample

     <datatype extension="fli" type="galaxy.datatypes.tabular:FeatureLocationIndex" display_in_upload="false"/>
     <datatype extension="bam" type="galaxy.datatypes.binary:Bam" mimetype="application/octet-stream" display_in_upload="true">
       <converter file="bam_to_bai.xml" target_datatype="bai"/>
-      <converter file="bam_to_summary_tree_converter.xml" target_datatype="summary_tree" depends_on="bai"/>
+      <converter file="bam_to_summary_tree_converter.xml" target_datatype="summary_tree"/>
+      <!--
+        Caution: (a) this converter requires bedtools to be installed and (b) it is very memory intensive and
+        is not recommended for most laptops/desktops.
+        <converter file="bam_to_bigwig_converter.xml" target_datatype="bigwig"/>
+      -->
       <display file="ucsc/bam.xml" />
       <display file="ensembl/ensembl_bam.xml" />
       <display file="igv/bam.xml" />

doc/source/conf.py

             return Mock()
 
 # adding pbs_python, DRMAA_python, markupsafe, and drmaa here had no effect.
-MOCK_MODULES = ['tables', 'decorator']
+MOCK_MODULES = ['tables', 'decorator', 'numpy']
 for mod_name in MOCK_MODULES:
     sys.modules[mod_name] = Mock()
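
Adding numpy here stubs out one more module so Sphinx can import Galaxy code on a documentation host that lacks it. The Mock class used in this loop is defined just above the excerpt; a minimal illustrative version of the pattern (not the exact class from conf.py) is:

    import sys

    class Mock( object ):
        # any attribute access or call yields another Mock, so autodoc can
        # import modules whose real dependencies are absent
        def __init__( self, *args, **kwargs ):
            pass
        def __call__( self, *args, **kwargs ):
            return Mock()
        def __getattr__( self, name ):
            return Mock()

    for mod_name in [ 'tables', 'decorator', 'numpy' ]:
        sys.modules[mod_name] = Mock()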

doc/source/index.rst

 Galaxy Code Documentation
 *************************
 
-Galaxy is an open, web-based platform for accessible, reproducible, and
+Galaxy_ is an open, web-based platform for accessible, reproducible, and
 transparent computational biomedical research.
 
-- Accessible: Users without programming experience can easily specify parameters and run tools and workflows.
-- Reproducible: Galaxy captures information so that any user can repeat and understand a complete computational analysis.
-- Transparent: Users share and publish analyses via the web and create Pages, interactive, web-based documents that describe a complete analysis.
+- *Accessible:* Users without programming experience can easily specify parameters and run tools and workflows.
+- *Reproducible:* Galaxy captures information so that any user can repeat and understand a complete computational analysis.
+- *Transparent:* Users share and publish analyses via the web and create Pages, interactive, web-based documents that describe a complete analysis.
+
+Two copies of the Galaxy code documentation are published by the Galaxy Project:
+
+- Galaxy-Dist_: This describes the code in the `most recent official release`_ of Galaxy.
+- Galaxy-Central_: This describes the `current code in the development branch`_ of Galaxy.  This is the latest, bleeding-edge version of the code.  The documentation should never be more than an hour behind the code.
+
+Both copies are hosted at ReadTheDocs_, a publicly supported web site for hosting project documentation.
+
+If you have your own copy of the Galaxy source code, you can also generate your own version of this documentation:
+
+::
+
+    $ cd doc
+    $ make html
+
+The generated documentation will be in ``doc/build/html/`` and can be viewed with a web browser.  Note that you will need to install Sphinx and a fair number of module dependencies before this will produce output.
+
+.. _Galaxy: http://galaxyproject.org/
+.. _Galaxy-Dist: https://galaxy-dist.readthedocs.org/
+.. _most recent official release: https://bitbucket.org/galaxy/galaxy-dist
+.. _Galaxy-Central: https://galaxy-central.readthedocs.org/
+.. _current code in the development branch: https://bitbucket.org/galaxy/galaxy-central
+.. _ReadTheDocs: https://readthedocs.org/
+
+
+For more on the Galaxy Project, please visit the `project home page`_.
+
+.. _project home page: http://galaxyproject.org/
+
 
 Contents
 ========

lib/galaxy/config.py

         try:
             job_limits = global_conf_parser.items( 'galaxy:job_limits' )
             for k, v in job_limits:
-                # ConfigParser considers the first colon to be the delimiter, undo this behavior
-                more_k, v = v.split('=', 1)
+                # Since the URL contains a colon and possibly an equals sign, consider ' = ' the delimiter
+                more_k, v = v.split(' = ', 1)
                 k = '%s:%s' % (k, more_k.strip())
                 v = v.strip().rsplit(None, 1)
                 v[1] = int(v[1])
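
A worked trace of the new parsing, using a hypothetical [galaxy:job_limits] entry (the runner URL and limit name are made up). ConfigParser splits each option at its first ':' or '=', so a runner URL arrives partly in the key and partly in the value; splitting the value on ' = ' puts it back together:

    # hypothetical config line:   drmaa://sge/all.q = concurrent_jobs 5
    k, v = 'drmaa', '//sge/all.q = concurrent_jobs 5'    # as returned by ConfigParser
    more_k, v = v.split(' = ', 1)                        # '//sge/all.q', 'concurrent_jobs 5'
    k = '%s:%s' % (k, more_k.strip())                    # 'drmaa://sge/all.q'
    v = v.strip().rsplit(None, 1)                        # ['concurrent_jobs', '5']
    v[1] = int(v[1])                                     # ['concurrent_jobs', 5]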

lib/galaxy/datatypes/assembly.py

 if __name__ == '__main__':
     import doctest, sys
     doctest.testmod(sys.modules[__name__])
-

lib/galaxy/datatypes/binary.py

         except:
             return "Binary bam alignments file (%s)" % ( data.nice_size( dataset.get_size() ) )
     def get_track_type( self ):
-        return "ReadTrack", {"data": "bai", "index": "summary_tree"}
+        return "ReadTrack", { "data": "bai", "index": [ "bigwig", "summary_tree" ] }
 
 Binary.register_sniffable_binary_format("bam", "bam", Bam)
 

lib/galaxy/datatypes/converters/bam_to_bigwig_converter.xml

+<tool id="CONVERTER_bam_to_bigwig_0" name="Convert BAM to BigWig" version="1.0.0" hidden="true">
+    <!--  <description>__NOT_USED_CURRENTLY_FOR_CONVERTERS__</description> -->
+    <command>
+        bedtools genomecov -bg -split -ibam $input -g $chromInfo | wigToBigWig stdin $chromInfo $output
+    </command>
+    <inputs>
+        <param format="bam" name="input" type="data" label="Choose BAM file"/>
+    </inputs>
+    <outputs>
+        <data format="bigwig" name="output"/>
+    </outputs>
+    <help>
+    </help>
+</tool>

lib/galaxy/datatypes/converters/bam_to_summary_tree_converter.xml

 <tool id="CONVERTER_bam_to_summary_tree_0" name="Convert BAM to Summary Tree" version="1.0.0" hidden="true">
-<!--  <description>__NOT_USED_CURRENTLY_FOR_CONVERTERS__</description> -->
-  <command interpreter="python">sam_or_bam_to_summary_tree_converter.py --bam $input1 $bai $output1</command>
+  <!--  <description>__NOT_USED_CURRENTLY_FOR_CONVERTERS__</description> -->
+  <command interpreter="python">
+    sam_or_bam_to_summary_tree_converter.py --bam $input1 $input1.metadata.bam_index $output1
+  </command>
   <inputs>
-    <page>
-        <param format="bam" name="input1" type="data" label="Choose BAM file"/>
-        <param format="bai" name="bai" type="data" label="BAI index file"/>
-    </page>
+    <param format="bam" name="input1" type="data" label="Choose BAM file"/>
    </inputs>
   <outputs>
     <data format="summary_tree" name="output1"/>

lib/galaxy/datatypes/converters/bedgraph_to_bigwig_converter.xml

   <!-- Used internally to generate track indexes -->
   <command>grep -v "^track" $input | wigToBigWig -clip stdin $chromInfo $output</command>
   <inputs>
-    <page>
       <param format="bedgraph" name="input" type="data" label="Choose wiggle"/>
-    </page>
    </inputs>
    <outputs>
       <data format="bigwig" name="output"/>

lib/galaxy/datatypes/data.py

     <class 'galaxy.datatypes.metadata.MetadataParameter'>
 
     """
+    #: dictionary of metadata fields for this datatype::
+    metadata_spec = None
+
     __metaclass__ = DataMeta
     # Add metadata elements
     MetadataElement( name="dbkey", desc="Database/Build", default="?", param=metadata.DBKeyParameter, multiple=False, no_value="?" )
         except UnicodeDecodeError:
             text = "binary/unknown file"
     return text
-

lib/galaxy/datatypes/metadata.py

     def __getstate__( self ):
         return None #cannot pickle a weakref item (self._parent), when data._metadata_collection is None, it will be recreated on demand
 
+
 class MetadataSpecCollection( odict ):
     """
     A simple extension of dict which allows cleaner access to items
     """
     def __init__( self, dict = None ):
         odict.__init__( self, dict = None )
+
     def append( self, item ):
         self[item.name] = item
+
     def iter( self ):
         return self.itervalues()
+
     def __getattr__( self, name ):
         return self.get( name )
 
+    def __repr__( self ):
+        # force elements to draw with __str__ for sphinx-apidoc
+        return ', '.join([ item.__str__() for item in self.iter() ])
+
+
 class MetadataParameter( object ):
     def __init__( self, spec ):
         self.spec = spec
         """
         pass
 
-
     def unwrap( self, form_value ):
         """
         Turns a value into its storable form.
         Turns a value read from an external dict into its value to be pushed directly into the metadata dict.
         """
         return value
+
     def to_external_value( self, value ):
         """
         Turns a value read from a metadata into its value to be pushed directly into the external dict.
         """
         return value
 
+
 class MetadataElementSpec( object ):
     """
     Defines a metadata element and adds it to the metadata_spec (which
     is a MetadataSpecCollection) of datatype.
     """
-
-    def __init__( self, datatype, name=None, desc=None, param=MetadataParameter, default=None, no_value = None, visible=True, set_in_upload = False, **kwargs ):
+    def __init__( self, datatype,
+                  name=None, desc=None, param=MetadataParameter, default=None, no_value = None,
+                  visible=True, set_in_upload = False, **kwargs ):
         self.name = name
         self.desc = desc or name
         self.default = default
         self.set_in_upload = set_in_upload
         # Catch-all, allows for extra attributes to be set
         self.__dict__.update(kwargs)
-        #set up param last, as it uses values set above
+        # set up param last, as it uses values set above
         self.param = param( self )
-        datatype.metadata_spec.append( self ) #add spec element to the spec
+        # add spec element to the spec
+        datatype.metadata_spec.append( self )
+
     def get( self, name, default=None ):
         return self.__dict__.get(name, default)
+
     def wrap( self, value ):
         """
         Turns a stored value into its usable form.
         """
         return self.param.wrap( value )
+
     def unwrap( self, value ):
         """
         Turns an incoming value into its storable form.
         """
         return self.param.unwrap( value )
 
+    def __str__( self ):
+        #TODO??: assuming param is the class of this MetadataElementSpec - add the plain class name for that
+        spec_dict = dict( param_class=self.param.__class__.__name__ )
+        spec_dict.update( self.__dict__ )
+        return ( "{name} ({param_class}): {desc}, defaults to '{default}'".format( **spec_dict ) )
+
+# create a statement class that, when called,
+#   will add a new MetadataElementSpec to a class's metadata_spec
 MetadataElement = Statement( MetadataElementSpec )
 
+
 """
 MetadataParameter sub-classes.
 """

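The new __repr__ and __str__ exist so that sphinx-apidoc renders a datatype's metadata_spec as readable text rather than object addresses. For example, the dbkey MetadataElement declared in data.py (name="dbkey", desc="Database/Build", default="?", param=DBKeyParameter) renders roughly as shown below; the snippet simply re-applies the format string from __str__:

    spec_dict = dict( param_class='DBKeyParameter', name='dbkey',
                      desc='Database/Build', default='?' )
    print "{name} ({param_class}): {desc}, defaults to '{default}'".format( **spec_dict )
    # -> dbkey (DBKeyParameter): Database/Build, defaults to '?'
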
lib/galaxy/datatypes/tabular.py

 """
 import pkg_resources
 pkg_resources.require( "bx-python" )
-
+import gzip
 import logging
-import data
+import os
+from cgi import escape
 from galaxy import util
-from cgi import escape
+from galaxy.datatypes import data
 from galaxy.datatypes import metadata
+from galaxy.datatypes.checkers import is_gzip
 from galaxy.datatypes.metadata import MetadataElement
-import galaxy_utils.sequence.vcf
-from sniff import *
+from galaxy.datatypes.sniff import get_headers
 from galaxy.util.json import to_json_string
 
 log = logging.getLogger(__name__)
         return to_json_string({'ck_data': ck_data, 'ck_index': ck_index+1})
 
     def display_data(self, trans, dataset, preview=False, filename=None, to_ext=None, chunk=None):
-        #TODO Prevent failure when displaying extremely long > 50kb lines.
         if chunk:
             return self.get_chunk(trans, dataset, chunk)
-        if to_ext or not preview:
+        elif dataset.metadata.columns > 50:
+            #Fancy tabular display is only suitable for datasets without an incredibly large number of columns.
+            #We should add a new datatype 'matrix', with its own draw method, suitable for this kind of data.
+            #For now, default to the old behavior, ugly as it is.  Remove this after adding 'matrix'.
+            max_peek_size = 1000000 # 1 MB
+            if not preview or os.stat( dataset.file_name ).st_size < max_peek_size:
+                return open( dataset.file_name )
+            else:
+                trans.response.set_content_type( "text/html" )
+                return trans.stream_template_mako( "/dataset/large_file.mako",
+                                            truncated_data = open( dataset.file_name ).read(max_peek_size),
+                                            data = dataset)
+        elif to_ext or not preview:
             return self._serve_raw(trans, dataset, to_ext)
         else:
             column_names = 'null'
             - LANE, TILE, X, Y, INDEX, READ_NO, SEQ, QUAL, POSITION, *STRAND, FILT must be correct
             - We will only check that up to the first 5 alignments are correctly formatted.
         """
-        import gzip
         try:
             compress = is_gzip(filename)
             if compress:

lib/galaxy/jobs/__init__.py

                 # Update (non-library) job output datasets through the object store
                 if dataset not in job.output_library_datasets:
                     self.app.object_store.update_from_file(dataset.dataset, create=True)
+                # Pause any dependent jobs (and those jobs' outputs)
+                for dep_job_assoc in dataset.dependent_jobs:
+                    self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." )
                 self.sa_session.add( dataset )
                 self.sa_session.flush()
             job.state = job.states.ERROR
         if self.app.config.cleanup_job == 'always' or (self.app.config.cleanup_job == 'onsuccess' and job.state == job.states.DELETED):
             self.cleanup()
 
+    def pause( self, job=None, message=None ):
+        if job is None:
+            job = self.get_job()
+        if message is None:
+            message = "Execution of this dataset's job is paused"
+        if job.state == job.states.NEW:
+            for dataset_assoc in job.output_datasets + job.output_library_datasets:
+                dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
+                dataset_assoc.dataset.info = message
+                self.sa_session.add( dataset_assoc.dataset )
+            job.state = job.states.PAUSED
+            self.sa_session.add( job )
+
     def change_state( self, state, info = False ):
         job = self.get_job()
         self.sa_session.refresh( job )
                 log.debug( "setting dataset state to ERROR" )
                 # TODO: This is where the state is being set to error. Change it!
                 dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
+                # Pause any dependent jobs (and those jobs' outputs)
+                for dep_job_assoc in dataset_assoc.dataset.dependent_jobs:
+                    self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." )
             else:
                 dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
             # If any of the rest of the finish method below raises an
             if self.app.config.set_metadata_externally:
                 self.external_output_metadata.cleanup_external_metadata( self.sa_session )
             galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
-            galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
+            galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.app, self.job_id ).cleanup_after_job()
             galaxy.tools.genome_index.GenomeIndexToolWrapper( self.job_id ).postprocessing( self.sa_session, self.app )
             self.app.object_store.delete(self.get_job(), base_dir='job_work', entire_dir=True, dir_only=True, extra_dir=str(self.job_id))
         except:

lib/galaxy/jobs/handler.py

 
 # States for running a job. These are NOT the same as data states
 JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED, JOB_USER_OVER_QUOTA = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted', 'user_over_quota'
+DEFAULT_JOB_PUT_FAILURE_MESSAGE = 'Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator.'
 
 class JobHandler( object ):
     """
     a JobRunner.
     """
     STOP_SIGNAL = object()
+
     def __init__( self, app, dispatcher ):
         """Start the job manager"""
         self.app = app
             if job.tool_id not in self.app.toolbox.tools_by_id:
                 log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
                 JobWrapper( job, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator.' )
-            elif job.job_runner_name is None:
-                log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+            elif job.job_runner_name is None or (job.job_runner_name is not None and job.job_runner_external_id is None):
+                if job.job_runner_name is None:
+                    log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+                else:
+                    log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id )
                 if self.track_jobs_in_database:
                     job.state = model.Job.states.NEW
                 else:
                     .join(model.HistoryDatasetAssociation) \
                     .join(model.Dataset) \
                     .filter(and_((model.Job.state == model.Job.states.NEW),
-                                 or_((model.HistoryDatasetAssociation._state != None),
+                                 or_((model.HistoryDatasetAssociation._state == model.HistoryDatasetAssociation.states.FAILED_METADATA),
                                      (model.HistoryDatasetAssociation.deleted == True ),
                                      (model.Dataset.state != model.Dataset.states.OK ),
                                      (model.Dataset.deleted == True)))).subquery()
                 elif job_state == JOB_USER_OVER_QUOTA:
                     log.info( "(%d) User (%s) is over quota: job paused" % ( job.id, job.user_id ) )
                     job.state = model.Job.states.PAUSED
+                    for dataset_assoc in job.output_datasets + job.output_library_datasets:
+                        dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED
+                        dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
+                        self.sa_session.add( dataset_assoc.dataset.dataset )
                     self.sa_session.add( job )
                 else:
                     log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
     def put( self, job_wrapper ):
         try:
             runner_name = self.__get_runner_name( job_wrapper )
+        except Exception, e:
+            failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
+            if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
+                log.exception( 'Failed to generate job runner name' )
+            else:
+                log.debug( "Intentionally failing job with message (%s)" % failure_message )
+            job_wrapper.fail( failure_message )
+            return
+        try:
             if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
                 #DBTODO Refactor
                 log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
             self.job_runners[runner_name].put( job_wrapper )
         except KeyError:
             log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
-            job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator.' )
+            job_wrapper.fail( DEFAULT_JOB_PUT_FAILURE_MESSAGE )
 
     def stop( self, job ):
         """
             self.job_runners[runner_name].recover( job, job_wrapper )
         except KeyError:
             log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
-            job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator.' )
+            job_wrapper.fail( DEFAULT_JOB_PUT_FAILURE_MESSAGE )
 
     def shutdown( self ):
         for runner in self.job_runners.itervalues():

lib/galaxy/jobs/manager.py

 from sqlalchemy.sql.expression import and_, or_
 
 from galaxy import model
-from galaxy.jobs import handler, Sleeper, NoopQueue
+from galaxy.jobs import handler, Sleeper, NoopQueue, JobWrapper
 from galaxy.util.json import from_json_string
 
 log = logging.getLogger( __name__ )

lib/galaxy/jobs/mapper.py

 
 DYNAMIC_RUNNER_PREFIX = "dynamic:///"
 
+class JobMappingException( Exception ):
+
+    def __init__( self, failure_message ):
+        self.failure_message = failure_message
+
+
 class JobRunnerMapper( object ):
     """
     This class is responsible to managing the mapping of jobs
 
     def __cache_job_runner_url( self, params ):
         # If there's already a runner set in the Job object, don't overwrite from the tool
-        if self.job_runner_name is not None:
+        if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'):
             raw_job_runner_url = self.job_runner_name
         else:
             raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
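
JobMappingException gives dynamic job-mapping rules a way to refuse a job with a specific, user-visible message: the dispatcher's put() in handler.py reads e.failure_message and fails the job with it instead of the generic DEFAULT_JOB_PUT_FAILURE_MESSAGE. A hypothetical rule of the kind a dynamic:/// runner URL resolves to (the function name, signature, and size limit are illustrative only):

    from galaxy.jobs.mapper import JobMappingException

    def route_by_input_size( job ):
        # refuse oversized inputs with a clear message rather than the
        # generic misconfiguration error
        total = sum( assoc.dataset.get_size() for assoc in job.input_datasets )
        if total > 10 * 1024 ** 3:
            raise JobMappingException( "Input datasets are too large to run this tool on this server." )
        return 'local:///'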

lib/galaxy/jobs/runners/cli.py

 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
+        job_id = job.get_job_runner_external_id()
+        if job_id is None:
+            self.put( job_wrapper )
+            return
         runner_job_state = RunnerJobState()
         runner_job_state.ofile = "%s.gjout" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag())
         runner_job_state.efile = "%s.gjerr" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag())
         runner_job_state.ecfile = "%s.gjec" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag())
         runner_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag())
-        runner_job_state.external_job_id = str( job.job_runner_external_id )
+        runner_job_state.external_job_id = str( job_id )
         job_wrapper.command_line = job.command_line
         runner_job_state.job_wrapper = job_wrapper
         runner_job_state.runner_url = job.job_runner_name

lib/galaxy/jobs/runners/condor.py

     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
         # TODO Check if we need any changes here
+        job_id = job.get_job_runner_external_id()
+        if job_id is None:
+            self.put( job_wrapper )
+            return
         drm_job_state = CondorJobState()
         drm_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id)
         drm_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id)
         drm_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id)
-        drm_job_state.job_id = str( job.job_runner_external_id )
+        drm_job_state.job_id = str( job_id )
         drm_job_state.runner_url = job_wrapper.get_job_runner()
         job_wrapper.command_line = job.command_line
         drm_job_state.job_wrapper = job_wrapper

lib/galaxy/jobs/runners/drmaa.py

             galaxy_job_id = drm_job_state.job_wrapper.job_id
             old_state = drm_job_state.old_state
             try:
+                assert job_id not in ( None, 'None' ), 'Invalid job id: %s' % job_id
                 state = self.ds.jobStatus( job_id )
             # InternalException was reported to be necessary on some DRMs, but
             # this could cause failures to be detected as completion!  Please
                 continue
             except Exception, e:
                 # so we don't kill the monitor thread
-                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
+                log.exception("(%s/%s) Unable to check job status: %s" % ( galaxy_job_id, job_id, str( e ) ) )
                 log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
                 drm_job_state.fail_message = "Cluster could not complete job"
                 self.work_queue.put( ( 'fail', drm_job_state ) )
 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
+        job_id = job.get_job_runner_external_id()
+        if job_id is None:
+            self.put( job_wrapper )
+            return
         drm_job_state = DRMAAJobState()
         drm_job_state.ofile = "%s.drmout" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag())
         drm_job_state.efile = "%s.drmerr" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag())
         drm_job_state.ecfile = "%s.drmec" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag())
         drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.get_id())
-        drm_job_state.job_id = str( job.get_job_runner_external_id() )
+        drm_job_state.job_id = str( job_id )
         drm_job_state.runner_url = job_wrapper.get_job_runner_url()
         job_wrapper.command_line = job.get_command_line()
         drm_job_state.job_wrapper = job_wrapper

lib/galaxy/jobs/runners/pbs.py

 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
+        job_id = job.get_job_runner_external_id()
+        if job_id is None:
+            self.put( job_wrapper )
+            return
         pbs_job_state = PBSJobState()
         pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id)
         pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id)
         pbs_job_state.ecfile = "%s/%s.ec" % (self.app.config.cluster_files_directory, job.id)
         pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id)
-        pbs_job_state.job_id = str( job.get_job_runner_external_id() )
+        pbs_job_state.job_id = str( job_id )
         pbs_job_state.runner_url = job_wrapper.get_job_runner_url()
         job_wrapper.command_line = job.command_line
         pbs_job_state.job_wrapper = job_wrapper

lib/galaxy/jobs/runners/sge.py

-import os, logging, threading, time
-from Queue import Queue, Empty
-
-from galaxy import model
-from galaxy.jobs.runners import BaseJobRunner
-
-from paste.deploy.converters import asbool
-
-import pkg_resources
-
-egg_message = """
-
-The 'sge' runner depends on 'DRMAA_python' which is not installed.  Galaxy's
-"scramble" system should make this installation simple, please follow the
-instructions found at:
-
-  http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Cluster
-
-Additional errors may follow:
-%s
-"""
-
-
-try:
-    pkg_resources.require( "DRMAA_python" )
-    import DRMAA
-except Exception, e:
-    raise Exception( egg_message % str( e ) )
-
-
-log = logging.getLogger( __name__ )
-
-__all__ = [ 'SGEJobRunner' ]
-
-DRMAA_state = {
-    DRMAA.Session.UNDETERMINED: 'process status cannot be determined',
-    DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled',
-    DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
-    DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold',
-    DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
-    DRMAA.Session.RUNNING: 'job is running',
-    DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended',
-    DRMAA.Session.USER_SUSPENDED: 'job is user suspended',
-    DRMAA.Session.DONE: 'job finished normally',
-    DRMAA.Session.FAILED: 'job finished, but failed',
-}
-
-sge_template = """#!/bin/sh
-#$ -S /bin/sh
-GALAXY_LIB="%s"
-if [ "$GALAXY_LIB" != "None" ]; then
-    if [ -n "$PYTHONPATH" ]; then
-        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
-    else
-        PYTHONPATH="$GALAXY_LIB"
-    fi
-    export PYTHONPATH
-fi
-cd %s
-%s
-"""
-
-class SGEJobState( object ):
-    def __init__( self ):
-        """
-        Encapsulates state related to a job that is being run via SGE and 
-        that we need to monitor.
-        """
-        self.job_wrapper = None
-        self.job_id = None
-        self.old_state = None
-        self.running = False
-        self.job_file = None
-        self.ofile = None
-        self.efile = None
-        self.runner_url = None
-
-class SGEJobRunner( BaseJobRunner ):
-    """
-    Job runner backed by a finite pool of worker threads. FIFO scheduling
-    """
-    STOP_SIGNAL = object()
-    def __init__( self, app ):
-        """Initialize this job runner and start the monitor thread"""
-        self.app = app
-        self.sa_session = app.model.context
-        # 'watched' and 'queue' are both used to keep track of jobs to watch.
-        # 'queue' is used to add new watched jobs, and can be called from
-        # any thread (usually by the 'queue_job' method). 'watched' must only
-        # be modified by the monitor thread, which will move items from 'queue'
-        # to 'watched' and then manage the watched jobs.
-        self.watched = []
-        self.monitor_queue = Queue()
-        self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner )
-        self.ds = DRMAA.Session()
-        self.ds.init( self.default_cell )
-        self.monitor_thread = threading.Thread( target=self.monitor )
-        self.monitor_thread.start()
-        self.work_queue = Queue()
-        self.work_threads = []
-        nworkers = app.config.cluster_job_queue_workers
-        for i in range( nworkers ):
-            worker = threading.Thread( target=self.run_next )
-            worker.start()
-            self.work_threads.append( worker )
-        log.debug( "%d workers ready" % nworkers )
-
-    def determine_sge_cell( self, url ):
-        """Determine what SGE cell we are using"""
-        url_split = url.split("/")
-        if url_split[0] == 'sge:':
-            return url_split[2]
-        # this could happen if sge is started, but is not the default runner
-        else:
-            return ''
-
-    def determine_sge_queue( self, url ):
-        """Determine what SGE queue we are submitting to"""
-        try:
-            return url.split('/')[3] or None
-        except:
-            return None
-
-    def determine_sge_project( self, url ):
-        """Determine what SGE project we are submitting to"""
-        try:
-            return url.split('/')[4] or None
-        except:
-            return None
-
-    def determine_sge_tool_parameters( self, url ):
-        """Determine what are the tool's specific paramters"""
-        try:
-            return url.split('/')[5] or None
-        except:
-            return None
-
-    def run_next( self ):
-        """
-        Run the next item in the queue (a job waiting to run or finish )
-        """
-        while 1:
-            ( op, obj ) = self.work_queue.get()
-            if op is self.STOP_SIGNAL:
-                return
-            try:
-                if op == 'queue':
-                    self.queue_job( obj )
-                elif op == 'finish':
-                    self.finish_job( obj )
-                elif op == 'fail':
-                    self.fail_job( obj )
-            except:
-                log.exception( "Uncaught exception %sing job" % op )
-
-    def queue_job( self, job_wrapper ):
-        """Create SGE script for a job and submit it to the SGE queue"""
-
-        try:
-            job_wrapper.prepare()
-            command_line = self.build_command_line( job_wrapper, include_metadata = True )
-        except:
-            job_wrapper.fail( "failure preparing job", exception=True )
-            log.exception("failure running job %d" % job_wrapper.job_id)
-            return
-
-        runner_url = job_wrapper.get_job_runner_url()
-        
-        # This is silly, why would we queue a job with no command line?
-        if not command_line:
-            job_wrapper.finish( '', '' )
-            return
-        
-        # Check for deletion before we change state
-        if job_wrapper.get_state() == model.Job.states.DELETED:
-            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
-            job_wrapper.cleanup()
-            return
-
-        # Change to queued state immediately
-        job_wrapper.change_state( model.Job.states.QUEUED )
-        
-        if self.determine_sge_cell( runner_url ) != self.default_cell:
-            # TODO: support multiple cells
-            log.warning( "(%s) Using multiple SGE cells is not supported.  This job will be submitted to the default cell." % job_wrapper.job_id )
-        sge_queue_name = self.determine_sge_queue( runner_url )
-        sge_project_name = self.determine_sge_project( runner_url )
-        sge_extra_params = self.determine_sge_tool_parameters ( runner_url )
-
-        # define job attributes
-        ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
-        efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
-        jt = self.ds.createJobTemplate()
-        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id)
-        jt.outputPath = ":%s" % ofile
-        jt.errorPath = ":%s" % efile
-        nativeSpec = []
-        if sge_queue_name is not None:
-            nativeSpec.append( "-q '%s'" % sge_queue_name )
-        if sge_project_name is not None:
-            nativeSpec.append( "-P '%s'" % sge_project_name)
-        if sge_extra_params is not None:
-            nativeSpec.append( sge_extra_params ) 
-        if len(nativeSpec)>0:
-            jt.nativeSpecification = ' '.join(nativeSpec)
-
-        script = sge_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
-
-        fh = file( jt.remoteCommand, "w" )
-        fh.write( script )
-        fh.close()
-        os.chmod( jt.remoteCommand, 0750 )
-
-        # job was deleted while we were preparing it
-        if job_wrapper.get_state() == model.Job.states.DELETED:
-            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
-            self.cleanup( ( ofile, efile, jt.remoteCommand ) )
-            job_wrapper.cleanup()
-            return
-
-        galaxy_job_id = job_wrapper.job_id
-        log.debug("(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) )
-        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
-        # runJob will raise if there's a submit problem
-        job_id = self.ds.runJob(jt)
-        if sge_queue_name is None:
-            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
-        else:
-            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, sge_queue_name, job_id) )
-
-        # store runner information for tracking if Galaxy restarts
-        job_wrapper.set_runner( runner_url, job_id )
-
-        # Store SGE related state information for job
-        sge_job_state = SGEJobState()
-        sge_job_state.job_wrapper = job_wrapper
-        sge_job_state.job_id = job_id
-        sge_job_state.ofile = ofile
-        sge_job_state.efile = efile
-        sge_job_state.job_file = jt.remoteCommand
-        sge_job_state.old_state = 'new'
-        sge_job_state.running = False
-        sge_job_state.runner_url = runner_url
-        
-        # delete the job template
-        self.ds.deleteJobTemplate( jt )
-
-        # Add to our 'queue' of jobs to monitor
-        self.monitor_queue.put( sge_job_state )
-
-    def monitor( self ):
-        """
-        Watches jobs currently in the PBS queue and deals with state changes
-        (queued to running) and job completion
-        """
-        while 1:
-            # Take any new watched jobs and put them on the monitor list
-            try:
-                while 1: 
-                    sge_job_state = self.monitor_queue.get_nowait()
-                    if sge_job_state is self.STOP_SIGNAL:
-                        # TODO: This is where any cleanup would occur
-                        self.ds.exit()
-                        return
-                    self.watched.append( sge_job_state )
-            except Empty:
-                pass
-            # Iterate over the list of watched jobs and check state
-            self.check_watched_items()
-            # Sleep a bit before the next state check
-            time.sleep( 1 )
-            
-    def check_watched_items( self ):
-        """
-        Called by the monitor thread to look at each watched job and deal
-        with state changes.
-        """
-        new_watched = []
-        for sge_job_state in self.watched:
-            job_id = sge_job_state.job_id
-            galaxy_job_id = sge_job_state.job_wrapper.job_id
-            old_state = sge_job_state.old_state
-            try:
-                state = self.ds.getJobProgramStatus( job_id )
-            except DRMAA.InvalidJobError:
-                # we should only get here if an orphaned job was put into the queue at app startup
-                log.debug("(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) )
-                self.work_queue.put( ( 'finish', sge_job_state ) )
-                continue
-            except Exception, e:
-                # so we don't kill the monitor thread
-                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
-                log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
-                sge_job_state.fail_message = "Cluster could not complete job"
-                self.work_queue.put( ( 'fail', sge_job_state ) )
-                continue
-            if state != old_state:
-                log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, DRMAA_state[state] ) )
-            if state == DRMAA.Session.RUNNING and not sge_job_state.running:
-                sge_job_state.running = True
-                sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
-            if state in ( DRMAA.Session.DONE, DRMAA.Session.FAILED ):
-                self.work_queue.put( ( 'finish', sge_job_state ) )
-                continue
-            sge_job_state.old_state = state
-            new_watched.append( sge_job_state )
-        # Replace the watch list with the updated version
-        self.watched = new_watched
-        
-    def finish_job( self, sge_job_state ):
-        """
-        Get the output/error for a finished job, pass to `job_wrapper.finish`
-        and cleanup all the SGE temporary files.
-        """
-        ofile = sge_job_state.ofile
-        efile = sge_job_state.efile
-        job_file = sge_job_state.job_file
-        # collect the output
-        try:
-            ofh = file(ofile, "r")
-            efh = file(efile, "r")
-            stdout = ofh.read( 32768 )
-            stderr = efh.read( 32768 )
-        except:
-            stdout = ''
-            stderr = 'Job output not returned from cluster'
-            log.debug(stderr)
-
-        try:
-            sge_job_state.job_wrapper.finish( stdout, stderr )
-        except:
-            log.exception("Job wrapper finish method failed")
-
-        # clean up the sge files
-        self.cleanup( ( ofile, efile, job_file ) )
-
-    def fail_job( self, sge_job_state ):
-        """
-        Seperated out so we can use the worker threads for it.
-        """
-        self.stop_job( self.sa_session.query( self.app.model.Job ).get( sge_job_state.job_wrapper.job_id ) )
-        sge_job_state.job_wrapper.fail( sge_job_state.fail_message )
-        self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) )
-
-    def cleanup( self, files ):
-        if not asbool( self.app.config.get( 'debug', False ) ):
-            for file in files:
-                if os.access( file, os.R_OK ):
-                    os.unlink( file )
-
-    def put( self, job_wrapper ):
-        """Add a job to the queue (by job identifier)"""
-        # Change to queued state before handing to worker thread so the runner won't pick it up again
-        job_wrapper.change_state( model.Job.states.QUEUED )
-        self.work_queue.put( ( 'queue', job_wrapper ) )
-
-    def shutdown( self ):
-        """Attempts to gracefully shut down the monitor thread"""
-        log.info( "sending stop signal to worker threads" )
-        self.monitor_queue.put( self.STOP_SIGNAL )
-        for i in range( len( self.work_threads ) ):
-            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
-        log.info( "sge job runner stopped" )
-
-    def stop_job( self, job ):
-        """Attempts to delete a job from the SGE queue"""
-        try:
-            self.ds.control( job.get_job_runner_external_id(), DRMAA.Session.TERMINATE )
-            log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.get_id(), job.get_job_runner_external_id() ) )
-        except DRMAA.InvalidJobError:
-            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), job.get_job_runner_external_id() ) )
-
-    def recover( self, job, job_wrapper ):
-        """Recovers jobs stuck in the queued/running state when Galaxy started"""
-        sge_job_state = SGEJobState()
-        sge_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.get_id())
-        sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.get_id())
-        sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.get_id())
-        sge_job_state.job_id = str( job.get_job_runner_external_id() )
-        sge_job_state.runner_url = job_wrapper.get_job_runner_url()
-        job_wrapper.command_line = job.get_command_line()
-        sge_job_state.job_wrapper = job_wrapper
-        if job.get_state() == model.Job.states.RUNNING:
-            log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) )
-            sge_job_state.old_state = DRMAA.Session.RUNNING
-            sge_job_state.running = True
-            self.monitor_queue.put( sge_job_state )
-        elif job.get_state() == model.Job.states.QUEUED:
-            log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) )
-            sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE
-            sge_job_state.running = False
-            self.monitor_queue.put( sge_job_state )

lib/galaxy/jobs/splitters/multi.py

 import os, logging,  shutil
+import inspect
 from galaxy import model, util
 
 
             output_file_name = str(outputs[output][1])
             base_output_name = os.path.basename(output_file_name)
             if output in merge_outputs:
-                output_type = outputs[output][0].datatype
+                output_dataset = outputs[output][0]
+                output_type = output_dataset.datatype
                 output_files = [os.path.join(dir,base_output_name) for dir in task_dirs]
                 # Just include those files f in the output list for which the 
                 # file f exists; some files may not exist if a task fails.
                     if len(output_files) < len(task_dirs):
                         log.debug('merging only %i out of expected %i files for %s'
                                   % (len(output_files), len(task_dirs), output_file_name))
-                    output_type.merge(output_files, output_file_name)
+                    # First two args to merge always output_files and path of dataset. More
+                    # complicated merge methods may require more parameters. Set those up here.
+                    extra_merge_arg_names = inspect.getargspec( output_type.merge ).args[2:]
+                    extra_merge_args = {}
+                    if "output_dataset" in extra_merge_arg_names:
+                        extra_merge_args["output_dataset"] = output_dataset
+                    output_type.merge(output_files, output_file_name, **extra_merge_args)
                     log.debug('merge finished: %s' % output_file_name)
                 else:
                     msg = 'nothing to merge for %s (expected %i files)' \
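
The splitter now inspects the datatype's merge() signature so newer datatypes can ask for extra context when task outputs are merged. A minimal, hypothetical merge method showing the mechanism: because output_dataset appears in its argument list, inspect.getargspec() detects it and the destination dataset is passed through as a keyword argument (the naive concatenation is illustrative only):

    def merge( split_files, output_file, output_dataset=None ):
        # a real datatype might need output_dataset (e.g. its metadata or an
        # index) to merge correctly; here the pieces are simply concatenated
        out = open( output_file, 'w' )
        try:
            for fname in split_files:
                out.write( open( fname ).read() )
        finally:
            out.close()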

lib/galaxy/model/__init__.py

 Naming: try to use class names that have a distinct plural form so that
 the relationship cardinalities are obvious (e.g. prefer Dataset to Data)
 """
+
 import pkg_resources
-pkg_resources.require( "simplejson" )
-import simplejson
+pkg_resources.require("simplejson")
+pkg_resources.require("pexpect")
+import simplejson, os, errno, codecs, operator, socket, pexpect, logging, time
 import galaxy.datatypes
-from galaxy.util.bunch import Bunch
-from galaxy import util
 import galaxy.datatypes.registry
 from galaxy.datatypes.metadata import MetadataCollection
-from galaxy.security import RBACAgent, get_permitted_actions
-from galaxy.util.hash_util import *
-from galaxy.web.form_builder import *
+from galaxy.security import get_permitted_actions
+from galaxy import util
+from galaxy.util.bunch import Bunch
+from galaxy.util.hash_util import new_secure_hash
+from galaxy.web.form_builder import (AddressField, CheckboxField, PasswordField, SelectField, TextArea, TextField,
+                                    WorkflowField, WorkflowMappingField, HistoryField)
 from galaxy.model.item_attrs import UsesAnnotations, APIItem
 from sqlalchemy.orm import object_session
 from sqlalchemy.sql.expression import func
-import os.path, os, errno, codecs, operator, socket, pexpect, logging, time, shutil
-
-if sys.version_info[:2] < ( 2, 5 ):
-    from sets import Set as set
 
 log = logging.getLogger( __name__ )
 
         self.exit_code = None
 
     # TODO: Add accessors for members defined in SQL Alchemy for the Job table and
-    # for the mapper defined to the Job table. 
+    # for the mapper defined to the Job table.
     def get_external_output_metadata( self ):
         """
-        The external_output_metadata is currently a reference from Job to 
+        The external_output_metadata is currently a reference from Job to
         JobExternalOutputMetadata. It exists for a job but not a task.
         """
-        return self.external_output_metadata 
+        return self.external_output_metadata
     def get_session_id( self ):
         return self.session_id
     def get_user_id( self ):
         # runner_name is not the same thing.
         return self.job_runner_name
     def get_job_runner_external_id( self ):
-        # This is different from the Task just in the member accessed: 
+        # This is different from the Task just in the member accessed:
         return self.job_runner_external_id
     def get_post_job_actions( self ):
         return self.post_job_actions
         # The tasks member is part of a reference in the SQL Alchemy schema:
         return self.tasks
     def get_id_tag( self ):
-        """ 
-        Return a tag that can be useful in identifying a Job. 
+        """
+        Return a tag that can be useful in identifying a Job.
         This returns the Job's id as a string.
-        """ 
+        """
         return "%s" % self.id
 
     def set_session_id( self, session_id ):
         self.task_runner_name = None
         self.task_runner_external_id = None
         self.job = job
-        self.stdout = "" 
-        self.stderr = "" 
+        self.stdout = ""
+        self.stderr = ""
         self.exit_code = None
         self.prepare_input_files_cmd = prepare_files_cmd
 
         return param_dict
 
     def get_id( self ):
-        # This is defined in the SQL Alchemy schema: 
-        return self.id 
+        # This is defined in the SQL Alchemy schema:
+        return self.id
     def get_id_tag( self ):
         """
         Return an id tag suitable for identifying the task.
     # metadata). These can be filled in as needed.
     def get_external_output_metadata( self ):
         """
-        The external_output_metadata is currently a backref to 
+        The external_output_metadata is currently a backref to
         JobExternalOutputMetadata. It exists for a job but not a task,
         and when a task is cancelled its corresponding parent Job will
         be cancelled. So None is returned now, but that could be changed
         """
         Runners will use the same methods to get information about the Task
         class as they will about the Job class, so this method just returns
-        the task's external id. 
+        the task's external id.
         """
         # TODO: Merge into get_runner_external_id.
         return self.task_runner_external_id
     def get_session_id( self ):
         # The Job's galaxy session is equal to the Job's session, so the
-        # Job's session is the same as the Task's session. 
+        # Job's session is the same as the Task's session.
         return self.get_job().get_session_id()
 
     def set_id( self, id ):
         # This method is available for runners that do not want/need to
         # differentiate between the kinds of Runnable things (Jobs and Tasks)
         # that they're using.
-        log.debug( "Task %d: Set external id to %s" 
+        log.debug( "Task %d: Set external id to %s"
                  % ( self.id, task_runner_external_id ) )
         self.task_runner_external_id = task_runner_external_id
     def set_task_runner_external_id( self, task_runner_external_id ):
     def resume_paused_jobs( self ):
         for dataset in self.datasets:
             job = dataset.creating_job
-            if job.state == Job.states.PAUSED:
-                job.set_state(Job.states.QUEUED)
+            if job is not None and job.state == Job.states.PAUSED:
+                job.set_state(Job.states.NEW)
     def get_disk_size( self, nice_size=False ):
         # unique datasets only
         db_session = object_session( self )
                     EMPTY = 'empty',
                     ERROR = 'error',
                     DISCARDED = 'discarded',
+                    PAUSED = 'paused',
                     SETTING_METADATA = 'setting_metadata',
                     FAILED_METADATA = 'failed_metadata' )
+
+    conversion_messages = Bunch( PENDING = "pending",
+                                 NO_DATA = "no data",
+                                 NO_CHROMOSOME = "no chromosome",
+                                 NO_CONVERTER = "no converter",
+                                 NO_TOOL = "no tool",
+                                 DATA = "data",
+                                 ERROR = "error",
+                                 OK = "ok" )
+
     permitted_actions = get_permitted_actions( filter='DATASET' )
     file_path = "/tmp/"
     object_store = None # This gets initialized in mapping.py (method init) by app.py
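With conversion_messages promoted to a class-level Bunch (and re-exposed on DatasetInstance below), callers can test conversion status against shared constants instead of re-declaring them the way the old convert_dataset code did. A rough caller-side sketch, assuming hda is a DatasetInstance and trans a transaction object (both stand-ins here):

    def describe_conversion( hda, trans, target_ext ):
        # Sketch only: interpret the value returned by convert_dataset().
        msg = hda.convert_dataset( trans, target_ext )
        if msg is None:
            return "converted"                          # finished successfully
        if msg == hda.conversion_messages.PENDING:
            return "still converting"                   # conversion job not done yet
        if isinstance( msg, dict ):
            return "failed: %s" % msg.get( 'message' )  # kind == ERROR
        return "not convertible (%s)" % msg             # e.g. NO_CONVERTER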
             return False
         try:
             return util.is_multi_byte( codecs.open( self.file_name, 'r', 'utf-8' ).read( 100 ) )
-        except UnicodeDecodeError, e:
+        except UnicodeDecodeError:
             return False
     # FIXME: sqlalchemy will replace this
     def _delete(self):
 class DatasetInstance( object ):
     """A base class for all 'dataset instances', HDAs, LDAs, etc"""
     states = Dataset.states
+    conversion_messages = Dataset.conversion_messages
     permitted_actions = Dataset.permitted_actions
     def __init__( self, id=None, hid=None, name=None, info=None, blurb=None, peek=None, tool_version=None, extension=None,
                   dbkey=None, metadata=None, history=None, dataset=None, deleted=False, designation=None,
         """
         Returns dict of { "dependency" => HDA }
         """
-        converted_dataset = self.get_converted_files_by_type( target_ext )
         # List of string of dependencies
         try:
             depends_list = trans.app.datatypes_registry.converter_deps[self.extension][target_ext]
                 if dep_dataset is None:
                     # None means the converter is running for the first time
                     return None
-                elif dep_dataset.state == trans.app.model.Job.states.ERROR:
+                elif dep_dataset.state == Job.states.ERROR:
                     raise ConverterDependencyException("A dependency (%s) was in an error state." % dependency)
-                elif dep_dataset.state != trans.app.model.Job.states.OK:
+                elif dep_dataset.state != Job.states.OK:
                     # Pending
                     return None
                 deps[dependency] = dep_dataset
         for name, value in self.metadata.items():
             # HACK: MetadataFile objects do not have a type/ext, so we need to use the metadata name
             # to determine type.
-            if dataset_ext == 'bai' and name == 'bam_index' and isinstance( value, trans.app.model.MetadataFile ):
+            if dataset_ext == 'bai' and name == 'bam_index' and isinstance( value, MetadataFile ):
                 # HACK: MetadataFile objects cannot be used by tools, so return
                 # a fake HDA that points to metadata file.
-                fake_dataset = trans.app.model.Dataset( state=trans.app.model.Dataset.states.OK,
-                                                        external_filename=value.file_name )
-                fake_hda = trans.app.model.HistoryDatasetAssociation( dataset=fake_dataset )
+                fake_dataset = Dataset( state=Dataset.states.OK, external_filename=value.file_name )
+                fake_hda = HistoryDatasetAssociation( dataset=fake_dataset )
                 return fake_hda
     def clear_associated_files( self, metadata_safe = False, purge = False ):
         raise Exception( 'Unimplemented' )
         """
         Returns datasources for dataset; if datasources are not available
         due to indexing, indexing is started. Return value is a dictionary
-        with entries of type 
+        with entries of type
         ( <datasource_type>: { "name": <datasource_name>, "message": <indexing_message> } ).
         """
         track_type, data_sources = self.datatype.get_track_type()
         data_sources_dict = {}
         msg = None
-        for source_type, data_source in data_sources.iteritems():
+        for source_type, source_list in data_sources.iteritems():
+            data_source = None
             if source_type == "data_standalone":
                 # Nothing to do.
                 msg = None
+                data_source = source_list
             else:
                 # Convert.
-                msg = self.convert_dataset( trans, data_source )
-            
+                if isinstance( source_list, str ):
+                    source_list = [ source_list ]
+
+                # Loop through sources until a viable one is found.
+                for source in source_list:
+                    msg = self.convert_dataset( trans, source )
+                    if msg == self.conversion_messages.PENDING:
+                        data_source = source
+                        break
+
             # Store msg.
-            data_sources_dict[ source_type ] = { "name" : data_source, "message": msg }
-        
+            data_sources_dict[ source_type ] = { "name": data_source, "message": msg }
+
         return data_sources_dict
 
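For reference, the dictionary assembled above ends up keyed by source type, with each entry recording the chosen source and the latest conversion message. A rough illustration of its shape (the source names here are made up, not tied to any particular datatype):

    # Illustrative only: possible value returned by get_datasources().
    data_sources_dict = {
        "data": { "name": "tabix", "message": "pending" },
        "data_standalone": { "name": "bigwig", "message": None },
    }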
     def convert_dataset( self, trans, target_type ):
         """
-        Converts a dataset to the target_type and returns a message indicating 
+        Converts a dataset to the target_type and returns a message indicating
         status of the conversion. None is returned to indicate that the dataset
-        was converted successfully. 
+        was converted successfully.
         """
 
-        # FIXME: copied from controller.py
-        messages = Bunch(
-            PENDING = "pending",
-            NO_DATA = "no data",
-            NO_CHROMOSOME = "no chromosome",
-            NO_CONVERTER = "no converter",
-            NO_TOOL = "no tool",
-            DATA = "data",
-            ERROR = "error",
-            OK = "ok"
-        )
-
         # Get converted dataset; this will start the conversion if necessary.
         try:
             converted_dataset = self.get_converted_dataset( trans, target_type )
         except NoConverterException:
-            return messages.NO_CONVERTER
+            return self.conversion_messages.NO_CONVERTER
         except ConverterDependencyException, dep_error:
-            return { 'kind': messages.ERROR, 'message': dep_error.value }
+            return { 'kind': self.conversion_messages.ERROR, 'message': dep_error.value }
 
         # Check dataset state and return any messages.
         msg = None
-        if converted_dataset and converted_dataset.state == trans.app.model.Dataset.states.ERROR:
-            job_id = trans.sa_session.query( trans.app.model.JobToOutputDatasetAssociation ) \
+        if converted_dataset and converted_dataset.state == Dataset.states.ERROR:
+            job_id = trans.sa_session.query( JobToOutputDatasetAssociation ) \
                         .filter_by( dataset_id=converted_dataset.id ).first().job_id
-            job = trans.sa_session.query( trans.app.model.Job ).get( job_id )
-            msg = { 'kind': messages.ERROR, 'message': job.stderr }
-        elif not converted_dataset or converted_dataset.state != trans.app.model.Dataset.states.OK:
-            msg = messages.PENDING
+            job = trans.sa_session.query( Job ).get( job_id )
+            msg = { 'kind': self.conversion_messages.ERROR, 'message': job.stderr }
+        elif not converted_dataset or converted_dataset.state != Dataset.states.OK:
+            msg = self.conversion_messages.PENDING