Skip to content

[Proposal]: Retry Loader #1842

@norberttech

Description

@norberttech

Describe the Proposal

Extractor/Loaders, especially those that relay on external services might crash from time to time, especially when we are dealing with networking for reasons unrelated to data.

In those cases we would like to retry extracting/loading, with some small delay.
However we should be able to have a flexible logic deciding if retry is even necessary.

For that we should create an RetryStrategy interface with few simple implementations:

  • OnException (allow to pass one or more exceptions classes, when non passed any Throwable triggers retry)

Additionally we should be able to control the delay through DelayFactory interface with following implementations:

  • Fixed(Duration $delay)
  • Exponential(Duration $initial, int $multiplier)
  • Linear(Duration $delay, int $increment)
  • Jitter(Duration $initial, float $randomness)

Duration - this is a simple value object that have multiple static factories for fromMilliseconds / fromSeconds / fromMinutes (with private constructor) and that under the hood keeps everything as milliseconds and that return those milliseconds through method milliseconds(): int.

Sleep - in order to not wait in tests, we also need to use Sleep abstraction. Simple interface that will let user to decide for how long process should sleep with 2 implementations.
Sleep interface should also use TimeUnit value object Sleep::for(Duration $duration)

  • FakeSleep - for tests
  • SystemSleep - for actual implementation that uses usleep

Namespaces

  • Loader should be implemented in following namespace: src/core/etl/src/Flow/ETL/Loader
  • Extractor should be implemented in following namespace: src/core/etl/src/Flow/ETL/Extractor
  • Sleep / Duration should be implemented in following namespace: src/core/etl/src/Flow/ETL/Time
  • RetryStrategy / DelayFactory should be implemented in src/core/etl/src/Flow/ETL/Retry

API Adjustments

DSL Functions

write_with_retries(
   Loader $loader,
   RetryStrategy $retryStrategy = new OnException(),
   DelayFactory $delayFactory = Fixed(TimeUnit::fromSeconds(1)),
   int $maxRetries = 3,
)
read_with_retries(
   Extractor $loader,
   RetryStrategy $retryStrategy = new OnException(),
   DelayFactory $delayFactory = new Fixed(TimeUnit::fromSeconds(1)),
   int $maxRetries = 3,
)

Are you intending to also work on proposed change?

Yes

Are you interested in sponsoring this change?

None

Integration & Dependencies

No response

Sub-issues

Metadata

Metadata

Assignees

Labels

No labels
No labels

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions