diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java index ffd99a30bdec..3d2b29d17c29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java @@ -108,6 +108,22 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback { private static final String PUFFIN_FORMAT = "puffin"; + // Snapshot summary metric keys + private static final String SNAPSHOT_SUMMARY_ADDED_DATA_FILES = "added-data-files"; + private static final String SNAPSHOT_SUMMARY_ADDED_RECORDS = "added-records"; + private static final String SNAPSHOT_SUMMARY_ADDED_FILES_SIZE = "added-files-size"; + private static final String SNAPSHOT_SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; + private static final String SNAPSHOT_SUMMARY_DELETED_RECORDS = "deleted-records"; + private static final String SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; + private static final String SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT = + "changed-partition-count"; + private static final String SNAPSHOT_SUMMARY_TOTAL_RECORDS = "total-records"; + private static final String SNAPSHOT_SUMMARY_TOTAL_DATA_FILES = "total-data-files"; + private static final String SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; + private static final String SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES = "total-delete-files"; + private static final String SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES = "total-position-deletes"; + private static final String SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES = "total-equality-deletes"; + private final FileStoreTable table; private final String commitUser; @@ -303,45 +319,86 @@ private void createMetadata( private void createMetadataWithoutBase(long snapshotId) throws IOException { SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); + Snapshot paimonSnapshot = table.snapshotManager().snapshot(snapshotId); SchemaCache schemaCache = new SchemaCache(); List dataFileEntries = new ArrayList<>(); List dvFileEntries = new ArrayList<>(); + SummaryMetrics metrics = new SummaryMetrics(); + Set changedPartitions = new HashSet<>(); List filteredDataSplits = snapshotReader.read().dataSplits().stream() .filter(DataSplit::rawConvertible) .collect(Collectors.toList()); for (DataSplit dataSplit : filteredDataSplits) { + changedPartitions.add(dataSplit.partition()); dataSplitToManifestEntries( dataSplit, snapshotId, schemaCache, dataFileEntries, dvFileEntries); + + for (DataFileMeta paimonFileMeta : dataSplit.dataFiles()) { + metrics.addedDataFiles++; + metrics.addedRecords += paimonFileMeta.rowCount(); + metrics.addedFilesSize += paimonFileMeta.fileSize(); + } } - List manifestFileMetas = new ArrayList<>(); + List dataManifestFileMetas = new ArrayList<>(); if (!dataFileEntries.isEmpty()) { - manifestFileMetas.addAll( + dataManifestFileMetas.addAll( manifestFile.rollingWrite(dataFileEntries.iterator(), snapshotId)); } + + List dvManifestFileMetas = new ArrayList<>(); if (!dvFileEntries.isEmpty()) { - manifestFileMetas.addAll( + dvManifestFileMetas.addAll( manifestFile.rollingWrite( dvFileEntries.iterator(), snapshotId, IcebergManifestFileMeta.Content.DELETES)); } - String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas); + List allManifestFileMetas = new ArrayList<>(); + allManifestFileMetas.addAll(dataManifestFileMetas); + allManifestFileMetas.addAll(dvManifestFileMetas); + + metrics.changedPartitionCount = changedPartitions.size(); + metrics.totalDataFiles = metrics.addedDataFiles; + metrics.deletedDataFiles = 0; + metrics.deletedRecords = 0; + metrics.deletedFilesSize = 0; + metrics.totalRecords = + paimonSnapshot.totalRecordCount() == null + ? metrics.addedRecords + : paimonSnapshot.totalRecordCount(); + metrics.totalFilesSize = metrics.addedFilesSize; + long totalDeleteFiles = dvFileEntries.stream().filter(IcebergManifestEntry::isLive).count(); + long totalPositionDeleteRecords = + dvFileEntries.stream() + .filter(IcebergManifestEntry::isLive) + .mapToLong(entry -> entry.file().recordCount()) + .sum(); + metrics.totalDeleteFiles = totalDeleteFiles; + metrics.totalPositionDeletes = totalPositionDeleteRecords; + metrics.totalEqualityDeletes = 0; + + String manifestListFileName = manifestList.writeWithoutRolling(allManifestFileMetas); int schemaId = (int) schemaCache.getLatestSchemaId(); IcebergSchema icebergSchema = schemaCache.get(schemaId); List partitionFields = getPartitionFields(table.schema().partitionKeys(), icebergSchema); + + IcebergSnapshotSummary snapshotSummary = + computeSnapshotSummary( + IcebergSnapshotSummary.APPEND.operation(), paimonSnapshot, metrics); + IcebergSnapshot snapshot = new IcebergSnapshot( snapshotId, snapshotId, null, System.currentTimeMillis(), - IcebergSnapshotSummary.APPEND, + snapshotSummary, pathFactory.toManifestListPath(manifestListFileName).toString(), schemaId, null, @@ -525,12 +582,14 @@ private void createMetadataWithBase( .filter(meta -> meta.content() == IcebergManifestFileMeta.Content.DELETES) .collect(Collectors.toList()); - Map removedFiles = new LinkedHashMap<>(); + Map> removedFiles = new LinkedHashMap<>(); Map> addedFiles = new LinkedHashMap<>(); boolean isAddOnly = fileChangesCollector.collect(removedFiles, addedFiles); - Set modifiedPartitionsSet = new LinkedHashSet<>(removedFiles.values()); - modifiedPartitionsSet.addAll( - addedFiles.values().stream().map(Pair::getLeft).collect(Collectors.toList())); + Set modifiedPartitionsSet = + removedFiles.values().stream() + .map(Pair::getLeft) + .collect(Collectors.toCollection(LinkedHashSet::new)); + addedFiles.values().stream().map(Pair::getLeft).forEach(modifiedPartitionsSet::add); List modifiedPartitions = new ArrayList<>(modifiedPartitionsSet); // Note that this check may be different from `removedFiles.isEmpty()`, @@ -538,16 +597,16 @@ private void createMetadataWithBase( // In this case, if `baseMetadata` already contains this file, we should not add a // duplicate. List newDataManifestFileMetas; - IcebergSnapshotSummary snapshotSummary; + String operation; if (isAddOnly) { // Fast case. We don't need to remove files from `baseMetadata`. We only need to append // new metadata files. newDataManifestFileMetas = new ArrayList<>(baseDataManifestFileMetas); newDataManifestFileMetas.addAll( createNewlyAddedManifestFileMetas(addedFiles, snapshotId)); - snapshotSummary = IcebergSnapshotSummary.APPEND; + operation = IcebergSnapshotSummary.APPEND.operation(); } else { - Pair, IcebergSnapshotSummary> result = + Pair, String> result = createWithDeleteManifestFileMetas( removedFiles, addedFiles, @@ -555,7 +614,7 @@ private void createMetadataWithBase( baseDataManifestFileMetas, snapshotId); newDataManifestFileMetas = result.getLeft(); - snapshotSummary = result.getRight(); + operation = result.getRight(); } List newDVManifestFileMetas = new ArrayList<>(); @@ -579,6 +638,65 @@ private void createMetadataWithBase( newDVManifestFileMetas.stream()) .collect(Collectors.toList())); + SummaryMetrics metrics = new SummaryMetrics(); + metrics.addedDataFiles = addedFiles.size(); + metrics.addedRecords = + addedFiles.values().stream().mapToLong(p -> p.getRight().rowCount()).sum(); + metrics.addedFilesSize = + addedFiles.values().stream().mapToLong(p -> p.getRight().fileSize()).sum(); + metrics.deletedDataFiles = removedFiles.size(); + metrics.deletedRecords = + removedFiles.values().stream().mapToLong(p -> p.getRight().rowCount()).sum(); + metrics.deletedFilesSize = + removedFiles.values().stream().mapToLong(p -> p.getRight().fileSize()).sum(); + metrics.changedPartitionCount = modifiedPartitionsSet.size(); + + IcebergSnapshot baseSnapshot = baseMetadata.currentSnapshot(); + Long snapshotTotalRecords = snapshot.totalRecordCount(); + Long previousTotalRecordsValue = + getSummaryLong(baseSnapshot, SNAPSHOT_SUMMARY_TOTAL_RECORDS); + long previousTotalRecords = + previousTotalRecordsValue != null + ? previousTotalRecordsValue + : computeLiveRowCount(baseDataManifestFileMetas); + if (snapshotTotalRecords != null) { + metrics.totalRecords = snapshotTotalRecords; + } else { + metrics.totalRecords = + Math.max( + 0, + previousTotalRecords + metrics.addedRecords - metrics.deletedRecords); + } + + Long previousTotalDataFilesValue = + getSummaryLong(baseSnapshot, SNAPSHOT_SUMMARY_TOTAL_DATA_FILES); + long previousTotalDataFiles = + previousTotalDataFilesValue != null + ? previousTotalDataFilesValue + : computeLiveDataFileCount(baseDataManifestFileMetas); + metrics.totalDataFiles = + Math.max( + 0, + previousTotalDataFiles + metrics.addedDataFiles - metrics.deletedDataFiles); + + Long previousTotalFilesSizeValue = + getSummaryLong(baseSnapshot, SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE); + long previousTotalFilesSize = + previousTotalFilesSizeValue != null + ? previousTotalFilesSizeValue + : computeTotalFilesSizeFromManifests(baseDataManifestFileMetas); + metrics.totalFilesSize = + Math.max( + 0, + previousTotalFilesSize + metrics.addedFilesSize - metrics.deletedFilesSize); + + metrics.totalDeleteFiles = computeLiveDeleteFileCount(newDVManifestFileMetas); + metrics.totalPositionDeletes = computeLiveRowCount(newDVManifestFileMetas); + metrics.totalEqualityDeletes = 0; + + IcebergSnapshotSummary snapshotSummary = + computeSnapshotSummary(operation, snapshot, metrics); + // add new schemas if needed SchemaCache schemaCache = new SchemaCache(); int schemaId = (int) schemaCache.getLatestSchemaId(); @@ -680,14 +798,14 @@ private void createMetadataWithBase( private interface FileChangesCollector { boolean collect( - Map removedFiles, + Map> removedFiles, Map> addedFiles) throws IOException; } private boolean collectFileChanges( List manifestEntries, - Map removedFiles, + Map> removedFiles, Map> addedFiles) { boolean isAddOnly = true; DataFilePathFactories factories = new DataFilePathFactories(fileStorePathFactory); @@ -705,7 +823,7 @@ private boolean collectFileChanges( case DELETE: isAddOnly = false; addedFiles.remove(path); - removedFiles.put(path, entry.partition()); + removedFiles.put(path, Pair.of(entry.partition(), entry.file())); break; default: throw new UnsupportedOperationException( @@ -717,7 +835,7 @@ private boolean collectFileChanges( private boolean collectFileChanges( long snapshotId, - Map removedFiles, + Map> removedFiles, Map> addedFiles) { return collectFileChanges( table.store() @@ -777,15 +895,14 @@ private List createNewlyAddedManifestFileMetas( currentSnapshotId); } - private Pair, IcebergSnapshotSummary> - createWithDeleteManifestFileMetas( - Map removedFiles, - Map> addedFiles, - List modifiedPartitions, - List baseManifestFileMetas, - long currentSnapshotId) - throws IOException { - IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND; + private Pair, String> createWithDeleteManifestFileMetas( + Map> removedFiles, + Map> addedFiles, + List modifiedPartitions, + List baseManifestFileMetas, + long currentSnapshotId) + throws IOException { + String operation = IcebergSnapshotSummary.APPEND.operation(); List newManifestFileMetas = new ArrayList<>(); RowType partitionType = table.schema().logicalPartitionType(); @@ -838,7 +955,7 @@ private List createNewlyAddedManifestFileMetas( newManifestFileMetas.add(fileMeta); } else { // some file is removed, rewrite this file meta - snapshotSummary = IcebergSnapshotSummary.OVERWRITE; + operation = IcebergSnapshotSummary.OVERWRITE.operation(); List newEntries = new ArrayList<>(); for (IcebergManifestEntry entry : entries) { if (entry.isLive()) { @@ -865,7 +982,7 @@ private List createNewlyAddedManifestFileMetas( newManifestFileMetas.addAll( createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId)); - return Pair.of(newManifestFileMetas, snapshotSummary); + return Pair.of(newManifestFileMetas, operation); } // ------------------------------------------------------------------------------------- @@ -1230,6 +1347,137 @@ private List createDvManifestFileMetas(Snapshot snapsho icebergDvEntries.iterator(), snapshotId, IcebergManifestFileMeta.Content.DELETES); } + // ------------------------------------------------------------------------------------- + // Snapshot Summary Computation + // ------------------------------------------------------------------------------------- + + private static class SummaryMetrics { + long addedDataFiles; + long addedRecords; + long addedFilesSize; + long deletedDataFiles; + long deletedRecords; + long deletedFilesSize; + long changedPartitionCount; + long totalDataFiles; + long totalRecords; + long totalFilesSize; + long totalDeleteFiles; + long totalPositionDeletes; + long totalEqualityDeletes; + } + + private IcebergSnapshotSummary computeSnapshotSummary( + String operation, Snapshot snapshot, SummaryMetrics metrics) { + + IcebergSnapshotSummary summary = new IcebergSnapshotSummary(operation); + + long addedDataFiles = Math.max(0, metrics.addedDataFiles); + long addedRecords = Math.max(0, metrics.addedRecords); + long addedFilesSize = Math.max(0, metrics.addedFilesSize); + long deletedDataFiles = Math.max(0, metrics.deletedDataFiles); + long deletedRecords = Math.max(0, metrics.deletedRecords); + long deletedFilesSize = Math.max(0, metrics.deletedFilesSize); + long changedPartitionCount = Math.max(0, metrics.changedPartitionCount); + long totalRecords = Math.max(0, metrics.totalRecords); + long totalDataFiles = Math.max(0, metrics.totalDataFiles); + long totalFilesSize = Math.max(0, metrics.totalFilesSize); + long totalDeleteFiles = Math.max(0, metrics.totalDeleteFiles); + long totalPositionDeletes = Math.max(0, metrics.totalPositionDeletes); + long totalEqualityDeletes = Math.max(0, metrics.totalEqualityDeletes); + + summary.put(SNAPSHOT_SUMMARY_ADDED_DATA_FILES, Long.toString(addedDataFiles)); + summary.put(SNAPSHOT_SUMMARY_ADDED_RECORDS, Long.toString(addedRecords)); + summary.put(SNAPSHOT_SUMMARY_ADDED_FILES_SIZE, Long.toString(addedFilesSize)); + summary.put(SNAPSHOT_SUMMARY_DELETED_DATA_FILES, Long.toString(deletedDataFiles)); + summary.put(SNAPSHOT_SUMMARY_DELETED_RECORDS, Long.toString(deletedRecords)); + summary.put(SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE, Long.toString(deletedFilesSize)); + summary.put(SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT, Long.toString(changedPartitionCount)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_RECORDS, Long.toString(totalRecords)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_DATA_FILES, Long.toString(totalDataFiles)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE, Long.toString(totalFilesSize)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES, Long.toString(totalDeleteFiles)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES, Long.toString(totalPositionDeletes)); + summary.put(SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES, Long.toString(totalEqualityDeletes)); + + Map properties = snapshot.properties(); + if (properties != null) { + properties.forEach( + (key, value) -> { + if (value != null) { + summary.put(key, value); + } + }); + } + + return summary; + } + + private long computeLiveDataFileCount(List manifestMetas) { + return manifestMetas.stream() + .mapToLong( + meta -> + meta.addedFilesCount() + + meta.existingFilesCount() + - meta.deletedFilesCount()) + .sum(); + } + + private long computeLiveRowCount(List manifestMetas) { + return manifestMetas.stream() + .mapToLong( + meta -> + meta.addedRowsCount() + + meta.existingRowsCount() + - meta.deletedRowsCount()) + .sum(); + } + + private long computeLiveDeleteFileCount(List manifestMetas) { + return manifestMetas.stream() + .mapToLong( + meta -> + meta.addedFilesCount() + + meta.existingFilesCount() + - meta.deletedFilesCount()) + .sum(); + } + + @Nullable + private Long getSummaryLong(@Nullable IcebergSnapshot snapshot, String key) { + if (snapshot == null) { + return null; + } + Map summaryMap = snapshot.summary().getSummary(); + String value = summaryMap.get(key); + if (value == null) { + return null; + } + try { + return Long.parseLong(value); + } catch (NumberFormatException e) { + LOG.warn( + "Unable to parse snapshot summary field {}={} as long. The value will be recomputed.", + key, + value); + return null; + } + } + + private long computeTotalFilesSizeFromManifests(List manifestMetas) + throws IOException { + long total = 0; + for (IcebergManifestFileMeta meta : manifestMetas) { + for (IcebergManifestEntry entry : + manifestFile.read(new Path(meta.manifestPath()).getName())) { + if (entry.isLive()) { + total += entry.file().fileSizeInBytes(); + } + } + } + return total; + } + // ------------------------------------------------------------------------------------- // Utils // ------------------------------------------------------------------------------------- diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java index 91a1b5acf266..a6205dc107d5 100644 --- a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java +++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java @@ -512,8 +512,17 @@ void testReadMetadataWithSnapshotSummary() throws Exception { // Verify snapshot summary contains operation information assertThat(snapshot.summary().operation()).isEqualTo("append"); assertThat(snapshot.summary().getSummary().get("operation")).isEqualTo("append"); + + // Verify required fields for Redshift Spectrum compatibility assertThat(snapshot.summary().getSummary().get("total-records")).isEqualTo("10"); + assertThat(snapshot.summary().getSummary().get("total-data-files")).isEqualTo("1"); + assertThat(snapshot.summary().getSummary().get("total-delete-files")).isEqualTo("0"); + assertThat(snapshot.summary().getSummary().get("total-position-deletes")).isEqualTo("0"); + assertThat(snapshot.summary().getSummary().get("total-equality-deletes")).isEqualTo("0"); + + // Verify change fields assertThat(snapshot.summary().getSummary().get("added-data-files")).isEqualTo("1"); + assertThat(snapshot.summary().getSummary().get("added-records")).isEqualTo("10"); assertThat(snapshot.summary().getSummary().get("added-files-size")).isEqualTo("100"); }