LimitedCurrentAverageExecutionTimeStrategy.java
package net.secodo.jcircuitbreaker.breakstrategy.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import net.secodo.jcircuitbreaker.breaker.execution.ExecutedTask;
import net.secodo.jcircuitbreaker.breaker.execution.ExecutionContext;
import net.secodo.jcircuitbreaker.breakstrategy.BreakStrategy;
import net.secodo.jcircuitbreaker.task.Task;
import net.secodo.jcircuitbreaker.util.TimeUtil;
/**
* An implementation of {@link BreakStrategy} which causes the circuit breaker to break (not execute the Task) in
* case the tasks currently running inside the circuit breaker take a long time on average.
*
* <p>In the constructor you define (in milliseconds) how long on average you expect the execution of a single Task
* to take. If on average the currently running tasks take longer than that, this strategy will decide to break (not
* to execute another Task).
*
* <p>You may additionally define {@link #percentageOfMaxTimesToSkip} which is how many percent of the executions with
* the highest execution time (longest executions) should not be considered when calculating the average.
*
* <p>For example this strategy:
*
* <p>new LimitedCurrentAverageExecutionTimeStrategy(1000, 10);
*
* <p>breaks (does not allow execution of the target-method) in case the 90% shortest-running of the other executions
* (currently in progress) take on average more than 1 second (1000 ms). Example figures: there are 20 executions
* (threads) currently going through the circuit breaker. 2 are running for 6 seconds, 18 are running for 900
* milliseconds. We skip 10% of the longest executions - this is 2 executions (10% * 20 = 2) - and we calculate the
* average from the remaining 18 executions. The average is 900 ms. This is less than the allowed maximum = 1000 ms,
* therefore the next execution will be allowed.
*
* <p>NOTE: in the above example, if the average was above 1000 ms then this strategy would return true from
* {@code shouldBreak} to the circuit breaker, and the circuit breaker would execute
* {@link net.secodo.jcircuitbreaker.breakhandler.BreakHandler} to handle this situation.
*
* @param <R> the return type of <i>real-method</i> that is executed by circuit breaker
*/
public class LimitedCurrentAverageExecutionTimeStrategy<R> implements BreakStrategy<R> {
  /** Source of the "current time" used to compute how long each execution has been running. */
  private final TimeUtil timeUtil;

  /**
   * Maximum allowed average running time (in milliseconds) of the considered executions; a higher average causes
   * a break.
   *
   * <p>Protected access to allow extending classes to dynamically change it,
   * e.g. via MBean
   */
  protected float maxAllowedExecutionTimeMillis;

  /**
   * Share of the longest running executions which is ignored when calculating the average. Stored as a fraction
   * (the constructor converts a percentage of 0-100 into 0.0-1.0).
   *
   * <p>Protected access to allow extending classes to dynamically change it,
   * e.g. via MBean
   */
  protected float percentageOfMaxTimesToSkip;

  /**
   * Constructs new strategy with given maximum allowed execution. This is the execution of the threads which are
   * currently going through the circuit breaker.
   *
   * @param maxAllowedAverageExecutionTimeMillis max time which does not result in "break" (which allows execution of
   *                                             target-method)
   * @param percentageOfLongestTimesToSkip       how many percent of longest times should not be considered when
   *                                             calculating the average; a value outside of the range 0-100 is
   *                                             treated as 0 (nothing is skipped)
   */
  public LimitedCurrentAverageExecutionTimeStrategy(long maxAllowedAverageExecutionTimeMillis,
                                                    int percentageOfLongestTimesToSkip) {
    this.maxAllowedExecutionTimeMillis = maxAllowedAverageExecutionTimeMillis;
    timeUtil = new TimeUtil();

    // the percentage is kept internally as a fraction; out-of-range input silently disables skipping
    this.percentageOfMaxTimesToSkip = ((percentageOfLongestTimesToSkip >= 0) && (percentageOfLongestTimesToSkip <= 100))
      ? (0.01f * (float) percentageOfLongestTimesToSkip) : 0;
  }

  /**
   * Constructs new strategy with given maximum allowed execution. This is the execution of the threads which are
   * currently going through the circuit breaker. No executions are skipped when calculating the average.
   *
   * @param maxAllowedAverageExecutionTimeMillis max time which does not result in "break" (which allows execution of
   *                                             target-method)
   */
  public LimitedCurrentAverageExecutionTimeStrategy(long maxAllowedAverageExecutionTimeMillis) {
    this(maxAllowedAverageExecutionTimeMillis, 0);
  }

  /**
   * Decides whether the circuit breaker should break, based on the average running time of the executions which are
   * currently in progress (with the configured share of the longest running executions left out).
   *
   * @param task             the task which is about to be executed (not used by this strategy)
   * @param executionContext gives access to the executions currently in progress
   * @return true if the average running time of the considered executions is greater than
   *         {@link #maxAllowedExecutionTimeMillis}; false otherwise, including when there are no executions in
   *         progress or all of them were skipped
   */
  @Override
  public boolean shouldBreak(Task<R> task, ExecutionContext<R> executionContext) {
    final Collection<ExecutedTask<R>> executionsInProgress = executionContext.getExecutionsInProgress();
    if (executionsInProgress.isEmpty()) {
      return false;
    }

    final List<Long> startTimestamps = executionsInProgress.stream()
      .map(ExecutedTask::getExecutionStartedTimestamp)
      .collect(Collectors.toList());

    // ascending order: the earliest start (= the longest running execution) comes first
    Collections.sort(startTimestamps);

    final int total = startTimestamps.size();

    // clamp, because the protected field may have been set to a value outside of 0..1 by an extending class
    final int keepCount = Math.max(0, Math.min(total, (int) ((1f - percentageOfMaxTimesToSkip) * (float) total)));

    // the longest running executions sit at the head of the sorted list, so skipping them means keeping the tail
    final List<Long> interestingTimestamps = startTimestamps.subList(total - keepCount, total);
    if (interestingTimestamps.isEmpty()) {
      return false;
    }

    final long currentTime = timeUtil.getCurrentTimeMilis();
    final long sumOfExecutionTimes = interestingTimestamps.stream().mapToLong(t -> currentTime - t).sum();
    final float currentAverageExecutionTime = (float) sumOfExecutionTimes / (float) interestingTimestamps.size();

    return currentAverageExecutionTime > maxAllowedExecutionTimeMillis;
  }
}