Skip to content

[Flink] Support Array type in Flink connector#2040

Merged
wuchong merged 3 commits into
apache:mainfrom
XuQianJin-Stars:feature/issue-1978-array-flink-connector
Dec 2, 2025
Merged

[Flink] Support Array type in Flink connector#2040
wuchong merged 3 commits into
apache:mainfrom
XuQianJin-Stars:feature/issue-1978-array-flink-connector

Conversation

@XuQianJin-Stars
Copy link
Copy Markdown
Contributor

@XuQianJin-Stars XuQianJin-Stars commented Nov 27, 2025

Purpose

Linked issue: close #1978

This PR adds comprehensive support for Array type in the Flink connector, enabling users to read and write array data between Flink and Fluss tables. This addresses a key limitation in the connector's type support system.

Brief change log

  • Core Array type support in common module:

    • Enhanced Arrow format readers and writers (ArrowReader, ArrowWriter, ArrowArrayWriter, ArrowFieldWriter) to handle array serialization and deserialization
    • Updated ArrowUtils to support Array type schema conversion
  • Flink connector integration:

    • Implemented bidirectional type conversion in FlussRowToFlinkRowConverter and FlinkRowToFlussRowConverter to support Array types
    • Added FlinkAsFlussArray wrapper class to convert Flink's ArrayData to Fluss's InternalArray
    • Updated FlinkAsFlussRow to support array field access
  • Comprehensive test coverage:

    • Added FlinkArrayTypeITCase as the base test class with comprehensive integration tests covering:
      • Arrays of primitive types (int, bigint, string, boolean, double)
      • Arrays with null elements
      • Nested arrays (array of arrays)
      • Arrays in primary key tables
      • Array operations and access
    • Created version-specific test implementations for Flink 1.18, 1.19, 1.20, and 2.1

Tests

Unit Tests:

  • Arrow array writer and reader tests for serialization/deserialization

Integration Tests:

  • Flink118ComplexTypeITCase: Array type support for Flink 1.18
  • Flink119ComplexTypeITCase: Array type support for Flink 1.19
  • Flink120ComplexTypeITCase: Array type support for Flink 1.20
  • Flink21ComplexTypeITCase: Array type support for Flink 2.1

Test scenarios include:

  • testArrayOfPrimitiveTypesInLogTable: Verifies arrays of int, bigint, string, boolean, and double types
  • testArrayWithNullElements: Ensures proper handling of null elements within arrays and null arrays
  • testNestedArrays: Tests multi-dimensional arrays (array of arrays)
  • testArrayInPrimaryKeyTable: Validates array support in tables with primary keys
  • testArrayWithAllDataTypes: Comprehensive test with all supported data types in arrays
  • testArrayAccessAndCardinality: Tests array operations and element access

API and Format

API Changes:

  • No breaking API changes
  • Array type now supported in Flink connector's type mapping

Storage Format:

  • Enhanced Arrow format to support Array type serialization
  • Backward compatible with existing storage formats

Documentation

Feature Introduction:

  • This change introduces Array type support as a new feature in the Flink connector
  • Users can now create tables with array columns and perform read/write operations through Flink SQL

Documentation Updates Needed:

  • Update website/docs/engine-flink/getting-started.md to reflect Array type support in the type mapping table (change from "Not supported" to "Supported")
  • Add examples of using Array types in Flink SQL with Fluss tables

Comment on lines +217 to +220
+ "bigint_array array<bigint>, "
+ "double_array array<double>, "
+ "boolean_array array<boolean>, "
+ "string_array array<string>"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the types here exhaustive? E.g. float, smallint, tinyint, timestamp etc. isn't tested.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
https://fluss.apache.org/docs/next/table-design/data-types/

@leekeiabstraction
Copy link
Copy Markdown
Contributor

Integ tests are failing with stack traces that seem related to Arrow*Writer changes

Error:  org.apache.fluss.server.kv.KvTabletTest.testPartialUpdateAndDelete  Time elapsed: 0.043 s  <<< ERROR!
java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:306)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.getByte(ArrowBuf.java:508)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BitVectorHelper.setBit(BitVectorHelper.java:82)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.IntVector.set(IntVector.java:160)
	at org.apache.fluss.row.arrow.writers.ArrowIntWriter.doWrite(ArrowIntWriter.java:38)
	at org.apache.fluss.row.arrow.writers.ArrowFieldWriter.write(ArrowFieldWriter.java:59)
	at org.apache.fluss.row.arrow.ArrowWriter.writeRow(ArrowWriter.java:201)
	```

@XuQianJin-Stars
Copy link
Copy Markdown
Contributor Author

hi @leekeiabstraction Don't review this PR before the CI passes – I will continue to make revisions.

@XuQianJin-Stars XuQianJin-Stars force-pushed the feature/issue-1978-array-flink-connector branch 2 times, most recently from 3be6ba1 to 1e40482 Compare November 29, 2025 12:08
import static org.assertj.core.api.Assertions.assertThat;

/** Integration tests for Array type support in Flink connector. */
abstract class FlinkArrayTypeITCase extends AbstractTestBase {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IT case is very heavy, we should avoid add too many tests just for one purpose. I suggest to do the following updates:

  1. Rename FlinkArrayTypeITCase to FlinkComplexTypeITCase to cover future Map and Row types.
  2. Add a test for LOG TABLE that covers all array types (ARRAY, ARRAY, several ARRAY nested types). Writing reocords (+ null elements if the element type is nullable) into the table, and read from the table.
  3. Add a test for PK TABLE that is the same with above types. Also writing and reading from the table. But we should test updating and deleting as well. Take org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadKvTableWithScanStartupModeEqualsFull as an example about how to verify reading kv table for both snapshot read and incremental read. Then test lookup join the table.
  4. Add exception test that Array type can't be as primary key, or bucket key, or partition key. Add tests to verify this.

}

@Test
void testSimpleLogTableWithSinkAPI() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is not about array type and is not needed.

reuseWriter.complete();

return reuseArray;
return reuseArray.copy();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid copy(), this introduces performance regression.


CompactedRow row = new CompactedRow(fieldDataTypes.length, compactedRowDeserializer);
row.pointTo(writer.segment(), 0, writer.position());
row.pointTo(org.apache.fluss.memory.MemorySegment.wrap(rowBytes), 0, rowSize);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid deep copy, this introduces performance regression.


CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer);
compactedRow.pointTo(segment, offset, sizeInBytes);
compactedRow.pointTo(MemorySegment.wrap(rowBytes), 0, sizeInBytes);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid deep copy, this introduces performance regression.

Comment on lines +227 to +229
context instanceof LogRecordReadContext
? (LogRecordReadContext) context
: null);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This force casting is hack and error-prone in the future. Because this may break if we introduce another ReadContext implementation, and here will use null.

this.bufferAllocator = bufferAllocator;
this.selectedFieldGetters = selectedFieldGetters;
this.projectionPushDowned = projectionPushDowned;
this.batchRoots = Collections.synchronizedList(new ArrayList<>());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronizedList is performant bad, please avoid to use this.

* Fixes writerIndex for all buffers in all vectors after VectorLoader.load().
* VectorLoader.load() sets the capacity but not the writerIndex for buffers.
*/
private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this for array type? Why we don't need this before?

public void open(InitializationContext context) throws Exception {
this.converter = new FlinkAsFlussRow();
// For primary key tables (non-append-only), we need to encode the row immediately
// to avoid issues with Flink reusing RowData objects
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this, if it's for primary key tables, then UpsertWriter will materialize/encode the row into binary format which already avoid reuse object problems.

@XuQianJin-Stars XuQianJin-Stars force-pushed the feature/issue-1978-array-flink-connector branch 2 times, most recently from 2449ade to 87eebed Compare December 2, 2025 05:49
wuchong pushed a commit to XuQianJin-Stars/fluss that referenced this pull request Dec 2, 2025
wuchong added a commit to XuQianJin-Stars/fluss that referenced this pull request Dec 2, 2025
wuchong added a commit to XuQianJin-Stars/fluss that referenced this pull request Dec 2, 2025
…fix Arrow IndexOutOfBoundsException (apache#2040)

This fixes exception:
Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
@wuchong wuchong force-pushed the feature/issue-1978-array-flink-connector branch from 87eebed to c993199 Compare December 2, 2025 06:17
Copy link
Copy Markdown
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appended 2 commits to improve the implementation. Please take a look.

XuQianJin-Stars and others added 3 commits December 2, 2025 14:18
…fix Arrow IndexOutOfBoundsException (apache#2040)

This fixes exception:
Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
@wuchong wuchong force-pushed the feature/issue-1978-array-flink-connector branch from c993199 to 1e8123a Compare December 2, 2025 06:21
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
*/
static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job!

@wuchong wuchong merged commit f1a75ca into apache:main Dec 2, 2025
5 checks passed
wuchong added a commit that referenced this pull request Dec 2, 2025
zcoo pushed a commit to zcoo/fluss that referenced this pull request Dec 3, 2025
zcoo pushed a commit to zcoo/fluss that referenced this pull request Dec 4, 2025
zcoo pushed a commit to zcoo/fluss that referenced this pull request Dec 4, 2025
Ugbot pushed a commit to Ugbot/fluss that referenced this pull request Apr 26, 2026
Ugbot pushed a commit to Ugbot/fluss that referenced this pull request Apr 26, 2026
Ugbot pushed a commit to Ugbot/fluss that referenced this pull request Apr 26, 2026
…fix Arrow IndexOutOfBoundsException (apache#2040)

This fixes exception:
Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Array type in Flink connector

3 participants