diff --git a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java index 7a862e8fb213..72674bdd7bcb 100644 --- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -136,6 +136,13 @@ public class CatalogOptions { .withDescription( "Controls the max number for snapshots per table in the catalog are cached."); + public static final ConfigOption CACHE_DV_MAX_NUM = + key("cache.deletion-vectors.max-bucket-num") + .intType() + .defaultValue(20000) + .withDescription( + "Controls the maximum number of bucket-level deletion vector meta that can be cached."); + public static final ConfigOption CASE_SENSITIVE = ConfigOptions.key("case-sensitive") .booleanType() diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 2e209fba45ea..2a5dbd7766e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -59,6 +59,7 @@ import org.apache.paimon.tag.TagPreview; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -97,6 +98,7 @@ abstract class AbstractFileStore implements FileStore { @Nullable private SegmentsCache readManifestCache; @Nullable private Cache snapshotCache; + @Nullable private DVMetaCache dvMetaCache; protected AbstractFileStore( FileIO fileIO, @@ -216,7 +218,8 @@ public IndexManifestFile.Factory indexManifestFileFactory() { FileFormat.manifestFormat(options), options.manifestCompression(), pathFactory(), - readManifestCache); + 
readManifestCache, + dvMetaCache); } @Override @@ -227,6 +230,7 @@ public IndexFileHandler newIndexFileHandler() { indexManifestFileFactory().create(), new IndexFilePathFactories(pathFactory()), options.dvIndexFileTargetSize(), + this.dvMetaCache, options.deletionVectorBitmap64()); } @@ -488,4 +492,9 @@ public void setManifestCache(SegmentsCache manifestCache) { public void setSnapshotCache(Cache cache) { this.snapshotCache = cache; } + + @Override + public void setDVMetaCache(DVMetaCache dvMetaCache) { + this.dvMetaCache = dvMetaCache; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 929302673d3f..62bc65c744fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -40,6 +40,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -121,4 +122,6 @@ PartitionExpire newPartitionExpire( void setManifestCache(SegmentsCache manifestCache); void setSnapshotCache(Cache cache); + + void setDVMetaCache(DVMetaCache dvMetaCache); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index b3c4c5928d07..2b76d08abfef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -28,6 +28,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; 
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -43,6 +44,7 @@ import java.util.Map; import java.util.Optional; +import static org.apache.paimon.options.CatalogOptions.CACHE_DV_MAX_NUM; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS; import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE; @@ -66,9 +68,9 @@ public class CachingCatalog extends DelegateCatalog { protected Cache databaseCache; protected Cache tableCache; @Nullable protected final SegmentsCache manifestCache; - // partition cache will affect data latency @Nullable protected Cache> partitionCache; + @Nullable protected DVMetaCache dvMetaCache; public CachingCatalog(Catalog wrapped, Options options) { super(wrapped); @@ -97,6 +99,11 @@ public CachingCatalog(Catalog wrapped, Options options) { this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM); + + int cacheDvMaxNum = options.get(CACHE_DV_MAX_NUM); + if (cacheDvMaxNum > 0) { + this.dvMetaCache = new DVMetaCache(cacheDvMaxNum); + } init(Ticker.systemTicker()); } @@ -266,6 +273,9 @@ private void putTableCache(Identifier identifier, Table table) { if (manifestCache != null) { storeTable.setManifestCache(manifestCache); } + if (dvMetaCache != null) { + storeTable.setDVMetaCache(dvMetaCache); + } } tableCache.put(identifier, table); diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 205db897eace..83a41161f31e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -19,6 +19,7 @@ package org.apache.paimon.index; import org.apache.paimon.Snapshot; +import 
org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; @@ -26,19 +27,27 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; +import org.apache.paimon.operation.metrics.CacheMetrics; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; @@ -52,6 +61,8 @@ public class IndexFileHandler { private final IndexFilePathFactories pathFactories; private final MemorySize dvTargetFileSize; private final boolean dvBitmap64; + @Nullable private CacheMetrics cacheMetrics; + @Nullable private final DVMetaCache dvMetaCache; public IndexFileHandler( FileIO fileIO, @@ -59,6 +70,7 @@ public IndexFileHandler( IndexManifestFile indexManifestFile, IndexFilePathFactories pathFactories, MemorySize dvTargetFileSize, + DVMetaCache dvMetaCache, boolean dvBitmap64) { this.fileIO = fileIO; this.snapshotManager = snapshotManager; @@ -66,6 +78,7 @@ public IndexFileHandler( this.indexManifestFile = indexManifestFile; this.dvTargetFileSize = dvTargetFileSize; this.dvBitmap64 = dvBitmap64; + this.dvMetaCache = dvMetaCache; } public 
HashIndexFile hashIndex(BinaryRow partition, int bucket) { @@ -87,6 +100,140 @@ public Optional scanHashIndex( return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0)); } + public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) { + this.cacheMetrics = cacheMetrics; + } + + // Construct DataFile -> DeletionFile based on IndexFileMeta + @Nullable + @VisibleForTesting + public Map extractDeletionFileByMeta( + BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) { + LinkedHashMap dvRanges = fileMeta.dvRanges(); + String dvFilePath = dvIndex(partition, bucket).path(fileMeta).toString(); + if (dvRanges != null && !dvRanges.isEmpty()) { + Map result = new HashMap<>(); + for (DeletionVectorMeta dvMeta : dvRanges.values()) { + result.put( + dvMeta.dataFileName(), + new DeletionFile( + dvFilePath, + dvMeta.offset(), + dvMeta.length(), + dvMeta.cardinality())); + } + return result; + } + return null; + } + + // Scan DV index file of given partition buckets + // returns map grouped by partition and bucket + public Map, Map> scanDVIndex( + Snapshot snapshot, Set> partitionBuckets) { + if (snapshot == null || snapshot.indexManifest() == null) { + return Collections.emptyMap(); + } + Map, Map> result = new HashMap<>(); + // to avoid cache being frequently evicted, + // currently we only read from cache when bucket number is 1 + if (this.dvMetaCache != null && partitionBuckets.size() == 1) { + Pair partitionBucket = partitionBuckets.iterator().next(); + Map deletionFiles = + this.scanDVIndexWithCache( + snapshot, partitionBucket.getLeft(), partitionBucket.getRight()); + if (deletionFiles != null && deletionFiles.size() > 0) { + result.put(partitionBucket, deletionFiles); + } + return result; + } + Map, List> partitionFileMetas = + scan( + snapshot, + DELETION_VECTORS_INDEX, + partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet())); + partitionFileMetas.forEach( + (entry, indexFileMetas) -> { + if 
(partitionBuckets.contains(entry)) { + if (indexFileMetas != null) { + indexFileMetas.forEach( + indexFileMeta -> { + Map dvMetas = + extractDeletionFileByMeta( + entry.getLeft(), + entry.getRight(), + indexFileMeta); + if (dvMetas != null) { + result.computeIfAbsent(entry, k -> new HashMap<>()) + .putAll(dvMetas); + } + }); + } + } + }); + return result; + } + + // Scan DV Meta Cache first, if not exist, scan DV index file, returns the exact deletion file + // of the specified partition/bucket + @VisibleForTesting + Map scanDVIndexWithCache( + Snapshot snapshot, BinaryRow partition, Integer bucket) { + // read from cache + String indexManifestName = snapshot.indexManifest(); + Path indexManifestPath = this.indexManifestFile.indexManifestFilePath(indexManifestName); + Map result = + this.dvMetaCache.read(indexManifestPath, partition, bucket); + if (result != null) { + if (cacheMetrics != null) { + cacheMetrics.increaseHitObject(); + } + return result; + } + if (cacheMetrics != null) { + cacheMetrics.increaseMissedObject(); + } + // If miss, read the whole partition's deletion files + Map, List> partitionFileMetas = + scan( + snapshot, + DELETION_VECTORS_INDEX, + new HashSet<>(Collections.singletonList(partition))); + // for each bucket, extract deletion files, and fill meta cache + for (Map.Entry, List> entry : + partitionFileMetas.entrySet()) { + Pair partitionBucket = entry.getKey(); + List fileMetas = entry.getValue(); + if (entry.getValue() != null) { + Map bucketDeletionFiles = new HashMap<>(); + fileMetas.forEach( + meta -> { + Map bucketDVMetas = + extractDeletionFileByMeta( + partitionBucket.getLeft(), + partitionBucket.getRight(), + meta); + if (bucketDVMetas != null) { + bucketDeletionFiles.putAll(bucketDVMetas); + } + }); + // bucketDeletionFiles can be empty + this.dvMetaCache.put( + indexManifestPath, + partitionBucket.getLeft(), + partitionBucket.getRight(), + bucketDeletionFiles); + if (partitionBucket.getRight() != null + && 
partitionBucket.getLeft() != null + && partitionBucket.getRight().equals(bucket) + && partitionBucket.getLeft().equals(partition)) { + result = bucketDeletionFiles; + } + } + } + return result; + } + public List scan(String indexType) { return scan(snapshotManager.latestSnapshot(), indexType); } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java index 077d1a45aa29..21086eadf39f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.ObjectsFile; import org.apache.paimon.utils.PathFactory; @@ -39,6 +40,8 @@ /** Index manifest file. */ public class IndexManifestFile extends ObjectsFile { + private final DVMetaCache dvMetaCache; + private IndexManifestFile( FileIO fileIO, RowType schema, @@ -46,7 +49,8 @@ private IndexManifestFile( FormatWriterFactory writerFactory, String compression, PathFactory pathFactory, - @Nullable SegmentsCache cache) { + @Nullable SegmentsCache cache, + @Nullable DVMetaCache dvMetaCache) { super( fileIO, new IndexManifestEntrySerializer(), @@ -56,6 +60,11 @@ private IndexManifestFile( compression, pathFactory, cache); + this.dvMetaCache = dvMetaCache; + } + + public Path indexManifestFilePath(String fileName) { + return pathFactory.toPath(fileName); } /** Write new index files to index manifest. 
*/ @@ -79,18 +88,21 @@ public static class Factory { private final String compression; private final FileStorePathFactory pathFactory; @Nullable private final SegmentsCache cache; + @Nullable private final DVMetaCache dvMetaCache; public Factory( FileIO fileIO, FileFormat fileFormat, String compression, FileStorePathFactory pathFactory, - @Nullable SegmentsCache cache) { + @Nullable SegmentsCache cache, + @Nullable DVMetaCache dvMetaCache) { this.fileIO = fileIO; this.fileFormat = fileFormat; this.compression = compression; this.pathFactory = pathFactory; this.cache = cache; + this.dvMetaCache = dvMetaCache; } public IndexManifestFile create() { @@ -102,7 +114,8 @@ public IndexManifestFile create() { fileFormat.createWriterFactory(schema), compression, pathFactory.indexManifestFileFactory(), - cache); + cache, + dvMetaCache); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java index 280f53a10162..10246907ef55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java @@ -35,10 +35,13 @@ public class ScanMetrics { public static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles"; public static final String MANIFEST_HIT_CACHE = "manifestHitCache"; public static final String MANIFEST_MISSED_CACHE = "manifestMissedCache"; + public static final String DVMETA_HIT_CACHE = "dvMetaHitCache"; + public static final String DVMETA_MISSED_CACHE = "dvMetaMissedCache"; private final MetricGroup metricGroup; private final Histogram durationHistogram; private final CacheMetrics cacheMetrics; + private final CacheMetrics dvMetaCacheMetrics; private ScanStats latestScan; @@ -48,6 +51,7 @@ public ScanMetrics(MetricRegistry registry, String tableName) { LAST_SCAN_DURATION, () -> latestScan == null ? 
0L : latestScan.getDuration()); durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE); cacheMetrics = new CacheMetrics(); + dvMetaCacheMetrics = new CacheMetrics(); metricGroup.gauge( LAST_SCANNED_MANIFESTS, () -> latestScan == null ? 0L : latestScan.getScannedManifests()); @@ -59,6 +63,8 @@ public ScanMetrics(MetricRegistry registry, String tableName) { () -> latestScan == null ? 0L : latestScan.getResultedTableFiles()); metricGroup.gauge(MANIFEST_HIT_CACHE, () -> cacheMetrics.getHitObject().get()); metricGroup.gauge(MANIFEST_MISSED_CACHE, () -> cacheMetrics.getMissedObject().get()); + metricGroup.gauge(DVMETA_HIT_CACHE, () -> dvMetaCacheMetrics.getHitObject().get()); + metricGroup.gauge(DVMETA_MISSED_CACHE, () -> dvMetaCacheMetrics.getMissedObject().get()); } @VisibleForTesting @@ -74,4 +80,8 @@ public void reportScan(ScanStats scanStats) { public CacheMetrics getCacheMetrics() { return cacheMetrics; } + + public CacheMetrics getDvMetaCacheMetrics() { + return dvMetaCacheMetrics; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 99069ddc3182..b683174cd4f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -44,6 +44,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -232,4 +233,9 @@ public void setManifestCache(SegmentsCache manifestCache) { public void setSnapshotCache(Cache cache) { wrapped.setSnapshotCache(cache); } + + @Override + public void setDVMetaCache(DVMetaCache dvMetaCache) { + 
wrapped.setDVMetaCache(dvMetaCache); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index aa5b8bb70d79..aeb223aaf577 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -55,6 +55,7 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.CatalogBranchManager; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileSystemBranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; @@ -93,6 +94,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { @Nullable protected transient SegmentsCache manifestCache; @Nullable protected transient Cache snapshotCache; @Nullable protected transient Cache statsCache; + @Nullable protected transient DVMetaCache dvmetaCache; protected AbstractFileStoreTable( FileIO fileIO, @@ -138,6 +140,12 @@ public void setStatsCache(Cache cache) { this.statsCache = cache; } + @Override + public void setDVMetaCache(DVMetaCache cache) { + this.dvmetaCache = cache; + store().setDVMetaCache(cache); + } + @Override public Optional latestSnapshot() { Snapshot snapshot = store().snapshotManager().latestSnapshot(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index e8898a1cd129..3a0d4ecc1cad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -42,6 +42,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.ChangelogManager; 
+import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; @@ -154,6 +155,11 @@ public void setStatsCache(Cache cache) { wrapped.setStatsCache(cache); } + @Override + public void setDVMetaCache(DVMetaCache cache) { + wrapped.setDVMetaCache(cache); + } + @Override public TableSchema schema() { return wrapped.schema(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 0ed51cb8528a..b07465a25828 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -36,6 +36,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.TagManager; @@ -63,6 +64,8 @@ public interface FileStoreTable extends DataTable { void setStatsCache(Cache cache); + void setDVMetaCache(DVMetaCache cache); + @Override default RowType rowType() { return schema().logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 81c015a9d47b..915773791573 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -286,7 +286,9 @@ public SnapshotReader withBucketFilter(Filter bucketFilter) { @Override public SnapshotReader withMetricRegistry(MetricRegistry registry) { - scan.withMetrics(new ScanMetrics(registry, tableName)); + ScanMetrics scanMetrics = new ScanMetrics(registry, 
tableName); + scan.withMetrics(scanMetrics); + indexFileHandler.withCacheMetrics(scanMetrics.getDvMetaCacheMetrics()); return this; } @@ -349,14 +351,18 @@ private List generateSplits( Map>> groupedManifestEntries) { List splits = new ArrayList<>(); // Read deletion indexes at once to reduce file IO - Map, List> deletionIndexFilesMap = null; + Map, Map> deletionFilesMap = null; if (!isStreaming) { - deletionIndexFilesMap = + Set> partitionBuckets = + groupedManifestEntries.entrySet().stream() + .flatMap( + e -> + e.getValue().keySet().stream() + .map(bucket -> Pair.of(e.getKey(), bucket))) + .collect(Collectors.toSet()); + deletionFilesMap = deletionVectors && snapshot != null - ? indexFileHandler.scan( - snapshot, - DELETION_VECTORS_INDEX, - groupedManifestEntries.keySet()) + ? indexFileHandler.scanDVIndex(snapshot, partitionBuckets) : Collections.emptyMap(); } for (Map.Entry>> entry : @@ -387,16 +393,14 @@ private List generateSplits( builder.withDataFiles(dataFiles) .rawConvertible(splitGroup.rawConvertible) .withBucketPath(bucketPath); - if (deletionVectors && deletionIndexFilesMap != null) { + if (deletionVectors && deletionFilesMap != null) { builder.withDataDeletionFiles( getDeletionFiles( - indexFileHandler.dvIndex(partition, bucket), dataFiles, - deletionIndexFilesMap.getOrDefault( + deletionFilesMap.getOrDefault( Pair.of(partition, bucket), - Collections.emptyList()))); + Collections.emptyMap()))); } - splits.add(builder.build()); } } @@ -588,4 +592,14 @@ private List getDeletionFiles( return deletionFiles; } + + public List getDeletionFiles( + List dataFiles, Map deletionFilesMap) { + List deletionFiles = new ArrayList<>(dataFiles.size()); + dataFiles.stream() + .map(DataFileMeta::fileName) + .map(f -> deletionFilesMap == null ? 
null : deletionFilesMap.get(f)) + .forEach(deletionFiles::add); + return deletionFiles; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java new file mode 100644 index 000000000000..d06c56318771 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.source.DeletionFile; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** Cache for deletion vector meta. 
*/ +public class DVMetaCache { + private final Cache> cache; + + public DVMetaCache(long maxElementSize) { + this.cache = + Caffeine.newBuilder() + .maximumSize(maxElementSize) + .softValues() + .executor(Runnable::run) + .build(); + } + + @Nullable + public Map read(Path path, BinaryRow partition, int bucket) { + DVMetaCacheKey cacheKey = new DVMetaCacheKey(path, partition, bucket); + List cacheValue = this.cache.getIfPresent(cacheKey); + if (cacheValue == null) { + return null; + } + // If the bucket doesn't have dv metas, return empty set. + Map dvFilesMap = new HashMap<>(); + cacheValue.forEach( + dvMeta -> + dvFilesMap.put( + dvMeta.getDataFileName(), + new DeletionFile( + dvMeta.getDeletionFilePath(), + dvMeta.getOffset(), + dvMeta.getLength(), + dvMeta.getCardinality()))); + return dvFilesMap; + } + + public void put( + Path path, BinaryRow partition, int bucket, Map dvFilesMap) { + DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket); + List cacheValue = new ArrayList<>(); + dvFilesMap.forEach( + (dataFileName, deletionFile) -> { + DVMetaCacheValue dvMetaCacheValue = + new DVMetaCacheValue( + dataFileName, + deletionFile.path(), + (int) deletionFile.offset(), + (int) deletionFile.length(), + deletionFile.cardinality()); + cacheValue.add(dvMetaCacheValue); + }); + this.cache.put(key, cacheValue); + } + + private static class DVMetaCacheValue { + private final String dataFileName; + private final String deletionFilePath; + private final int offset; + private final int length; + @Nullable private final Long cardinality; + + public DVMetaCacheValue( + String dataFileName, + String deletionFilePath, + int start, + int length, + @Nullable Long cardinality) { + this.dataFileName = dataFileName; + this.deletionFilePath = deletionFilePath; + this.offset = start; + this.length = length; + this.cardinality = cardinality; + } + + public String getDataFileName() { + return dataFileName; + } + + public String getDeletionFilePath() { + return deletionFilePath; 
+ } + + public int getOffset() { + return offset; + } + + public int getLength() { + return length; + } + + @Nullable + public Long getCardinality() { + return cardinality; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + DVMetaCacheValue that = (DVMetaCacheValue) o; + return offset == that.offset + && length == that.length + && Objects.equals(dataFileName, that.dataFileName) + && Objects.equals(deletionFilePath, that.deletionFilePath) + && Objects.equals(cardinality, that.cardinality); + } + + @Override + public int hashCode() { + return Objects.hash(dataFileName, deletionFilePath, offset, length, cardinality); + } + } + + /** Cache key for deletion vector meta at bucket level. */ + private static final class DVMetaCacheKey { + private final Path path; + private final BinaryRow row; + private final int bucket; + + public DVMetaCacheKey(Path path, BinaryRow row, int bucket) { + this.path = path; + this.row = row; + this.bucket = bucket; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DVMetaCacheKey)) { + return false; + } + DVMetaCacheKey that = (DVMetaCacheKey) o; + return bucket == that.bucket + && Objects.equals(path, that.path) + && Objects.equals(row, that.row); + } + + @Override + public int hashCode() { + return Objects.hash(path, row, bucket); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java index c2c5baba0a07..edc34bc76637 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java @@ -18,18 +18,40 @@ package org.apache.paimon.manifest; +import org.apache.paimon.Snapshot; import org.apache.paimon.TestAppendFileStore; +import 
org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.operation.metrics.CacheMetrics; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.utils.DVMetaCache; +import org.apache.paimon.utils.IndexFilePathFactories; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.SegmentsCache; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile; import static org.assertj.core.api.Assertions.assertThat; @@ -49,6 +71,7 @@ public void testUnawareMode() throws Exception { FileFormat.manifestFormat(fileStore.options()), "zstd", fileStore.pathFactory(), + null, null) .create(); IndexManifestFileHandler indexManifestFileHandler = @@ -84,6 +107,7 @@ public void testHashFixedBucket() throws Exception { FileFormat.manifestFormat(fileStore.options()), "zstd", fileStore.pathFactory(), + null, null) .create(); IndexManifestFileHandler indexManifestFileHandler = @@ -114,4 +138,206 @@ public void testHashFixedBucket() throws Exception { assertThat(entries.contains(entry3)).isTrue(); 
assertThat(entries.contains(entry4)).isTrue(); } + + @Test + public void testDVMetaCache() { + TestFileStore fileStore = keyValueFileStore(); + + // Setup cache and index-manifest file + MemorySize manifestMaxMemory = MemorySize.ofMebiBytes(128); + long manifestCacheThreshold = MemorySize.ofMebiBytes(1).getBytes(); + SegmentsCache manifestCache = + SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); + DVMetaCache dvMetaCache = new DVMetaCache(1000000); + + IndexManifestFile indexManifestFile = + new IndexManifestFile.Factory( + fileStore.fileIO(), + FileFormat.manifestFormat(fileStore.options()), + "zstd", + fileStore.pathFactory(), + manifestCache, + dvMetaCache) + .create(); + + IndexManifestFileHandler indexManifestFileHandler = + new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED); + + BinaryRow partition1 = partition(1); + BinaryRow partition2 = partition(2); + + IndexManifestEntry entry1 = + new IndexManifestEntry( + FileKind.ADD, + partition1, + 0, + deletionVectorIndexFile("data1.parquet", "data2.parquet")); + IndexManifestEntry entry2 = + new IndexManifestEntry( + FileKind.ADD, partition1, 1, deletionVectorIndexFile("data3.parquet")); + IndexManifestEntry entry3 = + new IndexManifestEntry( + FileKind.ADD, + partition2, + 0, + deletionVectorIndexFile("data4.parquet", "data5.parquet")); + + String indexManifestFileName = + indexManifestFileHandler.write(null, Arrays.asList(entry1, entry2, entry3)); + + // Create IndexFileHandler with cache enabled + IndexFileHandler indexFileHandler = + new IndexFileHandler( + fileStore.fileIO(), + fileStore.snapshotManager(), + indexManifestFile, + new IndexFilePathFactories(fileStore.pathFactory()), + MemorySize.ofMebiBytes(2), + dvMetaCache, + false); + + Map expectedPartition1Bucket0Files = + indexFileHandler.extractDeletionFileByMeta(partition1, 0, entry1.indexFile()); + Map expectedPartition1Bucket1Files = + indexFileHandler.extractDeletionFileByMeta(partition1, 1, entry2.indexFile()); + 
Map expectedPartition2Bucket0Files = + indexFileHandler.extractDeletionFileByMeta(partition2, 0, entry3.indexFile()); + + CacheMetrics cacheMetrics = new CacheMetrics(); + indexFileHandler.withCacheMetrics(cacheMetrics); + + Snapshot snapshot = snapshot(indexManifestFileName); + + // Test 1: First access should miss cache + Set> partitionBuckets = new HashSet<>(); + partitionBuckets.add(Pair.of(partition1, 0)); + Map, Map> deletionFiles1 = + indexFileHandler.scanDVIndex(snapshot, partitionBuckets); + assertThat(deletionFiles1).isNotNull(); + assertThat(deletionFiles1.containsKey(Pair.of(partition1, 0))).isTrue(); + assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1); + assertThat(cacheMetrics.getHitObject().get()).isEqualTo(0); + Map actualPartition1Bucket0Files = + deletionFiles1.get(Pair.of(partition1, 0)); + assertThat(actualPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files); + + // Test 2: Second access to same partition/bucket should hit cache + Map, Map> deletionFiles2 = + indexFileHandler.scanDVIndex(snapshot, partitionBuckets); + assertThat(deletionFiles2).isNotNull(); + assertThat(deletionFiles1).isEqualTo(deletionFiles2); + assertThat(cacheMetrics.getHitObject().get()).isEqualTo(1); + Map cachedPartition1Bucket0Files = + deletionFiles2.get(Pair.of(partition1, 0)); + assertThat(cachedPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files); + + // Test 3: Access different bucket in same partition should hit cache + partitionBuckets.clear(); + partitionBuckets.add(Pair.of(partition1, 1)); + Map, Map> deletionFiles3 = + indexFileHandler.scanDVIndex(snapshot, partitionBuckets); + assertThat(deletionFiles3).isNotNull(); + assertThat(cacheMetrics.getHitObject().get()).isEqualTo(2); + assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1); + Map actualPartition1Bucket1Files = + deletionFiles3.get(Pair.of(partition1, 1)); + assertThat(actualPartition1Bucket1Files).isEqualTo(expectedPartition1Bucket1Files); + + // Test 4: 
Access different partition should miss cache + partitionBuckets.clear(); + partitionBuckets.add(Pair.of(partition2, 0)); + Map, Map> deletionFiles4 = + indexFileHandler.scanDVIndex(snapshot, partitionBuckets); + assertThat(deletionFiles4).isNotNull(); + assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(2); // Now 2 misses total + Map actualPartition2Bucket0Files = + deletionFiles4.get(Pair.of(partition2, 0)); + assertThat(actualPartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files); + + // Test 5: Test non-cache path by requesting multiple partition buckets + partitionBuckets.clear(); + partitionBuckets.add(Pair.of(partition1, 0)); + partitionBuckets.add(Pair.of(partition2, 0)); // Multiple buckets to avoid cache path + Map, Map> deletionFiles5 = + indexFileHandler.scanDVIndex(snapshot, partitionBuckets); + assertThat(deletionFiles5).isNotNull(); + assertThat(deletionFiles5.containsKey(Pair.of(partition1, 0))).isTrue(); + assertThat(deletionFiles5.containsKey(Pair.of(partition2, 0))).isTrue(); + + Map nonCachePartition1Bucket0Files = + deletionFiles5.get(Pair.of(partition1, 0)); + Map nonCachePartition2Bucket0Files = + deletionFiles5.get(Pair.of(partition2, 0)); + assertThat(nonCachePartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files); + assertThat(nonCachePartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files); + } + + // ============================ Test utils =================================== + + private BinaryRow partition(int partitionValue) { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, partitionValue); + writer.complete(); + return partition; + } + + private IndexFileMeta deletionVectorIndexFile(String... 
dataFileNames) { + LinkedHashMap dvRanges = new LinkedHashMap<>(); + int offset = 0; + for (String dataFileName : dataFileNames) { + dvRanges.put( + dataFileName, + new DeletionVectorMeta( + dataFileName, offset, 100 + offset, (long) (10 + offset))); + offset += 150; + } + return new IndexFileMeta( + DELETION_VECTORS_INDEX, + "dv_index_" + UUID.randomUUID().toString(), + 1024, + 512, + dvRanges, + null); + } + + private Snapshot snapshot(String indexManifestFileName) { + String json = + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 1,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"baseManifestListSize\" : 0,\n" + + " \"deltaManifestList\" : null,\n" + + " \"deltaManifestListSize\" : 0,\n" + + " \"changelogManifestListSize\" : 0,\n" + + " \"indexManifest\" : \"" + + indexManifestFileName + + "\",\n" + + " \"commitUser\" : \"test\",\n" + + " \"commitIdentifier\" : 1,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : " + + System.currentTimeMillis() + + ",\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null\n" + + "}"; + return Snapshot.fromJson(json); + } + + private TestFileStore keyValueFileStore() { + return new TestFileStore.Builder( + "avro", + tempDir.toString(), + 2, // 2 buckets for testing + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + null) + .build(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java index 78045d9440db..142ebc6a4710 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java @@ -57,6 +57,7 @@ public class ManifestFileTest { public void testWriteAndReadManifestFile() { List 
         entries = generateData();
         ManifestFileMeta meta = gen.createManifestFileMeta(entries);
         ManifestFile manifestFile = createManifestFile(tempDir.toString());
         List actualMetas = manifestFile.write(entries);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 7ea86a2800d1..25b1b21f61d8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -50,7 +50,9 @@ public void testGenericMetricsRegistration() {
                         ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
                         ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
                         ScanMetrics.MANIFEST_HIT_CACHE,
-                        ScanMetrics.MANIFEST_MISSED_CACHE);
+                        ScanMetrics.MANIFEST_MISSED_CACHE,
+                        ScanMetrics.DVMETA_HIT_CACHE,
+                        ScanMetrics.DVMETA_MISSED_CACHE);
     }

     /** Tests that the metrics are updated properly. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
new file mode 100644
index 000000000000..5b1c9611bb12
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link DVMetaCache}. */
public class DVMetaCacheTest {

    @Test
    public void testPutAndRead() {
        DVMetaCache cache = new DVMetaCache(100);
        Path path = new Path("manifest/index-manifest-00001");
        BinaryRow partition = partition("year=2023/month=12");

        // Put data for bucket 1 with multiple files.
        Map<String, DeletionFile> dvFiles1 = new HashMap<>();
        dvFiles1.put(
                "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
                new DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 0L, 100L, 42L));
        dvFiles1.put(
                "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet",
                new DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 100L, 500L, null));
        cache.put(path, partition, 1, dvFiles1);

        // Put data for bucket 2 with a single file.
        Map<String, DeletionFile> dvFiles2 = new HashMap<>();
        dvFiles2.put(
                "data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet",
                new DeletionFile("index-b2c3d4e5-f6g7-8901-bcde-f23456789012-1", 0L, 300L, 12L));
        cache.put(path, partition, 2, dvFiles2);

        // Read bucket 1 - verify multiple files.
        Map<String, DeletionFile> result1 = cache.read(path, partition, 1);
        assertThat(result1).isNotNull().hasSize(2);
        assertThat(result1)
                .containsKeys(
                        "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
                        "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");

        DeletionFile file1 = result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet");
        assertThat(file1.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
        assertThat(file1.offset()).isEqualTo(0L);
        assertThat(file1.length()).isEqualTo(100L);
        assertThat(file1.cardinality()).isEqualTo(42L);

        DeletionFile file2 = result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");
        assertThat(file2.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
        assertThat(file2.cardinality()).isNull();

        // Read bucket 2 - verify single file.
        Map<String, DeletionFile> result2 = cache.read(path, partition, 2);
        assertThat(result2).isNotNull().hasSize(1);
        assertThat(result2).containsKey("data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet");

        // Read non-existent keys: unknown partition and unknown bucket both miss.
        assertThat(cache.read(path, partition("year=2024/month=01"), 0)).isNull();
        assertThat(cache.read(path, partition, 999)).isNull();
    }

    @Test
    public void testEmptyMap() {
        DVMetaCache cache = new DVMetaCache(100);
        Path path = new Path("manifest/index-manifest-00002");
        BinaryRow partition = partition("year=2023/month=11");
        cache.put(path, partition, 1, new HashMap<>());

        // Should return an empty map, not null: "cached but empty" must be
        // distinguishable from "not cached".
        Map<String, DeletionFile> result = cache.read(path, partition, 1);
        assertThat(result).isNotNull().isEmpty();
    }

    @Test
    public void testCacheEviction() {
        DVMetaCache cache = new DVMetaCache(2);
        Path path = new Path("manifest/index-manifest-00004");
        BinaryRow partition = partition("year=2023/month=09");

        // Fill cache to capacity.
        Map<String, DeletionFile> dvFiles1 = new HashMap<>();
        dvFiles1.put(
                "data-e5f6g7h8-i9j0-1234-efgh-567890123456-1.parquet",
                new DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 0L, 100L, 1L));
        dvFiles1.put(
                "data-e5f6g7h8-i9j0-1234-efgh-567890123456-2.parquet",
                new DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 100L, 200L, 2L));
        cache.put(path, partition, 1, dvFiles1);

        Map<String, DeletionFile> dvFiles2 = new HashMap<>();
        dvFiles2.put(
                "data-f6g7h8i9-j0k1-2345-fghi-678901234567-1.parquet",
                new DeletionFile("index-f6g7h8i9-j0k1-2345-fghi-678901234567-1", 0L, 100L, 2L));
        cache.put(path, partition, 2, dvFiles2);

        // Verify both buckets are cached.
        assertThat(cache.read(path, partition, 1)).isNotNull();
        assertThat(cache.read(path, partition, 2)).isNotNull();

        // Add a third entry, exceeding the capacity of 2.
        Map<String, DeletionFile> dvFiles3 = new HashMap<>();
        dvFiles3.put(
                "data-g7h8i9j0-k1l2-3456-ghij-789012345678-1.parquet",
                new DeletionFile("index-g7h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
        cache.put(path, partition, 3, dvFiles3);

        // NOTE(review): this asserts that exactly the first-inserted entry is evicted.
        // That only holds if DVMetaCache evicts in strict insertion/LRU order; size-bounded
        // caches such as Caffeine do not guarantee which entry is evicted — confirm the
        // DVMetaCache implementation before relying on this ordering.
        assertThat(cache.read(path, partition, 1)).isNull();
        assertThat(cache.read(path, partition, 3)).isNotNull();
    }

    // ============================ Test utils ===================================

    /** Builds a single-string-field partition row (copied so it is safe to reuse). */
    private BinaryRow partition(String partitionValue) {
        InternalRowSerializer serializer =
                new InternalRowSerializer(RowType.of(DataTypes.STRING()));
        return serializer
                .toBinaryRow(GenericRow.of(BinaryString.fromString(partitionValue)))
                .copy();
    }
}