Skip to content

Commit f72f4fb

Browse files
author
Vishwas Garg
committed
Enable Foyer disk based cache for Parquet reads
1 parent a72528c commit f72f4fb

File tree

14 files changed

+899
-64
lines changed

14 files changed

+899
-64
lines changed
Binary file not shown.

plugins/engine-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ arrow-array = "57.3.0"
1919
arrow-schema = "57.3.0"
2020
arrow-buffer = "57.3.0"
2121
downcast-rs = "1.2"
22+
foyer = { version = "=0.11.5" }
23+
bytes = "1.9"
2224

2325

2426
# JNI dependencies

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ url = { workspace = true }
6868
# Liquid Cache for byte-level caching
6969
liquid-cache-datafusion-local = { workspace = true }
7070

71+
# Foyer hybrid in-memory+disk cache for Parquet page caching
72+
foyer = { workspace = true }
73+
74+
# serde_bytes: efficient Bytes serialization needed for Foyer's StorageValue bound
75+
serde_bytes = "0.11"
76+
7177
# Substrait support
7278
substrait = { workspace = true }
7379

plugins/engine-datafusion/jni/src/cache.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ use datafusion::execution::cache::CacheAccessor;
77
use object_store::ObjectMeta;
88
use vectorized_exec_spi::log_error;
99

10-
pub const ALL_CACHE_TYPES: &[&str] = &[CACHE_TYPE_METADATA, CACHE_TYPE_STATS];
10+
pub const ALL_CACHE_TYPES: &[&str] = &[CACHE_TYPE_METADATA, CACHE_TYPE_STATS, CACHE_TYPE_PAGES];
1111

1212
// Cache type constants
1313
pub const CACHE_TYPE_METADATA: &str = "METADATA";
1414
pub const CACHE_TYPE_STATS: &str = "STATISTICS";
15+
/// Foyer-backed byte-level page cache for Parquet column chunk data (Cache Layer 3)
16+
pub const CACHE_TYPE_PAGES: &str = "PAGES";
1517

1618
// Helper function to handle cache errors
1719
#[allow(dead_code)]

plugins/engine-datafusion/jni/src/cache_jni.rs

Lines changed: 172 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,40 @@
1-
use jni::objects::{JClass, JObjectArray, JString};
2-
use jni::sys::jlong;
1+
use jni::objects::{JByteArray, JClass, JObjectArray, JString};
2+
use jni::sys::{jbyteArray, jint, jlong};
33
use jni::{JNIEnv};
44
use crate::custom_cache_manager::CustomCacheManager;
55
use crate::util::{parse_string_arr};
66
use crate::cache;
77
use crate::DataFusionRuntime;
88
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
99
use std::sync::Arc;
10+
use bytes::Bytes;
1011
use vectorized_exec_spi::{log_info, log_error, log_debug};
1112

13+
// Default page cache budget — overridden by Java settings via createCache()
14+
const DEFAULT_PAGE_CACHE_DISK_BYTES: usize = 10 * 1024 * 1024 * 1024; // 10 GB disk
15+
const DEFAULT_PAGE_CACHE_DIR: &str = "/tmp/foyer-page-cache";
16+
17+
/// Parse the eviction_type string for PAGES cache type.
18+
/// Expected format: "<disk_capacity_bytes>|<disk_dir>"
19+
/// Falls back to defaults if the string is malformed (e.g. plain "LRU" from old Java code).
20+
fn parse_page_cache_params(eviction_str: &str) -> (usize, String) {
21+
if let Some(sep) = eviction_str.find('|') {
22+
let disk_bytes_str = &eviction_str[..sep];
23+
let disk_dir = eviction_str[sep + 1..].to_string();
24+
if let Ok(disk_bytes) = disk_bytes_str.parse::<usize>() {
25+
let dir = if disk_dir.is_empty() { DEFAULT_PAGE_CACHE_DIR.to_string() } else { disk_dir };
26+
return (disk_bytes, dir);
27+
}
28+
}
29+
// Fallback: plain eviction type like "LRU" from legacy config
30+
log_info!(
31+
"[FOYER-PAGE-CACHE] eviction_type '{}' is not in '<disk_bytes>|<dir>' format; \
32+
using defaults: disk={}B, dir={}",
33+
eviction_str, DEFAULT_PAGE_CACHE_DISK_BYTES, DEFAULT_PAGE_CACHE_DIR
34+
);
35+
(DEFAULT_PAGE_CACHE_DISK_BYTES, DEFAULT_PAGE_CACHE_DIR.to_string())
36+
}
37+
1238
/// Create a CustomCacheManager instance
1339
#[no_mangle]
1440
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createCustomCacheManager(
@@ -89,6 +115,25 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createCac
89115
manager.set_statistics_cache(stats_cache);
90116
log_info!("[CACHE INFO] Successfully created {} cache in CustomCacheManager", cache_type_str);
91117
}
118+
cache::CACHE_TYPE_PAGES => {
119+
// Create Foyer disk-only page cache — Cache Layer 3.
120+
// `size_limit` is ignored for PAGES (disk-only, no memory tier).
121+
// The disk budget and disk directory are passed via eviction_type as
122+
// "<disk_bytes>|<disk_dir>".
123+
let (disk_bytes, disk_dir) = parse_page_cache_params(&eviction_type_str);
124+
log_info!(
125+
"[FOYER-PAGE-CACHE] creating disk-only page cache: disk={}B, dir={}",
126+
disk_bytes, disk_dir
127+
);
128+
let page_cache = Arc::new(crate::tiered::foyer_cache::FoyerDiskPageCache::new(
129+
disk_bytes,
130+
disk_dir,
131+
));
132+
manager.set_page_cache(page_cache);
133+
log_info!(
134+
"[FOYER-PAGE-CACHE] successfully created Foyer disk-only page cache in CustomCacheManager"
135+
);
136+
}
92137
_ => {
93138
let msg = format!("Invalid cache type: {}", cache_type_str);
94139
log_error!("[CACHE ERROR] {}", msg);
@@ -444,3 +489,128 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_cacheMana
444489
}
445490
}
446491
}
492+
493+
// ============================================================================
494+
// Foyer page cache JNI operations (Layer 3: Parquet byte range cache)
495+
// Called by DataFusionPlugin.FoyerCacheProvider implementation to serve
496+
// PassthroughCacheStrategy → FoyerParquetCacheStrategy in the tiered-storage module.
497+
// ============================================================================
498+
499+
/// Look up a cached byte range for a Parquet file.
500+
/// Returns the cached bytes as a Java byte[], or null on cache miss.
501+
#[no_mangle]
502+
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCacheGet(
503+
mut env: JNIEnv,
504+
_class: JClass,
505+
runtime_ptr: jlong,
506+
path: JString,
507+
start: jint,
508+
end: jint,
509+
) -> jbyteArray {
510+
if runtime_ptr == 0 {
511+
return std::ptr::null_mut();
512+
}
513+
514+
let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) };
515+
let path_str: String = match env.get_string(&path) {
516+
Ok(s) => s.into(),
517+
Err(_) => return std::ptr::null_mut(),
518+
};
519+
520+
let page_cache = match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) {
521+
Some(c) => c,
522+
None => return std::ptr::null_mut(),
523+
};
524+
525+
// FoyerDiskPageCache.get() is async (disk I/O). Use get_blocking() since JNI is synchronous.
526+
match page_cache.get_blocking(&path_str, start as usize, end as usize) {
527+
Some(bytes) => {
528+
log_debug!(
529+
"[FOYER-PAGE-CACHE] JNI get HIT: path={}, range={}..{}, size={}B",
530+
path_str, start, end, bytes.len()
531+
);
532+
match env.byte_array_from_slice(&bytes) {
533+
Ok(arr) => arr.into_raw(),
534+
Err(e) => {
535+
log_debug!("[FOYER-PAGE-CACHE] JNI get: failed to create Java byte[]: {}", e);
536+
std::ptr::null_mut()
537+
}
538+
}
539+
}
540+
None => {
541+
log_debug!(
542+
"[FOYER-PAGE-CACHE] JNI get MISS: path={}, range={}..{}",
543+
path_str, start, end
544+
);
545+
std::ptr::null_mut()
546+
}
547+
}
548+
}
549+
550+
/// Store a byte range for a Parquet file in the Foyer page cache.
551+
#[no_mangle]
552+
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCachePut(
553+
mut env: JNIEnv,
554+
_class: JClass,
555+
runtime_ptr: jlong,
556+
path: JString,
557+
start: jint,
558+
end: jint,
559+
data: JByteArray,
560+
) {
561+
if runtime_ptr == 0 {
562+
return;
563+
}
564+
565+
let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) };
566+
let path_str: String = match env.get_string(&path) {
567+
Ok(s) => s.into(),
568+
Err(e) => {
569+
log_debug!("[FoyerCache] foyerPageCachePut: failed to convert path: {}", e);
570+
return;
571+
}
572+
};
573+
574+
let page_cache = match runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) {
575+
Some(c) => c,
576+
None => return,
577+
};
578+
579+
let bytes_vec: Vec<u8> = match env.convert_byte_array(data) {
580+
Ok(v) => v,
581+
Err(e) => {
582+
log_debug!("[FoyerCache] foyerPageCachePut: failed to convert byte array: {}", e);
583+
return;
584+
}
585+
};
586+
587+
page_cache.put(path_str, start as usize, end as usize, Bytes::from(bytes_vec));
588+
}
589+
590+
/// Evict all cached byte ranges for a given Parquet file.
591+
/// Called when a file is deleted (merged/compacted/tiered out).
592+
#[no_mangle]
593+
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_foyerPageCacheEvictFile(
594+
mut env: JNIEnv,
595+
_class: JClass,
596+
runtime_ptr: jlong,
597+
path: JString,
598+
) {
599+
if runtime_ptr == 0 {
600+
return;
601+
}
602+
603+
let runtime = unsafe { &*(runtime_ptr as *const DataFusionRuntime) };
604+
let path_str: String = match env.get_string(&path) {
605+
Ok(s) => s.into(),
606+
Err(e) => {
607+
log_debug!("[FoyerCache] foyerPageCacheEvictFile: failed to convert path: {}", e);
608+
return;
609+
}
610+
};
611+
612+
if let Some(page_cache) = runtime.custom_cache_manager.as_ref().and_then(|m| m.get_page_cache()) {
613+
page_cache.evict_file(&path_str);
614+
log_debug!("[FoyerCache] evicted file from page cache: {}", path_str);
615+
}
616+
}

plugins/engine-datafusion/jni/src/custom_cache_manager.rs

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,37 +10,60 @@ use crate::util::{create_object_meta_from_file};
1010
use object_store::path::Path;
1111
use object_store::ObjectMeta;
1212
use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata;
13-
use vectorized_exec_spi::{log_debug, log_error};
13+
use vectorized_exec_spi::{log_debug, log_error, log_info};
14+
use crate::tiered::foyer_cache::FoyerDiskPageCache;
1415

1516
/// Custom CacheManager that holds cache references directly
1617
pub struct CustomCacheManager {
17-
/// Direct reference to the file metadata cache
18+
/// Direct reference to the file metadata cache (Cache Layer 1: Parquet footer/schema)
1819
file_metadata_cache: Option<Arc<MutexFileMetadataCache>>,
19-
/// Direct reference to the statistics cache
20-
statistics_cache: Option<Arc<CustomStatisticsCache>>
20+
/// Direct reference to the statistics cache (Cache Layer 2: row counts, min/max stats)
21+
statistics_cache: Option<Arc<CustomStatisticsCache>>,
22+
/// Foyer-backed disk-only page cache (Cache Layer 3: Parquet column chunk byte ranges)
23+
pub page_cache: Option<Arc<FoyerDiskPageCache>>,
2124
}
2225

2326
impl CustomCacheManager {
2427
/// Create a new CustomCacheManager
2528
pub fn new() -> Self {
2629
Self {
2730
file_metadata_cache: None,
28-
statistics_cache: None
31+
statistics_cache: None,
32+
page_cache: None,
2933
}
3034
}
3135

32-
/// Set the file metadata cache
36+
/// Set the file metadata cache (Layer 1)
3337
pub fn set_file_metadata_cache(&mut self, cache: Arc<MutexFileMetadataCache>) {
3438
self.file_metadata_cache = Some(cache);
3539
log_debug!("[CACHE INFO] File metadata cache set in CustomCacheManager");
3640
}
3741

38-
/// Set the statistics cache
42+
/// Set the statistics cache (Layer 2)
3943
pub fn set_statistics_cache(&mut self, cache: Arc<CustomStatisticsCache>) {
4044
self.statistics_cache = Some(cache);
4145
log_debug!("[CACHE INFO] Statistics cache set in CustomCacheManager");
4246
}
4347

48+
/// Set the Foyer page cache (Layer 3).
49+
///
50+
/// Once set, the `CachingObjectStore` (wrapping this cache) will intercept all
51+
/// `get_range()` calls to DataFusion's ObjectStore, returning cached bytes on HIT
52+
/// and populating the cache on MISS.
53+
pub fn set_page_cache(&mut self, cache: Arc<FoyerDiskPageCache>) {
54+
log_info!(
55+
"[FOYER-PAGE-CACHE] page cache set in CustomCacheManager: disk={}B, dir={}",
56+
cache.disk_capacity_bytes(),
57+
cache.disk_dir().display()
58+
);
59+
self.page_cache = Some(cache);
60+
}
61+
62+
/// Get the Foyer page cache (Layer 3), if configured.
63+
pub fn get_page_cache(&self) -> Option<Arc<FoyerDiskPageCache>> {
64+
self.page_cache.clone()
65+
}
66+
4467
/// Get the statistics cache
4568
pub fn get_statistics_cache(&self) -> Option<Arc<CustomStatisticsCache>> {
4669
self.statistics_cache.clone()
@@ -167,6 +190,12 @@ impl CustomCacheManager {
167190
}
168191
}
169192

193+
// Evict all cached page byte ranges for this file (Layer 3)
194+
if let Some(page_cache) = &self.page_cache {
195+
page_cache.evict_file(file_path);
196+
any_removed = true; // evict_file is best-effort, count as success
197+
}
198+
170199
let removed = if !errors.is_empty() && !any_removed {
171200
false
172201
} else {
@@ -257,29 +286,36 @@ impl CustomCacheManager {
257286
pub fn get_total_memory_consumed(&self) -> usize {
258287
let mut total = 0;
259288

260-
// Add metadata cache memory
289+
// Layer 1: metadata cache memory
261290
if let Some(cache) = &self.file_metadata_cache {
262291
if let Ok(cache_guard) = cache.inner.lock() {
263292
total += cache_guard.memory_used();
264293
}
265294
}
266295

267-
// Add statistics cache memory
296+
// Layer 2: statistics cache memory
268297
if let Some(cache) = &self.statistics_cache {
269298
total += cache.memory_consumed();
270299
}
271300

301+
// Layer 3: page cache is disk-only — no memory contribution to track here.
302+
272303
total
273304
}
274305

275-
/// Clear all caches
306+
/// Clear all caches (Layers 1, 2, and 3)
276307
pub fn clear_all(&self) {
277308
if let Some(cache) = &self.file_metadata_cache {
278309
cache.clear();
279310
}
280311
if let Some(cache) = &self.statistics_cache {
281312
cache.clear();
282313
}
314+
// FoyerDiskPageCache.clear() is async — use the blocking wrapper
315+
if let Some(cache) = &self.page_cache {
316+
log_info!("[FOYER-PAGE-CACHE] clear_all: clearing page cache (memory + disk)");
317+
cache.clear_blocking();
318+
}
283319
}
284320

285321
/// Clear specific cache type
@@ -301,6 +337,15 @@ impl CustomCacheManager {
301337
Err("No statistics cache configured".to_string())
302338
}
303339
}
340+
crate::cache::CACHE_TYPE_PAGES => {
341+
if let Some(cache) = &self.page_cache {
342+
log_info!("[FOYER-PAGE-CACHE] clear_cache_type PAGES: clearing page cache (memory + disk)");
343+
cache.clear_blocking();
344+
Ok(())
345+
} else {
346+
Err("No page cache configured".to_string())
347+
}
348+
}
304349
_ => Err(format!("Unknown cache type: {}", cache_type))
305350
}
306351
}
@@ -326,6 +371,10 @@ impl CustomCacheManager {
326371
Err("No statistics cache configured".to_string())
327372
}
328373
}
374+
crate::cache::CACHE_TYPE_PAGES => {
375+
// Disk-only cache — reports 0 heap memory consumed.
376+
Ok(0)
377+
}
329378
_ => Err(format!("Unknown cache type: {}", cache_type))
330379
}
331380
}

0 commit comments

Comments
 (0)