Snippets

Francesco Stefanini atlassian-scheduler-compat 1.2

Created by Francesco Stefanini
package com.atlassian.scheduler.compat.clustered;

import java.util.Date;

import javax.annotation.Nullable;

import com.atlassian.scheduler.JobRunner;
import com.atlassian.scheduler.SchedulerService;
import com.atlassian.scheduler.SchedulerServiceException;
import com.atlassian.scheduler.compat.CompatibilityPluginScheduler;
import com.atlassian.scheduler.compat.JobHandler;
import com.atlassian.scheduler.compat.JobHandlerKey;
import com.atlassian.scheduler.compat.JobInfo;
import com.atlassian.scheduler.config.IntervalScheduleInfo;
import com.atlassian.scheduler.config.JobConfig;
import com.atlassian.scheduler.config.JobId;
import com.atlassian.scheduler.config.JobRunnerKey;
import com.atlassian.scheduler.config.RunMode;
import com.atlassian.scheduler.config.Schedule;
import com.atlassian.scheduler.status.JobDetails;

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.atlassian.sal.api.component.ComponentLocator.getComponent;
import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
import static com.atlassian.scheduler.config.Schedule.Type.INTERVAL;
import static com.atlassian.scheduler.config.Schedule.forInterval;
import static com.atlassian.util.concurrent.Assertions.notNull;

/**
 * A {@link CompatibilityPluginScheduler} backed by the {@code atlassian-scheduler}
 * {@link SchedulerService}.
 * <p>
 * Each concept of the compatibility API is translated into its {@code atlassian-scheduler}
 * counterpart:
 * </p>
 * <ul>
 * <li>a {@link JobHandler} is wrapped in a {@link ClusteredJobRunner} and registered as a
 *     {@link JobRunner};</li>
 * <li>a {@link JobHandlerKey} becomes a {@link JobRunnerKey} and a job key becomes a
 *     {@link JobId}, each carrying a fixed prefix so that keys created by this class can be
 *     told apart from keys that belong to somebody else;</li>
 * <li>the start time and repeat interval passed to
 *     {@link #scheduleClusteredJob(String, JobHandlerKey, Date, long)} become a
 *     {@link Schedule#forInterval(long, Date) interval schedule} that is submitted with
 *     {@link RunMode#RUN_ONCE_PER_CLUSTER}.</li>
 * </ul>
 *
 * @since v1.0
 */
public class ClusteredCompatibilityPluginScheduler implements CompatibilityPluginScheduler
{
    private static final Logger LOG = LoggerFactory.getLogger(ClusteredCompatibilityPluginScheduler.class);

    /** Prepended to a job handler key to form the corresponding job runner key. */
    private static final String JOB_RUNNER_KEY_PREFIX = "CompatibilityPluginScheduler.JobRunnerKey.";

    /** Prepended to a job key to form the corresponding job ID. */
    private static final String JOB_ID_PREFIX = "CompatibilityPluginScheduler.JobId.";

    /** The scheduler that every operation of this class is delegated to. */
    private final SchedulerService schedulerService;



    /**
     * Creates a scheduler that looks up its {@link SchedulerService} through the SAL
     * component locator.
     */
    // NOTE(review): the "unused" warning is suppressed, presumably because this constructor is
    // only invoked by the hosting container rather than from code -- confirm.
    @SuppressWarnings("UnusedDeclaration")
    public ClusteredCompatibilityPluginScheduler()
    {
        this(getComponent(SchedulerService.class));
    }

    /**
     * Creates a scheduler that delegates to the supplied service.
     *
     * @param schedulerService the scheduler to delegate to; must not be {@code null}
     */
    @VisibleForTesting
    ClusteredCompatibilityPluginScheduler(SchedulerService schedulerService)
    {
        this.schedulerService = notNull("schedulerService", schedulerService);
    }



    /**
     * Registers {@code jobHandler} with the scheduler service, wrapped in a
     * {@link ClusteredJobRunner}, under the runner key derived from {@code jobHandlerKey}.
     *
     * @param jobHandlerKey identifies the handler; must not be {@code null}
     * @param jobHandler the handler to run; must not be {@code null}
     * @throws IllegalArgumentException if a runner is already registered for this key
     */
    @Override
    public void registerJobHandler(JobHandlerKey jobHandlerKey, JobHandler jobHandler)
    {
        notNull("jobHandlerKey", jobHandlerKey);
        notNull("jobHandler", jobHandler);

        final JobRunnerKey runnerKey = toJobRunnerKey(jobHandlerKey);
        if (!isRegistered(runnerKey))
        {
            schedulerService.registerJobRunner(runnerKey, new ClusteredJobRunner(jobHandler));
            return;
        }
        throw new IllegalArgumentException("Job handler already registered: " + jobHandlerKey);
    }

    /**
     * Unregisters the job runner that corresponds to {@code jobHandlerKey}.
     *
     * @param jobHandlerKey identifies the handler; must not be {@code null}
     */
    @Override
    public void unregisterJobHandler(JobHandlerKey jobHandlerKey)
    {
        final JobRunnerKey runnerKey = toJobRunnerKey(jobHandlerKey);
        schedulerService.unregisterJobRunner(runnerKey);
    }

    /**
     * Schedules a {@link RunMode#RUN_ONCE_PER_CLUSTER} job on an interval schedule.
     * <p>
     * A {@link SchedulerServiceException} raised by the scheduler service is logged at error
     * level and is not propagated to the caller.
     * </p>
     *
     * @param jobKey the caller's key for the job; must not be {@code null}
     * @param jobHandlerKey identifies an already registered handler; must not be {@code null}
     * @param startTime passed through as the first run time of the interval schedule
     * @param repeatInterval passed through as the interval of the schedule
     * @throws IllegalArgumentException if no handler is registered for {@code jobHandlerKey}
     */
    @Override
    public void scheduleClusteredJob(String jobKey, JobHandlerKey jobHandlerKey,
            Date startTime, long repeatInterval)
    {
        final JobRunnerKey runnerKey = toJobRunnerKey(jobHandlerKey);

        // Scheduling against an unregistered handler is refused outright: the other
        // implementation cannot do it at all, and behaving consistently with it matters
        // more than being lenient.
        if (!isRegistered(runnerKey))
        {
            throw new IllegalArgumentException("Job handler not registered: " + jobHandlerKey);
        }

        final JobConfig clusterWide = JobConfig.forJobRunnerKey(runnerKey).withRunMode(RUN_ONCE_PER_CLUSTER);
        final Schedule everyInterval = forInterval(repeatInterval, startTime);
        final JobConfig jobConfig = clusterWide.withSchedule(everyInterval);
        final JobId jobId = toJobId(jobKey);
        try
        {
            schedulerService.scheduleJob(jobId, jobConfig);
        }
        catch (SchedulerServiceException sse)
        {
            LOG.error("Error scheduling job " + jobKey, sse);
        }
    }

    /**
     * Looks up the job scheduled under {@code jobKey} and describes it in compatibility terms.
     *
     * @param jobKey the caller's key for the job; must not be {@code null}
     * @return the job's description, or {@code null} when the scheduler service has no
     *         details for the job, the job is not runnable, or its schedule is not an
     *         interval schedule
     * @throws IllegalArgumentException if the job's runner key was not created by this class
     */
    @Nullable
    @Override
    public JobInfo getJobInfo(String jobKey)
    {
        final JobDetails details = schedulerService.getJobDetails(toJobId(jobKey));
        if (details == null)
        {
            return null;
        }
        if (!details.isRunnable())
        {
            return null;
        }

        // Only the schedule type is verified; the run mode of the job is not checked here.
        final Schedule schedule = details.getSchedule();
        if (schedule.getType() != INTERVAL)
        {
            return null;
        }

        final IntervalScheduleInfo info = schedule.getIntervalScheduleInfo();
        final JobHandlerKey handlerKey = toJobHandlerKey(details.getJobRunnerKey());
        return new JobInfo(jobKey, handlerKey, info.getFirstRunTime(), info.getIntervalInMillis());
    }

    /**
     * Unschedules the job that was scheduled under {@code jobKey}.
     *
     * @param jobKey the caller's key for the job; must not be {@code null}
     */
    @Override
    public void unscheduleClusteredJob(String jobKey)
    {
        final JobId jobId = toJobId(jobKey);
        schedulerService.unscheduleJob(jobId);
    }



    /**
     * Tells whether the scheduler service currently lists {@code jobRunnerKey} among its
     * registered job runner keys.
     */
    private boolean isRegistered(JobRunnerKey jobRunnerKey)
    {
        return schedulerService.getRegisteredJobRunnerKeys().contains(jobRunnerKey);
    }

    /**
     * Builds the job runner key for a handler key by prepending the runner key prefix to the
     * handler key's string form.
     *
     * @param jobHandlerKey the handler key to translate; must not be {@code null}
     * @return the corresponding job runner key
     */
    static JobRunnerKey toJobRunnerKey(JobHandlerKey jobHandlerKey)
    {
        final JobHandlerKey checked = notNull("jobHandlerKey", jobHandlerKey);
        return JobRunnerKey.of(JOB_RUNNER_KEY_PREFIX + checked);
    }

    /**
     * Builds the job ID for a job key by prepending the job ID prefix.
     *
     * @param jobKey the job key to translate; must not be {@code null}
     * @return the corresponding job ID
     */
    static JobId toJobId(String jobKey)
    {
        final String checked = notNull("jobKey", jobKey);
        return JobId.of(JOB_ID_PREFIX + checked);
    }

    /**
     * Recovers the handler key from a job runner key produced by
     * {@link #toJobRunnerKey(JobHandlerKey)}.
     *
     * @param jobRunnerKey the runner key to translate back; must not be {@code null}
     * @return the handler key, i.e. the runner key's string form with the prefix removed
     * @throws IllegalArgumentException if the runner key does not start with the prefix or
     *         has nothing after it
     */
    static JobHandlerKey toJobHandlerKey(JobRunnerKey jobRunnerKey)
    {
        final String text = notNull("jobRunnerKey", jobRunnerKey).toString();
        final int prefixLength = JOB_RUNNER_KEY_PREFIX.length();
        if (text.length() <= prefixLength || !text.startsWith(JOB_RUNNER_KEY_PREFIX))
        {
            throw new IllegalArgumentException("Job runner key belongs to somebody else: " + jobRunnerKey);
        }
        return JobHandlerKey.of(text.substring(prefixLength));
    }

    /**
     * Recovers the job key from a job ID produced by {@link #toJobId(String)}.
     *
     * @param jobId the job ID to translate back; must not be {@code null}
     * @return the job key, i.e. the job ID's string form with the prefix removed
     * @throws IllegalArgumentException if the job ID does not start with the prefix or has
     *         nothing after it
     */
    static String toJobKey(JobId jobId)
    {
        final String text = notNull("jobId", jobId).toString();
        final int prefixLength = JOB_ID_PREFIX.length();
        if (text.length() <= prefixLength || !text.startsWith(JOB_ID_PREFIX))
        {
            throw new IllegalArgumentException("Job ID belongs to somebody else: " + jobId);
        }
        return text.substring(prefixLength);
    }
}
package com.atlassian.scheduler.compat.clustered;

import com.atlassian.scheduler.JobRunner;
import com.atlassian.scheduler.JobRunnerRequest;
import com.atlassian.scheduler.JobRunnerResponse;
import com.atlassian.scheduler.compat.JobHandler;
import com.atlassian.scheduler.compat.JobHandlerKey;
import com.atlassian.scheduler.compat.JobInfo;
import com.atlassian.scheduler.config.IntervalScheduleInfo;
import com.atlassian.scheduler.config.JobConfig;
import com.atlassian.scheduler.config.JobRunnerKey;
import com.atlassian.scheduler.config.RunMode;
import com.atlassian.scheduler.config.Schedule;

import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobHandlerKey;
import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobKey;
import static com.atlassian.scheduler.compat.clustered.ClusteredCompatibilityPluginScheduler.toJobRunnerKey;
import static com.atlassian.util.concurrent.Assertions.notNull;

/**
 * Adapts a compatibility-API {@link JobHandler} to the {@code atlassian-scheduler}
 * {@link JobRunner} interface.
 * <p>
 * When the scheduler runs a job, the request is translated back into a {@link JobInfo}
 * (job key, handler key, first run time and interval) and handed to the wrapped handler.
 * </p>
 *
 * @since v1.0
 */
public class ClusteredJobRunner implements JobRunner
{
    /** The handler that performs the actual work of the job. */
    private final JobHandler delegate;



    /**
     * Wraps the given handler.
     *
     * @param delegate the handler to invoke whenever a job runs; must not be {@code null}
     */
    public ClusteredJobRunner(JobHandler delegate)
    {
        this.delegate = notNull("delegate", delegate);
    }



    /**
     * Runs the wrapped handler for the job described by {@code request}.
     *
     * @param request the scheduler's description of the job being run
     * @return an aborted response when the job's schedule is not an interval schedule;
     *         otherwise a success response once the handler has returned
     * @throws IllegalArgumentException if the job ID or job runner key of the request was not
     *         created by {@link ClusteredCompatibilityPluginScheduler}
     */
    @Override
    public JobRunnerResponse runJob(JobRunnerRequest request)
    {
        // Both key translations happen before any other validation, so a job that does not
        // carry the expected prefixes is rejected with an exception rather than a response.
        final String jobKey = toJobKey(request.getJobId());
        final JobConfig config = request.getJobConfig();
        final JobHandlerKey handlerKey = toJobHandlerKey(config.getJobRunnerKey());

        // Only the schedule type is verified; the run mode of the job is not checked here.
        final Schedule schedule = config.getSchedule();
        final Schedule.Type scheduleType = schedule.getType();
        if (scheduleType != Schedule.Type.INTERVAL)
        {
            return JobRunnerResponse.aborted("Job has unexpected schedule type " + scheduleType);
        }

        final IntervalScheduleInfo info = schedule.getIntervalScheduleInfo();
        delegate.execute(new JobInfo(jobKey, handlerKey, info.getFirstRunTime(), info.getIntervalInMillis()));
        return JobRunnerResponse.success();
    }
}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.