[Flink] Support Array type in Flink connector#2040
Conversation
| + "bigint_array array<bigint>, " | ||
| + "double_array array<double>, " | ||
| + "boolean_array array<boolean>, " | ||
| + "string_array array<string>" |
There was a problem hiding this comment.
Are the types here exhaustive? E.g. float, smallint, tinyint, timestamp etc. isn't tested.
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
https://fluss.apache.org/docs/next/table-design/data-types/
|
Integ tests are failing with stack traces that seem related to Arrow*Writer changes |
|
hi @leekeiabstraction Don't review this PR before the CI passes – I will continue to make revisions. |
3be6ba1 to
1e40482
Compare
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| /** Integration tests for Array type support in Flink connector. */ | ||
| abstract class FlinkArrayTypeITCase extends AbstractTestBase { |
There was a problem hiding this comment.
IT case is very heavy, we should avoid add too many tests just for one purpose. I suggest to do the following updates:
- Rename
FlinkArrayTypeITCasetoFlinkComplexTypeITCaseto cover future Map and Row types. - Add a test for LOG TABLE that covers all array types (ARRAY, ARRAY, several ARRAY nested types). Writing reocords (+ null elements if the element type is nullable) into the table, and read from the table.
- Add a test for PK TABLE that is the same with above types. Also writing and reading from the table. But we should test updating and deleting as well. Take
org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadKvTableWithScanStartupModeEqualsFullas an example about how to verify reading kv table for both snapshot read and incremental read. Then test lookup join the table. - Add exception test that Array type can't be as primary key, or bucket key, or partition key. Add tests to verify this.
| } | ||
|
|
||
| @Test | ||
| void testSimpleLogTableWithSinkAPI() throws Exception { |
There was a problem hiding this comment.
This test is not about array type and is not needed.
| reuseWriter.complete(); | ||
|
|
||
| return reuseArray; | ||
| return reuseArray.copy(); |
There was a problem hiding this comment.
We should avoid copy(), this introduces performance regression.
|
|
||
| CompactedRow row = new CompactedRow(fieldDataTypes.length, compactedRowDeserializer); | ||
| row.pointTo(writer.segment(), 0, writer.position()); | ||
| row.pointTo(org.apache.fluss.memory.MemorySegment.wrap(rowBytes), 0, rowSize); |
There was a problem hiding this comment.
We should avoid deep copy, this introduces performance regression.
|
|
||
| CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer); | ||
| compactedRow.pointTo(segment, offset, sizeInBytes); | ||
| compactedRow.pointTo(MemorySegment.wrap(rowBytes), 0, sizeInBytes); |
There was a problem hiding this comment.
We should avoid deep copy, this introduces performance regression.
| context instanceof LogRecordReadContext | ||
| ? (LogRecordReadContext) context | ||
| : null); |
There was a problem hiding this comment.
This force casting is hack and error-prone in the future. Because this may break if we introduce another ReadContext implementation, and here will use null.
| this.bufferAllocator = bufferAllocator; | ||
| this.selectedFieldGetters = selectedFieldGetters; | ||
| this.projectionPushDowned = projectionPushDowned; | ||
| this.batchRoots = Collections.synchronizedList(new ArrayList<>()); |
There was a problem hiding this comment.
synchronizedList is performant bad, please avoid to use this.
| * Fixes writerIndex for all buffers in all vectors after VectorLoader.load(). | ||
| * VectorLoader.load() sets the capacity but not the writerIndex for buffers. | ||
| */ | ||
| private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) { |
There was a problem hiding this comment.
Why do we need this for array type? Why we don't need this before?
| public void open(InitializationContext context) throws Exception { | ||
| this.converter = new FlinkAsFlussRow(); | ||
| // For primary key tables (non-append-only), we need to encode the row immediately | ||
| // to avoid issues with Flink reusing RowData objects |
There was a problem hiding this comment.
I don't understand this, if it's for primary key tables, then UpsertWriter will materialize/encode the row into binary format which already avoid reuse object problems.
2449ade to
87eebed
Compare
… type Flink IT cases (apache#2040)
…fix Arrow IndexOutOfBoundsException (apache#2040) This fixes exception: Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0)) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
87eebed to
c993199
Compare
wuchong
left a comment
There was a problem hiding this comment.
I appended 2 commits to improve the implementation. Please take a look.
… type Flink IT cases (apache#2040)
…fix Arrow IndexOutOfBoundsException (apache#2040) This fixes exception: Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0)) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
c993199 to
1e8123a
Compare
| * VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse | ||
| * for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}. | ||
| */ | ||
| static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) { |
(cherry picked from commit fe7d0cb)
(cherry picked from commit fe7d0cb)
… type Flink IT cases (apache#2040)
…fix Arrow IndexOutOfBoundsException (apache#2040) This fixes exception: Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0)) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
Purpose
Linked issue: close #1978
This PR adds comprehensive support for Array type in the Flink connector, enabling users to read and write array data between Flink and Fluss tables. This addresses a key limitation in the connector's type support system.
Brief change log
Core Array type support in common module:
ArrowReader,ArrowWriter,ArrowArrayWriter,ArrowFieldWriter) to handle array serialization and deserializationArrowUtilsto support Array type schema conversionFlink connector integration:
FlussRowToFlinkRowConverterandFlinkRowToFlussRowConverterto support Array typesFlinkAsFlussArraywrapper class to convert Flink'sArrayDatato Fluss'sInternalArrayFlinkAsFlussRowto support array field accessComprehensive test coverage:
FlinkArrayTypeITCaseas the base test class with comprehensive integration tests covering:Tests
Unit Tests:
Integration Tests:
Flink118ComplexTypeITCase: Array type support for Flink 1.18Flink119ComplexTypeITCase: Array type support for Flink 1.19Flink120ComplexTypeITCase: Array type support for Flink 1.20Flink21ComplexTypeITCase: Array type support for Flink 2.1Test scenarios include:
testArrayOfPrimitiveTypesInLogTable: Verifies arrays of int, bigint, string, boolean, and double typestestArrayWithNullElements: Ensures proper handling of null elements within arrays and null arraystestNestedArrays: Tests multi-dimensional arrays (array of arrays)testArrayInPrimaryKeyTable: Validates array support in tables with primary keystestArrayWithAllDataTypes: Comprehensive test with all supported data types in arraystestArrayAccessAndCardinality: Tests array operations and element accessAPI and Format
API Changes:
Storage Format:
Documentation
Feature Introduction:
Documentation Updates Needed:
website/docs/engine-flink/getting-started.mdto reflect Array type support in the type mapping table (change from "Not supported" to "Supported")