Commits

James Taylor committed 913228e

Compat with in memory job tracking

Comments (0)

Files changed (1)

 +                trans.sa_session.flush()
              trans.response.send_redirect( url_for( controller='tool_runner', action='redirect', redirect_url=redirect_url ) )
          else:
-             # Queue the job for execution
-             trans.app.job_queue.put( job.id, tool )
+-            # Queue the job for execution
+-            trans.app.job_queue.put( job.id, tool )
 -            trans.log_event( "Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id )
 +            ## trans.log_event( "Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id )
 +            if flush:
++                # Queue the job for execution
 +                trans.sa_session.flush()
++                trans.app.job_queue.put( job.id, tool )
              return job, out_data
 diff --git a/lib/galaxy/web/controllers/workflow.py b/lib/galaxy/web/controllers/workflow.py
 --- a/lib/galaxy/web/controllers/workflow.py
 +++ b/lib/galaxy/web/controllers/workflow.py
-@@ -892,7 +892,7 @@
+@@ -878,9 +878,11 @@
+                 workflow_invocation = model.WorkflowInvocation()
+                 workflow_invocation.workflow = workflow
+                 outputs = odict()
++                jobs_and_tools = []
+                 for i, step in enumerate( workflow.steps ):
+                     # Execute module
+                     job = None
++                    tool = None
+                     if step.type == 'tool' or step.type is None:
+                         tool = trans.app.toolbox.tools_by_id[ step.tool_id ]
+                         input_values = step.state.inputs
+@@ -892,7 +894,7 @@
                                      return outputs[ conn.output_step.id ][ conn.output_name ]
                          visit_input_values( tool.inputs, step.state.inputs, callback )
                          # Execute it
                          outputs[ step.id ] = out_data
                      else:
                          job, out_data = step.module.execute( trans, step.state )
+@@ -902,9 +904,15 @@
+                     workflow_invocation_step.workflow_invocation = workflow_invocation
+                     workflow_invocation_step.workflow_step = step
+                     workflow_invocation_step.job = job
++                    # Save for queueing
++                    if job and tool:
++                        jobs_and_tools.append( ( job, tool ) )
+                 # All jobs ran sucessfully, so we can save now
+                 trans.sa_session.add( workflow_invocation )
+                 trans.sa_session.flush()
++                # Queue up all jobs
++                for job, tool in jobs_and_tools:
++                    trans.app.job_queue.put( job.id, tool )
+                 return trans.fill_template( "workflow/run_complete.mako",
+                                             workflow=stored,
+                                             outputs=outputs )