Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the right place. Placed here to avoid compilation issue.
Update location post open sourcing Tiered storage.

* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.jni;

/**
* Marker interface for plugins that can receive a {@link PageCacheProvider}.
* <p>
* Implemented by {@code TieredStoragePlugin} so that {@code Node.java} (in the
* {@code server} module) can inject a page cache provider without needing
* a compile-time dependency on the {@code modules/tiered-storage} module.
* <p>
* {@code Node.java} discovers plugins implementing this interface and calls
* {@link #setPageCacheProvider(PageCacheProvider)} after the {@link PageCacheProvider}
* (e.g., {@code DataFusionPlugin}) has been discovered.
*/
public interface PageCacheAware {

/**
* Inject the page cache provider.
* Called by {@code Node.java} during node construction, after the plugin implementing
* {@link PageCacheProvider} (e.g. {@code DataFusionPlugin}) has been initialized.
*
* @param provider the page cache provider, never null when this method is called
*/
void setPageCacheProvider(PageCacheProvider provider);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.jni;

/**
* Provider for a byte-range page cache used to serve Parquet file reads on warm indices.
* <p>
* Implemented by search-engine plugins (e.g. {@code DataFusionPlugin}) that own an
* in-process page cache and want to expose it to the tiered-storage module so that
* {@code CachedParquetCacheStrategy} can serve Parquet column-chunk byte ranges from the
* cache instead of re-fetching from object storage (S3/GCS/Azure) on every
* {@code openIndexInput()} call.
* <p>
* The cache key is the local filesystem path of the Parquet file (without leading slash)
* combined with the byte range, e.g. {@code "data/nodes/0/.../parquet/_parquet_0.parquet:4096-8192"}.
* The exact key format is an implementation detail of the provider.
*/
public interface PageCacheProvider {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need a page cache provider we have a cache strategy provider correct and for parquet we can have pass through on java and foyer in rust correct


/**
* Look up a cached byte range for a Parquet file.
*
* @param path the local file path used as cache key (e.g. "data/nodes/0/.../parquet/_parquet_0.parquet")
* @param start byte range start (inclusive)
* @param end byte range end (exclusive)
* @return the cached bytes, or {@code null} on cache miss
*/
byte[] getPageRange(String path, int start, int end);

/**
* Store a byte range for a Parquet file in the cache.
*
* @param path the local file path used as cache key
* @param start byte range start (inclusive)
* @param end byte range end (exclusive)
* @param data the bytes to cache (must have length == end - start)
*/
void putPageRange(String path, int start, int end, byte[] data);

/**
* Evict all cached byte ranges for a given Parquet file.
* Called when a file is deleted (merged, compacted, or tiered out).
*
* @param path the local file path whose cached ranges should be removed
*/
void evictFile(String path);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.index.store.CompositeStoreDirectoryFactory;
import org.opensearch.vectorized.execution.jni.PageCacheAware;
import org.opensearch.vectorized.execution.jni.PageCacheProvider;
import org.opensearch.vectorized.execution.jni.NativeObjectStoreProvider;

import java.util.ArrayList;
Expand All @@ -88,7 +90,7 @@
* Per-repository remote stores are added to the shared {@code FileRegistry} as new
* repositories are encountered. Different indices can point to different repositories.
*/
public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin, NativeObjectStoreProvider {
public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin, NativeObjectStoreProvider, PageCacheAware {

private static final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(TieredStoragePlugin.class);

Expand All @@ -101,6 +103,14 @@ public class TieredStoragePlugin extends Plugin implements IndexStorePlugin, Act

private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;

/**
* Page cache provider — received from DataFusionPlugin via Node.java.
* Passed into TieredCompositeStoreDirectoryFactory so that CachedParquetCacheStrategy
* can be used for parquet format files instead of PassthroughCacheStrategy.
* May be null if DataFusionPlugin is not loaded or page cache is disabled.
*/
private volatile PageCacheProvider pageCacheProvider;

// Global native ObjectStore — created lazily on first warm shard creation
private volatile long globalObjStoreDataPtr;
private volatile long globalObjStoreVtablePtr;
Expand Down Expand Up @@ -139,6 +149,17 @@ public Map<String, CompositeStoreDirectoryFactory> getCompositeStoreDirectoryFac
return Collections.emptyMap();
}

/**
* Set the page cache provider (e.g. from DataFusionPlugin).
* Called by Node.java after discovering a plugin implementing {@link PageCacheProvider},
* so TieredCompositeStoreDirectoryFactory can use CachedParquetCacheStrategy.
*/
@Override
public void setPageCacheProvider(PageCacheProvider provider) {
this.pageCacheProvider = provider;
logger.info("[TieredStoragePlugin] PageCacheProvider set — parquet reads will use page cache");
}

@Override
public Map<String, org.opensearch.index.store.CachedCompositeStoreDirectoryFactory> getCachedCompositeStoreDirectoryFactories() {
return Map.of(TIERED_COMPOSITE_INDEX_TYPE, new TieredCompositeStoreDirectoryFactory(
Expand All @@ -147,7 +168,12 @@ public Map<String, org.opensearch.index.store.CachedCompositeStoreDirectoryFacto
ensureGlobalObjectStoreCreated();
ensureRemoteStoreForRepo(repoName);
return globalRegistryPtr;
}
},
// Pass as Supplier so it is resolved LAZILY at shard creation time (newDirectory()).
// getCachedCompositeStoreDirectoryFactories() is called in Node.java at line ~973
// BEFORE setPageCacheProvider() is called at line ~1191.
// The Supplier captures 'this' and reads the volatile field at call time, not now.
() -> this.pageCacheProvider
));
}

Expand Down
Loading
Loading