diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java index 60ec86d126e40..de1c89bb30b96 100644 --- a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/PageCacheProvider.java @@ -50,4 +50,29 @@ public interface PageCacheProvider { * @param path the local file path whose cached ranges should be removed */ void evictFile(String path); + + /** + * Returns the number of bytes currently stored in the page cache's disk tier (L2). + * Used by {@code DiskBudgetManager} to aggregate disk usage across all SSD consumers. + *
+     * Default implementation returns {@code 0} so existing implementors are not broken.
+     *
+     * @return disk bytes currently in use by the page cache, or {@code 0} if unknown
+     */
+    default long getDiskUsageBytes() {
+        // 0 doubles as "unknown": callers cannot tell an empty disk tier from an unimplemented one.
+        return 0L;
+    }
+
+    /**
+     * Returns the configured disk capacity of the page cache's disk tier (L2).
+     * Used by {@code DiskBudgetManager} to validate that total cache budgets do not
+     * exceed available SSD space at node startup.
+     *
+ * Default implementation returns {@code 0} so existing implementors are not broken.
+ *
+ * @return disk capacity configured for the page cache in bytes, or {@code 0} if disabled
+ */
+    default long getDiskCapacityBytes() {
+        // 0 is the "no disk tier / disabled" answer for implementors that do not override this.
+        return 0L;
+    }
}
diff --git a/plugins/engine-datafusion/jni/src/cache_jni.rs b/plugins/engine-datafusion/jni/src/cache_jni.rs
index 3863e8c2c808d..5f52acdfde556 100644
--- a/plugins/engine-datafusion/jni/src/cache_jni.rs
+++ b/plugins/engine-datafusion/jni/src/cache_jni.rs
@@ -592,6 +592,30 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPage
page_cache.put(path_str, start as usize, end as usize, Bytes::from(bytes_vec));
}
+/// Returns the disk-tier (L2) usage figure, in bytes, reported by the Foyer page cache.
+///
+/// JNI entry point for `NativeBridge.foyerDiskUsageBytes(long runtimePtr)`. The value is
+/// whatever the page cache's `disk_usage_bytes()` reports; it is surfaced through
+/// `DiskBudgetManager.getStats()` as `disk_budget.format_cache.used_in_bytes` in
+/// `_nodes/stats`.
+///
+/// Returns `0` when `runtime_ptr` is null or when the runtime has no page cache.
+/// A usage value that does not fit in a `jlong` is saturated to `jlong::MAX` rather than
+/// being allowed to wrap to a negative number.
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerDiskUsageBytes(
+    _env: JNIEnv,
+    _class: JClass,
+    runtime_ptr: jlong,
+) -> jlong {
+    // Function-scoped so the block also builds on editions where `TryFrom` is not in the prelude.
+    use std::convert::TryFrom;
+
+    if runtime_ptr == 0 {
+        return 0;
+    }
+    // SAFETY: `runtime_ptr` is non-null (checked above). NOTE(review): soundness relies on the
+    // Java side passing the address of a `DataFusionRuntime` that stays alive for this call —
+    // confirm against the runtime's create/close lifecycle.
+    let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) };
+    match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) {
+        Some(cache) => {
+            // A plain `as jlong` cast would silently wrap values above `i64::MAX`.
+            let bytes = jlong::try_from(cache.disk_usage_bytes()).unwrap_or(jlong::MAX);
+            log_debug!("[FOYER-PAGE-CACHE] JNI foyerDiskUsageBytes: {}B", bytes);
+            bytes
+        }
+        None => 0,
+    }
+}
+
/// Evict all cached byte ranges for a given Parquet file.
/// Called when a file is deleted (merged/compacted/tiered out).
#[no_mangle]
diff --git a/plugins/engine-datafusion/jni/src/foyer_cache.rs b/plugins/engine-datafusion/jni/src/foyer_cache.rs
index ca2752d8cab59..c636af9f9e710 100644
--- a/plugins/engine-datafusion/jni/src/foyer_cache.rs
+++ b/plugins/engine-datafusion/jni/src/foyer_cache.rs
@@ -246,6 +246,20 @@ impl FoyerDiskPageCache {
self.memory_capacity_bytes
}
+    /// Returns an estimate of the bytes held in the Foyer L2 disk tier, capped at the
+    /// configured disk capacity.
+    ///
+    /// Note: Foyer 0.11.5 does not expose a "current occupancy" metric for the disk tier.
+    /// `DeviceStats::write_bytes` is the closest available proxy — it counts total bytes
+    /// flushed to disk and is monotonically increasing, so evicted bytes are never
+    /// subtracted. Reported raw, it would eventually exceed the configured capacity and keep
+    /// growing; the counter is therefore clamped to `disk_capacity_bytes`, the most the tier
+    /// is configured to hold. Below that cap the value can still overestimate current usage.
+    ///
+    /// Used by `DiskBudgetManager` via `foyerDiskUsageBytes()` JNI to populate
+    /// `disk_budget.format_cache.used_in_bytes` in `_nodes/stats`.
+    pub fn disk_usage_bytes(&self) -> usize {
+        use std::sync::atomic::Ordering;
+        // Relaxed is enough: this is a standalone statistic, not used to order other memory.
+        let written = self.inner.stats().write_bytes.load(Ordering::Relaxed);
+        written.min(self.disk_capacity_bytes)
+    }
+
/// Returns the configured disk L2 capacity.
pub fn disk_capacity_bytes(&self) -> usize {
self.disk_capacity_bytes
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
index ff5303ef9e7ed..b25c6fd4179b2 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
@@ -306,6 +306,28 @@ public org.opensearch.index.engine.exec.CatalogSnapshotAwareReaderManager> cre
// Delegates to the Foyer page cache inside the DataFusion runtime via JNI.
// Called by CachedParquetCacheStrategy in the tiered-storage module.
+    @Override
+    public long getDiskUsageBytes() {
+        // Without a service or a native runtime handle there is no page cache to query.
+        if (dataFusionService == null) {
+            return 0L;
+        }
+        final long runtimeHandle = dataFusionService.getRuntimePointer();
+        if (runtimeHandle == 0) {
+            return 0L;
+        }
+
+        long reportedBytes;
+        try {
+            // Native figure for Foyer's L2 disk tier. The native side documents it as an
+            // estimate that can overstate current occupancy.
+            reportedBytes = org.opensearch.datafusion.jni.NativeBridge.foyerDiskUsageBytes(runtimeHandle);
+        } catch (UnsatisfiedLinkError nativeSymbolMissing) {
+            // The loaded native library does not provide this entry point; report 0 so that
+            // stats collection keeps working instead of failing the request.
+            reportedBytes = 0L;
+        }
+        return reportedBytes;
+    }
+
+    @Override
+    public long getDiskCapacityBytes() {
+        // The capacity is a configured value owned by the service; no service means no capacity.
+        return dataFusionService == null ? 0L : dataFusionService.getFormatCacheDiskCapacityBytes();
+    }
+
@Override
public byte[] getPageRange(String path, int start, int end) {
if (dataFusionService == null) return null;
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
index dfb5651d55f00..86a826d158009 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
@@ -30,6 +30,7 @@ public class DataFusionService extends AbstractLifecycleComponent {
private final DataSourceRegistry dataSourceRegistry;
private final DataFusionRuntimeEnv runtimeEnv;
+ private final ClusterService clusterService;
private volatile NativeObjectStoreProvider nativeObjectStoreProvider;
private volatile boolean objectStoreRegistered = false;
@@ -39,6 +40,7 @@ public class DataFusionService extends AbstractLifecycleComponent {
*/
public DataFusionService(Map Consumers tracked:
+ * What this class does:
+ * What this class does NOT do:
+ * This is a passive observer. It does not evict entries from either cache, does not
+ * coordinate writes, and does not block operations. Each cache manages its own eviction
+ * independently. The only enforcement point is the startup check.
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public class DiskBudgetManager {
+
+    private static final Logger logger = LogManager.getLogger(DiskBudgetManager.class);
+
+    /**
+     * Default maximum fraction of physical SSD that can be committed to long-lived caches.
+     * The remaining 10% is reserved for transient write buffers and merge temp files.
+     */
+    public static final int DEFAULT_HEADROOM_THRESHOLD_PERCENT = 90;
+
+    /** Warm-node file cache; {@code null} is tolerated and treated as zero capacity and zero usage. */
+    private final FileCache fileCache;
+    /** Configured disk budget of the format (page) cache in bytes; fixed at construction. */
+    private final long formatCacheConfiguredBytes;
+    /** Maximum percent of the physical SSD that the configured budgets may add up to at startup. */
+    private final int headroomThresholdPercent;
+    /** Source of format-cache usage figures; volatile because it is set after construction and read by {@code getStats()}. */
+    private volatile PageCacheProvider pageCacheProvider;
+
+    /**
+     * Creates a manager that uses {@link #DEFAULT_HEADROOM_THRESHOLD_PERCENT} as the startup threshold.
+     *
+     * @param fileCache the warm node FileCache, or {@code null} if not initialized
+     * @param formatCacheConfiguredBytes the configured disk budget for the format (page) cache in bytes;
+     *                                   {@code 0} means the format cache is disabled
+     */
+    public DiskBudgetManager(FileCache fileCache, long formatCacheConfiguredBytes) {
+        this(fileCache, formatCacheConfiguredBytes, DEFAULT_HEADROOM_THRESHOLD_PERCENT);
+    }
+
+    /**
+     * Creates a manager with an explicit startup threshold. The arguments are stored as given.
+     *
+     * NOTE(review): {@code headroomThresholdPercent} is not range-checked here. A value above 100
+     * lets budgets larger than the disk pass validation, and a negative value makes validation
+     * always fail — consider rejecting values outside 0–100.
+     *
+     * @param fileCache the warm node FileCache, or {@code null} if not initialized
+     * @param formatCacheConfiguredBytes the configured disk budget for the format (page) cache in bytes
+     * @param headroomThresholdPercent maximum percent of SSD that may be committed (default 90)
+     */
+    public DiskBudgetManager(FileCache fileCache, long formatCacheConfiguredBytes, int headroomThresholdPercent) {
+        this.fileCache = fileCache;
+        this.formatCacheConfiguredBytes = formatCacheConfiguredBytes;
+        this.headroomThresholdPercent = headroomThresholdPercent;
+    }
+
+    /**
+     * Inject the format (page) cache provider. Called by {@code Node.java} after the plugin
+     * implementing {@link PageCacheProvider} has been discovered and wired.
+     *
+     * Until this is called, {@code getStats()} reports the format cache's used bytes as 0.
+     *
+     * @param provider the provider to query for format-cache disk usage; the value is stored
+     *                 as given, so passing {@code null} clears a previously set provider
+     */
+    public void setPageCacheProvider(PageCacheProvider provider) {
+        this.pageCacheProvider = provider;
+        // Logged unconditionally, including when provider is null.
+        logger.debug("[DiskBudgetManager] PageCacheProvider set — format cache disk stats available");
+    }
+
+    /**
+     * Validates that the sum of all long-lived cache budgets does not exceed
+     * {@code headroomThresholdPercent} of the physical SSD.
+     *
+     * Intended to be called once during node startup. The check is skipped with a warning when
+     * the SSD size cannot be determined (reported as {@code <= 0}).
+     *
+     * The committed total and the threshold are computed without {@code long} overflow, so an
+     * extremely large configured budget fails validation instead of wrapping to a negative
+     * total and passing silently.
+     *
+     * @throws IllegalStateException if configured budgets exceed the threshold
+     */
+    public void validateAtStartup() {
+        long physicalSsd = getPhysicalSsdBytes();
+        if (physicalSsd <= 0) {
+            logger.warn("[DiskBudgetManager] Could not determine physical SSD size — skipping budget validation");
+            return;
+        }
+
+        long fileCacheCapacity = fileCache != null ? fileCache.capacity() : 0L;
+
+        // Saturate rather than wrap: an out-of-range sum is treated as an impossibly large
+        // commitment, which the comparison below rejects.
+        long totalCommitted;
+        try {
+            totalCommitted = Math.addExact(fileCacheCapacity, formatCacheConfiguredBytes);
+        } catch (ArithmeticException outOfRange) {
+            totalCommitted = Long.MAX_VALUE;
+        }
+
+        // Equal to (physicalSsd * percent) / 100, but split into quotient and remainder parts so
+        // the intermediate product cannot overflow on very large volumes.
+        long threshold = (physicalSsd / 100L) * headroomThresholdPercent
+            + ((physicalSsd % 100L) * headroomThresholdPercent) / 100L;
+
+        if (totalCommitted > threshold) {
+            throw new IllegalStateException(
+                "[DiskBudgetManager] Disk budget misconfigured on this warm node. "
+                    + "FileCache("
+                    + fileCacheCapacity
+                    + " B) + FormatCache("
+                    + formatCacheConfiguredBytes
+                    + " B) = "
+                    + totalCommitted
+                    + " B exceeds "
+                    + headroomThresholdPercent
+                    + "% of physical SSD ("
+                    + physicalSsd
+                    + " B = "
+                    + threshold
+                    + " B allowed). "
+                    + "Reduce 'node.search.cache.size' or 'format_cache.disk.total_budget'."
+            );
+        }
+
+        int headroom = computeHeadroomPercent(physicalSsd, fileCacheCapacity, formatCacheConfiguredBytes);
+        logger.info(
+            "[DiskBudgetManager] Startup validation passed: FileCache={}B, FormatCache={}B, "
+                + "TotalCommitted={}B, PhysicalSSD={}B, Headroom={}%",
+            fileCacheCapacity,
+            formatCacheConfiguredBytes,
+            totalCommitted,
+            physicalSsd,
+            headroom
+        );
+    }
+
+    /**
+     * Builds a point-in-time snapshot of every tracked disk consumer.
+     *
+     * Only reads state: the configured budgets are final, and the volatile provider reference
+     * is copied to a local once so the null check and the call use the same instance.
+     *
+     * @return a new {@link DiskBudgetStats}; used bytes are 0 for a consumer that is absent
+     */
+    public DiskBudgetStats getStats() {
+        final long ssdBytes = getPhysicalSsdBytes();
+
+        long fileCacheBudget = 0L;
+        long fileCacheInUse = 0L;
+        if (fileCache != null) {
+            fileCacheBudget = fileCache.capacity();
+            fileCacheInUse = fileCache.usage();
+        }
+
+        long formatCacheInUse = 0L;
+        final PageCacheProvider currentProvider = this.pageCacheProvider;
+        if (currentProvider != null) {
+            formatCacheInUse = currentProvider.getDiskUsageBytes();
+        }
+
+        // Headroom is derived from configured budgets, not from current usage.
+        final int headroomPct = computeHeadroomPercent(ssdBytes, fileCacheBudget, formatCacheConfiguredBytes);
+
+        return new DiskBudgetStats(
+            ssdBytes,
+            fileCacheBudget,
+            fileCacheInUse,
+            formatCacheConfiguredBytes,
+            formatCacheInUse,
+            headroomPct
+        );
+    }
+
+ // ── Helpers ───────────────────────────────────────────────────────────────
+
+    /**
+     * Estimates the physical SSD capacity in bytes as the total space of the largest
+     * filesystem root returned by {@code java.io.File.listRoots()}.
+     *
+     * NOTE(review): this does not look at the node's configured data paths and does not sum
+     * mount points. The largest root is only a proxy for the data SSD. If the data directory
+     * lives on a volume that is not the largest root, the startup check runs against the wrong
+     * size — confirm this is acceptable or pass the data paths in.
+     *
+     * @return total bytes of the largest filesystem root, or 0 if there are no roots or the
+     *         file system cannot be queried (e.g. in tests)
+     */
+    private long getPhysicalSsdBytes() {
+        try {
+            java.io.File[] roots = java.io.File.listRoots();
+            if (roots == null || roots.length == 0) {
+                return 0L;
+            }
+            // Use the root with the largest total space as a proxy for the primary data SSD.
+            long largest = roots[0].getTotalSpace();
+            for (int i = 1; i < roots.length; i++) {
+                largest = Math.max(largest, roots[i].getTotalSpace());
+            }
+            return largest;
+        } catch (Exception e) {
+            logger.debug("[DiskBudgetManager] Could not determine physical SSD size: {}", e.getMessage());
+            return 0L;
+        }
+    }
+
+    /**
+     * Computes the percentage of SSD remaining after both configured cache budgets.
+     *
+     * The result is always in {@code [0, 100]}. Negative capacities are treated as 0, and the
+     * arithmetic cannot overflow: a combined budget at or above the SSD size yields 0 rather
+     * than wrapping around and reporting spare headroom.
+     *
+     * @param physicalSsd         physical SSD size in bytes; {@code <= 0} means unknown
+     * @param fileCacheCapacity   configured FileCache capacity in bytes
+     * @param formatCacheCapacity configured format (page) cache disk budget in bytes
+     * @return remaining percent of the SSD, or 100 when {@code physicalSsd <= 0} to avoid false alarms
+     */
+    static int computeHeadroomPercent(long physicalSsd, long fileCacheCapacity, long formatCacheCapacity) {
+        if (physicalSsd <= 0) {
+            return 100;
+        }
+        long fileBytes = Math.max(0L, fileCacheCapacity);
+        long formatBytes = Math.max(0L, formatCacheCapacity);
+        // Saturating add; both operands are non-negative so the subtraction cannot overflow.
+        long committed = fileBytes > Long.MAX_VALUE - formatBytes ? Long.MAX_VALUE : fileBytes + formatBytes;
+        if (committed >= physicalSsd) {
+            return 0;
+        }
+        // Here committed < physicalSsd, so the used percentage is below 100. Multiply first
+        // when that is safe (exact floor); otherwise divide first, which is only reached for
+        // volumes so large that physicalSsd / 100 is far from zero.
+        long usedPercent = committed <= Long.MAX_VALUE / 100L
+            ? (committed * 100L) / physicalSsd
+            : committed / (physicalSsd / 100L);
+        return (int) Math.max(0L, 100L - usedPercent);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/monitor/fs/DiskBudgetStats.java b/server/src/main/java/org/opensearch/monitor/fs/DiskBudgetStats.java
new file mode 100644
index 0000000000000..79f547d662da7
--- /dev/null
+++ b/server/src/main/java/org/opensearch/monitor/fs/DiskBudgetStats.java
@@ -0,0 +1,155 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.monitor.fs;
+
+import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+/**
+ * Aggregated disk budget statistics for a warm node.
+ *
+ * A warm node has multiple independent SSD consumers:
+ * This class exposes a read-only snapshot that can be surfaced via {@code _nodes/stats}.
+ * It does not coordinate eviction — each cache manages its own eviction independently.
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public class DiskBudgetStats implements Writeable, ToXContentFragment {
+
+    /** Total physical SSD capacity on this node (bytes). */
+    private final long totalPhysicalSsdBytes;
+
+    /** Bytes configured as the FileCache capacity ({@code node.search.cache.size}). */
+    private final long fileCacheConfiguredBytes;
+
+    /** Bytes currently stored in the FileCache. */
+    private final long fileCacheUsedBytes;
+
+    /** Bytes configured as the format (page) cache disk budget ({@code format_cache.disk.total_budget}). */
+    private final long formatCacheConfiguredBytes;
+
+    /** Bytes reported by the format (page) cache provider as used in its disk tier. */
+    private final long formatCacheUsedBytes;
+
+    /**
+     * Percentage of physical SSD remaining after accounting for both configured cache budgets.
+     * Computed as {@code 100 - ((fileCacheConfigured + formatCacheConfigured) * 100 / totalPhysicalSsd)}.
+     * {@code DiskBudgetManager.computeHeadroomPercent} floors the result at 0 and reports 100 when
+     * the SSD size is unknown, so values it produces are never negative: 0 means the budgets
+     * consume or exceed the whole SSD. This class stores the value it is given without checking it.
+     */
+    private final int headroomPercent;
+
+    /**
+     * Creates a snapshot from already-computed values. Nothing is validated or derived here.
+     *
+     * NOTE(review): the byte counts are later serialized with {@code writeVLong}; presumably they
+     * must be non-negative for that to succeed — confirm against {@code StreamOutput}.
+     *
+     * @param totalPhysicalSsdBytes      total physical SSD capacity in bytes
+     * @param fileCacheConfiguredBytes   configured FileCache capacity in bytes
+     * @param fileCacheUsedBytes         bytes currently stored in the FileCache
+     * @param formatCacheConfiguredBytes configured format (page) cache disk budget in bytes
+     * @param formatCacheUsedBytes       bytes reported as used by the format (page) cache disk tier
+     * @param headroomPercent            percent of the SSD left after both configured budgets
+     */
+    public DiskBudgetStats(
+        long totalPhysicalSsdBytes,
+        long fileCacheConfiguredBytes,
+        long fileCacheUsedBytes,
+        long formatCacheConfiguredBytes,
+        long formatCacheUsedBytes,
+        int headroomPercent
+    ) {
+        this.totalPhysicalSsdBytes = totalPhysicalSsdBytes;
+        this.fileCacheConfiguredBytes = fileCacheConfiguredBytes;
+        this.fileCacheUsedBytes = fileCacheUsedBytes;
+        this.formatCacheConfiguredBytes = formatCacheConfiguredBytes;
+        this.formatCacheUsedBytes = formatCacheUsedBytes;
+        this.headroomPercent = headroomPercent;
+    }
+
+    /**
+     * Deserialization constructor — must read fields in the same order as {@link #writeTo}.
+     *
+     * Reads five variable-length longs (the byte counts) followed by one fixed-width int
+     * (the headroom percent).
+     *
+     * @param in the stream positioned at the start of a serialized {@code DiskBudgetStats}
+     * @throws IOException if reading from the stream fails
+     */
+    public DiskBudgetStats(StreamInput in) throws IOException {
+        this.totalPhysicalSsdBytes = in.readVLong();
+        this.fileCacheConfiguredBytes = in.readVLong();
+        this.fileCacheUsedBytes = in.readVLong();
+        this.formatCacheConfiguredBytes = in.readVLong();
+        this.formatCacheUsedBytes = in.readVLong();
+        // Fixed-width int, matching writeInt in writeTo.
+        this.headroomPercent = in.readInt();
+    }
+
+    /**
+     * Serializes this snapshot: the five byte counts as variable-length longs, then the headroom
+     * percent as a fixed-width int. The order must stay in sync with the {@code StreamInput}
+     * constructor.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(totalPhysicalSsdBytes);
+        out.writeVLong(fileCacheConfiguredBytes);
+        out.writeVLong(fileCacheUsedBytes);
+        out.writeVLong(formatCacheConfiguredBytes);
+        out.writeVLong(formatCacheUsedBytes);
+        out.writeInt(headroomPercent);
+    }
+
+ // ── Getters ───────────────────────────────────────────────────────────────
+
+ public long getTotalPhysicalSsdBytes() { return totalPhysicalSsdBytes; }
+ public long getFileCacheConfiguredBytes() { return fileCacheConfiguredBytes; }
+ public long getFileCacheUsedBytes() { return fileCacheUsedBytes; }
+ public long getFormatCacheConfiguredBytes() { return formatCacheConfiguredBytes; }
+ public long getFormatCacheUsedBytes() { return formatCacheUsedBytes; }
+ public int getHeadroomPercent() { return headroomPercent; }
+
+ // ── XContent ──────────────────────────────────────────────────────────────
+
+    /**
+     * Renders this snapshot as a {@code disk_budget} object: the physical SSD size, one
+     * sub-object per cache with its configured and used sizes, and the headroom percent.
+     *
+     * @param builder the builder to append to
+     * @param params  rendering parameters (not read directly by this method)
+     * @return the same builder, for chaining
+     * @throws IOException if the builder fails to write
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Fields.DISK_BUDGET);
+        builder.humanReadableField(
+            Fields.TOTAL_PHYSICAL_SSD_IN_BYTES,
+            Fields.TOTAL_PHYSICAL_SSD,
+            new ByteSizeValue(totalPhysicalSsdBytes)
+        );
+        writeCacheSection(builder, Fields.FILE_CACHE, fileCacheConfiguredBytes, fileCacheUsedBytes);
+        writeCacheSection(builder, Fields.FORMAT_CACHE, formatCacheConfiguredBytes, formatCacheUsedBytes);
+        builder.field(Fields.HEADROOM_PERCENT, headroomPercent);
+        builder.endObject();
+        return builder;
+    }
+
+    /** Writes one cache sub-object holding its configured and used sizes, in that order. */
+    private static void writeCacheSection(XContentBuilder builder, String sectionName, long configuredBytes, long usedBytes)
+        throws IOException {
+        builder.startObject(sectionName);
+        builder.humanReadableField(Fields.CONFIGURED_IN_BYTES, Fields.CONFIGURED, new ByteSizeValue(configuredBytes));
+        builder.humanReadableField(Fields.USED_IN_BYTES, Fields.USED, new ByteSizeValue(usedBytes));
+        builder.endObject();
+    }
+
+ // ── Field name constants ──────────────────────────────────────────────────
+
+    /**
+     * Field names emitted by {@code toXContent}. Each size appears as a pair: an
+     * {@code *_in_bytes} raw field and a human-readable counterpart without the suffix.
+     */
+    static final class Fields {
+        static final String DISK_BUDGET = "disk_budget";
+        static final String TOTAL_PHYSICAL_SSD = "total_physical_ssd";
+        static final String TOTAL_PHYSICAL_SSD_IN_BYTES = "total_physical_ssd_in_bytes";
+        // Names of the two per-cache sub-objects.
+        static final String FILE_CACHE = "file_cache";
+        static final String FORMAT_CACHE = "format_cache";
+        // Shared by both sub-objects.
+        static final String CONFIGURED = "configured";
+        static final String CONFIGURED_IN_BYTES = "configured_in_bytes";
+        static final String USED = "used";
+        static final String USED_IN_BYTES = "used_in_bytes";
+        static final String HEADROOM_PERCENT = "headroom_percent";
+    }
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 681d132ebac3c..a3d203390d0eb 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -1567,6 +1567,22 @@ protected Node(final Environment initialEnvironment, Collection
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *