1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

53 changes: 32 additions & 21 deletions datafusion/catalog-listing/src/table.rs

@@ -705,31 +705,42 @@ impl ListingTable {
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
     ) -> datafusion_common::Result<Arc<Statistics>> {
-        match self
+        use datafusion_execution::cache::cache_manager::CachedFileMetadata;
+
+        // Check cache first
+        if let Some(cached) = self
             .collected_statistics
-            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
+            .get(&part_file.object_meta.location)
         {
-            Some(statistics) => Ok(statistics),
-            None => {
-                let statistics = self
-                    .options
-                    .format
-                    .infer_stats(
-                        ctx,
-                        store,
-                        Arc::clone(&self.file_schema),
-                        &part_file.object_meta,
-                    )
-                    .await?;
-                let statistics = Arc::new(statistics);
-                self.collected_statistics.put_with_extra(
-                    &part_file.object_meta.location,
-                    Arc::clone(&statistics),
-                    &part_file.object_meta,
-                );
-                Ok(statistics)
+            // Validate that cached entry is still valid
+            if cached.is_valid_for(&part_file.object_meta) {
+                return Ok(cached.statistics);
             }
         }
+
+        // Cache miss or invalid - infer statistics
+        let statistics = self
+            .options
+            .format
+            .infer_stats(
+                ctx,
+                store,
+                Arc::clone(&self.file_schema),
+                &part_file.object_meta,
+            )
+            .await?;
+        let statistics = Arc::new(statistics);
+
+        // Store in cache
+        self.collected_statistics.put(
+            &part_file.object_meta.location,
+            CachedFileMetadata::new(
+                part_file.object_meta.clone(),
+                Arc::clone(&statistics),
+                None, // No ordering information in this PR
+            ),
+        );
+        Ok(statistics)
     }
 }
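The new code path relies on a `CachedFileMetadata` entry and its `is_valid_for` check, neither of which is defined in this section. Below is a minimal sketch of what such an entry could look like, assuming validity means the file's size and modification time still match the `ObjectMeta` the statistics were computed from; the field names, the stand-in types, and the ordering slot are assumptions for illustration, not the PR's actual definitions.

```rust
use std::sync::Arc;
use std::time::SystemTime;

// Stand-ins for datafusion_common::Statistics, a sort-ordering type, and
// object_store::ObjectMeta, so the sketch compiles on its own.
pub struct Statistics;
pub struct LexOrdering;

#[derive(Clone)]
pub struct ObjectMeta {
    pub location: String,
    pub size: u64,
    pub last_modified: SystemTime,
}

/// Hypothetical cache entry: statistics tagged with the ObjectMeta they were derived from.
pub struct CachedFileMetadata {
    pub object_meta: ObjectMeta,
    pub statistics: Arc<Statistics>,
    pub ordering: Option<LexOrdering>,
}

impl CachedFileMetadata {
    pub fn new(
        object_meta: ObjectMeta,
        statistics: Arc<Statistics>,
        ordering: Option<LexOrdering>,
    ) -> Self {
        Self { object_meta, statistics, ordering }
    }

    /// Serve cached statistics only while the underlying file is unchanged;
    /// otherwise the caller falls through to re-inferring them.
    pub fn is_valid_for(&self, current: &ObjectMeta) -> bool {
        self.object_meta.size == current.size
            && self.object_meta.last_modified == current.last_modified
    }
}

fn main() {
    let meta = ObjectMeta {
        location: "table/a.parquet".into(),
        size: 10,
        last_modified: SystemTime::now(),
    };
    let entry = CachedFileMetadata::new(meta.clone(), Arc::new(Statistics), None);
    assert!(entry.is_valid_for(&meta));
}
```

The key design point visible in the hunk above is that the cache is now keyed by location alone, with staleness handled explicitly by the caller via the validity check rather than implicitly by `get_with_extra`/`put_with_extra`.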
31 changes: 16 additions & 15 deletions datafusion/datasource-parquet/src/metadata.rs

@@ -31,7 +31,9 @@ use datafusion_common::stats::Precision;
 use datafusion_common::{
     ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
 };
-use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
+use datafusion_execution::cache::cache_manager::{
+    CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
+};
 use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_plan::Accumulator;
 use log::debug;
@@ -125,19 +127,15 @@ impl<'a> DFParquetMetadata<'a> {
             !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
 
         if cache_metadata
-            && let Some(parquet_metadata) = file_metadata_cache
-                .as_ref()
-                .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
-                .and_then(|file_metadata| {
-                    file_metadata
-                        .as_any()
-                        .downcast_ref::<CachedParquetMetaData>()
-                        .map(|cached_parquet_metadata| {
-                            Arc::clone(cached_parquet_metadata.parquet_metadata())
-                        })
-                })
+            && let Some(file_metadata_cache) = file_metadata_cache.as_ref()
+            && let Some(cached) = file_metadata_cache.get(&object_meta.location)
+            && cached.is_valid_for(object_meta)
+            && let Some(cached_parquet) = cached
+                .file_metadata
+                .as_any()
+                .downcast_ref::<CachedParquetMetaData>()
         {
-            return Ok(parquet_metadata);
+            return Ok(Arc::clone(cached_parquet.parquet_metadata()));
         }
 
         let mut reader =
@@ -163,8 +161,11 @@
 
         if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
             file_metadata_cache.put(
-                object_meta,
-                Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
+                &object_meta.location,
+                CachedFileMetadataEntry::new(
+                    (*object_meta).clone(),
+                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
+                ),
             );
         }
 
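The cache stores file metadata behind a type-erased `dyn FileMetadata`, which is why the Parquet reader downcasts the cached value back to `CachedParquetMetaData` before using it. A compact illustration of that `as_any` + `downcast_ref` pattern follows; the trait and struct here are simplified stand-ins rather than the actual DataFusion definitions.

```rust
use std::any::Any;
use std::sync::Arc;

/// Stand-in for the type-erased metadata trait used by the cache.
trait FileMetadata: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-in for the Parquet-specific cached value.
struct CachedParquetMetaData {
    num_row_groups: usize,
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn row_groups_from_cache(cached: &Arc<dyn FileMetadata>) -> Option<usize> {
    // Only succeeds when the cached entry really is Parquet metadata;
    // any other format sharing the cache simply falls through to a miss.
    cached
        .as_any()
        .downcast_ref::<CachedParquetMetaData>()
        .map(|parquet| parquet.num_row_groups)
}

fn main() {
    let entry: Arc<dyn FileMetadata> = Arc::new(CachedParquetMetaData { num_row_groups: 4 });
    assert_eq!(row_groups_from_cache(&entry), Some(4));
}
```

In the hunk above, the same idea is wrapped one level further: the `CachedFileMetadataEntry` carries the originating `ObjectMeta`, so the downcast only happens after `is_valid_for` confirms the file on object storage has not changed.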
33 changes: 11 additions & 22 deletions datafusion/datasource/src/url.rs

@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
 use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::cache::cache_manager::CachedFileList;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_session::Session;
 
@@ -364,35 +363,24 @@ async fn list_with_cache<'b>(
             .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
             .boxed()),
         Some(cache) => {
-            // Convert prefix to Option<Path> for cache lookup
-            let prefix_filter = prefix.cloned();
+            // Build the filter prefix (only Some if prefix was requested)
+            let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
 
-            // Try cache lookup with optional prefix filter
-            let vec = if let Some(res) =
-                cache.get_with_extra(table_base_path, &prefix_filter)
-            {
+            // Try cache lookup
+            let vec = if let Some(cached) = cache.get(table_base_path) {
                 debug!("Hit list files cache");
-                res.as_ref().clone()
+                cached.filter_by_prefix(&filter_prefix)
             } else {
                 // Cache miss - always list and cache the full table
                 // This ensures we have complete data for future prefix queries
                 let vec = store
                     .list(Some(table_base_path))
                     .try_collect::<Vec<ObjectMeta>>()
                     .await?;
-                cache.put(table_base_path, Arc::new(vec.clone()));
-
-                // If a prefix filter was requested, apply it to the results
-                if prefix.is_some() {
-                    let full_prefix_str = full_prefix.as_ref();
-                    vec.into_iter()
-                        .filter(|meta| {
-                            meta.location.as_ref().starts_with(full_prefix_str)
-                        })
-                        .collect()
-                } else {
-                    vec
-                }
+                let cached = CachedFileList::new(vec);
+                let result = cached.filter_by_prefix(&filter_prefix);
+                cache.put(table_base_path, cached);
+                result
            };
            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
        }
@@ -494,6 +482,7 @@ mod tests {
     use std::any::Any;
     use std::collections::HashMap;
     use std::ops::Range;
+    use std::sync::Arc;
     use tempfile::tempdir;
 
     #[test]
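The listing path now caches the full file list for the table's base path once and filters it per query through `CachedFileList::filter_by_prefix`, instead of caching per-prefix results. The type itself is not shown in this section; the sketch below is consistent with how it is called above, assuming filtering is a plain `starts_with` on the object path, and it reduces `ObjectMeta` to a String path for brevity (both are assumptions, not the PR's actual code).

```rust
/// Stand-in for object_store::ObjectMeta, reduced to the path used for filtering.
#[derive(Clone, Debug, PartialEq)]
pub struct ObjectMeta {
    pub location: String,
}

/// Hypothetical cached listing of every object under a table's base path.
pub struct CachedFileList {
    files: Vec<ObjectMeta>,
}

impl CachedFileList {
    pub fn new(files: Vec<ObjectMeta>) -> Self {
        Self { files }
    }

    /// With no prefix, return the whole listing; with a prefix, keep only the
    /// objects whose path starts with it. The full list stays cached either way,
    /// so later queries with different prefixes are still cache hits.
    pub fn filter_by_prefix(&self, prefix: &Option<String>) -> Vec<ObjectMeta> {
        match prefix {
            None => self.files.clone(),
            Some(p) => self
                .files
                .iter()
                .filter(|meta| meta.location.starts_with(p.as_str()))
                .cloned()
                .collect(),
        }
    }
}

fn main() {
    let cached = CachedFileList::new(vec![
        ObjectMeta { location: "table/part=1/a.parquet".into() },
        ObjectMeta { location: "table/part=2/b.parquet".into() },
    ]);
    assert_eq!(cached.filter_by_prefix(&Some("table/part=1".into())).len(), 1);
    assert_eq!(cached.filter_by_prefix(&None).len(), 2);
}
```

Caching the unfiltered listing and filtering on read is what lets a partition-pruned query reuse the listing produced by an earlier full-table scan, which the previous per-prefix `get_with_extra`/`put_with_extra` scheme could not do.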
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml

@@ -55,6 +55,7 @@ chrono = { workspace = true }
 dashmap = { workspace = true }
 datafusion-common = { workspace = true, default-features = false }
 datafusion-expr = { workspace = true, default-features = false }
+datafusion-physical-expr-common = { workspace = true, default-features = false }
 futures = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true, features = ["fs"] }