Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table in the catalog are cached.");

/**
 * Maximum number of bucket-level deletion-vector meta entries kept in the catalog cache.
 * A value of 0 or less disables the cache (see CachingCatalog, which only constructs the
 * cache when this option is positive).
 *
 * <p>NOTE(review): the key says "max-bucket-num" while the description speaks of the number
 * of cached meta entries — confirm which is intended before release.
 */
public static final ConfigOption<Integer> CACHE_DV_MAX_NUM =
        key("cache.deletion-vectors.max-bucket-num")
                .intType()
                .defaultValue(20000)
                .withDescription(
                        "Controls the maximum number of bucket-level deletion vector meta that can be cached.");

public static final ConfigOption<Boolean> CASE_SENSITIVE =
ConfigOptions.key("case-sensitive")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
Expand Down Expand Up @@ -97,6 +98,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {

@Nullable private SegmentsCache<Path> readManifestCache;
@Nullable private Cache<Path, Snapshot> snapshotCache;
@Nullable private DVMetaCache dvMetaCache;

protected AbstractFileStore(
FileIO fileIO,
Expand Down Expand Up @@ -216,7 +218,8 @@ public IndexManifestFile.Factory indexManifestFileFactory() {
FileFormat.manifestFormat(options),
options.manifestCompression(),
pathFactory(),
readManifestCache);
readManifestCache,
dvMetaCache);
}

@Override
Expand All @@ -227,6 +230,7 @@ public IndexFileHandler newIndexFileHandler() {
indexManifestFileFactory().create(),
new IndexFilePathFactories(pathFactory()),
options.dvIndexFileTargetSize(),
this.dvMetaCache,
options.deletionVectorBitmap64());
}

Expand Down Expand Up @@ -488,4 +492,9 @@ public void setManifestCache(SegmentsCache<Path> manifestCache) {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
this.snapshotCache = cache;
}

@Override
public void setDVMetaCache(DVMetaCache dvMetaCache) {
    // Catalog-level shared cache of deletion-vector metadata; remains null when the
    // catalog cache is disabled (CACHE_DV_MAX_NUM <= 0).
    this.dvMetaCache = dvMetaCache;
}
}
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -121,4 +122,6 @@ PartitionExpire newPartitionExpire(
void setManifestCache(SegmentsCache<Path> manifestCache);

void setSnapshotCache(Cache<Path, Snapshot> cache);

void setDVMetaCache(DVMetaCache dvMetaCache);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.SegmentsCache;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
Expand All @@ -43,6 +44,7 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.options.CatalogOptions.CACHE_DV_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
Expand All @@ -66,9 +68,9 @@ public class CachingCatalog extends DelegateCatalog {
protected Cache<String, Database> databaseCache;
protected Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;

// partition cache will affect data latency
@Nullable protected Cache<Identifier, List<Partition>> partitionCache;
@Nullable protected DVMetaCache dvMetaCache;

public CachingCatalog(Catalog wrapped, Options options) {
super(wrapped);
Expand Down Expand Up @@ -97,6 +99,11 @@ public CachingCatalog(Catalog wrapped, Options options) {
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);

this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);

int cacheDvMaxNum = options.get(CACHE_DV_MAX_NUM);
if (cacheDvMaxNum > 0) {
this.dvMetaCache = new DVMetaCache(cacheDvMaxNum);
}
init(Ticker.systemTicker());
}

Expand Down Expand Up @@ -266,6 +273,9 @@ private void putTableCache(Identifier identifier, Table table) {
if (manifestCache != null) {
storeTable.setManifestCache(manifestCache);
}
if (dvMetaCache != null) {
storeTable.setDVMetaCache(dvMetaCache);
}
}

tableCache.put(identifier, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,35 @@
package org.apache.paimon.index;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
Expand All @@ -52,20 +61,24 @@ public class IndexFileHandler {
private final IndexFilePathFactories pathFactories;
private final MemorySize dvTargetFileSize;
private final boolean dvBitmap64;
@Nullable private CacheMetrics cacheMetrics;
@Nullable private final DVMetaCache dvMetaCache;

/**
 * Creates a handler for reading and writing table index files.
 *
 * @param fileIO file system abstraction used to access index files
 * @param snapshotManager manager used to resolve snapshots for index scans
 * @param indexManifestFile reader/writer for index manifest files
 * @param pathFactories per-partition/bucket path factories for index files
 * @param dvTargetFileSize target size of deletion-vector index files
 * @param dvMetaCache cache of bucket-level deletion-vector metadata; null when catalog
 *     caching is disabled
 * @param dvBitmap64 whether 64-bit bitmaps are used for deletion vectors
 */
public IndexFileHandler(
        FileIO fileIO,
        SnapshotManager snapshotManager,
        IndexManifestFile indexManifestFile,
        IndexFilePathFactories pathFactories,
        MemorySize dvTargetFileSize,
        // @Nullable added: the target field (and Factory's parameter) are @Nullable, and
        // AbstractFileStore passes a nullable field here — keep the contract consistent.
        @Nullable DVMetaCache dvMetaCache,
        boolean dvBitmap64) {
    this.fileIO = fileIO;
    this.snapshotManager = snapshotManager;
    this.pathFactories = pathFactories;
    this.indexManifestFile = indexManifestFile;
    this.dvTargetFileSize = dvTargetFileSize;
    this.dvBitmap64 = dvBitmap64;
    this.dvMetaCache = dvMetaCache;
}

public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
Expand All @@ -87,6 +100,140 @@ public Optional<IndexFileMeta> scanHashIndex(
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
}

/** Registers metrics for DV-meta cache hit/miss reporting; pass null to disable reporting. */
public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
    this.cacheMetrics = cacheMetrics;
}

// Construct DataFile -> DeletionFile based on IndexFileMeta
@Nullable
@VisibleForTesting
public Map<String, DeletionFile> extractDeletionFileByMeta(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove public and add @VisibleForTesting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove public will lead to compilation problems

BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) {
LinkedHashMap<String, DeletionVectorMeta> dvRanges = fileMeta.dvRanges();
String dvFilePath = dvIndex(partition, bucket).path(fileMeta).toString();
if (dvRanges != null && !dvRanges.isEmpty()) {
Map<String, DeletionFile> result = new HashMap<>();
for (DeletionVectorMeta dvMeta : dvRanges.values()) {
result.put(
dvMeta.dataFileName(),
new DeletionFile(
dvFilePath,
dvMeta.offset(),
dvMeta.length(),
dvMeta.cardinality()));
}
return result;
}
return null;
}

/**
 * Scans the DV index files of the given partition/bucket pairs.
 *
 * @param snapshot snapshot to scan; may be null
 * @param partitionBuckets the (partition, bucket) pairs to look up
 * @return a {@code DataFile -> DeletionFile} map grouped by (partition, bucket); empty when
 *     the snapshot is null or has no index manifest
 */
public Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> scanDVIndex(
        Snapshot snapshot, Set<Pair<BinaryRow, Integer>> partitionBuckets) {
    if (snapshot == null || snapshot.indexManifest() == null) {
        return Collections.emptyMap();
    }
    Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new HashMap<>();
    // To avoid the cache being frequently evicted, we currently only read through the
    // cache when exactly one bucket is requested.
    if (this.dvMetaCache != null && partitionBuckets.size() == 1) {
        Pair<BinaryRow, Integer> partitionBucket = partitionBuckets.iterator().next();
        Map<String, DeletionFile> deletionFiles =
                this.scanDVIndexWithCache(
                        snapshot, partitionBucket.getLeft(), partitionBucket.getRight());
        if (deletionFiles != null && !deletionFiles.isEmpty()) {
            result.put(partitionBucket, deletionFiles);
        }
        return result;
    }
    // Cache-less path: scan all requested partitions in one pass.
    Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
            scan(
                    snapshot,
                    DELETION_VECTORS_INDEX,
                    partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
    partitionFileMetas.forEach(
            (entry, indexFileMetas) -> {
                // Skip buckets that were not requested and partitions without metas.
                if (!partitionBuckets.contains(entry) || indexFileMetas == null) {
                    return;
                }
                for (IndexFileMeta indexFileMeta : indexFileMetas) {
                    Map<String, DeletionFile> dvMetas =
                            extractDeletionFileByMeta(
                                    entry.getLeft(), entry.getRight(), indexFileMeta);
                    if (dvMetas != null) {
                        result.computeIfAbsent(entry, k -> new HashMap<>()).putAll(dvMetas);
                    }
                }
            });
    return result;
}

/**
 * Reads the DV meta cache first; on a miss, scans the DV index files of the whole partition,
 * fills the cache for every bucket found, and returns the deletion files of the requested
 * partition/bucket.
 *
 * <p>Caller must ensure {@code snapshot.indexManifest()} is non-null and {@code dvMetaCache}
 * is non-null (both guaranteed by {@code scanDVIndex}).
 *
 * @return the {@code DataFile -> DeletionFile} map of the requested bucket, or null when the
 *     partition scan yields nothing for it
 */
@VisibleForTesting
Map<String, DeletionFile> scanDVIndexWithCache(
        Snapshot snapshot, BinaryRow partition, Integer bucket) {
    // Cache key is the index manifest path plus (partition, bucket).
    String indexManifestName = snapshot.indexManifest();
    Path indexManifestPath = this.indexManifestFile.indexManifestFilePath(indexManifestName);
    Map<String, DeletionFile> result =
            this.dvMetaCache.read(indexManifestPath, partition, bucket);
    if (result != null) {
        if (cacheMetrics != null) {
            cacheMetrics.increaseHitObject();
        }
        return result;
    }
    if (cacheMetrics != null) {
        cacheMetrics.increaseMissedObject();
    }
    // On a miss, read the whole partition's deletion files so sibling buckets get cached too.
    Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
            scan(
                    snapshot,
                    DELETION_VECTORS_INDEX,
                    new HashSet<>(Collections.singletonList(partition)));
    // For each bucket, extract its deletion files and fill the meta cache.
    for (Map.Entry<Pair<BinaryRow, Integer>, List<IndexFileMeta>> entry :
            partitionFileMetas.entrySet()) {
        Pair<BinaryRow, Integer> partitionBucket = entry.getKey();
        List<IndexFileMeta> fileMetas = entry.getValue();
        if (fileMetas == null) {
            // Was `entry.getValue() != null` re-read in the original; use the local instead.
            continue;
        }
        Map<String, DeletionFile> bucketDeletionFiles = new HashMap<>();
        for (IndexFileMeta meta : fileMetas) {
            Map<String, DeletionFile> bucketDVMetas =
                    extractDeletionFileByMeta(
                            partitionBucket.getLeft(), partitionBucket.getRight(), meta);
            if (bucketDVMetas != null) {
                bucketDeletionFiles.putAll(bucketDVMetas);
            }
        }
        // bucketDeletionFiles can be empty; cache it anyway to avoid repeated scans.
        this.dvMetaCache.put(
                indexManifestPath,
                partitionBucket.getLeft(),
                partitionBucket.getRight(),
                bucketDeletionFiles);
        if (partitionBucket.getRight() != null
                && partitionBucket.getLeft() != null
                && partitionBucket.getRight().equals(bucket)
                && partitionBucket.getLeft().equals(partition)) {
            result = bucketDeletionFiles;
        }
    }
    return result;
}

/** Scans index manifest entries of the given index type in the latest snapshot. */
public List<IndexManifestEntry> scan(String indexType) {
    return scan(snapshotManager.latestSnapshot(), indexType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
Expand All @@ -39,14 +40,17 @@
/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {

private final DVMetaCache dvMetaCache;

private IndexManifestFile(
FileIO fileIO,
RowType schema,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
@Nullable SegmentsCache<Path> cache) {
@Nullable SegmentsCache<Path> cache,
@Nullable DVMetaCache dvMetaCache) {
super(
fileIO,
new IndexManifestEntrySerializer(),
Expand All @@ -56,6 +60,11 @@ private IndexManifestFile(
compression,
pathFactory,
cache);
this.dvMetaCache = dvMetaCache;
}

/** Resolves the full path of an index manifest file from its file name. */
public Path indexManifestFilePath(String fileName) {
    return pathFactory.toPath(fileName);
}

/** Write new index files to index manifest. */
Expand All @@ -79,18 +88,21 @@ public static class Factory {
private final String compression;
private final FileStorePathFactory pathFactory;
@Nullable private final SegmentsCache<Path> cache;
@Nullable private final DVMetaCache dvMetaCache;

/**
 * Creates a factory for index manifest files.
 *
 * @param fileIO file system abstraction used to access manifest files
 * @param fileFormat format used to read/write index manifest entries
 * @param compression compression codec name for manifest files
 * @param pathFactory path factory for index manifest files
 * @param cache segments cache for manifest reads; may be null
 * @param dvMetaCache deletion-vector meta cache; may be null when caching is disabled
 */
public Factory(
        FileIO fileIO,
        FileFormat fileFormat,
        String compression,
        FileStorePathFactory pathFactory,
        @Nullable SegmentsCache<Path> cache,
        @Nullable DVMetaCache dvMetaCache) {
    this.fileIO = fileIO;
    this.fileFormat = fileFormat;
    this.compression = compression;
    this.pathFactory = pathFactory;
    this.cache = cache;
    this.dvMetaCache = dvMetaCache;
}

public IndexManifestFile create() {
Expand All @@ -102,7 +114,8 @@ public IndexManifestFile create() {
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory(),
cache);
cache,
dvMetaCache);
}
}
}
Loading
Loading