
Commit bf51e52

adriangb and claude committed
refactor: use PartitionedFile builder pattern across codebase
Update all PartitionedFile struct literals to use the new_from_meta() builder pattern with the appropriate builder methods, such as with_partition_values(), with_extensions(), with_range(), and with_statistics().

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent f2d318c commit bf51e52
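For orientation, here is a minimal sketch of the builder style this commit converts to. It uses only calls that appear in the diffs below (new_from_meta(), with_partition_values(), with_range()); the import paths and the sample ObjectMeta values are assumptions for illustration and may need adjusting to the DataFusion version at hand.

// A minimal sketch, not part of the commit: the import paths below are
// assumptions and may differ across DataFusion versions, but the builder
// calls mirror the ones used in the diffs in this commit.
use chrono::Utc;
use datafusion::common::ScalarValue;
use datafusion::datasource::listing::PartitionedFile;
use object_store::{path::Path, ObjectMeta};

fn sample_partitioned_file() -> PartitionedFile {
    // ObjectMeta fields follow the shape used throughout the diffs below;
    // the concrete values here are made up for illustration.
    let meta = ObjectMeta {
        location: Path::from("year=2021/part-0.parquet"),
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None,
    };

    // Instead of spelling out every PartitionedFile field with None/vec![],
    // start from the ObjectMeta and layer on only what this file needs.
    PartitionedFile::new_from_meta(meta)
        .with_partition_values(vec![ScalarValue::from("2021")])
        .with_range(0, 512)
}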

File tree

7 files changed: +64 -131 lines changed


datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 8 deletions
@@ -74,14 +74,7 @@ pub(crate) mod test_util {
         .await?;

         let file_groups = vec![
-            vec![PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }]
+            vec![PartitionedFile::new_from_meta(meta)]
             .into(),
         ];

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 8 deletions
@@ -113,14 +113,7 @@ mod tests {
             version: None,
         };

-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(meta);

         let f1 = Field::new("id", DataType::Int32, true);
         let f2 = Field::new("extra_column", DataType::Utf8, true);

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 28 additions & 60 deletions
@@ -1527,14 +1527,8 @@ mod tests {
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
-            PartitionedFile {
-                object_meta: meta.clone(),
-                partition_values: vec![],
-                range: Some(FileRange { start, end }),
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            PartitionedFile::new_from_meta(meta.clone())
+                .with_range(start, end)
         }

         async fn assert_parquet_read(
@@ -1616,21 +1610,14 @@
             .await
             .unwrap();

-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![
-                ScalarValue::from("2021"),
-                ScalarValue::UInt8(Some(10)),
-                ScalarValue::Dictionary(
-                    Box::new(DataType::UInt16),
-                    Box::new(ScalarValue::from("26")),
-                ),
-            ],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(meta).with_partition_values(vec![
+            ScalarValue::from("2021"),
+            ScalarValue::UInt8(Some(10)),
+            ScalarValue::Dictionary(
+                Box::new(DataType::UInt16),
+                Box::new(ScalarValue::from("26")),
+            ),
+        ]);

         let expected_schema = Schema::new(vec![
             Field::new("id", DataType::Int32, true),
@@ -1711,20 +1698,13 @@
             .unwrap()
             .child("invalid.parquet");

-        let partitioned_file = PartitionedFile {
-            object_meta: ObjectMeta {
-                location,
-                last_modified: Utc.timestamp_nanos(0),
-                size: 1337,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta {
+            location,
+            last_modified: Utc.timestamp_nanos(0),
+            size: 1337,
+            e_tag: None,
+            version: None,
+        });

         let file_schema = Arc::new(Schema::empty());
         let config = FileScanConfigBuilder::new(
@@ -2376,36 +2356,24 @@
         );
         let config = FileScanConfigBuilder::new(store_url, source)
             .with_file(
-                PartitionedFile {
-                    object_meta: ObjectMeta {
+                PartitionedFile::new_from_meta(
+                    ObjectMeta {
                         location: Path::from(name_1),
                         last_modified: Utc::now(),
                         size: total_size_1,
                         e_tag: None,
                         version: None,
-                    },
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                }
+                    }
+                )
                 .with_metadata_size_hint(123),
             )
-            .with_file(PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(name_2),
-                    last_modified: Utc::now(),
-                    size: total_size_2,
-                    e_tag: None,
-                    version: None,
-                },
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            })
+            .with_file(PartitionedFile::new_from_meta(ObjectMeta {
+                location: Path::from(name_2),
+                last_modified: Utc::now(),
+                size: total_size_2,
+                e_tag: None,
+                version: None,
+            }))
             .build();

         let exec = DataSourceExec::from_data_source(config);

datafusion/core/tests/parquet/custom_reader.rs

Lines changed: 3 additions & 7 deletions
@@ -69,13 +69,9 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         store_parquet_in_memory(vec![batch]).await;
     let file_group = parquet_files_meta
         .into_iter()
-        .map(|meta| PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            metadata_size_hint: None,
+        .map(|meta| {
+            PartitionedFile::new_from_meta(meta)
+                .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA)))
         })
         .collect();

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 1 addition & 8 deletions
@@ -67,14 +67,7 @@ async fn get_parquet_exec(
         .await
         .unwrap();

-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-        metadata_size_hint: None,
-    };
+    let partitioned_file = PartitionedFile::new_from_meta(meta);

    let df_schema = schema.clone().to_dfschema().unwrap();
    let execution_props = ExecutionProps::new();

datafusion/datasource/src/file_scan_config.rs

Lines changed: 3 additions & 4 deletions
@@ -1692,10 +1692,9 @@ mod tests {
                 })
                 .collect::<Vec<_>>(),
             });
-            let mut pf = PartitionedFile::new_from_meta(object_meta)
-                .with_partition_values(vec![ScalarValue::from(file.date)]);
-            pf.statistics = Some(statistics);
-            pf
+            PartitionedFile::new_from_meta(object_meta)
+                .with_partition_values(vec![ScalarValue::from(file.date)])
+                .with_statistics(statistics)
         }
     }
 }

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 27 additions & 36 deletions
@@ -616,31 +616,29 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
     type Error = DataFusionError;

     fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
-        Ok(PartitionedFile {
-            object_meta: ObjectMeta {
-                location: Path::parse(val.path.as_str()).map_err(|e| {
-                    proto_error(format!("Invalid object_store path: {e}"))
-                })?,
-                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
-                size: val.size,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: val
-                .partition_values
+        let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
+            location: Path::parse(val.path.as_str()).map_err(|e| {
+                proto_error(format!("Invalid object_store path: {e}"))
+            })?,
+            last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
+            size: val.size,
+            e_tag: None,
+            version: None,
+        })
+        .with_partition_values(
+            val.partition_values
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
-            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val
-                .statistics
-                .as_ref()
-                .map(|v| v.try_into().map(Arc::new))
-                .transpose()?,
-            ordering: None,
-            extensions: None,
-            metadata_size_hint: None,
-        })
+        );
+        if let Some(range) = val.range.as_ref() {
+            let file_range: FileRange = range.try_into()?;
+            pf = pf.with_range(file_range.start, file_range.end);
+        }
+        if let Some(proto_stats) = val.statistics.as_ref() {
+            pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
+        }
+        Ok(pf)
     }
 }

@@ -755,20 +753,13 @@ mod tests {
     #[test]
     fn partitioned_file_path_roundtrip_percent_encoded() {
         let path_str = "foo/foo%2Fbar/baz%252Fqux";
-        let pf = PartitionedFile {
-            object_meta: ObjectMeta {
-                location: Path::parse(path_str).unwrap(),
-                last_modified: Utc.timestamp_nanos(1_000),
-                size: 42,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let pf = PartitionedFile::new_from_meta(ObjectMeta {
+            location: Path::parse(path_str).unwrap(),
+            last_modified: Utc.timestamp_nanos(1_000),
+            size: 42,
+            e_tag: None,
+            version: None,
+        });

         let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
         assert_eq!(proto.path, path_str);
