[Fix](StreamingJob) fix postgres consumer data in multi backend#59798
JNSimba merged 7 commits into apache:master
TPC-H: Total hot run time: 31764 ms
TPC-DS: Total hot run time: 172827 ms
# Conflicts: # fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
TPC-H: Total hot run time: 31642 ms
TPC-DS: Total hot run time: 171994 ms
TPC-H: Total hot run time: 32020 ms
TPC-DS: Total hot run time: 172939 ms
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
Pull request overview
This PR fixes PostgreSQL consumer data handling in multi-backend scenarios by ensuring proper slot management and connection cleanup. The changes address the issue where PostgreSQL slots can only be used by one client at a time, necessitating proper initialization and cleanup strategies.
Changes:
- Added early PostgreSQL slot creation during job initialization to prevent conflicts in multi-backend scenarios
- Moved connection cleanup from mid-processing to the `finishSplitRecords` method to properly release connections after each read cycle
- Refactored tests to use proper assertions and polling mechanisms instead of fixed sleep delays
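The cleanup move listed above can be pictured with a minimal sketch. Apart from the method name `finishSplitRecords`, every class and method here is hypothetical and does not mirror the actual Doris CDC client code. It only illustrates the idea of holding the connection for the whole read cycle and releasing it once the cycle is finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a binlog/WAL connection; not a Doris class.
class FakeStreamConnection implements AutoCloseable {
    private boolean open = true;

    List<String> poll() {
        if (!open) {
            throw new IllegalStateException("connection already closed");
        }
        return List.of("r1", "r2");
    }

    @Override
    public void close() {
        open = false;
    }
}

// Hypothetical reader: opens lazily, releases only in finishSplitRecords().
class SketchSourceReader {
    private FakeStreamConnection connection;
    int opened = 0; // how many connections this reader has opened so far

    List<String> readSplitRecords() {
        if (connection == null) {
            connection = new FakeStreamConnection();
            opened++;
        }
        // No close here: the connection stays usable until the cycle is finished.
        return new ArrayList<>(connection.poll());
    }

    void finishSplitRecords() {
        // Release the connection once per read cycle.
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    boolean holdsConnection() {
        return connection != null;
    }
}
```

With this shape, the next read cycle opens a fresh connection, so another backend is free to take over between cycles.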
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| PostgresSourceReader.java | Added log message when replication slot already exists |
| MySqlSourceReader.java | Moved binlog reader cleanup from mid-processing to finishSplitRecords method |
| JdbcIncrementalSourceReader.java | Moved binlog reader cleanup from mid-processing to finishSplitRecords method |
| ClientController.java | Added new /api/initReader endpoint for early source reader initialization |
| JdbcSourceOffsetProvider.java | Added initSourceReader method to initialize readers for latest mode scenarios |
| StreamingMultiTblTask.java | Enhanced logging to include backend information for better debugging |
| test_streaming_postgres_job_priv.groovy | Refactored test to expect early failure without replication privileges and use polling instead of sleep |
| test_streaming_mysql_job_priv.groovy | Replaced fixed sleep with polling mechanism to wait for task completion |
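The "polling instead of sleep" change in the two test files follows a common pattern: check a condition repeatedly until it holds or a timeout expires. The regression tests themselves are Groovy and their exact helper is not shown here; the following is only a generic Java sketch of the pattern, and `Poller` and `waitUntil` are hypothetical names.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating polling with a timeout.
final class Poller {
    private Poller() {}

    // Returns true as soon as the condition holds, false if the timeout
    // elapses first or the waiting thread is interrupted.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long start = System.nanoTime();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - start >= timeout.toNanos()) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

Compared with a fixed sleep, the wait ends as soon as the task completes, and a slow run fails with a clear timeout instead of a flaky assertion.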
### What problem does this PR solve?
Related PR: #59461
1. PostgreSQL consumes data through replication slots, and only one client can use a slot at a time. The slot therefore needs to be released after each round of WAL consumption. MySQL has no such restriction, but its reader can be closed as well to avoid holding connections.
2. Create the PostgreSQL slot up front when the job is created.
3. Fix an unstable test case.
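The single-consumer constraint and the idempotent early creation can be modeled in memory. This is a sketch only: `SlotRegistry` and its methods are hypothetical, no real PostgreSQL connection is involved, and the error messages are illustrative rather than actual server output.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of replication slots; hypothetical, for illustration only.
class SlotRegistry {
    // Slot name -> client currently using it, or null when the slot is idle.
    private final Map<String, String> slots = new HashMap<>();

    // Idempotent creation: returns false when the slot already exists,
    // which is the case where a caller would only log and continue.
    boolean createIfAbsent(String slot) {
        if (slots.containsKey(slot)) {
            return false;
        }
        slots.put(slot, null);
        return true;
    }

    // Only one client may hold a slot at a time.
    void acquire(String slot, String client) {
        if (!slots.containsKey(slot)) {
            throw new IllegalStateException("slot does not exist: " + slot);
        }
        String owner = slots.get(slot);
        if (owner != null) {
            throw new IllegalStateException("slot " + slot + " is in use by " + owner);
        }
        slots.put(slot, client);
    }

    // Releasing lets another client (for example another backend) take over.
    void release(String slot, String client) {
        if (client.equals(slots.get(slot))) {
            slots.put(slot, null);
        }
    }
}
```

In this model a second backend can only acquire the slot after the first one has released it, which is why the reader has to be closed at the end of each consumption round.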
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #59798
If the reader is closed before the LSN is committed, the commit cannot complete.
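The ordering problem described above comes down to "commit first, close second". A minimal sketch, assuming a reader whose commit needs a live connection; `LsnReader` and `ReadCycle` are hypothetical names, not classes from the Doris code base:

```java
// Hypothetical reader whose commit only works while it is still open.
class LsnReader implements AutoCloseable {
    private boolean open = true;
    private long committedLsn = -1L; // -1 means nothing committed yet

    // Returns false when the reader is already closed and the commit is lost.
    boolean commit(long lsn) {
        if (!open) {
            return false;
        }
        committedLsn = lsn;
        return true;
    }

    long committedLsn() {
        return committedLsn;
    }

    @Override
    public void close() {
        open = false;
    }
}

final class ReadCycle {
    private ReadCycle() {}

    // Commit the last consumed LSN, then release the reader in all cases.
    static long finish(LsnReader reader, long lastLsn) {
        try {
            reader.commit(lastLsn);
        } finally {
            reader.close();
        }
        return reader.committedLsn();
    }
}
```

Placing `close()` in the `finally` block keeps the release guaranteed while still ensuring it never runs before the commit attempt.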
Release note
None