Commits

jhouse  committed ae2c3bc

QUARTZ-668, QUARTZ-669: fixes to threading issues with multicore/multicpu systems.

git-svn-id: http://svn.opensymphony.com/svn/quartz/trunk@824 69f7d36a-ea1c-0410-88ea-9fd03e4c9665

  • Participants
  • Parent commits f5c0893

Comments (0)

Files changed (2)

File src/java/org/quartz/core/QuartzSchedulerThread.java

             if (paused) {
                 signalSchedulingChange(0);
             } else {
-                sigLock.notify();
+                sigLock.notifyAll();
             }
         }
     }
             halted = true;
 
             if (paused) {
-                sigLock.notify();
+                sigLock.notifyAll();
             } else {
                 signalSchedulingChange(0);
             }
         synchronized(sigLock) {
             signaled = true;
             signaledNextFireTime = candidateNewNextFireTime;
+            sigLock.notifyAll();
         }
     }
 
                     while (paused && !halted) {
                         try {
                             // wait until togglePause(false) is called...
-                            sigLock.wait(100L);
+                            sigLock.wait(1000L);
                         } catch (InterruptedException ignore) {
                         }
                     }
                         now = System.currentTimeMillis();
                         long triggerTime = trigger.getNextFireTime().getTime();
                         long timeUntilTrigger = triggerTime - now;
-                        long spinInterval = 10;
-
-                        // this looping may seem a bit silly, but it's the
-                        // current work-around
-                        // for a dead-lock that can occur if the Thread.sleep()
-                        // is replaced with
-                        // a obj.wait() that gets notified when the signal is
-                        // set...
-                        // so to be able to detect the signal change without
-                        // sleeping the entire
-                        // timeUntilTrigger, we spin here... don't worry
-                        // though, this spinning
-                        // doesn't even register 0.2% cpu usage on a pentium 4.
-                        long numPauses = (timeUntilTrigger / spinInterval);
-                        while (numPauses >= 0) {
-
-                            try {
-                                Thread.sleep(spinInterval);
-                            } catch (InterruptedException ignore) {
-                            }
-                            
-                            if (isScheduleChanged()) {
-                            	if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
-                            		// above call does a clearSignaledSchedulingChange()
-                            		try {
-    	                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
-    	                                        ctxt, trigger);
-    	                            } catch (JobPersistenceException jpe) {
-    	                                qs.notifySchedulerListenersError(
-    	                                        "An error occured while releasing trigger '"
-    	                                                + trigger.getFullName() + "'",
-    	                                        jpe);
-    	                                // db connection must have failed... keep
-    	                                // retrying until it's up...
-    	                                releaseTriggerRetryLoop(trigger);
-    	                            } catch (RuntimeException e) {
-    	                                getLog().error(
-    	                                    "releaseTriggerRetryLoop: RuntimeException "
-    	                                    +e.getMessage(), e);
-    	                                // db connection must have failed... keep
-    	                                // retrying until it's up...
-    	                                releaseTriggerRetryLoop(trigger);
-    	                            }
-    	                            trigger = null;
-    	                            break;
-                            	}
-                            }
-
-                            now = System.currentTimeMillis();
-                            timeUntilTrigger = triggerTime - now;
-                            numPauses = (timeUntilTrigger / spinInterval);
+                        if(timeUntilTrigger > 5) {
+	                        synchronized(sigLock) {
+		                        try {
+		                            sigLock.wait(timeUntilTrigger);
+		                        } catch (InterruptedException ignore) {
+		                        }
+		                        
+		                        if (isScheduleChanged()) {
+		                        	if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
+		                        		// above call does a clearSignaledSchedulingChange()
+		                        		try {
+			                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
+			                                        ctxt, trigger);
+			                            } catch (JobPersistenceException jpe) {
+			                                qs.notifySchedulerListenersError(
+			                                        "An error occured while releasing trigger '"
+			                                                + trigger.getFullName() + "'",
+			                                        jpe);
+			                                // db connection must have failed... keep
+			                                // retrying until it's up...
+			                                releaseTriggerRetryLoop(trigger);
+			                            } catch (RuntimeException e) {
+			                                getLog().error(
+			                                    "releaseTriggerRetryLoop: RuntimeException "
+			                                    +e.getMessage(), e);
+			                                // db connection must have failed... keep
+			                                // retrying until it's up...
+			                                releaseTriggerRetryLoop(trigger);
+			                            }
+			                            trigger = null;
+			                            continue;
+		                        	}
+		                        }
+	                        }
                         }
-
-                        if(trigger == null)
-                        	continue;
                         
                         // set trigger to 'executing'
                         TriggerFiredBundle bndle = null;
                                     releaseTriggerRetryLoop(trigger);
                                 }
                             }
-
-                            // it's possible to get 'null' if the trigger was paused,
-                            // blocked, or other similar occurences that prevent it being
-                            // fired at this time...  or if the scheduler was shutdown (halted)
-                            if (bndle == null) {
-                                try {
-                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
-                                            trigger);
-                                } catch (SchedulerException se) {
-                                    qs.notifySchedulerListenersError(
-                                            "An error occured while releasing trigger '"
-                                                    + trigger.getFullName() + "'", se);
-                                    // db connection must have failed... keep retrying
-                                    // until it's up...
-                                    releaseTriggerRetryLoop(trigger);
-                                }
-                                continue;
+                        }
+                        
+                        // it's possible to get 'null' if the trigger was paused,
+                        // blocked, or other similar occurences that prevent it being
+                        // fired at this time...  or if the scheduler was shutdown (halted)
+                        if (bndle == null) {
+                            try {
+                                qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
+                                        trigger);
+                            } catch (SchedulerException se) {
+                                qs.notifySchedulerListenersError(
+                                        "An error occured while releasing trigger '"
+                                                + trigger.getFullName() + "'", se);
+                                // db connection must have failed... keep retrying
+                                // until it's up...
+                                releaseTriggerRetryLoop(trigger);
                             }
+                            continue;
+                        }
 
-                            // TODO: improvements:
-                            //
-                            // 2- make sure we can get a job runshell before firing trigger, or
-                            //   don't let that throw an exception (right now it never does,
-                            //   but the signature says it can).
-                            // 3- acquire more triggers at a time (based on num threads available?)
+                        // TODO: improvements:
+                        //
+                        // 2- make sure we can get a job runshell before firing trigger, or
+                        //   don't let that throw an exception (right now it never does,
+                        //   but the signature says it can).
+                        // 3- acquire more triggers at a time (based on num threads available?)
 
 
-                            JobRunShell shell = null;
+                        JobRunShell shell = null;
+                        try {
+                            shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
+                            shell.initialize(qs, bndle);
+                        } catch (SchedulerException se) {
                             try {
-                                shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
-                                shell.initialize(qs, bndle);
-                            } catch (SchedulerException se) {
-                                try {
-                                    qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
-                                            trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
-                                } catch (SchedulerException se2) {
-                                    qs.notifySchedulerListenersError(
-                                            "An error occured while placing job's triggers in error state '"
-                                                    + trigger.getFullName() + "'", se2);
-                                    // db connection must have failed... keep retrying
-                                    // until it's up...
-                                    errorTriggerRetryLoop(bndle);
-                                }
-                                continue;
+                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
+                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
+                            } catch (SchedulerException se2) {
+                                qs.notifySchedulerListenersError(
+                                        "An error occured while placing job's triggers in error state '"
+                                                + trigger.getFullName() + "'", se2);
+                                // db connection must have failed... keep retrying
+                                // until it's up...
+                                errorTriggerRetryLoop(bndle);
                             }
+                            continue;
+                        }
 
-                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
-                                try {
-                                    // this case should never happen, as it is indicative of the
-                                    // scheduler being shutdown or a bug in the thread pool or
-                                    // a thread pool being used concurrently - which the docs
-                                    // say not to do...
-                                    getLog().error("ThreadPool.runInThread() return false!");
-                                    qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
-                                            trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
-                                } catch (SchedulerException se2) {
-                                    qs.notifySchedulerListenersError(
-                                            "An error occured while placing job's triggers in error state '"
-                                                    + trigger.getFullName() + "'", se2);
-                                    // db connection must have failed... keep retrying
-                                    // until it's up...
-                                    releaseTriggerRetryLoop(trigger);
-                                }
+                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
+                            try {
+                                // this case should never happen, as it is indicative of the
+                                // scheduler being shutdown or a bug in the thread pool or
+                                // a thread pool being used concurrently - which the docs
+                                // say not to do...
+                                getLog().error("ThreadPool.runInThread() return false!");
+                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
+                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
+                            } catch (SchedulerException se2) {
+                                qs.notifySchedulerListenersError(
+                                        "An error occured while placing job's triggers in error state '"
+                                                + trigger.getFullName() + "'", se2);
+                                // db connection must have failed... keep retrying
+                                // until it's up...
+                                releaseTriggerRetryLoop(trigger);
                             }
                         }
 
                     continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
                 }
 
-                // this looping may seem a bit silly, but it's the current
-                // work-around
-                // for a dead-lock that can occur if the Thread.sleep() is replaced
-                // with
-                // a obj.wait() that gets notified when the signal is set...
-                // so to be able to detect the signal change without sleeping the
-                // entier
-                // getRandomizedIdleWaitTime(), we spin here... don't worry though,
-                // the
-                // CPU usage of this spinning can't even be measured on a pentium
-                // 4.
                 long now = System.currentTimeMillis();
                 long waitTime = now + getRandomizedIdleWaitTime();
                 long timeUntilContinue = waitTime - now;
-                long spinInterval = 10;
-                long numPauses = (timeUntilContinue / spinInterval);
-    
-                while (numPauses > 0) {
-                	if(isScheduleChanged()) 
-                		break;
-    
-                    try {
-                        Thread.sleep(10L);
-                    } catch (InterruptedException ignore) {
-                    }
-    
-                    now = System.currentTimeMillis();
-                    timeUntilContinue = waitTime - now;
-                    numPauses = (timeUntilContinue / spinInterval);
+                synchronized(sigLock) {
+                	try {
+						sigLock.wait(timeUntilContinue);
+					} catch (InterruptedException ignore) {
+					}
                 }
+
             } catch(RuntimeException re) {
                 getLog().error("Runtime error occured in main trigger firing loop.", re);
             }

File src/java/org/quartz/simpl/SimpleThreadPool.java

          * </p>
          */
         void shutdown() {
-            run = false;
-
-            // Javadoc mentions that it interrupts blocked I/O operations as
-            // well. Hence the job will most likely fail. I think we should
-            // shut the work thread gracefully, by letting the job finish
-            // uninterrupted. See SimpleThreadPool.shutdown()
-            //interrupt();
+        	synchronized (this) {
+        		run = false;
+        	}
         }
 
         public void run(Runnable newRunnable) {
          * </p>
          */
         public void run() {
-            boolean runOnce = (runnable != null);
-
             boolean ran = false;
-            while (run) {
+        	boolean runOnce = false;
+            boolean shouldRun = false;
+            synchronized(this) {
+            	runOnce = (runnable != null);
+            	shouldRun = run;
+            }
+            
+            while (shouldRun) {
                 try {
                     synchronized(this) {
                         while (runnable == null && run) {
                         // ignore to help with a tomcat glitch
                     }
                 } finally {
-                    runnable = null;
+                    synchronized(this) {
+                    	runnable = null;
+                    }
                     // repair the thread in case the runnable mucked it up...
                     if(getPriority() != tp.getThreadPriority()) {
                         setPriority(tp.getThreadPriority());
                     }
 
                     if (runOnce) {
-                        run = false;
+                        synchronized(this) {
+                        	run = false;
+                        }
                     } else if(ran) {
                         ran = false;
                         makeAvailable(this);
                     }
 
                 }
+
+                // read value of run within synchronized block to be 
+                // sure of its value
+                synchronized(this) {
+                	shouldRun = run;
+                }
             }
 
             //if (log.isDebugEnabled())