Anonymous avatar Anonymous committed b0db630

general overhaul of locking, thread-handoff within SimpleThreadPool

git-svn-id: http://svn.opensymphony.com/svn/quartz/trunk@612 69f7d36a-ea1c-0410-88ea-9fd03e4c9665

Comments (0)

Files changed (2)

src/java/org/quartz/core/QuartzSchedulerThread.java

                                     // scheduler being shutdown or a bug in the thread pool or
                                     // a thread pool being used concurrently - which the docs
                                     // say not to do...
+                                    getLog().error("ThreadPool.runInThread() return false!");
                                     qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                                             trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                                 } catch (SchedulerException se2) {

src/java/org/quartz/simpl/SimpleThreadPool.java

 import org.quartz.SchedulerConfigException;
 import org.quartz.spi.ThreadPool;
 
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * <p>
  * This is class is a simple implementation of a thread pool, based on the
      */
 
     private int count = -1;
-    private int availCount = 0;
 
     private int prio = Thread.NORM_PRIORITY;
 
     private boolean isShutdown = false;
+    private boolean handoffPending = false;
 
     private boolean inheritLoader = false;
 
 
     private ThreadGroup threadGroup;
 
-    private Runnable nextRunnable;
-
     private final Object nextRunnableLock = new Object();
 
-    private WorkerThread[] workers;
+    private List workers;
+    private LinkedList availWorkers = new LinkedList();
+    private LinkedList busyWorkers = new LinkedList();
 
     private String threadNamePrefix = "SimpleThreadPoolWorker";
 
         }
 
         // create the worker threads and start them
-        workers = createWorkerThreads(count);
-        for (int i = 0; i < count; ++i) {
-            if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
-                workers[i].setContextClassLoader(Thread.currentThread()
-                        .getContextClassLoader());
-            }
-            availCount++;
-            workers[i].start();
+        Iterator workerThreads = createWorkerThreads(count).iterator();
+        while(workerThreads.hasNext()) {
+            WorkerThread wt = (WorkerThread) workerThreads.next();
+            wt.start();
+            availWorkers.add(wt);
         }
     }
 
-    protected WorkerThread[] createWorkerThreads(int count) {
-        workers = new WorkerThread[count];
-        for (int i = 0; i < count; ++i) {
-            workers[i] = new WorkerThread(this, threadGroup,
+    protected List createWorkerThreads(int count) {
+        workers = new LinkedList();
+        for (int i = 1; i<= count; ++i) {
+            WorkerThread wt = new WorkerThread(this, threadGroup,
                 getThreadNamePrefix() + "-" + i,
                 getThreadPriority(),
                 isMakeThreadsDaemons());
+            if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
+                wt.setContextClassLoader(Thread.currentThread()
+                        .getContextClassLoader());
+            }
+            workers.add(wt);
         }
 
         return workers;
      * </p>
      */
     public void shutdown(boolean waitForJobsToComplete) {
-        isShutdown = true;
 
-        // signal each worker thread to shut down
-        for (int i = 0; i < workers.length; i++) {
-            if (workers[i] != null) {
-                workers[i].shutdown();
+        synchronized (nextRunnableLock) {
+            isShutdown = true;
+
+            // signal each worker thread to shut down
+            Iterator workerThreads = workers.iterator();
+            while(workerThreads.hasNext()) {
+                WorkerThread wt = (WorkerThread) workerThreads.next();
+                wt.shutdown();
+                availWorkers.remove(wt);
             }
-        }
 
-        // Give waiting (wait(1000)) worker threads a chance to shut down.
-        // Active worker threads will shut down after finishing their
-        // current job.
-        synchronized (nextRunnableLock) {
+            // Give waiting (wait(1000)) worker threads a chance to shut down.
+            // Active worker threads will shut down after finishing their
+            // current job.
             nextRunnableLock.notifyAll();
-        }
 
-        if (waitForJobsToComplete == true) {
-            // Wait until all worker threads are shut down
-            int alive = workers.length;
-            while (alive > 0) {
-                alive = 0;
-                for (int i = 0; i < workers.length; i++) {
-                    if (workers[i].isAlive()) {
-                        try {
-                            //if (logger.isDebugEnabled())
-                            getLog().debug(
-                                    "Waiting for thread no. " + i
-                                            + " to shut down");
-
-                            // note: with waiting infinite - join(0) - the
-                            // application
-                            // may appear to 'hang'. Waiting for a finite time
-                            // however
-                            // requires an additional loop (alive).
-                            alive++;
-                            workers[i].join(200);
-                        } catch (InterruptedException ex) {
-                        }
+            if (waitForJobsToComplete == true) {
+
+                // wait for hand-off in runInThread to complete...
+                while(handoffPending)
+                        try { nextRunnableLock.wait(100); } catch(Throwable t) {}
+
+                // Wait until all worker threads are shut down
+                while (busyWorkers.size() > 0) {
+                    WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
+                    try {
+                        getLog().debug(
+                                "Waiting for thread " + wt.getName()
+                                        + " to shut down");
+
+                        // note: with waiting infinite time the
+                        // application may appear to 'hang'.
+                        nextRunnableLock.wait(2000);
+                    } catch (InterruptedException ex) {
                     }
                 }
-            }
 
-            //if (logger.isDebugEnabled()) {
-            int activeCount = threadGroup.activeCount();
-            if (activeCount > 0) {
-                getLog().info(
-                    "There are still " + activeCount + " worker threads active."
-                    + " See javadoc runInThread(Runnable) for a possible explanation");
-            }
+                int activeCount = threadGroup.activeCount();
+                if (activeCount > 0) {
+                    getLog().info(
+                        "There are still " + activeCount + " worker threads active."
+                        + " See javadoc runInThread(Runnable) for a possible explanation");
+                }
 
-            getLog().debug("shutdown complete");
-            //}
+                getLog().debug("shutdown complete");
+            }
         }
     }
 
             return false;
         }
 
-        if (isShutdown) {
-            try {
-                getLog()
-                        .info(
-                                "SimpleThreadPool.runInThread(): thread pool has been shutdown. Runnable will not be executed");
-            } catch(Exception e) {
-                // ignore to help with a tomcat glitch
-            }
-
-            return false;
-        }
-
         synchronized (nextRunnableLock) {
 
-            // Wait until a worker thread has taken the previous Runnable
-            // or until the thread pool is asked to shutdown.
-            while ((nextRunnable != null) && !isShutdown) {
+            handoffPending = true;
+
+            // Wait until a worker thread is available
+            while ((availWorkers.size() < 1) && !isShutdown) {
                 try {
-                    nextRunnableLock.wait(1000);
+                    nextRunnableLock.wait(500);
                 } catch (InterruptedException ignore) {
                 }
             }
 
-            if(availCount < 1 ) {
-                return false;
-            }
-
-            // During normal operation, not shutdown, set the nextRunnable
-            // and notify the worker threads waiting (getNextRunnable()).
             if (!isShutdown) {
-                nextRunnable = runnable;
-                nextRunnableLock.notifyAll();
+                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
+                busyWorkers.add(wt);
+                wt.run(runnable);
             }
-        }
-
-        // If the thread pool is going down, execute the Runnable
-        // within a new additional worker thread (no thread from the pool).
-        // note: the synchronized section should be as short (time) as
-        //  possible. Starting a new thread is not a quick action.
-        if (isShutdown) {
-            new WorkerThread(this, threadGroup,
-                    "WorkerThread-LastJob", prio, false, runnable);
+            else {
+                // If the thread pool is going down, execute the Runnable
+                // within a new additional worker thread (no thread from the pool).
+                WorkerThread wt = new WorkerThread(this, threadGroup,
+                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
+                busyWorkers.add(wt);
+                workers.add(wt);
+                wt.start();
+            }
+            nextRunnableLock.notifyAll();
+            handoffPending = false;
         }
 
         return true;
 
     public int blockForAvailableThreads() {
         synchronized(nextRunnableLock) {
-            if(availCount > 0) {
-                return availCount;
-            }
 
-            while(availCount < 1  && !isShutdown) {
+            while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                 try {
-                    nextRunnableLock.wait(1000);
+                    nextRunnableLock.wait(500);
                 } catch (InterruptedException ignore) {
                 }
             }
-            return availCount;
+
+            return availWorkers.size();
         }
     }
 
-    /**
-     * <p>
-     * Dequeue the next pending <code>Runnable</code>.
-     * </p>
-     * 
-     * <p>
-     * getNextRunnable() should return null if within a specific time no new
-     * Runnable is available. This gives the worker thread the chance to check
-     * its shutdown flag. In case the worker thread is asked to shut down it
-     * will notify on nextRunnableLock, hence interrupt the wait state. That
-     * is, the time used for waiting need not be short.
-     * </p>
-     */
-    private Runnable getNextRunnable(boolean ranLastTime) throws InterruptedException {
-        Runnable toRun = null;
-
-        // Wait for new Runnable (see runInThread()) and notify runInThread()
-        // in case the next Runnable is already waiting.
-        synchronized (nextRunnableLock) {
-
-            if(ranLastTime) {
-                availCount++;
-                nextRunnableLock.notifyAll();
-            }
-
-            if (nextRunnable == null) {
-                nextRunnableLock.wait(1000);
-            }
-
-            if (nextRunnable != null) {
-                toRun = nextRunnable;
-                availCount--;
-                nextRunnable = null;
-                nextRunnableLock.notifyAll();
-            }
+    protected void makeAvailable(WorkerThread wt) {
+        synchronized(nextRunnableLock) {
+            if(!isShutdown)
+                availWorkers.add(wt);
+            busyWorkers.remove(wt);
+            nextRunnableLock.notifyAll();
         }
-
-        return toRun;
     }
 
     /*
             //interrupt();
         }
 
+        public void run(Runnable newRunnable) {
+            synchronized(this) {
+                if(runnable != null)
+                    throw new IllegalStateException("Already running a Runnable!");
+
+                runnable = newRunnable;
+                this.notifyAll();
+            }
+        }
+
         /**
          * <p>
          * Loop, executing targets as they are received.
             boolean ran = false;
             while (run) {
                 try {
-                    if (runnable == null) {
-                        runnable = tp.getNextRunnable(ran);
+                    synchronized(this) {
+                        while (runnable == null && run) {
+                            this.wait(500);
+                        }
                     }
 
                     if (runnable != null) {
                         ran = true;
                         runnable.run();
-                    } else {
-                        ran = false;
                     }
                 } catch (InterruptedException unblock) {
                     // do nothing (loop will terminate if shutdown() was called
                         // ignore to help with a tomcat glitch
                     }
                 } finally {
+                    runnable = null;
+                    // repair the thread in case the runnable mucked it up...
+                    if(getPriority() != tp.getThreadPriority())
+                        setPriority(tp.getThreadPriority());
+
                     if (runOnce) {
                         run = false;
                     }
-                    runnable = null;
+                    else if(ran) {
+                        ran = false;
+                        makeAvailable(this);
+                    }
 
-                    // repair the thread in case the runnable mucked it up...
-                    setPriority(tp.getThreadPriority());
                 }
             }
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.