Invalid partial update for PK table #2843

@zuston

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.9.0 (latest release)

Please describe the bug 🐞

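I followed the partial-update example in the docs at https://fluss.apache.org/docs/next/apis/java-client/#partial-updates and found that columns not specified in partialUpdate(...) are updated as well. In the reproduction below, only "id" and "age" are passed to partialUpdate, yet the lookup shows the other columns written too.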

class User {
    public String id;
    public Integer age;
    public LocalDateTime createdAt;
    public boolean active;

    public User(String id, Integer age, LocalDateTime createdAt, boolean active) {
        this.id = id;
        this.age = age;
        this.active = active;
        this.createdAt = createdAt;
    }

    public String getId() {
        return id;
    }

    public Integer getAge() {
        return age;
    }

    public LocalDateTime getCreatedAt() {
        return createdAt;
    }

    public boolean isActive() {
        return active;
    }

    @Override
    public String toString() {
        return "User{"
                + "id='"
                + id
                + '\''
                + ", age="
                + age
                + ", createdAt="
                + createdAt
                + ", active="
                + active
                + '}';
    }

    public static User from(InternalRow row) {
        String id = row.getString(0).toString();
        Integer age = row.getInt(1);
        LocalDateTime createdAt = row.getTimestampNtz(2, 1).toLocalDateTime();
        boolean active = row.getBoolean(3);
        return new User(id, age, createdAt, active);
    }

    public static GenericRow to(User user) {
        GenericRow row = new GenericRow(4);
        row.setField(0, BinaryString.fromString(user.getId()));
        row.setField(1, user.getAge());
        row.setField(2, TimestampNtz.fromLocalDateTime(user.getCreatedAt()));
        row.setField(3, user.isActive());
        return row;
    }
}


        // conn is an already-established Fluss Connection (setup omitted)
        Admin admin = conn.getAdmin();

        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .column("created_at", DataTypes.TIMESTAMP())
                        .column("is_active", DataTypes.BOOLEAN())
                        .primaryKey("id")
                        .build();
        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(schema)
                        .build();

        TablePath tablePath = TablePath.of("fluss", "fluss_toolkit_partial_update_t1");
        admin.createTable(tablePath, tableDescriptor, true).get();

        Table table = conn.getTable(tablePath);

        User user = new User("1", 20, LocalDateTime.now(), false);
        GenericRow row = User.to(user);
        System.out.println("Upserting rows to the table");
        UpsertWriter writer = table.newUpsert().partialUpdate("id", "age").createWriter();
        writer.upsert(row);
        writer.flush();

        GenericRow rowKey = new GenericRow(1);
        rowKey.setField(0, BinaryString.fromString("1"));
        LookupResult lookup = table.newLookup().createLookuper().lookup(rowKey).get();
        lookup.getRowList().forEach(x -> System.out.println(User.from(x)));

        // Clean up: drop the table
        admin.dropTable(tablePath, true).get();
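
For comparison, the semantics this report expects can be modeled in plain Java. This is only a sketch of the expected behavior, not Fluss code: the class PartialUpdateModel and its merge method are hypothetical names, rows are modeled as Object[] arrays, and the timestamp is a placeholder string. It assumes that only the target columns are taken from the incoming row, and that every other column keeps its stored value (or stays null when the key is new).

```java
import java.util.Arrays;

// Hypothetical model of the expected partial-update semantics (not the Fluss implementation).
class PartialUpdateModel {

    // Copies only the target columns from the incoming row.
    // Non-target columns keep the existing value, or null if no row exists yet.
    static Object[] merge(Object[] existing, Object[] incoming, int[] targetColumns) {
        Object[] result =
                existing == null
                        ? new Object[incoming.length]
                        : Arrays.copyOf(existing, existing.length);
        for (int idx : targetColumns) {
            result[idx] = incoming[idx];
        }
        return result;
    }

    public static void main(String[] args) {
        // Same shape as the repro: (id, age, created_at, is_active), targets are id and age.
        Object[] incoming = {"1", 20, "2024-01-01T00:00", false};
        int[] targets = {0, 1};

        // No row stored for key "1" yet, so created_at and is_active should stay null.
        Object[] merged = merge(null, incoming, targets);
        System.out.println(Arrays.toString(merged)); // prints [1, 20, null, null]
    }
}
```

Under this model, the lookup in the repro would return null for created_at and is_active, rather than the values carried in the upserted row.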

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
