Skip to content

Issue when rewriting an unpartitioned table #979

@Fokko

Description

@Fokko

Feature Request / Improvement

# Reproduction:
#   1. Write data under an unpartitioned spec.
#   2. Evolve the spec to partition on `id`.
#   3. Run a delete that must rewrite the original (unpartitioned) manifest.
# NOTE(review): this snippet assumes earlier notebook cells imported `pa`
# (pyarrow), `datetime`, `Schema`, `NestedField`, `LongType`,
# `TimestampType` and `IdentityTransform`, and created the catalog `cat`.
# None of these are defined here.
arrow_table = pa.Table.from_arrays([
    pa.array([2, 3, 4, 5, 6]),
    pa.array([
        datetime(2021, 5, 19),
        datetime(2022, 7, 25),
        datetime(2023, 3, 22),
        datetime(2024, 7, 17),
        datetime(2025, 2, 22)
    ])
], names=['idx', 'ts'])

# Best-effort cleanup from a previous run. A failure to drop is ignored,
# but unlike a bare `except:` this does not swallow KeyboardInterrupt or
# SystemExit.
# NOTE(review): catching pyiceberg's NoSuchTableError specifically would
# be tighter, if that import is available in the notebook.
try:
    cat.drop_table('default.test2')
except Exception:
    pass

# Created without a partition spec, so the first append lands in an
# unpartitioned manifest.
tbl = cat.create_table(
    'default.test2',
    schema=Schema(
        NestedField(1, 'idx', LongType()),
        NestedField(2, 'ts', TimestampType())
    )
)

tbl.append(arrow_table)

# Evolve the schema and the partition spec in a single transaction:
# rename `idx` -> `id`, then add an identity partition on `id`.
with tbl.transaction() as tx:
    with tx.update_schema() as schema:
        schema.rename_column('idx', 'id')
    with tx.update_spec() as spec:
        spec.add_field('id', IdentityTransform())

# This delete has to rewrite the original unpartitioned manifest.
# In the reported version it raises IndexError.
tbl.delete('id == 4')

Raises:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
Cell In[25], line 2
      1 # Should rewrite the original unpartitioned manifest
----> 2 tbl.delete('id == 4')

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:1611, in Table.delete(self, delete_filter, snapshot_properties)
   1603 """
   1604 Shorthand for deleting rows from the table.
   1605 
   (...)
   1608     snapshot_properties: Custom properties to be added to the snapshot summary
   1609 """
   1610 with self.transaction() as tx:
-> 1611     tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:593, in Transaction.delete(self, delete_filter, snapshot_properties)
    590 if isinstance(delete_filter, str):
    591     delete_filter = _parse_row_filter(delete_filter)
--> 593 with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
    594     delete_snapshot.delete_by_predicate(delete_filter)
    596 # Check if there are any files that require an actual rewrite of a data file

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:2090, in UpdateTableMetadata.__exit__(self, _, value, traceback)
   2088 def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
   2089     """Close and commit the change."""
-> 2090     self.commit()

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:2086, in UpdateTableMetadata.commit(self)
   2085 def commit(self) -> None:
-> 2086     self._transaction._apply(*self._commit())

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3291, in DeleteFiles._commit(self)
   3288 def _commit(self) -> UpdatesAndRequirements:
   3289     # Only produce a commit when there is something to delete
   3290     if self.files_affected:
-> 3291         return super()._commit()
   3292     else:
   3293         return (), ()

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3197, in _SnapshotProducer._commit(self)
   3196 def _commit(self) -> UpdatesAndRequirements:
-> 3197     new_manifests = self._manifests()
   3198     next_sequence_number = self._transaction.table_metadata.next_sequence_number()
   3200     summary = self._summary(self.snapshot_properties)

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3157, in _SnapshotProducer._manifests(self)
   3154 delete_manifests = executor.submit(_write_delete_manifest)
   3155 existing_manifests = executor.submit(self._existing_manifests)
-> 3157 return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())

File /usr/local/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File /usr/local/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File /usr/local/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3146, in _SnapshotProducer._manifests.<locals>._write_delete_manifest()
   3138     with write_manifest(
   3139         format_version=self._transaction.table_metadata.format_version,
   3140         spec=self._transaction.table_metadata.spec(),
   (...)
   3143         snapshot_id=self._snapshot_id,
   3144     ) as writer:
   3145         for delete_entry in deleted_entries:
-> 3146             writer.add_entry(delete_entry)
   3147     return [writer.to_manifest_file()]
   3148 else:

File /usr/local/lib/python3.10/site-packages/pyiceberg/manifest.py:816, in ManifestWriter.add_entry(self, entry)
    809 if (
    810     (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
    811     and entry.sequence_number is not None
    812     and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
    813 ):
    814     self._min_sequence_number = entry.sequence_number
--> 816 self._writer.write_block([self.prepare_entry(entry)])
    817 return self

File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/file.py:281, in AvroOutputFile.write_block(self, objects)
    279 block_content_encoder = BinaryEncoder(output_stream=in_memory)
    280 for obj in objects:
--> 281     self.writer.write(block_content_encoder, obj)
    282 block_content = in_memory.getvalue()
    284 self.encoder.write_int(len(objects))

File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
    171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
    172     for pos, writer in self.field_writers:
    173         # When pos is None, then it is a default value
--> 174         writer.write(encoder, val[pos] if pos is not None else None)

File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
    171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
    172     for pos, writer in self.field_writers:
    173         # When pos is None, then it is a default value
--> 174         writer.write(encoder, val[pos] if pos is not None else None)

File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
    171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
    172     for pos, writer in self.field_writers:
    173         # When pos is None, then it is a default value
--> 174         writer.write(encoder, val[pos] if pos is not None else None)

File /usr/local/lib/python3.10/site-packages/pyiceberg/typedef.py:188, in Record.__getitem__(self, pos)
    186 def __getitem__(self, pos: int) -> Any:
    187     """Fetch a value from a Record."""
--> 188     return self.__getattribute__(self._position_to_field_name[pos])

IndexError: tuple index out of range

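The entries to be removed are written into a new delete manifest. That manifest is created with the table's latest partition spec (`table_metadata.spec()`), not with the spec of the manifest the entries came from. The entries were originally written under the unpartitioned spec, so they carry no partition values. Encoding them against the new identity partition on `id` therefore fails with `IndexError` in `Record.__getitem__`.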

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions