Commits

Anonymous committed 3d07a78

The task runner now chooses an exit code based on the tasks' exit codes - if the tasks are successful, then the last exit code scanned is chosen, and if a task fails then the first scanned failing task's exit code is chosen. The JobWrapper's fail method also stores stdout, stderr, and the exit code if they're available, though this will currently only applies to when a Job using the task runner fails. A null/None exit code is also now supported.

  • Participants
  • Parent commits 6d45fd1

Comments (0)

Files changed (4)

lib/galaxy/jobs/__init__.py

         self.version_string_cmd = self.tool.version_string_cmd
         return extra_filenames
 
-    def fail( self, message, exception=False ):
+    def fail( self, message, exception=False, stdout="", stderr="", exit_code=None ):
         """
         Indicate job failure by setting state and message on all output
         datasets.
             job.state = job.states.ERROR
             job.command_line = self.command_line
             job.info = message
+            # TODO: Put setting the stdout, stderr, and exit code in one place
+            # (not duplicated with the finish method).
+            if ( len( stdout ) > 32768 ):
+                stdout = stdout[:32768]
+                log.info( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
+            job.stdout = stdout 
+            if ( len( stderr ) > 32768 ):
+                stderr = stderr[:32768]
+                log.info( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
+            job.stderr = stderr  
+            # Let the exit code be Null if one is not provided:
+            if ( exit_code != None ):
+                job.exit_code = exit_code
+
             self.sa_session.add( job )
             self.sa_session.flush()
         #Perform email action even on failure.
         self.sa_session.add( job )
         self.sa_session.flush()
 
-    def finish( self, stdout, stderr, tool_exit_code=0 ):
+    def finish( self, stdout, stderr, tool_exit_code=None ):
         """
         Called to indicate that the associated command has been run. Updates
         the output datasets based on stderr and stdout from the command, and
         self.sa_session.expunge_all()
         job = self.get_job()
 
+        # TODO: After failing here, consider returning from the function. 
         try:
             self.reclaim_ownership()
         except:
-            self.fail( job.info )
             log.exception( '(%s) Failed to change ownership of %s, failing' % ( job.id, self.working_directory ) )
+            return self.fail( job.info, stdout=stdout, stderr=stderr, exit_code=tool_exit_code )
 
+        log.debug( "############## JobWrapper.finish: %s exit code" 
+                 % ( "None" if None == tool_exit_code else str(tool_exit_code)))
         # if the job was deleted, don't finish it
         if job.state == job.states.DELETED or job.state == job.states.ERROR:
-            #ERROR at this point means the job was deleted by an administrator.
-            return self.fail( job.info )
+            # ERROR at this point means the job was deleted by an administrator.
+            # SM: Note that, at this point, the exit code must be saved in case
+            # there was an error. Errors caught here could mean that the job
+            # was deleted by an administrator (based on old comments), but it
+            # could also mean that a job was broken up into tasks and one of
+            # the tasks failed. So 
+            return self.fail( job.info, stderr=stderr, stdout=stdout, exit_code=tool_exit_code )
 
         # Check the tool's stdout, stderr, and exit code for errors, but only
         # if the job has not already been marked as having an error. 
         # The job's stdout and stderr will be set accordingly.
+        log.debug( "############## JobWrapper.finish: Post-check exit code: %s/%s" 
+                 % ( ( "None" if None == tool_exit_code else str(tool_exit_code) ),
+                     ( "None" if None == job.exit_code else str(job.exit_code) ) ) )
         if job.states.ERROR != job.state:
             if ( self.check_tool_output( stdout, stderr, tool_exit_code, job )):
                 job.state = job.states.OK
             else:
                 job.state = job.states.ERROR
+            log.debug( "############## JobWrapper.finish: Post-check exit code: %s" 
+                     % ( "None" if None == tool_exit_code else str(tool_exit_code)))
 
         if self.version_string_cmd:
             version_filename = self.get_version_string_path()
                 # TODO: The context['stderr'] holds stderr's contents. An error
                 # only really occurs if the job also has an error. So check the
                 # job's state:
-                #if context['stderr']:
                 if job.states.ERROR == job.state:
                     dataset.blurb = "error"
                 elif dataset.has_data():
         self.sa_session.flush()
         # Save stdout and stderr
         if len( job.stdout ) > 32768:
-            log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
+            log.info( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
         job.stdout = job.stdout[:32768]
         if len( job.stderr ) > 32768:
-            log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
+            log.info( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
         job.stderr = job.stderr[:32768]
-        job.exit_code = tool_exit_code
+        # The exit code will be null if there is no exit code to be set.
+        # This is so that we don't assign an exit code, such as 0, that
+        # is either incorrect or has the wrong semantics. 
+        if None != tool_exit_code:
+            job.exit_code = tool_exit_code
+        log.debug( "############## JobWrapper.finish: storing %s exit code" 
+                 % ( "None" if None == job.exit_code else str(job.exit_code)))
         # custom post process setup
         inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
         out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
                 # that range, then apply the error level and add a message.
                 # If we've reached a fatal error rule, then stop.
                 max_error_level = galaxy.tools.StdioErrorLevel.NO_ERROR
-                for stdio_exit_code in self.tool.stdio_exit_codes:
-                    if ( tool_exit_code >= stdio_exit_code.range_start and 
-                         tool_exit_code <= stdio_exit_code.range_end ):
-                        # Tack on a generic description of the code 
-                        # plus a specific code description. For example,
-                        # this might prepend "Job 42: Warning: Out of Memory\n".
-                        code_desc = stdio_exit_code.desc
-                        if ( None == code_desc ):
-                            code_desc = ""
-                        tool_msg = ( "%s: Exit code %d: %s" % (
-                                     galaxy.tools.StdioErrorLevel.desc( stdio_exit_code.error_level ),
-                                     tool_exit_code,
-                                     code_desc ) )
-                        log.info( "Job %s: %s" % (job.get_id_tag(), tool_msg) )
-                        stderr = tool_msg + "\n" + stderr
-                        max_error_level = max( max_error_level, 
-                                               stdio_exit_code.error_level )
-                        if ( max_error_level >= 
-                             galaxy.tools.StdioErrorLevel.FATAL ):
-                            break
+                if tool_exit_code != None:
+                    for stdio_exit_code in self.tool.stdio_exit_codes:
+                        if ( tool_exit_code >= stdio_exit_code.range_start and 
+                             tool_exit_code <= stdio_exit_code.range_end ):
+                            # Tack on a generic description of the code 
+                            # plus a specific code description. For example,
+                            # this might prepend "Job 42: Warning (Out of Memory)\n".
+                            code_desc = stdio_exit_code.desc
+                            if ( None == code_desc ):
+                                code_desc = ""
+                            tool_msg = ( "%s: Exit code %d (%s)" % (
+                                         galaxy.tools.StdioErrorLevel.desc( stdio_exit_code.error_level ),
+                                         tool_exit_code,
+                                         code_desc ) )
+                            log.info( "Job %s: %s" % (job.get_id_tag(), tool_msg) )
+                            stderr = tool_msg + "\n" + stderr
+                            max_error_level = max( max_error_level, 
+                                                   stdio_exit_code.error_level )
+                            if ( max_error_level >= 
+                                 galaxy.tools.StdioErrorLevel.FATAL ):
+                                break
     
                 if max_error_level < galaxy.tools.StdioErrorLevel.FATAL:
                     # We'll examine every regex. Each regex specifies whether
         self.sa_session.refresh( task )
         return task.state
 
+    def get_exit_code( self ):
+        task = self.get_task()
+        self.sa_session.refresh( task )
+        return task.exit_code
+
     def set_runner( self, runner_url, external_id ):
         task = self.get_task()
         self.sa_session.refresh( task )
         self.sa_session.add( task )
         self.sa_session.flush()
 
-    def finish( self, stdout, stderr, tool_exit_code=0 ):
+    def finish( self, stdout, stderr, tool_exit_code=None ):
         # DBTODO integrate previous finish logic.
         # Simple finish for tasks.  Just set the flag OK.
         """
         """
         # This may have ended too soon
         log.debug( 'task %s for job %d ended; exit code: %d' 
-                 % (self.task_id, self.job_id, tool_exit_code) )
+                 % (self.task_id, self.job_id, 
+                    tool_exit_code if tool_exit_code != None else -256 ) )
         # default post job setup_external_metadata
         self.sa_session.expunge_all()
         task = self.get_task()

lib/galaxy/jobs/runners/local.py

                                 if sleep_time < 8:
                                     # So we don't stat every second
                                     sleep_time *= 2
-                    # Reap the process and get the exit code. The exit code should
-                    # only be None if the process isn't finished, but check anyway.
-                    exit_code = proc.wait() # reap
-                    if None == exit_code:
-                        exit_code = 0
+                    # Reap the process and get the exit code.
+                    exit_code = proc.wait()
                     stdout_file.seek( 0 )
                     stderr_file.seek( 0 )
                     stdout = stdout_file.read( 32768 )
         
             # Finish the job!
             try:
-                job_wrapper.finish( stdout, stderr, exit_code )
+                #job_wrapper.finish( stdout, stderr, exit_code )
+                # DELETEME: This is for testing how null exit codes are handled:
+                log.debug( "############## Finishing job - None exit code" )
+                job_wrapper.finish( stdout, stderr, None )
             except:
                 log.exception("Job wrapper finish method failed")
                 job_wrapper.fail("Unable to finish job", exception=True)

lib/galaxy/jobs/runners/tasks.py

             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
             return
+
+        # This is the job's exit code, which will depend on the tasks'
+        # exit code. The overall job's exit code will be one of two values: 
+        # o if the job is successful, then the last task scanned will be
+        #   used to determine the exit code. Note that this is not the same
+        #   thing as the last task to complete, which could be added later.
+        # o if a task fails, then the job will fail and the failing task's
+        #   exit code will become the job's exit code.
+        job_exit_code = ""
+
         # If we were able to get a command line, run the job.  ( must be passed to tasks )
         if command_line:
             try:
                     job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism)
                     return
                 tasks = splitter.do_split(job_wrapper)
-                # Not an option for now.  Task objects don't *do* anything useful yet, but we'll want them tracked outside this thread to do anything.
+                # Not an option for now.  Task objects don't *do* anything 
+                # useful yet, but we'll want them tracked outside this thread 
+                # to do anything.
                 # if track_tasks_in_database:
                 task_wrappers = []
                 for task in tasks:
                 # Deleted tasks are not included right now.
                 # 
                 while tasks_complete is False:
+                    log.debug( "************ Rechecking tasks" )
                     count_complete = 0
                     tasks_complete = True
                     for tw in task_wrappers:
                         task_state = tw.get_state()
+                        log.debug( "***** Checking task %d: state %s"
+                                 % (tw.task_id, task_state) )
                         if ( model.Task.states.ERROR == task_state ):
-                            log.debug( "Canceling job %d: Task %d returned an error"
-                                     % ( tw.job_id, tw.task_id ) )
+                            job_exit_code = tw.get_exit_code()
+                            log.debug( "Canceling job %d: Task %s returned an error (exit code %d)"
+                                     % ( tw.job_id, tw.task_id, job_exit_code ) )
                             self.cancel_job( job_wrapper, task_wrappers )
                             tasks_complete = True
                             break
                         elif not task_state in completed_states:
                             tasks_complete = False
                         else:
+                            job_exit_code = tw.get_exit_code()
                             count_complete = count_complete + 1
                     if tasks_complete is False:
                         sleep( sleep_time )
                         if sleep_time < 8:
                             sleep_time *= 2
                 import time
+                log.debug( "####################### Finished with tasks; job exit code: %d" % job_exit_code )
 
                 job_wrapper.reclaim_ownership()      # if running as the actual user, change ownership before merging.
                 log.debug('execution finished - beginning merge: %s' % command_line)
 
         # Finish the job
         try:
-            job_wrapper.finish( stdout, stderr )
+            log.debug( "$$$$$$$$$$$$$$ job_exit_code before finish: %d" % job_exit_code )
+            job_wrapper.finish( stdout, stderr, job_exit_code )
         except:
             log.exception("Job wrapper finish method failed")
             job_wrapper.fail("Unable to finish job", exception=True)

lib/galaxy/model/__init__.py

         self.post_job_actions = []
         self.imported = False
         self.handler = None
-        self.exit_code = 0
+        self.exit_code = None
 
     # TODO: Add accessors for members defined in SQL Alchemy for the Job table and
     # for the mapper defined to the Job table. 
         self.task_runner_name = None
         self.task_runner_external_id = None
         self.job = job
-        # SM: Using default empty strings avoids None exceptions later on.
         self.stdout = "" 
         self.stderr = "" 
-        self.exit_code = 0 
+        self.exit_code = None
         self.prepare_input_files_cmd = prepare_files_cmd
 
     def get_param_values( self, app ):