[flink] Support delta join on Flink 2.1#1726
Conversation
swuferhong
left a comment
There was a problem hiding this comment.
Thank you for your contribution, I've left a few minor comments. Additionally, shall we create an issue to write a document on Flink2.1's support for delta joins?
|
|
||
| @Test | ||
| void testDeltaJoin() throws Exception { | ||
| tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); |
There was a problem hiding this comment.
Add a comment explain why we need to set this to 2.
| .column("r", DataTypes.TIMESTAMP_LTZ()) | ||
| .column("s", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT()))) | ||
| .primaryKey("a") | ||
| .index("a"); |
There was a problem hiding this comment.
We should avoid duplicating test content, otherwise, it's hard to maintain the code. It seems the only difference with the base test is the index("a"), can we add a base method addDefaultIndexKey(SchemaBuilder) in the parent class with nothing to do in default implementation but overrirde in this class?
There was a problem hiding this comment.
Good idea! Done for it.
| .column("order_time", DataTypes.TIMESTAMP(3)) | ||
| .watermark("order_time", "`order_time` - INTERVAL '5' SECOND") | ||
| .primaryKey("user") | ||
| .index("user"); |
I have created a new one for the doc #1739 |
|
Hi @xuyangzhong , I saw you have marked the comments addressed. But it seems the new commits are missed to push to the repo? |
@wuchong Sorry, I forgot to commit it. Have done. |
ae53c5d to
954b49a
Compare
|
@xuyangzhong the test case is a primary key join, could you replace it with an index join? |
954b49a to
797e369
Compare
@wuchong Due to the presence of two sets of indexes (primary key and bucket key) in Fluss, a bug will be triggered: https://issues.apache.org/jira/browse/FLINK-38399. I will address that issue in Flink versions 2.1 and 2.2(bp that bugfix). |
Purpose
Linked issue: close #1143
Support delta join on Flink 2.1.
Brief change log
getTablemethod.Tests
API and Format
None.
Documentation
None.