Commits

Michael Heemskerk committed 5a4dad2

Change to ensure that a process is only cancelled once. Previously, if a cancelled process was cancelled again, the pump threads would be interrupted again. Since the pump threads are returned to the threadpool after the process finishes, this can cause other processes (that were started later and are reusing the pump threads from the thread pool) to abort

  • Participants
  • Parent commits 9ae46db

Comments (0)

Files changed (2)

File src/main/java/com/atlassian/utils/process/ExternalProcessImpl.java

 import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class manages the execution of an external process, using separate threads to process
     private static final String OS_NAME = System.getProperty("os.name").toLowerCase();
     private static final ExecutorService POOL;
 
-    private boolean canceled;
+    private AtomicBoolean canceled = new AtomicBoolean(false);
     private List<String> command;
     private Map<String, String> environment;
     private LatchedRunnable errorPump;
     private ProcessHandler handler;
     private long idleTimeout;
     private LatchedRunnable inputPump;
+    private volatile boolean inputPumpInterruptedAfterProcessFinished = false;
     private long lastWatchdogReset;
     private List<ProcessMonitor> monitors;
     private LatchedRunnable outputPump;
     private long startTime;
     private File workingDir;
     private boolean useWindowsEncodingWorkaround;
-    private volatile boolean stdInPumpInterrupted = false;
 
     static {
         final String pooledThreadName = "ExtProcess IO Pump";
     private void handleHandlerError(String handlerName, Throwable t) {
         if (!isCanceled()) {
             if (isAlive()) {
+                LOG.debug(handlerName + " encountered an error; aborting process", t);
+                // if any of the pump streams error out and are no longer pumping, there's
+                // no point in keeping the process running. There is a high probability that
+                // the process will simply be blocked until the (new dead) input/outputstream is
+                // read from or written to.
                 cancel();
             }
-            LOG.debug(handlerName + " encountered an error; aborting process", t);
             if (t instanceof ProcessException) {
                 processException = (ProcessException) t;
             } else {
                         // The StdInHandler IO pump is intentionally interrupted when the process terminates, the output and the
                         // error pumps have finished and the StdInHandler does not finish by itself. In this case we do not
                         // consider this to be an error condition and we ignore the exception
-                        if (!isStdInPumpInterrupted(t)) {
+                        if (!shouldIgnoreInputPumpException(t)) {
                             handleHandlerError(name, t);
                         }
                     }
                 }
 
                 int exitCode = wrapUpProcess();
-                handler.complete(exitCode, canceled, processException);
+                handler.complete(exitCode, isCanceled(), processException);
             }
         } else {
             handler.complete(-1, false, processException);
                     // process finished
                     finished = true;
                     int exitCode = wrapUpProcess();
-                    handler.complete(exitCode, canceled, processException);
+                    handler.complete(exitCode, isCanceled(), processException);
                 }
             }
             return finished;
         }
     }
 
-    private boolean isStdInPumpInterrupted(Throwable t) {
-        if (stdInPumpInterrupted) {
+    private boolean shouldIgnoreInputPumpException(Throwable t) {
+        // the only case where we want to ignore inputPump exceptions is when we've explicitly interrupted
+        // the inputPump *after* the process had already ended.
+        if (inputPumpInterruptedAfterProcessFinished) {
             while (t != null) {
                 if (t instanceof InterruptedException || t instanceof InterruptedIOException) {
                     return true;
      * Cancel should be called if you wish to interrupt process execution.
      */
     public void cancel() {
-        this.canceled = true;
-
-        internalCancel();
+        if (canceled.compareAndSet(false, true)) {
+            internalCancel();
+        }
     }
 
     private void internalCancel() {
-        stdInPumpInterrupted = !isAlive() && inputPump.isRunning();
+        
+        if (inputPump != null) {
+            // we determine whether we're going to interrupt the inputPump *after* the process has already finished.
+            // if that is the case, we don't treat the InterruptedException that we catch from the inputPump as an error
+            // and the process as a whole is not marked as having resulted in errors.
+            inputPumpInterruptedAfterProcessFinished = !isAlive() && inputPump.isRunning();
 
-        if (inputPump != null) {
             inputPump.cancel();
+            inputPump = null;
         }
         if (outputPump != null) {
             outputPump.cancel();
+            outputPump = null;
         }
         if (errorPump != null) {
             errorPump.cancel();
+            errorPump = null;
         }
-        if (inputPump != null) {
-            inputPump.cancel();
-        }
+
         if (process != null) {
             if (isWindows()) {
                 try {
     }
 
     public boolean isCanceled() {
-        return canceled;
+        return canceled.get();
     }
 
     public boolean isAlive() {
         try {
-            process.exitValue();
-            return false;
+            if (process != null) {
+                process.exitValue();
+            }
         } catch (IllegalThreadStateException e) {
             // thrown when the process has not finished yet
             return true;
         }
+
+        return false;
     }
 
     public void setExecutionTimeout(long executionTimeout) {

File src/main/java/com/atlassian/utils/process/LatchedRunnable.java

     private Stack<?> ndcStack = NDC.cloneStack();
     protected String name;
 
-    protected LatchedRunnable()
-    {
+    protected LatchedRunnable() {
     }
 
-    protected LatchedRunnable(final String name)
-    {
+    protected LatchedRunnable(final String name) {
         this.name = name;
     }
 
         } finally {
             latch.countDown();
             NDC.remove();
+            // release the runner thread, we're done with it.
+            runner = null;
         }
     }
 
     }
 
     public String getName() {
-        return "LatchedRunnable";
+        return name;
     }
 }