Skip to content

Commit aeda804

Browse files
adriangb and claude committed
Refactor PartitionedFile: add ordering field and new_from_meta constructor
Add infrastructure to track ordering information per file in preparation for ordering inference from Parquet metadata. Changes: - Add `ordering: Option<LexOrdering>` field to `PartitionedFile` struct - Add `new_from_meta(ObjectMeta)` constructor for creating files from metadata - Add `with_ordering()` and `with_partition_values()` builder methods - Update all PartitionedFile constructors to initialize `ordering: None` - Update callsites in test_util, proto, and substrait to use new_from_meta 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent cd12d51 commit aeda804

File tree

7 files changed

+79
-72
lines changed

7 files changed

+79
-72
lines changed

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,8 @@ impl TestParquetFile {
160160
.with_table_parquet_options(parquet_options.clone()),
161161
);
162162
let scan_config_builder =
163-
FileScanConfigBuilder::new(self.object_store_url.clone(), source).with_file(
164-
PartitionedFile {
165-
object_meta: self.object_meta.clone(),
166-
partition_values: vec![],
167-
range: None,
168-
statistics: None,
169-
extensions: None,
170-
metadata_size_hint: None,
171-
},
172-
);
163+
FileScanConfigBuilder::new(self.object_store_url.clone(), source)
164+
.with_file(PartitionedFile::new_from_meta(self.object_meta.clone()));
173165

174166
let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;
175167

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,14 +1559,7 @@ mod tests {
15591559
ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
15601560
.with_file_size(object_meta.size);
15611561

1562-
let partitioned_file = PartitionedFile {
1563-
object_meta,
1564-
partition_values: vec![],
1565-
range: None,
1566-
statistics: None,
1567-
extensions: None,
1568-
metadata_size_hint: None,
1569-
};
1562+
let partitioned_file = PartitionedFile::new_from_meta(object_meta);
15701563

15711564
let reader = ParquetFileReader {
15721565
inner,

datafusion/datasource/src/display.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -287,13 +287,6 @@ mod tests {
287287
version: None,
288288
};
289289

290-
PartitionedFile {
291-
object_meta,
292-
partition_values: vec![],
293-
range: None,
294-
statistics: None,
295-
extensions: None,
296-
metadata_size_hint: None,
297-
}
290+
PartitionedFile::new_from_meta(object_meta)
298291
}
299292
}

datafusion/datasource/src/file_scan_config.rs

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,43 +1661,41 @@ mod tests {
16611661

16621662
impl From<File> for PartitionedFile {
16631663
fn from(file: File) -> Self {
1664-
PartitionedFile {
1665-
object_meta: ObjectMeta {
1666-
location: Path::from(format!(
1667-
"data/date={}/{}.parquet",
1668-
file.date, file.name
1669-
)),
1670-
last_modified: chrono::Utc.timestamp_nanos(0),
1671-
size: 0,
1672-
e_tag: None,
1673-
version: None,
1674-
},
1675-
partition_values: vec![ScalarValue::from(file.date)],
1676-
range: None,
1677-
statistics: Some(Arc::new(Statistics {
1678-
num_rows: Precision::Absent,
1679-
total_byte_size: Precision::Absent,
1680-
column_statistics: file
1681-
.statistics
1682-
.into_iter()
1683-
.map(|stats| {
1684-
stats
1685-
.map(|(min, max)| ColumnStatistics {
1686-
min_value: Precision::Exact(
1687-
ScalarValue::Float64(min),
1688-
),
1689-
max_value: Precision::Exact(
1690-
ScalarValue::Float64(max),
1691-
),
1692-
..Default::default()
1693-
})
1694-
.unwrap_or_default()
1695-
})
1696-
.collect::<Vec<_>>(),
1697-
})),
1698-
extensions: None,
1699-
metadata_size_hint: None,
1700-
}
1664+
let object_meta = ObjectMeta {
1665+
location: Path::from(format!(
1666+
"data/date={}/{}.parquet",
1667+
file.date, file.name
1668+
)),
1669+
last_modified: chrono::Utc.timestamp_nanos(0),
1670+
size: 0,
1671+
e_tag: None,
1672+
version: None,
1673+
};
1674+
let statistics = Arc::new(Statistics {
1675+
num_rows: Precision::Absent,
1676+
total_byte_size: Precision::Absent,
1677+
column_statistics: file
1678+
.statistics
1679+
.into_iter()
1680+
.map(|stats| {
1681+
stats
1682+
.map(|(min, max)| ColumnStatistics {
1683+
min_value: Precision::Exact(ScalarValue::Float64(
1684+
min,
1685+
)),
1686+
max_value: Precision::Exact(ScalarValue::Float64(
1687+
max,
1688+
)),
1689+
..Default::default()
1690+
})
1691+
.unwrap_or_default()
1692+
})
1693+
.collect::<Vec<_>>(),
1694+
});
1695+
let mut pf = PartitionedFile::new_from_meta(object_meta)
1696+
.with_partition_values(vec![ScalarValue::from(file.date)]);
1697+
pf.statistics = Some(statistics);
1698+
pf
17011699
}
17021700
}
17031701
}

datafusion/datasource/src/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use chrono::TimeZone;
5858
use datafusion_common::stats::Precision;
5959
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
6060
use datafusion_common::{ScalarValue, Statistics};
61+
use datafusion_physical_expr::LexOrdering;
6162
use futures::{Stream, StreamExt};
6263
use object_store::{GetOptions, GetRange, ObjectStore};
6364
use object_store::{ObjectMeta, path::Path};
@@ -133,6 +134,8 @@ pub struct PartitionedFile {
133134
/// When set via [`Self::with_statistics`], partition column statistics are automatically
134135
/// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
135136
pub statistics: Option<Arc<Statistics>>,
137+
/// A known ordering of the data in this file.
138+
pub ordering: Option<LexOrdering>,
136139
/// An optional field for user defined per object metadata
137140
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
138141
/// The estimated size of the parquet metadata, in bytes
@@ -153,6 +156,20 @@ impl PartitionedFile {
153156
partition_values: vec![],
154157
range: None,
155158
statistics: None,
159+
ordering: None,
160+
extensions: None,
161+
metadata_size_hint: None,
162+
}
163+
}
164+
165+
/// Create a file from a known ObjectMeta without partition
166+
pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
167+
Self {
168+
object_meta,
169+
partition_values: vec![],
170+
range: None,
171+
statistics: None,
172+
ordering: None,
156173
extensions: None,
157174
metadata_size_hint: None,
158175
}
@@ -171,12 +188,20 @@ impl PartitionedFile {
171188
partition_values: vec![],
172189
range: Some(FileRange { start, end }),
173190
statistics: None,
191+
ordering: None,
174192
extensions: None,
175193
metadata_size_hint: None,
176194
}
177195
.with_range(start, end)
178196
}
179197

198+
/// Attach partition values to this file.
199+
/// This replaces any existing partition values.
200+
pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
201+
self.partition_values = partition_values;
202+
self
203+
}
204+
180205
/// Size of the file to be scanned (taking into account the range, if present).
181206
pub fn effective_size(&self) -> u64 {
182207
if let Some(range) = &self.range {
@@ -282,6 +307,15 @@ impl PartitionedFile {
282307
false
283308
}
284309
}
310+
311+
/// Set the known ordering of data in this file.
312+
///
313+
/// The ordering represents the lexicographical sort order of the data,
314+
/// typically inferred from file metadata (e.g., Parquet sorting_columns).
315+
pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
316+
self.ordering = ordering;
317+
self
318+
}
285319
}
286320

287321
impl From<ObjectMeta> for PartitionedFile {
@@ -291,6 +325,7 @@ impl From<ObjectMeta> for PartitionedFile {
291325
partition_values: vec![],
292326
range: None,
293327
statistics: None,
328+
ordering: None,
294329
extensions: None,
295330
metadata_size_hint: None,
296331
}
@@ -483,6 +518,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGro
483518
byte_size: Precision::Absent,
484519
}],
485520
})),
521+
ordering: None,
486522
extensions: None,
487523
metadata_size_hint: None,
488524
};

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
637637
.as_ref()
638638
.map(|v| v.try_into().map(Arc::new))
639639
.transpose()?,
640+
ordering: None,
640641
extensions: None,
641642
metadata_size_hint: None,
642643
})

datafusion/substrait/src/physical_plan/consumer.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,14 @@ pub async fn from_substrait_rel(
119119
.unwrap();
120120
let size = 0;
121121

122-
let partitioned_file = PartitionedFile {
123-
object_meta: ObjectMeta {
122+
let partitioned_file =
123+
PartitionedFile::new_from_meta(ObjectMeta {
124124
last_modified: last_modified.into(),
125125
location: path.into(),
126126
size,
127127
e_tag: None,
128128
version: None,
129-
},
130-
partition_values: vec![],
131-
range: None,
132-
statistics: None,
133-
extensions: None,
134-
metadata_size_hint: None,
135-
};
129+
});
136130

137131
let part_index = file.partition_index as usize;
138132
while part_index >= file_groups.len() {

0 commit comments

Comments (0)