19597: Refactor cache APIs to support ordering information #160
martin-augment wants to merge 4 commits into main from
Conversation
Refactor the cache system to support storing both statistics and ordering information together, in preparation for ordering inference from Parquet metadata.

Changes to cache_manager.rs:
- Add `CachedFileMetadata` struct with `meta`, `statistics`, and `ordering` fields
- Refactor `FileStatisticsCache` trait to use `CachedFileMetadata` and Path keys
- Add `has_ordering` field to `FileStatisticsCacheEntry`
- Add `CachedFileList` for list files cache
- Refactor `FileMetadataCache` trait to use `CachedFileMetadataEntry` and Path keys

Changes to cache implementations:
- Update `DefaultFileStatisticsCache` to use new trait methods
- Update `DefaultFilesMetadataCache` to use new trait methods
- Simplify list files cache implementation

Changes to callsites:
- Update `ListingTable::do_collect_statistics` to use new cache API
- Update `DFParquetMetadata::fetch_metadata` to use new cache API
- Update `ListingTableUrl` to use new cache API

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
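As a rough sketch only (not the PR's actual definitions), the pieces listed above could fit together along these lines. The `Sketch` suffix marks types invented here for illustration, and the generic `O` stands in for whatever ordering type the real `CachedFileMetadata` carries; the Path-keyed trait merely shows the shape of the refactored cache API.

```rust
use std::sync::Arc;

use datafusion_common::Statistics;
use object_store::{path::Path, ObjectMeta};

// Sketch: statistics and (optional) ordering cached together for one file,
// mirroring the `meta` / `statistics` / `ordering` fields named above.
pub struct CachedFileMetadataSketch<O> {
    pub meta: ObjectMeta,
    pub statistics: Arc<Statistics>,
    /// Ordering that could later be inferred from Parquet metadata.
    pub ordering: Option<O>,
}

// Sketch: the statistics cache keyed by the object-store `Path`
// rather than by the full `ObjectMeta`.
pub trait FileStatisticsCacheSketch<O> {
    fn get(&self, path: &Path) -> Option<Arc<CachedFileMetadataSketch<O>>>;
    fn put(&self, path: &Path, value: CachedFileMetadataSketch<O>);
}
```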
Replace the unused FileStatisticsCache import with CacheAccessor, which provides the len() method used in tests. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
Address review feedback to remove duplicated prefix filtering logic. Now both cache hit and cache miss paths use the filter_by_prefix helper. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
PR Review: Refactor cache APIs to support ordering information

Summary

This PR refactors the cache API to embed validation metadata (

Architecture & Design

✅ Strengths
🔍 Code Quality Issues

1. Inconsistent Cache Key Usage (datafusion/datasource-parquet/src/metadata.rs:127-172)

Issue: The code switches between using

```rust
// Line 130: Uses object_meta.location as key
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)

// Line 164: Uses object_meta.location as key again
file_metadata_cache.put(
    &object_meta.location,
    CachedFileMetadataEntry::new(
```

Concern: While this appears consistent in the current code, the trait signature change from

Recommendation: Add a comment explaining why
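For context, here is a minimal sketch of the lookup pattern being discussed: the cache is keyed by the object-store path alone, and staleness is handled separately by validating the cached entry against the current `ObjectMeta`. The types and the `is_valid_for`/`lookup` helpers are stand-ins written for this example, not the PR's actual code.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for object_store::path::Path and object_store::ObjectMeta.
#[derive(Clone, PartialEq, Eq, Hash)]
struct Path(String);
#[derive(Clone, PartialEq)]
struct ObjectMeta {
    location: Path,
    size: u64,
    e_tag: Option<String>,
}

struct Entry {
    meta: ObjectMeta,
    payload: Vec<u8>,
}

impl Entry {
    // The cached entry is only reused if the file it was built from is unchanged.
    fn is_valid_for(&self, current: &ObjectMeta) -> bool {
        self.meta.size == current.size && self.meta.e_tag == current.e_tag
    }
}

fn lookup<'a>(cache: &'a HashMap<Path, Entry>, current: &ObjectMeta) -> Option<&'a Entry> {
    // Key by location only; validity against size/e_tag is checked separately,
    // so a stale entry for the same path is treated as a miss.
    cache
        .get(&current.location)
        .filter(|entry| entry.is_valid_for(current))
}
```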
2. Potential Race Condition (datafusion/catalog-listing/src/table.rs:711-743)

Issue: There's a TOCTOU (time-of-check to time-of-use) pattern:

```rust
// Check cache first
if let Some(cached) = self.collected_statistics.get(&part_file.object_meta.location) {
    // Validate that cached entry is still valid
    if cached.is_valid_for(&part_file.object_meta) {
        return Ok(cached.statistics);
    }
}

// Cache miss or invalid - infer statistics
let statistics = self.options.format.infer_stats(...).await?;
// ...
self.collected_statistics.put(...)
```

Risk: Between the

Impact: Low - The

Recommendation: Consider documenting this behavior or using a compute-if-absent pattern if performance becomes an issue.
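As a rough illustration of the compute-if-absent idea mentioned above (not how the PR implements it), the standard library's `Entry` API closes the check/insert gap for a single-threaded map; a concurrent or async cache would need an equivalent atomic primitive such as a concurrent map's entry API.

```rust
use std::collections::HashMap;

// Hypothetical key/value types; the real cache stores per-file statistics.
fn stats_for(path: &str, cache: &mut HashMap<String, u64>) -> u64 {
    // `entry` performs the lookup and the insert in one step, so no other code
    // path can slip a different value in between the check and the insert.
    *cache
        .entry(path.to_string())
        .or_insert_with(|| expensive_infer_stats(path))
}

fn expensive_infer_stats(_path: &str) -> u64 {
    // Placeholder for the real statistics inference (e.g. reading a Parquet footer).
    42
}
```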
3. Silent Failure on Cache Rejection (datafusion/execution/src/cache/file_metadata_cache.rs:71-73)

```rust
// no point in trying to add this value to the cache if it cannot fit entirely
if value_size > self.memory_limit {
    return None; // Silent failure
}
```

Issue: When an entry is too large to cache, it silently returns `None`.

Impact: Medium - This could lead to confusion during debugging when large files are never cached.

Recommendation: Consider logging a debug message when entries are rejected due to size, especially for the file metadata cache where individual Parquet footers could exceed limits.
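A minimal sketch of what that could look like with the `log` crate; the wrapper type and message wording are illustrative, only the `value_size` / `memory_limit` names come from the snippet above.

```rust
use log::debug;

// Sketch only: a stand-in for the cache's put path, showing the suggested debug log.
struct SizeLimitedCache {
    memory_limit: usize,
}

impl SizeLimitedCache {
    fn try_put(&self, value_size: usize) -> Option<()> {
        if value_size > self.memory_limit {
            // Make the rejection observable instead of silently returning None.
            debug!(
                "not caching entry of {value_size} bytes: exceeds memory limit of {} bytes",
                self.memory_limit
            );
            return None;
        }
        // ... the actual insertion would go here ...
        Some(())
    }
}
```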
🐛 Potential Bugs

4. Empty Vector Handling (datafusion/execution/src/cache/list_files_cache.rs:99-110)

```rust
fn try_new(
    cached_file_list: CachedFileList,
    ttl: Option<Duration>,
    now: Instant,
) -> Option<Self> {
    let size_bytes = (cached_file_list.files.capacity() * size_of::<ObjectMeta>())
        + cached_file_list
            .files
            .iter()
            .map(meta_heap_bytes)
            .reduce(|acc, b| acc + b)?; // Returns None if empty!
```

Bug: For an empty file list, `reduce` returns `None`, so `try_new` bails out and the entry is never created.

Test Gap: The test at line 740 validates this behavior but doesn't document whether it's intentional.

Impact: Medium - Empty partitions won't be cached, leading to repeated object store calls.

Recommendation:

```rust
            .reduce(|acc, b| acc + b)
            .unwrap_or(0); // Empty list has 0 heap bytes
```
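Equivalently, since the accumulation is a plain sum, `Iterator::sum` already yields 0 for an empty iterator and avoids the `Option` entirely. This is an alternative phrasing of the same fix, not what the PR does; the snippet below just demonstrates the difference in behavior.

```rust
fn main() {
    let sizes: Vec<usize> = Vec::new();
    // `sum` folds from 0, so an empty list yields 0 rather than None.
    let total: usize = sizes.iter().copied().sum();
    assert_eq!(total, 0);
    // By contrast, `reduce` has no identity element and returns None when empty.
    assert_eq!(sizes.iter().copied().reduce(|a, b| a + b), None);
}
```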
debug!("Hit list files cache");
cached.filter_by_prefix(&filter_prefix)
} else {
// Cache miss - always list and cache the full tableIssue: The code doesn't validate cached file lists against the current table state. If files are added/removed from the table, the cache won't know until TTL expires. Impact: Low to Medium - This is by design (noted in documentation), but worth calling out that stale data is possible. Note: The PR description mentions "19597: To review by AI" but doesn't link to an issue explaining the motivation for these changes. ⚡ Performance Considerations6. Clone Overhead in filter_by_prefix (datafusion/execution/src/cache/cache_manager.rs:130-140)pub fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
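To make the staleness bound concrete: with a TTL-based entry (as suggested by the `ttl: Option<Duration>` and `now: Instant` parameters shown in issue 4), the expiry check looks roughly like the sketch below. After the TTL elapses the entry is treated as a miss and the listing is refreshed; the field names here are illustrative.

```rust
use std::time::{Duration, Instant};

// Sketch of a TTL check for a cached file listing.
struct CachedListing {
    created_at: Instant,
    ttl: Option<Duration>,
}

impl CachedListing {
    fn is_expired(&self, now: Instant) -> bool {
        // With no TTL configured the entry never expires on its own, which is
        // why files added or removed after caching stay invisible until eviction.
        match self.ttl {
            Some(ttl) => now.duration_since(self.created_at) >= ttl,
            None => false,
        }
    }
}
```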
⚡ Performance Considerations

6. Clone Overhead in filter_by_prefix (datafusion/execution/src/cache/cache_manager.rs:130-140)

```rust
pub fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
    match prefix {
        Some(prefix) => self
            .files
            .iter()
            .filter(|meta| meta.location.as_ref().starts_with(prefix.as_ref()))
            .cloned() // Clones ObjectMeta for each matching file
            .collect(),
        None => self.files.as_ref().clone(), // Clones entire Vec
    }
}
```

Impact: When no prefix is provided, this clones the entire Vec.

Recommendation: Consider returning an iterator instead of an owned Vec, for example:

```rust
pub fn iter_with_prefix<'a>(
    &'a self,
    prefix: &'a Option<Path>,
) -> impl Iterator<Item = &'a ObjectMeta> + 'a {
    // Borrow through self (no Vec clone, no per-item ObjectMeta clone);
    // the caller collects only if it actually needs an owned Vec.
    self.files.iter().filter(move |meta| {
        prefix
            .as_ref()
            .map_or(true, |p| meta.location.as_ref().starts_with(p.as_ref()))
    })
}
```
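A short usage sketch of that suggestion (hypothetical, since `iter_with_prefix` is only a review recommendation): callers that merely count or inspect entries never pay for a clone, and `collect()` remains available when an owned `Vec` is genuinely required. The stand-in types below exist only to keep the example self-contained.

```rust
use std::sync::Arc;

// Hypothetical stand-ins so the example compiles on its own.
#[derive(Clone)]
struct ObjectMeta {
    location: String,
}
struct CachedFileList {
    files: Arc<Vec<ObjectMeta>>,
}

impl CachedFileList {
    fn iter_with_prefix<'a>(
        &'a self,
        prefix: &'a Option<String>,
    ) -> impl Iterator<Item = &'a ObjectMeta> + 'a {
        self.files.iter().filter(move |meta| {
            prefix.as_ref().map_or(true, |p| meta.location.starts_with(p.as_str()))
        })
    }
}

fn main() {
    let cached = CachedFileList {
        files: Arc::new(vec![
            ObjectMeta { location: "table/part=1/a.parquet".into() },
            ObjectMeta { location: "table/part=2/b.parquet".into() },
        ]),
    };
    let prefix = Some("table/part=1".to_string());
    // Counting matches touches no clones; collect() is opt-in for owned results.
    assert_eq!(cached.iter_with_prefix(&prefix).count(), 1);
    let owned: Vec<ObjectMeta> = cached.iter_with_prefix(&None).cloned().collect();
    assert_eq!(owned.len(), 2);
}
```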
7. Memory Accounting for Ordering (datafusion/execution/src/cache/cache_unit.rs:91)

```rust
statistics_size_bytes: 0, // TODO: set to the real size in the future
```

Issue: The

Impact: Low - Ordering info is small relative to statistics, but memory limits could be exceeded.

Recommendation: Implement proper size calculation for the cached ordering information.
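One possible shape for that size calculation, as a sketch only: `SortSpec` is a hypothetical stand-in for however the ordering is actually represented in the cache entry, and the accounting is a rough per-element estimate rather than exact heap usage.

```rust
use std::mem::size_of;

// Hypothetical representation of one sort key in the cached ordering.
struct SortSpec {
    column: String,
    descending: bool,
    nulls_first: bool,
}

// Rough heap accounting: per-element struct size plus owned string bytes.
fn ordering_size_bytes(ordering: &Option<Vec<SortSpec>>) -> usize {
    ordering.as_ref().map_or(0, |keys| {
        keys.capacity() * size_of::<SortSpec>()
            + keys.iter().map(|k| k.column.capacity()).sum::<usize>()
    })
}
```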
🔒 Security Considerations

No security issues identified. The changes don't introduce new attack vectors:

📝 Documentation

Good:
Needs Improvement:
🎯 Overall Assessment

Quality Score: 8/10

Strengths:
Weaknesses:
Recommendations

Must Fix:
Should Fix:

Nice to Have:

Verdict: Approve with minor fixes. The API improvements are solid and the test coverage is excellent. The empty vector bug should be fixed before merging, and the performance optimization for prefix filtering would be valuable for production use.
🤖 Augment PR Summary

Summary: Refactors DataFusion’s runtime cache APIs to carry file-ordering information alongside existing cached data, enabling future planning optimizations.

Changes:
Technical Notes: Ordering is plumbed through the cache layer via
19597: To review by AI