Commits

Nate Coraor committed 1c71749 Merge

merge changes from next-stable branch

Files changed (224)

 database/community_files
 database/compiled_templates
 database/files
+database/job_working_directory
 database/pbs
 database/tmp
 database/*.sqlite
 # Python bytecode
 *.pyc
 
+# Tool Shed Runtime Files
+community_webapp.log
+community_webapp.pid
+hgweb.config*
+
 # Config files
 universe_wsgi.ini
 reports_wsgi.ini

+a4113cc1cb5eaa68091c9a73375f00555b66dd11 release_2013.01.13

-Copyright (c) 2005 Pennsylvania State University
+Copyright (c) 2005-2013 Pennsylvania State University
 
-Permission is hereby granted, free of charge, to any person obtaining 
-a copy of this software and associated documentation files (the 
-"Software"), to deal in the Software without restriction, including 
-without limitation the rights to use, copy, modify, merge, publish, 
-distribute, sublicense, and/or sell copies of the Software, and to 
-permit persons to whom the Software is furnished to do so, subject to 
-the following conditions:
+Licensed under the Academic Free License version 3.0
 
-The above copyright notice and this permission notice shall be 
-included in all copies or substantial portions of the Software.
+ 1) Grant of Copyright License. Licensor grants You a worldwide, royalty-free, 
+    non-exclusive, sublicensable license, for the duration of the copyright, to 
+    do the following:
 
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
-IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 
-TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 
-SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+    a) to reproduce the Original Work in copies, either alone or as part of a 
+       collective work;
+
+    b) to translate, adapt, alter, transform, modify, or arrange the Original 
+       Work, thereby creating derivative works ("Derivative Works") based upon 
+       the Original Work;
+
+    c) to distribute or communicate copies of the Original Work and Derivative 
+       Works to the public, under any license of your choice that does not 
+       contradict the terms and conditions, including Licensor's reserved 
+       rights and remedies, in this Academic Free License;
+
+    d) to perform the Original Work publicly; and
+
+    e) to display the Original Work publicly.
+
+ 2) Grant of Patent License. Licensor grants You a worldwide, royalty-free, 
+    non-exclusive, sublicensable license, under patent claims owned or 
+    controlled by the Licensor that are embodied in the Original Work as 
+    furnished by the Licensor, for the duration of the patents, to make, use, 
+    sell, offer for sale, have made, and import the Original Work and 
+    Derivative Works.
+
+ 3) Grant of Source Code License. The term "Source Code" means the preferred 
+    form of the Original Work for making modifications to it and all available 
+    documentation describing how to modify the Original Work. Licensor agrees 
+    to provide a machine-readable copy of the Source Code of the Original Work 
+    along with each copy of the Original Work that Licensor distributes. 
+    Licensor reserves the right to satisfy this obligation by placing a 
+    machine-readable copy of the Source Code in an information repository 
+    reasonably calculated to permit inexpensive and convenient access by You 
+    for as long as Licensor continues to distribute the Original Work.
+
+ 4) Exclusions From License Grant. Neither the names of Licensor, nor the 
+    names of any contributors to the Original Work, nor any of their 
+    trademarks or service marks, may be used to endorse or promote products 
+    derived from this Original Work without express prior permission of the 
+    Licensor. Except as expressly stated herein, nothing in this License 
+    grants any license to Licensor's trademarks, copyrights, patents, trade 
+    secrets or any other intellectual property. No patent license is granted 
+    to make, use, sell, offer for sale, have made, or import embodiments of 
+    any patent claims other than the licensed claims defined in Section 2. 
+    No license is granted to the trademarks of Licensor even if such marks 
+    are included in the Original Work. Nothing in this License shall be 
+    interpreted to prohibit Licensor from licensing under terms different 
+    from this License any Original Work that Licensor otherwise would have a 
+    right to license.
+
+ 5) External Deployment. The term "External Deployment" means the use, 
+    distribution, or communication of the Original Work or Derivative Works 
+    in any way such that the Original Work or Derivative Works may be used by 
+    anyone other than You, whether those works are distributed or 
+    communicated to those persons or made available as an application 
+    intended for use over a network. As an express condition for the grants 
+    of license hereunder, You must treat any External Deployment by You of 
+    the Original Work or a Derivative Work as a distribution under 
+    section 1(c).
+
+ 6) Attribution Rights. You must retain, in the Source Code of any Derivative 
+    Works that You create, all copyright, patent, or trademark notices from 
+    the Source Code of the Original Work, as well as any notices of licensing 
+    and any descriptive text identified therein as an "Attribution Notice." 
+    You must cause the Source Code for any Derivative Works that You create 
+    to carry a prominent Attribution Notice reasonably calculated to inform 
+    recipients that You have modified the Original Work.
+
+ 7) Warranty of Provenance and Disclaimer of Warranty. Licensor warrants that 
+    the copyright in and to the Original Work and the patent rights granted 
+    herein by Licensor are owned by the Licensor or are sublicensed to You 
+    under the terms of this License with the permission of the contributor(s) 
+    of those copyrights and patent rights. Except as expressly stated in the 
+    immediately preceding sentence, the Original Work is provided under this 
+    License on an "AS IS" BASIS and WITHOUT WARRANTY, either express or 
+    implied, including, without limitation, the warranties of 
+    non-infringement, merchantability or fitness for a particular purpose. 
+    THE ENTIRE RISK AS TO THE QUALITY OF THE ORIGINAL WORK IS WITH YOU. This 
+    DISCLAIMER OF WARRANTY constitutes an essential part of this License. 
+    No license to the Original Work is granted by this License except under 
+    this disclaimer.
+
+ 8) Limitation of Liability. Under no circumstances and under no legal 
+    theory, whether in tort (including negligence), contract, or otherwise, 
+    shall the Licensor be liable to anyone for any indirect, special, 
+    incidental, or consequential damages of any character arising as a result 
+    of this License or the use of the Original Work including, without 
+    limitation, damages for loss of goodwill, work stoppage, computer failure 
+    or malfunction, or any and all other commercial damages or losses. This 
+    limitation of liability shall not apply to the extent applicable law 
+    prohibits such limitation.
+
+ 9) Acceptance and Termination. If, at any time, You expressly assented to 
+    this License, that assent indicates your clear and irrevocable acceptance 
+    of this License and all of its terms and conditions. If You distribute or 
+    communicate copies of the Original Work or a Derivative Work, You must 
+    make a reasonable effort under the circumstances to obtain the express 
+    assent of recipients to the terms of this License. This License 
+    conditions your rights to undertake the activities listed in Section 1, 
+    including your right to create Derivative Works based upon the Original 
+    Work, and doing so without honoring these terms and conditions is 
+    prohibited by copyright law and international treaty. Nothing in this 
+    License is intended to affect copyright exceptions and limitations 
+    (including "fair use" or "fair dealing"). This License shall terminate 
+    immediately and You may no longer exercise any of the rights granted to 
+    You by this License upon your failure to honor the conditions in 
+    Section 1(c).
+
+10) Termination for Patent Action. This License shall terminate 
+    automatically and You may no longer exercise any of the rights granted 
+    to You by this License as of the date You commence an action, including 
+    a cross-claim or counterclaim, against Licensor or any licensee alleging 
+    that the Original Work infringes a patent. This termination provision 
+    shall not apply for an action alleging patent infringement by 
+    combinations of the Original Work with other software or hardware.
+
+11) Jurisdiction, Venue and Governing Law. Any action or suit relating to 
+    this License may be brought only in the courts of a jurisdiction wherein 
+    the Licensor resides or in which Licensor conducts its primary business, 
+    and under the laws of that jurisdiction excluding its conflict-of-law 
+    provisions. The application of the United Nations Convention on 
+    Contracts for the International Sale of Goods is expressly excluded. Any 
+    use of the Original Work outside the scope of this License or after its 
+    termination shall be subject to the requirements and penalties of 
+    copyright or patent law in the appropriate jurisdiction. This section 
+    shall survive the termination of this License.
+
+12) Attorneys' Fees. In any action to enforce the terms of this License or 
+    seeking damages relating thereto, the prevailing party shall be entitled 
+    to recover its costs and expenses, including, without limitation, 
+    reasonable attorneys' fees and costs incurred in connection with such 
+    action, including any appeal of such action. This section shall survive 
+    the termination of this License.
+
+13) Miscellaneous. If any provision of this License is held to be 
+    unenforceable, such provision shall be reformed only to the extent 
+    necessary to make it enforceable.
+
+14) Definition of "You" in This License. "You" throughout this License, 
+    whether in upper or lower case, means an individual or a legal entity 
+    exercising rights under, and complying with all of the terms of, this 
+    License. For legal entities, "You" includes any entity that controls, is 
+    controlled by, or is under common control with you. For purposes of this 
+    definition, "control" means (i) the power, direct or indirect, to cause 
+    the direction or management of such entity, whether by contract or 
+    otherwise, or (ii) ownership of fifty percent (50%) or more of the 
+    outstanding shares, or (iii) beneficial ownership of such entity.
+
+15) Right to Use. You may use the Original Work in all ways not otherwise 
+    restricted or conditioned by this License or by law, and Licensor 
+    promises not to interfere with or be responsible for such uses by You.
+
+16) Modification of This License. This License is Copyright © 2005 Lawrence 
+    Rosen. Permission is granted to copy, distribute, or communicate this 
+    License without modification. Nothing in this License permits You to 
+    modify this License as applied to the Original Work or to Derivative 
+    Works. However, You may modify the text of this License and copy, 
+    distribute or communicate your modified version (the "Modified 
+    License") and apply it to other original works of authorship subject to 
+    the following conditions: (i) You may not indicate in any way that your 
+    Modified License is the "Academic Free License" or "AFL" and you may not 
+    use those names in the name of your Modified License; (ii) You must 
+    replace the notice specified in the first paragraph above with the 
+    notice "Licensed under <insert your license name here>" or with a notice 
+    of your own that is not confusingly similar to the notice in this 
+    License; and (iii) You may not claim that your original works are open 
+    source software unless your Modified License has been approved by Open 
+    Source Initiative (OSI) and You comply with its license review and 
+    certification process.
+
 
 Some icons found in Galaxy are from the Silk Icons set, available under
 the Creative Commons Attribution 2.5 License, from:
 
 http://www.famfamfam.com/lab/icons/silk/
+
+
+Other images and documentation are licensed under the Creative Commons Attribution 3.0 (CC BY 3.0) License.  See
+
+http://creativecommons.org/licenses/by/3.0/

community_wsgi.ini.sample

 log_level = DEBUG
 
 # Database connection
-#database_file = database/community.sqlite
+database_file = database/community.sqlite
 # You may use a SQLAlchemy connection string to specify an external database instead
 #database_connection = postgres:///community_test?host=/var/run/postgresql
 

doc/source/lib/galaxy.jobs.runners.rst

 
     galaxy.jobs.runners.cli_job
     galaxy.jobs.runners.cli_shell
+    galaxy.jobs.runners.lwr_client
 

doc/source/lib/galaxy.util.rst

 .. toctree::
 
     galaxy.util.backports
+    galaxy.util.log
 

doc/source/lib/galaxy.webapps.community.controllers.rst

     :undoc-members:
     :show-inheritance:
 
-:mod:`workflow` Module
-----------------------
-
-.. automodule:: galaxy.webapps.community.controllers.workflow
-    :members:
-    :undoc-members:
-    :show-inheritance:
-

doc/source/lib/galaxy.webapps.community.util.rst

     :undoc-members:
     :show-inheritance:
 
+:mod:`workflow_util` Module
+---------------------------
+
+.. automodule:: galaxy.webapps.community.util.workflow_util
+    :members:
+    :undoc-members:
+    :show-inheritance:
+

doc/source/lib/galaxy.webapps.galaxy.api.rst

 Quickstart
 ==========
 
-Set the following option in universe_wsgi.ini and start the server::
-
-        enable_api = True
-
 Log in as your user, navigate to the API Keys page in the User menu, and
 generate a new API key.  Make a note of the API key, and then pull up a
 terminal.  Now we'll use the display.py script in your galaxy/scripts/api
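
Before scripting against the API, it helps to verify the key works; the following is a minimal sketch (not part of the official docs) that assumes a local Galaxy instance at http://localhost:8080 and the histories endpoint. Replace the key with the one you just generated.

# Hedged sketch: list your histories with the new API key (host and endpoint are assumptions).
import urllib2
import simplejson

api_key = 'YOUR_API_KEY'
url = 'http://localhost:8080/api/histories?key=%s' % api_key
print simplejson.loads( urllib2.urlopen( url ).read() )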
 simplejson = 2.1.1
 threadframe = 0.2
 guppy = 0.1.8
+; msgpack_python = 0.2.4
 
 [eggs:noplatform]
 amqplib = 0.6.1
 Babel = 0.9.4
 wchartype = 0.1
 Whoosh = 0.3.18
+; fluent_logger = 0.3.3
 
 ; extra version information
 [tags]

lib/galaxy/app.py

         self.config = config.Configuration( **kwargs )
         self.config.check()
         config.configure_logging( self.config )
+        self.configure_fluent_log()
         # Determine the database url
         if self.config.database_connection:
             db_url = self.config.database_connection
                                    db_url,
                                    self.config.database_engine_options,
                                    database_query_profiling_proxy = self.config.database_query_profiling_proxy,
-                                   object_store = self.object_store )
+                                   object_store = self.object_store,
+                                   trace_logger=self.trace_logger )
         # Manage installed tool shed repositories.
         self.installed_repository_manager = galaxy.tool_shed.InstalledRepositoryManager( self )
         # Create an empty datatypes registry.
         self.job_stop_queue = self.job_manager.job_stop_queue
         # Initialize the external service types
         self.external_service_types = external_service_types.ExternalServiceTypesCollection( self.config.external_service_type_config_file, self.config.external_service_type_path, self )
+
     def shutdown( self ):
         self.job_manager.shutdown()
         self.object_store.shutdown()
                 os.unlink( self.datatypes_registry.integrated_datatypes_configs )
         except:
             pass
+
+    def configure_fluent_log( self ):
+        if self.config.fluent_log:
+            from galaxy.util.log.fluent_log import FluentTraceLogger
+            self.trace_logger = FluentTraceLogger( 'galaxy', self.config.fluent_host, self.config.fluent_port ) 
+        else:
+            self.trace_logger = None

lib/galaxy/config.py

         self.test_conf = resolve_path( kwargs.get( "test_conf", "" ), self.root )
         # The value of migrated_tools_config is the file reserved for containing only those tools that have been eliminated from the distribution
         # and moved to the tool shed.
-        self.migrated_tools_config = resolve_path( "migrated_tools_conf.xml", self.root )
+        self.migrated_tools_config = resolve_path( kwargs.get( 'migrated_tools_config', 'migrated_tools_conf.xml' ), self.root )
         if 'tool_config_file' in kwargs:
             tcf = kwargs[ 'tool_config_file' ]
         elif 'tool_config_files' in kwargs:
         self.api_folders = string_as_bool( kwargs.get( 'api_folders', False ) )
         # This is for testing new library browsing capabilities.
         self.new_lib_browse = string_as_bool( kwargs.get( 'new_lib_browse', False ) )
+        # Logging with fluentd
+        self.fluent_log = string_as_bool( kwargs.get( 'fluent_log', False ) )
+        self.fluent_host = kwargs.get( 'fluent_host', 'localhost' )
+        self.fluent_port = int( kwargs.get( 'fluent_port', 24224 ) )
 
     def __read_tool_job_config( self, global_conf_parser, section, key ):
         try:

lib/galaxy/datatypes/converters/interval_to_fli.py

 
 import sys, optparse
 from galaxy import eggs
+import pkg_resources; pkg_resources.require( "bx-python" )
+from bx.tabular.io import Comment
 from galaxy.datatypes.util.gff_util import GFFReaderWrapper, read_unordered_gtf, convert_gff_coords_to_bed
 
 def main():
             in_reader = read_unordered_gtf( open( in_fname, 'r' ) )
 
         for feature in in_reader:
+            if isinstance( feature, Comment ):
+                continue
+
             for name in feature.attributes:
                 val = feature.attributes[ name ]
                 try:

lib/galaxy/datatypes/tabular.py

 from galaxy.datatypes import metadata
 from galaxy.datatypes.checkers import is_gzip
 from galaxy.datatypes.metadata import MetadataElement
-from galaxy.datatypes.sniff import get_headers
+from galaxy.datatypes.sniff import get_headers, get_test_fname
 from galaxy.util.json import to_json_string
 
 log = logging.getLogger(__name__)

lib/galaxy/datatypes/util/gff_util.py

         key_fn = lambda fields: fields[0] + '_' + get_transcript_id( fields )
 
     
-    # Aggregate intervals by transcript_id.
+    # Aggregate intervals by transcript_id and collect comments.
     feature_intervals = odict()
+    comments = []
     for count, line in enumerate( iterator ):
+        if line.startswith( '#' ):
+            comments.append( Comment( line ) )
+            continue
+
         line_key = key_fn( line.split('\t') )
         if line_key in feature_intervals:
             feature = feature_intervals[ line_key ]
     for features in chroms_features_sorted:
         features.sort( lambda a,b: cmp( a.start, b.start ) )
         
-    # Yield.
+    # Yield comments first, then features.
+    # FIXME: comments can appear anywhere in file, not just the beginning. 
+    # Ideally, comments would be associated with features and output
+    # just before their feature/line.
+    for comment in comments:
+        yield comment
+
     for chrom_features in chroms_features_sorted:
         for feature in chrom_features:
             yield feature

lib/galaxy/jobs/__init__.py

         self.__user_system_pwent = None
         self.__galaxy_system_pwent = None
 
+    def can_split( self ):
+        # Should the job handler split this job up?
+        return self.app.config.use_tasked_jobs and self.tool.parallelism
+
     def get_job_runner_url( self ):
         return self.job_runner_mapper.get_job_runner_url( self.params )
 
+    def get_parallelism(self):
+        return self.tool.parallelism
+
     # legacy naming
     get_job_runner = get_job_runner_url
 
             self.prepare_input_files_cmds = None
         self.status = task.states.NEW
 
+    def can_split( self ):
+        # Should the job handler split this job up? TaskWrapper should 
+        # always return False as the job has already been split.
+        return False
+
     def get_job( self ):
         if self.job_id:
             return self.sa_session.query( model.Job ).get( self.job_id )
         return
     def shutdown( self ):
         return
+
+class ParallelismInfo(object):
+    """
+    Stores the information (if any) for running multiple instances of the tool in parallel
+    on the same set of inputs.
+    """
+    def __init__(self, tag):
+        self.method = tag.get('method')
+        if isinstance(tag, dict):
+            items = tag.iteritems()
+        else:
+            items = tag.attrib.items()
+        self.attributes = dict([item for item in items if item[0] != 'method' ])
+        if len(self.attributes) == 0:
+            # legacy basic mode - provide compatible defaults
+            self.attributes['split_size'] = 20
+            self.attributes['split_mode'] = 'number_of_parts'
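
To illustrate how the new ParallelismInfo behaves, here is a short hedged sketch; the dict inputs are made up, and only the legacy defaults come from the code above.

# Illustrative only: ParallelismInfo accepts a parsed tag or a plain dict.
info = ParallelismInfo( { 'method': 'basic' } )
print info.method        # 'basic'
print info.attributes    # {'split_size': 20, 'split_mode': 'number_of_parts'} (legacy defaults)

info = ParallelismInfo( { 'method': 'multi', 'split_size': '100', 'split_mode': 'to_size' } )
print info.attributes    # {'split_size': '100', 'split_mode': 'to_size'}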

lib/galaxy/jobs/handler.py

             log.debug( 'Loaded job runner: %s' % display_name )
 
     def __get_runner_name( self, job_wrapper ):
-        if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
+        if job_wrapper.can_split():
             runner_name = "tasks"
         else:
             runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0]
             job_wrapper.fail( failure_message )
             return
         try:
-            if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
+            if isinstance(job_wrapper, TaskWrapper):
                 #DBTODO Refactor
                 log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
             else:
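
For context, on the non-split path the runner name is simply the scheme of the job runner URL; a quick illustration (the URLs are examples, not from this changeset):

# The runner name is whatever precedes the first ':' in the runner URL.
print "local:///".split( ":", 1 )[0]           # 'local'
print "drmaa://-q all.q/".split( ":", 1 )[0]   # 'drmaa'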

lib/galaxy/jobs/runners/__init__.py

 import os, logging, os.path
 
+from galaxy import model
+from Queue import Queue, Empty
+import time
+import threading
+
 log = logging.getLogger( __name__ )
 
 class BaseJobRunner( object ):
-    def build_command_line( self, job_wrapper, include_metadata=False ):
+    def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ):
         """
         Compose the sequence of commands necessary to execute a job. This will
         currently include:
             - commands to set metadata (if include_metadata is True)
         """
 
-        def in_directory( file, directory ):
-            """
-            Return true, if the common prefix of both is equal to directory
-            e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
-            """
-
-            # Make both absolute.
-            directory = os.path.abspath( directory )
-            file = os.path.abspath( file )
-
-            return os.path.commonprefix( [ file, directory ] ) == directory
-
         commands = job_wrapper.get_command_line()
         # All job runners currently handle this case which should never
         # occur
             commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] ) 
 
         # -- Append commands to copy job outputs based on from_work_dir attribute. --
+        if include_work_dir_outputs:
+            work_dir_outputs = self.get_work_dir_outputs( job_wrapper )
+            if work_dir_outputs:
+                commands += "; " + "; ".join( [ "cp %s %s" % ( source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] )
+
+        # Append metadata setting commands, we don't want to overwrite metadata
+        # that was copied over in init_meta(), as per established behavior
+        if include_metadata and self.app.config.set_metadata_externally:
+            commands += "; cd %s; " % os.path.abspath( os.getcwd() )
+            commands += job_wrapper.setup_external_metadata( 
+                            exec_dir = os.path.abspath( os.getcwd() ),
+                            tmp_dir = job_wrapper.working_directory,
+                            dataset_files_path = self.app.model.Dataset.file_path,
+                            output_fnames = job_wrapper.get_output_fnames(),
+                            set_extension = False,
+                            kwds = { 'overwrite' : False } ) 
+        return commands
+
+    def get_work_dir_outputs( self, job_wrapper ):
+        """
+        Returns list of pairs (source_file, destination) describing path
+        to work_dir output file and ultimate destination.
+        """
+
+        def in_directory( file, directory ):
+            """
+            Return True if the common prefix of both is equal to directory,
+            e.g. if file is /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
+            """
+
+            # Make both absolute.
+            directory = os.path.abspath( directory )
+            file = os.path.abspath( file )
+
+            return os.path.commonprefix( [ file, directory ] ) == directory
 
         # Set up dict of dataset id --> output path; output path can be real or 
         # false depending on outputs_to_working_directory
                 path = dataset_path.false_path
             output_paths[ dataset_path.dataset_id ] = path
 
+        output_pairs = []
         # Walk job's output associations to find and use from_work_dir attributes.
         job = job_wrapper.get_job()
         job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
                             source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir )
                             destination = output_paths[ dataset.dataset_id ]
                             if in_directory( source_file, job_wrapper.working_directory ):
-                                try:
-                                    commands += "; cp %s %s" % ( source_file, destination )
-                                    log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
-                                except ( IOError, OSError ):
-                                    log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, destination ) )
+                                output_pairs.append( ( source_file, destination ) )
+                                log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
                             else:
                                 # Security violation.
                                 log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
+        return output_pairs
 
 
+class ClusterJobState( object ):
+    """
+    Encapsulate the state of a cluster job, this should be subclassed as
+    needed for various job runners to capture additional information needed
+    to communicate with cluster job manager.
+    """
 
-        # Append metadata setting commands, we don't want to overwrite metadata
-        # that was copied over in init_meta(), as per established behavior
-        if include_metadata and self.app.config.set_metadata_externally:
-            commands += "; cd %s; " % os.path.abspath( os.getcwd() )
-            commands += job_wrapper.setup_external_metadata( 
-                            exec_dir = os.path.abspath( os.getcwd() ),
-                            tmp_dir = job_wrapper.working_directory,
-                            dataset_files_path = self.app.model.Dataset.file_path,
-                            output_fnames = job_wrapper.get_output_fnames(),
-                            set_extension = False,
-                            kwds = { 'overwrite' : False } ) 
-        return commands
+    def __init__( self ):
+        self.job_wrapper = None
+        self.job_id = None
+        self.old_state = None
+        self.running = False
+        self.runner_url = None
+
+STOP_SIGNAL = object()
+
+JOB_STATUS_QUEUED = 'queue'
+JOB_STATUS_FAILED = 'fail'
+JOB_STATUS_FINISHED = 'finish'
+
+class ClusterJobRunner( BaseJobRunner ):
+    """
+    Not sure this is the best name for this class, but there is common code
+    shared between sge, pbs, drmaa, etc...
+    """
+
+    def __init__( self, app ):
+        self.app = app
+        self.sa_session = app.model.context
+        # 'watched' and 'monitor_queue' are both used to keep track of jobs to watch.
+        # 'monitor_queue' is used to add new watched jobs, and can be called from
+        # any thread (usually via the 'monitor_job' method). 'watched' must only
+        # be modified by the monitor thread, which will move items from 'monitor_queue'
+        # to 'watched' and then manage the watched jobs.
+        self.watched = []
+        self.monitor_queue = Queue()
+
+    def _init_monitor_thread(self):
+        self.monitor_thread = threading.Thread( name="%s.monitor_thread" % self.runner_name, target=self.monitor )
+        self.monitor_thread.setDaemon( True )
+        self.monitor_thread.start()
+
+    def _init_worker_threads(self):
+        self.work_queue = Queue()
+        self.work_threads = []
+        nworkers = self.app.config.cluster_job_queue_workers
+        for i in range( nworkers ):
+            worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+            worker.setDaemon( True )
+            worker.start()
+            self.work_threads.append( worker )
+
+    def handle_stop(self):
+        # DRMAA and SGE runners should override this and disconnect.
+        pass
+
+    def monitor( self ):
+        """
+        Watches jobs currently in the cluster queue and deals with state changes
+        (queued to running) and job completion
+        """
+        while 1:
+            # Take any new watched jobs and put them on the monitor list
+            try:
+                while 1: 
+                    cluster_job_state = self.monitor_queue.get_nowait()
+                    if cluster_job_state is STOP_SIGNAL:
+                        # TODO: This is where any cleanup would occur
+                        self.handle_stop()
+                        return
+                    self.watched.append( cluster_job_state )
+            except Empty:
+                pass
+            # Iterate over the list of watched jobs and check state
+            self.check_watched_items()
+            # Sleep a bit before the next state check
+            time.sleep( 1 )
+
+    def run_next( self ):
+        """
+        Run the next item in the queue (a job waiting to run or finish )
+        """
+        while 1:
+            ( op, obj ) = self.work_queue.get()
+            if op is STOP_SIGNAL:
+                return
+            try:
+                if op == JOB_STATUS_QUEUED:
+                    # If the next item is to be run, then only run it if the
+                    # job state is "queued". Otherwise the next item was either
+                    # cancelled or one of its siblings encountered an error.
+                    job_state = obj.get_state()
+                    if model.Job.states.QUEUED == job_state:
+                        self.queue_job( obj )
+                    else:
+                        log.debug( "Not executing job %s in state %s" % ( obj.get_id_tag(), job_state ) )
+                elif op == JOB_STATUS_FINISHED:
+                    self.finish_job( obj )
+                elif op == JOB_STATUS_FAILED:
+                    self.fail_job( obj )
+            except:
+                log.exception( "Uncaught exception %sing job" % op )
+
+    def monitor_job(self, job_state):
+        self.monitor_queue.put( job_state )
+
+    def put( self, job_wrapper ):
+        """Add a job to the queue (by job identifier)"""
+        # Change to queued state before handing to worker thread so the runner won't pick it up again
+        job_wrapper.change_state( model.Job.states.QUEUED )
+        self.mark_as_queued(job_wrapper)
+
+    def shutdown( self ):
+        """Attempts to gracefully shut down the monitor thread"""
+        log.info( "sending stop signal to worker threads" )
+        self.monitor_queue.put( STOP_SIGNAL )
+        for i in range( len( self.work_threads ) ):
+            self.work_queue.put( ( STOP_SIGNAL, None ) )
+
+    def check_watched_items(self):
+        """
+        This method is responsible for iterating over self.watched and handling
+        state changes and updating self.watched with a new list of watched job
+        states. Subclasses can opt to override this directly (as older job runners will
+        initially) or just override check_watched_item and allow the list processing to
+        reuse the logic here.
+        """
+        new_watched = []
+        for cluster_job_state in self.watched:
+            new_cluster_job_state = self.check_watched_item(cluster_job_state)
+            if new_cluster_job_state:
+                new_watched.append(new_cluster_job_state)
+        self.watched = new_watched
+
+    # Subclasses should implement this unless they override check_watched_items altogether.
+    def check_watched_item(self, job_state):
+        raise NotImplementedError()
+
+    def queue_job(self, job_wrapper):
+        raise NotImplementedError()
+
+    def finish_job(self, job_state):
+        raise NotImplementedError()
+
+    def fail_job(self, job_state):
+        raise NotImplementedError()
+
+    def mark_as_finished(self, job_state):
+        self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) )
+
+    def mark_as_failed(self, job_state):
+        self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) )
+
+    def mark_as_queued(self, job_wrapper):
+        self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) )
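
To make the intended division of labor concrete, here is a minimal hypothetical subclass sketch; the class name, external job id, and scheduler interaction are placeholders and not part of this changeset, and a real runner would also implement finish_job and fail_job.

class ExampleClusterRunner( ClusterJobRunner ):
    runner_name = "ExampleRunner"

    def __init__( self, app ):
        super( ExampleClusterRunner, self ).__init__( app )
        self._init_monitor_thread()   # drains monitor_queue into self.watched
        self._init_worker_threads()   # services work_queue (queue/finish/fail operations)

    def queue_job( self, job_wrapper ):
        # Submit to the external scheduler here, then hand the job to the monitor thread.
        job_state = ClusterJobState()
        job_state.job_wrapper = job_wrapper
        job_state.job_id = '12345'    # placeholder for the scheduler's external id
        job_state.runner_url = job_wrapper.get_job_runner_url()
        self.monitor_job( job_state )

    def check_watched_item( self, job_state ):
        # Poll the scheduler; when the job is done, call self.mark_as_finished( job_state )
        # (or self.mark_as_failed) and return None to stop watching it.
        return job_state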

lib/galaxy/jobs/runners/drmaa.py

     def stop_job( self, job ):
         """Attempts to delete a job from the DRM queue"""
         try:
+            ext_id = job.get_job_runner_external_id()
+            assert ext_id not in ( None, 'None' ), 'External job id is None'
             if self.external_killJob_script is None:
-                self.ds.control( job.get_job_runner_external_id(), drmaa.JobControlAction.TERMINATE )
+                self.ds.control( ext_id, drmaa.JobControlAction.TERMINATE )
             else:
                 # FIXME: hardcoded path
-                subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( job.get_job_runner_external_id() ), str( self.userid ) ], shell=False )
-            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.get_id(), job.get_job_runner_external_id() ) )
+                subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( ext_id ), str( self.userid ) ], shell=False )
+            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.get_id(), ext_id ) )
         except drmaa.InvalidJobException:
-            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), job.get_job_runner_external_id() ) )
+            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), ext_id ) )
         except Exception, e:
-            log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.get_id(), job.get_job_runner_external_id(), e ) )
+            log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.get_id(), ext_id, e ) )
 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""

lib/galaxy/jobs/runners/lwr.py

 import logging
 import subprocess
-from Queue import Queue
-import threading
-
-import re
 
 from galaxy import model
-from galaxy.datatypes.data import nice_size
-from galaxy.jobs.runners import BaseJobRunner
+from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner, STOP_SIGNAL
 
-import os, errno
+import os, errno
 from time import sleep
 
+from lwr_client import FileStager, Client
+
 log = logging.getLogger( __name__ )
 
 __all__ = [ 'LwrJobRunner' ]
 
-import urllib 
-import urllib2
-import httplib
-import mmap 
-import tempfile
-import time
 
-import simplejson
+class LwrJobRunner( ClusterJobRunner ):
+    """
+    LWR Job Runner
+    """
+    runner_name = "LWRRunner"
 
-class FileStager(object):
-    
-    def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir):
-        self.client = client
-        self.command_line = command_line
-        self.config_files = config_files
-        self.input_files = input_files
-        self.output_files = output_files
-        self.tool_dir = os.path.abspath(tool_dir)
+    def __init__( self, app ):
+        """Start the job runner """
+        super( LwrJobRunner, self ).__init__( app )
+        self._init_monitor_thread()
+        log.info( "starting LWR workers" )
+        self._init_worker_threads()
 
-        self.file_renames = {}
+    def check_watched_item(self, job_state):
+        try:
+            client = self.get_client_from_state(job_state)
+            complete = client.check_complete()
+        except Exception:
+            # An orphaned job was put into the queue at app startup, or the remote server went down;
+            # either way we are done, I guess.
+            self.mark_as_finished(job_state)
+            return None
+        if complete:
+            self.mark_as_finished(job_state)
+            return None
+        return job_state
 
-        job_config = client.setup()
-
-        self.new_working_directory = job_config['working_directory']
-        self.new_outputs_directory = job_config['outputs_directory']
-        self.remote_path_separator = job_config['path_separator']
-
-        self.__initialize_referenced_tool_files()
-        self.__upload_tool_files()
-        self.__upload_input_files()
-        self.__initialize_output_file_renames()
-        self.__initialize_config_file_renames()
-        self.__rewrite_and_upload_config_files()
-        self.__rewrite_command_line()
-
-    def __initialize_referenced_tool_files(self):
-        pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep)
-        referenced_tool_files = []
-        referenced_tool_files += re.findall(pattern, self.command_line)
-        if self.config_files != None:
-            for config_file in self.config_files:
-                referenced_tool_files += re.findall(pattern, self.__read(config_file))
-        self.referenced_tool_files = referenced_tool_files
-
-    def __upload_tool_files(self):
-        for referenced_tool_file in self.referenced_tool_files:
-            tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
-            self.file_renames[referenced_tool_file] = tool_upload_response['path']
-
-    def __upload_input_files(self):
-        for input_file in self.input_files:
-            input_upload_response = self.client.upload_input(input_file)
-            self.file_renames[input_file] = input_upload_response['path']
-            
-    def __initialize_output_file_renames(self):
-        for output_file in self.output_files:
-            self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, 
-                                                         self.remote_path_separator, 
-                                                         os.path.basename(output_file))
-
-    def __initialize_config_file_renames(self):
-        for config_file in self.config_files:
-            self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
-                                                         self.remote_path_separator,
-                                                         os.path.basename(config_file))
-
-    def __rewrite_paths(self, contents):
-        new_contents = contents
-        for local_path, remote_path in self.file_renames.iteritems():
-            new_contents = new_contents.replace(local_path, remote_path)
-        return new_contents
-
-    def __rewrite_and_upload_config_files(self):
-        for config_file in self.config_files:
-            config_contents = self.__read(config_file)
-            new_config_contents = self.__rewrite_paths(config_contents)
-            self.client.upload_config_file(config_file, new_config_contents)
-
-    def __rewrite_command_line(self):
-        self.rewritten_command_line = self.__rewrite_paths(self.command_line)
-
-    def get_rewritten_command_line(self):
-        return self.rewritten_command_line
-
-    def __read(self, path):
-        input = open(path, "r")
-        try:
-            return input.read()
-        finally:
-            input.close()
-
-        
-        
-class Client(object):
-    """    
-    """
-    """    
-    """
-    def __init__(self, remote_host, job_id, private_key=None):
-        if not remote_host.endswith("/"):
-            remote_host = remote_host + "/"
-        ## If we don't have an explicit private_key defined, check for
-        ## one embedded in the URL. A URL of the form
-        ## https://moo@cow:8913 will try to contact https://cow:8913
-        ## with a private key of moo
-        private_key_format = "https?://(.*)@.*/?"
-        private_key_match= re.match(private_key_format, remote_host)
-        if not private_key and private_key_match:
-            private_key = private_key_match.group(1)
-            remote_host = remote_host.replace("%s@" % private_key, '', 1)
-        self.remote_host = remote_host
-        self.job_id = job_id
-        self.private_key = private_key
-
-    def url_open(self, request, data):
-        return urllib2.urlopen(request, data)
-        
-    def __build_url(self, command, args):
-        if self.private_key:
-            args["private_key"] = self.private_key
-        data = urllib.urlencode(args)
-        url = self.remote_host + command + "?" + data
-        return url
-
-    def __raw_execute(self, command, args = {}, data = None):
-        url = self.__build_url(command, args)
-        request = urllib2.Request(url=url, data=data)
-        response = self.url_open(request, data)
-        return response
-
-    def __raw_execute_and_parse(self, command, args = {}, data = None):
-        response = self.__raw_execute(command, args, data)
-        return simplejson.loads(response.read())
-
-    def __upload_file(self, action, path, contents = None):
-        """ """
-        input = open(path, 'rb')
-        try:
-            mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
-            return self.__upload_contents(action, path, mmapped_input)
-        finally:
-            input.close()
-
-    def __upload_contents(self, action, path, contents):
-        name = os.path.basename(path)
-        args = {"job_id" : self.job_id, "name" : name}
-        return self.__raw_execute_and_parse(action, args, contents)
-    
-    def upload_tool_file(self, path):
-        return self.__upload_file("upload_tool_file", path)
-
-    def upload_input(self, path):
-        return self.__upload_file("upload_input", path)
-
-    def upload_config_file(self, path, contents):
-        return self.__upload_contents("upload_config_file", path, contents)
-        
-    def download_output(self, path):
-        """ """
-        name = os.path.basename(path)
-        response = self.__raw_execute('download_output', {'name' : name, 
-                                                          "job_id" : self.job_id})
-        output = open(path, 'wb')
-        try:
-            while True:
-                buffer = response.read(1024)
-                if buffer == "":
-                    break
-                output.write(buffer)
-        finally:
-            output.close()
-    
-    def launch(self, command_line):
-        """ """
-        return self.__raw_execute("launch", {"command_line" : command_line,
-                                             "job_id" : self.job_id})
-
-    def kill(self):
-        return self.__raw_execute("kill", {"job_id" : self.job_id})
-    
-    def wait(self):
-        """ """
-        while True:
-            check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
-            complete = check_complete_response["complete"] == "true"
-            if complete:
-                return check_complete_response
-            time.sleep(1)
-
-    def clean(self):
-        self.__raw_execute("clean", { "job_id" : self.job_id })
-
-    def setup(self):
-        return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id })
-
-
-
-class LwrJobRunner( BaseJobRunner ):
-    """
-    Lwr Job Runner
-    """
-    STOP_SIGNAL = object()
-    def __init__( self, app ):
-        """Start the job runner with 'nworkers' worker threads"""
-        self.app = app
-        self.sa_session = app.model.context
-
-        # start workers
-        self.queue = Queue()
-        self.threads = []
-        nworkers = app.config.local_job_queue_workers
-        log.info( "starting workers" )
-        for i in range( nworkers  ):
-            worker = threading.Thread( name=( "LwrJobRunner.thread-%d" % i ), target=self.run_next )
-            worker.setDaemon( True )
-            worker.start()
-            self.threads.append( worker )
-        log.debug( "%d workers ready", nworkers )
-
-    def run_next( self ):
-        """Run the next job, waiting until one is available if neccesary"""
-        while 1:
-            job_wrapper = self.queue.get()
-            if job_wrapper is self.STOP_SIGNAL:
-                return
-            try:
-                self.run_job( job_wrapper )
-            except:
-                log.exception( "Uncaught exception running job" )
-
-    def determine_lwr_url(self, url):
-        lwr_url = url[ len( 'lwr://' ) : ]
-        return  lwr_url 
-
-    def get_client_from_wrapper(self, job_wrapper):
-        return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id )
-
-    def get_client(self, job_runner, job_id):
-        lwr_url = self.determine_lwr_url( job_runner )
-        return Client(lwr_url, job_id)   
-
-    def run_job( self, job_wrapper ):
+    def queue_job(self, job_wrapper):
         stderr = stdout = command_line = ''
 
         runner_url = job_wrapper.get_job_runner_url()
         try:
             job_wrapper.prepare()
             if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None:
-                for cmd in job_wrapper.prepare_input_file_cmds: # run the commands to stage the input files
+                for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files
                     #log.debug( 'executing: %s' % cmd )
                     if 0 != os.system(cmd):
                         raise Exception('Error running file staging command: %s' % cmd)
                 job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line
-            command_line = self.build_command_line( job_wrapper, include_metadata=False )
+            command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
             return
 
         # If we were able to get a command line, run the job
-        if command_line:
-            try:                
-                #log.debug( 'executing: %s' % command_line )
-                client = self.get_client_from_wrapper(job_wrapper)
-                output_fnames = job_wrapper.get_output_fnames()
-                output_files = [ str( o ) for o in output_fnames ]
-                input_files = job_wrapper.get_input_fnames()
-                file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir)
-                rebuilt_command_line = file_stager.get_rewritten_command_line()
-                client.launch( rebuilt_command_line )
+        if not command_line:
+            job_wrapper.finish( '', '' )
+            return
 
-                job_wrapper.set_runner( runner_url, job_wrapper.job_id )
-                job_wrapper.change_state( model.Job.states.RUNNING )
+        try:
+            #log.debug( 'executing: %s' % command_line )
+            client = self.get_client_from_wrapper(job_wrapper)
+            output_files = self.get_output_files(job_wrapper)
+            input_files = job_wrapper.get_input_fnames()
+            working_directory = job_wrapper.working_directory
+            file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory)
+            rebuilt_command_line = file_stager.get_rewritten_command_line()
+            job_id = file_stager.job_id
+            client.launch( rebuilt_command_line )
+            job_wrapper.set_runner( runner_url, job_id )
+            job_wrapper.change_state( model.Job.states.RUNNING )
 
-                run_results = client.wait()
-                log.debug('run_results %s' % run_results )
-                stdout = run_results['stdout']
-                stderr = run_results['stderr']
+        except Exception, exc:
+            job_wrapper.fail( "failure running job", exception=True )
+            log.exception("failure running job %d" % job_wrapper.job_id)
+            return
 
-                
-                if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
-                    for output_file in output_files:
-                        client.download_output(output_file)
-                client.clean()
-                log.debug('execution finished: %s' % command_line)
-            except Exception, exc:
-                job_wrapper.fail( "failure running job", exception=True )
-                log.exception("failure running job %d" % job_wrapper.job_id)
-                return
+        lwr_job_state = ClusterJobState()
+        lwr_job_state.job_wrapper = job_wrapper
+        lwr_job_state.job_id = job_id
+        lwr_job_state.old_state = True
+        lwr_job_state.running = True
+        lwr_job_state.runner_url = runner_url
+        self.monitor_job(lwr_job_state)
+
+    def get_output_files(self, job_wrapper):
+        output_fnames = job_wrapper.get_output_fnames()
+        return [ str( o ) for o in output_fnames ]
+
+
+    def determine_lwr_url(self, url):
+        lwr_url = url[ len( 'lwr://' ) : ]
+        return lwr_url
+
+    def get_client_from_wrapper(self, job_wrapper):
+        job_id = job_wrapper.job_id
+        if hasattr(job_wrapper, 'task_id'):
+            job_id = "%s_%s" % (job_id, job_wrapper.task_id)
+        return self.get_client( job_wrapper.get_job_runner_url(), job_id )
+
+    def get_client_from_state(self, job_state):
+        job_runner = job_state.runner_url
+        job_id = job_state.job_id
+        return self.get_client(job_runner, job_id)
+
+    def get_client(self, job_runner, job_id):
+        lwr_url = self.determine_lwr_url( job_runner )
+        return Client(lwr_url, job_id)   
+
+    def finish_job( self, job_state ):
+        stderr = stdout = command_line = ''
+        job_wrapper = job_state.job_wrapper
+        try:
+            client = self.get_client_from_state(job_state)
+
+            run_results = client.raw_check_complete()
+            log.debug('run_results %s' % run_results )
+            stdout = run_results['stdout']
+            stderr = run_results['stderr']
+
+            if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
+                work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
+                output_files = self.get_output_files(job_wrapper)
+                for source_file, output_file in work_dir_outputs:
+                    client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
+                    # Remove from full output_files list so don't try to download directly.
+                    output_files.remove(output_file)
+                for output_file in output_files:
+                    client.download_output(output_file, working_directory=job_wrapper.working_directory)
+            client.clean()
+            log.debug('execution finished: %s' % command_line)
+        except Exception, exc:
+            job_wrapper.fail( "failure running job", exception=True )
+            log.exception("failure running job %d" % job_wrapper.job_id)
+            return
         #run the metadata setting script here
         #this is terminate-able when output dataset/job is deleted
         #so that long running set_meta()s can be canceled without having to reboot the server
             job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
             external_metadata_proc.wait()
             log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
-        
+
         # Finish the job                
         try:
             job_wrapper.finish( stdout, stderr )
             log.exception("Job wrapper finish method failed")
             job_wrapper.fail("Unable to finish job", exception=True)
 
-    def put( self, job_wrapper ):
-        """Add a job to the queue (by job identifier)"""
-        # Change to queued state before handing to worker thread so the runner won't pick it up again
-        job_wrapper.change_state( model.Job.states.QUEUED )
-        self.queue.put( job_wrapper )
-    
+    def fail_job( self, job_state ):
+        """
+        Separated out so we can use the worker threads for it.
+        """
+        self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) )
+        job_state.job_wrapper.fail( job_state.fail_message )
+
     def shutdown( self ):
         """Attempts to gracefully shut down the worker threads"""
         log.info( "sending stop signal to worker threads" )
-        for i in range( len( self.threads ) ):
-            self.queue.put( self.STOP_SIGNAL )
+        self.monitor_queue.put( STOP_SIGNAL )
+        for i in range( len( self.work_threads ) ):
+            self.work_queue.put( ( STOP_SIGNAL, None ) )
         log.info( "local job runner stopped" )
 
     def check_pid( self, pid ):
 
     def stop_job( self, job ):
         #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
-        if job.external_output_metadata:
-            pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+        job_ext_output_metadata = job.get_external_output_metadata()
+        if job_ext_output_metadata: 
+            pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
             if pid in [ None, '' ]:
                 log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
                 return
             log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id))
             client = self.get_client(lwr_url, job_id)
             client.kill()
+
+
     def recover( self, job, job_wrapper ):
-        # local jobs can't be recovered
-        job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted.  Please retry the job." )
-
+        """Recovers jobs stuck in the queued/running state when Galaxy started"""
+        job_state = ClusterJobState()
+        job_state.job_id = str( job.get_job_runner_external_id() )
+        job_state.runner_url = job_wrapper.get_job_runner_url()
+        job_wrapper.command_line = job.get_command_line()
+        job_state.job_wrapper = job_wrapper
+        if job.get_state() == model.Job.states.RUNNING:
+            log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) )
+            job_state.old_state = True
+            job_state.running = True
+            self.monitor_queue.put( job_state )
+        elif job.get_state() == model.Job.states.QUEUED:
+            # LWR doesn't currently queue, so this indicates Galaxy was shut down while
+            # the job was being staged. Not sure how to recover from that.
+            job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted.  Please retry the job." )
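
The revised shutdown above no longer drains a single job queue; it pushes one STOP_SIGNAL to the monitor queue and one (STOP_SIGNAL, None) tuple per worker thread onto the work queue. A minimal sketch of that sentinel pattern, assuming a runner that feeds (method, arg) tuples to its workers; names like work_queue, work_threads, and run_next mirror the diff, but the surrounding class is simplified here:

# Minimal sketch only; not the actual Galaxy runner implementation.
import threading
from Queue import Queue  # Python 2, matching the code above

STOP_SIGNAL = object()

class SketchRunner(object):
    def __init__(self, nworkers=4):
        self.work_queue = Queue()
        self.work_threads = []
        for i in range(nworkers):
            worker = threading.Thread(target=self.run_next)
            worker.start()
            self.work_threads.append(worker)

    def run_next(self):
        # Each worker loops until it sees the sentinel.
        while True:
            method, arg = self.work_queue.get()
            if method is STOP_SIGNAL:
                return
            method(arg)

    def shutdown(self):
        # One sentinel per worker so every thread receives a stop message.
        for i in range(len(self.work_threads)):
            self.work_queue.put((STOP_SIGNAL, None))
        for worker in self.work_threads:
            worker.join()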

lib/galaxy/jobs/runners/lwr_client/__init__.py

+"""
+lwr_client
+==========
+
+This module contains logic for interfacing with an external LWR server.
+
+"""
+import mmap
+import os
+import re
+import time
+import urllib
+import urllib2
+
+import simplejson
+
+
+class JobInputs(object):
+    """
+    Abstractions over dynamic inputs created for a given job (namely the command to
+    execute and created configfiles).
+
+    **Parameters**
+
+    command_line : str
+        Local command to execute for this job. (To be rewritten.)
+    config_files : str
+        Config files created for this job. (To be rewritten.)
+
+
+    >>> import tempfile
+    >>> tf = tempfile.NamedTemporaryFile()
+    >>> def setup_inputs(tf):
+    ...     open(tf.name, "w").write("world /path/to/input the rest")
+    ...     inputs = JobInputs("hello /path/to/input", [tf.name])
+    ...     return inputs
+    >>> inputs = setup_inputs(tf)
+    >>> inputs.rewrite_paths("/path/to/input", 'C:\\input')
+    >>> inputs.rewritten_command_line
+    'hello C:\\\\input'
+    >>> inputs.rewritten_config_files[tf.name]
+    'world C:\\\\input the rest'
+    >>> tf.close()
+    >>> tf = tempfile.NamedTemporaryFile()
+    >>> inputs = setup_inputs(tf)
+    >>> inputs.find_referenced_subfiles('/path/to')
+    ['/path/to/input']
+    >>> inputs.path_referenced('/path/to')
+    True
+    >>> inputs.path_referenced('/path/to/input')
+    True
+    >>> inputs.path_referenced('/path/to/notinput')
+    False
+    >>> tf.close()
+    """
+
+    def __init__(self, command_line, config_files):
+        self.rewritten_command_line = command_line
+        self.rewritten_config_files = {}
+        for config_file in config_files or []:
+            config_contents = _read(config_file)
+            self.rewritten_config_files[config_file] = config_contents
+
+    def find_referenced_subfiles(self, directory):
+        """
+        Return the list of files below the specified `directory` referenced in
+        job inputs. Could use more sophisticated logic (match quotes to handle
+        spaces, handle subdirectories, etc.).
+
+        **Parameters**
+
+        directory : str
+            Full path to directory to search.
+
+        """
+        pattern = r"(%s%s\S+)" % (directory, os.sep)
+        referenced_files = set()
+        for input_contents in self.__items():
+            referenced_files.update(re.findall(pattern, input_contents))
+        return list(referenced_files)
+
+    def path_referenced(self, path):
+        pattern = r"%s" % path
+        found = False
+        for input_contents in self.__items():
+            if re.findall(pattern, input_contents):
+                found = True
+                break
+        return found
+
+    def rewrite_paths(self, local_path, remote_path):
+        """
+        Rewrite references to `local_path` with `remote_path` in job inputs.
+        """
+        self.__rewrite_command_line(local_path, remote_path)
+        self.__rewrite_config_files(local_path, remote_path)
+
+    def __rewrite_command_line(self, local_path, remote_path):
+        self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path)
+
+    def __rewrite_config_files(self, local_path, remote_path):
+        for config_file, rewritten_contents in self.rewritten_config_files.iteritems():
+            self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path)
+
+    def __items(self):
+        items = [self.rewritten_command_line]
+        items.extend(self.rewritten_config_files.values())
+        return items
+
+
+class FileStager(object):
+    """
+    Objects of the FileStager class interact with an LWR client object to
+    stage the files required to run jobs on a remote LWR server.
+
+    **Parameters**
+
+    client : Client
+        LWR client object.
+    command_line : str
+        The local command line to execute; this will be rewritten for the remote server.
+    config_files : list
+        List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to the remote server.
+    input_files : list
+        List of input files used by the job. These will be transferred and references to them rewritten.
+    output_files : list
+        List of output files produced by the job.
+    tool_dir : str
+        Directory containing the tool to execute (if a wrapper is used, it will be transferred to the remote server).
+    working_directory : str
+        Local path created by Galaxy for running this job.
+
+    """
+
+    def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
+        """
+        """
+        self.client = client
+        self.command_line = command_line
+        self.config_files = config_files
+        self.input_files = input_files
+        self.output_files = output_files
+        self.tool_dir = os.path.abspath(tool_dir)
+        self.working_directory = working_directory
+
+        # Set up job inputs; these will need to be rewritten before
+        # shipping off to the remote LWR server.
+        self.job_inputs = JobInputs(self.command_line, self.config_files)
+
+        self.file_renames = {}
+
+        self.__handle_setup()
+        self.__initialize_referenced_tool_files()
+        self.__upload_tool_files()
+        self.__upload_input_files()
+        self.__upload_working_directory_files()
+        self.__initialize_output_file_renames()
+        self.__initialize_task_output_file_renames()
+        self.__initialize_config_file_renames()
+        self.__handle_rewrites()
+        self.__upload_rewritten_config_files()
+
+    def __handle_setup(self):
+        job_config = self.client.setup()
+
+        self.new_working_directory = job_config['working_directory']
+        self.new_outputs_directory = job_config['outputs_directory']
+        self.remote_path_separator = job_config['path_separator']
+        # If the remote LWR server assigned a job id, use that; otherwise
+        # just use the locally assigned job_id.
+        galaxy_job_id = self.client.job_id
+        self.job_id = job_config.get('job_id', galaxy_job_id)
+        if self.job_id != galaxy_job_id:
+            # Remote LWR server assigned an id different from the
+            # Galaxy job id; update the client to reflect this.
+            self.client.job_id = self.job_id
+
+    def __initialize_referenced_tool_files(self):
+        self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir)
+
+    def __upload_tool_files(self):
+        for referenced_tool_file in self.referenced_tool_files:
+            tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
+            self.file_renames[referenced_tool_file] = tool_upload_response['path']
+
+    def __upload_input_files(self):
+        for input_file in self.input_files:
+            self.__upload_input_file(input_file)
+            self.__upload_input_extra_files(input_file)
+
+    def __upload_input_file(self, input_file):
+        if self.job_inputs.path_referenced(input_file):
+            input_upload_response = self.client.upload_input(input_file)
+            self.file_renames[input_file] = input_upload_response['path']
+
+    def __upload_input_extra_files(self, input_file):
+        # TODO: Determine if this is object store safe and what needs to be
+        # done if it is not.
+        files_path = "%s_files" % input_file[0:-len(".dat")]
+        if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path):
+            for extra_file in os.listdir(files_path):
+                extra_file_path = os.path.join(files_path, extra_file)
+                relative_path = os.path.basename(files_path)
+                extra_file_relative_path = os.path.join(relative_path, extra_file)
+                response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
+                self.file_renames[extra_file_path] = response['path']
+
+    def __upload_working_directory_files(self):
+        # The task manager stages files into the working directory; these need
+        # to be uploaded if present.
+        for working_directory_file in os.listdir(self.working_directory):
+            path = os.path.join(self.working_directory, working_directory_file)
+            working_file_response = self.client.upload_working_directory_file(path)
+            self.file_renames[path] = working_file_response['path']
+
+    def __initialize_output_file_renames(self):
+        for output_file in self.output_files:
+            self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
+                                                         self.remote_path_separator,
+                                                         os.path.basename(output_file))
+
+    def __initialize_task_output_file_renames(self):
+        for output_file in self.output_files:
+            name = os.path.basename(output_file)
+            self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
+                                                                                         self.remote_path_separator,
+                                                                                         name)
+
+    def __initialize_config_file_renames(self):
+        for config_file in self.config_files:
+            self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
+                                                         self.remote_path_separator,
+                                                         os.path.basename(config_file))
+
+    def __rewrite_paths(self, contents):
+        new_contents = contents
+        for local_path, remote_path in self.file_renames.iteritems():
+            new_contents = new_contents.replace(local_path, remote_path)
+        return new_contents
+
+    def __handle_rewrites(self):
+        for local_path, remote_path in self.file_renames.iteritems():
+            self.job_inputs.rewrite_paths(local_path, remote_path)
+
+    def __upload_rewritten_config_files(self):
+        for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems():
+            self.client.upload_config_file(config_file, new_config_contents)
+
+    def get_rewritten_command_line(self):
+        """
+        Returns the rewritten version of the command line to execute, suitable
+        for the remote host.
+        """
+        return self.job_inputs.rewritten_command_line
+
+
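
FileStager performs the entire staging sequence from its constructor (remote setup, tool/input/working-directory uploads, path rewriting, and config file upload), so typical use is just construction followed by reading back the rewritten command line. A hypothetical usage sketch, assuming a reachable LWR server and illustrative local paths (none of these paths come from the diff; the Client class used here is the one defined just below):

# Hypothetical usage only; requires a running LWR server to actually execute.
client = Client("http://lwrhost:8913/", job_id="123")
stager = FileStager(client,
                    command_line="tool_wrapper.py /galaxy/files/dataset_1.dat",
                    config_files=[],
                    input_files=["/galaxy/files/dataset_1.dat"],
                    output_files=["/galaxy/files/dataset_2.dat"],
                    tool_dir="/galaxy/tools/mytool",
                    working_directory="/galaxy/job_working_directory/000/123")
# All staging happens in __init__; afterwards only the rewritten command line is needed.
rewritten_command_line = stager.get_rewritten_command_line()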
+class Client(object):
+    """
+    Objects of this client class perform low-level communication with a remote LWR server.
+
+    **Parameters**
+
+    remote_host : str
+        Remote URL of the LWR server.
+    job_id : str
+        Galaxy job/task id.
+    private_key : str (optional)
+        Secret key the remote LWR server is configured with.
+    """
+
+    def __init__(self, remote_host, job_id, private_key=None):
+        if not remote_host.endswith("/"):
+            remote_host = remote_host + "/"
+        ## If we don't have an explicit private_key defined, check for
+        ## one embedded in the URL. A URL of the form
+        ## https://moo@cow:8913 will try to contact https://cow:8913
+        ## with a private key of moo
+        private_key_format = "https?://(.*)@.*/?"
+        private_key_match = re.match(private_key_format, remote_host)
+        if not private_key and private_key_match:
+            private_key = private_key_match.group(1)
+            remote_host = remote_host.replace("%s@" % private_key, '', 1)
+        self.remote_host = remote_host
+        self.job_id = job_id
+        self.private_key = private_key
+
+    def _url_open(self, request, data):
+        return urllib2.urlopen(request, data)
+
+    def __build_url(self, command, args):
+        if self.private_key:
+            args["private_key"] = self.private_key
+        data = urllib.urlencode(args)
+        url = self.remote_host + command + "?" + data
+        return url
+
+    def __raw_execute(self, command, args={}, data=None):
+        url = self.__build_url(command, args)
+        request = urllib2.Request(url=url, data=data)
+        response = self._url_open(request, data)
+        return response
+
+    def __raw_execute_and_parse(self, command, args={}, data=None):
+        response = self.__raw_execute(command, args, data)
+        return simplejson.loads(response.read())
+
+    def __upload_file(self, action, path, name=None, contents=None):
+        input = open(path, 'rb')
+        try:
+            mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ)
+            return self.__upload_contents(action, path, mmapped_input, name)
+        finally:
+            input.close()
+
+    def __upload_contents(self, action, path, contents, name=None):
+        if not name:
+            name = os.path.basename(path)
+        args = {"job_id": self.job_id, "name": name}
+        return self.__raw_execute_and_parse(action, args, contents)
+
+    def upload_tool_file(self, path):
+        """
+        Upload a tool-related file (e.g. a wrapper script) required to run the job.
+
+        **Parameters**
+
+        path : str
+            Local path of the tool file.
+        """
+        return self.__upload_file("upload_tool_file", path)
+
+    def upload_input(self, path):
+        """
+        Upload input dataset to remote server.
+
+        **Parameters**
+
+        path : str
+            Local path of input dataset.
+        """
+        return self.__upload_file("upload_input", path)
+
+    def upload_extra_input(self, path, relative_name):
+        """
+        Upload extra input file to remote server.
+
+        **Parameters**
+
+        path : str
+            Local path of the extra file (within the extra files path of the input dataset).
+        relative_name : str
+            Path of the extra file to upload, relative to the input's extra files path.
+        """
+        return self.__upload_file("upload_extra_input", path, name=relative_name)
+
+    def upload_config_file(self, path, contents):
+        """
+        Upload a job's config file to the remote server.
+
+        **Parameters**
+
+        path : str
+            Local path to the original config file.
+        contents : str
+            Rewritten contents of the config file to upload.
+        """
+        return self.__upload_contents("upload_config_file", path, contents)
+
+    def upload_working_directory_file(self, path):
+        """
+        Upload the supplied file (path) from a job's working directory
+        to the remote server.
+
+        **Parameters**
+
+        path : str
+            Path to file to upload.
+        """
+        return self.__upload_file("upload_working_directory_file", path)
+
+    def _get_output_type(self, name):
+        return self.__raw_execute_and_parse("get_output_type", {"name": name,
+                                                                "job_id": self.job_id})
+
+    def download_work_dir_output(self, source, working_directory, output_path):
+        """
+        Download an output dataset specified with from_work_dir from the
+        remote server.
+
+        **Parameters**
+
+        source : str
+            Path within the job's working_directory where the output can be found.
+        working_directory : str
+            Local working_directory for the job.
+        output_path : str
+            Full path to output dataset.
+        """
+        output = open(output_path, "wb")
+        name = os.path.basename(source)
+        self.__raw_download_output(name, self.job_id, "work_dir", output)
+
+    def download_output(self, path, working_directory):
+        """
+        Download an output dataset from the remote server.
+
+        **Parameters**