diff --git a/Cargo.lock b/Cargo.lock index b9dbd37463dd6..cfb292e869904 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2221,6 +2221,7 @@ dependencies = [ "dashmap", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr-common", "futures", "insta", "log", diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9fb2dd2dce29c..0ccd3c7f2f548 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -705,31 +705,42 @@ impl ListingTable { store: &Arc, part_file: &PartitionedFile, ) -> datafusion_common::Result> { - match self + use datafusion_execution::cache::cache_manager::CachedFileMetadata; + + // Check cache first + if let Some(cached) = self .collected_statistics - .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) + .get(&part_file.object_meta.location) { - Some(statistics) => Ok(statistics), - None => { - let statistics = self - .options - .format - .infer_stats( - ctx, - store, - Arc::clone(&self.file_schema), - &part_file.object_meta, - ) - .await?; - let statistics = Arc::new(statistics); - self.collected_statistics.put_with_extra( - &part_file.object_meta.location, - Arc::clone(&statistics), - &part_file.object_meta, - ); - Ok(statistics) + // Validate that cached entry is still valid + if cached.is_valid_for(&part_file.object_meta) { + return Ok(cached.statistics); } } + + // Cache miss or invalid - infer statistics + let statistics = self + .options + .format + .infer_stats( + ctx, + store, + Arc::clone(&self.file_schema), + &part_file.object_meta, + ) + .await?; + let statistics = Arc::new(statistics); + + // Store in cache + self.collected_statistics.put( + &part_file.object_meta.location, + CachedFileMetadata::new( + part_file.object_meta.clone(), + Arc::clone(&statistics), + None, // No ordering information in this PR + ), + ); + Ok(statistics) } } diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 8b11ba64ae7f1..a4ba196ae4c5f 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -31,7 +31,9 @@ use datafusion_common::stats::Precision; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, }; -use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use datafusion_execution::cache::cache_manager::{ + CachedFileMetadataEntry, FileMetadata, FileMetadataCache, +}; use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_plan::Accumulator; use log::debug; @@ -125,19 +127,15 @@ impl<'a> DFParquetMetadata<'a> { !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); if cache_metadata - && let Some(parquet_metadata) = file_metadata_cache - .as_ref() - .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) - .and_then(|file_metadata| { - file_metadata - .as_any() - .downcast_ref::() - .map(|cached_parquet_metadata| { - Arc::clone(cached_parquet_metadata.parquet_metadata()) - }) - }) + && let Some(file_metadata_cache) = file_metadata_cache.as_ref() + && let Some(cached) = file_metadata_cache.get(&object_meta.location) + && cached.is_valid_for(object_meta) + && let Some(cached_parquet) = cached + .file_metadata + .as_any() + .downcast_ref::() { - return Ok(parquet_metadata); + return Ok(Arc::clone(cached_parquet.parquet_metadata())); } let mut reader = @@ -163,8 +161,11 @@ impl<'a> 
DFParquetMetadata<'a> { if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache { file_metadata_cache.put( - object_meta, - Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + &object_meta.location, + CachedFileMetadataEntry::new( + (*object_meta).clone(), + Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + ), ); } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 155d6efe462c1..d5c3211e11319 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::cache::cache_manager::CachedFileList; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -364,15 +363,13 @@ async fn list_with_cache<'b>( .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))) .boxed()), Some(cache) => { - // Convert prefix to Option for cache lookup - let prefix_filter = prefix.cloned(); + // Build the filter prefix (only Some if prefix was requested) + let filter_prefix = prefix.is_some().then(|| full_prefix.clone()); - // Try cache lookup with optional prefix filter - let vec = if let Some(res) = - cache.get_with_extra(table_base_path, &prefix_filter) - { + // Try cache lookup + let vec = if let Some(cached) = cache.get(table_base_path) { debug!("Hit list files cache"); - res.as_ref().clone() + cached.filter_by_prefix(&filter_prefix) } else { // Cache miss - always list and cache the full table // This ensures we have complete data for future prefix queries @@ -380,19 +377,10 @@ async fn list_with_cache<'b>( .list(Some(table_base_path)) .try_collect::>() .await?; - cache.put(table_base_path, Arc::new(vec.clone())); - - // If a prefix filter was requested, apply it to the results - if prefix.is_some() { - let full_prefix_str = full_prefix.as_ref(); - vec.into_iter() - .filter(|meta| { - meta.location.as_ref().starts_with(full_prefix_str) - }) - .collect() - } else { - vec - } + let cached = CachedFileList::new(vec); + let result = cached.filter_by_prefix(&filter_prefix); + cache.put(table_base_path, cached); + result }; Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed()) } @@ -494,6 +482,7 @@ mod tests { use std::any::Any; use std::collections::HashMap; use std::ops::Range; + use std::sync::Arc; use tempfile::tempdir; #[test] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index ca1fba07cae2d..4d390a8ccc57b 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -55,6 +55,7 @@ chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = false } datafusion-expr = { workspace = true, default-features = false } +datafusion-physical-expr-common = { workspace = true, default-features = false } futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index c76a68c651eb0..ac2577a9abb1a 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. 
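// A minimal sketch (not part of this patch) of the `CachedFileList` prefix
// filtering that the `list_with_cache` hunk above relies on. The important
// detail is that the filter must be the *full* prefix (table base path plus
// the requested sub-path), which is what `filter_prefix` is built from above;
// the file names and the `example_meta` helper below are hypothetical.
use chrono::Utc;
use datafusion_execution::cache::cache_manager::CachedFileList;
use object_store::{ObjectMeta, path::Path};

fn example_meta(location: &str) -> ObjectMeta {
    ObjectMeta {
        location: Path::from(location),
        last_modified: Utc::now(),
        size: 0,
        e_tag: None,
        version: None,
    }
}

fn main() {
    let cached = CachedFileList::new(vec![
        example_meta("my_table/a=1/part-0.parquet"),
        example_meta("my_table/a=2/part-0.parquet"),
    ]);
    // No prefix requested: the entire cached listing is returned.
    assert_eq!(cached.filter_by_prefix(&None).len(), 2);
    // Partition `a=1` requested: filter with the full prefix, not just "a=1".
    let full_prefix = Some(Path::from("my_table/a=1"));
    assert_eq!(cached.filter_by_prefix(&full_prefix).len(), 1);
}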
+use crate::cache::CacheAccessor; +use crate::cache::DefaultListFilesCache; use crate::cache::cache_unit::DefaultFilesMetadataCache; -use crate::cache::{CacheAccessor, DefaultListFilesCache}; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use object_store::ObjectMeta; use object_store::path::Path; use std::any::Any; @@ -31,16 +33,61 @@ pub use super::list_files_cache::{ DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, }; -/// A cache for [`Statistics`]. +/// Cached metadata for a file, including statistics and ordering. +/// +/// This struct embeds the [`ObjectMeta`] used for cache validation, +/// along with the cached statistics and ordering information. +#[derive(Debug, Clone)] +pub struct CachedFileMetadata { + /// File metadata used for cache validation (size, last_modified). + pub meta: ObjectMeta, + /// Cached statistics for the file, if available. + pub statistics: Arc, + /// Cached ordering for the file. + pub ordering: Option, +} + +impl CachedFileMetadata { + /// Create a new cached file metadata entry. + pub fn new( + meta: ObjectMeta, + statistics: Arc, + ordering: Option, + ) -> Self { + Self { + meta, + statistics, + ordering, + } + } + + /// Check if this cached entry is still valid for the given metadata. + /// + /// Returns true if the file size and last modified time match. + pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool { + self.meta.size == current_meta.size + && self.meta.last_modified == current_meta.last_modified + } +} + +/// A cache for file statistics and orderings. +/// +/// This cache stores [`CachedFileMetadata`] which includes: +/// - File metadata for validation (size, last_modified) +/// - Statistics for the file +/// - Ordering information for the file /// /// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this /// cache avoids inferring the same file statistics repeatedly during the /// session lifetime. /// +/// The typical usage pattern is: +/// 1. Call `get(path)` to check for cached value +/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` +/// 3. If invalid or missing, compute new value and call `put(path, new_value)` +/// /// See [`crate::runtime_env::RuntimeEnv`] for more details -pub trait FileStatisticsCache: - CacheAccessor, Extra = ObjectMeta> -{ +pub trait FileStatisticsCache: CacheAccessor { /// Retrieves the information about the entries currently cached. fn list_entries(&self) -> HashMap; } @@ -58,6 +105,39 @@ pub struct FileStatisticsCacheEntry { pub table_size_bytes: Precision, /// Size of the statistics entry, in bytes. pub statistics_size_bytes: usize, + /// Whether ordering information is cached for this file. + pub has_ordering: bool, +} + +/// Cached file listing. +/// +/// TTL expiration is handled internally by the cache implementation. +#[derive(Debug, Clone)] +pub struct CachedFileList { + /// The cached file list. + pub files: Arc>, +} + +impl CachedFileList { + /// Create a new cached file list. + pub fn new(files: Vec) -> Self { + Self { + files: Arc::new(files), + } + } + + /// Filter the files by prefix. 
+ pub fn filter_by_prefix(&self, prefix: &Option) -> Vec { + match prefix { + Some(prefix) => self + .files + .iter() + .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref())) + .cloned() + .collect(), + None => self.files.as_ref().clone(), + } + } } /// Cache for storing the [`ObjectMeta`]s that result from listing a path @@ -67,21 +147,12 @@ pub struct FileStatisticsCacheEntry { /// especially when done over remote object stores. /// /// The cache key is always the table's base path, ensuring a stable cache key. -/// The `Extra` type is `Option`, representing an optional prefix filter -/// (relative to the table base path) for partition-aware lookups. -/// -/// When `get_with_extra(key, Some(prefix))` is called: -/// - The cache entry for `key` (table base path) is fetched -/// - Results are filtered to only include files matching `key/prefix` -/// - Filtered results are returned without making a storage call +/// The cached value is a [`CachedFileList`] containing the files and a timestamp. /// -/// This enables efficient partition pruning: a single cached listing of the -/// full table can serve queries for any partition subset. +/// Partition filtering is done after retrieval using [`CachedFileList::filter_by_prefix`]. /// /// See [`crate::runtime_env::RuntimeEnv`] for more details. -pub trait ListFilesCache: - CacheAccessor>, Extra = Option> -{ +pub trait ListFilesCache: CacheAccessor { /// Returns the cache's memory limit in bytes. fn cache_limit(&self) -> usize; @@ -113,9 +184,44 @@ pub trait FileMetadata: Any + Send + Sync { fn extra_info(&self) -> HashMap; } +/// Cached file metadata entry with validation information. +#[derive(Clone)] +pub struct CachedFileMetadataEntry { + /// File metadata used for cache validation (size, last_modified). + pub meta: ObjectMeta, + /// The cached file metadata. + pub file_metadata: Arc, +} + +impl CachedFileMetadataEntry { + /// Create a new cached file metadata entry. + pub fn new(meta: ObjectMeta, file_metadata: Arc) -> Self { + Self { + meta, + file_metadata, + } + } + + /// Check if this cached entry is still valid for the given metadata. + pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool { + self.meta.size == current_meta.size + && self.meta.last_modified == current_meta.last_modified + } +} + +impl Debug for CachedFileMetadataEntry { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CachedFileMetadataEntry") + .field("meta", &self.meta) + .field("memory_size", &self.file_metadata.memory_size()) + .finish() + } +} + /// Cache for file-embedded metadata. /// -/// This cache stores per-file metadata in the form of [`FileMetadata`], +/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`], +/// which includes the [`ObjectMeta`] for validation. /// /// For example, the built in [`ListingTable`] uses this cache to avoid parsing /// Parquet footers multiple times for the same file. @@ -124,12 +230,15 @@ pub trait FileMetadata: Any + Send + Sync { /// and users can also provide their own implementations to implement custom /// caching strategies. /// +/// The typical usage pattern is: +/// 1. Call `get(path)` to check for cached value +/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` +/// 3. If invalid or missing, compute new value and call `put(path, new_value)` +/// /// See [`crate::runtime_env::RuntimeEnv`] for more details. 
/// /// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html -pub trait FileMetadataCache: - CacheAccessor, Extra = ObjectMeta> -{ +pub trait FileMetadataCache: CacheAccessor { /// Returns the cache's memory limit in bytes. fn cache_limit(&self) -> usize; diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 5351df449a7c1..9362d078416ff 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -16,189 +16,339 @@ // under the License. use std::collections::HashMap; -use std::sync::Arc; use crate::cache::CacheAccessor; -use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry}; - -use datafusion_common::Statistics; +use crate::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, +}; use dashmap::DashMap; -use object_store::ObjectMeta; use object_store::path::Path; pub use crate::cache::DefaultFilesMetadataCache; /// Default implementation of [`FileStatisticsCache`] /// -/// Stores collected statistics for files +/// Stores cached file metadata (statistics and orderings) for files. +/// +/// The typical usage pattern is: +/// 1. Call `get(path)` to check for cached value +/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` +/// 3. If invalid or missing, compute new value and call `put(path, new_value)` /// -/// Cache is invalided when file size or last modification has changed +/// Uses DashMap for lock-free concurrent access. /// /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - statistics: DashMap)>, + cache: DashMap, } -impl FileStatisticsCache for DefaultFileStatisticsCache { - fn list_entries(&self) -> HashMap { - let mut entries = HashMap::::new(); - - for entry in &self.statistics { - let path = entry.key(); - let (object_meta, stats) = entry.value(); - entries.insert( - path.clone(), - FileStatisticsCacheEntry { - object_meta: object_meta.clone(), - num_rows: stats.num_rows, - num_columns: stats.column_statistics.len(), - table_size_bytes: stats.total_byte_size, - statistics_size_bytes: 0, // TODO: set to the real size in the future - }, - ); - } - - entries - } -} - -impl CacheAccessor> for DefaultFileStatisticsCache { - type Extra = ObjectMeta; - - /// Get `Statistics` for file location. - fn get(&self, k: &Path) -> Option> { - self.statistics - .get(k) - .map(|s| Some(Arc::clone(&s.value().1))) - .unwrap_or(None) - } - - /// Get `Statistics` for file location. Returns None if file has changed or not found. 
- fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { - self.statistics - .get(k) - .map(|s| { - let (saved_meta, statistics) = s.value(); - if saved_meta.size != e.size - || saved_meta.last_modified != e.last_modified - { - // file has changed - None - } else { - Some(Arc::clone(statistics)) - } - }) - .unwrap_or(None) +impl CacheAccessor for DefaultFileStatisticsCache { + fn get(&self, key: &Path) -> Option { + self.cache.get(key).map(|entry| entry.value().clone()) } - /// Save collected file statistics - fn put(&self, _key: &Path, _value: Arc) -> Option> { - panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.") + fn put(&self, key: &Path, value: CachedFileMetadata) -> Option { + self.cache.insert(key.clone(), value) } - fn put_with_extra( - &self, - key: &Path, - value: Arc, - e: &Self::Extra, - ) -> Option> { - self.statistics - .insert(key.clone(), (e.clone(), value)) - .map(|x| x.1) - } - - fn remove(&self, k: &Path) -> Option> { - self.statistics.remove(k).map(|x| x.1.1) + fn remove(&self, k: &Path) -> Option { + self.cache.remove(k).map(|(_, entry)| entry) } fn contains_key(&self, k: &Path) -> bool { - self.statistics.contains_key(k) + self.cache.contains_key(k) } fn len(&self) -> usize { - self.statistics.len() + self.cache.len() } fn clear(&self) { - self.statistics.clear() + self.cache.clear(); } + fn name(&self) -> String { "DefaultFileStatisticsCache".to_string() } } +impl FileStatisticsCache for DefaultFileStatisticsCache { + fn list_entries(&self) -> HashMap { + let mut entries = HashMap::::new(); + + for entry in self.cache.iter() { + let path = entry.key(); + let cached = entry.value(); + entries.insert( + path.clone(), + FileStatisticsCacheEntry { + object_meta: cached.meta.clone(), + num_rows: cached.statistics.num_rows, + num_columns: cached.statistics.column_statistics.len(), + table_size_bytes: cached.statistics.total_byte_size, + statistics_size_bytes: 0, // TODO: set to the real size in the future + has_ordering: cached.ordering.is_some(), + }, + ); + } + + entries + } +} + #[cfg(test)] mod tests { use super::*; use crate::cache::CacheAccessor; - use crate::cache::cache_manager::{FileStatisticsCache, FileStatisticsCacheEntry}; - use crate::cache::cache_unit::DefaultFileStatisticsCache; + use crate::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, + }; + use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; use datafusion_common::Statistics; use datafusion_common::stats::Precision; + use datafusion_expr::ColumnarValue; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use object_store::ObjectMeta; use object_store::path::Path; + use std::sync::Arc; - #[test] - fn test_statistics_cache() { - let meta = ObjectMeta { - location: Path::from("test"), + fn create_test_meta(path: &str, size: u64) -> ObjectMeta { + ObjectMeta { + location: Path::from(path), last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") .unwrap() .into(), - size: 1024, + size, e_tag: None, version: None, - }; + } + } + + #[test] + fn test_statistics_cache() { + let meta = create_test_meta("test", 1024); + let cache = DefaultFileStatisticsCache::default(); + + let schema = Schema::new(vec![Field::new( + "test_column", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]); + + // Cache miss + assert!(cache.get(&meta.location).is_none()); + 
+ // Put a value + let cached_value = CachedFileMetadata::new( + meta.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&meta.location, cached_value); + + // Cache hit + let result = cache.get(&meta.location); + assert!(result.is_some()); + let cached = result.unwrap(); + assert!(cached.is_valid_for(&meta)); + + // File size changed - validation should fail + let meta2 = create_test_meta("test", 2048); + let cached = cache.get(&meta2.location).unwrap(); + assert!(!cached.is_valid_for(&meta2)); + + // Update with new value + let cached_value2 = CachedFileMetadata::new( + meta2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&meta2.location, cached_value2); + + // Test list_entries + let entries = cache.list_entries(); + assert_eq!(entries.len(), 1); + let entry = entries.get(&Path::from("test")).unwrap(); + assert_eq!(entry.object_meta.size, 2048); // Should be updated value + } + + #[derive(Clone, Debug, PartialEq, Eq, Hash)] + struct MockExpr {} + + impl std::fmt::Display for MockExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MockExpr") + } + } + + impl PhysicalExpr for MockExpr { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn data_type( + &self, + _input_schema: &Schema, + ) -> datafusion_common::Result { + Ok(DataType::Int32) + } + + fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result { + Ok(false) + } + + fn evaluate( + &self, + _batch: &RecordBatch, + ) -> datafusion_common::Result { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion_common::Result> { + assert!(children.is_empty()); + Ok(self) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MockExpr") + } + } + + fn ordering() -> LexOrdering { + let expr = Arc::new(MockExpr {}) as Arc; + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap() + } + + #[test] + fn test_ordering_cache() { + let meta = create_test_meta("test.parquet", 100); let cache = DefaultFileStatisticsCache::default(); - assert!(cache.get_with_extra(&meta.location, &meta).is_none()); - - cache.put_with_extra( - &meta.location, - Statistics::new_unknown(&Schema::new(vec![Field::new( - "test_column", - DataType::Timestamp(TimeUnit::Second, None), - false, - )])) - .into(), - &meta, + + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // Cache statistics with no ordering + let cached_value = CachedFileMetadata::new( + meta.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, // No ordering yet ); - assert!(cache.get_with_extra(&meta.location, &meta).is_some()); - - // file size changed - let mut meta2 = meta.clone(); - meta2.size = 2048; - assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); - - // file last_modified changed - let mut meta2 = meta.clone(); - meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00") - .unwrap() - .into(); - assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); - - // different file - let mut meta2 = meta.clone(); - meta2.location = Path::from("test2"); - assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); - - // test the list_entries method + cache.put(&meta.location, cached_value); + + let result = cache.get(&meta.location).unwrap(); + assert!(result.ordering.is_none()); + + // Update to add ordering + let mut cached = 
cache.get(&meta.location).unwrap(); + if cached.is_valid_for(&meta) && cached.ordering.is_none() { + cached.ordering = Some(ordering()); + } + cache.put(&meta.location, cached); + + let result2 = cache.get(&meta.location).unwrap(); + assert!(result2.ordering.is_some()); + + // Verify list_entries shows has_ordering = true + let entries = cache.list_entries(); + assert_eq!(entries.len(), 1); + assert!(entries.get(&meta.location).unwrap().has_ordering); + } + + #[test] + fn test_cache_invalidation_on_file_modification() { + let cache = DefaultFileStatisticsCache::default(); + let path = Path::from("test.parquet"); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let meta_v1 = create_test_meta("test.parquet", 100); + + // Cache initial value + let cached_value = CachedFileMetadata::new( + meta_v1.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path, cached_value); + + // File modified (size changed) + let meta_v2 = create_test_meta("test.parquet", 200); + + let cached = cache.get(&path).unwrap(); + // Should not be valid for new meta + assert!(!cached.is_valid_for(&meta_v2)); + + // Compute new value and update + let new_cached = CachedFileMetadata::new( + meta_v2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path, new_cached); + + // Should have new metadata + let result = cache.get(&path).unwrap(); + assert_eq!(result.meta.size, 200); + } + + #[test] + fn test_list_entries() { + let cache = DefaultFileStatisticsCache::default(); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let meta1 = create_test_meta("test1.parquet", 100); + + let cached_value = CachedFileMetadata::new( + meta1.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&meta1.location, cached_value); + let meta2 = create_test_meta("test2.parquet", 200); + let cached_value = CachedFileMetadata::new( + meta2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + Some(ordering()), + ); + cache.put(&meta2.location, cached_value); + let entries = cache.list_entries(); assert_eq!( entries, - HashMap::from([( - Path::from("test"), - FileStatisticsCacheEntry { - object_meta: meta.clone(), - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, - } - )]) + HashMap::from([ + ( + Path::from("test1.parquet"), + FileStatisticsCacheEntry { + object_meta: meta1, + num_rows: Precision::Absent, + num_columns: 1, + table_size_bytes: Precision::Absent, + statistics_size_bytes: 0, + has_ordering: false, + } + ), + ( + Path::from("test2.parquet"), + FileStatisticsCacheEntry { + object_meta: meta2, + num_rows: Precision::Absent, + num_columns: 1, + table_size_bytes: Precision::Absent, + statistics_size_bytes: 0, + has_ordering: true, + } + ), + ]) ); } } diff --git a/datafusion/execution/src/cache/file_metadata_cache.rs b/datafusion/execution/src/cache/file_metadata_cache.rs index c7a24dd878e4f..5e899d7dd9f8b 100644 --- a/datafusion/execution/src/cache/file_metadata_cache.rs +++ b/datafusion/execution/src/cache/file_metadata_cache.rs @@ -15,22 +15,19 @@ // specific language governing permissions and limitations // under the License. 
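// A minimal sketch (not part of this patch) of the get -> is_valid_for -> put
// pattern that callers of both `DefaultFileStatisticsCache` (above) and
// `DefaultFilesMetadataCache` (below) are expected to follow. `statistics_for`
// and its `infer` closure are hypothetical stand-ins for the real (async)
// inference step; the cache types and methods are the ones this patch adds.
use std::sync::Arc;

use datafusion_common::Statistics;
use datafusion_execution::cache::CacheAccessor;
use datafusion_execution::cache::cache_manager::{
    CachedFileMetadata, FileStatisticsCache,
};
use object_store::ObjectMeta;

fn statistics_for(
    cache: &dyn FileStatisticsCache,
    meta: &ObjectMeta,
    infer: impl Fn(&ObjectMeta) -> Statistics,
) -> Arc<Statistics> {
    // 1. Look the entry up by path only.
    if let Some(cached) = cache.get(&meta.location)
        // 2. Validate against the current ObjectMeta (size + last_modified).
        && cached.is_valid_for(meta)
    {
        return cached.statistics;
    }
    // 3. Miss or stale entry: recompute, then store a value that embeds the
    //    ObjectMeta used to validate future lookups.
    let statistics = Arc::new(infer(meta));
    cache.put(
        &meta.location,
        CachedFileMetadata::new(meta.clone(), Arc::clone(&statistics), None),
    );
    statistics
}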
-use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, sync::Mutex}; -use object_store::{ObjectMeta, path::Path}; +use object_store::path::Path; use crate::cache::{ CacheAccessor, - cache_manager::{FileMetadata, FileMetadataCache, FileMetadataCacheEntry}, + cache_manager::{CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry}, lru_queue::LruQueue, }; /// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. struct DefaultFilesMetadataCacheState { - lru_queue: LruQueue)>, + lru_queue: LruQueue, memory_limit: usize, memory_used: usize, cache_hits: HashMap, @@ -46,35 +43,18 @@ impl DefaultFilesMetadataCacheState { } } - /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified` - /// properties from [`ObjectMeta`] match. + /// Returns the respective entry from the cache, if it exists. /// If the entry exists, it becomes the most recently used. - fn get(&mut self, k: &ObjectMeta) -> Option> { - self.lru_queue - .get(&k.location) - .map(|(object_meta, metadata)| { - if object_meta.size != k.size - || object_meta.last_modified != k.last_modified - { - None - } else { - *self.cache_hits.entry(k.location.clone()).or_insert(0) += 1; - Some(Arc::clone(metadata)) - } - }) - .unwrap_or(None) + fn get(&mut self, k: &Path) -> Option { + self.lru_queue.get(k).cloned().inspect(|_| { + *self.cache_hits.entry(k.clone()).or_insert(0) += 1; + }) } - /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified` - /// properties of [`ObjectMeta`] match). + /// Checks if the metadata is currently cached. /// The LRU queue is not updated. - fn contains_key(&self, k: &ObjectMeta) -> bool { - self.lru_queue - .peek(&k.location) - .map(|(object_meta, _)| { - object_meta.size == k.size && object_meta.last_modified == k.last_modified - }) - .unwrap_or(false) + fn contains_key(&self, k: &Path) -> bool { + self.lru_queue.peek(k).is_some() } /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. @@ -82,35 +62,34 @@ impl DefaultFilesMetadataCacheState { /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, - key: ObjectMeta, - value: Arc, - ) -> Option> { - let value_size = value.memory_size(); + key: Path, + value: CachedFileMetadataEntry, + ) -> Option { + let value_size = value.file_metadata.memory_size(); // no point in trying to add this value to the cache if it cannot fit entirely if value_size > self.memory_limit { return None; } - self.cache_hits.insert(key.location.clone(), 0); + self.cache_hits.insert(key.clone(), 0); // if the key is already in the cache, the old value is removed - let old_value = self.lru_queue.put(key.location.clone(), (key, value)); + let old_value = self.lru_queue.put(key, value); self.memory_used += value_size; - if let Some((_, ref old_metadata)) = old_value { - self.memory_used -= old_metadata.memory_size(); + if let Some(ref old_entry) = old_value { + self.memory_used -= old_entry.file_metadata.memory_size(); } self.evict_entries(); - old_value.map(|v| v.1) + old_value } /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. 
fn evict_entries(&mut self) { while self.memory_used > self.memory_limit { if let Some(removed) = self.lru_queue.pop() { - let metadata: Arc = removed.1.1; - self.memory_used -= metadata.memory_size(); + self.memory_used -= removed.1.file_metadata.memory_size(); } else { // cache is empty while memory_used > memory_limit, cannot happen debug_assert!( @@ -123,11 +102,11 @@ impl DefaultFilesMetadataCacheState { } /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &ObjectMeta) -> Option> { - if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) { - self.memory_used -= old_metadata.memory_size(); - self.cache_hits.remove(&k.location); - Some(old_metadata) + fn remove(&mut self, k: &Path) -> Option { + if let Some(old_entry) = self.lru_queue.remove(k) { + self.memory_used -= old_entry.file_metadata.memory_size(); + self.cache_hits.remove(k); + Some(old_entry) } else { None } @@ -150,8 +129,8 @@ impl DefaultFilesMetadataCacheState { /// /// Collected file embedded metadata cache. /// -/// The metadata for each file is invalidated when the file size or last -/// modification time have been changed. +/// The metadata for each file is validated by comparing the cached [`ObjectMeta`] +/// (size and last_modified) against the current file state using `cached.is_valid_for(¤t_meta)`. /// /// # Internal details /// @@ -160,11 +139,7 @@ impl DefaultFilesMetadataCacheState { /// size of the cached entries exceeds `memory_limit`, the least recently used entries /// are evicted until the total size is lower than `memory_limit`. /// -/// # `Extra` Handling -/// -/// Users should use the [`Self::get`] and [`Self::put`] methods. The -/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call -/// `get` and `put`, respectively. 
+/// [`ObjectMeta`]: object_store::ObjectMeta pub struct DefaultFilesMetadataCache { // the state is wrapped in a Mutex to ensure the operations are atomic state: Mutex, @@ -189,78 +164,27 @@ impl DefaultFilesMetadataCache { } } -impl FileMetadataCache for DefaultFilesMetadataCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); - - for (path, (object_meta, metadata)) in state.lru_queue.list_entries() { - entries.insert( - path.clone(), - FileMetadataCacheEntry { - object_meta: object_meta.clone(), - size_bytes: metadata.memory_size(), - hits: *state.cache_hits.get(path).expect("entry must exist"), - extra: metadata.extra_info(), - }, - ); - } - - entries - } -} - -impl CacheAccessor> for DefaultFilesMetadataCache { - type Extra = ObjectMeta; - - fn get(&self, k: &ObjectMeta) -> Option> { +impl CacheAccessor for DefaultFilesMetadataCache { + fn get(&self, key: &Path) -> Option { let mut state = self.state.lock().unwrap(); - state.get(k) - } - - fn get_with_extra( - &self, - k: &ObjectMeta, - _e: &Self::Extra, - ) -> Option> { - self.get(k) + state.get(key) } fn put( &self, - key: &ObjectMeta, - value: Arc, - ) -> Option> { + key: &Path, + value: CachedFileMetadataEntry, + ) -> Option { let mut state = self.state.lock().unwrap(); state.put(key.clone(), value) } - fn put_with_extra( - &self, - key: &ObjectMeta, - value: Arc, - _e: &Self::Extra, - ) -> Option> { - self.put(key, value) - } - - fn remove(&self, k: &ObjectMeta) -> Option> { + fn remove(&self, k: &Path) -> Option { let mut state = self.state.lock().unwrap(); state.remove(k) } - fn contains_key(&self, k: &ObjectMeta) -> bool { + fn contains_key(&self, k: &Path) -> bool { let state = self.state.lock().unwrap(); state.contains_key(k) } @@ -280,6 +204,38 @@ impl CacheAccessor> for DefaultFilesMetadataCa } } +impl FileMetadataCache for DefaultFilesMetadataCache { + fn cache_limit(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + fn list_entries(&self) -> HashMap { + let state = self.state.lock().unwrap(); + let mut entries = HashMap::::new(); + + for (path, entry) in state.lru_queue.list_entries() { + entries.insert( + path.clone(), + FileMetadataCacheEntry { + object_meta: entry.meta.clone(), + size_bytes: entry.file_metadata.memory_size(), + hits: *state.cache_hits.get(path).expect("entry must exist"), + extra: entry.file_metadata.extra_info(), + }, + ); + } + + entries + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -287,7 +243,7 @@ mod tests { use crate::cache::CacheAccessor; use crate::cache::cache_manager::{ - FileMetadata, FileMetadataCache, FileMetadataCacheEntry, + CachedFileMetadataEntry, FileMetadata, FileMetadataCache, FileMetadataCacheEntry, }; use crate::cache::file_metadata_cache::DefaultFilesMetadataCache; use object_store::ObjectMeta; @@ -311,67 +267,77 @@ mod tests { } } - #[test] - fn test_default_file_metadata_cache() { - let object_meta = ObjectMeta { - location: Path::from("test"), + fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta { + ObjectMeta { + location: 
Path::from(path), last_modified: chrono::DateTime::parse_from_rfc3339( "2025-07-29T12:12:12+00:00", ) .unwrap() .into(), - size: 1024, + size: size as u64, e_tag: None, version: None, - }; + } + } + + #[test] + fn test_default_file_metadata_cache() { + let object_meta = create_test_object_meta("test", 1024); let metadata: Arc = Arc::new(TestFileMetadata { metadata: "retrieved_metadata".to_owned(), }); let cache = DefaultFilesMetadataCache::new(1024 * 1024); - assert!(cache.get(&object_meta).is_none()); - // put - cache.put(&object_meta, Arc::clone(&metadata)); + // Cache miss + assert!(cache.get(&object_meta.location).is_none()); + + // Put a value + let cached_entry = + CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata)); + cache.put(&object_meta.location, cached_entry); - // get and contains of a valid entry - assert!(cache.contains_key(&object_meta)); - let value = cache.get(&object_meta); - assert!(value.is_some()); - let test_file_metadata = Arc::downcast::(value.unwrap()); + // Verify the cached value + assert!(cache.contains_key(&object_meta.location)); + let result = cache.get(&object_meta.location).unwrap(); + let test_file_metadata = Arc::downcast::(result.file_metadata); assert!(test_file_metadata.is_ok()); assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata"); - // file size changed - let mut object_meta2 = object_meta.clone(); - object_meta2.size = 2048; - assert!(cache.get(&object_meta2).is_none()); - assert!(!cache.contains_key(&object_meta2)); - - // file last_modified changed - let mut object_meta2 = object_meta.clone(); - object_meta2.last_modified = - chrono::DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00") - .unwrap() - .into(); - assert!(cache.get(&object_meta2).is_none()); - assert!(!cache.contains_key(&object_meta2)); - - // different file - let mut object_meta2 = object_meta.clone(); - object_meta2.location = Path::from("test2"); - assert!(cache.get(&object_meta2).is_none()); - assert!(!cache.contains_key(&object_meta2)); + // Cache hit - check validation + let result2 = cache.get(&object_meta.location).unwrap(); + assert!(result2.is_valid_for(&object_meta)); + + // File size changed - closure should detect invalidity + let object_meta2 = create_test_object_meta("test", 2048); + let result3 = cache.get(&object_meta2.location).unwrap(); + // Cached entry should NOT be valid for new meta + assert!(!result3.is_valid_for(&object_meta2)); + + // Return new entry + let new_entry = + CachedFileMetadataEntry::new(object_meta2.clone(), Arc::clone(&metadata)); + cache.put(&object_meta2.location, new_entry); + + let result4 = cache.get(&object_meta2.location).unwrap(); + assert_eq!(result4.meta.size, 2048); // remove - cache.remove(&object_meta); - assert!(cache.get(&object_meta).is_none()); - assert!(!cache.contains_key(&object_meta)); + cache.remove(&object_meta.location); + assert!(!cache.contains_key(&object_meta.location)); // len and clear - cache.put(&object_meta, Arc::clone(&metadata)); - cache.put(&object_meta2, metadata); + let object_meta3 = create_test_object_meta("test3", 100); + cache.put( + &object_meta.location, + CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&metadata)), + ); + cache.put( + &object_meta3.location, + CachedFileMetadataEntry::new(object_meta3.clone(), Arc::clone(&metadata)), + ); assert_eq!(cache.len(), 2); cache.clear(); assert_eq!(cache.len(), 0); @@ -402,92 +368,129 @@ mod tests { let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); let (object_meta3, 
metadata3) = generate_test_metadata_with_size("3", 300); - cache.put(&object_meta1, metadata1); - cache.put(&object_meta2, metadata2); - cache.put(&object_meta3, metadata3); + cache.put( + &object_meta1.location, + CachedFileMetadataEntry::new(object_meta1.clone(), metadata1), + ); + cache.put( + &object_meta2.location, + CachedFileMetadataEntry::new(object_meta2.clone(), metadata2), + ); + cache.put( + &object_meta3.location, + CachedFileMetadataEntry::new(object_meta3.clone(), metadata3), + ); // all entries will fit assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 900); - assert!(cache.contains_key(&object_meta1)); - assert!(cache.contains_key(&object_meta2)); - assert!(cache.contains_key(&object_meta3)); + assert!(cache.contains_key(&object_meta1.location)); + assert!(cache.contains_key(&object_meta2.location)); + assert!(cache.contains_key(&object_meta3.location)); // add a new entry which will remove the least recently used ("1") let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200); - cache.put(&object_meta4, metadata4); + cache.put( + &object_meta4.location, + CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), + ); assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 1000); - assert!(!cache.contains_key(&object_meta1)); - assert!(cache.contains_key(&object_meta4)); + assert!(!cache.contains_key(&object_meta1.location)); + assert!(cache.contains_key(&object_meta4.location)); // get entry "2", which will move it to the top of the queue, and add a new one which will // remove the new least recently used ("3") - cache.get(&object_meta2); + let _ = cache.get(&object_meta2.location); let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100); - cache.put(&object_meta5, metadata5); + cache.put( + &object_meta5.location, + CachedFileMetadataEntry::new(object_meta5.clone(), metadata5), + ); assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 800); - assert!(!cache.contains_key(&object_meta3)); - assert!(cache.contains_key(&object_meta5)); + assert!(!cache.contains_key(&object_meta3.location)); + assert!(cache.contains_key(&object_meta5.location)); // new entry which will not be able to fit in the 1000 bytes allocated let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200); - cache.put(&object_meta6, metadata6); + cache.put( + &object_meta6.location, + CachedFileMetadataEntry::new(object_meta6.clone(), metadata6), + ); assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 800); - assert!(!cache.contains_key(&object_meta6)); + assert!(!cache.contains_key(&object_meta6.location)); // new entry which is able to fit without removing any entry let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200); - cache.put(&object_meta7, metadata7); + cache.put( + &object_meta7.location, + CachedFileMetadataEntry::new(object_meta7.clone(), metadata7), + ); assert_eq!(cache.len(), 4); assert_eq!(cache.memory_used(), 1000); - assert!(cache.contains_key(&object_meta7)); + assert!(cache.contains_key(&object_meta7.location)); // new entry which will remove all other entries let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999); - cache.put(&object_meta8, metadata8); + cache.put( + &object_meta8.location, + CachedFileMetadataEntry::new(object_meta8.clone(), metadata8), + ); assert_eq!(cache.len(), 1); assert_eq!(cache.memory_used(), 999); - assert!(cache.contains_key(&object_meta8)); + assert!(cache.contains_key(&object_meta8.location)); // when updating an entry, the 
previous ones are not unnecessarily removed let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300); let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200); let (object_meta11_v1, metadata11_v1) = generate_test_metadata_with_size("11", 400); - cache.put(&object_meta9, metadata9); - cache.put(&object_meta10, metadata10); - cache.put(&object_meta11_v1, metadata11_v1); + cache.put( + &object_meta9.location, + CachedFileMetadataEntry::new(object_meta9.clone(), metadata9), + ); + cache.put( + &object_meta10.location, + CachedFileMetadataEntry::new(object_meta10.clone(), metadata10), + ); + cache.put( + &object_meta11_v1.location, + CachedFileMetadataEntry::new(object_meta11_v1.clone(), metadata11_v1), + ); assert_eq!(cache.memory_used(), 900); assert_eq!(cache.len(), 3); let (object_meta11_v2, metadata11_v2) = generate_test_metadata_with_size("11", 500); - cache.put(&object_meta11_v2, metadata11_v2); + cache.put( + &object_meta11_v2.location, + CachedFileMetadataEntry::new(object_meta11_v2.clone(), metadata11_v2), + ); assert_eq!(cache.memory_used(), 1000); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&object_meta9)); - assert!(cache.contains_key(&object_meta10)); - assert!(cache.contains_key(&object_meta11_v2)); - assert!(!cache.contains_key(&object_meta11_v1)); + assert!(cache.contains_key(&object_meta9.location)); + assert!(cache.contains_key(&object_meta10.location)); + assert!(cache.contains_key(&object_meta11_v2.location)); // when updating an entry that now exceeds the limit, the LRU ("9") needs to be removed let (object_meta11_v3, metadata11_v3) = generate_test_metadata_with_size("11", 501); - cache.put(&object_meta11_v3, metadata11_v3); + cache.put( + &object_meta11_v3.location, + CachedFileMetadataEntry::new(object_meta11_v3.clone(), metadata11_v3), + ); assert_eq!(cache.memory_used(), 701); assert_eq!(cache.len(), 2); - assert!(cache.contains_key(&object_meta10)); - assert!(cache.contains_key(&object_meta11_v3)); - assert!(!cache.contains_key(&object_meta11_v2)); + assert!(cache.contains_key(&object_meta10.location)); + assert!(cache.contains_key(&object_meta11_v3.location)); // manually removing an entry that is not the LRU - cache.remove(&object_meta11_v3); + cache.remove(&object_meta11_v3.location); assert_eq!(cache.len(), 1); assert_eq!(cache.memory_used(), 200); - assert!(cache.contains_key(&object_meta10)); - assert!(!cache.contains_key(&object_meta11_v3)); + assert!(cache.contains_key(&object_meta10.location)); + assert!(!cache.contains_key(&object_meta11_v3.location)); // clear cache.clear(); @@ -498,17 +501,26 @@ mod tests { let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300); let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200); let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500); - cache.put(&object_meta12, metadata12); - cache.put(&object_meta13, metadata13); - cache.put(&object_meta14, metadata14); + cache.put( + &object_meta12.location, + CachedFileMetadataEntry::new(object_meta12.clone(), metadata12), + ); + cache.put( + &object_meta13.location, + CachedFileMetadataEntry::new(object_meta13.clone(), metadata13), + ); + cache.put( + &object_meta14.location, + CachedFileMetadataEntry::new(object_meta14.clone(), metadata14), + ); assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 1000); cache.update_cache_limit(600); assert_eq!(cache.len(), 1); assert_eq!(cache.memory_used(), 500); - assert!(!cache.contains_key(&object_meta12)); - 
assert!(!cache.contains_key(&object_meta13)); - assert!(cache.contains_key(&object_meta14)); + assert!(!cache.contains_key(&object_meta12.location)); + assert!(!cache.contains_key(&object_meta13.location)); + assert!(cache.contains_key(&object_meta14.location)); } #[test] @@ -519,9 +531,18 @@ mod tests { let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); // initial entries, all will have hits = 0 - cache.put(&object_meta1, metadata1); - cache.put(&object_meta2, metadata2); - cache.put(&object_meta3, metadata3); + cache.put( + &object_meta1.location, + CachedFileMetadataEntry::new(object_meta1.clone(), metadata1), + ); + cache.put( + &object_meta2.location, + CachedFileMetadataEntry::new(object_meta2.clone(), metadata2), + ); + cache.put( + &object_meta3.location, + CachedFileMetadataEntry::new(object_meta3.clone(), metadata3), + ); assert_eq!( cache.list_entries(), HashMap::from([ @@ -565,7 +586,7 @@ mod tests { ); // new hit on "1" - cache.get(&object_meta1); + let _ = cache.get(&object_meta1.location); assert_eq!( cache.list_entries(), HashMap::from([ @@ -610,7 +631,10 @@ mod tests { // new entry, will evict "2" let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600); - cache.put(&object_meta4, metadata4); + cache.put( + &object_meta4.location, + CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), + ); assert_eq!( cache.list_entries(), HashMap::from([ @@ -655,7 +679,10 @@ mod tests { // replace entry "1" let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50); - cache.put(&object_meta1_new, metadata1_new); + cache.put( + &object_meta1_new.location, + CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new), + ); assert_eq!( cache.list_entries(), HashMap::from([ @@ -699,7 +726,7 @@ mod tests { ); // remove entry "4" - cache.remove(&object_meta4); + cache.remove(&object_meta4.location); assert_eq!( cache.list_entries(), HashMap::from([ diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 661bc47b5468a..36a5b8af59b45 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -24,7 +24,11 @@ use std::{ use datafusion_common::instant::Instant; use object_store::{ObjectMeta, path::Path}; -use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue}; +use crate::cache::{ + CacheAccessor, + cache_manager::{CachedFileList, ListFilesCache}, + lru_queue::LruQueue, +}; pub trait TimeProvider: Send + Sync + 'static { fn now(&self) -> Instant; @@ -50,11 +54,10 @@ impl TimeProvider for SystemTimeProvider { /// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total /// size is lower than the `memory_limit`. /// -/// # `Extra` Handling +/// # Cache API /// -/// Users should use the [`Self::get`] and [`Self::put`] methods. The -/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call -/// `get` and `put`, respectively. +/// Uses `get` and `put` methods for cache operations. TTL validation is handled internally - +/// expired entries return `None` from `get`. pub struct DefaultListFilesCache { state: Mutex, time_provider: Arc, @@ -84,42 +87,29 @@ impl DefaultListFilesCache { self.time_provider = provider; self } - - /// Returns the cache's memory limit in bytes. 
- pub fn cache_limit(&self) -> usize { - self.state.lock().unwrap().memory_limit - } - - /// Updates the cache with a new memory limit in bytes. - pub fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - /// Returns the TTL (time-to-live) applied to cache entries. - pub fn cache_ttl(&self) -> Option { - self.state.lock().unwrap().ttl - } } struct ListFilesEntry { - metas: Arc>, + cached_file_list: CachedFileList, size_bytes: usize, expires: Option, } impl ListFilesEntry { fn try_new( - metas: Arc>, + cached_file_list: CachedFileList, ttl: Option, now: Instant, ) -> Option { - let size_bytes = (metas.capacity() * size_of::()) - + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?; + let size_bytes = (cached_file_list.files.capacity() * size_of::()) + + cached_file_list + .files + .iter() + .map(meta_heap_bytes) + .reduce(|acc, b| acc + b)?; Some(Self { - metas, + cached_file_list, size_bytes, expires: ttl.map(|t| now + t), }) @@ -175,65 +165,22 @@ impl DefaultListFilesCacheState { } } - /// Performs a prefix-aware cache lookup. - /// - /// # Arguments - /// * `table_base` - The table's base path (the cache key) - /// * `prefix` - Optional prefix filter relative to the table base path - /// * `now` - Current time for expiration checking + /// Gets an entry from the cache, checking for expiration. /// - /// # Behavior - /// - Fetches the cache entry for `table_base` - /// - If `prefix` is `Some`, filters results to only files matching `table_base/prefix` - /// - Returns the (potentially filtered) results - /// - /// # Example - /// ```text - /// get_with_prefix("my_table", Some("a=1"), now) - /// → Fetch cache entry for "my_table" - /// → Filter to files matching "my_table/a=1/*" - /// → Return filtered results - /// ``` - fn get_with_prefix( - &mut self, - table_base: &Path, - prefix: Option<&Path>, - now: Instant, - ) -> Option>> { - let entry = self.lru_queue.get(table_base)?; + /// Returns the cached file list if it exists and hasn't expired. + /// If the entry has expired, it is removed from the cache. + fn get(&mut self, key: &Path, now: Instant) -> Option { + let entry = self.lru_queue.get(key)?; // Check expiration if let Some(exp) = entry.expires && now > exp { - self.remove(table_base); + self.remove(key); return None; } - // Early return if no prefix filter - return all files - let Some(prefix) = prefix else { - return Some(Arc::clone(&entry.metas)); - }; - - // Build the full prefix path: table_base/prefix - let mut parts: Vec<_> = table_base.parts().collect(); - parts.extend(prefix.parts()); - let full_prefix = Path::from_iter(parts); - let full_prefix_str = full_prefix.as_ref(); - - // Filter files to only those matching the prefix - let filtered: Vec = entry - .metas - .iter() - .filter(|meta| meta.location.as_ref().starts_with(full_prefix_str)) - .cloned() - .collect(); - - if filtered.is_empty() { - None - } else { - Some(Arc::new(filtered)) - } + Some(entry.cached_file_list.clone()) } /// Checks if the respective entry is currently cached. 
@@ -263,9 +210,9 @@ impl DefaultListFilesCacheState { fn put( &mut self, key: &Path, - value: Arc>, + value: CachedFileList, now: Instant, - ) -> Option>> { + ) -> Option { let entry = ListFilesEntry::try_new(value, self.ttl, now)?; let entry_size = entry.size_bytes; @@ -284,7 +231,7 @@ impl DefaultListFilesCacheState { self.evict_entries(); - old_value.map(|v| v.metas) + old_value.map(|v| v.cached_file_list) } /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. @@ -304,10 +251,10 @@ impl DefaultListFilesCacheState { } /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &Path) -> Option>> { + fn remove(&mut self, k: &Path) -> Option { if let Some(entry) = self.lru_queue.remove(k) { self.memory_used -= entry.size_bytes; - Some(entry.metas) + Some(entry.cached_file_list) } else { None } @@ -325,83 +272,20 @@ impl DefaultListFilesCacheState { } } -impl ListFilesCache for DefaultListFilesCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn cache_ttl(&self) -> Option { - let state = self.state.lock().unwrap(); - state.ttl - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn update_cache_ttl(&self, ttl: Option) { - let mut state = self.state.lock().unwrap(); - state.ttl = ttl; - state.evict_entries(); - } -} - -impl CacheAccessor>> for DefaultListFilesCache { - type Extra = Option; - - /// Gets all files for the given table base path. - /// - /// This is equivalent to calling `get_with_extra(k, &None)`. - fn get(&self, k: &Path) -> Option>> { - self.get_with_extra(k, &None) - } - - /// Performs a prefix-aware cache lookup. - /// - /// # Arguments - /// * `table_base` - The table's base path (the cache key) - /// * `prefix` - Optional prefix filter (relative to table base) for partition filtering - /// - /// # Behavior - /// - Fetches the cache entry for `table_base` - /// - If `prefix` is `Some`, filters results to only files matching `table_base/prefix` - /// - Returns the (potentially filtered) results - /// - /// This enables efficient partition pruning - a single cached listing of the full table - /// can serve queries for any partition subset without additional storage calls. 
- fn get_with_extra( - &self, - table_base: &Path, - prefix: &Self::Extra, - ) -> Option>> { +impl CacheAccessor for DefaultListFilesCache { + fn get(&self, key: &Path) -> Option { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); - state.get_with_prefix(table_base, prefix.as_ref(), now) + state.get(key, now) } - fn put( - &self, - key: &Path, - value: Arc>, - ) -> Option>> { + fn put(&self, key: &Path, value: CachedFileList) -> Option { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); state.put(key, value, now) } - fn put_with_extra( - &self, - key: &Path, - value: Arc>, - _e: &Self::Extra, - ) -> Option>> { - self.put(key, value) - } - - fn remove(&self, k: &Path) -> Option>> { + fn remove(&self, k: &Path) -> Option { let mut state = self.state.lock().unwrap(); state.remove(k) } @@ -427,6 +311,30 @@ impl CacheAccessor>> for DefaultListFilesCache { } } +impl ListFilesCache for DefaultListFilesCache { + fn cache_limit(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn cache_ttl(&self) -> Option { + let state = self.state.lock().unwrap(); + state.ttl + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + fn update_cache_ttl(&self, ttl: Option) { + let mut state = self.state.lock().unwrap(); + state.ttl = ttl; + state.evict_entries(); + } +} + #[cfg(test)] mod tests { use super::*; @@ -478,22 +386,21 @@ mod tests { } } - /// Helper function to create a vector of ObjectMeta with at least meta_size bytes + /// Helper function to create a CachedFileList with at least meta_size bytes fn create_test_list_files_entry( path: &str, count: usize, meta_size: usize, - ) -> (Path, Arc>, usize) { + ) -> (Path, CachedFileList, usize) { let metas: Vec = (0..count) .map(|i| create_test_object_meta(&format!("file{i}"), meta_size)) .collect(); - let metas = Arc::new(metas); // Calculate actual size using the same logic as ListFilesEntry::try_new let size = (metas.capacity() * size_of::()) + metas.iter().map(meta_heap_bytes).sum::(); - (Path::from(path), metas, size) + (Path::from(path), CachedFileList::new(metas), size) } #[test] @@ -502,25 +409,25 @@ mod tests { let path = Path::from("test_path"); // Initially cache is empty - assert!(cache.get(&path).is_none()); assert!(!cache.contains_key(&path)); assert_eq!(cache.len(), 0); - // Put an entry + // Cache miss - get returns None + assert!(cache.get(&path).is_none()); + + // Put a value let meta = create_test_object_meta("file1", 50); - let value = Arc::new(vec![meta.clone()]); - cache.put(&path, Arc::clone(&value)); + cache.put(&path, CachedFileList::new(vec![meta])); - // Entry should be retrievable + // Entry should be cached assert!(cache.contains_key(&path)); assert_eq!(cache.len(), 1); - let retrieved = cache.get(&path).unwrap(); - assert_eq!(retrieved.len(), 1); - assert_eq!(retrieved[0].location, meta.location); + let result = cache.get(&path).unwrap(); + assert_eq!(result.files.len(), 1); // Remove the entry let removed = cache.remove(&path).unwrap(); - assert_eq!(removed.len(), 1); + assert_eq!(removed.files.len(), 1); assert!(!cache.contains_key(&path)); assert_eq!(cache.len(), 0); @@ -583,7 +490,7 @@ mod tests { // Access path1 to move it to front (MRU) // Order is now: path2 (LRU), path3, path1 (MRU) - cache.get(&path1); + let _ = cache.get(&path1); // Adding a new entry should evict path2 (the LRU) let (path4, value4, _) = 
create_test_list_files_entry("path4", 1, 100); @@ -609,6 +516,7 @@ mod tests { assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache + // The entry is not stored (too large) let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); cache.put(&path_large, value_large); @@ -681,7 +589,7 @@ mod tests { // Add three entries cache.put(&path1, value1); cache.put(&path2, value2); - cache.put(&path3, value3_v1); + cache.put(&path3.clone(), value3_v1); assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction @@ -715,8 +623,6 @@ mod tests { cache.put(&path2, value2); // Entries should be accessible immediately - assert!(cache.get(&path1).is_some()); - assert!(cache.get(&path2).is_some()); assert!(cache.contains_key(&path1)); assert!(cache.contains_key(&path2)); assert_eq!(cache.len(), 2); @@ -724,9 +630,9 @@ // Wait for TTL to expire thread::sleep(Duration::from_millis(150)); - // Entries should now return None and be removed when observed through get or contains_key - assert!(cache.get(&path1).is_none()); - assert_eq!(cache.len(), 1); // path1 was removed by get() + // Entries should now return None when observed through contains_key + assert!(!cache.contains_key(&path1)); + assert_eq!(cache.len(), 1); // path1 was removed by contains_key() assert!(!cache.contains_key(&path2)); assert_eq!(cache.len(), 0); // path2 was removed by contains_key() } @@ -757,7 +663,30 @@ mod tests { mock_time.inc(Duration::from_millis(151)); assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(cache.contains_key(&path3)); // Still valid + } + + #[test] + fn test_ttl_expiration_in_get() { + let ttl = Duration::from_millis(100); + let cache = DefaultListFilesCache::new(10000, Some(ttl)); + + let (path, value, _) = create_test_list_files_entry("path", 2, 50); + + // Cache the entry + cache.put(&path, value.clone()); + + // Entry should be accessible immediately + let result = cache.get(&path); + assert!(result.is_some()); + assert_eq!(result.unwrap().files.len(), 2); + + // Wait for TTL to expire + thread::sleep(Duration::from_millis(150)); + + // Get should return None because entry expired + let result2 = cache.get(&path); + assert!(result2.is_none()); } #[test] @@ -806,28 +735,29 @@ mod tests { #[test] fn test_entry_creation() { // Test with empty vector - let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]); + let empty_list = CachedFileList::new(vec![]); let now = Instant::now(); - let entry = ListFilesEntry::try_new(empty_vec, None, now); + let entry = ListFilesEntry::try_new(empty_list, None, now); assert!(entry.is_none()); // Validate entry size let metas: Vec<ObjectMeta> = (0..5) .map(|i| create_test_object_meta(&format!("file{i}"), 30)) .collect(); - let metas = Arc::new(metas); - let entry = ListFilesEntry::try_new(metas, None, now).unwrap(); - assert_eq!(entry.metas.len(), 5); + let cached_list = CachedFileList::new(metas); + let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap(); + assert_eq!(entry.cached_file_list.files.len(), 5); // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes - let expected_size = - (entry.metas.capacity() * size_of::<ObjectMeta>()) + (entry.metas.len() * 30); + let expected_size = (entry.cached_file_list.files.capacity() + * size_of::<ObjectMeta>()) + + (entry.cached_file_list.files.len() * 30); assert_eq!(entry.size_bytes, expected_size); // Test with TTL let meta = create_test_object_meta("file", 50); let ttl = Duration::from_secs(10); - 
let entry = - ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap(); + let cached_list = CachedFileList::new(vec![meta]); + let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap(); assert!(entry.expires.unwrap() > now); } @@ -872,7 +802,7 @@ mod tests { } } - // Prefix-aware cache tests + // Prefix filtering tests using CachedFileList::filter_by_prefix /// Helper function to create ObjectMeta with a specific location path fn create_object_meta_with_path(location: &str) -> ObjectMeta { @@ -888,30 +818,27 @@ mod tests { } #[test] - fn test_prefix_aware_cache_hit() { - // Scenario: Cache has full table listing, query for partition returns filtered results + fn test_prefix_filtering() { let cache = DefaultListFilesCache::new(100000, None); // Create files for a partitioned table let table_base = Path::from("my_table"); - let files = Arc::new(vec![ + let files = vec![ create_object_meta_with_path("my_table/a=1/file1.parquet"), create_object_meta_with_path("my_table/a=1/file2.parquet"), create_object_meta_with_path("my_table/a=2/file3.parquet"), create_object_meta_with_path("my_table/a=2/file4.parquet"), - ]); + ]; // Cache the full table listing - cache.put(&table_base, files); + let cached = CachedFileList::new(files); + cache.put(&table_base, cached); - // Query for partition a=1 using get_with_extra - // New API: get_with_extra(table_base, Some(relative_prefix)) - let prefix_a1 = Some(Path::from("a=1")); - let result = cache.get_with_extra(&table_base, &prefix_a1); + let result = cache.get(&table_base).unwrap(); - // Should return filtered results (only files from a=1) - assert!(result.is_some()); - let filtered = result.unwrap(); + // Filter for partition a=1 + let prefix_a1 = Some(Path::from("my_table/a=1")); + let filtered = result.filter_by_prefix(&prefix_a1); assert_eq!(filtered.len(), 2); assert!( filtered @@ -919,92 +846,46 @@ mod tests { .all(|m| m.location.as_ref().starts_with("my_table/a=1")) ); - // Query for partition a=2 - let prefix_a2 = Some(Path::from("a=2")); - let result_2 = cache.get_with_extra(&table_base, &prefix_a2); - - assert!(result_2.is_some()); - let filtered_2 = result_2.unwrap(); + // Filter for partition a=2 + let prefix_a2 = Some(Path::from("my_table/a=2")); + let filtered_2 = result.filter_by_prefix(&prefix_a2); assert_eq!(filtered_2.len(), 2); assert!( filtered_2 .iter() .all(|m| m.location.as_ref().starts_with("my_table/a=2")) ); - } - - #[test] - fn test_prefix_aware_cache_no_filter_returns_all() { - // Scenario: Query with no prefix filter should return all files - let cache = DefaultListFilesCache::new(100000, None); - - let table_base = Path::from("my_table"); - - // Cache full table listing with 4 files - let full_files = Arc::new(vec![ - create_object_meta_with_path("my_table/a=1/file1.parquet"), - create_object_meta_with_path("my_table/a=1/file2.parquet"), - create_object_meta_with_path("my_table/a=2/file3.parquet"), - create_object_meta_with_path("my_table/a=2/file4.parquet"), - ]); - cache.put(&table_base, full_files); - - // Query with no prefix filter (None) should return all 4 files - let result = cache.get_with_extra(&table_base, &None); - assert!(result.is_some()); - let files = result.unwrap(); - assert_eq!(files.len(), 4); - - // Also test using get() which delegates to get_with_extra(&None) - let result_get = cache.get(&table_base); - assert!(result_get.is_some()); - assert_eq!(result_get.unwrap().len(), 4); - } - - #[test] - fn test_prefix_aware_cache_miss_no_entry() { - // Scenario: Table not cached, 
query should miss - let cache = DefaultListFilesCache::new(100000, None); - - let table_base = Path::from("my_table"); - // Query for full table should miss (nothing cached) - let result = cache.get_with_extra(&table_base, &None); - assert!(result.is_none()); - - // Query with prefix should also miss - let prefix = Some(Path::from("a=1")); - let result_2 = cache.get_with_extra(&table_base, &prefix); - assert!(result_2.is_none()); + // No filter returns all + let all = result.filter_by_prefix(&None); + assert_eq!(all.len(), 4); } #[test] - fn test_prefix_aware_cache_no_matching_files() { - // Scenario: Cache has table listing but no files match the requested partition + fn test_prefix_no_matching_files() { let cache = DefaultListFilesCache::new(100000, None); let table_base = Path::from("my_table"); - let files = Arc::new(vec![ + let files = vec![ create_object_meta_with_path("my_table/a=1/file1.parquet"), create_object_meta_with_path("my_table/a=2/file2.parquet"), - ]); - cache.put(&table_base, files); + ]; - // Query for partition a=3 which doesn't exist - let prefix_a3 = Some(Path::from("a=3")); - let result = cache.get_with_extra(&table_base, &prefix_a3); + cache.put(&table_base, CachedFileList::new(files)); + let result = cache.get(&table_base).unwrap(); - // Should return None since no files match - assert!(result.is_none()); + // Query for partition a=3 which doesn't exist + let prefix_a3 = Some(Path::from("my_table/a=3")); + let filtered = result.filter_by_prefix(&prefix_a3); + assert!(filtered.is_empty()); } #[test] - fn test_prefix_aware_nested_partitions() { - // Scenario: Table with multiple partition levels (e.g., year/month/day) + fn test_nested_partitions() { let cache = DefaultListFilesCache::new(100000, None); let table_base = Path::from("events"); - let files = Arc::new(vec![ + let files = vec![ create_object_meta_with_path( "events/year=2024/month=01/day=01/file1.parquet", ), @@ -1017,56 +898,19 @@ mod tests { create_object_meta_with_path( "events/year=2025/month=01/day=01/file4.parquet", ), - ]); - cache.put(&table_base, files); + ]; - // Query for year=2024/month=01 (should get 2 files) - let prefix_month = Some(Path::from("year=2024/month=01")); - let result = cache.get_with_extra(&table_base, &prefix_month); - assert!(result.is_some()); - assert_eq!(result.unwrap().len(), 2); - - // Query for year=2024 (should get 3 files) - let prefix_year = Some(Path::from("year=2024")); - let result_year = cache.get_with_extra(&table_base, &prefix_year); - assert!(result_year.is_some()); - assert_eq!(result_year.unwrap().len(), 3); - - // Query for specific day (should get 1 file) - let prefix_day = Some(Path::from("year=2024/month=01/day=01")); - let result_day = cache.get_with_extra(&table_base, &prefix_day); - assert!(result_day.is_some()); - assert_eq!(result_day.unwrap().len(), 1); - } + cache.put(&table_base, CachedFileList::new(files)); + let result = cache.get(&table_base).unwrap(); - #[test] - fn test_prefix_aware_different_tables() { - // Scenario: Multiple tables cached, queries should not cross-contaminate - let cache = DefaultListFilesCache::new(100000, None); + // Filter for year=2024/month=01 + let prefix_month = Some(Path::from("events/year=2024/month=01")); + let filtered = result.filter_by_prefix(&prefix_month); + assert_eq!(filtered.len(), 2); - let table_a = Path::from("table_a"); - let table_b = Path::from("table_b"); - - let files_a = Arc::new(vec![create_object_meta_with_path( - "table_a/part=1/file1.parquet", - )]); - let files_b = Arc::new(vec![ - 
create_object_meta_with_path("table_b/part=1/file1.parquet"), - create_object_meta_with_path("table_b/part=2/file2.parquet"), - ]); - - cache.put(&table_a, files_a); - cache.put(&table_b, files_b); - - // Query table_a should only return table_a files - let result_a = cache.get(&table_a); - assert!(result_a.is_some()); - assert_eq!(result_a.unwrap().len(), 1); - - // Query table_b with prefix should only return matching table_b files - let prefix = Some(Path::from("part=1")); - let result_b = cache.get_with_extra(&table_b, &prefix); - assert!(result_b.is_some()); - assert_eq!(result_b.unwrap().len(), 1); + // Filter for year=2024 + let prefix_year = Some(Path::from("events/year=2024")); + let filtered_year = result.filter_by_prefix(&prefix_year); + assert_eq!(filtered_year.len(), 3); } } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 8172069fdbabd..795f7e9bc2838 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -25,35 +25,54 @@ mod list_files_cache; pub use file_metadata_cache::DefaultFilesMetadataCache; pub use list_files_cache::DefaultListFilesCache; -/// A trait that can be implemented to provide custom cache behavior for the caches managed by -/// [`cache_manager::CacheManager`]. +/// Base trait for cache implementations with common operations. +/// +/// This trait provides the fundamental cache operations (`get`, `put`, `remove`, etc.) +/// that all cache types share. Specific cache traits like [`cache_manager::FileStatisticsCache`], +/// [`cache_manager::ListFilesCache`], and [`cache_manager::FileMetadataCache`] extend this +/// trait with their specialized methods. +/// +/// ## Thread Safety /// /// Implementations must handle their own locking via internal mutability, as methods do not /// take mutable references and may be accessed by multiple concurrent queries. +/// +/// ## Validation Pattern +/// +/// Validation metadata (e.g., file size, last modified time) should be embedded in the +/// value type `V`. The typical usage pattern is: +/// 1. Call `get(key)` to check for cached value +/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` +/// 3. If invalid or missing, compute new value and call `put(key, new_value)` pub trait CacheAccessor: Send + Sync { - // Extra info but not part of the cache key or cache value. - type Extra: Clone; - - /// Get value from cache. - fn get(&self, k: &K) -> Option; - /// Get value from cache. - fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option; - /// Put value into cache. Returns the old value associated with the key if there was one. + /// Get a cached entry if it exists. + /// + /// Returns the cached value without any validation. The caller should + /// validate the returned value if freshness matters. + fn get(&self, key: &K) -> Option; + + /// Store a value in the cache. + /// + /// Returns the previous value if one existed. fn put(&self, key: &K, value: V) -> Option; - /// Put value into cache. Returns the old value associated with the key if there was one. - fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Option; - /// Remove an entry from the cache, returning value if they existed in the map. + + /// Remove an entry from the cache, returning the value if it existed. fn remove(&self, k: &K) -> Option; + /// Check if the cache contains a specific key. fn contains_key(&self, k: &K) -> bool; + /// Fetch the total number of cache entries. 
fn len(&self) -> usize; - /// Check if the Cache collection is empty or not. + + /// Check if the cache collection is empty. fn is_empty(&self) -> bool { self.len() == 0 } + /// Remove all entries from the cache. fn clear(&self); + /// Return the cache name. fn name(&self) -> String; }