使用 guava-retrying 实现灵活的重试机制
有意思的是,这个项目最初源于 Jean-Baptiste Nizet 在 guava 仓库下的评论。
guava-retrying 入门
<dependency>
<groupId>com.github.rholdergroupId>
<artifactId>guava-retryingartifactId>
<version>2.0.0version>
dependency>
compile "com.github.rholder:guava-retrying:2.0.0"
// 调用接口
boolean result;
AtomicInteger atomicInteger = new AtomicInteger(0);
int sleepNum = 10000;
while(!result && atomicInteger.get() < 4) {
atomicInteger.incrementAndGet();
result = thirdApi.invoke();
Thread.sleep(sleepNum);
sleepNum += sleepNum * atomicInteger.get();
}
Callable<Boolean> callable = () -> {
return thirdApi.invoke(); // 业务逻辑
};
// 定义重试器
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(Predicates.<Boolean>isNull()) // 如果结果为空则重试
.retryIfExceptionOfType(IOException.class) // 发生IO异常则重试
.retryIfRuntimeException() // 发生运行时异常则重试
.withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.SECONDS, 10, TimeUnit.SECONDS)) // 等待
.withStopStrategy(StopStrategies.stopAfterAttempt(4)) // 允许执行4次(首次执行 + 最多重试3次)
.build();
try {
retryer.call(callable); // 执行
} catch (RetryException | ExecutionException e) { // 重试次数超过阈值或被强制中断
e.printStackTrace();
}
重试条件 retryIfResult、retryIfExceptionOfType、retryIfRuntimeException 重试等待策略(延迟)withWaitStrategy 重试停止策略 withStopStrategy 阻塞策略、超时限制、注册重试监听器(上述代码未使用)
RetryerBuilder<V> newBuilder()
Retryer<V> build()
重试条件
RetryerBuilder<V> retryIfResult(@Nonnull Predicate<V> resultPredicate)
// 发生任何异常都重试
retryIfException()
// 发生 Runtime 异常都重试
RetryerBuilder
retryIfRuntimeException() // 发生指定 type 异常时重试
RetryerBuilder
retryIfExceptionOfType(extends Throwable> exceptionClass) Class// 匹配到指定类型异常时重试
RetryerBuilder
retryIfException( PredicateexceptionPredicate)
RetryerBuilder<V> withWaitStrategy(@Nonnull WaitStrategy waitStrategy) throws IllegalStateException
// 参数:等待时间,时间单位
WaitStrategy fixedWait(long sleepTime, @Nonnull TimeUnit timeUnit) throws IllegalStateException
// 参数:随机上限,时间单位
WaitStrategy randomWait(long maximumTime, @Nonnull TimeUnit timeUnit)
// 参数:随机下限,下限时间单位,随机上限,上限时间单位
WaitStrategy randomWait(long minimumTime,
@Nonnull TimeUnit minimumTimeUnit,
long maximumTime,
@Nonnull TimeUnit maximumTimeUnit)
// 参数:初始等待时长,初始值时间单位,递增值,递增值时间单位
WaitStrategy incrementingWait(long initialSleepTime,
@Nonnull TimeUnit initialSleepTimeUnit,
long increment,
@Nonnull TimeUnit incrementTimeUnit)
// 无参数(默认初始值为1)
WaitStrategy exponentialWait()
// 参数:最大等待时长,最大等待时间单位(默认初始值为1)
WaitStrategy exponentialWait(long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
// 参数:初始值,最大等待时长,最大等待时间单位
WaitStrategy exponentialWait(long multiplier, long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
// 无参数(默认初始值为1)
WaitStrategy fibonacciWait()
// 参数:最大等待时长,最大等待时间单位(默认初始值为1)
WaitStrategy fibonacciWait(long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
// 参数:最大等待时长,最大等待时间单位(默认初始值为1)
WaitStrategy fibonacciWait(long multiplier, long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
// 参数:异常类型,计算等待时长的函数
extends Throwable> WaitStrategy exceptionWait( ClassexceptionClass, Function
function)
// 参数:等待策略数组
WaitStrategy join(WaitStrategy... waitStrategies)
RetryerBuilder<V> withBlockStrategy(@Nonnull BlockStrategy blockStrategy) throws IllegalStateException
private static class ThreadSleepStrategy implements BlockStrategy {
public void block(long sleepTime) throws InterruptedException {
Thread.sleep(sleepTime);
}
}
停止策略
RetryerBuilder<V> withStopStrategy(@Nonnull StopStrategy stopStrategy) throws IllegalStateException
超时限制
RetryerBuilder<V> withAttemptTimeLimiter(@Nonnull AttemptTimeLimiter<V> attemptTimeLimiter)
NoAttemptTimeLimit:不限制执行时间 FixedAttemptTimeLimit:限制执行时间为固定值
监听器
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
public class MyRetryListener<T> implements RetryListener {
@Override
public <T> void onRetry(Attempt<T> attempt) {
// 第几次重试,(注意:第一次重试其实是第一次调用)
System.out.print("[retry]time=" + attempt.getAttemptNumber());
// 距离第一次重试的延迟
System.out.print(",delay=" + attempt.getDelaySinceFirstAttempt());
// 重试结果: 是异常终止, 还是正常返回
System.out.print(",hasException=" + attempt.hasException());
System.out.print(",hasResult=" + attempt.hasResult());
// 是什么原因导致异常
if (attempt.hasException()) {
System.out.print(",causeBy=" + attempt.getExceptionCause().toString());
} else {
// 正常返回时的结果
System.out.print(",result=" + attempt.getResult());
}
}
}
public V call(Callable
callable ) throws ExecutionException, RetryException {long startTime = System.nanoTime(); // 1. 记录开始时间,用于后续的时间计算
for (int attemptNumber = 1; ; attemptNumber++) {
Attempt
attempt; try {
V result = attemptTimeLimiter.call(callable); // 2. 执行callable任务,得到attempt
attempt = new ResultAttempt
(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } catch (Throwable t) {
attempt = new ExceptionAttempt
(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); }
for (RetryListener listener : listeners) { // 3. 如果有***,通知
listener.onRetry(attempt);
}
if (!rejectionPredicate.apply(attempt)) { // 4. 如果执行callable出现异常,则返回异常的attempt
return attempt.get();
}
if (stopStrategy.shouldStop(attempt)) { // 5. 根据停止策略判断是否停止重试
throw new RetryException(attemptNumber, attempt); // 若停止,抛出异常
} else {
long sleepTime = waitStrategy.computeSleepTime(attempt); // 6. 根据等待策略计算休眠时间
try {
blockStrategy.block(sleepTime); // 7. 根据阻塞策略决定休眠行为,默认为sleep
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RetryException(attemptNumber, attempt);
}
}
}
}
如果 attemptTimeLimiter 是 NoAttemptTimeLimit,则直接调用 callable.call 执行。 如果 attemptTimeLimiter 是 FixedAttemptTimeLimit,则调用 timeLimiter.callWithTimeout 限制执行时间。
rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionClassPredicate(RuntimeException.class));
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
评论