Commits

timemachine committed d02c09c

- Fixed database lock errors in HibernateDataStore that occurred when multiple schedulers accessed the same database.
(Solution: use Hibernate LockOptions(LockMode.PESSIMISTIC_WRITE) to lock SchedulerData before each data update and delete.)
- Added a new deleteDeadSchedules() method to the DataStore interface.
- Improved the data updates that PollingScheduleRunner performs after a job task runs.
- Added options to disable the missed-run check and the dead-schedule removal in PollingScheduleRunner.

Comments (0)

Files changed (5)

timemachine-hibernate/src/main/java/timemachine/scheduler/hibernate/HibernateDataStore.java

 import java.util.Map;
 import java.util.Properties;
 
+import org.hibernate.LockMode;
+import org.hibernate.LockOptions;
 import org.hibernate.ObjectNotFoundException;
 import org.hibernate.Query;
 import org.hibernate.Session;
 import org.hibernate.SessionFactory;
 import org.hibernate.cfg.Configuration;
-import org.hibernate.exception.LockAcquisitionException;
 import org.hibernate.service.ServiceRegistry;
 import org.hibernate.service.ServiceRegistryBuilder;
 
 	}
 	
 	@SuppressWarnings("unchecked")
-	private <T extends Data> T update(final T data) {
+	private <T extends Data> T update(final Long schedulerId, final T data) {
 		try {
 			if (data instanceof Schedule) {
 				((Schedule)data).setLastModified(new Date());
 			return (T)hbmSessionTemplate.withSessionResult(new SessionResultAction() {	
 				@Override
 				public Object onSession(Session session) {
+					// Get a scheduler wide lock before operation.
+					LockOptions lockOptions = new LockOptions(LockMode.PESSIMISTIC_WRITE);
+					session.get(SchedulerData.class, schedulerId, lockOptions);
+					
 					return session.merge(data);
 				}
 			});
 					", id=" + data.getId(), e);
 		}
 	}	
-	private void delete(final Data data) {
+	private void delete(final Long schedulerId, final Data data) {
 		hbmSessionTemplate.withSession(new SessionAction() {
 			@Override
 			public void onSession(Session session) {
+				// Get a scheduler wide lock before operation.
+				LockOptions lockOptions = new LockOptions(LockMode.PESSIMISTIC_WRITE);
+				session.get(SchedulerData.class, schedulerId, lockOptions);
+				
 				Data entity = (Data)session.load(data.getClass(), data.getId());
 				session.delete(entity);
 			}
 	public SchedulerNode updateSchedulerNode(SchedulerNode schedulerNode) {
 		if (schedulerNode.getId() == null)
 			throw new SchedulerException("Failed to update: missing Id value.");
-		return update(schedulerNode);
+		return update(schedulerNode.getSchedulerData().getId(), schedulerNode);
 	}
 	@Override
 	public void deleteSchedulerData(final SchedulerData schedulerData) {
 			throw new SchedulerException("Failed to delete: missing Id value.");
 		
 		try {
-			delete(schedulerNode);
+			delete(schedulerNode.getSchedulerData().getId(), schedulerNode);
 		} catch (ObjectNotFoundException e) {
 			throw new SchedulerException("Failed to delete SchedulerNode: Id " + schedulerNode.getId() + " not found.");
 		}
 			if (!schedule.isInitOnceDone())
 				schedule.initOnce();
 		}
-		return update(jobDef);
+		return update(jobDef.getSchedulerId(), jobDef);
 	}
 	@Override
 	public void deleteJobDef(JobDef jobDef) {
 			throw new SchedulerException("Failed to delete JobDef: missing Id value.");
 		
 		try {
-			delete(jobDef);
+			delete(jobDef.getSchedulerId(), jobDef);
 		} catch (ObjectNotFoundException e) {
 			throw new SchedulerException("Failed to delete JobDef: Id " + jobDef.getId() + " not found.");
 		}
 	public Schedule updateSchedule(Long schedulerId, Schedule schedule) {
 		if (schedule.getId() == null)
 			throw new SchedulerException("Failed to update Schedule: missing Id value.");
-		return update(schedule);
+		return update(schedulerId, schedule);
 	}
 	@Override
 	public void deleteSchedule(Long schedulerId, Schedule schedule) {
 			throw new SchedulerException("Failed to delete Schedule: missing Id value.");
 		
 		try {
-			delete(schedule);
+			delete(schedulerId, schedule);
 		} catch (ObjectNotFoundException e) {
 			throw new SchedulerException("Failed to delete Schedule: Id " + schedule.getId() + " not found.");
 		}
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	@Override
 	public List<Schedule> findAndStageSchedulesToRun(final Long schedulerId, final int maxCount, final Date fromTime, final Date toTime) {
-		logger.debug("Delete dead schedules.");
-		hbmSessionTemplate.withSession(new SessionAction() {			
+		final List<Schedule> processed = new ArrayList<Schedule>(maxCount);
+		hbmSessionTemplate.withSession(new SessionAction() {	
 			@Override
 			public void onSession(Session session) {
-				// TODO: Can we improve this DELETE query? the sub-select might cause performance issue.
-				// Remove any dead Schedule first.
-				String query = "delete Schedule e where e.deleted = :deleted and e.jobDefId in (from JobDef e2 where e2.schedulerId = :schedulerId)";
-				int removedCount = session.createQuery(query)
-						.setParameter("schedulerId", schedulerId)
-						.setParameter("deleted", true)
-						.executeUpdate();
-				logger.debug("Removed {} Schedules that were marked for deletion.", removedCount);
-			}
-		});
-
-		// Now query for next run Schedules.
-		logger.debug("findSchedulesToRun: maxCount={}, fromTime={}, toTime={}", new Object[] { maxCount, fromTime, toTime });
-		final List<Schedule> nextRunSchedules = new ArrayList<Schedule>(maxCount);
-		hbmSessionTemplate.withSession(new SessionAction() {
-			@Override
-			public void onSession(Session session) {
-				String query = appendSchedulerIdWhereClause("select e from Schedule e", Schedule.class, false);
-				query += " and e.deleted = :deleted and e.state = :state and (e.nextRun between :fromTime and :toTime) order by e.nextRun asc";
+				// Get a scheduler wide lock before operation.
+				LockOptions lockOptions = new LockOptions(LockMode.PESSIMISTIC_WRITE);
+				session.get(SchedulerData.class, schedulerId, lockOptions);
+								
+				// Now query for next run Schedules.
+				logger.debug("findSchedulesToRun: maxCount={}, fromTime={}, toTime={}", new Object[] { maxCount, fromTime, toTime });
+				List<Schedule> nextRunSchedules = new ArrayList<Schedule>(maxCount);
+				
+				String query = appendSchedulerIdWhereClause("select e from Schedule e", Schedule.class);
+				query += " and e.state = :state and (e.nextRun between :fromTime and :toTime) order by e.nextRun asc";
 				Query selectScheduleQuery = session
 						.createQuery(query)
 						.setParameter("schedulerId", schedulerId)
-						.setParameter("deleted", false)
 						.setParameter("state", Schedule.State.WAITING)
 						.setParameter("fromTime", fromTime)
 						.setParameter("toTime", toTime).setMaxResults(maxCount);
 						Date nextRun = schedule.getNextRun();
 						if (nextRun == null) {
 							logger.debug("Schedule {} is done and will be removed. There is no more nextRun.", schedule);
-							delete(schedule);
+							session.delete(schedule);
 							continue;
 						}
 						nextRunSchedules.add(schedule);
 					}
 				}
+				
+				// Sort the nextRunSchedules before continue
+				if (nextRunSchedules.size() > 0)
+					Collections.sort(nextRunSchedules, new ScheduleComparator());
+				
+				// Process schedule to be run and move state to STAGING so other may not run it at the same time.
+				for (Schedule schedule : nextRunSchedules) {
+					if (schedule.getState() == Schedule.State.WAITING) {
+						// hibernate should auto update this upon end of this method/tx.
+						schedule.setState(Schedule.State.STAGING);
+						processed.add(schedule);
+					}
+				}				
 			}
 		});
 
-		// Sort the nextRunSchedules before continue
-		if (nextRunSchedules.size() > 0)
-			Collections.sort(nextRunSchedules, new ScheduleComparator());
-		
-		// Try to update it's state to STAGING so other may not run it at the same time.
-		final List<Schedule> processed = new ArrayList<Schedule>(nextRunSchedules.size());
-		for (final Schedule schedule : nextRunSchedules) {
-			try {
-				hbmSessionTemplate.withSession(new SessionAction() {
-					@Override
-					public void onSession(Session session) {
-						// Staging schedule
-						int updatedCount = session
-								.createQuery("update Schedule e set e.state = :newState where e.id = :id and e.state = :state")
-								.setParameter("id", schedule.getId())
-								.setParameter("newState", Schedule.State.STAGING)
-								.setParameter("state", Schedule.State.WAITING)
-								.executeUpdate();
-	
-						// Schedule could be ran by other scheduler, so ensure we can actually updated before collecting in result
-						if (updatedCount >= 1) {
-							// Changing this schedule in stead of re-querying. (which is going to be detached entity for return anyway.)
-							schedule.setState(Schedule.State.STAGING);
-							processed.add(schedule);
-						}
-					}				
-				});
-			} catch (LockAcquisitionException e) {
-				// Do nothing.
-				// This happens if we failed to lock the DB record due to race issue, we simply skip this schedule and continue
-			}
-		}
-		
-		// Return those STAGING schedules
+		// Return those staged schedules
 		return processed;
 	}
 	@Override
 	public void updateMissedRunSchedules(final Long schedulerId, final int maxCount, final Date fromTime) {
-		// Select the missed run schedules
-		List<Schedule> missedRunSchedules =  hbmSessionTemplate.withSessionResult(new SessionResultAction() {
+		hbmSessionTemplate.withSession(new SessionAction() {
 			@Override
-			public Object onSession(Session session) {
+			public void onSession(Session session) {
+				// Get a scheduler wide lock before operation.
+				LockOptions lockOptions = new LockOptions(LockMode.PESSIMISTIC_WRITE);
+				session.get(SchedulerData.class, schedulerId, lockOptions);
 
+				// Select the missed run schedules
+				List<Schedule> missedRunSchedules = new ArrayList<Schedule>(maxCount);
 				String query = appendSchedulerIdWhereClause("select e from Schedule e", Schedule.class);
 				query += " and e.state = :state and e.nextRun <= :fromTime order by e.nextRun asc";
 				Query selectScheduleQuery = session
 						.setParameter("state", Schedule.State.WAITING)
 						.setParameter("fromTime", fromTime)
 						.setMaxResults(maxCount);
-				List<Schedule> result = new ArrayList<Schedule>(maxCount);
-				while (result.size() < maxCount) {
+
+				while (missedRunSchedules.size() < maxCount) {
 					List<?> list = selectScheduleQuery.list();
 
 					// Stop the loop if we no longer have schedule to run
 							continue;
 						}
 
-						result.add(schedule);
+						missedRunSchedules.add(schedule);
 					}
 				}
-				return result;
+
+				// Process the missed run schedules and call its onMissedRun().
+				final Map<Long, JobDef> processedJobDefs = new HashMap<Long, JobDef>();
+				for (Schedule schedule : missedRunSchedules) {
+					Date missedRun = schedule.getNextRun();
+					schedule.onMissedRun();
+					Long jobDefId = schedule.getJobDefId();
+					JobDef jobDef = processedJobDefs.get(jobDefId); 
+					if (jobDef == null) {
+						// retrieve and save it so not to re query if already found.
+						jobDef = (JobDef)session.get(JobDef.class, jobDefId);
+						processedJobDefs.put(jobDefId, jobDef);
+					}
+					jobListenerNotifier.onJobMissedRun(jobDef, schedule);
+					logger.debug("{} missed run: {}, nextRun: {}", new Object[] { schedule, missedRun, schedule.getNextRun() });
+				}
 			}
 		});
-
-		// Process the missed run schedules and call its onMissedRun().
-		final Map<Long, JobDef> processedJobDefs = new HashMap<Long, JobDef>();
-		for (final Schedule schedule : missedRunSchedules) {
-			try {
-				hbmSessionTemplate.withSession(new SessionAction() {
-					@Override
-					public void onSession(Session session) {	
-						// Staging schedule
-						int updatedCount = session
-								.createQuery("update Schedule e set e.state = :newState where e.id = :id and e.state = :state")
-								.setParameter("id", schedule.getId())
-								.setParameter("newState", Schedule.State.STAGING)
-								.setParameter("state", Schedule.State.WAITING)
-								.executeUpdate();
-	
-						// Schedule could have been ran by other scheduler, so ensure we've actually updated in STAGING first.
-						if (updatedCount >= 1) {
-							Schedule stagedSchedule = (Schedule)session.get(Schedule.class, schedule.getId());
-							Date missedRun = stagedSchedule.getNextRun();
-							stagedSchedule.onMissedRun();
-							Long jobDefId = stagedSchedule.getJobDefId();
-							JobDef jobDef = processedJobDefs.get(jobDefId); 
-							if (jobDef == null) {
-								// retrieve and save it so not to re query if already found.
-								jobDef = (JobDef)session.get(JobDef.class, jobDefId);
-								processedJobDefs.put(jobDefId, jobDef);
-							}
-							jobListenerNotifier.onJobMissedRun(jobDef, stagedSchedule);
-							logger.debug("{} missed run: {}, nextRun: {}", new Object[] { stagedSchedule, missedRun, stagedSchedule.getNextRun() });
-		
-							// Unstage schedule after the update.
-							stagedSchedule.setState(Schedule.State.WAITING);
-							session.update(stagedSchedule);
-						}
-					}
-				});
-			} catch (LockAcquisitionException e) {
-				// Do nothing.
-				// This happens if we failed to lock the DB record due to race issue, we simply skip this schedule and continue
-			}
-		}
 	}
 	@Override
 	public Date findEarliestScheduleRunTime(final Long schedulerId) {
 		return hbmSessionTemplate.withSessionResult(new SessionResultAction() {
 			@Override
 			public Object onSession(Session session) {
-				Tuple<Schedule, JobDef> result = null;
+				JobDef jobDef = null;
+				Schedule schedule = null;
 				try {
-					JobDef jobDef = get(session, jobDefId, JobDef.class);
-					Schedule schedule = get(session, scheduleId, Schedule.class);
-					result = new Tuple<Schedule, JobDef>(schedule, jobDef);
+					jobDef = get(session, jobDefId, JobDef.class);
+					schedule = get(session, scheduleId, Schedule.class);
 				} catch (SchedulerException e) {
 					// One of data must have been deleted. This method will purposely allow this and return null.
 					// So do nothing here.
 				}
-				return result;
+				return new Tuple<Schedule, JobDef>(schedule, jobDef);
 			}
 		});
 	}
-
+	@Override
+	public void deleteDeadSchedules(final Long schedulerId, final int maxCount) {
+		hbmSessionTemplate.withSession(new SessionAction() {
+			@Override
+			public void onSession(Session session) {
+				// Get a scheduler wide lock before operation.
+				LockOptions lockOptions = new LockOptions(LockMode.PESSIMISTIC_WRITE);
+				session.get(SchedulerData.class, schedulerId, lockOptions);
+				
+				logger.debug("Delete dead schedules.");
+				// TODO: Can we improve this DELETE query? the sub-select might cause performance issue.
+				// Remove any dead Schedule
+				String query = "delete Schedule e where e.deleted = :deleted and e.jobDefId in (from JobDef e2 where e2.schedulerId = :schedulerId) order by e.nextRun asc";
+				int removedCount = session.createQuery(query)
+						.setParameter("schedulerId", schedulerId)
+						.setParameter("deleted", true)
+						.setMaxResults(maxCount)
+						.executeUpdate();
+				logger.debug("Removed {} Schedules that were marked for deletion.", removedCount);
+				
+			}
+		});
+	}
 }

timemachine-scheduler/src/main/java/timemachine/scheduler/service/DataStore.java

 	public Date findEarliestScheduleRunTime(Long schedulerId);
 	public void updateMissedRunSchedules(Long schedulerId, int maxCount, Date fromTime);
 	public Tuple<Schedule, JobDef> getScheduleWithJobDef(Long schedulerId, Long scheduleId, Long jobDefId);
+	public void deleteDeadSchedules(Long schedulerId, int maxCount);
 	
 }

timemachine-scheduler/src/main/java/timemachine/scheduler/service/MemoryDataStore.java

 
 		// Query for the schedules to be run
 		List<Schedule> schedulesToRun = new ArrayList<Schedule>();
-		List<Schedule> schedulesToDel = new ArrayList<Schedule>();
 		Iterator<Schedule> scheduleItr = nodeStore.getSortedSchedules().iterator();
 		while (schedulesToRun.size() < maxCount && scheduleItr.hasNext()) {
 			Schedule schedule = scheduleItr.next();
 			Date nextRun = schedule.getNextRun();
 			if (schedule.isDeleted() && schedule.getState() != State.STAGING && schedule.getState() != State.RUNNING) {
-				schedulesToDel.add(schedule);
 				continue;
 			} else if (nextRun == null && schedule.getState() != State.STAGING && schedule.getState() != State.RUNNING) {
-				schedulesToDel.add(schedule);
 				continue;
 			} else if (schedule.getState() != Schedule.State.WAITING) {
 				continue;
 			schedulesToRun.add(schedule);
 		}
 		
-		// Delete schedules that are dead
-		if (schedulesToDel.size() > 0)
-			for (Schedule schedule : schedulesToDel) {
-				deleteSchedule(schedulerId, schedule);
-			}
-
 		// Moving the to run schedules to the STAGING state.
 		if (schedulesToRun.size() > 0)
 			for (Schedule schedule : schedulesToRun) {
 			result = new Date(Long.MAX_VALUE);
 		return result;
 	}
+	
 	@Override
 	synchronized public Tuple<Schedule, JobDef> getScheduleWithJobDef(Long schedulerId, Long scheduleId, Long jobDefId) {
 		SchedulerNodeStore nodeStore = getSchedulerNodeStore(schedulerId);
 		JobDef jobDef = nodeStore.getJobDefs().get(jobDefId);
 		Schedule schedule = nodeStore.getSchedules().get(scheduleId);
-		
-		if (jobDef == null || schedule == null)
-			return null;
-				
 		return new Tuple<Schedule, JobDef>(schedule, jobDef);
 	}
+	
+	@Override
+	synchronized public void deleteDeadSchedules(Long schedulerId, int maxCount) {
+		SchedulerNodeStore nodeStore = getSchedulerNodeStore(schedulerId);
+
+		List<Schedule> schedulesToDel = new ArrayList<Schedule>();
+		Iterator<Schedule> scheduleItr = nodeStore.getSortedSchedules().iterator();
+		while (schedulesToDel.size() < maxCount && scheduleItr.hasNext()) {
+			Schedule schedule = scheduleItr.next();
+			Date nextRun = schedule.getNextRun();
+			if (schedule.isDeleted() && schedule.getState() != State.STAGING && schedule.getState() != State.RUNNING) {
+				schedulesToDel.add(schedule);
+			} else if (nextRun == null && schedule.getState() != State.STAGING && schedule.getState() != State.RUNNING) {
+				schedulesToDel.add(schedule);
+			}
+		}
+		
+		// Delete schedules that are dead
+		if (schedulesToDel.size() > 0) {
+			logger.debug("Deleting {} dead schedules.", schedulesToDel.size());
+			for (Schedule schedule : schedulesToDel) {
+				deleteSchedule(schedulerId, schedule);
+			}
+		}
+	}
 }

timemachine-scheduler/src/main/java/timemachine/scheduler/service/PollingScheduleRunner.java

 	private AtomicBoolean running = new AtomicBoolean(false);
 	private long pollingInterval; // In millis sec
 	private int maxSchedulesPerInterval;
+	private boolean disableMissedRun;
+	private boolean disableDeadScheduleRemoval;
 	
 	private class JobRunner implements Runnable {
 		private JobContext jobContext;
 					jobListenerNotifier.onJobRunException(jobContext, e);
 				}
 				
-				logger.debug("Updating {} after a jobTask run.", schedule);
-
 				// Reload schedule's JobDef from store and ensure all data still exists after above wait time.
 				Tuple<Schedule, JobDef> scheduleJobDef = dataStore.getScheduleWithJobDef(schedulerId, schedule.getId(), 
-						schedule.getJobDefId());
-				if (scheduleJobDef == null) {
-					logger.debug("Staged {} no longer exists after job task. It will not be updated.", schedule);
-					if (dataStore.existsSchedule(schedulerId, schedule.getId())) {
-						schedule.setState(Schedule.State.WAITING);
-						dataStore.updateSchedule(schedulerId, schedule);
-					}
+						schedule.getJobDefId());				
+				schedule = scheduleJobDef.getItem1();
+				jobDef = scheduleJobDef.getItem2();				
+				if (jobDef == null || schedule == null) {
+					logger.debug("No action after a jobTask run: {} or {} no longer exists.", schedule, jobDef);
 					return;
 				}
 				
-				schedule = scheduleJobDef.getItem1(); // reload it.
-				jobDef = scheduleJobDef.getItem2();
 				if (schedule.isDeleted()) {
+					logger.debug("Deleting {} after a jobTask run: no more next run.", schedule);
 					dataStore.deleteSchedule(schedulerId, schedule);
-				} else if (schedule.getState() == State.RUNNING) {
+				} else {
+					logger.debug("Updating {} after a jobTask run.", schedule);
 					schedule.updateNextRun();
 					Date nextRun = schedule.getNextRun();
 					if (nextRun == null) {
 						dataStore.deleteSchedule(schedulerId, schedule);
 					} else {
 						// We finish the running the job above and we need to update Schedule back to WAITING state.
-						schedule.setState(State.WAITING);
+						// Note: we could have schedule that are in PAUSE mode, in that case do not change the state.
+						if (schedule.getState() == State.RUNNING)
+							schedule.setState(State.WAITING);
 						dataStore.updateSchedule(schedulerId, schedule);
 					}
 				}
 	
 	@Override
 	protected void initService() {		
-		pollingInterval = configProps.getLong("timemachine.scheduler.scheduleRunner.pollingInterval");		
+		pollingInterval = configProps.getLong("timemachine.scheduler.scheduleRunner.pollingInterval");
 		maxSchedulesPerInterval = configProps.getInt("timemachine.scheduler.scheduleRunner.maxSchedulesPerInterval");
+		disableMissedRun = configProps.getBoolean("timemachine.scheduler.scheduleRunner.disableMissedRun");
+		disableDeadScheduleRemoval = configProps.getBoolean("timemachine.scheduler.scheduleRunner.disableDeadScheduleRemoval");		
 		
 		if (maxSchedulesPerInterval <= 0) {
 			maxSchedulesPerInterval = findSmallestThreadPoolSize();
 		}
 	}
 
-	private void checkScheduleForMissedRun() {
+	private void checkScheduleForMissedRun() {		
 		logger.debug("Checking schedules for missed runs.");
 		Date fromTime = new Date(System.currentTimeMillis() - pollingInterval);
 		dataStore.updateMissedRunSchedules(scheduler.getSchedulerId(), maxSchedulesPerInterval, fromTime);
 	
 				if (!started.get())
 					break;
-	
-				checkScheduleForMissedRun();
+
+				if (!disableMissedRun)
+					checkScheduleForMissedRun();
+				
+				if (!disableDeadScheduleRemoval)
+					dataStore.deleteDeadSchedules(scheduler.getSchedulerId(), maxSchedulesPerInterval);
+				
 				int scheduleSubmittedCount = checkSchedulesToRun();
 				
 				// The checkSchedulesToRun might takes a while, so we might check to see if we are in pause or start mode.

timemachine-scheduler/src/main/resources/timemachine/scheduler/default.properties

 timemachine.scheduler.scheduleRunner.class = timemachine.scheduler.service.PollingScheduleRunner
 timemachine.scheduler.scheduleRunner.pollingInterval = 1000
 timemachine.scheduler.scheduleRunner.maxSchedulesPerInterval = -1
+timemachine.scheduler.scheduleRunner.disableMissedRun = false
+timemachine.scheduler.scheduleRunner.disableDeadScheduleRemoval = false
 timemachine.scheduler.jobListenerNotifier.class = timemachine.scheduler.service.JobListenerNotifier
 timemachine.scheduler.schedulerContextService.class = timemachine.scheduler.service.SchedulerContextService
 timemachine.scheduler.jobTaskPoolNameResolver.class = timemachine.scheduler.service.SimpleJobTaskPoolNameResolver