Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,29 @@ public interface PageCacheProvider {
* @param path the local file path whose cached ranges should be removed
*/
void evictFile(String path);

/**
* Returns the number of bytes currently stored in the page cache's disk tier (L2).
* Used by {@code DiskBudgetManager} to aggregate disk usage across all SSD consumers.
* <p>
* Default implementation returns {@code 0} so existing implementors are not broken.
*
* @return disk bytes currently in use by the page cache, or {@code 0} if unknown
*/
default long getDiskUsageBytes() {
return 0L;
}

/**
* Returns the configured disk capacity of the page cache's disk tier (L2).
* Used by {@code DiskBudgetManager} to validate that total cache budgets do not
* exceed available SSD space at node startup.
* <p>
* Default implementation returns {@code 0} so existing implementors are not broken.
*
* @return disk capacity configured for the page cache in bytes, or {@code 0} if disabled
*/
default long getDiskCapacityBytes() {
return 0L;
}
}
24 changes: 24 additions & 0 deletions plugins/engine-datafusion/jni/src/cache_jni.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,30 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPage
page_cache.put(path_str, start as usize, end as usize, Bytes::from(bytes_vec));
}

/// Returns the number of bytes currently stored in Foyer's L2 disk tier.
/// Mapped to: {@code NativeBridge.foyerDiskUsageBytes(long runtimePtr)}
/// Used by {@code DiskBudgetManager.getStats()} to populate the
/// {@code disk_budget.format_cache.used_in_bytes} field in {@code _nodes/stats}.
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerDiskUsageBytes(
_env: JNIEnv,
_class: JClass,
runtime_ptr: jlong,
) -> jlong {
if runtime_ptr == 0 {
return 0;
}
let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) };
match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) {
Some(cache) => {
let bytes = cache.disk_usage_bytes() as jlong;
log_debug!("[FOYER-PAGE-CACHE] JNI foyerDiskUsageBytes: {}B", bytes);
bytes
}
None => 0,
}
}

/// Evict all cached byte ranges for a given Parquet file.
/// Called when a file is deleted (merged/compacted/tiered out).
#[no_mangle]
Expand Down
14 changes: 14 additions & 0 deletions plugins/engine-datafusion/jni/src/foyer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,20 @@ impl FoyerDiskPageCache {
self.memory_capacity_bytes
}

/// Returns the cumulative bytes written to the Foyer L2 disk tier since cache creation.
///
/// Note: Foyer 0.11.5 does not expose a "current occupancy" metric for the disk tier.
/// `DeviceStats::write_bytes` is the closest available proxy — it counts total bytes
/// flushed to disk and is monotonically increasing. It overestimates current usage
/// (evicted bytes are not subtracted) but is accurate enough for disk budget monitoring.
///
/// Used by `DiskBudgetManager` via `foyerDiskUsageBytes()` JNI to populate
/// `disk_budget.format_cache.used_in_bytes` in `_nodes/stats`.
pub fn disk_usage_bytes(&self) -> usize {
use std::sync::atomic::Ordering;
self.inner.stats().write_bytes.load(Ordering::Relaxed)
}

/// Returns the configured disk L2 capacity.
pub fn disk_capacity_bytes(&self) -> usize {
self.disk_capacity_bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,28 @@ public org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager<?> cre
// Delegates to the Foyer page cache inside the DataFusion runtime via JNI.
// Called by CachedParquetCacheStrategy in the tiered-storage module.

@Override
public long getDiskUsageBytes() {
if (dataFusionService == null) return 0L;
long runtimePtr = dataFusionService.getRuntimePointer();
if (runtimePtr == 0) return 0L;
try {
// The JNI method returns the actual bytes used in Foyer's L2 disk tier.
return org.opensearch.datafusion.jni.NativeBridge.foyerDiskUsageBytes(runtimePtr);
} catch (UnsatisfiedLinkError e) {
// Rust implementation not yet available in this build — return 0 safely.
// This prevents _nodes/stats from crashing the node.
return 0L;
}
}

@Override
public long getDiskCapacityBytes() {
if (dataFusionService == null) return 0L;
// Return the configured Foyer disk capacity from the cache manager.
return dataFusionService.getFormatCacheDiskCapacityBytes();
}

@Override
public byte[] getPageRange(String path, int start, int end) {
if (dataFusionService == null) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DataFusionService extends AbstractLifecycleComponent {

private final DataSourceRegistry dataSourceRegistry;
private final DataFusionRuntimeEnv runtimeEnv;
private final ClusterService clusterService;
private volatile NativeObjectStoreProvider nativeObjectStoreProvider;
private volatile boolean objectStoreRegistered = false;

Expand All @@ -39,6 +40,7 @@ public class DataFusionService extends AbstractLifecycleComponent {
*/
public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs, ClusterService clusterService, String spill_dir) {
this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs);
this.clusterService = clusterService;

// to verify jni
String version = NativeBridge.getVersionInfo();
Expand Down Expand Up @@ -152,4 +154,21 @@ public String getVersion() {
public CacheManager getCacheManager() {
return runtimeEnv.getCacheManager();
}

/**
* Returns the configured disk capacity for the format (page) cache disk tier in bytes.
* Reads {@code datafusion.page.cache.disk.capacity} from the cluster settings.
* Used by {@code DiskBudgetManager} for startup validation and stats reporting.
*
* @return configured disk capacity in bytes, or {@code 0} if the format cache is disabled
*/
public long getFormatCacheDiskCapacityBytes() {
try {
return clusterService.getClusterSettings()
.get(org.opensearch.datafusion.search.cache.CacheSettings.PAGE_CACHE_DISK_CAPACITY)
.getBytes();
} catch (Exception e) {
return 0L;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ private NativeBridge() {}
*/
public static native void foyerPageCacheEvictFile(long runtimePtr, String path);

/**
* Returns the number of bytes currently stored in Foyer's L2 disk tier.
* Used by {@code DiskBudgetManager} to report format-cache disk usage in {@code _nodes/stats}.
*
* @param runtimePtr the DataFusion runtime pointer
* @return disk bytes in use by the Foyer page cache, or 0 if unavailable
*/
public static native long foyerDiskUsageBytes(long runtimePtr);


// Logger initialization
public static native void initLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.monitor.fs.DiskBudgetStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.ingest.IngestStats;
import org.opensearch.monitor.fs.FsInfo;
Expand Down Expand Up @@ -166,6 +167,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private RemoteStoreNodeStats remoteStoreNodeStats;

@Nullable
private DiskBudgetStats diskBudgetStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -252,6 +256,9 @@ public NodeStats(StreamInput in) throws IOException {
} else {
remoteStoreNodeStats = null;
}
// DiskBudgetStats — not version-gated because this is a new experimental addition
// in a feature branch. In open-source it should be gated on a new version constant.
diskBudgetStats = in.readOptionalWriteable(DiskBudgetStats::new);
}

public NodeStats(
Expand Down Expand Up @@ -284,7 +291,8 @@ public NodeStats(
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats,
@Nullable NodeCacheStats nodeCacheStats,
@Nullable RemoteStoreNodeStats remoteStoreNodeStats
@Nullable RemoteStoreNodeStats remoteStoreNodeStats,
@Nullable DiskBudgetStats diskBudgetStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -316,6 +324,7 @@ public NodeStats(
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
this.remoteStoreNodeStats = remoteStoreNodeStats;
this.diskBudgetStats = diskBudgetStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -483,6 +492,11 @@ public RemoteStoreNodeStats getRemoteStoreNodeStats() {
return remoteStoreNodeStats;
}

@Nullable
public DiskBudgetStats getDiskBudgetStats() {
return diskBudgetStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -544,6 +558,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalWriteable(remoteStoreNodeStats);
}
out.writeOptionalWriteable(diskBudgetStats);
}

@Override
Expand Down Expand Up @@ -653,6 +668,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getRemoteStoreNodeStats() != null) {
getRemoteStoreNodeStats().toXContent(builder, params);
}
if (getDiskBudgetStats() != null) {
getDiskBudgetStats().toXContent(builder, params);
}
return builder;
}
}
Loading
Loading