Skip to content

Issue when evolving and writing in the same transaction #980

@Fokko

Description

@Fokko

Feature Request / Improvement

# Minimal reproduction: evolving the schema and appending data inside the
# SAME transaction fails, because append() validates the Arrow table
# against the pre-transaction table schema / name mapping (see traceback:
# NameMapping.find cannot resolve the newly-added 'id' / 'char' columns).
#
# NOTE(review): assumes a catalog is already bound to `cat` (not shown in
# the issue) and a reachable Iceberg backend — confirm before running.
from datetime import datetime

import pyarrow as pa

from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

# Drop any leftover table from a previous run. Catch only the
# "table does not exist" case instead of a bare `except:`, which would
# also hide authentication/connectivity failures.
try:
    cat.drop_table('default.test3')
except NoSuchTableError:
    pass

# Initial schema deliberately uses 'idx' (not 'id') so the later
# union_by_name() both adds a new column ('char') and introduces a
# differently-named long column ('id').
tbl = cat.create_table(
    'default.test3',
    schema=Schema(
        NestedField(1, 'idx', LongType()),
        NestedField(2, 'ts', TimestampType()),
    )
)

arrow_table = pa.Table.from_arrays([
    pa.array([2, 3, 4, 5, 6]),
    pa.array([
        datetime(2021, 5, 19),
        datetime(2022, 7, 25),
        datetime(2023, 3, 22),
        datetime(2024, 7, 17),
        datetime(2025, 2, 22),
    ]),
    pa.array(["a", "b", "c", "d", "e"]),
], names=['id', 'ts', 'char'])

with tbl.transaction() as tx:
    with tx.update_schema() as schema:
        schema.union_by_name(
            Schema(
                NestedField(1, 'id', LongType()),
                NestedField(2, 'ts', TimestampType()),
                # Field IDs must be unique within a Schema; the original
                # snippet reused ID 2 here. union_by_name() reassigns IDs
                # on merge, but the source Schema itself should be valid.
                NestedField(3, 'char', StringType()),
            )
        )
    # Fails today with "PyArrow table contains more columns: char, id":
    # the compatibility check reads the stale pre-transaction schema.
    tx.append(arrow_table)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/name_mapping.py:80, in NameMapping.find(self, *names)
     79 try:
---> 80     return self._field_by_name[name]
     81 except KeyError as e:

KeyError: 'id'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:2229, in _check_pyarrow_schema_compatible(requested_schema, provided_schema, downcast_ns_timestamp_to_us)
   2228 try:
-> 2229     provided_schema = pyarrow_to_schema(
   2230         provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
   2231     )
   2232 except ValueError as e:

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:828, in pyarrow_to_schema(schema, name_mapping, downcast_ns_timestamp_to_us)
    825     raise ValueError(
    826         "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
    827     )
--> 828 return visit_pyarrow(schema, visitor)

File /usr/local/lib/python3.10/functools.py:889, in singledispatch.<locals>.wrapper(*args, **kw)
    886     raise TypeError(f'{funcname} requires at least '
    887                     '1 positional argument')
--> 889 return dispatch(args[0].__class__)(*args, **kw)

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:857, in _(obj, visitor)
    855 @visit_pyarrow.register(pa.Schema)
    856 def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
--> 857     return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))

File /usr/local/lib/python3.10/functools.py:889, in singledispatch.<locals>.wrapper(*args, **kw)
    886     raise TypeError(f'{funcname} requires at least '
    887                     '1 positional argument')
--> 889 return dispatch(args[0].__class__)(*args, **kw)

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:867, in _(obj, visitor)
    866 result = visit_pyarrow(field.type, visitor)
--> 867 results.append(visitor.field(field, result))
    868 visitor.after_field(field)

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:1024, in _ConvertToIceberg.field(self, field, field_result)
   1023 def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
-> 1024     field_id = self._field_id(field)
   1025     field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:1011, in _ConvertToIceberg._field_id(self, field)
   1010 if self._name_mapping:
-> 1011     return self._name_mapping.find(*self._field_names).field_id
   1012 elif (field_id := _get_field_id(field)) is not None:

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/name_mapping.py:82, in NameMapping.find(self, *names)
     81 except KeyError as e:
---> 82     raise ValueError(f"Could not find field with name: {name}") from e

ValueError: Could not find field with name: id

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Cell In[26], line 38
     30 with tx.update_schema() as schema:
     31     schema.union_by_name(
     32         Schema(
     33             NestedField(1, 'id', LongType()),
   (...)
     36         )
     37     )
---> 38 tx.append(arrow_table)

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:491, in Transaction.append(self, df, snapshot_properties)
    487     raise ValueError(
    488         f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
    489     )
    490 downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
--> 491 _check_pyarrow_schema_compatible(
    492     self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    493 )
    495 manifest_merge_enabled = PropertyUtil.property_as_bool(
    496     self.table_metadata.properties,
    497     TableProperties.MANIFEST_MERGE_ENABLED,
    498     TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    499 )
    500 update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)

File /usr/local/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py:2235, in _check_pyarrow_schema_compatible(requested_schema, provided_schema, downcast_ns_timestamp_to_us)
   2233     provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
   2234     additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
-> 2235     raise ValueError(
   2236         f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
   2237     ) from e
   2239 _check_schema_compatible(requested_schema, provided_schema)

ValueError: PyArrow table contains more columns: char, id. Update the schema first (hint, use union_by_name).

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions