infer parquet file order from metadata, write order in ParquetSink #19433
Conversation
@zhuqi-lucas you may be interested in this

Great work @adriangb, I will review it next week!
```rust
// Check that all orderings are identical
let first = all_orderings[0];
for ordering in &all_orderings[1..] {
    if *ordering != first {
```
Do we want to support more relaxed cases, for example using something similar to:

```rust
pub fn ordering_satisfy(
```
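For illustration, here is a hedged sketch of one possible relaxation: instead of requiring the per-file orderings to be byte-for-byte identical, the table could advertise the longest common prefix of the per-file orderings, which every file still satisfies. The types and function below are hypothetical simplifications, not DataFusion's `LexOrdering`/`ordering_satisfy` API:

```rust
/// Simplified stand-ins for illustration only; the real types would be
/// DataFusion's `PhysicalSortExpr` / `LexOrdering`.
#[derive(Clone, PartialEq, Debug)]
struct SortKey {
    column: String,
    descending: bool,
    nulls_first: bool,
}

/// Longest common prefix of all per-file orderings. Returns an empty vector
/// if the files share no leading sort keys.
fn common_ordering_prefix(orderings: &[Vec<SortKey>]) -> Vec<SortKey> {
    let Some(first) = orderings.first() else {
        return Vec::new();
    };
    let mut len = first.len();
    for ordering in &orderings[1..] {
        // Count how many leading keys this file shares with the first file.
        len = len.min(
            first
                .iter()
                .zip(ordering.iter())
                .take_while(|(a, b)| a == b)
                .count(),
        );
    }
    first[..len].to_vec()
}
```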
```diff
 SELECT 1372708800 + value AS t
 FROM generate_series(0, 99999)
-ORDER BY t
+ORDER BY t + 1
```
Why did this change to `t + 1`?
Because it was picking up the ordering and causing even less data to be read 😆
```
statement ok
DROP TABLE t;
```
It seems only ASC NULLS FIRST is tested. Better to add test cases for:
- DESC ordering
- Multi-column ordering
- Files with no ordering metadata
```rust
}

/// Checks if `other` is a prefix of this `LexOrdering`.
pub fn is_prefix(&self, other: &LexOrdering) -> bool {
```
I ran into a similar issue with sort pushdown and submitted a PR. I prefer to use eq_properties for this kind of check; it handled more cases when I tried it in my internal project.
`ordering_satisfy` handles more cases than a prefix check.
I gave it a review - looks nice! I can sequence this behind your PR if needed.
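For context, a hedged sketch of the kind of case an equivalence-aware check (like `ordering_satisfy` on `EquivalenceProperties`) accepts but a plain prefix/equality comparison rejects: when a leading sort column is known to be constant. The types and function below are simplified placeholders, not DataFusion's API:

```rust
/// Hypothetical simplified types, not DataFusion's API.
#[derive(Clone, PartialEq, Debug)]
struct SortKey {
    column: String,
    descending: bool,
}

/// Does `actual` satisfy `required`, given columns known to be constant?
/// Constant columns in the requirement can be skipped because all their
/// values compare equal; a strict prefix/equality check would reject this.
fn satisfies_with_constants(actual: &[SortKey], required: &[SortKey], constants: &[&str]) -> bool {
    let mut remaining = actual.iter();
    for req in required {
        if constants.contains(&req.column.as_str()) {
            continue; // e.g. `WHERE a = 5` makes ordering on `a` trivial
        }
        match remaining.next() {
            Some(key) if key == req => {}
            _ => return false,
        }
    }
    true
}
```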
Great work @adriangb, I left some initial review comments; I will review again in more detail.
@zhuqi-lucas ping to take a look at this when you get a chance. Thanks!
zhuqi-lucas left a comment
LGTM, thank you @adriangb for the great work. I left some comments and questions.
```rust
if let Some(ordering) = &file.ordering {
    all_orderings.push(ordering);
} else {
    // If any file has no ordering, we can't derive a common ordering
```
We could add a debug log here, e.g.:

```rust
// If any file has no ordering, we can't derive a common ordering
log::debug!(
    "Cannot derive common ordering: file {} has no ordering metadata",
    file.object_meta.location
);
```
```rust
CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
{
    /// Retrieves the information about the entries currently cached.
    fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
```
Would it be useful to add the has_ordering info to the FileStatisticsCacheEntry?

```rust
FileStatisticsCacheEntry {
    object_meta: object_meta.clone(),
    num_rows: stats.num_rows,
    num_columns: stats.column_statistics.len(),
    table_size_bytes: stats.total_byte_size,
    statistics_size_bytes: 0,
    has_ordering, // NEW: indicates if ordering is cached
},
```
```rust
// Check that all orderings are identical
let first = all_orderings[0];
for ordering in &all_orderings[1..] {
    if *ordering != first {
```
Is it possible to use ordering_satisfy?
```rust
/// Metadata fetched from a file, including statistics and ordering.
///
/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
/// provide all metadata in a single read, avoiding duplicate I/O operations.
```
Suggested addition:

```rust
/// Metadata fetched from a file, including statistics and ordering.
///
/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
/// provide all metadata in a single read, avoiding duplicate I/O operations.
///
/// Note: Individual components (statistics and ordering) are typically cached
/// separately by [`FileStatisticsCache`] implementations to enable partial cache hits.
```
```rust
// may not be compatible with Parquet sorting columns (e.g. ordering on `random()`).
// So if we cannot create a Parquet sorting column from the ordering requirement,
// we skip setting sorting columns on the Parquet sink.
lex_ordering_to_sorting_columns(&ordering).ok()
```
Great point!
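To make this concrete, a hedged sketch (hypothetical names, not the PR's actual `lex_ordering_to_sorting_columns`) of why the conversion is fallible and why `.ok()` is enough to skip it when the ordering contains a non-column expression:

```rust
/// Hypothetical sketch mirroring the shape of Parquet's `SortingColumn`.
#[derive(Debug)]
struct SortingColumnSketch {
    column_idx: i32, // leaf column index in the Parquet schema
    descending: bool,
    nulls_first: bool,
}

enum SortExprSketch {
    Column {
        name: String,
        descending: bool,
        nulls_first: bool,
    },
    Other, // any non-column expression, e.g. `random()` or `a + b`
}

/// Fails as soon as any sort key is not a plain column reference; the caller
/// then skips setting sorting columns via `.ok()`.
fn to_sorting_columns(
    ordering: &[SortExprSketch],
    leaf_index_of: impl Fn(&str) -> Option<i32>,
) -> Result<Vec<SortingColumnSketch>, String> {
    ordering
        .iter()
        .map(|expr| match expr {
            SortExprSketch::Column {
                name,
                descending,
                nulls_first,
            } => leaf_index_of(name.as_str())
                .map(|column_idx| SortingColumnSketch {
                    column_idx,
                    descending: *descending,
                    nulls_first: *nulls_first,
                })
                .ok_or_else(|| format!("column {name} not found in Parquet schema")),
            SortExprSketch::Other => Err("ordering expression is not a plain column".to_string()),
        })
        .collect()
}
```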
```rust
/// Cached file orderings, keyed by file path.
/// Stored separately from statistics to maintain backwards compatibility
/// with the FileStatisticsCache trait interface.
orderings: DashMap<Path, (ObjectMeta, Option<LexOrdering>)>,
```
Is it possible to just use one map?

```rust
pub struct DefaultFileStatisticsCache {
    // Store both stats and ordering together
    cache: DashMap<Path, CacheEntry>,
}

struct CacheEntry {
    meta: ObjectMeta,
    statistics: Arc<Statistics>,
    ordering: Option<LexOrdering>, // Initially None until fetched
}
```

```rust
    // [a ASC, b DESC] is NOT a prefix of [a ASC, b ASC, c ASC] (mismatch in middle)
    assert!(!ordering_abc.is_prefix(&ordering_ab));
}
```
Maybe add some tests for file modification:

```rust
#[tokio::test]
async fn test_ordering_cache_invalidation_on_file_modification() {
    let cache = DefaultFileStatisticsCache::default();
    let path = Path::from("test.parquet");

    // Cache with original metadata
    let meta_v1 = ObjectMeta {
        location: path.clone(),
        last_modified: Utc.timestamp_nanos(1000),
        size: 100,
        e_tag: None,
        version: None,
    };
    let ordering_v1 = Some(LexOrdering::new(vec![...]).unwrap());
    cache.put_ordering(&path, ordering_v1.clone(), &meta_v1);

    // Verify cached
    assert_eq!(cache.get_ordering(&path, &meta_v1), Some(ordering_v1.clone()));

    // File modified (size changed)
    let meta_v2 = ObjectMeta {
        last_modified: Utc.timestamp_nanos(2000),
        size: 200, // ← changed
        ..meta_v1.clone()
    };

    // Should return None (cache miss due to size mismatch)
    assert_eq!(cache.get_ordering(&path, &meta_v2), None);

    // Cache new version
    let ordering_v2 = Some(LexOrdering::new(vec![...]).unwrap());
    cache.put_ordering(&path, ordering_v2.clone(), &meta_v2);

    // Old metadata should still be invalid
    assert_eq!(cache.get_ordering(&path, &meta_v1), None);

    // New metadata should work
    assert_eq!(cache.get_ordering(&path, &meta_v2), Some(ordering_v2));
}
```

```rust
let column_idx = sorting_col.column_idx as usize;

// Get the column path from the Parquet schema
// The column_idx in SortingColumn refers to leaf columns
```
Should we add a test for nested types? They can also be sorted, but I am not sure if that is possible here.
```rust
let ordering = self
    .options
    .format
    .infer_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
```
Here we get the cached statistics, but we still need to compute the ordering info. Could we have a single cache entry for both, so that the cached entry also includes the computed ordering info?
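A hedged sketch of the single-entry idea (placeholder types, not the actual `DefaultFileStatisticsCache`): statistics and ordering live in one entry keyed by path, and a change in the file's metadata invalidates both at once.

```rust
use std::collections::HashMap;

/// Simplified placeholders; the real types would be `object_store::ObjectMeta`,
/// `Arc<Statistics>` and `Option<LexOrdering>`.
#[derive(Clone, PartialEq, Debug)]
struct FileMeta {
    path: String,
    size: u64,
    last_modified_nanos: i64,
}

#[derive(Clone, Debug)]
struct CacheEntry {
    meta: FileMeta,
    statistics: String,       // placeholder for Arc<Statistics>
    ordering: Option<String>, // placeholder for Option<LexOrdering>
}

/// One map holding both statistics and ordering, so a single lookup answers
/// both questions and both values are invalidated together when the file changes.
#[derive(Default)]
struct CombinedCache {
    entries: HashMap<String, CacheEntry>,
}

impl CombinedCache {
    fn get(&self, meta: &FileMeta) -> Option<&CacheEntry> {
        self.entries
            .get(&meta.path)
            // a size/mtime mismatch means the file was rewritten: treat as a miss
            .filter(|entry| entry.meta == *meta)
    }

    fn put(&mut self, meta: FileMeta, statistics: String, ordering: Option<String>) {
        self.entries.insert(
            meta.path.clone(),
            CacheEntry {
                meta,
                statistics,
                ordering,
            },
        );
    }
}
```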
FYI @adriangb, I will be out for the next 3 days, feel free to merge.
The idea here is to use the metadata in Parquet files to infer sort orders, so users are not required to specify them manually.
This should probably be split into multiple PRs:
- `WITH ORDER`
- `PartitionedFile` construction
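To ground the description above, here is a minimal sketch (not the PR's implementation) of where the ordering information comes from: each Parquet row group can carry `sorting_columns` in its footer metadata, which is enough to reconstruct a sort order without the user declaring `WITH ORDER`. It assumes the `parquet` crate's `SerializedFileReader` API and a hypothetical local file `data.parquet`:

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file path for illustration.
    let reader = SerializedFileReader::new(File::open("data.parquet")?)?;
    let metadata = reader.metadata();
    let schema = metadata.file_metadata().schema_descr();

    for i in 0..metadata.num_row_groups() {
        match metadata.row_group(i).sorting_columns() {
            Some(cols) => {
                for c in cols {
                    println!(
                        "row group {i}: {} {} {}",
                        schema.column(c.column_idx as usize).path().string(),
                        if c.descending { "DESC" } else { "ASC" },
                        if c.nulls_first { "NULLS FIRST" } else { "NULLS LAST" },
                    );
                }
            }
            None => println!("row group {i}: no sorting columns recorded"),
        }
    }
    Ok(())
}
```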