Commits

jhouse  committed ee101cc

Issue number: QUARTZ-282
Obtained from:
Submitted by:
Reviewed by:
CVS: ----------------------------------------------------------------------
CVS: Issue number:
CVS: If this change addresses one or more issues,
CVS: then enter the issue number(s) here.
CVS: Obtained from:
CVS: If this change has been taken from another system,
CVS: then name the system in this line, otherwise delete it.
CVS: Submitted by:
CVS: If this code has been contributed to the project by someone else; i.e.,
CVS: they sent us a patch or a set of diffs, then include their name/email
CVS: address here. If this is your work then delete this line.
CVS: Reviewed by:
CVS: If we are doing pre-commit code reviews and someone else has
CVS: reviewed your changes, include their name(s) here.
CVS: If you have not had it reviewed then delete this line.

git-svn-id: http://svn.opensymphony.com/svn/quartz/branches/b_quartz_1-5-x@312 69f7d36a-ea1c-0410-88ea-9fd03e4c9665

  • Participants
  • Parent commits 804268b
  • Branches b_quartz_1-5-x

Comments (0)

Files changed (3)

File src/java/org/quartz/impl/jdbcjobstore/JobStoreCMT.java

         try {
             conn = getNonManagedTXConnection();
 
-            getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
-            transStateOwner = true;
-
+            // checkin, and make sure there is work to be done before we aquire 
+        	// the lock (since that is expensive, and almost never occurs)
             List failedRecords = clusterCheckIn(conn);
-
             if (failedRecords.size() > 0) {
-                getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
-                //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
-                transOwner = true;
-
-                clusterRecover(conn, failedRecords);
-                recovered = true;
+                getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
+                transStateOwner = true;
+                
+                // Now that we own the lock, make sure we still have work to do. 
+                failedRecords = findFailedInstances(conn);
+    
+                if (failedRecords.size() > 0) {
+                    getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
+                    //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
+                    transOwner = true;
+    
+                    clusterRecover(conn, failedRecords);
+                    recovered = true;
+                }
             }
-
             conn.commit();
         } catch (JobPersistenceException e) {
             rollbackConnection(conn);

File src/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

 
     protected long lastCheckin = System.currentTimeMillis();
 
-    protected List clusterCheckIn(Connection conn)
+    /**
+     * Get a list of all scheduler instances in the cluster that may have failed.
+     * This includes this scheduler if it has no recoverer and is checking for the
+     * first time.
+     */
+    protected List findFailedInstances(Connection conn)
             throws JobPersistenceException {
-
-        List states = null;
+    
         List failedInstances = new LinkedList();
         SchedulerStateRecord myLastState = null;
         boolean selfFailed = false;
-
+        
         long timeNow = System.currentTimeMillis();
-
+        
         try {
-            states = getDelegate().selectSchedulerStateRecords(conn, null);
-
+            List states = getDelegate().selectSchedulerStateRecords(conn, null);
+        
             Iterator itr = states.iterator();
             while (itr.hasNext()) {
                 SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();
-
+        
                 // find own record...
                 if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
                     myLastState = rec;
+        
+                    // TODO: revisit when handle self-failed-out impled (see TODO in clusterCheckIn() below)
+        //            if (rec.getRecoverer() != null && !firstCheckIn) {
+        //                selfFailed = true;
+        //            }
 
-                    // TODO: revisit when handle self-failed-out impled (see TODO below)
-//                    if (rec.getRecoverer() != null && !firstCheckIn) {
-//                        selfFailed = true;
-//                    }
-//                    if (rec.getRecoverer() == null && firstCheckIn) {
-//                        failedInstances.add(rec);
-//                    }
-                  if (rec.getRecoverer() == null) {
+                    if (rec.getRecoverer() == null && firstCheckIn) {
                       failedInstances.add(rec);
-                  }
-
+                    }
+                    firstCheckIn = false;
                 } else {
                     // find failed instances...
                     long failedIfAfter =
                         rec.getCheckinTimestamp() +
                         Math.max(rec.getCheckinInterval(), (System.currentTimeMillis() - lastCheckin)) +
                         7500L;
-
+        
                     if (failedIfAfter < timeNow && rec.getRecoverer() == null) {
                         failedInstances.add(rec);
                     }
                 }
             }
+        } catch (Exception e) {
+            lastCheckin = System.currentTimeMillis();
+            throw new JobPersistenceException("Failure identifying failed instances when checking-in: "
+                    + e.getMessage(), e);
+        }
+    
+        return failedInstances;
+    }
+    
+    
+    protected List clusterCheckIn(Connection conn)
+            throws JobPersistenceException {
 
+        List failedInstances = findFailedInstances(conn);
+        
+        try {
             // TODO: handle self-failed-out
 
             // check in...
             lastCheckin = System.currentTimeMillis();
-            if(firstCheckIn) {
-                getDelegate().deleteSchedulerState(conn, getInstanceId());
+            if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
                 getDelegate().insertSchedulerState(conn, getInstanceId(),
                         lastCheckin, getClusterCheckinInterval(), null);
-                firstCheckIn = false;
-            }
-            else {
-                getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin);
             }
             
         } catch (Exception e) {
             lastCheckin = System.currentTimeMillis();
-            throw new JobPersistenceException("Failure checking-in: "
+            throw new JobPersistenceException("Failure updating scheduler state when checking-in: "
                     + e.getMessage(), e);
         }
 

File src/java/org/quartz/impl/jdbcjobstore/JobStoreTX.java

         boolean recovered = false;
 
         try {
-            getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
-            transStateOwner = true;
-
+            // checkin, and make sure there is work to be done before we aquire 
+        	// the lock (since that is expensive, and almost never occurs)
             List failedRecords = clusterCheckIn(conn);
-
             if (failedRecords.size() > 0) {
-                getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
-                //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
-                transOwner = true;
-
-                clusterRecover(conn, failedRecords);
-                recovered = true;
+                getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
+                transStateOwner = true;
+    
+                // Now that we own the lock, make sure we still have work to do. 
+                failedRecords = findFailedInstances(conn);
+    
+                if (failedRecords.size() > 0) {
+                    getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
+                    //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
+                    transOwner = true;
+    
+                    clusterRecover(conn, failedRecords);
+                    recovered = true;
+                }
             }
-
             commitConnection(conn);
         } catch (JobPersistenceException e) {
             rollbackConnection(conn);