Michael Heemskerk avatar Michael Heemskerk committed 812c83a

Changed the way ExternalProcessImpl waits for its IO pumps to finish in order to cope with InputHandlers that
are blocked on IO. This often happens when an InputHandler is used to pump data from an OutputStream to the
ExternalProcess inputstream. The InputHandler can be blocked on a read while the process finishes.

Previously, ExternalProcess would not finish until the InputHandler finished are tried to write something to the
process inputstream (which triggers an exception after the stream is closed).

This change makes the ExternalProcess wait for process output and error pumps to finish on their own. After these
pumps have finished, ExternalProcess will wait until either the input pump finishes, or until the process finishes.

Comments (0)

Files changed (3)

src/main/java/com/atlassian/utils/process/ExternalProcess.java

     public void executeWhile(Runnable runnable);
 
     public String getCommandLine();
+
+    /**
+     * @return true if the process is currently still running. {@code false} if the process has finished.
+     */
+    public boolean isAlive();
 }

src/main/java/com/atlassian/utils/process/ExternalProcessImpl.java

     private void handleHandlerError(String handlerName, Throwable t) {
         if (!isCanceled()) {
             LOG.debug(handlerName + " encountered an error; aborting process", t);
-            cancel();
+            if (isAlive()) {
+                cancel();
+            }
             if (t instanceof ProcessException) {
                 processException = (ProcessException) t;
             } else {
                     handler.processOutput(process.getInputStream());
                 } catch (Throwable t) {
                     handleHandlerError(name, t);
-                }
+                } 
             }
         };
 
                 do {
                     long checkTime = getTimeoutTime();
                     awaitPump(outputPump, checkTime);
-                    awaitPump(inputPump, checkTime);
                     awaitPump(errorPump, checkTime);
-                } while (!isTimedOut() && arePumpsRunning() && !Thread.currentThread().isInterrupted());
+                    awaitPumpOrProcess(inputPump, checkTime);
+                } while (!isTimedOut() && isAlive() && arePumpsRunning() && !Thread.currentThread().isInterrupted());
             } finally {
                 if (Thread.currentThread().isInterrupted()) {
                     cancel();
             try {
                 long endTime = System.currentTimeMillis() + maxWait;
                 awaitPump(outputPump, endTime);
-                awaitPump(inputPump, endTime);
                 awaitPump(errorPump, endTime);
+                awaitPumpOrProcess(inputPump, endTime);
             } finally {
-                if (!arePumpsRunning()) {
+                if (!arePumpsRunning() || !isAlive()) {
                     // process finished
                     finished = true;
                     int exitCode = wrapUpProcess();
         return exitCode;
     }
 
+    /**
+     * Wait for either the pump or the process to finish, whichever happens first.
+     * @param runnable the runnable that is pumping the input/outputstream
+     * @param latestTime the maximum timestamp for waiting on the pump to finish
+     */
+    private void awaitPumpOrProcess(LatchedRunnable runnable, long latestTime) {
+        if (runnable != null) {
+            boolean finished = false;
+            while (!finished && System.currentTimeMillis() < latestTime && isAlive()) {
+                long timeout = Math.min(1000, latestTime - System.currentTimeMillis());
+                if (timeout < 1) {
+                    timeout = 1;
+                }
+                finished = runnable.await(timeout);
+            }
+        }
+    }
+
+    /**
+     * Wait for the pump to finish.
+     * @param runnable the runnable that is pumping the input/outputstream
+     * @param latestTime the maximum timestamp for waiting on the pump to finish
+     */
     private void awaitPump(LatchedRunnable runnable, long latestTime) {
         if (runnable != null) {
             long timeout = latestTime - System.currentTimeMillis();
         return canceled;
     }
 
+    public boolean isAlive() {
+        try {
+            process.exitValue();
+            return false;
+        } catch (IllegalThreadStateException e) {
+            // thrown when the process has not finished yet
+            return true;
+        }
+    }
+
     public void setExecutionTimeout(long executionTimeout) {
         this.executionTimeout = executionTimeout;
     }

src/test/java/com/atlassian/utils/process/ExternalProcessImplTest.java

 
 import org.junit.Test;
 
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ExternalProcessImplTest {
         assertTrue(exception instanceof ProcessTimeoutException);
     }
 
+    private ExternalProcess buildProcessWithBlockingInputHandler() {
+        ExternalProcessBuilder builder = new ExternalProcessBuilder();
+        if (isWindows()) {
+            //pause produces a small amount of output immediately but then produces no more. 
+            builder.command(Arrays.asList("pause"));
+
+        } else {
+            //sleep is present on MacOS and Linux and produces no output while running for a specified duration. We
+            //provide a short duration here so that we know the command will 
+            builder.command(Arrays.asList("sleep", "1"));
+        }
+
+
+        InputHandler inputHandler = new BaseInputHandler() {
+            public void process(OutputStream input) {
+                try {
+                    // this is for windows pause. Wait 1 second then press a key
+                    input.write("key".getBytes());
+
+                    // sleep for a long time, the process should finish before the inputHandler finishes
+                    // this simulates an inputHandler that simply copies data from some other stream into
+                    // the process inputstream. It is quite common for the process to finish while the
+                    // inputHandler is blocked on IO.
+                    Thread.sleep(30 * 1000L);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        //Ensure the timeout here is less than the timeout on the test
+        ExternalProcess process = builder.idleTimeout(TimeUnit.SECONDS.toMillis(15))
+                .handlers(inputHandler, new StringOutputHandler())
+                .build();
+        
+        return process;
+    }
+
+    @Test(timeout = 5 * 1000) // Test timeout in case execution doesn't stop as it should
+    public void testBlockingInputHandlerAbortsWhenProcessFinishes() {
+        ExternalProcess process = buildProcessWithBlockingInputHandler();
+
+        process.start();
+        process.finish();
+        
+        // the process should finish by itself and should not be marked as canceled.
+        assertFalse(process.isCanceled());
+    }
+
+    @Test(timeout = 5 * 1000) // Test timeout in case execution doesn't stop as it should
+    public void testBlockingInputHandlerAbortsWhenProcessFinished2() {
+        ExternalProcess process = buildProcessWithBlockingInputHandler();
+
+        process.execute();
+
+        // the test succeeds if it does not time out.
+        process.start();
+
+        // the process should finish by itself and should not be marked as canceled.
+        assertTrue(process.finish(10 * 1000));
+        assertFalse(process.isCanceled());
+    }
+    
+
     private boolean isWindows() {
         return System.getProperty("os.name").toLowerCase().contains("windows");
     }
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.