+package com.atlassian.scheduler.compat.clustered;
+import javax.annotation.Nullable;
+import com.atlassian.scheduler.JobRunner;
+import com.atlassian.scheduler.SchedulerService;
+import com.atlassian.scheduler.SchedulerServiceException;
+import com.atlassian.scheduler.compat.CompatibilityPluginScheduler;
+import com.atlassian.scheduler.compat.JobHandler;
+import com.atlassian.scheduler.compat.JobHandlerKey;
+import com.atlassian.scheduler.compat.JobInfo;
+import com.atlassian.scheduler.config.IntervalScheduleInfo;
+import com.atlassian.scheduler.config.JobConfig;
+import com.atlassian.scheduler.config.JobId;
+import com.atlassian.scheduler.config.JobRunnerKey;
+import com.atlassian.scheduler.config.RunMode;
+import com.atlassian.scheduler.config.Schedule;
+import com.atlassian.scheduler.status.JobDetails;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.atlassian.sal.api.component.ComponentLocator.getComponent;
+import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
+import static com.atlassian.scheduler.config.Schedule.Type.INTERVAL;
+import static com.atlassian.scheduler.config.Schedule.forInterval;
+import static com.atlassian.util.concurrent.Assertions.notNull;
+ * Implements the compatibility plugin scheduler by delegating to the atlassian-scheduler library.
+ * This implementation maps {@link JobHandler} to {@link JobRunner}, the scheduling interval in
+ * {@link #scheduleClusteredJob(String, JobHandlerKey, Date, long)} to
+ * {@link Schedule#forInterval(long, Date)}, and so on. The end result is that everything is
+ * scheduled as if {@link RunMode#RUN_ONCE_PER_CLUSTER} jobs with
+ * {@link Schedule#forInterval(long, Date) interval schedules} were created directly with the
+ * {@code atlassian-scheduler} API.
+public class ClusteredCompatibilityPluginScheduler implements CompatibilityPluginScheduler
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteredCompatibilityPluginScheduler.class);
+ private static final String JOB_RUNNER_KEY_PREFIX = "CompatibilityPluginScheduler.JobRunnerKey.";
+ private static final String JOB_ID_PREFIX = "CompatibilityPluginScheduler.JobId.";
+ private final SchedulerService schedulerService;
+ @SuppressWarnings("UnusedDeclaration") // lies
+ public ClusteredCompatibilityPluginScheduler()
+ this(getComponent(SchedulerService.class));
+ ClusteredCompatibilityPluginScheduler(SchedulerService schedulerService)
+ this.schedulerService = notNull("schedulerService", schedulerService);
+ public void registerJobHandler(JobHandlerKey jobHandlerKey, JobHandler jobHandler)
+ notNull("jobHandlerKey", jobHandlerKey);
+ notNull("jobHandler", jobHandler);
+ final JobRunnerKey jobRunnerKey = toJobRunnerKey(jobHandlerKey);
+ if (isRegistered(jobRunnerKey))
+ throw new IllegalArgumentException("Job handler already registered: " + jobHandlerKey);
+ schedulerService.registerJobRunner(jobRunnerKey, new ClusteredJobRunner(jobHandler));
+ public void unregisterJobHandler(JobHandlerKey jobHandlerKey)
+ schedulerService.unregisterJobRunner(toJobRunnerKey(jobHandlerKey));
+ public void scheduleClusteredJob(String jobKey, JobHandlerKey jobHandlerKey,
+ Date startTime, long repeatInterval)
+ // Explicitly reject scheduling a job without registering a runner for it first,
+ // because it is actually *impossible* to do this in the other implementation and
+ // consistency is more important than being nice.
+ final JobRunnerKey jobRunnerKey = toJobRunnerKey(jobHandlerKey);
+ if (!isRegistered(jobRunnerKey))
+ throw new IllegalArgumentException("Job handler not registered: " + jobHandlerKey);
+ final JobConfig jobConfig = JobConfig.forJobRunnerKey(jobRunnerKey)
+ .withRunMode(RUN_ONCE_PER_CLUSTER)
+ .withSchedule(forInterval(repeatInterval, startTime));
+ schedulerService.scheduleJob(toJobId(jobKey), jobConfig);
+ catch (SchedulerServiceException sse)
+ LOG.error("Error scheduling job " + jobKey, sse);
+ public JobInfo getJobInfo(String jobKey)
+ final JobDetails jobDetails = schedulerService.getJobDetails(toJobId(jobKey));
+ if (jobDetails == null ||
+ !jobDetails.isRunnable() ||
+ /*jobDetails.getRunMode() != RUN_ONCE_PER_CLUSTER ||*/
+ jobDetails.getSchedule().getType() != INTERVAL)
+ final IntervalScheduleInfo interval = jobDetails.getSchedule().getIntervalScheduleInfo();
+ return new JobInfo(jobKey, toJobHandlerKey(jobDetails.getJobRunnerKey()),
+ interval.getFirstRunTime(), interval.getIntervalInMillis());
+ public void unscheduleClusteredJob(String jobKey)
+ schedulerService.unscheduleJob(toJobId(jobKey));
+ private boolean isRegistered(JobRunnerKey jobRunnerKey)
+ return schedulerService.getRegisteredJobRunnerKeys().contains(jobRunnerKey);
+ static JobRunnerKey toJobRunnerKey(JobHandlerKey jobHandlerKey)
+ return JobRunnerKey.of(JOB_RUNNER_KEY_PREFIX + notNull("jobHandlerKey", jobHandlerKey));
+ static JobId toJobId(String jobKey)
+ return JobId.of(JOB_ID_PREFIX + notNull("jobKey", jobKey));
+ static JobHandlerKey toJobHandlerKey(JobRunnerKey jobRunnerKey)
+ final String s = notNull("jobRunnerKey", jobRunnerKey).toString();
+ if (s.length() > JOB_RUNNER_KEY_PREFIX.length() && s.startsWith(JOB_RUNNER_KEY_PREFIX))
+ return JobHandlerKey.of(s.substring(JOB_RUNNER_KEY_PREFIX.length()));
+ throw new IllegalArgumentException("Job runner key belongs to somebody else: " + jobRunnerKey);
+ static String toJobKey(JobId jobId)
+ final String s = notNull("jobId", jobId).toString();
+ if (s.length() > JOB_ID_PREFIX.length() && s.startsWith(JOB_ID_PREFIX))
+ return s.substring(JOB_ID_PREFIX.length());
+ throw new IllegalArgumentException("Job ID belongs to somebody else: " + jobId);