[docs] Clarify the supported flink versions for delta join#2000
Conversation
|
|
||
| #### Limitations | ||
|
|
||
| - The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join. |
There was a problem hiding this comment.
Align it with Limitations in Flink 2.1
| - The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join. | ||
| - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join. | ||
| - The join must be a INNER join. | ||
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode. |
| - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join. | ||
| - The join must be a INNER join. | ||
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode. | ||
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`. |
There was a problem hiding this comment.
It's hard to understand what is a sink without "upsertMaterialize". Could you explain more about this?
There was a problem hiding this comment.
I try to explain a lot about what is upsertMaterialize, and what are upsert keys that related to upsertMaterialize.
There was a problem hiding this comment.
I found that there are no official documents from the Flink side explaining upsert materialization and upsert keys (the related issue https://issues.apache.org/jira/browse/FLINK-23350 is still open).
I found an article by Ververica at https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events#Fix_the_Issue_with_SinkUpsertMaterializer. I'm not sure if I should include this link here.
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`. | ||
| - When the pk of the sink does not align with (or does not include) the upstream upsert key, the sink will produce a sink materialization (called `upsertMaterialize`). | ||
| - About upsert key and `upsertMaterialize`, more details can be found in this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events). |
There was a problem hiding this comment.
How about changing to this? Use SinkUpsertMaterializer instead of upsertMaterialize as it is the formal name of the node.
- The downstream node of the join must support idempotent updates, typically it's an upsert sink and should not have a `SinkUpsertMaterializer` node before it.
- Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink’s primary key does not fully cover the upstream update key.
- This node can be disabled by setting `table.exec.sink.upsert-materialize` to `NONE`. You can learn more details about `SinkUpsertMaterializer` by reading this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events).
There was a problem hiding this comment.
I have removed the hint This node can be disabled by setting table.exec.sink.upsert-materialize to NONE. and add a separate line to explain that "the pipeline must discard update-before messages".
|
@xuyangzhong could you open a pull request for |
(cherry picked from commit 4f00709)
|
Purpose
Linked issue: close #1986
Brief change log
Clarify the supported Flink versions for delta join.
Tests
I have manually verified it with:
Steps: (I choose to use a slightly modified query based on Nexmark Q20 to verify that the delta join can run normally.)
fluss-flink-2.1-0.8.0-incubating.jarand nexmark connector (we use nexmark connector to mock data, so we need to build https://github.com/nexmark/nexmark and get the nexmark connector jarnexmark-flink-0.3-SNAPSHOT.jarinnexmark-flink/target/) to${FLINK_HOME}/libnumberOfTaskSlots4 , and start fluss local cluster./bin/sql-client.sh -f prepare_table.sqlto prepare three tables used for two sources and one sink.3.1. for Flink 2.1.2, the DDLs are prepare_table_2.1.sql
3.2. for Flink 2.2.0, the DDLs are prepare_table_2.2.sql
./bin/sql-client.sh -f run_delta_join.sqlrun_delta_join.sql./bin/sql-client.sh -f insert_data.sqlinsert_data.sqlNow you can see the delta join job is running successfully and is processing data.

API and Format
None.
Documentation
None.