Snippets

Francesco Stefanini atlassian-scheduler-compat 1.2

Created by Francesco Stefanini

File ClusteredCompatibilityPluginScheduler.java Added

  • Ignore whitespace
  • Hide word diff
+package com.atlassian.scheduler.compat.clustered;
+
+import java.util.Date;
+
+import javax.annotation.Nullable;
+
+import com.atlassian.scheduler.JobRunner;
+import com.atlassian.scheduler.SchedulerService;
+import com.atlassian.scheduler.SchedulerServiceException;
+import com.atlassian.scheduler.compat.CompatibilityPluginScheduler;
+import com.atlassian.scheduler.compat.JobHandler;
+import com.atlassian.scheduler.compat.JobHandlerKey;
+import com.atlassian.scheduler.compat.JobInfo;
+import com.atlassian.scheduler.config.IntervalScheduleInfo;
+import com.atlassian.scheduler.config.JobConfig;
+import com.atlassian.scheduler.config.JobId;
+import com.atlassian.scheduler.config.JobRunnerKey;
+import com.atlassian.scheduler.config.RunMode;
+import com.atlassian.scheduler.config.Schedule;
+import com.atlassian.scheduler.status.JobDetails;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.atlassian.sal.api.component.ComponentLocator.getComponent;
+import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
+import static com.atlassian.scheduler.config.Schedule.Type.INTERVAL;
+import static com.atlassian.scheduler.config.Schedule.forInterval;
+import static com.atlassian.util.concurrent.Assertions.notNull;
+
+/**
+ * Implements the compatibility plugin scheduler by delegating to the atlassian-scheduler library.
+ * <p>
+ * This implementation maps {@link JobHandler} to {@link JobRunner}, the scheduling interval in
+ * {@link #scheduleClusteredJob(String, JobHandlerKey, Date, long)} to
+ * {@link Schedule#forInterval(long, Date)}, and so on.  The end result is that everything is
+ * scheduled as if {@link RunMode#RUN_ONCE_PER_CLUSTER} jobs with
+ * {@link Schedule#forInterval(long, Date) interval schedules} were created directly with the
+ * {@code atlassian-scheduler} API.
+ * </p>
+ *
+ * @since v1.0
+ */
+public class ClusteredCompatibilityPluginScheduler implements CompatibilityPluginScheduler
+{
+    private static final Logger LOG = LoggerFactory.getLogger(ClusteredCompatibilityPluginScheduler.class);
+
+    private static final String JOB_RUNNER_KEY_PREFIX = "CompatibilityPluginScheduler.JobRunnerKey.";
+    private static final String JOB_ID_PREFIX = "CompatibilityPluginScheduler.JobId.";
+
+    private final SchedulerService schedulerService;
+
+
+
+    @SuppressWarnings("UnusedDeclaration")  // lies
+    public ClusteredCompatibilityPluginScheduler()
+    {
+        this(getComponent(SchedulerService.class));
+    }
+
+    @VisibleForTesting
+    ClusteredCompatibilityPluginScheduler(SchedulerService schedulerService)
+    {
+        this.schedulerService = notNull("schedulerService", schedulerService);
+    }
+
+
+
+    @Override
+    public void registerJobHandler(JobHandlerKey jobHandlerKey, JobHandler jobHandler)
+    {
+        notNull("jobHandlerKey", jobHandlerKey);
+        notNull("jobHandler", jobHandler);
+
+        final JobRunnerKey jobRunnerKey = toJobRunnerKey(jobHandlerKey);
+        if (isRegistered(jobRunnerKey))
+        {
+            throw new IllegalArgumentException("Job handler already registered: " + jobHandlerKey);
+        }
+        schedulerService.registerJobRunner(jobRunnerKey, new ClusteredJobRunner(jobHandler));
+    }
+
+    @Override
+    public void unregisterJobHandler(JobHandlerKey jobHandlerKey)
+    {
+        schedulerService.unregisterJobRunner(toJobRunnerKey(jobHandlerKey));
+    }
+
+    @Override
+    public void scheduleClusteredJob(String jobKey, JobHandlerKey jobHandlerKey,
+            Date startTime, long repeatInterval)
+    {
+        // Explicitly reject scheduling a job without registering a runner for it first,
+        // because it is actually *impossible* to do this in the other implementation and
+        // consistency is more important than being nice.
+        final JobRunnerKey jobRunnerKey = toJobRunnerKey(jobHandlerKey);
+        if (!isRegistered(jobRunnerKey))
+        {
+            throw new IllegalArgumentException("Job handler not registered: " + jobHandlerKey);
+        }
+
+        final JobConfig jobConfig = JobConfig.forJobRunnerKey(jobRunnerKey)
+                .withRunMode(RUN_ONCE_PER_CLUSTER)
+                .withSchedule(forInterval(repeatInterval, startTime));
+        try
+        {
+            schedulerService.scheduleJob(toJobId(jobKey), jobConfig);
+        }
+        catch (SchedulerServiceException sse)
+        {
+            LOG.error("Error scheduling job " + jobKey, sse);
+        }
+    }
+
+    @Nullable
+    @Override
+    public JobInfo getJobInfo(String jobKey)
+    {
+        final JobDetails jobDetails = schedulerService.getJobDetails(toJobId(jobKey));
+        if (jobDetails == null ||
+                !jobDetails.isRunnable() ||
+                /*jobDetails.getRunMode() != RUN_ONCE_PER_CLUSTER ||*/
+                jobDetails.getSchedule().getType() != INTERVAL)
+        {
+            return null;
+        }
+
+        final IntervalScheduleInfo interval = jobDetails.getSchedule().getIntervalScheduleInfo();
+        return new JobInfo(jobKey, toJobHandlerKey(jobDetails.getJobRunnerKey()),
+                interval.getFirstRunTime(), interval.getIntervalInMillis());
+    }
+
+    @Override
+    public void unscheduleClusteredJob(String jobKey)
+    {
+        schedulerService.unscheduleJob(toJobId(jobKey));
+    }
+
+
+
+    private boolean isRegistered(JobRunnerKey jobRunnerKey)
+    {
+        return schedulerService.getRegisteredJobRunnerKeys().contains(jobRunnerKey);
+    }
+
+    static JobRunnerKey toJobRunnerKey(JobHandlerKey jobHandlerKey)
+    {
+        return JobRunnerKey.of(JOB_RUNNER_KEY_PREFIX + notNull("jobHandlerKey", jobHandlerKey));
+    }
+
+    static JobId toJobId(String jobKey)
+    {
+        return JobId.of(JOB_ID_PREFIX + notNull("jobKey", jobKey));
+    }
+
+    static JobHandlerKey toJobHandlerKey(JobRunnerKey jobRunnerKey)
+    {
+        final String s = notNull("jobRunnerKey", jobRunnerKey).toString();
+        if (s.length() > JOB_RUNNER_KEY_PREFIX.length() && s.startsWith(JOB_RUNNER_KEY_PREFIX))
+        {
+            return JobHandlerKey.of(s.substring(JOB_RUNNER_KEY_PREFIX.length()));
+        }
+        throw new IllegalArgumentException("Job runner key belongs to somebody else: " + jobRunnerKey);
+    }
+
+    static String toJobKey(JobId jobId)
+    {
+        final String s = notNull("jobId", jobId).toString();
+        if (s.length() > JOB_ID_PREFIX.length() && s.startsWith(JOB_ID_PREFIX))
+        {
+            return s.substring(JOB_ID_PREFIX.length());
+        }
+        throw new IllegalArgumentException("Job ID belongs to somebody else: " + jobId);
+    }
+}

File ClusteredJobRunner.java Added

  • Ignore whitespace
  • Hide word diff
+package com.atlassian.scheduler.compat.clustered;
+
+import com.atlassian.scheduler.JobRunner;
+import com.atlassian.scheduler.JobRunnerRequest;
+import com.atlassian.scheduler.JobRunnerResponse;
+import com.atlassian.scheduler.compat.JobHandler;
+import com.atlassian.scheduler.compat.JobHandlerKey;
+import com.atlassian.scheduler.compat.JobInfo;
+import com.atlassian.scheduler.config.IntervalScheduleInfo;
+import com.atlassian.scheduler.config.JobConfig;
+import com.atlassian.scheduler.config.JobRunnerKey;
+import com.atlassian.scheduler.config.RunMode;
+import com.atlassian.scheduler.config.Schedule;
+
+import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobHandlerKey;
+import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobKey;
+import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobRunnerKey;
+import static com.atlassian.util.concurrent.Assertions.notNull;
+
+/**
+ * @since v1.0
+ */
+public class ClusteredJobRunner implements JobRunner
+{
+    private final JobHandler delegate;
+
+
+
+    public ClusteredJobRunner(JobHandler delegate)
+    {
+        this.delegate = notNull("delegate", delegate);
+    }
+
+
+
+    @Override
+    public JobRunnerResponse runJob(JobRunnerRequest request)
+    {
+        final String jobKey = toJobKey(request.getJobId());
+        final JobConfig jobConfig = request.getJobConfig();
+        final JobHandlerKey jobHandlerKey = toJobHandlerKey(jobConfig.getJobRunnerKey());
+
+        /*if (jobConfig.getRunMode() != RunMode.RUN_ONCE_PER_CLUSTER)
+        {
+            return JobRunnerResponse.aborted("Job has unexpected run mode " + jobConfig.getRunMode());
+        }*/
+        if (jobConfig.getSchedule().getType() != Schedule.Type.INTERVAL)
+        {
+            return JobRunnerResponse.aborted("Job has unexpected schedule type " + jobConfig.getSchedule().getType());
+        }
+
+        final IntervalScheduleInfo interval = jobConfig.getSchedule().getIntervalScheduleInfo();
+        final JobInfo jobInfo = new JobInfo(jobKey, jobHandlerKey, interval.getFirstRunTime(), interval.getIntervalInMillis());
+        delegate.execute(jobInfo);
+        return JobRunnerResponse.success();
+    }
+}
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.