Description
Describe the Proposal
Extractors and Loaders, especially those that rely on external services, can crash from time to time for reasons unrelated to the data, most often because of networking problems.
In those cases we would like to retry extracting/loading after a small delay.
However, we need flexible logic for deciding whether a retry is warranted at all.
For that we should create a RetryStrategy interface with a few simple implementations:
- OnException (accepts one or more exception classes; when none are passed, any Throwable triggers a retry)
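A minimal sketch of what this could look like. The method name `shouldRetry()` and the variadic constructor are assumptions made for illustration; the proposal only names the interface and the OnException implementation.

```php
<?php

declare(strict_types=1);

// Assumed contract: the strategy is asked about each failure and decides
// whether another attempt is worthwhile. The method name is hypothetical.
interface RetryStrategy
{
    public function shouldRetry(\Throwable $error): bool;
}

final class OnException implements RetryStrategy
{
    /** @var array<class-string<\Throwable>> */
    private array $types;

    /** @param class-string<\Throwable> ...$types */
    public function __construct(string ...$types)
    {
        $this->types = $types;
    }

    public function shouldRetry(\Throwable $error): bool
    {
        // No classes configured: any Throwable triggers a retry.
        if ($this->types === []) {
            return true;
        }

        // Otherwise retry only when the error matches one of the classes
        // (subclasses match as well, because instanceof is used).
        foreach ($this->types as $type) {
            if ($error instanceof $type) {
                return true;
            }
        }

        return false;
    }
}
```

With this shape, `new OnException()` retries on everything, while `new OnException(\RuntimeException::class)` ignores errors outside that hierarchy.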
Additionally, we should be able to control the delay through a DelayFactory interface with the following implementations:
- Fixed(Duration $delay)
- Exponential(Duration $initial, int $multiplier)
- Linear(Duration $delay, int $increment)
- Jitter(Duration $initial, float $randomness)
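The four implementations above could be sketched as follows. Several details are assumptions, not part of the proposal: the `delay(int $attempt)` method, attempts being counted from 1, the Linear increment being expressed in milliseconds, and Jitter meaning a random offset of up to `$randomness` times the initial delay in either direction. The `Duration` class here is a stripped-down stand-in.

```php
<?php

declare(strict_types=1);

// Stand-in for the Duration value object, reduced to what this sketch needs.
final class Duration
{
    private function __construct(private int $ms) {}

    public static function fromMilliseconds(int $ms): self { return new self($ms); }

    public function milliseconds(): int { return $this->ms; }
}

// Hypothetical contract: $attempt is the 1-based number of the upcoming retry.
interface DelayFactory
{
    public function delay(int $attempt): Duration;
}

final class Fixed implements DelayFactory
{
    public function __construct(private Duration $delay) {}

    public function delay(int $attempt): Duration
    {
        return $this->delay; // same delay before every retry
    }
}

final class Exponential implements DelayFactory
{
    public function __construct(private Duration $initial, private int $multiplier) {}

    public function delay(int $attempt): Duration
    {
        // initial * multiplier^(attempt - 1)
        $factor = $this->multiplier ** max(0, $attempt - 1);

        return Duration::fromMilliseconds($this->initial->milliseconds() * $factor);
    }
}

final class Linear implements DelayFactory
{
    public function __construct(private Duration $delay, private int $increment) {}

    public function delay(int $attempt): Duration
    {
        // delay + increment * (attempt - 1), increment assumed to be in milliseconds
        $extra = $this->increment * max(0, $attempt - 1);

        return Duration::fromMilliseconds($this->delay->milliseconds() + $extra);
    }
}

final class Jitter implements DelayFactory
{
    public function __construct(private Duration $initial, private float $randomness) {}

    public function delay(int $attempt): Duration
    {
        // Random offset within +/- (initial * randomness), never below zero.
        $base = $this->initial->milliseconds();
        $spread = (int) round(abs($base * $this->randomness));

        return Duration::fromMilliseconds(max(0, $base + random_int(-$spread, $spread)));
    }
}
```

Under these assumptions, `Exponential` with 100 ms and a multiplier of 2 yields 100, 200, 400 ms for attempts 1 to 3, and `Linear` with 100 ms and an increment of 50 yields 100, 150, 200 ms.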
Duration - a simple value object with a private constructor and static factories fromMilliseconds / fromSeconds / fromMinutes. Under the hood it keeps everything as milliseconds and returns them through the method milliseconds(): int.
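A possible shape for this value object, following the description above. Rejecting negative values is an extra assumption that the proposal does not state.

```php
<?php

declare(strict_types=1);

final class Duration
{
    // Private constructor: instances are created through the static factories only.
    private function __construct(private int $milliseconds)
    {
        // Assumption: a negative duration is treated as a programming error.
        if ($milliseconds < 0) {
            throw new \InvalidArgumentException('Duration cannot be negative.');
        }
    }

    public static function fromMilliseconds(int $milliseconds): self
    {
        return new self($milliseconds);
    }

    public static function fromSeconds(int $seconds): self
    {
        return new self($seconds * 1000);
    }

    public static function fromMinutes(int $minutes): self
    {
        return new self($minutes * 60_000);
    }

    // Everything is stored and returned as milliseconds.
    public function milliseconds(): int
    {
        return $this->milliseconds;
    }
}
```

Keeping a single internal unit means delay factories and sleepers never have to ask which unit a value was created in.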
Sleep - to avoid real waiting in tests, we also need a Sleep abstraction: a simple interface that lets the caller decide how long the process should sleep.
The Sleep interface should accept the Duration value object, Sleep::for(Duration $duration), and come with 2 implementations:
- FakeSleep - for tests
- SystemSleep - for actual implementation that uses usleep
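A sketch of the interface and both implementations. The recording behaviour of `FakeSleep` and its `totalMilliseconds()` helper are hypothetical, added so that tests can assert on requested delays; `Duration` is again a stripped-down stand-in.

```php
<?php

declare(strict_types=1);

// Stand-in for the Duration value object, reduced to what this sketch needs.
final class Duration
{
    private function __construct(private int $ms) {}

    public static function fromMilliseconds(int $ms): self { return new self($ms); }

    public function milliseconds(): int { return $this->ms; }
}

interface Sleep
{
    // "for" is a reserved word, but PHP 7+ allows it as a method name.
    public function for(Duration $duration): void;
}

final class SystemSleep implements Sleep
{
    public function for(Duration $duration): void
    {
        // usleep() expects microseconds, so milliseconds are multiplied by 1000.
        usleep($duration->milliseconds() * 1000);
    }
}

final class FakeSleep implements Sleep
{
    /** @var list<int> requested delays in milliseconds, in call order */
    private array $recorded = [];

    public function for(Duration $duration): void
    {
        // Record the request instead of blocking the process.
        $this->recorded[] = $duration->milliseconds();
    }

    // Hypothetical helper for test assertions.
    public function totalMilliseconds(): int
    {
        return array_sum($this->recorded);
    }
}
```

In a test, injecting `FakeSleep` lets a retry scenario run instantly while still verifying how long the code would have waited.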
Namespaces
- Loader should be implemented in the following namespace: src/core/etl/src/Flow/ETL/Loader
- Extractor should be implemented in the following namespace: src/core/etl/src/Flow/ETL/Extractor
- Sleep / Duration should be implemented in the following namespace: src/core/etl/src/Flow/ETL/Time
- RetryStrategy / DelayFactory should be implemented in the following namespace: src/core/etl/src/Flow/ETL/Retry
API Adjustments
DSL Functions
write_with_retries(
    Loader $loader,
    RetryStrategy $retryStrategy = new OnException(),
    DelayFactory $delayFactory = new Fixed(Duration::fromSeconds(1)),
    int $maxRetries = 3,
)
read_with_retries(
    Extractor $extractor,
    RetryStrategy $retryStrategy = new OnException(),
    DelayFactory $delayFactory = new Fixed(Duration::fromSeconds(1)),
    int $maxRetries = 3,
)
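The retry loop that both DSL functions would presumably share can be sketched with a generic callable in place of a Loader or Extractor. Everything here is illustrative: the `retry()` helper, the `shouldRetry()` and `delay()` method names, and the reading of `$maxRetries` as the number of retries after the first attempt are assumptions, and the interfaces are minimal stand-ins.

```php
<?php

declare(strict_types=1);

// Minimal stand-ins for the proposed types (assumed method names).
final class Duration
{
    private function __construct(private int $ms) {}

    public static function fromMilliseconds(int $ms): self { return new self($ms); }

    public function milliseconds(): int { return $this->ms; }
}

interface RetryStrategy
{
    public function shouldRetry(\Throwable $error): bool;
}

interface DelayFactory
{
    public function delay(int $attempt): Duration;
}

interface Sleep
{
    public function for(Duration $duration): void;
}

/**
 * Hypothetical helper: runs $operation and retries it according to the
 * strategy, sleeping between attempts. $maxRetries counts retries only,
 * so the operation runs at most $maxRetries + 1 times.
 */
function retry(
    callable $operation,
    RetryStrategy $strategy,
    DelayFactory $delays,
    Sleep $sleep,
    int $maxRetries = 3,
): mixed {
    $attempt = 0;

    while (true) {
        try {
            return $operation();
        } catch (\Throwable $error) {
            $attempt++;

            // Give up when retries are exhausted or the strategy declines.
            if ($attempt > $maxRetries || !$strategy->shouldRetry($error)) {
                throw $error;
            }

            // Wait before the next attempt; a fake Sleep makes this instant in tests.
            $sleep->for($delays->delay($attempt));
        }
    }
}
```

Rethrowing the last error when retries run out keeps the original failure visible to the caller instead of masking it behind a retry-specific exception.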
Are you intending to also work on proposed change?
Yes
Are you interested in sponsoring this change?
None
Integration & Dependencies
No response