1. opensymphony
  2. quartz

Commits

jhouse  committed bb2aabc

QUARTZ-632: Scheduling change causes a paused/blocked scheduler to do unnecessary trigger release

git-svn-id: http://svn.opensymphony.com/svn/quartz/trunk@79369f7d36a-ea1c-0410-88ea-9fd03e4c9665

  • Participants
  • Parent commits da452d2
  • Branches master

Comments (0)

Files changed (10)

File src/java/org/quartz/core/JobRunShell.java

View file
  • Ignore whitespace
                 break;
             } while (true);
     
-            qs.notifySchedulerThread();
+            qs.notifySchedulerThread(0L);
         } finally {
             jobRunShellFactory.returnJobRunShell(this);
         }
         public VetoedException() {
         }
     }
+
 }

File src/java/org/quartz/core/QuartzScheduler.java

View file
  • Ignore whitespace
         errLogger = new ErrorLogger();
         addSchedulerListener(errLogger);
 
-        signaler = new SchedulerSignalerImpl(this);
+        signaler = new SchedulerSignalerImpl(this, this.schedThread);
         
         getLog().info("Quartz Scheduler v." + getVersion() + " created.");
     }
         }
 
         resources.getJobStore().storeJobAndTrigger(ctxt, jobDetail, trigger);
-        notifySchedulerThread();
+        notifySchedulerThread(trigger.getNextFireTime().getTime());
         notifySchedulerListenersSchduled(trigger);
 
         return ft;
         }
 
         resources.getJobStore().storeTrigger(ctxt, trigger, false);
-        notifySchedulerThread();
+        notifySchedulerThread(trigger.getNextFireTime().getTime());
         notifySchedulerListenersSchduled(trigger);
 
         return ft;
         }
         
         if (resources.getJobStore().removeTrigger(ctxt, triggerName, groupName)) {
-            notifySchedulerThread();
+            notifySchedulerThread(0L);
             notifySchedulerListenersUnschduled(triggerName, groupName);
         } else {
             return false;
         }
         
         if (resources.getJobStore().replaceTrigger(ctxt, triggerName, groupName, newTrigger)) {
-            notifySchedulerThread();
+            notifySchedulerThread(newTrigger.getNextFireTime().getTime());
             notifySchedulerListenersUnschduled(triggerName, groupName);
             notifySchedulerListenersSchduled(newTrigger);
         } else {
             }
         }
 
-        notifySchedulerThread();
+        notifySchedulerThread(trig.getNextFireTime().getTime());
         notifySchedulerListenersSchduled(trig);
     }
 
             }
         }
 
-        notifySchedulerThread();
+        notifySchedulerThread(trig.getNextFireTime().getTime());
         notifySchedulerListenersSchduled(trig);
     }
 
         }
         
         resources.getJobStore().pauseTrigger(ctxt, triggerName, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersPausedTrigger(triggerName, groupName);
     }
 
         }
         
         resources.getJobStore().pauseTriggerGroup(ctxt, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersPausedTrigger(null, groupName);
     }
 
         }
 
         resources.getJobStore().pauseJob(ctxt, jobName, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersPausedJob(jobName, groupName);
     }
 
         }
         
         resources.getJobStore().pauseJobGroup(ctxt, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersPausedJob(null, groupName);
     }
 
         }
         
         resources.getJobStore().resumeTrigger(ctxt, triggerName, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersResumedTrigger(triggerName, groupName);
     }
 
         }
         
         resources.getJobStore().resumeTriggerGroup(ctxt, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersResumedTrigger(null, groupName);
     }
 
         }
         
         resources.getJobStore().resumeJob(ctxt, jobName, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersResumedJob(jobName, groupName);
     }
 
         }
         
         resources.getJobStore().resumeJobGroup(ctxt, groupName);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersResumedJob(null, groupName);
     }
 
         validateState();
 
         resources.getJobStore().pauseAll(ctxt);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersPausedTrigger(null, null);
     }
 
         validateState();
 
         resources.getJobStore().resumeAll(ctxt);
-        notifySchedulerThread();
+        notifySchedulerThread(0L);
         notifySchedulerListenersResumedTrigger(null, null);
     }
 
         resources.getJobStore().triggeredJobComplete(ctxt, trigger, detail, instCode);
     }
 
-    protected void notifySchedulerThread() {
+    protected void notifySchedulerThread(long candidateNewNextFireTime) {
         if (isSignalOnSchedulingChange()) {
-            schedThread.signalSchedulingChange();
+            signaler.signalSchedulingChange(candidateNewNextFireTime);
         }
     }
 

File src/java/org/quartz/core/QuartzSchedulerResources.java

View file
  • Ignore whitespace
             ",name=" + schedName + 
             ",instance=" + schedInstId;
     }
+
 }

File src/java/org/quartz/core/QuartzSchedulerThread.java

View file
  • Ignore whitespace
     private Object sigLock = new Object();
 
     private boolean signaled;
-
+    private long signaledNextFireTime;
+    
     private boolean paused;
 
     private boolean halted;
             paused = pause;
 
             if (paused) {
-                signalSchedulingChange();
+                signalSchedulingChange(0);
             } else {
                 sigLock.notify();
             }
             if (paused) {
                 sigLock.notify();
             } else {
-                signalSchedulingChange();
+                signalSchedulingChange(0);
             }
         }
     }
      * made - in order to interrupt any sleeping that may be occuring while
      * waiting for the fire time to arrive.
      * </p>
+     *
+     * @param newNextTime the time (in millis) when the newly scheduled trigger
+     * will fire.  If this method is being called do to some other even (rather
+     * than scheduling a trigger), the caller should pass zero (0).
      */
-    void signalSchedulingChange() {
+    public void signalSchedulingChange(long candidateNewNextFireTime) {
         synchronized(sigLock) {
             signaled = true;
+            signaledNextFireTime = candidateNewNextFireTime;
         }
     }
 
-    private void clearSignaledSchedulingChange() {
+    public void clearSignaledSchedulingChange() {
         synchronized(sigLock) {
             signaled = false;
+            signaledNextFireTime = 0;
         }
     }
 
-    private boolean isScheduleChanged() {
+    public boolean isScheduleChanged() {
         synchronized(sigLock) {
             return signaled;
         }
     }
 
+    public long getSignaledNextFireTime() {
+        synchronized(sigLock) {
+            return signaledNextFireTime;
+        }
+    }
+
     /**
      * <p>
      * The main processing loop of the <code>QuartzSchedulerThread</code>.
                         // though, this spinning
                         // doesn't even register 0.2% cpu usage on a pentium 4.
                         long numPauses = (timeUntilTrigger / spinInterval);
-                        while (numPauses >= 0 && !isScheduleChanged()) {
+                        while (numPauses >= 0) {
 
                             try {
                                 Thread.sleep(spinInterval);
                             } catch (InterruptedException ignore) {
                             }
+                            
+                            if (isScheduleChanged()) {
+                            	if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
+                            		// above call does a clearSignaledSchedulingChange()
+                            		try {
+    	                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
+    	                                        ctxt, trigger);
+    	                            } catch (JobPersistenceException jpe) {
+    	                                qs.notifySchedulerListenersError(
+    	                                        "An error occured while releasing trigger '"
+    	                                                + trigger.getFullName() + "'",
+    	                                        jpe);
+    	                                // db connection must have failed... keep
+    	                                // retrying until it's up...
+    	                                releaseTriggerRetryLoop(trigger);
+    	                            } catch (RuntimeException e) {
+    	                                getLog().error(
+    	                                    "releaseTriggerRetryLoop: RuntimeException "
+    	                                    +e.getMessage(), e);
+    	                                // db connection must have failed... keep
+    	                                // retrying until it's up...
+    	                                releaseTriggerRetryLoop(trigger);
+    	                            }
+    	                            trigger = null;
+    	                            break;
+                            	}
+                            }
 
                             now = System.currentTimeMillis();
                             timeUntilTrigger = triggerTime - now;
                             numPauses = (timeUntilTrigger / spinInterval);
                         }
-                        if (isScheduleChanged()) {
-                            try {
-                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
-                                        ctxt, trigger);
-                            } catch (JobPersistenceException jpe) {
-                                qs.notifySchedulerListenersError(
-                                        "An error occured while releasing trigger '"
-                                                + trigger.getFullName() + "'",
-                                        jpe);
-                                // db connection must have failed... keep
-                                // retrying until it's up...
-                                releaseTriggerRetryLoop(trigger);
-                            } catch (RuntimeException e) {
-                                getLog().error(
-                                    "releaseTriggerRetryLoop: RuntimeException "
-                                    +e.getMessage(), e);
-                                // db connection must have failed... keep
-                                // retrying until it's up...
-                                releaseTriggerRetryLoop(trigger);
-                            }
-                            clearSignaledSchedulingChange();
-                            continue;
-                        }
 
+                        if(trigger == null)
+                        	continue;
+                        
                         // set trigger to 'executing'
                         TriggerFiredBundle bndle = null;
 
                 long spinInterval = 10;
                 long numPauses = (timeUntilContinue / spinInterval);
     
-                while (numPauses > 0 && !isScheduleChanged()) {
+                while (numPauses > 0) {
+                	if(isScheduleChanged()) 
+                		break;
     
                     try {
                         Thread.sleep(10L);
         qsRsrcs = null;
     }
 
-    public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
+    private boolean isCandidateNewTimeEarlierWithinReason(long oldTime) {
+    	
+		// So here's the deal: We know due to being signaled that 'the schedule'
+		// has changed.  We may know (if getSignaledNextFireTime() != 0) the
+		// new earliest fire time.  We may not (in which case we will assume
+		// that the new time is earlier than the trigger we have acquired).
+		// In either case, we only want to abandon our acquired trigger and
+		// go looking for a new one if "it's worth it".  It's only worth it if
+		// the time cost incurred to abandon the trigger and acquire a new one 
+		// is less than the time until the currently acquired trigger will fire,
+		// otherwise we're just "thrashing" the job store (e.g. database).
+		//
+		// So the question becomes when is it "worth it"?  This will depend on
+		// the job store implementation (and of course the particular database
+		// or whatever behind it).  Ideally we would depend on the job store 
+		// implementation to tell us the amount of time in which it "thinks"
+		// it can abandon the acquired trigger and acquire a new one.  However
+		// we have no current facility for having it tell us that, so we make
+		// and somewhat educated but arbitrary guess ;-).
+
+    	synchronized(sigLock) {
+			
+			boolean earlier = false;
+			
+			if(getSignaledNextFireTime() == 0)
+				earlier = true;
+			else if(getSignaledNextFireTime() < oldTime )
+				earlier = true;
+			
+			if(earlier) {
+				// so the new time is considered earlier, but is it enough earlier?
+				long diff = System.currentTimeMillis() - oldTime;
+				if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 120L : 10L))
+					earlier = false;
+			}
+			
+			clearSignaledSchedulingChange();
+			
+			return earlier;
+        }
+	}
+
+	public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
         int retryCount = 0;
         try {
             while (!halted) {

File src/java/org/quartz/core/SchedulerSignalerImpl.java

View file
  • Ignore whitespace
  */
 package org.quartz.core;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 import org.quartz.spi.SchedulerSignaler;
  */
 public class SchedulerSignalerImpl implements SchedulerSignaler {
 
+	Log log = LogFactory.getLog(SchedulerSignalerImpl.class);
+	
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * 
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      */
 
-    private QuartzScheduler sched;
+	protected QuartzScheduler sched;
+    protected QuartzSchedulerThread schedThread;
 
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      */
 
-    SchedulerSignalerImpl(QuartzScheduler sched) {
+    public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread) {
         this.sched = sched;
+        this.schedThread = schedThread;
+        
+		log.info("Initialized Scheduler Signaller of type: " + getClass());
     }
 
     /*
         sched.notifySchedulerListenersFinalized(trigger);
     }
 
-    public void signalSchedulingChange() {
-        sched.notifySchedulerThread();
+    public void signalSchedulingChange(long candidateNewNextFireTime) {
+        schedThread.signalSchedulingChange(candidateNewNextFireTime);
     }
 
 }

File src/java/org/quartz/ee/jta/UserTransactionHelper.java

View file
  • Ignore whitespace
 import javax.transaction.NotSupportedException;
 import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
 import javax.transaction.UserTransaction;
 
 import org.apache.commons.logging.Log;
      */
 
     public static final String DEFAULT_USER_TX_LOCATION = "java:comp/UserTransaction";
-
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * 
      */
 
     private static String userTxURL = DEFAULT_USER_TX_LOCATION;
-
+    
+    private static Log log = LogFactory.getLog(UserTransactionHelper.class);
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * 
         }
     }
 
+
     /**
      * This class wraps a UserTransaction with the InitialContext that was used
      * to look it up, so that when the UserTransaction is returned to the 
     private static class UserTransactionWithContext implements UserTransaction {
         InitialContext context;
         UserTransaction userTransaction;
+        TransactionManager tm;
         
         public UserTransactionWithContext() throws SchedulerException {
             try {

File src/java/org/quartz/impl/StdSchedulerFactory.java

View file
  • Ignore whitespace
         long dbFailureRetry = -1;
         String classLoadHelperClass;
         String jobFactoryClass;
-
+        
         SchedulerRepository schedRep = SchedulerRepository.getInstance();
 
         // Get Scheduler Properties
         rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
         rsrcs.setJMXExport(jmxExport);
         rsrcs.setJMXObjectName(jmxObjectName);
-
+        	
         if (rmiExport) {
             rsrcs.setRMIRegistryHost(rmiHost);
             rsrcs.setRMIRegistryPort(rmiPort);
 
         qs = new QuartzScheduler(rsrcs, schedCtxt, idleWaitTime, dbFailureRetry);
 
-        //    if(usingJSCMT)
-        //      qs.setSignalOnSchedulingChange(false); // TODO: fixed? (don't need
-        // this any more?)
-
         // Create Scheduler ref...
         Scheduler scheduler = instantiate(rsrcs, qs);
         

File src/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

View file
  • Ignore whitespace
      */
     protected static class RecoverMisfiredJobsResult {
         public static final RecoverMisfiredJobsResult NO_OP =
-            new RecoverMisfiredJobsResult(false, 0);
+            new RecoverMisfiredJobsResult(false, 0, Long.MAX_VALUE);
         
         private boolean _hasMoreMisfiredTriggers;
         private int _processedMisfiredTriggerCount;
+        private long _earliestNewTime;
         
         public RecoverMisfiredJobsResult(
-            boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount) {
+            boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount, long _earliestNewTime) {
             _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
             _processedMisfiredTriggerCount = processedMisfiredTriggerCount;
         }
         public int getProcessedMisfiredTriggerCount() {
             return _processedMisfiredTriggerCount;
         } 
+        public long getEarliestNewTime() {
+            return _earliestNewTime;
+        } 
     }
     
     protected RecoverMisfiredJobsResult recoverMisfiredJobs(
             (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
         
         List misfiredTriggers = new ArrayList();
-        
+        long earliestNewTime = Long.MAX_VALUE;
         // We must still look for the MISFIRED state in case triggers were left 
         // in this state when upgrading to this version that does not support it. 
         boolean hasMoreMisfiredTriggers =
 
             doUpdateOfMisfiredTrigger(conn, null, trig, false, STATE_WAITING, recovering);
 
-            signaler.notifySchedulerListenersFinalized(trig);
+            if(trig.getNextFireTime().getTime() < earliestNewTime)
+            	earliestNewTime = trig.getNextFireTime().getTime();
+            
+            signaler.notifyTriggerListenersMisfired(trig);
         }
 
         return new RecoverMisfiredJobsResult(
-                hasMoreMisfiredTriggers, misfiredTriggers.size());
+                hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
     }
 
     protected boolean updateMisfiredTrigger(Connection conn,
         }
     }
 
-    protected void signalSchedulingChange() {
-        signaler.signalSchedulingChange();
+    protected void signalSchedulingChange(long candidateNewNextFireTime) {
+        signaler.signalSchedulingChange(candidateNewNextFireTime);
     }
 
     //---------------------------------------------------------------------------
                 }
 
                 if (!shutdown && this.manage()) {
-                    signalSchedulingChange();
+                    signalSchedulingChange(0L);
                 }
 
             }//while !shutdown
                 RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
 
                 if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
-                    signalSchedulingChange();
+                    signalSchedulingChange(recoverMisfiredJobsResult.getEarliestNewTime());
                 }
 
                 if (!shutdown) {

File src/java/org/quartz/spi/SchedulerSignaler.java

View file
  • Ignore whitespace
 
     void notifySchedulerListenersFinalized(Trigger trigger);
 
-    void signalSchedulingChange();
+    void signalSchedulingChange(long candidateNewNextFireTime);
 
 }

File src/test/java/org/quartz/simpl/RAMJobStoreTest.java

View file
  • Ignore whitespace
             fMisfireCount++;
         }
 
-        public void signalSchedulingChange() {
+        public void signalSchedulingChange(long candidateNewNextFireTime) {
         }
 
         public void notifySchedulerListenersFinalized(Trigger trigger) {