Commits

Anonymous committed dc20a7b

When the history panel is used to stop a job, the job's subtasks are
cancelled. The mapping change allows "Job.tasks" to refer to the job's
subtasks.

  • Participants
  • Parent commits fabe313

Comments (0)

Files changed (2)

File lib/galaxy/jobs/runners/tasks.py

             return False
 
     def stop_job( self, job ):
-        # DBTODO Call stop on all of the tasks.
-        #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
-        if job.external_output_metadata:
-            pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+        # We need to stop all subtasks. This is going to stay in the task
+        # runner because the task runner also starts all the tasks.
+        # First, get the list of tasks from job.tasks, which uses
+        # SQLAlchemy to retrieve a job's list of tasks.
+        tasks = job.tasks
+        if ( len( job.tasks ) > 0 ):
+            for task in job.tasks:
+                self.stop_pid( task.task_runner_external_id, job.id )
+
+        # There were no subtasks, so just kill the job. We reach this
+        # branch when the task runner is used but the tool does not use
+        # parallelism.
         else:
-            pid = job.job_runner_external_id
-        if pid in [ None, '' ]:
-            log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
-            return
+            #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
+            if job.external_output_metadata:
+                pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+            else:
+                pid = job.job_runner_external_id
+            if pid in [ None, '' ]:
+                log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
+                return
+            self.stop_pid( pid, job.id )
+
+    def stop_pid( self, pid, job_id ):
+        """
+        This method stops the given process id whether it's a task or job.
+        It is meant to be a private helper method, but it is mostly reusable. 
+        The first argument is the process id to stop, and the second is the
+        job's id (which is currently used only for logging messages).
+        """
         pid = int( pid )
+        log.debug( "Stopping pid %s" % pid )
         if not self.check_pid( pid ):
-            log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) )
+            log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job_id, pid ) )
             return
         for sig in [ 15, 9 ]:
             try:
                 os.killpg( pid, sig )
             except OSError, e:
-                log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
-                return  # give up
+                # This warning may be spurious; many tasks are stopped with
+                # SIGTERM (signal 15), but behavior varies by platform.
+                log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job_id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
+                return
+            # TODO: If we're stopping lots of tasks, then we will want to
+            # avoid this two-second overhead by using some asynchronous method.
             sleep( 2 )
             if not self.check_pid( pid ):
-                log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.id, pid, sig ) )
+                log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job_id, pid, sig ) )
                 return
         else:
-            log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.id, pid ) )
+            log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job_id, pid ) )
 
     def recover( self, job, job_wrapper ):
         # DBTODO Task Recovery, this should be possible.

File lib/galaxy/model/mapping.py

                      post_job_actions=relation( PostJobActionAssociation, lazy=False ),
                      input_library_datasets=relation( JobToInputLibraryDatasetAssociation ),
                      output_library_datasets=relation( JobToOutputLibraryDatasetAssociation ),
-                     external_output_metadata = relation( JobExternalOutputMetadata, lazy = False ) ) )
+                     external_output_metadata = relation( JobExternalOutputMetadata, lazy = False ), 
+                     tasks = relation(Task) ) )
 
 assign_mapper( context, Task, Task.table,
-    properties=dict( job = relation( Job )))
+    properties=dict( job = relation( Job ) ) )
                      
 assign_mapper( context, DeferredJob, DeferredJob.table, 
     properties = {} )