LimitedCurrentAverageExecutionTimeStrategy.java

package net.secodo.jcircuitbreaker.breakstrategy.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import net.secodo.jcircuitbreaker.breaker.execution.ExecutedTask;
import net.secodo.jcircuitbreaker.breaker.execution.ExecutionContext;
import net.secodo.jcircuitbreaker.breakstrategy.BreakStrategy;
import net.secodo.jcircuitbreaker.task.Task;
import net.secodo.jcircuitbreaker.util.TimeUtil;


/**
 * An implementation of {@link BreakStrategy} which causes the circuit breaker to break (not execute the Task) in
 * case all tasks currently running inside the circuit breaker takes long time on average.
 *
 * <p>In constructor you define (in milliseconds) how long on average you expect the execution of single Task to take.
 * If on average all currently running tasks take longer than this strategy will decide to break (not to execute
 * another Task).
 *
 * <p>You may additionally define {@link #percentageOfMaxTimesToSkip} which is how many percent of the executions with
 * highest execution time (longest executions) should not be considered when taking calculating the average.
 *
 * <p>For example this strategy:
 *
 * <p>new LimitedCurrentAverageExecutionTimeStrategy(1000, 10);
 *
 * <p>breaks (not allows execution of target-method) in case 90% of other executions (currently in
 * progress) takes more than one  is 1 second (1000 ms). Example figures: there are 20 executions (threads)
 * currently going through circuit breaker. 2 are running for 6 seconds, 18 are running for 900 milliseconds. We skip
 * 10 % of longest executions - this is 2 executions (10% * 20  = 2) and we calculate average from the remaining 18
 * executions. The average is 900 ms. This is less than allowed maximum = 1000 ms therefore next execution will be
 * allowed.
 *
 * <p>NOTE: in above example, if the average was above 1000ms than this strategy would return false to circuit breaker
 * and circuit breaker would execute {@link net.secodo.jcircuitbreaker.breakhandler.BreakHandler} to handle this
 * situation
 *
 * @param <R> the return type of <i>real-method</i> that is executed by circuit breaker
 */
public class LimitedCurrentAverageExecutionTimeStrategy<R> implements BreakStrategy<R> {
  private final TimeUtil timeUtil;

  /**
   * Protected access to allow extending classes to dynamically change it,
   * e.g. via MBean
   */
  protected float maxAllowedExecutionTimeMillis;

  /**
   * Protected access to allow extending classes to dynamically change it,
   * e.g. via MBean
   */
  protected float percentageOfMaxTimesToSkip;

  /**
   * Constructs new strategy with given maximum allowed execution. This is the execution of the threads which are
   * currently going through the circuit breaker.
   *
   * @param maxAllowedAverageExecutionTimeMillis max time which does not result in "break" (which allows execution of
   *                                             target-method)
   * @param percentageOfLongestTimesToSkip       how many percent of longest times should not be considered when
   *                                             calculating the average
   */
  public LimitedCurrentAverageExecutionTimeStrategy(long maxAllowedAverageExecutionTimeMillis,
                                                    int percentageOfLongestTimesToSkip) {
    this.maxAllowedExecutionTimeMillis = maxAllowedAverageExecutionTimeMillis;
    timeUtil = new TimeUtil();

    this.percentageOfMaxTimesToSkip = ((percentageOfLongestTimesToSkip >= 0) && (percentageOfLongestTimesToSkip <= 100))
      ? (0.01f * (float) percentageOfLongestTimesToSkip) : 0;
  }


  /**
   * Constructs new strategy with given maximum allowed execution. This is the execution of the threads which are
   * currently going through the circuit breaker.
   *
   * @param maxAllowedAverageExecutionTimeMillis max time which does not result in "break" (which allows execution of
   */
  public LimitedCurrentAverageExecutionTimeStrategy(long maxAllowedAverageExecutionTimeMillis) {
    this(maxAllowedAverageExecutionTimeMillis, 0);
  }

  @Override
  public boolean shouldBreak(Task<R> task, ExecutionContext<R> executionContext) {
    final Collection<ExecutedTask<R>> executionsInProgress = executionContext.getExecutionsInProgress();

    if (executionsInProgress.isEmpty()) {
      return false;
    }

    final List<Long> startTimestamps = executionsInProgress.stream()
      .map(ExecutedTask::getExecutionStartedTimestamp)
      .collect(Collectors.toList());

    Collections.sort(startTimestamps);

    final List<Long> interestingTimestamps = startTimestamps.subList(0,
      (int) ((1f - percentageOfMaxTimesToSkip) * (float) startTimestamps.size()));

    if (interestingTimestamps.isEmpty()) {
      return false;
    }


    final long currentTime = timeUtil.getCurrentTimeMilis();

    final Long sumOfInterestingTimestamps = interestingTimestamps.stream().mapToLong(t -> currentTime - t).sum();


    final float currentAverageExecutionTime = (float) sumOfInterestingTimestamps / (float) interestingTimestamps.size();


    return currentAverageExecutionTime > maxAllowedExecutionTimeMillis;
  }
}