Skip to content

[docs] Clarify the supported flink versions for delta join#2000

Merged
wuchong merged 4 commits into
apache:mainfrom
xuyangzhong:dt_version
Nov 26, 2025
Merged

[docs] Clarify the supported flink versions for delta join#2000
wuchong merged 4 commits into
apache:mainfrom
xuyangzhong:dt_version

Conversation

@xuyangzhong
Copy link
Copy Markdown
Contributor

@xuyangzhong xuyangzhong commented Nov 19, 2025

Purpose

Linked issue: close #1986

Brief change log

Clarify the supported Flink versions for delta join.

Tests

I have manually verified it with:

  • Latest Flink 2.1 (can be regarded as 2.1.2) with Fluss 0.8
  • Latest Flink 2.2 (can be regarded as 2.2.0) with Fluss 0.8

Steps: (I choose to use a slightly modified query based on Nexmark Q20 to verify that the delta join can run normally.)

  1. build the flink with these latest specific branches manually (because currently they are not released), and move fluss connector fluss-flink-2.1-0.8.0-incubating.jar and nexmark connector (we use nexmark connector to mock data, so we need to build https://github.com/nexmark/nexmark and get the nexmark connector jar nexmark-flink-0.3-SNAPSHOT.jar in nexmark-flink/target/) to ${FLINK_HOME}/lib
  2. start flink standalone cluster with new configuration numberOfTaskSlots 4 , and start fluss local cluster
  3. use flink sql client to run ./bin/sql-client.sh -f prepare_table.sql to prepare three tables used for two sources and one sink.
    3.1. for Flink 2.1.2, the DDLs are prepare_table_2.1.sql
    3.2. for Flink 2.2.0, the DDLs are prepare_table_2.2.sql
  4. run the delta join job by flink sql client ./bin/sql-client.sh -f run_delta_join.sql run_delta_join.sql
  5. insert data to source tables by flink sql client ./bin/sql-client.sh -f insert_data.sql insert_data.sql

Now you can see the delta join job is running successfully and is processing data.
image

API and Format

None.

Documentation

None.


#### Limitations

- The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Align it with Limitations in Flink 2.1

- The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join.
- The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join.
- The join must be a INNER join.
- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

- The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join.
- The join must be a INNER join.
- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode.
- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to understand what is a sink without "upsertMaterialize". Could you explain more about this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to explain a lot about what is upsertMaterialize, and what are upsert keys that related to upsertMaterialize.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that there are no official documents from the Flink side explaining upsert materialization and upsert keys (the related issue https://issues.apache.org/jira/browse/FLINK-23350 is still open).

I found an article by Ververica at https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events#Fix_the_Issue_with_SinkUpsertMaterializer. I'm not sure if I should include this link here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to link it

Comment thread website/docs/engine-flink/delta-joins.md Outdated
Comment thread website/docs/engine-flink/delta-joins.md Outdated
Comment on lines +158 to +163
- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`.
- When the pk of the sink does not align with (or does not include) the upstream upsert key, the sink will produce a sink materialization (called `upsertMaterialize`).
- About upsert key and `upsertMaterialize`, more details can be found in this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing to this? Use SinkUpsertMaterializer instead of upsertMaterialize as it is the formal name of the node.

- The downstream node of the join must support idempotent updates, typically it's an upsert sink and should not have a `SinkUpsertMaterializer` node before it.
  - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink’s primary key does not fully cover the upstream update key.
  - This node can be disabled by setting `table.exec.sink.upsert-materialize` to `NONE`. You can learn more details about `SinkUpsertMaterializer` by reading this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the hint This node can be disabled by setting table.exec.sink.upsert-materialize to NONE. and add a separate line to explain that "the pipeline must discard update-before messages".

@wuchong wuchong merged commit 4f00709 into apache:main Nov 26, 2025
2 checks passed
@wuchong
Copy link
Copy Markdown
Member

wuchong commented Nov 26, 2025

@xuyangzhong could you open a pull request for release-0.8 branch?

@xuyangzhong
Copy link
Copy Markdown
Contributor Author

@xuyangzhong could you open a pull request for release-0.8 branch?

@wuchong the pr BP to release-0.8 is #2030

Ugbot pushed a commit to Ugbot/fluss that referenced this pull request Apr 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

The docs for delta join is not work

2 participants