Skip to content

[flink] add a test for ccovering the latest mode in scan.startup.mode configuration tests#2477

Merged
wuchong merged 3 commits into
apache:mainfrom
nhuantho:flink-test/issue-2471
Feb 28, 2026
Merged

[flink] add a test for ccovering the latest mode in scan.startup.mode configuration tests#2477
wuchong merged 3 commits into
apache:mainfrom
nhuantho:flink-test/issue-2471

Conversation

@nhuantho
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #2471

Brief change log

  • Overloading the collectRowsWithTimeout method allows passing maxWaitTime.
  • Add a test for covering scan.startup.mode='latest'.

Tests

  • fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
    • Overloading the collectRowsWithTimeout method allows passing maxWaitTime.
  • fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
    • testChangelogWithScanStartupMode -> add logic for testing scan.startup.mode='latest'

@MehulBatra MehulBatra requested a review from Copilot January 29, 2026 07:51
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds test coverage for the scan.startup.mode='latest' configuration option in Flink's changelog virtual table integration tests. The change ensures that the latest mode correctly reads only new records after subscription, filling a gap in the existing test suite.

Changes:

  • Added an overloaded collectRowsWithTimeout method that accepts a custom maxWaitTime parameter
  • Extended testChangelogWithScanStartupMode to include a test case for scan.startup.mode='latest'

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java Adds overloaded method to support custom timeout duration when collecting rows
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java Implements test logic for verifying latest scan startup mode behavior

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

break;
}
}
int id = Integer.parseInt(latestResults.getFirst().replaceAll(".*\\[(\\d+)]", "$1"));
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regex pattern assumes a specific row format. If latestResults.getFirst() doesn't match the expected pattern, parseInt will throw a NumberFormatException. Consider adding validation or using a more robust parsing approach to handle unexpected formats gracefully.

Copilot uses AI. Check for mistakes.
Comment on lines +386 to +392
for (int attempt = 0; attempt < 10; attempt++) {
// Write a new record (with id larger than 5)
int rowId = 6 + attempt;
writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false);

// Try to fetch one record with a 5-second timeout
latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5));
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded 5-second timeout per attempt combined with 10 attempts could result in up to 50 seconds of waiting. Consider reducing the timeout or number of attempts, or explaining why this duration is necessary to avoid unnecessarily long test execution times.

Suggested change
for (int attempt = 0; attempt < 10; attempt++) {
// Write a new record (with id larger than 5)
int rowId = 6 + attempt;
writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false);
// Try to fetch one record with a 5-second timeout
latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5));
for (int attempt = 0; attempt < 5; attempt++) {
// Write a new record (with id larger than 5)
int rowId = 6 + attempt;
writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false);
// Try to fetch one record with a 1-second timeout
latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1));

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This suggestion is not correct:
image

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please ellaborate your concern?

Copy link
Copy Markdown
Contributor Author

@nhuantho nhuantho Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MehulBatra, from my side, Copilot was suggested incorrectly:

  • Proposal:
    • Fetch timeout: 5s -> Copilot: latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1));
    • Repeat step 1 for 10 times -> Copilot: for (int attempt = 0; attempt < 5; attempt++)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve thought about this again, and a more elegant approach might be to leverage Flink’s savepoint mechanism to explicitly force the initialization of the start offset. Here’s the refined idea:

  1. Pre-populate the source table with some existing records.
  2. Launch a job:
    INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;
  3. Stop the job with a savepoint (e.g., using FlinkOperator#stopWithSavepoint, as shown in this example).
    → At this point, the “latest” offset has been determined, initialized, and checkpointed into the state.
  4. Restart the job from the savepoint. (see example in FlinkTableSourceFailOverITCase#initTableEnvironment)
  5. Write new records into the source table.
  6. Read from the sink table — the output should contain only the records written in step 5, confirming that the job correctly started reading from the offset captured at savepoint time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I’ll follow this approach and submit a PR accordingly.

@nhuantho
Copy link
Copy Markdown
Contributor Author

@MehulBatra, can you please review the pull request again?

@MehulBatra
Copy link
Copy Markdown
Contributor

@MehulBatra, can you please review the pull request again?

Thank you for your PR I will try to review it within this week!

@MehulBatra
Copy link
Copy Markdown
Contributor

@nhuantho can you resolve the conflicts? I will run the workflow and review

@nhuantho nhuantho force-pushed the flink-test/issue-2471 branch from 57489ce to 6fb31ee Compare February 9, 2026 15:40
@nhuantho
Copy link
Copy Markdown
Contributor Author

nhuantho commented Feb 9, 2026

@MehulBatra I resolved conflicts

Copy link
Copy Markdown
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nhuantho , I left some comments.

.containsExactly(
"+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
"+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
// 3. Test scan.startup.mode='latest' - should only read new records after subscription
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be more clear to extract this test into a separate test method.

tEnv = initTableEnvironment(savepointPath);
// Recreate a table
tEnv.executeSql(
"CREATE TABLE startup_mode_test_mode_latest ("
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to re-create the table, the table is already there.


// Recreate a table
tEnv.executeSql(
"CREATE TABLE "
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to re-create the table

tEnv.executeSql(
"INSERT INTO "
+ tablePath.getTableName()
+ " SELECT * FROM startup_mode_test");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run the query with startup_mode_test$changelog with latest startup mode. So we can materilaize the latest offsets into state.

Comment on lines +462 to +469

String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */";
String queryLatest =
"SELECT _change_type, id, name FROM "
+ tablePath.getTableName()
+ "$changelog"
+ optionsLatest;
CloseableIterator<Row> rowIterLatest = tEnv.executeSql(queryLatest).collect();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rerun the query of insertResult

@nhuantho nhuantho force-pushed the flink-test/issue-2471 branch 3 times, most recently from 98c7a4e to b74adcc Compare February 11, 2026 19:27
@nhuantho nhuantho requested a review from wuchong February 11, 2026 19:28
@wuchong
Copy link
Copy Markdown
Member

wuchong commented Feb 12, 2026

I tried to run locally, but it fails with following exception, not sure what's the reason

java.util.concurrent.ExecutionException: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.

	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
	at org.apache.fluss.flink.source.ChangelogVirtualTableITCase.testChangelogWithLatestScanStartupMode(ChangelogVirtualTableITCase.java:444)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
	at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
	at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
	at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.
	at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1503)
	at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1513)
	at org.apache.flink.runtime.dispatcher.Dispatcher.stopWithSavepointAndGetLocation(Dispatcher.java:1077)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)

@nhuantho nhuantho force-pushed the flink-test/issue-2471 branch from 10766c1 to 2a6d9cb Compare February 15, 2026 16:11
@wuchong wuchong force-pushed the flink-test/issue-2471 branch from b66dcd8 to 6b06d6f Compare February 27, 2026 17:17
Copy link
Copy Markdown
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigated the failing test case and traced it to an issue #2743. I submitted a commit to address the root cause and the test should pass now.

cc @MehulBatra could you help to review it when you are free, because it is related to a bug of $changelog and $binlog table.

@wuchong wuchong force-pushed the flink-test/issue-2471 branch 2 times, most recently from 347e0be to 995104f Compare February 28, 2026 03:26
@wuchong wuchong force-pushed the flink-test/issue-2471 branch from 995104f to 7c76581 Compare February 28, 2026 04:32
@wuchong wuchong merged commit fbdb7fa into apache:main Feb 28, 2026
14 of 16 checks passed
@nhuantho nhuantho deleted the flink-test/issue-2471 branch February 28, 2026 08:04
Copy link
Copy Markdown
Contributor

@MehulBatra MehulBatra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! just left one small comment

Thank you @wuchong & @nhuantho for the PR looks great to me.
Verified Latest mode doesn't read historical data for changelog and binlog

  • Latest mode state persists in savepoints
  • Restart from savepoint continues correctly
  • No duplicate processing after restart

Can we move waitForCheckpoint, getFileBasedCheckpointsConfig to a base class and share it instead of writing individually for both?

One concern @wuchong as we have moved to maintaining MiniClusterWithClientResource with before each instead of abstarcttestbase could this hamper the CI runtimes.

@nhuantho
Copy link
Copy Markdown
Contributor Author

@MehulBatra if this idea is approved, I can create another issue and implement it.

@wuchong
Copy link
Copy Markdown
Member

wuchong commented Feb 28, 2026

@MehulBatra, I agree with extracting common test methods.

I also share your concern regarding increased test execution time and support reusing the Flink mini-cluster as an optimization. Let's track this improvement in a follow-up PR. However, we cannot simply extend AbstractTestBase in this case, as we need to override specific checkpoint configurations.

@MehulBatra
Copy link
Copy Markdown
Contributor

Hi @nhuantho I am already working on the shared base class for the IT, you can help me in review!

@nhuantho
Copy link
Copy Markdown
Contributor Author

nhuantho commented Mar 1, 2026

@MehulBatra sure! If there’s anything I can help with, feel free to tag me.

@MehulBatra
Copy link
Copy Markdown
Contributor

@MehulBatra, I agree with extracting common test methods.

I also share your concern regarding increased test execution time and support reusing the Flink mini-cluster as an optimization. Let's track this improvement in a follow-up PR. However, we cannot simply extend AbstractTestBase in this case, as we need to override specific checkpoint configurations.

Sure @wuchong , we can discuss this more in the upcoming pr and improvements related to this, will kleep you posted on same.

@nhuantho
Copy link
Copy Markdown
Contributor Author

nhuantho commented Mar 1, 2026

@MehulBatra, I agree with extracting common test methods.

I also share your concern regarding increased test execution time and support reusing the Flink mini-cluster as an optimization. Let's track this improvement in a follow-up PR. However, we cannot simply extend AbstractTestBase in this case, as we need to override specific checkpoint configurations.

Hi @wuchong, @MehulBatra I have a question, Can we move MiniClusterWithClientResource to beforeAll, instead of beforeEach. From my side, it can reduce time to creating MiniCluster.
I tested with:

  • First
  • Create MiniCluster in beforeEach
image
  • Create MiniCluster in beforeAll
image (4)
  • Second
  • Create MiniCluster in beforeEach
image
  • Create MiniCluster in beforeAll
image
  • If it is correct, I can create a pr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

4 participants