
Commit ed01b67

adriangb authored (co-authored by Claude and Copilot)
Refactor PartitionedFile: add ordering field and new_from_meta constructor (#19596)
## Which issue does this PR close?

Part of #19433

## Rationale for this change

In preparation for ordering inference from Parquet metadata, we need to be able to store per-file ordering information on `PartitionedFile`. This PR adds the necessary infrastructure.

## What changes are included in this PR?

- Add `ordering: Option<LexOrdering>` field to the `PartitionedFile` struct
- Add `new_from_meta(ObjectMeta)` constructor for creating files from metadata (cleaner than manually constructing)
- Add `with_ordering()` builder method to set ordering information
- Add `with_partition_values()` builder method for consistency
- Update all `PartitionedFile` constructors to initialize `ordering: None`
- Update callsites in test_util, proto, and substrait to use `new_from_meta`

## Are these changes tested?

Yes, existing tests pass. The new field is currently always `None`, so no new tests are needed yet. Tests for ordering inference will come in a follow-up PR.

## Are there any user-facing changes?

No user-facing API changes. The `ordering` field is public, but users typically construct `PartitionedFile` via the provided constructors, which handle this automatically.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
Co-authored-by: Copilot <[email protected]>
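For illustration, here is a minimal sketch of the builder-style construction these callsite updates move to, assembled from the calls visible in the diffs below (`new_from_meta`, `with_partition_values`, `with_range`). The `ObjectMeta` values and file path are hypothetical, and the new `with_ordering` method is only mentioned in a comment because its exact argument type is not shown in this commit.

```rust
use chrono::Utc;
use datafusion_common::ScalarValue;
use datafusion_datasource::PartitionedFile;
use object_store::{path::Path, ObjectMeta};

fn example_partitioned_file() -> PartitionedFile {
    // Hypothetical object metadata, for illustration only.
    let meta = ObjectMeta {
        location: Path::from("data/date=2021/example.parquet"),
        last_modified: Utc::now(),
        size: 1337,
        e_tag: None,
        version: None,
    };

    // Start from the ObjectMeta and layer on only what this call site needs,
    // instead of spelling out every field of the struct literal by hand.
    PartitionedFile::new_from_meta(meta)
        .with_partition_values(vec![ScalarValue::from("2021")])
        .with_range(0, 1024)
    // A `.with_ordering(...)` call (added in this PR) would attach the
    // per-file sort order once ordering inference from Parquet metadata lands.
}
```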
1 parent 7942e75 commit ed01b67

File tree

12 files changed: +142 −200 lines changed

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 11 deletions
@@ -73,17 +73,7 @@ pub(crate) mod test_util {
             .infer_stats(state, &store, file_schema.clone(), &meta)
             .await?;

-        let file_groups = vec![
-            vec![PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }]
-            .into(),
-        ];
+        let file_groups = vec![vec![PartitionedFile::new_from_meta(meta)].into()];

         let exec = format
             .create_physical_plan(

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 8 deletions
@@ -113,14 +113,7 @@ mod tests {
             version: None,
         };

-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(meta);

         let f1 = Field::new("id", DataType::Int32, true);
         let f2 = Field::new("extra_column", DataType::Utf8, true);

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 25 additions & 59 deletions
@@ -54,7 +54,7 @@ mod tests {
     use datafusion_datasource::source::DataSourceExec;

     use datafusion_datasource::file::FileSource;
-    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
+    use datafusion_datasource::{PartitionedFile, TableSchema};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
         DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,

@@ -1527,14 +1527,7 @@ mod tests {
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
-            PartitionedFile {
-                object_meta: meta.clone(),
-                partition_values: vec![],
-                range: Some(FileRange { start, end }),
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            PartitionedFile::new_from_meta(meta.clone()).with_range(start, end)
         }

         async fn assert_parquet_read(

@@ -1616,21 +1609,15 @@ mod tests {
             .await
             .unwrap();

-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![
+        let partitioned_file = PartitionedFile::new_from_meta(meta)
+            .with_partition_values(vec![
                 ScalarValue::from("2021"),
                 ScalarValue::UInt8(Some(10)),
                 ScalarValue::Dictionary(
                     Box::new(DataType::UInt16),
                     Box::new(ScalarValue::from("26")),
                 ),
-            ],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+            ]);

         let expected_schema = Schema::new(vec![
             Field::new("id", DataType::Int32, true),

@@ -1711,20 +1698,13 @@ mod tests {
             .unwrap()
             .child("invalid.parquet");

-        let partitioned_file = PartitionedFile {
-            object_meta: ObjectMeta {
-                location,
-                last_modified: Utc.timestamp_nanos(0),
-                size: 1337,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta {
+            location,
+            last_modified: Utc.timestamp_nanos(0),
+            size: 1337,
+            e_tag: None,
+            version: None,
+        });

         let file_schema = Arc::new(Schema::empty());
         let config = FileScanConfigBuilder::new(

@@ -2376,36 +2356,22 @@ mod tests {
         );
         let config = FileScanConfigBuilder::new(store_url, source)
             .with_file(
-                PartitionedFile {
-                    object_meta: ObjectMeta {
-                        location: Path::from(name_1),
-                        last_modified: Utc::now(),
-                        size: total_size_1,
-                        e_tag: None,
-                        version: None,
-                    },
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                }
-                .with_metadata_size_hint(123),
-            )
-            .with_file(PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(name_2),
+                PartitionedFile::new_from_meta(ObjectMeta {
+                    location: Path::from(name_1),
                     last_modified: Utc::now(),
-                    size: total_size_2,
+                    size: total_size_1,
                     e_tag: None,
                     version: None,
-                },
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            })
+                })
+                .with_metadata_size_hint(123),
+            )
+            .with_file(PartitionedFile::new_from_meta(ObjectMeta {
+                location: Path::from(name_2),
+                last_modified: Utc::now(),
+                size: total_size_2,
+                e_tag: None,
+                version: None,
+            }))
             .build();

         let exec = DataSourceExec::from_data_source(config);

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 10 deletions
@@ -160,16 +160,8 @@ impl TestParquetFile {
                 .with_table_parquet_options(parquet_options.clone()),
         );
         let scan_config_builder =
-            FileScanConfigBuilder::new(self.object_store_url.clone(), source).with_file(
-                PartitionedFile {
-                    object_meta: self.object_meta.clone(),
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                },
-            );
+            FileScanConfigBuilder::new(self.object_store_url.clone(), source)
+                .with_file(PartitionedFile::new_from_meta(self.object_meta.clone()));

         let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;

datafusion/core/tests/parquet/custom_reader.rs

Lines changed: 3 additions & 7 deletions
@@ -69,13 +69,9 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         store_parquet_in_memory(vec![batch]).await;
     let file_group = parquet_files_meta
         .into_iter()
-        .map(|meta| PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            metadata_size_hint: None,
+        .map(|meta| {
+            PartitionedFile::new_from_meta(meta)
+                .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA)))
         })
         .collect();

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 1 addition & 8 deletions
@@ -67,14 +67,7 @@ async fn get_parquet_exec(
         .await
         .unwrap();

-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-        metadata_size_hint: None,
-    };
+    let partitioned_file = PartitionedFile::new_from_meta(meta);

     let df_schema = schema.clone().to_dfschema().unwrap();
     let execution_props = ExecutionProps::new();

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 1 addition & 8 deletions
@@ -1559,14 +1559,7 @@ mod tests {
             ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
                 .with_file_size(object_meta.size);

-        let partitioned_file = PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(object_meta);

         let reader = ParquetFileReader {
             inner,

datafusion/datasource/src/display.rs

Lines changed: 1 addition & 8 deletions
@@ -287,13 +287,6 @@ mod tests {
             version: None,
         };

-        PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
+        PartitionedFile::new_from_meta(object_meta)
     }
 }

datafusion/datasource/src/file_scan_config.rs

Lines changed: 34 additions & 37 deletions
@@ -1661,43 +1661,40 @@ mod tests {

         impl From<File> for PartitionedFile {
             fn from(file: File) -> Self {
-                PartitionedFile {
-                    object_meta: ObjectMeta {
-                        location: Path::from(format!(
-                            "data/date={}/{}.parquet",
-                            file.date, file.name
-                        )),
-                        last_modified: chrono::Utc.timestamp_nanos(0),
-                        size: 0,
-                        e_tag: None,
-                        version: None,
-                    },
-                    partition_values: vec![ScalarValue::from(file.date)],
-                    range: None,
-                    statistics: Some(Arc::new(Statistics {
-                        num_rows: Precision::Absent,
-                        total_byte_size: Precision::Absent,
-                        column_statistics: file
-                            .statistics
-                            .into_iter()
-                            .map(|stats| {
-                                stats
-                                    .map(|(min, max)| ColumnStatistics {
-                                        min_value: Precision::Exact(
-                                            ScalarValue::Float64(min),
-                                        ),
-                                        max_value: Precision::Exact(
-                                            ScalarValue::Float64(max),
-                                        ),
-                                        ..Default::default()
-                                    })
-                                    .unwrap_or_default()
-                            })
-                            .collect::<Vec<_>>(),
-                    })),
-                    extensions: None,
-                    metadata_size_hint: None,
-                }
+                let object_meta = ObjectMeta {
+                    location: Path::from(format!(
+                        "data/date={}/{}.parquet",
+                        file.date, file.name
+                    )),
+                    last_modified: chrono::Utc.timestamp_nanos(0),
+                    size: 0,
+                    e_tag: None,
+                    version: None,
+                };
+                let statistics = Arc::new(Statistics {
+                    num_rows: Precision::Absent,
+                    total_byte_size: Precision::Absent,
+                    column_statistics: file
+                        .statistics
+                        .into_iter()
+                        .map(|stats| {
+                            stats
+                                .map(|(min, max)| ColumnStatistics {
+                                    min_value: Precision::Exact(ScalarValue::Float64(
+                                        min,
+                                    )),
+                                    max_value: Precision::Exact(ScalarValue::Float64(
+                                        max,
+                                    )),
+                                    ..Default::default()
+                                })
+                                .unwrap_or_default()
+                        })
+                        .collect::<Vec<_>>(),
+                });
+                PartitionedFile::new_from_meta(object_meta)
+                    .with_partition_values(vec![ScalarValue::from(file.date)])
+                    .with_statistics(statistics)
             }
         }
     }

0 commit comments
