Skip to content

Commit 1789f02

Browse files
committed
make lfc table scoped
1 parent 8be0389 commit 1789f02

File tree

9 files changed

+272
-131
lines changed

9 files changed

+272
-131
lines changed

datafusion-cli/src/functions.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
771771
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
772772

773773
let schema = Arc::new(Schema::new(vec![
774+
Field::new("table", DataType::Utf8, false),
774775
Field::new("path", DataType::Utf8, false),
775776
Field::new("metadata_size_bytes", DataType::UInt64, false),
776777
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
@@ -785,7 +786,8 @@ impl TableFunctionImpl for ListFilesCacheFunc {
785786
true,
786787
),
787788
]));
788-
789+
790+
let mut table_arr = vec![];
789791
let mut path_arr = vec![];
790792
let mut metadata_size_bytes_arr = vec![];
791793
let mut expires_arr = vec![];
@@ -802,7 +804,8 @@ impl TableFunctionImpl for ListFilesCacheFunc {
802804
let mut current_offset: i32 = 0;
803805

804806
for (path, entry) in list_files_cache.list_entries() {
805-
path_arr.push(path.to_string());
807+
table_arr.push(path.0.map_or("NULL".to_string(), |t| t.to_string()));
808+
path_arr.push(path.1.to_string());
806809
metadata_size_bytes_arr.push(entry.size_bytes as u64);
807810
// calculates time left before entry expires
808811
expires_arr.push(
@@ -841,6 +844,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
841844
let batch = RecordBatch::try_new(
842845
schema.clone(),
843846
vec![
847+
Arc::new(StringArray::from(table_arr)),
844848
Arc::new(StringArray::from(path_arr)),
845849
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
846850
Arc::new(DurationMillisecondArray::from(expires_arr)),

datafusion/catalog-listing/src/table.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
3535
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
3636
};
37+
use datafusion_execution::cache::TableScopedPath;
3738
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
3839
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
3940
use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {
565566

566567
// Invalidate cache entries for this table if they exist
567568
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
568-
let _ = lfc.remove(table_path.prefix());
569+
let key = TableScopedPath(
570+
table_path.get_table_ref().clone(),
571+
table_path.prefix().clone(),
572+
);
573+
let _ = lfc.remove(&key);
569574
}
570575

571576
// Sink related option, apart from format

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
6363
))?
6464
.create(session_state, &cmd.options)?;
6565

66-
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
66+
let mut table_path =
67+
ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
6768
let file_extension = match table_path.is_collection() {
6869
// Setting the extension to be empty instead of allowing the default extension seems
6970
// odd, but was done to ensure existing behavior isn't modified. It seems like this

datafusion/core/src/execution/context/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ use datafusion_common::{
7474
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
7575
};
7676
pub use datafusion_execution::TaskContext;
77+
use datafusion_execution::cache::TableScopedPath;
7778
use datafusion_execution::cache::cache_manager::{
7879
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
7980
DEFAULT_METADATA_CACHE_LIMIT,
@@ -101,6 +102,7 @@ use datafusion_session::SessionStore;
101102

102103
use async_trait::async_trait;
103104
use chrono::{DateTime, Utc};
105+
use log::warn;
104106
use object_store::ObjectStore;
105107
use parking_lot::RwLock;
106108
use url::Url;
@@ -1315,7 +1317,7 @@ impl SessionContext {
13151317
let table = table_ref.table().to_owned();
13161318
let maybe_schema = {
13171319
let state = self.state.read();
1318-
let resolved = state.resolve_table_ref(table_ref);
1320+
let resolved = state.resolve_table_ref(table_ref.clone());
13191321
state
13201322
.catalog_list()
13211323
.catalog(&resolved.catalog)
@@ -1327,12 +1329,35 @@ impl SessionContext {
13271329
&& table_provider.table_type() == table_type
13281330
{
13291331
schema.deregister_table(&table)?;
1332+
if table_type == TableType::Base {
1333+
self.drop_list_files_cache_entries(&table_ref, &table_provider);
1334+
}
13301335
return Ok(true);
13311336
}
13321337

13331338
Ok(false)
13341339
}
13351340

1341+
fn drop_list_files_cache_entries(
1342+
&self,
1343+
table_ref: &TableReference,
1344+
table_provider: &Arc<dyn TableProvider>,
1345+
) {
1346+
if let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
1347+
&& let Some(listing_table) =
1348+
table_provider.as_any().downcast_ref::<ListingTable>()
1349+
{
1350+
for table_path in listing_table.table_paths() {
1351+
let key =
1352+
TableScopedPath(Some(table_ref.clone()), table_path.prefix().clone());
1353+
println!("removing entry for key {key:?}");
1354+
if lfc.remove(&key).is_none() {
1355+
warn!("list files cache for key {key:?} not found");
1356+
}
1357+
}
1358+
}
1359+
}
1360+
13361361
async fn create_function(&self, stmt: CreateFunction) -> Result<DataFrame> {
13371362
let function = {
13381363
let state = self.state.read().clone();

datafusion/core/tests/catalog_listing/pruned_partition_list.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use arrow_schema::DataType;
21+
use datafusion_sql::TableReference;
2122
use futures::{FutureExt, StreamExt as _, TryStreamExt as _};
2223
use object_store::{ObjectStore as _, memory::InMemory, path::Path};
2324

datafusion/datasource/src/url.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::sync::Arc;
1919

20-
use datafusion_common::{DataFusionError, Result};
20+
use datafusion_common::{DataFusionError, Result, TableReference};
21+
use datafusion_execution::cache::TableScopedPath;
2122
use datafusion_execution::object_store::ObjectStoreUrl;
2223
use datafusion_session::Session;
2324

@@ -41,6 +42,8 @@ pub struct ListingTableUrl {
4142
prefix: Path,
4243
/// An optional glob expression used to filter files
4344
glob: Option<Pattern>,
45+
46+
table_ref: Option<TableReference>,
4447
}
4548

4649
impl ListingTableUrl {
@@ -145,7 +148,12 @@ impl ListingTableUrl {
145148
/// to create a [`ListingTableUrl`].
146149
pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147150
let prefix = Path::from_url_path(url.path())?;
148-
Ok(Self { url, prefix, glob })
151+
Ok(Self {
152+
url,
153+
prefix,
154+
glob,
155+
table_ref: None,
156+
})
149157
}
150158

151159
/// Returns the URL scheme
@@ -255,7 +263,14 @@ impl ListingTableUrl {
255263
};
256264

257265
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
258-
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
266+
list_with_cache(
267+
ctx,
268+
store,
269+
self.table_ref.as_ref(),
270+
&self.prefix,
271+
prefix.as_ref(),
272+
)
273+
.await?
259274
} else {
260275
match store.head(&full_prefix).await {
261276
Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,7 +279,14 @@ impl ListingTableUrl {
264279
// If the head command fails, it is likely that object doesn't exist.
265280
// Retry as though it were a prefix (aka a collection)
266281
Err(object_store::Error::NotFound { .. }) => {
267-
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
282+
list_with_cache(
283+
ctx,
284+
store,
285+
self.table_ref.as_ref(),
286+
&self.prefix,
287+
prefix.as_ref(),
288+
)
289+
.await?
268290
}
269291
Err(e) => return Err(e.into()),
270292
}
@@ -323,6 +345,15 @@ impl ListingTableUrl {
323345
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
324346
Self::try_new(self.url, Some(glob))
325347
}
348+
349+
pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
350+
self.table_ref = Some(table_ref);
351+
self
352+
}
353+
354+
pub fn get_table_ref(&self) -> &Option<TableReference> {
355+
&self.table_ref
356+
}
326357
}
327358

328359
/// Lists files with cache support, using prefix-aware lookups.
@@ -345,6 +376,7 @@ impl ListingTableUrl {
345376
async fn list_with_cache<'b>(
346377
ctx: &'b dyn Session,
347378
store: &'b dyn ObjectStore,
379+
table_ref: Option<&TableReference>,
348380
table_base_path: &Path,
349381
prefix: Option<&Path>,
350382
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
@@ -367,9 +399,16 @@ async fn list_with_cache<'b>(
367399
// Convert prefix to Option<Path> for cache lookup
368400
let prefix_filter = prefix.cloned();
369401

402+
println!("list files cache is set");
403+
404+
let table_scoped_base_path =
405+
TableScopedPath(table_ref.cloned(), table_base_path.clone());
406+
407+
println!("table scoped path {table_scoped_base_path:?}");
408+
370409
// Try cache lookup with optional prefix filter
371410
let vec = if let Some(res) =
372-
cache.get_with_extra(table_base_path, &prefix_filter)
411+
cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
373412
{
374413
debug!("Hit list files cache");
375414
res.as_ref().clone()
@@ -380,7 +419,7 @@ async fn list_with_cache<'b>(
380419
.list(Some(table_base_path))
381420
.try_collect::<Vec<ObjectMeta>>()
382421
.await?;
383-
cache.put(table_base_path, Arc::new(vec.clone()));
422+
cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
384423

385424
// If a prefix filter was requested, apply it to the results
386425
if prefix.is_some() {

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::cache::cache_unit::DefaultFilesMetadataCache;
1919
use crate::cache::list_files_cache::ListFilesEntry;
20+
use crate::cache::list_files_cache::TableScopedPath;
2021
use crate::cache::{CacheAccessor, DefaultListFilesCache};
2122
use datafusion_common::stats::Precision;
2223
use datafusion_common::{Result, Statistics};
@@ -81,7 +82,7 @@ pub struct FileStatisticsCacheEntry {
8182
///
8283
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
8384
pub trait ListFilesCache:
84-
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
85+
CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
8586
{
8687
/// Returns the cache's memory limit in bytes.
8788
fn cache_limit(&self) -> usize;
@@ -96,7 +97,7 @@ pub trait ListFilesCache:
9697
fn update_cache_ttl(&self, ttl: Option<Duration>);
9798

9899
/// Retrieves the information about the entries currently cached.
99-
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
100+
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
100101
}
101102

102103
/// Generic file-embedded metadata used with [`FileMetadataCache`].

0 commit comments

Comments
 (0)