Skip to content

Commit bcc1db0

Browse files
committed
address feedback, refactor cache
1 parent 601811b commit bcc1db0

File tree

13 files changed

+1102
-830
lines changed

13 files changed

+1102
-830
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/src/table.rs

Lines changed: 92 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
3535
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
3636
};
37-
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
37+
use datafusion_execution::cache::cache_manager::{
38+
CachedFileMetadata, FileStatisticsCache,
39+
};
3840
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
3941
use datafusion_expr::dml::InsertOp;
4042
use datafusion_expr::execution_props::ExecutionProps;
@@ -355,45 +357,85 @@ impl ListingTable {
355357
execution_props,
356358
);
357359
}
358-
359-
// Otherwise, try to derive from file orderings
360-
Ok(derive_common_ordering_from_files(file_groups))
360+
if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
361+
return Ok(vec![ordering]);
362+
}
363+
Ok(vec![])
361364
}
362365
}
363366

364367
/// Derives a common ordering from file orderings across all file groups.
365368
///
366369
/// Returns the common ordering if all files have compatible orderings,
367-
/// otherwise returns an empty Vec (no ordering).
368-
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Vec<LexOrdering> {
369-
// Collect all file orderings
370-
let mut all_orderings: Vec<&LexOrdering> = Vec::new();
370+
/// otherwise returns None.
371+
///
372+
/// The function finds the longest common prefix among all file orderings.
373+
/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
374+
/// ordering is `[a, b]`.
375+
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
376+
enum CurrentOrderingState {
377+
/// Initial state before processing any files
378+
FirstFile,
379+
/// Some common ordering found so far
380+
SomeOrdering(LexOrdering),
381+
/// No files have ordering
382+
NoOrdering,
383+
}
384+
let mut state = CurrentOrderingState::FirstFile;
385+
386+
// Collect file orderings and track counts
371387
for group in file_groups {
372388
for file in group.iter() {
373-
if let Some(ordering) = &file.ordering {
374-
all_orderings.push(ordering);
375-
} else {
376-
// If any file has no ordering, we can't derive a common ordering
377-
return vec![];
378-
}
389+
state = match (&state, &file.ordering) {
390+
// If this is the first file with ordering, set it as current
391+
(CurrentOrderingState::FirstFile, Some(ordering)) => {
392+
CurrentOrderingState::SomeOrdering(ordering.clone())
393+
}
394+
(CurrentOrderingState::FirstFile, None) => {
395+
CurrentOrderingState::NoOrdering
396+
}
397+
// If we have an existing ordering, find common prefix with new ordering
398+
(CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
399+
// Find common prefix between current and new ordering
400+
let prefix_len = current
401+
.as_ref()
402+
.iter()
403+
.zip(ordering.as_ref().iter())
404+
.take_while(|(a, b)| a == b)
405+
.count();
406+
if prefix_len == 0 {
407+
log::trace!(
408+
"Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
409+
);
410+
return None;
411+
} else {
412+
let ordering =
413+
LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
414+
.expect("prefix_len > 0, so ordering must be valid");
415+
CurrentOrderingState::SomeOrdering(ordering)
416+
}
417+
}
418+
// If one file has ordering and another doesn't, no common ordering
419+
// Return None and log a trace message explaining why
420+
(CurrentOrderingState::SomeOrdering(ordering), None)
421+
| (CurrentOrderingState::NoOrdering, Some(ordering)) => {
422+
log::trace!(
423+
"Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
424+
);
425+
return None;
426+
}
427+
// Both have no ordering, remain in NoOrdering state
428+
(CurrentOrderingState::NoOrdering, None) => {
429+
CurrentOrderingState::NoOrdering
430+
}
431+
};
379432
}
380433
}
381434

382-
if all_orderings.is_empty() {
383-
return vec![];
384-
}
385-
386-
// Check that all orderings are identical
387-
let first = all_orderings[0];
388-
for ordering in &all_orderings[1..] {
389-
if *ordering != first {
390-
// Orderings don't match, can't derive a common ordering
391-
return vec![];
392-
}
435+
match state {
436+
CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
437+
_ => None,
393438
}
394-
395-
// All orderings match, return the common ordering
396-
vec![first.clone()]
397439
}
398440

399441
// Expressions can be used for partition pruning if they can be evaluated using
@@ -763,37 +805,29 @@ impl ListingTable {
763805
let path = &part_file.object_meta.location;
764806
let meta = &part_file.object_meta;
765807

766-
// Check if statistics are cached
767-
if let Some(statistics) = self.collected_statistics.get_with_extra(path, meta) {
768-
// Statistics cache hit - check if ordering is also cached
769-
if let Some(ordering) = self.collected_statistics.get_ordering(path, meta) {
770-
// Both cached - return without any file access
771-
return Ok((statistics, ordering));
772-
}
773-
774-
// Statistics cached but ordering not - infer ordering and cache it
775-
let ordering = self
776-
.options
777-
.format
778-
.infer_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
779-
.await?;
780-
self.collected_statistics
781-
.put_ordering(path, ordering.clone(), meta);
782-
return Ok((statistics, ordering));
808+
// Check if statistics and ordering are cached and valid
809+
if let Some(cached) = self.collected_statistics.get(path)
810+
&& cached.is_valid_for(meta)
811+
{
812+
return Ok((cached.statistics.clone(), cached.ordering.clone()));
783813
}
784814

785-
// Cache miss: fetch both statistics and ordering in a single metadata read
815+
// Cache miss or invalid: fetch both statistics and ordering in a single metadata read
786816
let file_meta = self
787817
.options
788818
.format
789819
.infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
790820
.await?;
791821

792822
let statistics = Arc::new(file_meta.statistics);
793-
self.collected_statistics
794-
.put_with_extra(path, Arc::clone(&statistics), meta);
795-
self.collected_statistics
796-
.put_ordering(path, file_meta.ordering.clone(), meta);
823+
self.collected_statistics.put(
824+
path,
825+
CachedFileMetadata::new(
826+
meta.clone(),
827+
Arc::clone(&statistics),
828+
file_meta.ordering.clone(),
829+
),
830+
);
797831

798832
Ok((statistics, file_meta.ordering))
799833
}
@@ -924,8 +958,7 @@ mod tests {
924958

925959
let result = derive_common_ordering_from_files(&file_groups);
926960

927-
assert_eq!(result.len(), 1);
928-
assert_eq!(result[0], ordering);
961+
assert_eq!(result.unwrap().as_ref(), ordering.as_ref());
929962
}
930963

931964
#[test]
@@ -941,7 +974,7 @@ mod tests {
941974

942975
let result = derive_common_ordering_from_files(&file_groups);
943976

944-
assert!(result.is_empty());
977+
assert!(result.is_none());
945978
}
946979

947980
#[test]
@@ -956,7 +989,7 @@ mod tests {
956989

957990
let result = derive_common_ordering_from_files(&file_groups);
958991

959-
assert!(result.is_empty());
992+
assert!(result.is_none());
960993
}
961994

962995
#[test]
@@ -966,7 +999,7 @@ mod tests {
966999

9671000
let result = derive_common_ordering_from_files(&file_groups);
9681001

969-
assert!(result.is_empty());
1002+
assert!(result.is_none());
9701003
}
9711004

9721005
#[test]
@@ -976,7 +1009,7 @@ mod tests {
9761009

9771010
let result = derive_common_ordering_from_files(&file_groups);
9781011

979-
assert!(result.is_empty());
1012+
assert!(result.is_none());
9801013
}
9811014

9821015
#[test]
@@ -991,8 +1024,7 @@ mod tests {
9911024

9921025
let result = derive_common_ordering_from_files(&file_groups);
9931026

994-
assert_eq!(result.len(), 1);
995-
assert_eq!(result[0], ordering);
1027+
assert_eq!(result.unwrap().as_ref(), ordering.as_ref());
9961028
}
9971029

9981030
#[test]
@@ -1008,8 +1040,7 @@ mod tests {
10081040

10091041
let result = derive_common_ordering_from_files(&file_groups);
10101042

1011-
assert_eq!(result.len(), 1);
1012-
assert_eq!(result[0], ordering);
1043+
assert_eq!(result.unwrap().as_ref(), ordering.as_ref());
10131044
}
10141045

10151046
#[test]
@@ -1025,6 +1056,6 @@ mod tests {
10251056

10261057
let result = derive_common_ordering_from_files(&file_groups);
10271058

1028-
assert!(result.is_empty());
1059+
assert!(result.is_none());
10291060
}
10301061
}

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,9 @@ mod tests {
220220
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
221221
test_util::parquet_test_data,
222222
};
223-
use datafusion_execution::cache::CacheAccessor;
224-
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
223+
use datafusion_execution::cache::cache_manager::{
224+
CacheManagerConfig, FileStatisticsCache,
225+
};
225226
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
226227
use datafusion_execution::config::SessionConfig;
227228
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ use datafusion_common::stats::Precision;
3232
use datafusion_common::{
3333
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
3434
};
35-
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
35+
use datafusion_execution::cache::cache_manager::{
36+
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
37+
};
3638
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
3739
use datafusion_physical_expr::expressions::Column;
3840
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
@@ -131,9 +133,13 @@ impl<'a> DFParquetMetadata<'a> {
131133
if cache_metadata
132134
&& let Some(parquet_metadata) = file_metadata_cache
133135
.as_ref()
134-
.and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
135-
.and_then(|file_metadata| {
136-
file_metadata
136+
.and_then(|file_metadata_cache| {
137+
file_metadata_cache.get(&object_meta.location)
138+
})
139+
.filter(|cached| cached.is_valid_for(object_meta))
140+
.and_then(|cached| {
141+
cached
142+
.file_metadata
137143
.as_any()
138144
.downcast_ref::<CachedParquetMetaData>()
139145
.map(|cached_parquet_metadata| {
@@ -167,8 +173,11 @@ impl<'a> DFParquetMetadata<'a> {
167173

168174
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
169175
file_metadata_cache.put(
170-
object_meta,
171-
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
176+
&object_meta.location,
177+
CachedFileMetadataEntry::new(
178+
(*object_meta).clone(),
179+
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
180+
),
172181
);
173182
}
174183

datafusion/datasource/src/file_format.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
4646
///
4747
/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
4848
/// provide all metadata in a single read, avoiding duplicate I/O operations.
49+
///
50+
/// Note: Individual components (statistics and ordering) are typically cached
51+
/// separately by `FileStatisticsCache` implementations to enable partial cache
52+
/// hits. For example, statistics may be cached from a previous query while
53+
/// ordering is fetched fresh.
4954
#[derive(Debug, Clone)]
5055
#[non_exhaustive]
5156
pub struct FileMeta {

datafusion/datasource/src/url.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::Arc;
19-
2018
use datafusion_common::{DataFusionError, Result};
19+
use datafusion_execution::cache::cache_manager::CachedFileList;
2120
use datafusion_execution::object_store::ObjectStoreUrl;
2221
use datafusion_session::Session;
2322

@@ -364,23 +363,31 @@ async fn list_with_cache<'b>(
364363
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
365364
.boxed()),
366365
Some(cache) => {
367-
// Convert prefix to Option<Path> for cache lookup
368-
let prefix_filter = prefix.cloned();
369-
370-
// Try cache lookup with optional prefix filter
371-
let vec = if let Some(res) =
372-
cache.get_with_extra(table_base_path, &prefix_filter)
373-
{
366+
// Try cache lookup
367+
let vec = if let Some(cached) = cache.get(table_base_path) {
374368
debug!("Hit list files cache");
375-
res.as_ref().clone()
369+
// Cache hit - apply prefix filter if needed
370+
if prefix.is_some() {
371+
let full_prefix_str = full_prefix.as_ref();
372+
cached
373+
.files
374+
.iter()
375+
.filter(|meta| {
376+
meta.location.as_ref().starts_with(full_prefix_str)
377+
})
378+
.cloned()
379+
.collect()
380+
} else {
381+
cached.files.as_ref().clone()
382+
}
376383
} else {
377384
// Cache miss - always list and cache the full table
378385
// This ensures we have complete data for future prefix queries
379386
let vec = store
380387
.list(Some(table_base_path))
381388
.try_collect::<Vec<ObjectMeta>>()
382389
.await?;
383-
cache.put(table_base_path, Arc::new(vec.clone()));
390+
cache.put(table_base_path, CachedFileList::new(vec.clone()));
384391

385392
// If a prefix filter was requested, apply it to the results
386393
if prefix.is_some() {
@@ -494,6 +501,7 @@ mod tests {
494501
use std::any::Any;
495502
use std::collections::HashMap;
496503
use std::ops::Range;
504+
use std::sync::Arc;
497505
use tempfile::tempdir;
498506

499507
#[test]

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,4 @@ url = { workspace = true }
6767

6868
[dev-dependencies]
6969
insta = { workspace = true }
70+
datafusion-physical-expr = { workspace = true, default-features = false }

0 commit comments

Comments
 (0)