[flink] Support delta join on Flink 2.1#1726

Merged
wuchong merged 2 commits into apache:main from xuyangzhong:issues_1143_delta_join
Sep 27, 2025

Conversation

@xuyangzhong
Contributor

Purpose

Linked issue: close #1143

Support delta join on Flink 2.1.

Brief change log

  • Introduce a dedicated Catalog for Flink 2.1 that overrides the getTable method.
  • Treat the primary key as an index in the schema.
  • Treat bucket keys and partition keys that are prefixes of the primary key as indexes in the schema.
  • Add tests for Catalog#getTable and delta join.
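
The index rules in the change log can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the class IndexKeySketch and its methods are hypothetical, column names are plain strings, and it assumes that bucket keys and partition keys are each checked separately as an ordered, leading prefix of the primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for illustration only; not the actual Fluss catalog code. */
final class IndexKeySketch {

    /** Returns true if the candidate columns, in order, form a leading prefix of the primary key. */
    static boolean isPrefixOf(List<String> candidate, List<String> primaryKey) {
        if (candidate.isEmpty() || candidate.size() > primaryKey.size()) {
            return false;
        }
        return primaryKey.subList(0, candidate.size()).equals(candidate);
    }

    /**
     * Collects the column lists that would be exposed as indexes:
     * the primary key itself, plus bucket keys and partition keys
     * when they are prefixes of the primary key (an assumption of this sketch).
     */
    static List<List<String>> deriveIndexes(
            List<String> primaryKey, List<String> bucketKeys, List<String> partitionKeys) {
        // LinkedHashSet keeps insertion order and drops duplicates,
        // e.g. when the bucket key is identical to the primary key.
        Set<List<String>> indexes = new LinkedHashSet<>();
        if (!primaryKey.isEmpty()) {
            indexes.add(List.copyOf(primaryKey));
        }
        for (List<String> candidate : List.of(bucketKeys, partitionKeys)) {
            if (isPrefixOf(candidate, primaryKey)) {
                indexes.add(List.copyOf(candidate));
            }
        }
        return new ArrayList<>(indexes);
    }
}
```

Under these assumptions, a table with primary key (a, b, c), bucket key (a, b), and partition key (c) would expose two indexes, (a, b, c) and (a, b); the partition key is skipped because it is not a leading prefix.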

Tests

  • Flink21CatalogITCase
  • Flink21TableSourceITCase

API and Format

None.

Documentation

None.

Contributor

@swuferhong swuferhong left a comment

Thank you for your contribution. I've left a few minor comments. Additionally, shall we create an issue to document Flink 2.1's support for delta joins?


@Test
void testDeltaJoin() throws Exception {
    tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

Contributor

Please add a comment explaining why we need to set this to 2.

Contributor Author

Done

.column("r", DataTypes.TIMESTAMP_LTZ())
.column("s", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT())))
.primaryKey("a")
.index("a");
Member

We should avoid duplicating test content; otherwise the code is hard to maintain. The only difference from the base test seems to be index("a"). Can we add a base method addDefaultIndexKey(SchemaBuilder) to the parent class that does nothing by default, and override it in this class?
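
The hook suggested here could look roughly like the sketch below. Only the method name addDefaultIndexKey comes from the comment above; SchemaBuilderSketch, BaseCatalogTestSketch, and Flink21CatalogTestSketch are hypothetical stand-ins for the real builder and test classes, so this shows the pattern rather than the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the real schema builder used in the tests. */
final class SchemaBuilderSketch {
    final List<String> columns = new ArrayList<>();
    final List<String> indexes = new ArrayList<>();

    SchemaBuilderSketch column(String name) {
        columns.add(name);
        return this;
    }

    SchemaBuilderSketch index(String name) {
        indexes.add(name);
        return this;
    }
}

/** Base test: builds the shared expected schema once and exposes a no-op hook. */
class BaseCatalogTestSketch {

    /** Does nothing by default; version-specific subclasses may add index keys. */
    protected void addDefaultIndexKey(SchemaBuilderSketch builder) {}

    final SchemaBuilderSketch buildExpectedSchema() {
        SchemaBuilderSketch builder =
                new SchemaBuilderSketch().column("a").column("r").column("s");
        // The only version-specific step is delegated to the hook.
        addDefaultIndexKey(builder);
        return builder;
    }
}

/** Flink 2.1 variant: reuses the base schema and only adds the index. */
class Flink21CatalogTestSketch extends BaseCatalogTestSketch {
    @Override
    protected void addDefaultIndexKey(SchemaBuilderSketch builder) {
        builder.index("a");
    }
}
```

With this shape, the expected schema is defined in one place, and the Flink 2.1 test differs from the base test only in the overridden hook.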

Contributor Author

Good idea! Done.

.column("order_time", DataTypes.TIMESTAMP(3))
.watermark("order_time", "`order_time` - INTERVAL '5' SECOND")
.primaryKey("user")
.index("user");
Member

ditto

@xuyangzhong
Contributor Author

Thank you for your contribution. I've left a few minor comments. Additionally, shall we create an issue to document Flink 2.1's support for delta joins?

I have created a new issue for the documentation: #1739

@wuchong
Member

wuchong commented Sep 26, 2025

Hi @xuyangzhong, I see you have marked the comments as addressed, but it seems the new commits have not been pushed to the repo?

@xuyangzhong
Contributor Author

Hi @xuyangzhong, I see you have marked the comments as addressed, but it seems the new commits have not been pushed to the repo?

@wuchong Sorry, I forgot to commit it. It is done now.

xuyangzhong force-pushed the issues_1143_delta_join branch from ae53c5d to 954b49a on September 27, 2025, 05:36

@wuchong
Member

wuchong commented Sep 27, 2025

@xuyangzhong The test case is a primary key join. Could you replace it with an index join?

xuyangzhong force-pushed the issues_1143_delta_join branch from 954b49a to 797e369 on September 27, 2025, 06:08

@xuyangzhong
Contributor Author

@xuyangzhong The test case is a primary key join. Could you replace it with an index join?

@wuchong Because Fluss exposes two sets of indexes (primary key and bucket key), an index join triggers a bug: https://issues.apache.org/jira/browse/FLINK-38399. I will fix that issue in Flink 2.1 and 2.2 (backporting the bugfix).

@wuchong wuchong merged commit ee33fa2 into apache:main Sep 27, 2025
5 checks passed
leosanqing pushed a commit to leosanqing/fluss that referenced this pull request Sep 29, 2025
Ugbot pushed a commit to Ugbot/fluss that referenced this pull request Apr 26, 2026