[flink] add a test for ccovering the latest mode in scan.startup.mode configuration tests#2477
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds test coverage for the scan.startup.mode='latest' configuration option in Flink's changelog virtual table integration tests. The change ensures that the latest mode correctly reads only new records after subscription, filling a gap in the existing test suite.
Changes:
- Added an overloaded
collectRowsWithTimeoutmethod that accepts a custommaxWaitTimeparameter - Extended
testChangelogWithScanStartupModeto include a test case forscan.startup.mode='latest'
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java | Adds overloaded method to support custom timeout duration when collecting rows |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java | Implements test logic for verifying latest scan startup mode behavior |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| break; | ||
| } | ||
| } | ||
| int id = Integer.parseInt(latestResults.getFirst().replaceAll(".*\\[(\\d+)]", "$1")); |
There was a problem hiding this comment.
The regex pattern assumes a specific row format. If latestResults.getFirst() doesn't match the expected pattern, parseInt will throw a NumberFormatException. Consider adding validation or using a more robust parsing approach to handle unexpected formats gracefully.
| for (int attempt = 0; attempt < 10; attempt++) { | ||
| // Write a new record (with id larger than 5) | ||
| int rowId = 6 + attempt; | ||
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | ||
|
|
||
| // Try to fetch one record with a 5-second timeout | ||
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); |
There was a problem hiding this comment.
The hardcoded 5-second timeout per attempt combined with 10 attempts could result in up to 50 seconds of waiting. Consider reducing the timeout or number of attempts, or explaining why this duration is necessary to avoid unnecessarily long test execution times.
| for (int attempt = 0; attempt < 10; attempt++) { | |
| // Write a new record (with id larger than 5) | |
| int rowId = 6 + attempt; | |
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | |
| // Try to fetch one record with a 5-second timeout | |
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); | |
| for (int attempt = 0; attempt < 5; attempt++) { | |
| // Write a new record (with id larger than 5) | |
| int rowId = 6 + attempt; | |
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | |
| // Try to fetch one record with a 1-second timeout | |
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1)); |
There was a problem hiding this comment.
Can you please ellaborate your concern?
There was a problem hiding this comment.
@MehulBatra, from my side, Copilot was suggested incorrectly:
- Proposal:
- Fetch timeout: 5s -> Copilot:
latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1)); - Repeat step 1 for 10 times -> Copilot:
for (int attempt = 0; attempt < 5; attempt++)
- Fetch timeout: 5s -> Copilot:
There was a problem hiding this comment.
I’ve thought about this again, and a more elegant approach might be to leverage Flink’s savepoint mechanism to explicitly force the initialization of the start offset. Here’s the refined idea:
- Pre-populate the source table with some existing records.
- Launch a job:
INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;
- Stop the job with a savepoint (e.g., using
FlinkOperator#stopWithSavepoint, as shown in this example).
→ At this point, the “latest” offset has been determined, initialized, and checkpointed into the state. - Restart the job from the savepoint. (see example in
FlinkTableSourceFailOverITCase#initTableEnvironment) - Write new records into the source table.
- Read from the sink table — the output should contain only the records written in step 5, confirming that the job correctly started reading from the offset captured at savepoint time.
There was a problem hiding this comment.
Thanks for the suggestion! I’ll follow this approach and submit a PR accordingly.
|
@MehulBatra, can you please review the pull request again? |
Thank you for your PR I will try to review it within this week! |
|
@nhuantho can you resolve the conflicts? I will run the workflow and review |
57489ce to
6fb31ee
Compare
|
@MehulBatra I resolved conflicts |
| .containsExactly( | ||
| "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]", | ||
| "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); | ||
| // 3. Test scan.startup.mode='latest' - should only read new records after subscription |
There was a problem hiding this comment.
It will be more clear to extract this test into a separate test method.
| tEnv = initTableEnvironment(savepointPath); | ||
| // Recreate a table | ||
| tEnv.executeSql( | ||
| "CREATE TABLE startup_mode_test_mode_latest (" |
There was a problem hiding this comment.
we don't need to re-create the table, the table is already there.
|
|
||
| // Recreate a table | ||
| tEnv.executeSql( | ||
| "CREATE TABLE " |
There was a problem hiding this comment.
we don't need to re-create the table
| tEnv.executeSql( | ||
| "INSERT INTO " | ||
| + tablePath.getTableName() | ||
| + " SELECT * FROM startup_mode_test"); |
There was a problem hiding this comment.
Run the query with startup_mode_test$changelog with latest startup mode. So we can materilaize the latest offsets into state.
|
|
||
| String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */"; | ||
| String queryLatest = | ||
| "SELECT _change_type, id, name FROM " | ||
| + tablePath.getTableName() | ||
| + "$changelog" | ||
| + optionsLatest; | ||
| CloseableIterator<Row> rowIterLatest = tEnv.executeSql(queryLatest).collect(); |
There was a problem hiding this comment.
rerun the query of insertResult
98c7a4e to
b74adcc
Compare
|
I tried to run locally, but it fails with following exception, not sure what's the reason |
10766c1 to
2a6d9cb
Compare
b66dcd8 to
6b06d6f
Compare
wuchong
left a comment
There was a problem hiding this comment.
I investigated the failing test case and traced it to an issue #2743. I submitted a commit to address the root cause and the test should pass now.
cc @MehulBatra could you help to review it when you are free, because it is related to a bug of $changelog and $binlog table.
347e0be to
995104f
Compare
995104f to
7c76581
Compare
There was a problem hiding this comment.
LGTM! just left one small comment
Thank you @wuchong & @nhuantho for the PR looks great to me.
Verified Latest mode doesn't read historical data for changelog and binlog
- Latest mode state persists in savepoints
- Restart from savepoint continues correctly
- No duplicate processing after restart
Can we move waitForCheckpoint, getFileBasedCheckpointsConfig to a base class and share it instead of writing individually for both?
One concern @wuchong as we have moved to maintaining MiniClusterWithClientResource with before each instead of abstarcttestbase could this hamper the CI runtimes.
|
@MehulBatra if this idea is approved, I can create another issue and implement it. |
|
@MehulBatra, I agree with extracting common test methods. I also share your concern regarding increased test execution time and support reusing the Flink mini-cluster as an optimization. Let's track this improvement in a follow-up PR. However, we cannot simply extend |
|
Hi @nhuantho I am already working on the shared base class for the IT, you can help me in review! |
|
@MehulBatra sure! If there’s anything I can help with, feel free to tag me. |
Sure @wuchong , we can discuss this more in the upcoming pr and improvements related to this, will kleep you posted on same. |
Hi @wuchong, @MehulBatra I have a question, Can we move
|





Purpose
Linked issue: close #2471
Brief change log
collectRowsWithTimeoutmethod allows passingmaxWaitTime.Tests
collectRowsWithTimeoutmethod allows passingmaxWaitTime.testChangelogWithScanStartupMode-> add logic for testingscan.startup.mode='latest'