-
Notifications
You must be signed in to change notification settings - Fork 3k
Closed
Labels
type: questionFurther information is requestedFurther information is requested
Description
Question
Divide或HTTP Client插件中,向下游服务发送请求失败后的重试策略,当前实现方案是比较简单的,建议基于Reactor响应式流实现单机版本的异步非阻塞重试策略。
实现效果:
● 异步非阻塞,不影响主线程
● 支持自定义重试次数、重试间隔、重试上限
● 支持按任务失败条件出发重试(eg: 指定异常重试)
● 支持异步获取任务执行结果,直到重试成功或者重试超限
● 轻量级,不依赖第三方重试调度中间件(如MQ,ScheduleX),不依赖Spring框架本身提供的定时调度能力。
● 多种重试策略:自定义策略(自定义退避序列)、固定间隔重试策略、默认重试策略等。
当前失败重试代码如下
如图所示,这里是For循环3次比较简单的失败重试。
基于Reactor响应式流实现单机版本的异步非阻塞重试策略
实现示例代码如下:
RetryUtils 工具类
/**
* 重试工具类
*/
public class RetryUtils {
/**
* 记录重试次数
*/
private static final AtomicInteger retryCount = new AtomicInteger(0);
/**
* 执行并返回响应流
*/
public static Mono<String> execute(Object... param) {
return execute(RetryBackoffSpecEnum.DEFAULT_BACKOFF, param);
}
/**
* 执行并返回响应流(指定重试策略)
*/
public static Mono<String> execute(
RetryBackoffSpecEnum backoffSpecEnum, Object... param) {
return retryWithBackoff(doExecute(param), backoffSpecEnum);
}
private static Mono<String> doExecute(Object... param) {
return Mono.defer(() -> {
// 模拟50%的异常情况,指定异常重试
if (Math.random() < 0.5) {
return Mono.error(new IllegalStateException("执行失败... " + Arrays.toString(param)));
} else {
return Mono.just("执行成功: " + Arrays.toString(param));
}
});
}
/**
* @param mono 需要重试的操作
* @return 返回包含重试结果的Mono
*/
public static <T> Mono<T> retryWithBackoff(Mono<T> mono, RetryBackoffSpecEnum backoffSpecEnum) {
RetryBackoffSpec backoffSpec = holders.get(backoffSpecEnum);
return mono.doOnNext(i -> System.out.println("Processing item: " + i))
.doOnError(e -> System.err.println("Error occurred: " + e.getMessage()))
.retryWhen(
backoffSpec.doAfterRetry(retrySignal -> doRetry())
)
.doFinally(signalType -> {
if (signalType == SignalType.ON_ERROR) {
System.err.println("重试结束,最终失败");
} else if (signalType == SignalType.ON_COMPLETE) {
System.out.println("重试结束,成功完成");
}
resetRetryCount();
});
}
public enum RetryBackoffSpecEnum {
/**
* 默认重试
*/
DEFAULT_BACKOFF,
/**
* 固定重试
*/
FIXED_BACKOFF,
/**
* 自定义重试
*/
CUSTOM_BACKOFF,
}
private static final Map<RetryBackoffSpecEnum, RetryBackoffSpec> holders = new HashMap<>();
static {
holders.put(RetryBackoffSpecEnum.DEFAULT_BACKOFF, initDefaultBackoff());
holders.put(RetryBackoffSpecEnum.FIXED_BACKOFF, initFixedBackoff());
holders.put(RetryBackoffSpecEnum.CUSTOM_BACKOFF, initCustomBackoff());
}
private static RetryBackoffSpec initCustomBackoff() {
// TODO 自己实现
return null;
}
private static RetryBackoffSpec initFixedBackoff() {
return Retry.fixedDelay(5, Duration.ofSeconds(2));
}
private static RetryBackoffSpec initDefaultBackoff() {
return Retry.backoff(3, Duration.ofMillis(500))
.maxBackoff(Duration.ofSeconds(5))
// 只对瞬时错误进行重试
.transientErrors(true)
// 添加 50% 的随机抖动到每次重试的延迟时间
.jitter(0.5d)
.filter(t -> t instanceof IllegalStateException)
// 当达到最大重试次数后抛出一个指定的异常
.onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> {
throw new IllegalStateException("重试超限");
});
}
private static void doRetry() {
retryCount.incrementAndGet();
System.out.println("执行重试,重试次数: " + retryCount.get());
}
private static void resetRetryCount() {
retryCount.set(0);
}
}测试重试效果
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
Mono<String> result = RetryUtils.execute(String.format("第【%s】次调用", i));
// 订阅结果
result.subscribe(
// 成功时处理结果
data -> System.out.println("Received: " + data),
// 所有重试都失败时处理错误
error -> System.err.println("Final error: " + error.getMessage()),
// 流完成时调用
() -> System.out.println("Completed")
);
try {
Thread.sleep(10000); // 给异步任务一点时间完成
System.out.println("===========================分隔符===========================");
} catch (InterruptedException ignored) {
}
}
}使用默认重试策略时,命中IllegalStateException异常规则,重试间隔500ms,最失败大重试3次,如果重试过程中成功,则直接终止重试。
● CASE1 第一次请求成功,不再触发重试:
● CASE2 首次请求执行失败,触发重试,并重试成功,终止重试:
● CASE3 首次请求执行失败,触发重试,重试次数达到上限,终止重试:
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
type: questionFurther information is requestedFurther information is requested



