diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheAware.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheAware.java new file mode 100644 index 0000000000000..315258e30a8a4 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheAware.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.vectorized.execution.jni; + +/** + * Marker interface for plugins that can receive a {@link PageCacheProvider}. + *

+ * Implemented by {@code TieredStoragePlugin} so that {@code Node.java} (in the + * {@code server} module) can inject a page cache provider without needing + * a compile-time dependency on the {@code modules/tiered-storage} module. + *

+ * {@code Node.java} discovers plugins implementing this interface and calls + * {@link #setPageCacheProvider(PageCacheProvider)} after the {@link PageCacheProvider} + * (e.g., {@code DataFusionPlugin}) has been discovered. + */ +public interface PageCacheAware { + + /** + * Inject the page cache provider. + * Called by {@code Node.java} during node construction, after the plugin implementing + * {@link PageCacheProvider} (e.g. {@code DataFusionPlugin}) has been initialized. + * + * @param provider the page cache provider, never null when this method is called + */ + void setPageCacheProvider(PageCacheProvider provider); +} diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java new file mode 100644 index 0000000000000..60ec86d126e40 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.vectorized.execution.jni; + +/** + * Provider for a byte-range page cache used to serve Parquet file reads on warm indices. + *

+ * Implemented by search-engine plugins (e.g. {@code DataFusionPlugin}) that own an + * in-process page cache and want to expose it to the tiered-storage module so that + * {@code CachedParquetCacheStrategy} can serve Parquet column-chunk byte ranges from the + * cache instead of re-fetching from object storage (S3/GCS/Azure) on every + * {@code openIndexInput()} call. + *

+ * The cache key is the local filesystem path of the Parquet file (without leading slash) + * combined with the byte range, e.g. {@code "data/nodes/0/.../parquet/_parquet_0.parquet:4096-8192"}. + * The exact key format is an implementation detail of the provider. + */ +public interface PageCacheProvider { + + /** + * Look up a cached byte range for a Parquet file. + * + * @param path the local file path used as cache key (e.g. "data/nodes/0/.../parquet/_parquet_0.parquet") + * @param start byte range start (inclusive) + * @param end byte range end (exclusive) + * @return the cached bytes, or {@code null} on cache miss + */ + byte[] getPageRange(String path, int start, int end); + + /** + * Store a byte range for a Parquet file in the cache. + * + * @param path the local file path used as cache key + * @param start byte range start (inclusive) + * @param end byte range end (exclusive) + * @param data the bytes to cache (must have length == end - start) + */ + void putPageRange(String path, int start, int end, byte[] data); + + /** + * Evict all cached byte ranges for a given Parquet file. + * Called when a file is deleted (merged, compacted, or tiered out). 
+ * + * @param path the local file path whose cached ranges should be removed + */ + void evictFile(String path); +} diff --git a/modules/tiered-storage/src/main/java/org/opensearch/storage/TieredStoragePlugin.java b/modules/tiered-storage/src/main/java/org/opensearch/storage/TieredStoragePlugin.java index de466c1e70bfc..a898c6b4320fb 100644 --- a/modules/tiered-storage/src/main/java/org/opensearch/storage/TieredStoragePlugin.java +++ b/modules/tiered-storage/src/main/java/org/opensearch/storage/TieredStoragePlugin.java @@ -66,6 +66,8 @@ import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.index.store.CompositeStoreDirectoryFactory; +import org.opensearch.vectorized.execution.jni.PageCacheAware; +import org.opensearch.vectorized.execution.jni.PageCacheProvider; import org.opensearch.vectorized.execution.jni.NativeObjectStoreProvider; import java.util.ArrayList; @@ -88,7 +90,7 @@ * Per-repository remote stores are added to the shared {@code FileRegistry} as new * repositories are encountered. Different indices can point to different repositories. */ -public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin, NativeObjectStoreProvider { +public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin, NativeObjectStoreProvider, PageCacheAware { private static final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(TieredStoragePlugin.class); @@ -101,6 +103,14 @@ public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, Act private volatile Supplier repositoriesServiceSupplier; + /** + * Page cache provider — received from DataFusionPlugin via Node.java. + * Passed into TieredCompositeStoreDirectoryFactory so that CachedParquetCacheStrategy + * can be used for parquet format files instead of PassthroughCacheStrategy. 
+ * May be null if DataFusionPlugin is not loaded or page cache is disabled. + */ + private volatile PageCacheProvider pageCacheProvider; + // Global native ObjectStore — created lazily on first warm shard creation private volatile long globalObjStoreDataPtr; private volatile long globalObjStoreVtablePtr; @@ -139,6 +149,17 @@ public Map getCompositeStoreDirectoryFac return Collections.emptyMap(); } + /** + * Set the page cache provider (e.g. from DataFusionPlugin). + * Called by Node.java after discovering a plugin implementing {@link PageCacheProvider}, + * so TieredCompositeStoreDirectoryFactory can use CachedParquetCacheStrategy. + */ + @Override + public void setPageCacheProvider(PageCacheProvider provider) { + this.pageCacheProvider = provider; + logger.info("[TieredStoragePlugin] PageCacheProvider set — parquet reads will use page cache"); + } + @Override public Map getCachedCompositeStoreDirectoryFactories() { return Map.of(TIERED_COMPOSITE_INDEX_TYPE, new TieredCompositeStoreDirectoryFactory( @@ -147,7 +168,12 @@ public Map this.pageCacheProvider )); } diff --git a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetCacheStrategy.java b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetCacheStrategy.java new file mode 100644 index 0000000000000..d144207fe55dd --- /dev/null +++ b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetCacheStrategy.java @@ -0,0 +1,304 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.storage.directory; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.engine.exec.FileMetadata; +import org.opensearch.index.store.CompositeRemoteSegmentStoreDirectory; +import org.opensearch.index.store.FormatCacheStrategy; +import org.opensearch.index.store.FormatStoreDirectory; +import org.opensearch.storage.jni.TieredStoreNative; +import org.opensearch.vectorized.execution.jni.PageCacheProvider; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.zip.CRC32; + +/** + * Foyer-backed cache strategy for the Parquet format on warm indices. + * Replaces {@link PassthroughCacheStrategy} for the {@code "parquet"} format name. + * The key difference is in {@link #openInput}: for REMOTE files, instead of opening a full + * streaming {@link IndexInput} from {@link CompositeRemoteSegmentStoreDirectory} (which + * re-fetches the entire file from S3/GCS/Azure on every call), it returns a + * {@link CachedParquetIndexInput} that serves byte ranges from Foyer (cache miss fetches from + * remote and populates Foyer). All other operations (FileRegistry routing, ref-counting, + * file registration, deletion, checksums) are identical to {@link PassthroughCacheStrategy}. + * For Lucene format files, {@link PassthroughCacheStrategy} is still used unchanged. 
+ */ +public class CachedParquetCacheStrategy implements FormatCacheStrategy { + + private static final Logger logger = LogManager.getLogger(CachedParquetCacheStrategy.class); + + private final String formatName; + private final CompositeRemoteSegmentStoreDirectory remoteDirectory; + private final long registryPtr; + private final String dirPathPrefix; + /** Page cache — provided by DataFusionPlugin via the PageCacheProvider interface */ + private final PageCacheProvider foyerCache; + /** Reference to the owning directory for deferred local file deletion */ + private volatile TieredCompositeStoreDirectory owningDirectory; + + public CachedParquetCacheStrategy( + String formatName, + CompositeRemoteSegmentStoreDirectory remoteDirectory, + long registryPtr, + String dirPathPrefix, + PageCacheProvider foyerCache + ) { + this.formatName = formatName; + this.remoteDirectory = remoteDirectory; + this.registryPtr = registryPtr; + this.dirPathPrefix = dirPathPrefix; + this.foyerCache = foyerCache; + } + + /** + * Set the owning directory reference. Called after construction by + * {@link TieredCompositeStoreDirectory} so that deferred local file + * deletion can be triggered when the last reader closes. + */ + public void setOwningDirectory(TieredCompositeStoreDirectory directory) { + this.owningDirectory = directory; + } + + /** Build the full registry key for a file (matches DataFusion/object_store key format). */ + private String registryKey(String fileName) { + return dirPathPrefix + "/" + fileName; + } + + /** + * Build the format-qualified remote file name. + * {@link CompositeRemoteSegmentStoreDirectory#openInput} uses the FileMetadata + * delimiter to route to the correct format container. 
+ */ + private String remoteFileName(String fileName) { + if (formatName == null || formatName.isEmpty() || "lucene".equals(formatName)) { + return fileName; + } + return fileName + FileMetadata.DELIMITER + formatName; + } + + @Override + public String name() { + return "foyer-parquet(" + formatName + ")"; + } + + /** + * Opens an IndexInput for reading a Parquet file. + * LOCAL files are served from disk; REMOTE files return a {@link CachedParquetIndexInput} + * that serves byte ranges from Foyer (cache miss fetches from remote and populates cache). + * FileRegistry ref-counting is maintained in both paths (same as PassthroughCacheStrategy). + */ + @Override + public IndexInput openInput(String fileName, IOContext context, FormatStoreDirectory delegate) + throws IOException { + + String key = registryKey(fileName); + // Acquire read reference in the FileRegistry — prevents local eviction while reading + int location = TieredStoreNative.registryAcquireRead(registryPtr, key); + + // --- LOCAL or UNREGISTERED: read directly from disk --- + if (location == TieredStoreNative.LOCATION_LOCAL + || location == TieredStoreNative.LOCATION_NOT_FOUND) { + logger.debug("[CachedParquetCacheStrategy] openInput LOCAL: format={}, file={}, key={}, loc={}", + formatName, fileName, key, locationName(location)); + try { + IndexInput localInput = delegate.openIndexInput(fileName, context); + // Wrap in RefCountedIndexInput for safe eviction (same as PassthroughCacheStrategy) + return new PassthroughCacheStrategy.RefCountedIndexInputPublic( + localInput, key, registryPtr, owningDirectory + ); + } catch (IOException e) { + TieredStoreNative.registryReleaseRead(registryPtr, key); + throw e; + } + } + + // --- REMOTE or BOTH: serve via Foyer page cache --- + if (remoteDirectory != null) { + logger.debug("[CachedParquetCacheStrategy] openInput REMOTE (Foyer): format={}, file={}, key={}", + formatName, fileName, key); + try { + // Resolve file length: try registry first (O(1)), fall back to 
remote metadata + long fileLen = TieredStoreNative.registryGetSize(registryPtr, key); + if (fileLen < 0) { + fileLen = remoteDirectory.fileLength(remoteFileName(fileName)); + } + + return new CachedParquetIndexInput( + "CachedParquet(" + fileName + ")", + remoteFileName(fileName), + key, + fileLen, + foyerCache, + remoteDirectory, + registryPtr, + owningDirectory + ); + } catch (IOException e) { + // Remote failed — fall back to local + logger.warn("[CachedParquetCacheStrategy] remote open failed for {}, trying local: {}", + fileName, e.getMessage()); + try { + IndexInput localInput = delegate.openIndexInput(fileName, context); + return new PassthroughCacheStrategy.RefCountedIndexInputPublic( + localInput, key, registryPtr, owningDirectory + ); + } catch (IOException localEx) { + TieredStoreNative.registryReleaseRead(registryPtr, key); + throw e; + } + } + } + + // No remote directory — fall back to local + logger.debug("[CachedParquetCacheStrategy] openInput LOCAL fallback (no remote dir): format={}, file={}", + formatName, fileName); + try { + IndexInput localInput = delegate.openIndexInput(fileName, context); + return new PassthroughCacheStrategy.RefCountedIndexInputPublic( + localInput, key, registryPtr, owningDirectory + ); + } catch (IOException e) { + TieredStoreNative.registryReleaseRead(registryPtr, key); + throw e; + } + } + + @Override + public void onFileWritten(String fileName, FormatStoreDirectory delegate) throws IOException { + String key = registryKey(fileName); + long size = 0; + try { + size = delegate.fileLength(fileName); + } catch (IOException e) { + logger.warn("[CachedParquetCacheStrategy] could not get size for {}: {}", fileName, e.getMessage()); + } + TieredStoreNative.registryRegisterLocal(registryPtr, key, size); + logger.debug("[CachedParquetCacheStrategy] onFileWritten: format={}, file={}, key={}, size={}", + formatName, fileName, key, size); + } + + @Override + public void onFileDeleted(String fileName) throws IOException { + String 
key = registryKey(fileName); + int location = TieredStoreNative.registryGetLocation(registryPtr, key); + if (location == TieredStoreNative.LOCATION_BOTH) { + TieredStoreNative.registryMarkLocalDeleted(registryPtr, key); + logger.debug("[CachedParquetCacheStrategy] onFileDeleted (mark local deleted): key={}", key); + } else if (location == TieredStoreNative.LOCATION_REMOTE) { + logger.debug("[CachedParquetCacheStrategy] onFileDeleted (already remote-only): key={}", key); + } else { + TieredStoreNative.registryRemove(registryPtr, key); + logger.debug("[CachedParquetCacheStrategy] onFileDeleted (removed from registry): key={}", key); + } + // Also evict from Foyer page cache — stale bytes must not be served + foyerCache.evictFile(key); + } + + @Override + public long fileLength(String fileName, FormatStoreDirectory delegate) throws IOException { + String key = registryKey(fileName); + long size = TieredStoreNative.registryGetSize(registryPtr, key); + if (size >= 0) return size; + + try { + return delegate.fileLength(fileName); + } catch (IOException e) { + // fall through to remote + } + + if (remoteDirectory != null) { + return remoteDirectory.fileLength(remoteFileName(fileName)); + } + throw new IOException("fileLength failed for " + fileName + " — not in registry, local, or remote"); + } + + @Override + public long calculateChecksum(String fileName, FormatStoreDirectory delegate) throws IOException { + String key = registryKey(fileName); + int location = TieredStoreNative.registryGetLocation(registryPtr, key); + + // For LOCAL or BOTH: try local first ONLY if the file actually exists on disk. + // On a warm node receiving shards via peer recovery, the registry may report BOTH + // (populated from remote metadata) but the file is not yet present locally — + // in that case, calling delegate.calculateChecksum() would log a misleading ERROR. 
+ if (location == TieredStoreNative.LOCATION_LOCAL || location == TieredStoreNative.LOCATION_BOTH) { + if (Files.exists(delegate.getDirectoryPath().resolve(fileName))) { + try { return delegate.calculateChecksum(fileName); } catch (IOException ignored) {} + } else { + logger.debug("[CachedParquetCacheStrategy] calculateChecksum: skipping local (file not on disk): key={}, loc={}", + key, locationName(location)); + } + } + // REMOTE, BOTH-with-local-failure/missing, or NOT_FOUND: try remote + if (remoteDirectory != null) { + try (IndexInput input = remoteDirectory.openInput(remoteFileName(fileName), IOContext.READONCE)) { + return computeCrc32(input); + } + } + // Last resort: try local (handles NOT_FOUND case where file may still exist) + return delegate.calculateChecksum(fileName); + } + + @Override + public String calculateUploadChecksum(String fileName, FormatStoreDirectory delegate) + throws IOException { + String key = registryKey(fileName); + int location = TieredStoreNative.registryGetLocation(registryPtr, key); + + // For LOCAL or BOTH: try local first ONLY if the file actually exists on disk. 
+ if (location == TieredStoreNative.LOCATION_LOCAL || location == TieredStoreNative.LOCATION_BOTH) { + if (Files.exists(delegate.getDirectoryPath().resolve(fileName))) { + try { return delegate.calculateUploadChecksum(fileName); } catch (IOException ignored) {} + } else { + logger.debug("[CachedParquetCacheStrategy] calculateUploadChecksum: skipping local (file not on disk): key={}, loc={}", + key, locationName(location)); + } + } + // REMOTE, BOTH-with-local-failure/missing, or NOT_FOUND: try remote + if (remoteDirectory != null) { + try (IndexInput input = remoteDirectory.openInput(remoteFileName(fileName), IOContext.READONCE)) { + return Long.toString(computeCrc32(input)); + } + } + // Last resort: try local + return delegate.calculateUploadChecksum(fileName); + } + + @Override + public void close() throws IOException { + // no-op — registry is owned by TieredCompositeStoreDirectory + } + + private static long computeCrc32(IndexInput input) throws IOException { + CRC32 crc32 = new CRC32(); + byte[] buffer = new byte[8192]; + long remaining = input.length(); + while (remaining > 0) { + int toRead = (int) Math.min(buffer.length, remaining); + input.readBytes(buffer, 0, toRead); + crc32.update(buffer, 0, toRead); + remaining -= toRead; + } + return crc32.getValue(); + } + + private static String locationName(int loc) { + switch (loc) { + case TieredStoreNative.LOCATION_LOCAL: return "LOCAL"; + case TieredStoreNative.LOCATION_REMOTE: return "REMOTE"; + case TieredStoreNative.LOCATION_BOTH: return "BOTH"; + default: return "UNREGISTERED"; + } + } +} diff --git a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetIndexInput.java b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetIndexInput.java new file mode 100644 index 0000000000000..9677b247ecfd4 --- /dev/null +++ b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/CachedParquetIndexInput.java @@ -0,0 +1,257 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.storage.directory; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.store.CompositeRemoteSegmentStoreDirectory; +import org.opensearch.storage.jni.TieredStoreNative; +import org.opensearch.vectorized.execution.jni.PageCacheProvider; + +import java.io.IOException; + +/** + * Lucene {@link IndexInput} backed by the Foyer in-memory page cache. + *

+ * Used by {@link CachedParquetCacheStrategy} for REMOTE Parquet files. + * On every {@code readBytes()} call it translates the current file pointer + * and length into a byte range and serves it from Foyer (fetching from + * {@code remoteDirectory} on cache miss). + *

+ * On {@code close()}, releases the FileRegistry read reference — same safety + * contract as {@link PassthroughCacheStrategy.RefCountedIndexInput}. + */ +public class CachedParquetIndexInput extends IndexInput { + + private static final Logger logger = LogManager.getLogger(CachedParquetIndexInput.class); + + /** Remote file name (format-qualified, e.g. "_parquet_0.parquet:::parquet") */ + private final String remoteFileName; + /** FileRegistry key (local path without leading "/") */ + private final String registryKey; + /** Total file length in bytes */ + private final long fileLength; + /** Page cache provider — calls back into DataFusionPlugin via JNI */ + private final PageCacheProvider foyerCache; + /** Remote directory for fetching bytes on cache miss */ + private final CompositeRemoteSegmentStoreDirectory remoteDirectory; + /** FileRegistry pointer for ref-counting */ + private final long registryPtr; + /** Owning directory for deferred eviction after last reader closes */ + private final TieredCompositeStoreDirectory owningDirectory; + + /** Current virtual file pointer */ + private long filePointer = 0L; + /** Whether this input has been closed */ + private boolean closed = false; + + public CachedParquetIndexInput( + String resourceDescription, + String remoteFileName, + String registryKey, + long fileLength, + PageCacheProvider foyerCache, + CompositeRemoteSegmentStoreDirectory remoteDirectory, + long registryPtr, + TieredCompositeStoreDirectory owningDirectory + ) { + super(resourceDescription); + this.remoteFileName = remoteFileName; + this.registryKey = registryKey; + this.fileLength = fileLength; + this.foyerCache = foyerCache; + this.remoteDirectory = remoteDirectory; + this.registryPtr = registryPtr; + this.owningDirectory = owningDirectory; + } + + @Override + public byte readByte() throws IOException { + byte[] buf = new byte[1]; + readBytes(buf, 0, 1); + return buf[0]; + } + + @Override + public void readBytes(byte[] b, int offset, int len) 
throws IOException { + if (closed) throw new IOException("IndexInput is closed: " + toString()); + if (filePointer + len > fileLength) { + throw new IOException( + "Read past EOF: filePointer=" + filePointer + ", len=" + len + + ", fileLength=" + fileLength + ", file=" + remoteFileName + ); + } + + int start = (int) filePointer; + int end = start + len; + + // 1. Try Foyer cache first + byte[] cached = foyerCache.getPageRange(registryKey, start, end); + if (cached != null) { + logger.debug("[CachedParquetIndexInput] cache HIT: key={}, range={}..{}", registryKey, start, end); + System.arraycopy(cached, 0, b, offset, len); + filePointer += len; + return; + } + + // 2. Cache miss — fetch from remote directory + logger.debug("[CachedParquetIndexInput] cache MISS: key={}, range={}..{} — fetching from remote", + registryKey, start, end); + + byte[] fetched = fetchRangeFromRemote(start, len); + + // 3. Populate Foyer for future reads + foyerCache.putPageRange(registryKey, start, end, fetched); + + System.arraycopy(fetched, 0, b, offset, len); + filePointer += len; + } + + /** + * Fetch a byte range from the remote directory. + * Opens a temporary IndexInput at the remote path, seeks to {@code start}, reads {@code len} bytes. 
+ */ + private byte[] fetchRangeFromRemote(int start, int len) throws IOException { + try (IndexInput remoteInput = remoteDirectory.openInput(remoteFileName, IOContext.READONCE)) { + remoteInput.seek(start); + byte[] buf = new byte[len]; + remoteInput.readBytes(buf, 0, len); + return buf; + } + } + + @Override + public void seek(long pos) throws IOException { + if (pos < 0 || pos > fileLength) { + throw new IOException("Seek out of bounds: pos=" + pos + ", fileLength=" + fileLength); + } + filePointer = pos; + } + + @Override + public long getFilePointer() { + return filePointer; + } + + @Override + public long length() { + return fileLength; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + // Create a sliced view — delegates reads through this input + return new SlicedFoyerIndexInput(sliceDescription, this, offset, length); + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + // Release FileRegistry read reference + long remaining = TieredStoreNative.registryReleaseRead(registryPtr, registryKey); + // If last reader and file is REMOTE-only, trigger deferred local delete + if (remaining == 0 && owningDirectory != null) { + owningDirectory.tryDeleteLocalFileAfterLastRead(registryKey); + } + } + } + + @Override + public IndexInput clone() { + CachedParquetIndexInput cloned = (CachedParquetIndexInput) super.clone(); + // Each clone has its own file pointer (already handled by super.clone()) + // Increment registry read ref for the clone + TieredStoreNative.registryAcquireRead(registryPtr, registryKey); + return cloned; + } + + // ------------------------------------------------------------------------- + // Inner class: slice support + // ------------------------------------------------------------------------- + + /** + * Sliced view of a {@link CachedParquetIndexInput}. + * Reads are delegated to the parent input with offset adjustment. 
+ */ + static class SlicedFoyerIndexInput extends IndexInput { + + private final CachedParquetIndexInput parent; + private final long sliceOffset; + private final long sliceLength; + private long localPointer = 0L; + + SlicedFoyerIndexInput( + String resourceDescription, + CachedParquetIndexInput parent, + long offset, + long length + ) { + super(resourceDescription); + this.parent = parent; + this.sliceOffset = offset; + this.sliceLength = length; + } + + @Override + public byte readByte() throws IOException { + byte[] buf = new byte[1]; + readBytes(buf, 0, 1); + return buf[0]; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + if (localPointer + len > sliceLength) { + throw new IOException("Read past slice end"); + } + long absoluteStart = sliceOffset + localPointer; + int start = (int) absoluteStart; + int end = start + len; + + // Try Foyer cache via parent + byte[] cached = parent.foyerCache.getPageRange(parent.registryKey, start, end); + if (cached != null) { + System.arraycopy(cached, 0, b, offset, len); + localPointer += len; + return; + } + + // Fetch from remote + byte[] fetched = parent.fetchRangeFromRemote(start, len); + parent.foyerCache.putPageRange(parent.registryKey, start, end, fetched); + System.arraycopy(fetched, 0, b, offset, len); + localPointer += len; + } + + @Override + public void seek(long pos) throws IOException { + if (pos < 0 || pos > sliceLength) throw new IOException("Seek out of slice bounds"); + localPointer = pos; + } + + @Override + public long getFilePointer() { return localPointer; } + + @Override + public long length() { return sliceLength; } + + @Override + public IndexInput slice(String desc, long offset, long length) throws IOException { + return new SlicedFoyerIndexInput(desc, parent, sliceOffset + offset, length); + } + + @Override + public void close() throws IOException { + // Slice does not own the ref-count; parent does + } + } +} diff --git 
a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/PassthroughCacheStrategy.java b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/PassthroughCacheStrategy.java index c5b155c49771f..ef61a3bcb92bc 100644 --- a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/PassthroughCacheStrategy.java +++ b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/PassthroughCacheStrategy.java @@ -341,12 +341,23 @@ private static String locationName(int loc) { } } + /** + * Package-visible alias so that {@link CachedParquetCacheStrategy} can reuse the same + * ref-counting wrapper for LOCAL reads without duplicating the logic. + */ + static class RefCountedIndexInputPublic extends RefCountedIndexInput { + RefCountedIndexInputPublic(IndexInput delegate, String fileName, long registryPtr, + TieredCompositeStoreDirectory owningDirectory) { + super(delegate, fileName, registryPtr, owningDirectory); + } + } + /** * Wrapper that releases the read reference in the Rust FileRegistry when closed. * When the last reader closes (active_reads → 0) and the file is REMOTE-only, * triggers deferred local file deletion via the owning directory. 
*/ - private static class RefCountedIndexInput extends IndexInput { + static class RefCountedIndexInput extends IndexInput { private IndexInput delegate; private final String fileName; diff --git a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/TieredCompositeStoreDirectoryFactory.java b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/TieredCompositeStoreDirectoryFactory.java index 2f00798610ae0..146a042abc5df 100644 --- a/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/TieredCompositeStoreDirectoryFactory.java +++ b/modules/tiered-storage/src/main/java/org/opensearch/storage/directory/TieredCompositeStoreDirectoryFactory.java @@ -24,6 +24,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.vectorized.execution.jni.PageCacheProvider; import java.io.IOException; import java.util.function.Supplier; @@ -50,13 +51,44 @@ public class TieredCompositeStoreDirectoryFactory implements CachedCompositeStor private final Supplier repositoriesService; private final java.util.function.Function globalRegistryPtrResolver; + /** + * Supplier for the page cache provider. + *

+ * Using a Supplier (rather than holding the provider directly) is critical: in Node.java, + * {@code getCachedCompositeStoreDirectoryFactories()} is called at line ~973, BEFORE + * {@code setPageCacheProvider()} is called at line ~1191. By capturing it as a Supplier, + * the actual provider is resolved lazily at shard creation time (the first call to + * {@code newDirectory()}), by which point the provider has already been set. + */ + private final Supplier pageCacheProviderSupplier; + /** + * Constructor without Foyer cache (backward compatible). + * Uses PassthroughCacheStrategy for all formats. + */ public TieredCompositeStoreDirectoryFactory( Supplier repositoriesService, java.util.function.Function globalRegistryPtrResolver + ) { + this(repositoriesService, globalRegistryPtrResolver, () -> null); + } + + /** + * Constructor with lazy page cache provider supplier. + *

+ * The supplier is called per-shard at {@code newDirectory()} time (not at factory construction + * time), so it correctly observes the provider value that was set after factory creation. + * When the supplier returns non-null for a shard's parquet format, {@link CachedParquetCacheStrategy} + * is used; otherwise {@link PassthroughCacheStrategy} is used. + */ + public TieredCompositeStoreDirectoryFactory( + Supplier repositoriesService, + java.util.function.Function globalRegistryPtrResolver, + Supplier pageCacheProviderSupplier ) { this.repositoriesService = repositoriesService; this.globalRegistryPtrResolver = globalRegistryPtrResolver; + this.pageCacheProviderSupplier = pageCacheProviderSupplier; } @Override @@ -82,12 +114,26 @@ public CompositeStoreDirectory newDirectory( fileCache != null ? "present" : "null", remoteDirectory != null ? "present" : "null"); + // Cache strategy factory: + // "parquet" + Foyer available → CachedParquetCacheStrategy (byte-range caching via Foyer) + // "parquet" + no Foyer → PassthroughCacheStrategy (full remote read each time) + // "lucene" / "metadata" / etc → PassthroughCacheStrategy (FieldCache will replace later) + // Resolved HERE (at shard creation time), not at factory construction time. + // This is why pageCacheProviderSupplier is a Supplier — the provider is set in Node.java + // AFTER getCachedCompositeStoreDirectoryFactories() is called. 
+ final PageCacheProvider pageCache = this.pageCacheProviderSupplier.get(); TieredCompositeStoreDirectory directory = new TieredCompositeStoreDirectory( indexSettings, pluginsService, shardId, shardPath, - (formatName, dirPathPrefix) -> new PassthroughCacheStrategy(formatName, remoteDirectory, registryPtr, dirPathPrefix), + (formatName, dirPathPrefix) -> { + if ("parquet".equals(formatName) && pageCache != null) { + logger.debug("[TieredCompositeStoreDirectoryFactory] using CachedParquetCacheStrategy for format=parquet, shard={}", shardId); + return new CachedParquetCacheStrategy(formatName, remoteDirectory, registryPtr, dirPathPrefix, pageCache); + } + return new PassthroughCacheStrategy(formatName, remoteDirectory, registryPtr, dirPathPrefix); + }, registryPtr, remoteDataBlobPath, repositoryName, diff --git a/plugins/engine-datafusion/Cargo.toml b/plugins/engine-datafusion/Cargo.toml index 9889595c669c2..346a7cacecc1f 100644 --- a/plugins/engine-datafusion/Cargo.toml +++ b/plugins/engine-datafusion/Cargo.toml @@ -19,6 +19,8 @@ arrow-array = "57.3.0" arrow-schema = "57.3.0" arrow-buffer = "57.3.0" downcast-rs = "1.2" +foyer = { version = "=0.11.5" } +bytes = "1.9" # JNI dependencies diff --git a/plugins/engine-datafusion/jni/Cargo.toml b/plugins/engine-datafusion/jni/Cargo.toml index a2538d76b13d9..6144aef22ada1 100644 --- a/plugins/engine-datafusion/jni/Cargo.toml +++ b/plugins/engine-datafusion/jni/Cargo.toml @@ -68,6 +68,12 @@ url = { workspace = true } # Liquid Cache for byte-level caching liquid-cache-datafusion-local = { workspace = true } +# Foyer hybrid in-memory+disk cache for Parquet page caching +foyer = { workspace = true } + +# serde_bytes: efficient Bytes serialization needed for Foyer's StorageValue bound +serde_bytes = "0.11" + # Substrait support substrait = { workspace = true } diff --git a/plugins/engine-datafusion/jni/src/cache.rs b/plugins/engine-datafusion/jni/src/cache.rs index 4172c0926d724..26ad695a72998 100644 --- 
a/plugins/engine-datafusion/jni/src/cache.rs +++ b/plugins/engine-datafusion/jni/src/cache.rs @@ -7,11 +7,13 @@ use datafusion::execution::cache::CacheAccessor; use object_store::ObjectMeta; use vectorized_exec_spi::log_error; -pub const ALL_CACHE_TYPES: &[&str] = &[CACHE_TYPE_METADATA, CACHE_TYPE_STATS]; +pub const ALL_CACHE_TYPES: &[&str] = &[CACHE_TYPE_METADATA, CACHE_TYPE_STATS, CACHE_TYPE_PAGES]; // Cache type constants pub const CACHE_TYPE_METADATA: &str = "METADATA"; pub const CACHE_TYPE_STATS: &str = "STATISTICS"; +/// Foyer-backed byte-level page cache for Parquet column chunk data (Cache Layer 3) +pub const CACHE_TYPE_PAGES: &str = "PAGES"; // Helper function to handle cache errors #[allow(dead_code)] diff --git a/plugins/engine-datafusion/jni/src/cache_jni.rs b/plugins/engine-datafusion/jni/src/cache_jni.rs index db45f8adc7cb4..3863e8c2c808d 100644 --- a/plugins/engine-datafusion/jni/src/cache_jni.rs +++ b/plugins/engine-datafusion/jni/src/cache_jni.rs @@ -1,5 +1,5 @@ -use jni::objects::{JClass, JObjectArray, JString}; -use jni::sys::jlong; +use jni::objects::{JByteArray, JClass, JObjectArray, JString}; +use jni::sys::{jbyteArray, jint, jlong}; use jni::{JNIEnv}; use crate::custom_cache_manager::CustomCacheManager; use crate::util::{parse_string_arr}; @@ -7,8 +7,35 @@ use crate::cache; use crate::DataFusionRuntime; use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache; use std::sync::Arc; +use bytes::Bytes; use vectorized_exec_spi::{log_info, log_error, log_debug}; +// Default page cache budgets — overridden by Java settings via createCache() +const DEFAULT_PAGE_CACHE_MEMORY_BYTES: usize = 256 * 1024 * 1024; // 256 MB L1 memory +const DEFAULT_PAGE_CACHE_DISK_BYTES: usize = 10 * 1024 * 1024 * 1024; // 10 GB L2 disk +const DEFAULT_PAGE_CACHE_DIR: &str = "/tmp/foyer-page-cache"; + +/// Parse the eviction_type string for PAGES cache type. +/// Expected format: "|" +/// Falls back to defaults if the string is malformed (e.g. 
plain "LRU" from old Java code). +fn parse_page_cache_params(eviction_str: &str) -> (usize, String) { + if let Some(sep) = eviction_str.find('|') { + let disk_bytes_str = &eviction_str[..sep]; + let disk_dir = eviction_str[sep + 1..].to_string(); + if let Ok(disk_bytes) = disk_bytes_str.parse::<usize>() { + let dir = if disk_dir.is_empty() { DEFAULT_PAGE_CACHE_DIR.to_string() } else { disk_dir }; + return (disk_bytes, dir); + } + } + // Fallback: plain eviction type like "LRU" from legacy config + log_info!( + "[FOYER-PAGE-CACHE] eviction_type '{}' is not in '<disk_bytes>|<disk_dir>

' format; \ + using defaults: disk={}B, dir={}", + eviction_str, DEFAULT_PAGE_CACHE_DISK_BYTES, DEFAULT_PAGE_CACHE_DIR + ); + (DEFAULT_PAGE_CACHE_DISK_BYTES, DEFAULT_PAGE_CACHE_DIR.to_string()) +} + /// Create a CustomCacheManager instance #[no_mangle] pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createCustomCacheManager( @@ -89,6 +116,29 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createCac manager.set_statistics_cache(stats_cache); log_info!("[CACHE INFO] Successfully created {} cache in CustomCacheManager", cache_type_str); } + cache::CACHE_TYPE_PAGES => { + // Create Foyer hybrid (memory + disk) page cache — Cache Layer 3. + // `size_limit` is the L1 memory budget in bytes (e.g. 256 MB). + // The L2 disk budget and disk directory come from the Java settings + // PAGE_CACHE_DISK_CAPACITY and PAGE_CACHE_DIR; for this cache creation + // call they are passed via the eviction_type string as + // "|". + // Format: eviction_type_str = "|" + let (disk_bytes, disk_dir) = parse_page_cache_params(&eviction_type_str); + log_info!( + "[FOYER-PAGE-CACHE] creating hybrid page cache: L1-mem={}B, L2-disk={}B, dir={}", + size_limit, disk_bytes, disk_dir + ); + let page_cache = Arc::new(crate::foyer_cache::FoyerDiskPageCache::new( + size_limit as usize, + disk_bytes, + disk_dir, + )); + manager.set_page_cache(page_cache); + log_info!( + "[FOYER-PAGE-CACHE] successfully created Foyer hybrid page cache in CustomCacheManager" + ); + } _ => { let msg = format!("Invalid cache type: {}", cache_type_str); log_error!("[CACHE ERROR] {}", msg); @@ -444,3 +494,128 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_cacheMana } } } + +// ============================================================================ +// Foyer page cache JNI operations (Layer 3: Parquet byte range cache) +// Called by DataFusionPlugin.FoyerCacheProvider implementation to serve +// PassthroughCacheStrategy → FoyerParquetCacheStrategy 
in the tiered-storage module. +// ============================================================================ + +/// Look up a cached byte range for a Parquet file. +/// Returns the cached bytes as a Java byte[], or null on cache miss. +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCacheGet( + mut env: JNIEnv, + _class: JClass, + runtime_ptr: jlong, + path: JString, + start: jint, + end: jint, +) -> jbyteArray { + if runtime_ptr == 0 { + return std::ptr::null_mut(); + } + + let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) }; + let path_str: String = match env.get_string(&path) { + Ok(s) => s.into(), + Err(_) => return std::ptr::null_mut(), + }; + + let page_cache = match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) { + Some(c) => c, + None => return std::ptr::null_mut(), + }; + + // FoyerDiskPageCache.get() is async (disk I/O). Use get_blocking() since JNI is synchronous. + match page_cache.get_blocking(&path_str, start as usize, end as usize) { + Some(bytes) => { + log_debug!( + "[FOYER-PAGE-CACHE] JNI get HIT: path={}, range={}..{}, size={}B", + path_str, start, end, bytes.len() + ); + match env.byte_array_from_slice(&bytes) { + Ok(arr) => arr.into_raw(), + Err(e) => { + log_debug!("[FOYER-PAGE-CACHE] JNI get: failed to create Java byte[]: {}", e); + std::ptr::null_mut() + } + } + } + None => { + log_debug!( + "[FOYER-PAGE-CACHE] JNI get MISS: path={}, range={}..{}", + path_str, start, end + ); + std::ptr::null_mut() + } + } +} + +/// Store a byte range for a Parquet file in the Foyer page cache. 
+#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCachePut( + mut env: JNIEnv, + _class: JClass, + runtime_ptr: jlong, + path: JString, + start: jint, + end: jint, + data: JByteArray, +) { + if runtime_ptr == 0 { + return; + } + + let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) }; + let path_str: String = match env.get_string(&path) { + Ok(s) => s.into(), + Err(e) => { + log_debug!("[FoyerCache] foyerPageCachePut: failed to convert path: {}", e); + return; + } + }; + + let page_cache = match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) { + Some(c) => c, + None => return, + }; + + let bytes_vec: Vec = match env.convert_byte_array(data) { + Ok(v) => v, + Err(e) => { + log_debug!("[FoyerCache] foyerPageCachePut: failed to convert byte array: {}", e); + return; + } + }; + + page_cache.put(path_str, start as usize, end as usize, Bytes::from(bytes_vec)); +} + +/// Evict all cached byte ranges for a given Parquet file. +/// Called when a file is deleted (merged/compacted/tiered out). 
+#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCacheEvictFile( + mut env: JNIEnv, + _class: JClass, + runtime_ptr: jlong, + path: JString, +) { + if runtime_ptr == 0 { + return; + } + + let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) }; + let path_str: String = match env.get_string(&path) { + Ok(s) => s.into(), + Err(e) => { + log_debug!("[FoyerCache] foyerPageCacheEvictFile: failed to convert path: {}", e); + return; + } + }; + + if let Some(page_cache) = runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) { + page_cache.evict_file(&path_str); + log_debug!("[FoyerCache] evicted file from page cache: {}", path_str); + } +} diff --git a/plugins/engine-datafusion/jni/src/caching_object_store.rs b/plugins/engine-datafusion/jni/src/caching_object_store.rs new file mode 100644 index 0000000000000..83f203459479e --- /dev/null +++ b/plugins/engine-datafusion/jni/src/caching_object_store.rs @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +//! Foyer-backed caching wrapper around any [`ObjectStore`]. +//! +//! [`CachingObjectStore`] intercepts `get_range()` and `get_ranges()` calls — +//! the two methods DataFusion uses to fetch Parquet column chunk byte ranges. +//! All other methods are delegated transparently to the inner store. +//! +//! ## Two-tier read path +//! +//! ```text +//! DataFusion.get_range(file, 4096..8192) +//! └── CachingObjectStore.get_range() +//! ├── [FOYER-PAGE-CACHE] check L1-memory → HIT: return bytes (0 I/O) +//! ├── [FOYER-PAGE-CACHE] check L2-disk → HIT: return bytes (local NVMe) +//! └── MISS: inner.get_range() → S3/local read +//! └── [FOYER-PAGE-CACHE] PUT → L1-memory (async spill to L2-disk) +//! ``` +//! +//! ## Log prefix +//! +//! 
All log lines produced by this module use `[FOYER-PAGE-CACHE]` for easy grepping. + +use std::fmt; +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, +}; +use vectorized_exec_spi::{log_debug, log_info}; + +use crate::foyer_cache::FoyerDiskPageCache; + +/// An [`ObjectStore`] wrapper that caches `get_range` / `get_ranges` results +/// in the Foyer hybrid (memory + disk) page cache. +pub struct CachingObjectStore { + inner: Arc, + page_cache: Arc, +} + +impl fmt::Debug for CachingObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CachingObjectStore(inner={}, {})", + self.inner, self.page_cache.disk_dir().display()) + } +} + +impl fmt::Display for CachingObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CachingObjectStore({})", self.inner) + } +} + +impl CachingObjectStore { + /// Wrap `inner` with Foyer-backed page caching. + pub fn new(inner: Arc, page_cache: Arc) -> Self { + log_info!( + "[FOYER-PAGE-CACHE] CachingObjectStore created: inner={}, disk_dir={}, \ + mem_capacity={}B, disk_capacity={}B", + inner, + page_cache.disk_dir().display(), + page_cache.memory_capacity_bytes(), + page_cache.disk_capacity_bytes() + ); + Self { inner, page_cache } + } + + /// Strip the leading `/` so the cache key matches the FileRegistry key format. 
+ fn cache_path(location: &Path) -> String { + let s = location.as_ref(); + if s.starts_with('/') { s[1..].to_string() } else { s.to_string() } + } +} + +#[async_trait] +impl ObjectStore for CachingObjectStore { + // ── Write passthrough ────────────────────────────────────────── + + async fn put(&self, location: &Path, payload: PutPayload) -> object_store::Result { + self.inner.put(location, payload).await + } + async fn put_opts(&self, location: &Path, payload: PutPayload, opts: PutOptions) -> object_store::Result { + self.inner.put_opts(location, payload, opts).await + } + async fn put_multipart(&self, location: &Path) -> object_store::Result> { + self.inner.put_multipart(location).await + } + async fn put_multipart_opts(&self, location: &Path, opts: PutMultipartOpts) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + // ── Read passthrough (non-range) ─────────────────────────────── + + async fn get(&self, location: &Path) -> object_store::Result { + self.inner.get(location).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> object_store::Result { + self.inner.get_opts(location, options).await + } + async fn head(&self, location: &Path) -> object_store::Result { + self.inner.head(location).await + } + + // ── Range reads: intercepted by Foyer page cache ─────────────── + + /// Fetch a single byte range. + /// Checks Foyer L1 (memory) then L2 (disk) before falling through to the inner store. 
+ async fn get_range(&self, location: &Path, range: Range) -> object_store::Result { + let path_str = Self::cache_path(location); + let start = range.start as usize; + let end = range.end as usize; + + // L1+L2 lookup (async — disk I/O is async in Foyer) + if let Some(cached) = self.page_cache.get(&path_str, start, end).await { + log_info!( + "[FOYER-PAGE-CACHE] get_range HIT: path={}, range={}..{}, size={}B", + path_str, start, end, cached.len() + ); + return Ok(cached); + } + + // L1+L2 miss — fetch from inner store (local NVMe or S3/GCS/Azure) + log_info!( + "[FOYER-PAGE-CACHE] get_range MISS → inner store: path={}, range={}..{}", + path_str, start, end + ); + let bytes = self.inner.get_range(location, range).await?; + + // Populate cache (insert to L1; Foyer spills to L2 asynchronously) + log_info!( + "[FOYER-PAGE-CACHE] get_range PUT: path={}, range={}..{}, size={}B", + path_str, start, end, bytes.len() + ); + self.page_cache.put(path_str, start, end, bytes.clone()); + + Ok(bytes) + } + + /// Fetch multiple byte ranges in one call. + /// Each range is looked up individually so partial cache hits are exploited. 
+ async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> object_store::Result> { + let path_str = Self::cache_path(location); + + let mut results: Vec> = vec![None; ranges.len()]; + let mut miss_indices: Vec = Vec::new(); + + // Check each range in the cache + for (i, range) in ranges.iter().enumerate() { + let start = range.start as usize; + let end = range.end as usize; + if let Some(cached) = self.page_cache.get(&path_str, start, end).await { + log_info!( + "[FOYER-PAGE-CACHE] get_ranges HIT [{}/{}]: path={}, range={}..{}, size={}B", + i + 1, ranges.len(), path_str, start, end, cached.len() + ); + results[i] = Some(cached); + } else { + miss_indices.push(i); + } + } + + if miss_indices.is_empty() { + log_info!( + "[FOYER-PAGE-CACHE] get_ranges ALL HIT: path={}, {} ranges", + path_str, ranges.len() + ); + return Ok(results.into_iter().map(|b| b.unwrap()).collect()); + } + + // Fetch only the missing ranges from the inner store + log_info!( + "[FOYER-PAGE-CACHE] get_ranges PARTIAL MISS: path={}, {}/{} ranges need fetch", + path_str, miss_indices.len(), ranges.len() + ); + let miss_ranges: Vec> = miss_indices.iter().map(|&i| ranges[i].clone()).collect(); + let fetched = self.inner.get_ranges(location, &miss_ranges).await?; + + for (miss_idx, fetched_bytes) in miss_indices.iter().zip(fetched.into_iter()) { + let range = &ranges[*miss_idx]; + let start = range.start as usize; + let end = range.end as usize; + log_info!( + "[FOYER-PAGE-CACHE] get_ranges PUT: path={}, range={}..{}, size={}B", + path_str, start, end, fetched_bytes.len() + ); + self.page_cache.put(path_str.clone(), start, end, fetched_bytes.clone()); + results[*miss_idx] = Some(fetched_bytes); + } + + Ok(results.into_iter().map(|b| b.unwrap()).collect()) + } + + // ── Directory / listing — object_store 0.12 API ──────────────── + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> 
BoxStream<'static, object_store::Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + fn list_with_offset(&self, prefix: Option<&Path>, offset: &Path) -> BoxStream<'static, object_store::Result> { + self.inner.list_with_offset(prefix, offset) + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename(from, to).await + } + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename_if_not_exists(from, to).await + } +} diff --git a/plugins/engine-datafusion/jni/src/custom_cache_manager.rs b/plugins/engine-datafusion/jni/src/custom_cache_manager.rs index a516c3cf5b4c5..2b1ea32da57f2 100644 --- a/plugins/engine-datafusion/jni/src/custom_cache_manager.rs +++ b/plugins/engine-datafusion/jni/src/custom_cache_manager.rs @@ -10,14 +10,17 @@ use crate::util::{create_object_meta_from_file}; use object_store::path::Path; use object_store::ObjectMeta; use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata; -use vectorized_exec_spi::{log_debug, log_error}; +use vectorized_exec_spi::{log_debug, log_error, log_info}; +use crate::foyer_cache::FoyerDiskPageCache; /// Custom CacheManager that holds cache references directly pub struct CustomCacheManager { - /// Direct reference to the file metadata cache + /// Direct reference to the file metadata cache (Cache Layer 1: Parquet footer/schema) file_metadata_cache: Option>, - /// Direct reference to the statistics cache - statistics_cache: Option> + /// Direct reference to the statistics cache (Cache Layer 2: row counts, min/max stats) 
+ statistics_cache: Option>, + /// Foyer-backed hybrid (memory+disk) page cache (Cache Layer 3: Parquet column chunk byte ranges) + pub page_cache: Option>, } impl CustomCacheManager { @@ -25,22 +28,43 @@ impl CustomCacheManager { pub fn new() -> Self { Self { file_metadata_cache: None, - statistics_cache: None + statistics_cache: None, + page_cache: None, } } - /// Set the file metadata cache + /// Set the file metadata cache (Layer 1) pub fn set_file_metadata_cache(&mut self, cache: Arc) { self.file_metadata_cache = Some(cache); log_debug!("[CACHE INFO] File metadata cache set in CustomCacheManager"); } - /// Set the statistics cache + /// Set the statistics cache (Layer 2) pub fn set_statistics_cache(&mut self, cache: Arc) { self.statistics_cache = Some(cache); log_debug!("[CACHE INFO] Statistics cache set in CustomCacheManager"); } + /// Set the Foyer page cache (Layer 3). + /// + /// Once set, the `CachingObjectStore` (wrapping this cache) will intercept all + /// `get_range()` calls to DataFusion's ObjectStore, returning cached bytes on HIT + /// and populating the cache on MISS. + pub fn set_page_cache(&mut self, cache: Arc) { + log_info!( + "[FOYER-PAGE-CACHE] page cache set in CustomCacheManager: mem={}B, disk={}B, dir={}", + cache.memory_capacity_bytes(), + cache.disk_capacity_bytes(), + cache.disk_dir().display() + ); + self.page_cache = Some(cache); + } + + /// Get the Foyer page cache (Layer 3), if configured. 
+ pub fn get_page_cache(&self) -> Option> { + self.page_cache.clone() + } + /// Get the statistics cache pub fn get_statistics_cache(&self) -> Option> { self.statistics_cache.clone() @@ -167,6 +191,12 @@ impl CustomCacheManager { } } + // Evict all cached page byte ranges for this file (Layer 3) + if let Some(page_cache) = &self.page_cache { + page_cache.evict_file(file_path); + any_removed = true; // evict_file is best-effort, count as success + } + let removed = if !errors.is_empty() && !any_removed { false } else { @@ -257,22 +287,27 @@ impl CustomCacheManager { pub fn get_total_memory_consumed(&self) -> usize { let mut total = 0; - // Add metadata cache memory + // Layer 1: metadata cache memory if let Some(cache) = &self.file_metadata_cache { if let Ok(cache_guard) = cache.inner.lock() { total += cache_guard.memory_used(); } } - // Add statistics cache memory + // Layer 2: statistics cache memory if let Some(cache) = &self.statistics_cache { total += cache.memory_consumed(); } + // Layer 3: page cache memory (Foyer reports usage in bytes) + if let Some(cache) = &self.page_cache { + total += cache.memory_usage_bytes(); + } + total } - /// Clear all caches + /// Clear all caches (Layers 1, 2, and 3) pub fn clear_all(&self) { if let Some(cache) = &self.file_metadata_cache { cache.clear(); @@ -280,6 +315,11 @@ impl CustomCacheManager { if let Some(cache) = &self.statistics_cache { cache.clear(); } + // FoyerDiskPageCache.clear() is async — use the blocking wrapper + if let Some(cache) = &self.page_cache { + log_info!("[FOYER-PAGE-CACHE] clear_all: clearing page cache (memory + disk)"); + cache.clear_blocking(); + } } /// Clear specific cache type @@ -301,6 +341,15 @@ impl CustomCacheManager { Err("No statistics cache configured".to_string()) } } + crate::cache::CACHE_TYPE_PAGES => { + if let Some(cache) = &self.page_cache { + log_info!("[FOYER-PAGE-CACHE] clear_cache_type PAGES: clearing page cache (memory + disk)"); + cache.clear_blocking(); + Ok(()) + } else { + 
Err("No page cache configured".to_string()) + } + } _ => Err(format!("Unknown cache type: {}", cache_type)) } } @@ -326,6 +375,13 @@ impl CustomCacheManager { Err("No statistics cache configured".to_string()) } } + crate::cache::CACHE_TYPE_PAGES => { + if let Some(cache) = &self.page_cache { + Ok(cache.memory_usage_bytes()) + } else { + Err("No page cache configured".to_string()) + } + } _ => Err(format!("Unknown cache type: {}", cache_type)) } } diff --git a/plugins/engine-datafusion/jni/src/foyer_cache.rs b/plugins/engine-datafusion/jni/src/foyer_cache.rs new file mode 100644 index 0000000000000..ca2752d8cab59 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/foyer_cache.rs @@ -0,0 +1,258 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +//! Foyer-backed **hybrid (memory + disk)** page cache for Parquet column chunk byte ranges. +//! +//! ## Architecture +//! +//! `FoyerDiskPageCache` wraps Foyer's [`HybridCache`] which provides two storage tiers: +//! - **L1 (memory)**: hot byte ranges served from RAM — zero I/O +//! - **L2 (disk)**: warm byte ranges spilled to a dedicated directory on NVMe — avoids S3 fetches +//! +//! On a cache miss both tiers are checked. On eviction from L1, entries are spilled to L2 +//! automatically by Foyer. On L2 eviction they are dropped. +//! +//! ## Key format +//! +//! Cache key = `":-"` (a plain String) +//! Example: `"data/nodes/0/indices/UUID/0/index/parquet/_parquet_0.parquet:4096-8192"` +//! +//! ## Log prefix +//! +//! All log lines use the `[FOYER-PAGE-CACHE]` prefix so the caching flow can be easily +//! grepped in the OpenSearch logs. 
+ +use std::path::PathBuf; +use std::sync::Arc; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use foyer::{HybridCache, HybridCacheBuilder, DirectFsDeviceOptionsBuilder, LruConfig}; +use vectorized_exec_spi::{log_debug, log_info, log_error}; + +// ──────────────────────────────────────────────────────────────────── +// Value wrapper: Bytes does not implement serde, so wrap it. +// ──────────────────────────────────────────────────────────────────── + +/// Newtype wrapper around [`Bytes`] that implements `serde::Serialize/Deserialize` +/// so it satisfies Foyer's `StorageValue` bound for disk persistence. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CachedBytes(#[serde(with = "serde_bytes")] Vec); + +impl CachedBytes { + pub fn from_bytes(b: Bytes) -> Self { + Self(b.to_vec()) + } + pub fn into_bytes(self) -> Bytes { + Bytes::from(self.0) + } +} + +// ──────────────────────────────────────────────────────────────────── +// Main cache struct +// ──────────────────────────────────────────────────────────────────── + +/// Foyer hybrid (memory + disk) page cache for Parquet byte ranges. +/// +/// - **L1**: an in-memory LRU bounded by `memory_capacity_bytes` +/// - **L2**: a disk store in `disk_dir` bounded by `disk_capacity_bytes` +/// +/// Thread-safe and cheap to clone (inner `HybridCache` is `Arc`-backed). +/// +/// **Important**: The `tokio::runtime::Runtime` used to build Foyer must stay alive +/// for the entire lifetime of the `HybridCache`. Foyer spawns background I/O tasks +/// on that runtime during `build().await`; dropping the runtime cancels those tasks, +/// which causes `JoinError::Cancelled` panics in `foyer-storage`. We therefore keep +/// the runtime as an `Arc` field so it is dropped only after the `HybridCache` itself. +#[derive(Clone)] +pub struct FoyerDiskPageCache { + inner: HybridCache, + /// The Tokio runtime that owns Foyer's background tasks. + /// Must outlive `inner` — Arc ensures it is dropped last. 
+ _runtime: Arc, + memory_capacity_bytes: usize, + disk_capacity_bytes: usize, + disk_dir: PathBuf, +} + +impl std::fmt::Debug for FoyerDiskPageCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "FoyerDiskPageCache(mem={}B, disk={}B, dir={:?})", + self.memory_capacity_bytes, self.disk_capacity_bytes, self.disk_dir) + } +} + +impl FoyerDiskPageCache { + /// Build the cache synchronously by blocking a Tokio runtime. + /// + /// Called once at node startup from `DataFusionRuntimeEnv`. + /// The `disk_dir` must be writable; Foyer will create the directory if needed. + /// + /// # Arguments + /// * `memory_capacity_bytes` — hot L1 memory budget (e.g. 512 MB) + /// * `disk_capacity_bytes` — warm L2 disk budget (e.g. 10 GB) + /// * `disk_dir` — directory for Foyer disk files + pub fn new(memory_capacity_bytes: usize, disk_capacity_bytes: usize, disk_dir: impl Into) -> Self { + let disk_dir = disk_dir.into(); + + log_info!( + "[FOYER-PAGE-CACHE] initializing hybrid page cache: memory={}B, disk={}B, dir={}", + memory_capacity_bytes, disk_capacity_bytes, disk_dir.display() + ); + + // Foyer's HybridCacheBuilder::build() is async — use a temporary Tokio runtime + // to block on it. We only do this once at startup so the overhead is acceptable. 
+ let rt = tokio::runtime::Runtime::new() + .expect("[FOYER-PAGE-CACHE] failed to create bootstrap Tokio runtime"); + + let disk_dir_clone = disk_dir.clone(); + let inner: HybridCache = rt.block_on(async move { + HybridCacheBuilder::new() + .with_name("foyer-parquet-page-cache") + .memory(memory_capacity_bytes) + .with_eviction_config(LruConfig { high_priority_pool_ratio: 0.1 }) + .storage() + .with_device_config( + DirectFsDeviceOptionsBuilder::new(disk_dir_clone) + .with_capacity(disk_capacity_bytes) + .build() + ) + .build() + .await + .expect("[FOYER-PAGE-CACHE] failed to build Foyer HybridCache") + }); + + log_info!( + "[FOYER-PAGE-CACHE] hybrid page cache ready: memory={}B, disk={}B, dir={}", + memory_capacity_bytes, disk_capacity_bytes, disk_dir.display() + ); + + // CRITICAL: keep `rt` alive as an Arc field. + // Foyer spawns background store tasks on this runtime during build(). + // If `rt` is dropped here, those tasks are cancelled → JoinError::Cancelled panic. + let runtime = Arc::new(rt); + + Self { inner, _runtime: runtime, memory_capacity_bytes, disk_capacity_bytes, disk_dir } + } + + // ── Key helpers ──────────────────────────────────────────────── + + /// Build the string cache key from a file path and a byte range. + /// Format: `":-"` + pub fn make_key(path: &str, start: usize, end: usize) -> String { + format!("{}:{}-{}", path, start, end) + } + + // ── Cache operations ─────────────────────────────────────────── + + /// Async lookup. Returns `Some(Bytes)` on hit (from memory or disk), `None` on miss. + /// + /// This is `async` because a disk-tier lookup involves I/O on Foyer's background threads. + /// Callers that are already inside an async context (e.g. `CachingObjectStore::get_range`) + /// can simply `.await` this. 
+ pub async fn get(&self, path: &str, start: usize, end: usize) -> Option { + let key = Self::make_key(path, start, end); + match self.inner.get(&key).await { + Ok(Some(entry)) => { + log_debug!( + "[FOYER-PAGE-CACHE] HIT (L1-mem or L2-disk): path={}, range={}..{}, key={}", + path, start, end, key + ); + Some(entry.value().clone().into_bytes()) + } + Ok(None) => { + log_debug!( + "[FOYER-PAGE-CACHE] MISS (not in memory or disk): path={}, range={}..{}", + path, start, end + ); + None + } + Err(e) => { + log_error!( + "[FOYER-PAGE-CACHE] error reading cache: path={}, range={}..{}, err={}", + path, start, end, e + ); + None + } + } + } + + /// Synchronous get — blocks on Foyer's own runtime. + /// + /// Use this from JNI callbacks that cannot be `async`. + pub fn get_blocking(&self, path: &str, start: usize, end: usize) -> Option { + self._runtime.block_on(self.get(path, start, end)) + } + + /// Insert a byte range. The insert goes to memory (L1) synchronously; Foyer spills + /// to disk (L2) asynchronously in the background. + pub fn put(&self, path: impl Into, start: usize, end: usize, value: Bytes) { + let path = path.into(); + let key = Self::make_key(&path, start, end); + let size = value.len(); + log_debug!( + "[FOYER-PAGE-CACHE] PUT: path={}, range={}..{}, size={}B, key={}", + path, start, end, size, key + ); + self.inner.insert(key, CachedBytes::from_bytes(value)); + } + + /// Evict all cached byte ranges for a given file path. + /// + /// Called when a Parquet file is deleted (merged/compacted/tiered out). + /// Because Foyer does not support prefix-based removal, this is a no-op with a log warning. + /// The entry will expire naturally via LRU eviction; stale reads are prevented by the + /// FileRegistry (which marks the file as deleted before any caller can open it). + /// + /// A production improvement would be to track per-file keys in a `DashMap` and evict them + /// individually — the `FoyerDiskPageCacheWithIndex` (not implemented yet) would do this. 
+ pub fn evict_file(&self, path: &str) { + log_info!( + "[FOYER-PAGE-CACHE] evict_file: path={} — Foyer does not support prefix eviction; \ + entry will be evicted by LRU. FileRegistry guards against stale reads.", + path + ); + // Foyer's remove() takes a full key, not a prefix. + // Without a key index we cannot enumerate all ranges for a file. + // This is safe because PassthroughCacheStrategy / FoyerParquetCacheStrategy + // always checks the FileRegistry location before opening an IndexInput. + } + + /// Clear the entire cache (both memory and disk tiers). + pub async fn clear(&self) { + log_info!("[FOYER-PAGE-CACHE] clearing all entries from memory and disk"); + if let Err(e) = self.inner.clear().await { + log_error!("[FOYER-PAGE-CACHE] error during clear: {}", e); + } + } + + /// Synchronous clear variant for JNI. + /// Runs on Foyer's own Tokio runtime so the async clear can complete cleanly. + pub fn clear_blocking(&self) { + self._runtime.block_on(self.clear()); + } + + /// Returns the in-memory L1 usage in bytes. + pub fn memory_usage_bytes(&self) -> usize { + self.inner.memory().usage() + } + + /// Returns the configured memory L1 capacity. + pub fn memory_capacity_bytes(&self) -> usize { + self.memory_capacity_bytes + } + + /// Returns the configured disk L2 capacity. + pub fn disk_capacity_bytes(&self) -> usize { + self.disk_capacity_bytes + } + + /// Returns the disk directory. 
+ pub fn disk_dir(&self) -> &std::path::Path { + &self.disk_dir + } +} diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs index 62b4cfafe1fa7..525bb45f95a5a 100644 --- a/plugins/engine-datafusion/jni/src/lib.rs +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -40,6 +40,8 @@ mod absolute_row_id_optimizer; mod listing_table; mod cache; mod custom_cache_manager; +mod foyer_cache; +mod caching_object_store; mod tiered; mod memory; mod cross_rt_stream; @@ -294,6 +296,24 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo .with_disk_manager_builder(builder) .build().unwrap(); + // If a Foyer page cache is configured, wrap the default LocalFileSystem for file:// + // with CachingObjectStore so all get_range / get_ranges calls are intercepted. + if let Some(ref mgr) = custom_cache_manager { + if let Some(page_cache) = mgr.get_page_cache() { + use crate::caching_object_store::CachingObjectStore; + use object_store::local::LocalFileSystem; + use url::Url; + + let base_store: Arc = Arc::new(LocalFileSystem::new()); + let caching_store = Arc::new(CachingObjectStore::new(base_store, page_cache)); + runtime_env.register_object_store( + &Url::parse("file://").unwrap(), + caching_store, + ); + log_info!("[createGlobalRuntime] CachingObjectStore registered for file:// (Foyer page cache active)"); + } + } + let runtime = DataFusionRuntime { runtime_env, custom_cache_manager, @@ -422,7 +442,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_registerO let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) }; - let store: Arc = unsafe { + let tiered_store: Arc = unsafe { let fat_ptr: *const dyn object_store::ObjectStore = std::mem::transmute([ obj_store_data_ptr as usize, obj_store_vtable_ptr as usize, @@ -431,11 +451,25 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_registerO Arc::from_raw(fat_ptr) }; + // If a Foyer page cache is configured, 
wrap TieredObjectStore with CachingObjectStore + // so that get_range / get_ranges calls are intercepted and cached. + // Without this wrapping, TieredObjectStore overwrites the CachingObjectStore registered + // in createGlobalRuntime, defeating the Foyer cache entirely. + let store_to_register: Arc = + if let Some(page_cache) = runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) { + use crate::caching_object_store::CachingObjectStore; + log_info!("[FOYER-PAGE-CACHE] registerObjectStore: wrapping TieredObjectStore with CachingObjectStore"); + Arc::new(CachingObjectStore::new(tiered_store, page_cache)) + } else { + log_info!("[registerObjectStore] no page cache — registering TieredObjectStore directly"); + tiered_store + }; + runtime.runtime_env.register_object_store( &url::Url::parse("file://").unwrap(), - store, + store_to_register, ); - log_info!("[registerObjectStore] registered TieredObjectStore for file:// scheme, data_ptr={}, vtable_ptr={}", + log_info!("[registerObjectStore] registered TieredObjectStore (+ Foyer cache if configured) for file:// scheme, data_ptr={}, vtable_ptr={}", obj_store_data_ptr, obj_store_vtable_ptr); } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java index d1be71c6b9267..ff5303ef9e7ed 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java @@ -49,6 +49,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; +import org.opensearch.vectorized.execution.jni.PageCacheProvider; import org.opensearch.vectorized.execution.jni.NativeObjectStoreProvider; import org.opensearch.watcher.ResourceWatcherService; @@ -76,8 +77,11 @@ /** * Main plugin class for OpenSearch 
DataFusion integration. * + * Also implements {@link PageCacheProvider} so that the tiered-storage module's + * {@code CachedParquetCacheStrategy} can call back into the page cache owned + * by this plugin, without any classloader visibility issues. */ -public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin, AnalyticsBackEndPlugin, ExtensiblePlugin, SearchAnalyticsBackEndPlugin { +public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin, AnalyticsBackEndPlugin, ExtensiblePlugin, SearchAnalyticsBackEndPlugin, PageCacheProvider { private DataFusionService dataFusionService; @@ -298,6 +302,34 @@ public org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager cre return null; } + // ---- PageCacheProvider implementation ---- + // Delegates to the Foyer page cache inside the DataFusion runtime via JNI. + // Called by CachedParquetCacheStrategy in the tiered-storage module. + + @Override + public byte[] getPageRange(String path, int start, int end) { + if (dataFusionService == null) return null; + long runtimePtr = dataFusionService.getRuntimePointer(); + if (runtimePtr == 0) return null; + return org.opensearch.datafusion.jni.NativeBridge.foyerPageCacheGet(runtimePtr, path, start, end); + } + + @Override + public void putPageRange(String path, int start, int end, byte[] data) { + if (dataFusionService == null || data == null) return; + long runtimePtr = dataFusionService.getRuntimePointer(); + if (runtimePtr == 0) return; + org.opensearch.datafusion.jni.NativeBridge.foyerPageCachePut(runtimePtr, path, start, end, data); + } + + @Override + public void evictFile(String path) { + if (dataFusionService == null) return; + long runtimePtr = dataFusionService.getRuntimePointer(); + if (runtimePtr == 0) return; + org.opensearch.datafusion.jni.NativeBridge.foyerPageCacheEvictFile(runtimePtr, path); + } + public interface ParentAware { void setParentPlugin(DataFusionPlugin parent); } diff --git 
a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java index c0fb53d1acc79..dfb5651d55f00 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java @@ -98,6 +98,22 @@ protected void doStart() { @Override protected void doStop() { logger.info("Stopping DataFusion service"); + // Clear the Foyer page cache BEFORE shutting down the Tokio runtime. + // If the runtime is shut down first, Foyer's background store tasks get + // JoinError::Cancelled and foyer-storage panics at store.rs:151. + // Calling clear() (which calls cache.close().await) drains Foyer's async + // tasks cleanly while the runtime is still alive. + if (runtimeEnv != null) { + long runtimePtr = runtimeEnv.getPointer(); + if (runtimePtr != 0) { + try { + org.opensearch.datafusion.jni.NativeBridge.cacheManagerClear(runtimePtr); + logger.info("[FOYER-PAGE-CACHE] page cache cleared before runtime shutdown"); + } catch (Exception e) { + logger.warn("[FOYER-PAGE-CACHE] error clearing page cache on shutdown: {}", e.getMessage()); + } + } + } runtimeEnv.close(); logger.info("DataFusion service stopped"); } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java index ecd552c85fdd3..14909527215b4 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java @@ -91,6 +91,36 @@ private NativeBridge() {} // Memory monitoring public static native void printMemoryPoolAllocation(long runtimePtr); + // Foyer page cache operations (Layer 3: compressed Parquet byte range cache) + // These operate 
on the FoyerPageCacheWithIndex inside the DataFusion runtime's CustomCacheManager. + + /** + * Look up a cached byte range for a Parquet file. + * @param runtimePtr the DataFusion runtime pointer + * @param path file path key (local path without leading slash) + * @param start byte range start (inclusive) + * @param end byte range end (exclusive) + * @return cached bytes, or null on cache miss + */ + public static native byte[] foyerPageCacheGet(long runtimePtr, String path, int start, int end); + + /** + * Store a byte range for a Parquet file in the Foyer page cache. + * @param runtimePtr the DataFusion runtime pointer + * @param path file path key + * @param start byte range start (inclusive) + * @param end byte range end (exclusive) + * @param data the bytes to cache + */ + public static native void foyerPageCachePut(long runtimePtr, String path, int start, int end, byte[] data); + + /** + * Evict all cached byte ranges for a given file from the Foyer page cache. + * @param runtimePtr the DataFusion runtime pointer + * @param path file path whose ranges should be evicted + */ + public static native void foyerPageCacheEvictFile(long runtimePtr, String path); + // Logger initialization public static native void initLogger(); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java index 7b9685e4ad608..d05ec696addb7 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java @@ -53,16 +53,62 @@ public class CacheSettings { public static final Setting STATISTICS_CACHE_ENABLED = Setting.boolSetting(STATISTICS_CACHE_ENABLED_KEY, true, Setting.Property.NodeScope, Setting.Property.Dynamic); + // --- Page Cache (Cache Layer 3: Foyer hybrid memory+disk cache for 
Parquet column chunk byte ranges) --- + + /** + * L1 memory budget for the Foyer page cache. + * Hot byte ranges (recently read Parquet column chunk pages) are kept here. + * Default: 256 MB. + */ + public static final String PAGE_CACHE_SIZE_LIMIT_KEY = "datafusion.page.cache.size.limit"; + public static final Setting PAGE_CACHE_SIZE_LIMIT = + new Setting<>(PAGE_CACHE_SIZE_LIMIT_KEY, "256mb", + (s) -> ByteSizeValue.parseBytesSizeValue(s, new ByteSizeValue(64, ByteSizeUnit.MB), PAGE_CACHE_SIZE_LIMIT_KEY), + Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * L2 disk budget for the Foyer page cache. + * Warm byte ranges evicted from L1 memory are spilled to this disk store + * to avoid re-fetching from S3/GCS/Azure. Must be larger than PAGE_CACHE_SIZE_LIMIT. + * Default: 10 GB. + */ + public static final String PAGE_CACHE_DISK_CAPACITY_KEY = "datafusion.page.cache.disk.capacity"; + public static final Setting PAGE_CACHE_DISK_CAPACITY = + new Setting<>(PAGE_CACHE_DISK_CAPACITY_KEY, "10gb", + (s) -> ByteSizeValue.parseBytesSizeValue(s, new ByteSizeValue(1, ByteSizeUnit.GB), PAGE_CACHE_DISK_CAPACITY_KEY), + Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * Directory on local NVMe where Foyer stores the L2 disk cache files. + * Should be a fast local path (not S3/NFS). Foyer creates it if it doesn't exist. + * Default: "/tmp/foyer-page-cache". 
+ */ + public static final String PAGE_CACHE_DIR_KEY = "datafusion.page.cache.dir"; + public static final Setting PAGE_CACHE_DIR = new Setting<>( + PAGE_CACHE_DIR_KEY, + "/tmp/foyer-page-cache", + Function.identity(), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final String PAGE_CACHE_ENABLED_KEY = "datafusion.page.cache.enabled"; + public static final Setting PAGE_CACHE_ENABLED = + Setting.boolSetting(PAGE_CACHE_ENABLED_KEY, true, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final List> CACHE_SETTINGS = Arrays.asList( METADATA_CACHE_SIZE_LIMIT, METADATA_CACHE_EVICTION_TYPE, STATISTICS_CACHE_SIZE_LIMIT, - STATISTICS_CACHE_EVICTION_TYPE + STATISTICS_CACHE_EVICTION_TYPE, + PAGE_CACHE_SIZE_LIMIT, + PAGE_CACHE_DISK_CAPACITY, + PAGE_CACHE_DIR ); public static final List> CACHE_ENABLED = Arrays.asList( METADATA_CACHE_ENABLED, - STATISTICS_CACHE_ENABLED + STATISTICS_CACHE_ENABLED, + PAGE_CACHE_ENABLED ); } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheUtils.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheUtils.java index 3c3eaf755d264..fe0924cd9f394 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheUtils.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheUtils.java @@ -21,6 +21,10 @@ import static org.opensearch.datafusion.search.cache.CacheSettings.STATISTICS_CACHE_ENABLED; import static org.opensearch.datafusion.search.cache.CacheSettings.STATISTICS_CACHE_EVICTION_TYPE; import static org.opensearch.datafusion.search.cache.CacheSettings.STATISTICS_CACHE_SIZE_LIMIT; +import static org.opensearch.datafusion.search.cache.CacheSettings.PAGE_CACHE_ENABLED; +import static org.opensearch.datafusion.search.cache.CacheSettings.PAGE_CACHE_SIZE_LIMIT; +import static 
org.opensearch.datafusion.search.cache.CacheSettings.PAGE_CACHE_DISK_CAPACITY; +import static org.opensearch.datafusion.search.cache.CacheSettings.PAGE_CACHE_DIR; /** * Utility class for cache initialization and configuration. @@ -36,30 +40,25 @@ private CacheUtils() {} * Cache type enumeration with associated settings. */ public enum CacheType { - METADATA( - "METADATA", - METADATA_CACHE_ENABLED, - METADATA_CACHE_SIZE_LIMIT, - METADATA_CACHE_EVICTION_TYPE - ), + METADATA("METADATA", METADATA_CACHE_ENABLED, METADATA_CACHE_SIZE_LIMIT), + STATISTICS("STATISTICS", STATISTICS_CACHE_ENABLED, STATISTICS_CACHE_SIZE_LIMIT), - STATISTICS("STATISTICS",STATISTICS_CACHE_ENABLED, STATISTICS_CACHE_SIZE_LIMIT,STATISTICS_CACHE_EVICTION_TYPE); + /** + * Cache Layer 3: Foyer hybrid (memory + disk) page cache for compressed Parquet byte ranges. + * L1 = memory (PAGE_CACHE_SIZE_LIMIT), L2 = disk (PAGE_CACHE_DISK_CAPACITY at PAGE_CACHE_DIR). + * The eviction string passed to Rust is encoded as: {@code "<diskBytes>|<diskDir>"} + * so that Rust's {@code parse_page_cache_params()} can unpack both L2 disk settings. 
+ */ + PAGES("PAGES", PAGE_CACHE_ENABLED, PAGE_CACHE_SIZE_LIMIT); private final String cacheTypeName; private final Setting enabledSetting; private final Setting sizeLimitSetting; - private final Setting evictionTypeSetting; - - CacheType( - String cacheTypeName, - Setting enabledSetting, - Setting sizeLimitSetting, - Setting evictionTypeSetting - ) { + + CacheType(String cacheTypeName, Setting enabledSetting, Setting sizeLimitSetting) { this.cacheTypeName = cacheTypeName; this.enabledSetting = enabledSetting; this.sizeLimitSetting = sizeLimitSetting; - this.evictionTypeSetting = evictionTypeSetting; } public boolean isEnabled(ClusterSettings clusterSettings) { @@ -74,18 +73,10 @@ public Setting getSizeLimitSetting() { return sizeLimitSetting; } - public Setting getEvictionTypeSetting() { - return evictionTypeSetting; - } - public ByteSizeValue getSizeLimit(ClusterSettings clusterSettings) { return clusterSettings.get(sizeLimitSetting); } - public String getEvictionType(ClusterSettings clusterSettings) { - return clusterSettings.get(evictionTypeSetting); - } - public String getCacheTypeName() { return cacheTypeName; } @@ -93,28 +84,48 @@ public String getCacheTypeName() { /** * Creates and configures a CacheManagerConfig pointer with all enabled caches. + * For each cache type, calls NativeBridge.createCache() with the appropriate + * size and configuration string. 
* * @param clusterSettings OpenSearch cluster settings containing cache configuration */ public static long createCacheConfig(ClusterSettings clusterSettings) { - logger.info("Initializing cache configuration"); + logger.info("[FOYER-PAGE-CACHE] initializing cache configuration"); long cacheManagerPtr = NativeBridge.createCustomCacheManager(); - // Configure each enabled cache type - for (CacheType type : CacheType.values()) { - if (type.isEnabled(clusterSettings)) { - logger.info("Configuring {} cache: size={} bytes, eviction={}", - type.getCacheTypeName(), - type.getSizeLimit(clusterSettings).getBytes(), - type.getEvictionType(clusterSettings)); - - NativeBridge.createCache(cacheManagerPtr, type.cacheTypeName, type.getSizeLimit(clusterSettings).getBytes(), type.getEvictionType(clusterSettings)); - // clusterSettings.addSettingsUpdateConsumer(type.sizeLimitSetting,(v) -> NativeBridge.cacheManagerUpdateSizeLimitForCacheType(cacheManagerPtr, CacheType.METADATA.getCacheTypeName(),v.getBytes())); - } else { - logger.debug("Cache type {} is disabled", type.getCacheTypeName()); - } + + // METADATA cache (Layer 1: Parquet footer/schema) + if (CacheType.METADATA.isEnabled(clusterSettings)) { + long size = CacheType.METADATA.getSizeLimit(clusterSettings).getBytes(); + String eviction = clusterSettings.get(CacheSettings.METADATA_CACHE_EVICTION_TYPE); + logger.info("[CACHE INFO] Configuring METADATA cache: size={}B, eviction={}", size, eviction); + NativeBridge.createCache(cacheManagerPtr, "METADATA", size, eviction); } - logger.info("Cache configuration completed"); + + // STATISTICS cache (Layer 2: row counts, min/max) + if (CacheType.STATISTICS.isEnabled(clusterSettings)) { + long size = CacheType.STATISTICS.getSizeLimit(clusterSettings).getBytes(); + String eviction = clusterSettings.get(CacheSettings.STATISTICS_CACHE_EVICTION_TYPE); + logger.info("[CACHE INFO] Configuring STATISTICS cache: size={}B, eviction={}", size, eviction); + 
NativeBridge.createCache(cacheManagerPtr, "STATISTICS", size, eviction); + } + + // PAGES cache (Layer 3: Foyer hybrid memory+disk byte range cache) + if (CacheType.PAGES.isEnabled(clusterSettings)) { + long memBytes = clusterSettings.get(PAGE_CACHE_SIZE_LIMIT).getBytes(); + long diskBytes = clusterSettings.get(PAGE_CACHE_DISK_CAPACITY).getBytes(); + String diskDir = clusterSettings.get(PAGE_CACHE_DIR); + // Encode disk settings into the eviction_type string: "<diskBytes>|<diskDir>" + // Rust's parse_page_cache_params() reads this format. + String evictionEncoded = diskBytes + "|" + diskDir; + logger.info( + "[FOYER-PAGE-CACHE] Configuring PAGES cache: L1-mem={}B, L2-disk={}B, dir={}, encoded={}", + memBytes, diskBytes, diskDir, evictionEncoded + ); + NativeBridge.createCache(cacheManagerPtr, "PAGES", memBytes, evictionEncoded); + } + + logger.info("[FOYER-PAGE-CACHE] cache configuration completed"); return cacheManagerPtr; } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 547ee60bdb44c..681d132ebac3c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1178,6 +1178,23 @@ protected Node(final Environment initialEnvironment, Collection clas .findFirst() .orElse(null); + // Discover PageCacheProvider from plugins (e.g. DataFusionPlugin) and wire it into + // any plugin implementing PageCacheAware (e.g. TieredStoragePlugin) so that + // CachedParquetCacheStrategy is used for parquet format files instead of + // PassthroughCacheStrategy (which re-fetches from S3 on every openIndexInput()). + // Uses PageCacheAware interface (in vectorized-exec-spi) to avoid a compile-time + // dependency on modules/tiered-storage from the server module. 
+ org.opensearch.vectorized.execution.jni.PageCacheProvider pageCacheProvider = + pluginsService.filterPlugins(org.opensearch.vectorized.execution.jni.PageCacheProvider.class) + .stream() + .findFirst() + .orElse(null); + if (pageCacheProvider != null) { + final org.opensearch.vectorized.execution.jni.PageCacheProvider pcp = pageCacheProvider; + pluginsService.filterPlugins(org.opensearch.vectorized.execution.jni.PageCacheAware.class) + .forEach(p -> p.setPageCacheProvider(pcp)); + } + Collection dataSourceAwareComponents = pluginsService.filterPlugins(SearchEnginePlugin.class) .stream() .flatMap(