Skip to content

[Question] <title> 网关向下游下发请求失败时,基于Reactor响应式流实现单机版本的异步非阻塞重试策略 #5920

@SpringCat1024

Description

@SpringCat1024

Question

Divide或HTTP Client插件中,向下游服务发送请求失败后的重试策略,当前实现方案是比较简单的,建议基于Reactor响应式流实现单机版本的异步非阻塞重试策略。

实现效果:
● 异步非阻塞,不影响主线程
● 支持自定义重试次数、重试间隔、重试上限
● 支持按任务失败条件出发重试(eg: 指定异常重试)
● 支持异步获取任务执行结果,直到重试成功或者重试超限
● 轻量级,不依赖第三方重试调度中间件(如MQ,ScheduleX),不依赖Spring框架本身提供的定时调度能力。
● 多种重试策略:自定义策略(自定义退避序列)、固定间隔重试策略、默认重试策略等。

当前失败重试代码如下

Image

如图所示,这里是For循环3次比较简单的失败重试。

基于Reactor响应式流实现单机版本的异步非阻塞重试策略

实现示例代码如下:

RetryUtils 工具类

/**
 * 重试工具类
 */
public class RetryUtils {

    /**
     * 记录重试次数
     */
    private static final AtomicInteger retryCount = new AtomicInteger(0);

    /**
     * 执行并返回响应流
     */
    public static Mono<String> execute(Object... param) {
        return execute(RetryBackoffSpecEnum.DEFAULT_BACKOFF, param);
    }

    /**
     * 执行并返回响应流(指定重试策略)
     */
    public static Mono<String> execute(
                  RetryBackoffSpecEnum backoffSpecEnum, Object... param) {
        return retryWithBackoff(doExecute(param), backoffSpecEnum);
    }

    private static Mono<String> doExecute(Object... param) {
        return Mono.defer(() -> {
            // 模拟50%的异常情况,指定异常重试
            if (Math.random() < 0.5) {
                return Mono.error(new IllegalStateException("执行失败... " + Arrays.toString(param)));
            } else {
                return Mono.just("执行成功: " + Arrays.toString(param));
            }
        });
    }

    /**
     * @param mono 需要重试的操作
     * @return 返回包含重试结果的Mono
     */
    public static <T> Mono<T> retryWithBackoff(Mono<T> mono, RetryBackoffSpecEnum backoffSpecEnum) {
        RetryBackoffSpec backoffSpec = holders.get(backoffSpecEnum);

        return mono.doOnNext(i -> System.out.println("Processing item: " + i))
                .doOnError(e -> System.err.println("Error occurred: " + e.getMessage()))
                .retryWhen(
                        backoffSpec.doAfterRetry(retrySignal -> doRetry())
                )
                .doFinally(signalType -> {
                    if (signalType == SignalType.ON_ERROR) {
                        System.err.println("重试结束,最终失败");
                    } else if (signalType == SignalType.ON_COMPLETE) {
                        System.out.println("重试结束,成功完成");
                    }
                    resetRetryCount();
                });
    }

    public enum RetryBackoffSpecEnum {
        /**
         * 默认重试
         */
        DEFAULT_BACKOFF,

        /**
         * 固定重试
         */
        FIXED_BACKOFF,

        /**
         * 自定义重试
         */
        CUSTOM_BACKOFF,
    }

    private static final Map<RetryBackoffSpecEnum, RetryBackoffSpec> holders = new HashMap<>();

    static {
        holders.put(RetryBackoffSpecEnum.DEFAULT_BACKOFF, initDefaultBackoff());
        holders.put(RetryBackoffSpecEnum.FIXED_BACKOFF, initFixedBackoff());
        holders.put(RetryBackoffSpecEnum.CUSTOM_BACKOFF, initCustomBackoff());
    }

    private static RetryBackoffSpec initCustomBackoff() {
        // TODO 自己实现
        return null;
    }

    private static RetryBackoffSpec initFixedBackoff() {
        return Retry.fixedDelay(5, Duration.ofSeconds(2));
    }

    private static RetryBackoffSpec initDefaultBackoff() {
        return Retry.backoff(3, Duration.ofMillis(500))
                .maxBackoff(Duration.ofSeconds(5))
                // 只对瞬时错误进行重试
                .transientErrors(true)
                // 添加 50% 的随机抖动到每次重试的延迟时间
                .jitter(0.5d)
                .filter(t -> t instanceof IllegalStateException)
                // 当达到最大重试次数后抛出一个指定的异常
                .onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> {
                    throw new IllegalStateException("重试超限");
                });
    }

    private static void doRetry() {
        retryCount.incrementAndGet();
        System.out.println("执行重试,重试次数: " + retryCount.get());
    }

    private static void resetRetryCount() {
        retryCount.set(0);
    }
}

测试重试效果

public static void main(String[] args) {

    for (int i = 1; i <= 20; i++) {
        Mono<String> result = RetryUtils.execute(String.format("第【%s】次调用", i));
        // 订阅结果
        result.subscribe(
                // 成功时处理结果
                data -> System.out.println("Received: " + data),
                // 所有重试都失败时处理错误
                error -> System.err.println("Final error: " + error.getMessage()),
                // 流完成时调用
                () -> System.out.println("Completed")
        );
        try {
            Thread.sleep(10000); // 给异步任务一点时间完成
            System.out.println("===========================分隔符===========================");
        } catch (InterruptedException ignored) {
        }
    }
}

使用默认重试策略时,命中IllegalStateException异常规则,重试间隔500ms,最失败大重试3次,如果重试过程中成功,则直接终止重试。

● CASE1 第一次请求成功,不再触发重试:

Image

● CASE2 首次请求执行失败,触发重试,并重试成功,终止重试:

Image

● CASE3 首次请求执行失败,触发重试,重试次数达到上限,终止重试:

Image

Metadata

Metadata

Labels

type: questionFurther information is requested

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions