Commits

Michael Heemskerk committed 0b299c1

Made the ExecutorService that is used for the IO pumps pluggable

Comments (0)

Files changed (4)

src/main/java/com/atlassian/utils/process/DefaultExternalProcessFactory.java

 package com.atlassian.utils.process;
 
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.*;
+
 /**
  * A default implementation of the {@link ExternalProcessFactory} which builds and configures instances of
  * {@link ExternalProcessImpl}.
  */
 public class DefaultExternalProcessFactory implements ExternalProcessFactory {
 
+    private static final Logger LOG = Logger.getLogger(DefaultExternalProcessFactory.class);
+    private final ExecutorService pool;
     private boolean shutdown;
 
+    public DefaultExternalProcessFactory() {
+        this(createDefaultExecutorService());
+    }
+
+    public DefaultExternalProcessFactory(ExecutorService executorService) {
+        this.pool = executorService;
+    }
+
     public ExternalProcess create(ExternalProcessSettings settings) {
         if (shutdown) {
             throw new IllegalStateException("The DefaultExternalProcessFactory has been shutdown; new external processes cannot be created");
         }
         settings.validate();
 
-        ExternalProcessImpl process =  new ExternalProcessImpl(settings.getCommand(), settings.getProcessHandler());
+        ExternalProcessImpl process =  new ExternalProcessImpl(pool, settings.getCommand(), settings.getProcessHandler());
         configureProcess(process, settings);
 
         return process;
     }
 
     /**
-     * Shuts down the thread pool managed by {@link ExternalProcessImpl}.
+     * Attempts to shutdown the internal {@code ThreadPoolExecutor} which manages the I/O pumps for external processes.
+     * To prevent memory leaks, web applications which use process-utils should always call this when they terminate.
+     * <p/>
+     * On termination, an attempt is made to shutdown the thread pool gracefully. If that does not complete within five
+     * seconds, shutdown is forced. That is given another five seconds to complete before the thread pool is abandoned.
      *
-     * @see com.atlassian.utils.process.ExternalProcessImpl#shutdown()
+     * @since 1.5
      */
     public void shutdown() {
         shutdown = true;
 
-        ExternalProcessImpl.shutdown();
+        if (pool == null) {
+            return;
+        }
+        LOG.debug("Attempting to shutdown pump executor service");
+
+        pool.shutdown();
+        try {
+            if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOG.debug("Pump executor service has shutdown gracefully");
+            } else {
+                //The executor did not shutdown within the timeout. We can't wait forever, though, so issue a
+                //shutdownNow() and give it another second
+                LOG.warn("Pump executor service did not shutdown within the timeout; forcing shutdown");
+
+                pool.shutdownNow();
+                if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
+                    //The forced shutdown has brought the executor down. Not ideal, but acceptable
+                    LOG.debug("Pump executor service has been forced to shutdown");
+                } else {
+                    //We can't delay execution indefinitely waiting, so log a warning. The JVM may not shut down
+                    //if this service does not stop (because it uses non-daemon threads), so this may be helpful
+                    //in debugging should that happen.
+                    LOG.warn("Pump executor service did not shutdown; it will be abandoned");
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for the pump executor service to shutdown; some worker threads may " +
+                    "still be running");
+        }
     }
 
     protected void configureProcess(ExternalProcessImpl process, ExternalProcessSettings settings) {
             process.setIdleTimeout(settings.getIdleTimeout());
         }
     }
+
+    private static ExecutorService createDefaultExecutorService() {
+        final String pooledThreadName = "ExtProcess IO Pump";
+
+        ThreadFactory threadFactory = new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                return new Thread(r, pooledThreadName);
+            }
+        };
+
+        return new ThreadPoolExecutor(6, Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
+                new SynchronousQueue<Runnable>(), threadFactory) {
+
+            @Override
+            protected void beforeExecute(Thread thread, Runnable runnable) {
+                thread.setName(thread.getId() + ":" + ((LatchedRunnable) runnable).getName());
+                super.beforeExecute(thread, runnable);
+            }
+
+            @Override
+            protected void afterExecute(Runnable runnable, Throwable throwable) {
+                Thread.currentThread().setName(pooledThreadName);
+                super.afterExecute(runnable, throwable);
+            }
+        };
+    }
 }

src/main/java/com/atlassian/utils/process/ExternalProcessBuilder.java

 
     private final ExternalProcessSettings settings = new ExternalProcessSettings();
 
+    public static ExternalProcessFactory getExternalProcessFactory() {
+        return externalProcessFactory;
+    }
+
     public static void setExternalProcessFactory(ExternalProcessFactory factory) {
         if (factory == null) {
             throw new NullPointerException("factory");

src/main/java/com/atlassian/utils/process/ExternalProcessImpl.java

 
     private static final Logger LOG = Logger.getLogger(ExternalProcessImpl.class);
     private static final String OS_NAME = System.getProperty("os.name").toLowerCase();
-    private static final ExecutorService POOL;
 
     private AtomicBoolean canceled = new AtomicBoolean(false);
     private List<String> command;
     private Map<String, String> environment;
     private LatchedRunnable errorPump;
+    private final ExecutorService executorService;
     private long executionTimeout;
     private ProcessHandler handler;
     private long idleTimeout;
     private File workingDir;
     private boolean useWindowsEncodingWorkaround;
 
-    static {
-        final String pooledThreadName = "ExtProcess IO Pump";
-        ThreadFactory threadFactory = new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                return new Thread(r, pooledThreadName);
-            }
-        };
-        POOL = new ThreadPoolExecutor(6, Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
-                new SynchronousQueue<Runnable>(), threadFactory) {
-
-            @Override
-            protected void beforeExecute(Thread thread, Runnable runnable) {
-                thread.setName(thread.getId() + ":" + ((LatchedRunnable) runnable).getName());
-                super.beforeExecute(thread, runnable);
-            }
-
-            @Override
-            protected void afterExecute(Runnable runnable, Throwable throwable) {
-                Thread.currentThread().setName(pooledThreadName);
-                super.afterExecute(runnable, throwable);
-            }
-        };
+    /**
+     * Process an external command.
+     *
+     * @param executorService the executorService to use for the IO pump threads
+     * @param command the command and its arguments as separate elements
+     * @param handler the process handler to manage the execution of this process
+     */
+    public ExternalProcessImpl(ExecutorService executorService, String[] command, ProcessHandler handler) {
+        this(executorService, Arrays.asList(command), handler);
     }
 
     /**
      * Process an external command.
      *
+     * @param executorService the executorService to use for the IO pump threads
      * @param command the command and its arguments as separate elements
      * @param handler the process handler to manage the execution of this process
      */
-    public ExternalProcessImpl(String[] command, ProcessHandler handler) {
-        this(Arrays.asList(command), handler);
-    }
-
-    /**
-     * Process an external command.
-     *
-     * @param command the command and its arguments as separate elements
-     * @param handler the process handler to manage the execution of this process
-     */
-    public ExternalProcessImpl(List<String> command, ProcessHandler handler) {
+    public ExternalProcessImpl(ExecutorService executorService, List<String> command, ProcessHandler handler) {
         setCommand(command);
         setHandler(handler);
 
+        this.executorService = executorService;
         idleTimeout = TimeUnit.MINUTES.toMillis(1L);
         monitors = new ArrayList<ProcessMonitor>();
         startTime = -1;
      * the command and its arguments. Spaces are used as argument delimiters so if any command arguments
      * need to contain spaces, the array or list based constructors should be used.
      *
+     * @param executorService the executorService to use for the IO pump threads
      * @param commandLine the command and its arguments in a single line. If any arguments
      *                    need to contain spaces, the array or list based constructors should be used.
      * @param handler     The handler for this execution. The handler supports the required IO
      *                    operations
      */
-    public ExternalProcessImpl(String commandLine, ProcessHandler handler) {
-        this(ProcessUtils.tokenizeCommand(commandLine), handler);
+    public ExternalProcessImpl(ExecutorService executorService, String commandLine, ProcessHandler handler) {
+        this(executorService, ProcessUtils.tokenizeCommand(commandLine), handler);
     }
 
     public void resetWatchdog() {
         resetWatchdog();
         handler.setWatchdog(this);
 
-        POOL.execute(errorPump);
-        POOL.execute(outputPump);
+        executorService.execute(errorPump);
+        executorService.execute(outputPump);
         if (inputPump != null) {
-            POOL.execute(inputPump);
+            executorService.execute(inputPump);
         }
     }
 
     public void setTimeout(long timeout) {
         setIdleTimeout(timeout);
     }
-
-    /**
-     * Attempts to shutdown the internal {@code ThreadPoolExecutor} which manages the I/O pumps for external processes.
-     * To prevent memory leaks, web applications which use process-utils should always call this when they terminate.
-     * <p/>
-     * On termination, an attempt is made to shutdown the thread pool gracefully. If that does not complete within five
-     * seconds, shutdown is forced. That is given another five seconds to complete before the thread pool is abandoned.
-     *
-     * @since 1.5
-     */
-    public static void shutdown() {
-        if (POOL == null) {
-            return;
-        }
-        LOG.debug("Attempting to shutdown pump executor service");
-
-        POOL.shutdown();
-        try {
-            if (POOL.awaitTermination(5, TimeUnit.SECONDS)) {
-                LOG.debug("Pump executor service has shutdown gracefully");
-            } else {
-                //The executor did not shutdown within the timeout. We can't wait forever, though, so issue a
-                //shutdownNow() and give it another second
-                LOG.warn("Pump executor service did not shutdown within the timeout; forcing shutdown");
-
-                POOL.shutdownNow();
-                if (POOL.awaitTermination(5, TimeUnit.SECONDS)) {
-                    //The forced shutdown has brought the executor down. Not ideal, but acceptable
-                    LOG.debug("Pump executor service has been forced to shutdown");
-                } else {
-                    //We can't delay execution indefinitely waiting, so log a warning. The JVM may not shut down
-                    //if this service does not stop (because it uses non-daemon threads), so this may be helpful
-                    //in debugging should that happen.
-                    LOG.warn("Pump executor service did not shutdown; it will be abandoned");
-                }
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for the pump executor service to shutdown; some worker threads may " +
-                    "still be running");
-        }
-    }
 }

src/test/java/com/atlassian/utils/process/CallEcho.java

             System.out.println(getBytes(echo));
             System.out.println(getBytes(result[0]));
         }
-        ExternalProcessImpl.shutdown();
+
+        ExternalProcessBuilder.getExternalProcessFactory().shutdown();
     }
 }