Skip to content

Commit edf449e

Browse files
committed
Table: Implement replace() API for data compaction
Replaces the use of .overwrite() in MaintenanceTable.compact() with a new .replace() API backed by a _RewriteFiles producer. This ensures compaction now generates an Operation.REPLACE snapshot instead of Operation.OVERWRITE, preserving logical table state for downstream consumers. Fixes #1092
1 parent 93df231 commit edf449e

File tree

5 files changed

+180
-5
lines changed

5 files changed

+180
-5
lines changed

pyiceberg/table/__init__.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,54 @@ def overwrite(
614614
for data_file in data_files:
615615
append_files.append_data_file(data_file)
616616

617+
def replace(
    self,
    df: pa.Table,
    files_to_delete: Iterable[DataFile],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """Replace a given set of existing data files with files written from *df*.

    Produces a REPLACE snapshot: the listed files are logically removed and
    new data files generated from the dataframe take their place (e.g. for
    compaction, where no logical rows are added or removed).

    Args:
        df: The Arrow dataframe that will be used to generate the new data files
        files_to_delete: The files to delete
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the replace operation
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    # Optional ns->us timestamp downcast, controlled by table configuration.
    downcast = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(),
        provided_schema=df.schema,
        downcast_ns_timestamp_to_us=downcast,
        format_version=self.table_metadata.format_version,
    )

    snapshot_update = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch)
    with snapshot_update.replace() as replace_snapshot:
        # First register the files being replaced as deleted ...
        for stale_file in files_to_delete:
            replace_snapshot.delete_data_file(stale_file)

        # ... then write the replacement data, if the dataframe is non-empty.
        if df.shape[0] > 0:
            new_data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata,
                write_uuid=replace_snapshot.commit_uuid,
                df=df,
                io=self._table.io,
            )
            for new_file in new_data_files:
                replace_snapshot.append_data_file(new_file)
664+
617665
def delete(
618666
self,
619667
delete_filter: str | BooleanExpression,
@@ -1432,6 +1480,33 @@ def overwrite(
14321480
branch=branch,
14331481
)
14341482

1483+
def replace(
    self,
    df: pa.Table,
    files_to_delete: Iterable[DataFile],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """Replace a given set of existing data files with files written from *df*.

    Convenience wrapper that opens a transaction and delegates to
    ``Transaction.replace``, producing a single REPLACE snapshot.

    Args:
        df: The Arrow dataframe that will be used to generate the new data files
        files_to_delete: The files to delete
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the replace operation
    """
    with self.transaction() as txn:
        txn.replace(
            df=df,
            files_to_delete=files_to_delete,
            snapshot_properties=snapshot_properties,
            branch=branch,
        )
1509+
14351510
def delete(
14361511
self,
14371512
delete_filter: BooleanExpression | str = ALWAYS_TRUE,

pyiceberg/table/maintenance.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ def compact(self) -> None:
6363
logger.info("Table contains no rows, skipping compaction.")
6464
return
6565

66-
# Overwrite the table atomically (REPLACE operation)
66+
# Replace existing files with new compacted files
6767
with self.tbl.transaction() as txn:
68-
txn.overwrite(arrow_table, snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"})
68+
files_to_delete = [task.file for task in self.tbl.scan().plan_files()]
69+
txn.replace(
70+
df=arrow_table,
71+
files_to_delete=files_to_delete,
72+
snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"},
73+
)

pyiceberg/table/snapshots.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
344344

345345

346346
def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary:
347-
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
347+
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}:
348348
raise ValueError(f"Operation not implemented: {summary.operation}")
349349

350350
if not previous_summary:

pyiceberg/table/update/snapshot.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,91 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
667667
return []
668668

669669

670+
class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
    """Rewrites data in the table. This will produce a REPLACE snapshot.

    Data files were logically rearranged, but no new logical records were
    added or removed (e.g. compaction).
    """

    def _existing_manifests(self) -> list[ManifestFile]:
        """Determine if there are any existing manifest files.

        Returns the manifests that carry over into the new snapshot: manifests
        untouched by the delete set are kept as-is; manifests that reference
        deleted data files are rewritten without those entries.
        """
        existing_files = []

        # Lazily-built per-spec evaluators used to cheaply prune manifests
        # whose partition ranges cannot contain any file to delete.
        manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
        if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
            for manifest_file in snapshot.manifests(io=self._io):
                # Manifest does not contain rows that match the files to delete partitions
                if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
                    existing_files.append(manifest_file)
                    continue

                entries_to_write: set[ManifestEntry] = set()
                found_deleted_entries: set[ManifestEntry] = set()

                # Partition the live entries into "keep" and "delete" sets.
                for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
                    if entry.data_file in self._deleted_data_files:
                        found_deleted_entries.add(entry)
                    else:
                        entries_to_write.add(entry)

                # Is the intersection with the delete set empty? If so, keep
                # the manifest unchanged.
                if len(found_deleted_entries) == 0:
                    existing_files.append(manifest_file)
                    continue

                # Delete all files from manifest: nothing survives, so the
                # manifest is dropped entirely.
                if len(entries_to_write) == 0:
                    continue

                # We have to rewrite the manifest file without the deleted data files
                with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
                    for entry in entries_to_write:
                        writer.add_entry(
                            ManifestEntry.from_args(
                                # Surviving files are marked EXISTING: they were
                                # added by an earlier snapshot, not this one.
                                status=ManifestEntryStatus.EXISTING,
                                snapshot_id=entry.snapshot_id,
                                sequence_number=entry.sequence_number,
                                file_sequence_number=entry.file_sequence_number,
                                data_file=entry.data_file,
                            )
                        )
                existing_files.append(writer.to_manifest_file())

        return existing_files

    def _deleted_entries(self) -> list[ManifestEntry]:
        """To determine if we need to record any deleted entries.

        Scans the parent snapshot's manifests (in parallel) and returns a
        DELETED entry for every data file in the delete set. Returns an empty
        list when there is no parent snapshot.
        """
        if self._parent_snapshot_id is not None:
            previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
            if previous_snapshot is None:
                raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")

            executor = ExecutorFactory.get_or_create()
            manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

            def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
                # Skip manifests whose partition summaries rule out the delete set.
                if not manifest_evaluators[manifest.partition_spec_id](manifest):
                    return []

                # Only DATA files are recorded as deleted here; delete files
                # (positional/equality) are not part of the rewrite set.
                return [
                    ManifestEntry.from_args(
                        status=ManifestEntryStatus.DELETED,
                        snapshot_id=entry.snapshot_id,
                        sequence_number=entry.sequence_number,
                        file_sequence_number=entry.file_sequence_number,
                        data_file=entry.data_file,
                    )
                    for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
                    if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
                ]

            list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
            return list(itertools.chain(*list_of_entries))
        else:
            return []
753+
754+
670755
class UpdateSnapshot:
671756
_transaction: Transaction
672757
_io: FileIO
@@ -715,6 +800,16 @@ def overwrite(self, commit_uuid: uuid.UUID | None = None) -> _OverwriteFiles:
715800
snapshot_properties=self._snapshot_properties,
716801
)
717802

803+
def replace(self, commit_uuid: uuid.UUID | None = None) -> _RewriteFiles:
    """Return a snapshot producer that commits a REPLACE operation (e.g. compaction)."""
    producer = _RewriteFiles(
        operation=Operation.REPLACE,
        commit_uuid=commit_uuid,
        transaction=self._transaction,
        io=self._io,
        branch=self._branch,
        snapshot_properties=self._snapshot_properties,
    )
    return producer
812+
718813
def delete(self) -> _DeleteFiles:
719814
return _DeleteFiles(
720815
operation=Operation.DELETE,

tests/table/test_snapshots.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
398398

399399
def test_invalid_operation() -> None:
    # "invalid" is not a member of the Operation enum, so the ValueError is
    # raised by the enum constructor itself, before update_snapshot_summaries
    # is ever entered.
    with pytest.raises(ValueError, match="'invalid' is not a valid Operation"):
        update_snapshot_summaries(summary=Summary(Operation("invalid")))
403403

404404

405405
def test_invalid_type() -> None:

0 commit comments

Comments
 (0)