diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index aa83fec1118ed..8a6ad448d895d 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -703,6 +703,23 @@ impl TableFunctionImpl for StatisticsCacheFunc { } } +// Implementation of the `list_files_cache` table function in datafusion-cli. +/// +/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object. +/// DataFusion uses these cached results to plan queries against external tables. +/// # Schema +/// ```sql +/// > describe select * from list_files_cache(); +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | column_name | data_type | is_nullable | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | table | Utf8 | NO | +/// | path | Utf8 | NO | +/// | metadata_size_bytes | UInt64 | NO | +/// | expires_in | Duration(ms) | YES | +/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// ``` #[derive(Debug)] struct ListFilesCacheTable { schema: SchemaRef, @@ -771,6 +788,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { Field::new("metadata", DataType::Struct(nested_fields.clone()), true); let schema = Arc::new(Schema::new(vec![ + Field::new("table", DataType::Utf8, false), Field::new("path", DataType::Utf8, false), Field::new("metadata_size_bytes", DataType::UInt64, false), // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type. @@ -786,6 +804,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { ), ])); + let mut table_arr = vec![]; let mut path_arr = vec![]; let mut metadata_size_bytes_arr = vec![]; let mut expires_arr = vec![]; @@ -802,7 +821,8 @@ impl TableFunctionImpl for ListFilesCacheFunc { let mut current_offset: i32 = 0; for (path, entry) in list_files_cache.list_entries() { - path_arr.push(path.to_string()); + table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string())); + path_arr.push(path.path.to_string()); metadata_size_bytes_arr.push(entry.size_bytes as u64); // calculates time left before entry expires expires_arr.push( @@ -841,6 +861,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { let batch = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(StringArray::from(table_arr)), Arc::new(StringArray::from(path_arr)), Arc::new(UInt64Array::from(metadata_size_bytes_arr)), Arc::new(DurationMillisecondArray::from(expires_arr)), diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 46d88152fac12..9e53260e42773 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -848,35 +848,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> { - let rt = RuntimeEnvBuilder::new() - .with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None)) - .build_arc() - .unwrap(); - - let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); - - ctx.register_udtf( - "list_files_cache", - Arc::new(ListFilesCacheFunc::new( - ctx.task_ctx().runtime_env().cache_manager.clone(), - )), - ); - - let rbs = ctx - .sql("SELECT * FROM list_files_cache()") - .await? - .collect() - .await?; - assert_snapshot!(batches_to_string(&rbs),@r" - +------+---------------------+------------+---------------+ - | path | metadata_size_bytes | expires_in | metadata_list | - +------+---------------------+------------+---------------+ - +------+---------------------+------------+---------------+ - "); - - Ok(()) - } } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9fb2dd2dce29c..a175d47f4de65 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics, }; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; @@ -565,7 +566,11 @@ impl TableProvider for ListingTable { // Invalidate cache entries for this table if they exist if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() { - let _ = lfc.remove(table_path.prefix()); + let key = TableScopedPath { + table: table_path.get_table_ref().clone(), + path: table_path.prefix().clone(), + }; + let _ = lfc.remove(&key); } // Sink related option, apart from format diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3ca388af0c4c1..86af691fd7248 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory { ))? .create(session_state, &cmd.options)?; - let mut table_path = ListingTableUrl::parse(&cmd.location)?; + let mut table_path = + ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone()); let file_extension = match table_path.is_collection() { // Setting the extension to be empty instead of allowing the default extension seems // odd, but was done to ensure existing behavior isn't modified. It seems like this @@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory { } None => format!("*.{}", cmd.file_type.to_lowercase()), }; - table_path = table_path.with_glob(glob.as_ref())?; + table_path = table_path + .with_glob(glob.as_ref())? + .with_table_ref(cmd.name.clone()); } let schema = options.infer_schema(session_state, &table_path).await?; let df_schema = Arc::clone(&schema).to_dfschema()?; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a769bb01b4354..6df90b205c8ec 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1315,7 +1315,7 @@ impl SessionContext { let table = table_ref.table().to_owned(); let maybe_schema = { let state = self.state.read(); - let resolved = state.resolve_table_ref(table_ref); + let resolved = state.resolve_table_ref(table_ref.clone()); state .catalog_list() .catalog(&resolved.catalog) @@ -1327,6 +1327,11 @@ impl SessionContext { && table_provider.table_type() == table_type { schema.deregister_table(&table)?; + if table_type == TableType::Base + && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() + { + lfc.drop_table_entries(&Some(table_ref))?; + } return Ok(true); } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 155d6efe462c1..2428275ac3c36 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, TableReference}; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -41,6 +42,8 @@ pub struct ListingTableUrl { prefix: Path, /// An optional glob expression used to filter files glob: Option, + + table_ref: Option, } impl ListingTableUrl { @@ -145,7 +148,12 @@ impl ListingTableUrl { /// to create a [`ListingTableUrl`]. pub fn try_new(url: Url, glob: Option) -> Result { let prefix = Path::from_url_path(url.path())?; - Ok(Self { url, prefix, glob }) + Ok(Self { + url, + prefix, + glob, + table_ref: None, + }) } /// Returns the URL scheme @@ -255,7 +263,14 @@ impl ListingTableUrl { }; let list: BoxStream<'a, Result> = if self.is_collection() { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } else { match store.head(&full_prefix).await { Ok(meta) => futures::stream::once(async { Ok(meta) }) @@ -264,7 +279,14 @@ impl ListingTableUrl { // If the head command fails, it is likely that object doesn't exist. // Retry as though it were a prefix (aka a collection) Err(object_store::Error::NotFound { .. }) => { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } Err(e) => return Err(e.into()), } @@ -323,6 +345,15 @@ impl ListingTableUrl { Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?; Self::try_new(self.url, Some(glob)) } + + pub fn with_table_ref(mut self, table_ref: TableReference) -> Self { + self.table_ref = Some(table_ref); + self + } + + pub fn get_table_ref(&self) -> &Option { + &self.table_ref + } } /// Lists files with cache support, using prefix-aware lookups. @@ -345,6 +376,7 @@ impl ListingTableUrl { async fn list_with_cache<'b>( ctx: &'b dyn Session, store: &'b dyn ObjectStore, + table_ref: Option<&TableReference>, table_base_path: &Path, prefix: Option<&Path>, ) -> Result>> { @@ -367,9 +399,14 @@ async fn list_with_cache<'b>( // Convert prefix to Option for cache lookup let prefix_filter = prefix.cloned(); + let table_scoped_base_path = TableScopedPath { + table: table_ref.cloned(), + path: table_base_path.clone(), + }; + // Try cache lookup with optional prefix filter let vec = if let Some(res) = - cache.get_with_extra(table_base_path, &prefix_filter) + cache.get_with_extra(&table_scoped_base_path, &prefix_filter) { debug!("Hit list files cache"); res.as_ref().clone() @@ -380,7 +417,7 @@ async fn list_with_cache<'b>( .list(Some(table_base_path)) .try_collect::>() .await?; - cache.put(table_base_path, Arc::new(vec.clone())); + cache.put(&table_scoped_base_path, Arc::new(vec.clone())); // If a prefix filter was requested, apply it to the results if prefix.is_some() { diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 31a2323524dd4..162074d909ead 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -17,7 +17,9 @@ use crate::cache::cache_unit::DefaultFilesMetadataCache; use crate::cache::list_files_cache::ListFilesEntry; +use crate::cache::list_files_cache::TableScopedPath; use crate::cache::{CacheAccessor, DefaultListFilesCache}; +use datafusion_common::TableReference; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; use object_store::ObjectMeta; @@ -81,7 +83,7 @@ pub struct FileStatisticsCacheEntry { /// /// See [`crate::runtime_env::RuntimeEnv`] for more details. pub trait ListFilesCache: - CacheAccessor>, Extra = Option> + CacheAccessor>, Extra = Option> { /// Returns the cache's memory limit in bytes. fn cache_limit(&self) -> usize; @@ -96,7 +98,9 @@ pub trait ListFilesCache: fn update_cache_ttl(&self, ttl: Option); /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; + fn list_entries(&self) -> HashMap; + + fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; } /// Generic file-embedded metadata used with [`FileMetadataCache`]. diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index c4a92c49478d7..858219e5b883f 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -22,6 +22,7 @@ use std::{ time::Duration, }; +use datafusion_common::TableReference; use datafusion_common::instant::Instant; use object_store::{ObjectMeta, path::Path}; @@ -148,9 +149,15 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB /// The default cache TTL for the [`DefaultListFilesCache`] pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option, + pub path: Path, +} + /// Handles the inner state of the [`DefaultListFilesCache`] struct. pub struct DefaultListFilesCacheState { - lru_queue: LruQueue, + lru_queue: LruQueue, memory_limit: usize, memory_used: usize, ttl: Option, @@ -198,17 +205,17 @@ impl DefaultListFilesCacheState { /// ``` fn get_with_prefix( &mut self, - table_base: &Path, + table_scoped_base_path: &TableScopedPath, prefix: Option<&Path>, now: Instant, ) -> Option>> { - let entry = self.lru_queue.get(table_base)?; + let entry = self.lru_queue.get(table_scoped_base_path)?; // Check expiration if let Some(exp) = entry.expires && now > exp { - self.remove(table_base); + self.remove(table_scoped_base_path); return None; } @@ -218,6 +225,7 @@ impl DefaultListFilesCacheState { }; // Build the full prefix path: table_base/prefix + let table_base = &table_scoped_base_path.path; let mut parts: Vec<_> = table_base.parts().collect(); parts.extend(prefix.parts()); let full_prefix = Path::from_iter(parts); @@ -243,7 +251,7 @@ impl DefaultListFilesCacheState { /// If the entry has expired by `now` it is removed from the cache. /// /// The LRU queue is not updated. - fn contains_key(&mut self, k: &Path, now: Instant) -> bool { + fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; }; @@ -264,7 +272,7 @@ impl DefaultListFilesCacheState { /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, - key: &Path, + key: &TableScopedPath, value: Arc>, now: Instant, ) -> Option>> { @@ -306,7 +314,7 @@ impl DefaultListFilesCacheState { } /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &Path) -> Option>> { + fn remove(&mut self, k: &TableScopedPath) -> Option>> { if let Some(entry) = self.lru_queue.remove(k) { self.memory_used -= entry.size_bytes; Some(entry.metas) @@ -350,23 +358,40 @@ impl ListFilesCache for DefaultListFilesCache { state.evict_entries(); } - fn list_entries(&self) -> HashMap { + fn list_entries(&self) -> HashMap { let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); + let mut entries = HashMap::::new(); for (path, entry) in state.lru_queue.list_entries() { entries.insert(path.clone(), entry.clone()); } entries } + + fn drop_table_entries( + &self, + table_ref: &Option, + ) -> datafusion_common::Result<()> { + let mut state = self.state.lock().unwrap(); + let mut table_paths = vec![]; + for (path, _) in state.lru_queue.list_entries() { + if path.table == *table_ref { + table_paths.push(path.clone()); + } + } + for path in table_paths { + state.remove(&path); + } + Ok(()) + } } -impl CacheAccessor>> for DefaultListFilesCache { +impl CacheAccessor>> for DefaultListFilesCache { type Extra = Option; /// Gets all files for the given table base path. /// /// This is equivalent to calling `get_with_extra(k, &None)`. - fn get(&self, k: &Path) -> Option>> { + fn get(&self, k: &TableScopedPath) -> Option>> { self.get_with_extra(k, &None) } @@ -385,17 +410,17 @@ impl CacheAccessor>> for DefaultListFilesCache { /// can serve queries for any partition subset without additional storage calls. fn get_with_extra( &self, - table_base: &Path, + table_scoped_path: &TableScopedPath, prefix: &Self::Extra, ) -> Option>> { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); - state.get_with_prefix(table_base, prefix.as_ref(), now) + state.get_with_prefix(table_scoped_path, prefix.as_ref(), now) } fn put( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, ) -> Option>> { let mut state = self.state.lock().unwrap(); @@ -405,19 +430,19 @@ impl CacheAccessor>> for DefaultListFilesCache { fn put_with_extra( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, _e: &Self::Extra, ) -> Option>> { self.put(key, value) } - fn remove(&self, k: &Path) -> Option>> { + fn remove(&self, k: &TableScopedPath) -> Option>> { let mut state = self.state.lock().unwrap(); state.remove(k) } - fn contains_key(&self, k: &Path) -> bool { + fn contains_key(&self, k: &TableScopedPath) -> bool { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); state.contains_key(k, now) @@ -509,36 +534,49 @@ mod tests { #[test] fn test_basic_operations() { let cache = DefaultListFilesCache::default(); + let table_ref = Some(TableReference::from("table")); let path = Path::from("test_path"); + let key = TableScopedPath { + table: table_ref.clone(), + path, + }; // Initially cache is empty - assert!(cache.get(&path).is_none()); - assert!(!cache.contains_key(&path)); + assert!(cache.get(&key).is_none()); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put an entry let meta = create_test_object_meta("file1", 50); let value = Arc::new(vec![meta.clone()]); - cache.put(&path, Arc::clone(&value)); + cache.put(&key, Arc::clone(&value)); // Entry should be retrievable - assert!(cache.contains_key(&path)); + assert!(cache.contains_key(&key)); assert_eq!(cache.len(), 1); - let retrieved = cache.get(&path).unwrap(); + let retrieved = cache.get(&key).unwrap(); assert_eq!(retrieved.len(), 1); assert_eq!(retrieved[0].location, meta.location); // Remove the entry - let removed = cache.remove(&path).unwrap(); + let removed = cache.remove(&key).unwrap(); assert_eq!(removed.len(), 1); - assert!(!cache.contains_key(&path)); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put multiple entries let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); - cache.put(&path1, Arc::clone(&value1)); - cache.put(&path2, Arc::clone(&value2)); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); assert_eq!(cache.len(), 2); // List cache entries @@ -546,7 +584,7 @@ mod tests { cache.list_entries(), HashMap::from([ ( - path1.clone(), + key1.clone(), ListFilesEntry { metas: value1, size_bytes: size1, @@ -554,7 +592,7 @@ mod tests { } ), ( - path2.clone(), + key2.clone(), ListFilesEntry { metas: value2, size_bytes: size2, @@ -567,8 +605,8 @@ mod tests { // Clear all entries cache.clear(); assert_eq!(cache.len(), 0); - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] @@ -580,24 +618,42 @@ mod tests { // Set cache limit to exactly fit all three entries let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + // All three entries should fit - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Adding a new entry should evict path1 (LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -609,24 +665,42 @@ mod tests { // Set cache limit to fit exactly three entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Access path1 to move it to front (MRU) // Order is now: path2 (LRU), path3, path1 (MRU) - cache.get(&path1); + cache.get(&key1); // Adding a new entry should evict path2 (the LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); // Still present (recently accessed) - assert!(!cache.contains_key(&path2)); // Evicted (was LRU) - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(cache.contains_key(&key1)); // Still present (recently accessed) + assert!(!cache.contains_key(&key2)); // Evicted (was LRU) + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -637,19 +711,32 @@ mod tests { // Set cache limit to fit both entries let cache = DefaultListFilesCache::new(size * 2, None); - cache.put(&path1, value1); - cache.put(&path2, value2); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key1, value1); + cache.put(&key2, value2); assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // Large entry should not be added - assert!(!cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key_large)); assert_eq!(cache.len(), 2); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); } #[test] @@ -661,21 +748,38 @@ mod tests { // Set cache limit for exactly 3 entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Add a large entry that requires evicting 2 entries let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // path1 and path2 should be evicted (both LRU), path3 and path_large remain assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(!cache.contains_key(&path2)); // Evicted - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(!cache.contains_key(&key2)); // Evicted + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key_large)); } #[test] @@ -686,10 +790,23 @@ mod tests { let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Resize cache to only fit one entry @@ -697,10 +814,10 @@ mod tests { // Should keep only the most recent entry (path3, the MRU) assert_eq!(cache.len(), 1); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key3)); // Earlier entries (LRU) should be evicted - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] @@ -711,34 +828,49 @@ mod tests { let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, Arc::clone(&value2)); - cache.put(&path3, value3_v1); + cache.put(&key1, value1); + cache.put(&key2, Arc::clone(&value2)); + cache.put(&key3, value3_v1); assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); - cache.put(&path3, value3_v2); + cache.put(&key3, value3_v2); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Update path3 with larger size that requires evicting path1 (LRU) let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); - cache.put(&path3, Arc::clone(&value3_v3)); + cache.put(&key3, Arc::clone(&value3_v3)); assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); + assert!(!cache.contains_key(&key1)); // Evicted (was LRU) + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // List cache entries assert_eq!( cache.list_entries(), HashMap::from([ ( - path2, + key2, ListFilesEntry { metas: value2, size_bytes: size2, @@ -746,7 +878,7 @@ mod tests { } ), ( - path3, + key3, ListFilesEntry { metas: value3_v3, size_bytes: size3_v3, @@ -768,18 +900,27 @@ mod tests { let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); - cache.put(&path1, Arc::clone(&value1)); - cache.put(&path2, Arc::clone(&value2)); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); // Entries should be accessible immediately - assert!(cache.get(&path1).is_some()); - assert!(cache.get(&path2).is_some()); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key2).is_some()); // List cache entries assert_eq!( cache.list_entries(), HashMap::from([ ( - path1.clone(), + key1.clone(), ListFilesEntry { metas: value1, size_bytes: size1, @@ -787,7 +928,7 @@ mod tests { } ), ( - path2.clone(), + key2.clone(), ListFilesEntry { metas: value2, size_bytes: size2, @@ -800,9 +941,9 @@ mod tests { mock_time.inc(Duration::from_millis(150)); // Entries should now return None and be removed when observed through get or contains_key - assert!(cache.get(&path1).is_none()); + assert!(cache.get(&key1).is_none()); assert_eq!(cache.len(), 1); // path1 was removed by get() - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key2)); assert_eq!(cache.len(), 0); // path2 was removed by contains_key() } @@ -818,21 +959,34 @@ mod tests { let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; + cache.put(&key1, value1); mock_time.inc(Duration::from_millis(50)); - cache.put(&path2, value2); + cache.put(&key2, value2); mock_time.inc(Duration::from_millis(50)); // path3 should evict path1 due to size limit - cache.put(&path3, value3); - assert!(!cache.contains_key(&path1)); // Evicted by LRU - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + cache.put(&key3, value3); + assert!(!cache.contains_key(&key1)); // Evicted by LRU + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); mock_time.inc(Duration::from_millis(151)); - assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(!cache.contains_key(&key2)); // Expired + assert!(cache.contains_key(&key3)); // Still valid } #[test] @@ -918,7 +1072,12 @@ mod tests { // Add entry and verify memory tracking let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + cache.put(&key1, value1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1); @@ -926,14 +1085,18 @@ mod tests { // Add another entry let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); - cache.put(&path2, value2); + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key2, value2); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1 + size2); } // Remove first entry and verify memory decreases - cache.remove(&path1); + cache.remove(&key1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size2); @@ -977,12 +1140,17 @@ mod tests { ]); // Cache the full table listing - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=1 using get_with_extra // New API: get_with_extra(table_base, Some(relative_prefix)) let prefix_a1 = Some(Path::from("a=1")); - let result = cache.get_with_extra(&table_base, &prefix_a1); + let result = cache.get_with_extra(&key, &prefix_a1); // Should return filtered results (only files from a=1) assert!(result.is_some()); @@ -996,7 +1164,7 @@ mod tests { // Query for partition a=2 let prefix_a2 = Some(Path::from("a=2")); - let result_2 = cache.get_with_extra(&table_base, &prefix_a2); + let result_2 = cache.get_with_extra(&key, &prefix_a2); assert!(result_2.is_some()); let filtered_2 = result_2.unwrap(); @@ -1022,16 +1190,21 @@ mod tests { create_object_meta_with_path("my_table/a=2/file3.parquet"), create_object_meta_with_path("my_table/a=2/file4.parquet"), ]); - cache.put(&table_base, full_files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, full_files); // Query with no prefix filter (None) should return all 4 files - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_some()); let files = result.unwrap(); assert_eq!(files.len(), 4); // Also test using get() which delegates to get_with_extra(&None) - let result_get = cache.get(&table_base); + let result_get = cache.get(&key); assert!(result_get.is_some()); assert_eq!(result_get.unwrap().len(), 4); } @@ -1042,14 +1215,19 @@ mod tests { let cache = DefaultListFilesCache::new(100000, None); let table_base = Path::from("my_table"); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; // Query for full table should miss (nothing cached) - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_none()); // Query with prefix should also miss let prefix = Some(Path::from("a=1")); - let result_2 = cache.get_with_extra(&table_base, &prefix); + let result_2 = cache.get_with_extra(&key, &prefix); assert!(result_2.is_none()); } @@ -1063,11 +1241,16 @@ mod tests { create_object_meta_with_path("my_table/a=1/file1.parquet"), create_object_meta_with_path("my_table/a=2/file2.parquet"), ]); - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=3 which doesn't exist let prefix_a3 = Some(Path::from("a=3")); - let result = cache.get_with_extra(&table_base, &prefix_a3); + let result = cache.get_with_extra(&key, &prefix_a3); // Should return None since no files match assert!(result.is_none()); @@ -1093,23 +1276,28 @@ mod tests { "events/year=2025/month=01/day=01/file4.parquet", ), ]); - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for year=2024/month=01 (should get 2 files) let prefix_month = Some(Path::from("year=2024/month=01")); - let result = cache.get_with_extra(&table_base, &prefix_month); + let result = cache.get_with_extra(&key, &prefix_month); assert!(result.is_some()); assert_eq!(result.unwrap().len(), 2); // Query for year=2024 (should get 3 files) let prefix_year = Some(Path::from("year=2024")); - let result_year = cache.get_with_extra(&table_base, &prefix_year); + let result_year = cache.get_with_extra(&key, &prefix_year); assert!(result_year.is_some()); assert_eq!(result_year.unwrap().len(), 3); // Query for specific day (should get 1 file) let prefix_day = Some(Path::from("year=2024/month=01/day=01")); - let result_day = cache.get_with_extra(&table_base, &prefix_day); + let result_day = cache.get_with_extra(&key, &prefix_day); assert!(result_day.is_some()); assert_eq!(result_day.unwrap().len(), 1); } @@ -1130,18 +1318,63 @@ mod tests { create_object_meta_with_path("table_b/part=2/file2.parquet"), ]); - cache.put(&table_a, files_a); - cache.put(&table_b, files_b); + let table_ref_a = Some(TableReference::from("table_a")); + let table_ref_b = Some(TableReference::from("table_b")); + let key_a = TableScopedPath { + table: table_ref_a, + path: table_a, + }; + let key_b = TableScopedPath { + table: table_ref_b, + path: table_b, + }; + cache.put(&key_a, files_a); + cache.put(&key_b, files_b); // Query table_a should only return table_a files - let result_a = cache.get(&table_a); + let result_a = cache.get(&key_a); assert!(result_a.is_some()); assert_eq!(result_a.unwrap().len(), 1); // Query table_b with prefix should only return matching table_b files let prefix = Some(Path::from("part=1")); - let result_b = cache.get_with_extra(&table_b, &prefix); + let result_b = cache.get_with_extra(&key_b, &prefix); assert!(result_b.is_some()); assert_eq!(result_b.unwrap().len(), 1); } + + #[test] + fn test_drop_table_entries() { + let cache = DefaultListFilesCache::default(); + + let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + let table_ref1 = Some(TableReference::from("table1")); + let key1 = TableScopedPath { + table: table_ref1.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref1.clone(), + path: path2, + }; + + let table_ref2 = Some(TableReference::from("table2")); + let key3 = TableScopedPath { + table: table_ref2.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); + + cache.drop_table_entries(&table_ref1).unwrap(); + + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + } } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 8172069fdbabd..93b9f0520b2a3 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -24,6 +24,7 @@ mod list_files_cache; pub use file_metadata_cache::DefaultFilesMetadataCache; pub use list_files_cache::DefaultListFilesCache; +pub use list_files_cache::TableScopedPath; /// A trait that can be implemented to provide custom cache behavior for the caches managed by /// [`cache_manager::CacheManager`]. diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md index 11f61297ac8df..ea353d5c8dcc8 100644 --- a/docs/source/user-guide/cli/functions.md +++ b/docs/source/user-guide/cli/functions.md @@ -172,41 +172,53 @@ The columns of the returned table are: ## `list_files_cache` -The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. +The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. Cache entries are scoped to tables. You can inspect the cache by querying the `list_files_cache` function. For example, ```sql -> select split_part(path, '/', -1) as folder, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache(); -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ -| folder | metadata_size_bytes | expires_in | file_size_bytes | e_tag | -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1233969 | 7041136-643a7bfeeec9b-12d431 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1234756 | 7041137-643a7bfeef2df-12d744 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232554 | 7041139-643a7bfeef86a-12ceaa | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1238676 | 704113a-643a7bfeef914-12e694 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232186 | 704113b-643a7bfeefb22-12cd3a | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1237506 | 7041138-643a7bfeef775-12e202 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228756 | 7041134-643a7bfeec2d8-12bfd4 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228509 | 7041135-643a7bfeed599-12bedd | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20124715 | 704114a-643a7c00bb560-133142b | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20131024 | 7041149-643a7c00b90b7-1332cd0 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20179217 | 704114b-643a7c00bb93e-133e911 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20296819 | 704114f-643a7c00ccefd-135b473 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20110730 | 7041148-643a7c00b9832-132dd8a | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20128346 | 704114c-643a7c00bc00a-133225a | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20130133 | 7041147-643a7c00b3901-1332955 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20139830 | 7041146-643a7c00abbe8-1334f36 | -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ +> set datafusion.runtime.list_files_cache_ttl = "30s"; +> create external table overturemaps +stored as parquet +location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/type=infrastructure'; +0 row(s) fetched. +> select table, path, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache() limit 10; ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| table | path | metadata_size_bytes | expires_in | file_size_bytes | e_tag | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 999055952 | "35fc8fbe8400960b54c66fbb408c48e8-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 975592768 | "8a16e10b722681cdc00242564b502965-59" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1082925747 | "24cd13ddb5e0e438952d2499f5dabe06-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1008425557 | "37663e31c7c64d4ef355882bcd47e361-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1065561905 | "4e7c50d2d1b3c5ed7b82b4898f5ac332-64" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1045655427 | "8fff7e6a72d375eba668727c55d4f103-63" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1086822683 | "b67167d8022d778936c330a52a5f1922-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1016732378 | "6d70857a0473ed9ed3fc6e149814168b-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 991363784 | "c9cafb42fcbb413f851691c895dd7c2b-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1032469715 | "7540252d0d67158297a67038a3365e0f-62" | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ ``` The columns of the returned table are: | column_name | data_type | Description | | ------------------- | ------------ | ----------------------------------------------------------------------------------------- | +| table | Utf8 | Name of the table | | path | Utf8 | File path relative to the object store / filesystem root | | metadata_size_bytes | UInt64 | Size of the cached metadata in memory (not its thrift encoded form) | | expires_in | Duration(ms) | Last modified time of the file | | metadata_list | List(Struct) | List of metadatas, one for each file under the path. | +A metadata struct in the metadata_list contains the following fields: + +```text +{ + "file_path": "release/2025-12-17.0/theme=base/type=infrastructure/part-00000-d556e455-e0c5-4940-b367-daff3287a952-c000.zstd.parquet", + "file_modified": "2025-12-17T22:20:29", + "file_size_bytes": 999055952, + "e_tag": "35fc8fbe8400960b54c66fbb408c48e8-60", + "version": null +} +``` + [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag