[Improve](StreamingJob) add max_filter_ratio and strict mode for mysql/pg streaming job #60473
JNSimba merged 5 commits into apache:master
run buildall
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 32066 ms
ClickBench: Total hot run time: 28.69 s
run buildall
TPC-H: Total hot run time: 31554 ms
ClickBench: Total hot run time: 28.33 s
FE UT Coverage Report: Increment line coverage
FE Regression Coverage Report: Increment line coverage
run buildall
run buildall
TPC-H: Total hot run time: 32090 ms
ClickBench: Total hot run time: 28.38 s
run p0
Pull request overview
This PR adds support for load.max_filter_ratio and load.strict_mode properties to MySQL/PostgreSQL streaming jobs, enabling error tolerance configuration for data quality monitoring.
Changes:
- Added data quality monitoring with configurable filter ratio thresholds using a sliding window approach
- Introduced `LoadStatistic` class to track filtered rows, loaded rows, and load bytes
- Modified target properties validation to support load properties prefix
- Refactored statistics tracking from `scannedBytes` to `loadBytes` and added `filteredRows` tracking
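As an illustration of the statistics described in the list above, here is a minimal sketch of an accumulator for filtered rows, loaded rows, and load bytes. The class name `LoadStatisticSketch` and its methods are hypothetical and are not taken from the PR; the real `LoadStatistic` class may be shaped quite differently.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the PR's LoadStatistic implementation.
final class LoadStatisticSketch {
    private final AtomicLong filteredRows = new AtomicLong();
    private final AtomicLong loadedRows = new AtomicLong();
    private final AtomicLong loadBytes = new AtomicLong();

    // Accumulate the counters reported by one finished load.
    void add(long filtered, long loaded, long bytes) {
        filteredRows.addAndGet(filtered);
        loadedRows.addAndGet(loaded);
        loadBytes.addAndGet(bytes);
    }

    // Return {filteredRows, loadedRows, loadBytes} and reset each counter to zero,
    // e.g. when the totals are reported together with an offset commit.
    long[] snapshotAndReset() {
        return new long[] {
            filteredRows.getAndSet(0),
            loadedRows.getAndSet(0),
            loadBytes.getAndSet(0)
        };
    }
}
```

Atomic counters are used here only so that several writer threads could report concurrently; whether the real class needs that is an assumption.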
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| LoadStatistic.java | New class to track load statistics (filtered/loaded rows, bytes) |
| HttpPutBuilder.java | Changed properties parameter type from Properties to Map<String, String> |
| DorisBatchStreamLoad.java | Added load statistics tracking and stream load properties support |
| PipelineCoordinator.java | Updated to pass LoadStatistic object in commitOffset |
| StreamingJobUtils.java | Moved TABLE_PROPS_PREFIX constant to DataSourceConfigKeys |
| StreamingMultiTblTask.java | Generate stream load properties based on max_filter_ratio and strict_mode |
| StreamingJobStatistic.java | Added filteredRows field |
| StreamingJobSchedulerTask.java | Initialize sampleStartTime when job transitions to RUNNING |
| StreamingInsertJob.java | Implemented checkDataQuality method with sliding window monitoring |
| DataSourceConfigValidator.java | Updated to allow load properties prefix in target validation |
| StreamingJobAction.java | Removed CommitOffsetRequest inner class (moved to separate file) |
| DorisParser.g4 | Made sourceProperties optional in jobFromToClause grammar |
| CommitOffsetRequest.java | New file with fields for filtered/loaded rows and load bytes |
| DataSourceConfigKeys.java | Added TABLE_PROPS_PREFIX and LOAD_PROPERTIES constants |
| WriteRecordRequest.java | Added streamLoadProps field, removed unused abstract methods |
| JobBaseRecordRequest.java | Removed unused abstract methods |
| FetchRecordRequest.java | Removed unused method implementations |
| Test files | Updated to parse JSON statistics and verify new fields; adjusted expected byte counts |
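The table notes that target properties may now carry a load-properties prefix and that stream load properties are generated from `max_filter_ratio` and `strict_mode`. The sketch below shows one way such a mapping could look. It assumes the prefix is `load.` (inferred from the property names `load.max_filter_ratio` and `load.strict_mode`); the class and method names are hypothetical, and the defaults follow the PR description (strict mode off, zero tolerance).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's StreamingMultiTblTask code.
final class LoadPropsSketch {
    // Assumed prefix, inferred from "load.max_filter_ratio" / "load.strict_mode".
    static final String LOAD_PREFIX = "load.";

    // Collect "load."-prefixed target properties, strip the prefix,
    // and fill in the defaults stated in the PR description.
    static Map<String, String> extractLoadProps(Map<String, String> targetProps) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : targetProps.entrySet()) {
            if (e.getKey().startsWith(LOAD_PREFIX)) {
                out.put(e.getKey().substring(LOAD_PREFIX.length()), e.getValue());
            }
        }
        out.putIfAbsent("strict_mode", "false");
        out.putIfAbsent("max_filter_ratio", "0");
        return out;
    }
}
```

Properties with other prefixes, such as `table.create.properties.replication_num`, are simply ignored by this helper.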
...-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
...-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
...rc/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
...re/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
run buildall
TPC-H: Total hot run time: 32501 ms
ClickBench: Total hot run time: 28.32 s
FE Regression Coverage Report: Increment line coverage
run cloud_p0
FE Regression Coverage Report: Increment line coverage
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
…l/pg streaming job (#60473)
What problem does this PR solve?
Related PR: #58898 #59461
In some scenarios, it is necessary to tolerate a certain amount of erroneous data.
Supported parameters:
- `load.strict_mode`: Whether to enable strict mode. Defaults to false.
- `load.max_filter_ratio`: The maximum allowed filter ratio within the sampling window. Defaults to zero tolerance. The sampling window is `max_interval * 10`. If the ratio of erroneous rows to total rows exceeds `max_filter_ratio` within the sampling window, the job is paused, and manual intervention is required to check for data quality issues.

For example:
```
CREATE JOB test_streaming_mysql_job_errormsg
ON STREAMING
FROM MYSQL (
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3308",
    ......
)
TO DATABASE database (
    "table.create.properties.replication_num" = "1"
    ...
    "load.max_filter_ratio" = "1"
)
```
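The sampling-window rule described above can be sketched as follows. This is a simplified illustration rather than the PR's `checkDataQuality` implementation: the class `FilterRatioWindow` and its method names are hypothetical, total rows are assumed to be filtered rows plus loaded rows, and the window is reset once it expires (a tumbling window) instead of sliding continuously.

```java
// Hypothetical sketch of the filter-ratio check over a sampling window.
final class FilterRatioWindow {
    private final double maxFilterRatio;
    private final long windowMillis;
    private long sampleStartTime;
    private long filteredRows;
    private long totalRows;

    FilterRatioWindow(double maxFilterRatio, long maxIntervalMillis, long nowMillis) {
        this.maxFilterRatio = maxFilterRatio;
        // The PR description states the sampling window is max_interval * 10.
        this.windowMillis = maxIntervalMillis * 10;
        this.sampleStartTime = nowMillis;
    }

    // Record one commit's counters; returns true if the job should be paused.
    boolean record(long filtered, long loaded, long nowMillis) {
        if (nowMillis - sampleStartTime >= windowMillis) {
            // Window expired: start a fresh sample (simplification of a sliding window).
            sampleStartTime = nowMillis;
            filteredRows = 0;
            totalRows = 0;
        }
        filteredRows += filtered;
        totalRows += filtered + loaded; // assumption: total = filtered + loaded
        if (totalRows == 0) {
            return false;
        }
        return (double) filteredRows / totalRows > maxFilterRatio;
    }
}
```

With the default of zero tolerance, a single filtered row inside the window makes `record` return true, which matches the described behavior of pausing the job for manual inspection.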
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)