Commits

Anonymous committed c51a2df

Fix remaining cluster checking / recovery issues after previous refactoring.
CVS: ----------------------------------------------------------------------
CVS: Issue number:
CVS: If this change addresses one or more issues,
CVS: then enter the issue number(s) here.
CVS: Obtained from:
CVS: If this change has been taken from another system,
CVS: then name the system in this line, otherwise delete it.
CVS: Submitted by:
CVS: If this code has been contributed to the project by someone else; i.e.,
CVS: they sent us a patch or a set of diffs, then include their name/email
CVS: address here. If this is your work then delete this line.
CVS: Reviewed by:
CVS: If we are doing pre-commit code reviews and someone else has
CVS: reviewed your changes, include their name(s) here.
CVS: If you have not had it reviewed then delete this line.

git-svn-id: http://svn.opensymphony.com/svn/quartz/branches/b_quartz_1-5-x@320 69f7d36a-ea1c-0410-88ea-9fd03e4c9665

Comments (0)

Files changed (2)

src/java/org/quartz/impl/jdbcjobstore/DriverDelegate.java

      * @param conn
      *          the DB Connection
      */
-    public List selectSchedulerStateRecords(Connection conn, String instanceId)
+    public List selectSchedulerStateRecords(Connection conn, String instanceId) // TODO: this method would be more handy if it returned a map.
             throws SQLException;
 
 }

src/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

         
         try {
             List states = getDelegate().selectSchedulerStateRecords(conn, null);
-        
+            // build map of states by Id...
+            HashMap statesById = new HashMap();
             Iterator itr = states.iterator();
             while (itr.hasNext()) {
                 SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();
+            	statesById.put(rec.getSchedulerInstanceId(), rec);
+            }        
+
+            itr = states.iterator();
+            while (itr.hasNext()) {
+                SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();
         
                 // find own record...
                 if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
         //                selfFailed = true;
         //            }
 
-                    if (rec.getRecoverer() == null && firstCheckIn) {
-                      failedInstances.add(rec);
+                    if (firstCheckIn) {
+                      if(rec.getRecoverer() == null)
+                    	  failedInstances.add(rec);
+                      // make sure the recoverer hasn't died itself!
+                      SchedulerStateRecord recOrec = (SchedulerStateRecord) 
+                      	  statesById.get(rec.getRecoverer());
+                      long failedIfAfter = timeNow;
+                      if(recOrec != null) {
+                      	failedIfAfter = calcFailedIfAfter(recOrec);
+                      }
+                      // if it has failed, then let's become the recoverer
+                      if( failedIfAfter < timeNow || recOrec == null) {
+                      	failedInstances.add(rec);
+                      }
                     }
                     firstCheckIn = false;
                 } else {
                     // find failed instances...
-                    long failedIfAfter =
-                        rec.getCheckinTimestamp() +
-                        Math.max(rec.getCheckinInterval(), (System.currentTimeMillis() - lastCheckin)) +
-                        7500L;
+                    long failedIfAfter = calcFailedIfAfter(rec);
         
                     if (failedIfAfter < timeNow && rec.getRecoverer() == null) {
                         failedInstances.add(rec);
                     }
+                    else if(rec.getRecoverer() != null) {
+                    	// make sure the recoverer hasn't died itself!
+                        SchedulerStateRecord recOrec = (SchedulerStateRecord) 
+                        	statesById.get(rec.getRecoverer());
+                        failedIfAfter = timeNow;
+                        if(recOrec != null) {
+                        	failedIfAfter = calcFailedIfAfter(recOrec);
+                        }
+                        // if it has failed, then let's become the recoverer
+                        if( failedIfAfter < timeNow || recOrec == null) {
+                        	failedInstances.add(rec);
+                        }
+                    }
                 }
             }
         } catch (Exception e) {
         return failedInstances;
     }
     
+    protected long calcFailedIfAfter(SchedulerStateRecord rec) { // computes the time after which rec is treated as failed; callers compare the result with timeNow
+    	return rec.getCheckinTimestamp() + // start from the record's check-in timestamp
+	        Math.max(rec.getCheckinInterval(), // allow the larger of the record's own check-in interval...
+	        		(System.currentTimeMillis() - lastCheckin)) + // ...and the time elapsed since this store's lastCheckin
+	        7500L; // fixed extra margin; presumably milliseconds (7.5 s), since it is added to currentTimeMillis-based values - TODO confirm
+    }
     
     protected List clusterCheckIn(Connection conn)
             throws JobPersistenceException {
                     if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
                         recoverer = null;
                         checkInTS = System.currentTimeMillis();
+                        getDelegate().insertSchedulerState(conn,
+                                rec.getSchedulerInstanceId(), checkInTS,
+                                rec.getCheckinInterval(), recoverer);
                     }
 
-                    getDelegate().insertSchedulerState(conn,
-                            rec.getSchedulerInstanceId(), checkInTS,
-                            rec.getCheckinInterval(), recoverer);
-
                 }
             } catch (Exception e) {
                 throw new JobPersistenceException("Failure recovering jobs: "
         
         ClusterManager(JobStoreSupport js) { // keeps a reference to the owning job store and configures this object's priority and name
             this.js = js;
-
+            this.setPriority(Thread.NORM_PRIORITY + 2); // two levels above normal priority; presumably this class is a Thread subclass - confirm against the class header
             this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_ClusterManager"); // name embeds instanceName and instanceId
         }