diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 6bbb63f6a17ad..b04238ebc9b37 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -73,17 +73,7 @@ pub(crate) mod test_util {
             .infer_stats(state, &store, file_schema.clone(), &meta)
             .await?;
 
-        let file_groups = vec![
-            vec![PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }]
-            .into(),
-        ];
+        let file_groups = vec![vec![PartitionedFile::new_from_meta(meta)].into()];
 
         let exec = format
             .create_physical_plan(
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index aefda64d39367..1f21d6a7e603a 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -113,14 +113,7 @@ mod tests {
            version: None,
        };
 
-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(meta);
 
         let f1 = Field::new("id", DataType::Int32, true);
         let f2 = Field::new("extra_column", DataType::Utf8, true);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4703b55ecc0de..ce2b05e6d3b61 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -54,7 +54,7 @@ mod tests {
     use datafusion_datasource::source::DataSourceExec;
 
     use datafusion_datasource::file::FileSource;
-    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
+    use datafusion_datasource::{PartitionedFile, TableSchema};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
         DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -1527,14 +1527,7 @@ mod tests {
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
-            PartitionedFile {
-                object_meta: meta.clone(),
-                partition_values: vec![],
-                range: Some(FileRange { start, end }),
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            PartitionedFile::new_from_meta(meta.clone()).with_range(start, end)
         }
 
         async fn assert_parquet_read(
@@ -1616,21 +1609,15 @@ mod tests {
             .await
             .unwrap();
 
-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![
+        let partitioned_file = PartitionedFile::new_from_meta(meta)
+            .with_partition_values(vec![
                 ScalarValue::from("2021"),
                 ScalarValue::UInt8(Some(10)),
                 ScalarValue::Dictionary(
                     Box::new(DataType::UInt16),
                     Box::new(ScalarValue::from("26")),
                 ),
-            ],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+            ]);
 
         let expected_schema = Schema::new(vec![
             Field::new("id", DataType::Int32, true),
@@ -1711,20 +1698,13 @@ mod tests {
             .unwrap()
             .child("invalid.parquet");
 
-        let partitioned_file = PartitionedFile {
-            object_meta: ObjectMeta {
-                location,
-                last_modified: Utc.timestamp_nanos(0),
-                size: 1337,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta {
+            location,
+            last_modified: Utc.timestamp_nanos(0),
+            size: 1337,
+            e_tag: None,
+            version: None,
+        });
 
         let file_schema = Arc::new(Schema::empty());
         let config = FileScanConfigBuilder::new(
@@ -2376,36 +2356,22 @@ mod tests {
         );
         let config = FileScanConfigBuilder::new(store_url, source)
             .with_file(
-                PartitionedFile {
-                    object_meta: ObjectMeta {
-                        location: Path::from(name_1),
-                        last_modified: Utc::now(),
-                        size: total_size_1,
-                        e_tag: None,
-                        version: None,
-                    },
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                }
-                .with_metadata_size_hint(123),
-            )
-            .with_file(PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(name_2),
+                PartitionedFile::new_from_meta(ObjectMeta {
+                    location: Path::from(name_1),
                     last_modified: Utc::now(),
-                    size: total_size_2,
+                    size: total_size_1,
                     e_tag: None,
                     version: None,
-                },
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            })
+                })
+                .with_metadata_size_hint(123),
+            )
+            .with_file(PartitionedFile::new_from_meta(ObjectMeta {
+                location: Path::from(name_2),
+                last_modified: Utc::now(),
+                size: total_size_2,
+                e_tag: None,
+                version: None,
+            }))
             .build();
 
         let exec = DataSourceExec::from_data_source(config);
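Note (not part of the patch): every call site above follows the same shape, so a sketch of the resulting call pattern may help review. The helper below is hypothetical and uses only constructors and builders that appear in this diff (`new_from_meta`, `with_range`, `with_partition_values`):

```rust
use datafusion_common::ScalarValue;
use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

/// Hypothetical helper: a ranged, Hive-partitioned file built with the
/// builder API instead of an exhaustive struct literal.
fn ranged_file(meta: ObjectMeta, start: i64, end: i64) -> PartitionedFile {
    PartitionedFile::new_from_meta(meta)
        // scan only bytes [start, end), as in `parquet_exec_with_range` above
        .with_range(start, end)
        // e.g. the value of a `date=2021` path segment
        .with_partition_values(vec![ScalarValue::from("2021")])
}
```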
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 44e884c23a681..53684e51bc0ba 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -160,16 +160,8 @@ impl TestParquetFile {
                 .with_table_parquet_options(parquet_options.clone()),
         );
         let scan_config_builder =
-            FileScanConfigBuilder::new(self.object_store_url.clone(), source).with_file(
-                PartitionedFile {
-                    object_meta: self.object_meta.clone(),
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                },
-            );
+            FileScanConfigBuilder::new(self.object_store_url.clone(), source)
+                .with_file(PartitionedFile::new_from_meta(self.object_meta.clone()));
 
         let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;
 
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 31ec6efd19510..25f69d2975eac 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -69,13 +69,9 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         store_parquet_in_memory(vec![batch]).await;
     let file_group = parquet_files_meta
         .into_iter()
-        .map(|meta| PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            metadata_size_hint: None,
+        .map(|meta| {
+            PartitionedFile::new_from_meta(meta)
+                .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA)))
         })
         .collect();
 
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 17392974b63a8..7eb39bfe78305 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -67,14 +67,7 @@ async fn get_parquet_exec(
         .await
         .unwrap();
 
-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-        metadata_size_hint: None,
-    };
+    let partitioned_file = PartitionedFile::new_from_meta(meta);
 
     let df_schema = schema.clone().to_dfschema().unwrap();
     let execution_props = ExecutionProps::new();
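Note (not part of the patch): the `custom_reader.rs` change above leans on the pre-existing `with_extensions` builder, whose payload type matches the existing `extensions` field. A rough sketch, with a hypothetical helper name:

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

/// Hypothetical helper: tag a file with opaque user metadata that a custom
/// `ParquetFileReaderFactory` can later downcast and inspect.
fn tagged_file(meta: ObjectMeta, tag: String) -> PartitionedFile {
    let extensions: Arc<dyn Any + Send + Sync> = Arc::new(tag);
    PartitionedFile::new_from_meta(meta).with_extensions(extensions)
}
```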
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs
index 1264197609f3f..046379cc25e23 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -1559,14 +1559,7 @@ mod tests {
             ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
                 .with_file_size(object_meta.size);
 
-        let partitioned_file = PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(object_meta);
 
         let reader = ParquetFileReader {
             inner,
diff --git a/datafusion/datasource/src/display.rs b/datafusion/datasource/src/display.rs
index 15fe8679acdaf..0f59e33ff9eac 100644
--- a/datafusion/datasource/src/display.rs
+++ b/datafusion/datasource/src/display.rs
@@ -287,13 +287,6 @@ mod tests {
             version: None,
         };
 
-        PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
+        PartitionedFile::new_from_meta(object_meta)
     }
 }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index c8636343ccc5a..1f7c37315c47a 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -1661,43 +1661,40 @@ mod tests {
     impl From<File> for PartitionedFile {
         fn from(file: File) -> Self {
-            PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(format!(
-                        "data/date={}/{}.parquet",
-                        file.date, file.name
-                    )),
-                    last_modified: chrono::Utc.timestamp_nanos(0),
-                    size: 0,
-                    e_tag: None,
-                    version: None,
-                },
-                partition_values: vec![ScalarValue::from(file.date)],
-                range: None,
-                statistics: Some(Arc::new(Statistics {
-                    num_rows: Precision::Absent,
-                    total_byte_size: Precision::Absent,
-                    column_statistics: file
-                        .statistics
-                        .into_iter()
-                        .map(|stats| {
-                            stats
-                                .map(|(min, max)| ColumnStatistics {
-                                    min_value: Precision::Exact(
-                                        ScalarValue::Float64(min),
-                                    ),
-                                    max_value: Precision::Exact(
-                                        ScalarValue::Float64(max),
-                                    ),
-                                    ..Default::default()
-                                })
-                                .unwrap_or_default()
-                        })
-                        .collect::<Vec<_>>(),
-                })),
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            let object_meta = ObjectMeta {
+                location: Path::from(format!(
+                    "data/date={}/{}.parquet",
+                    file.date, file.name
+                )),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: 0,
+                e_tag: None,
+                version: None,
+            };
+            let statistics = Arc::new(Statistics {
+                num_rows: Precision::Absent,
+                total_byte_size: Precision::Absent,
+                column_statistics: file
+                    .statistics
+                    .into_iter()
+                    .map(|stats| {
+                        stats
+                            .map(|(min, max)| ColumnStatistics {
+                                min_value: Precision::Exact(ScalarValue::Float64(
+                                    min,
+                                )),
+                                max_value: Precision::Exact(ScalarValue::Float64(
+                                    max,
+                                )),
+                                ..Default::default()
+                            })
+                            .unwrap_or_default()
+                    })
+                    .collect::<Vec<_>>(),
+            });
+            PartitionedFile::new_from_meta(object_meta)
+                .with_partition_values(vec![ScalarValue::from(file.date)])
+                .with_statistics(statistics)
         }
     }
 }
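Note (not part of the patch): the `From<File>` test impl above shows the intended pattern for attaching pre-computed statistics. A condensed, hypothetical version of the same idea:

```rust
use std::sync::Arc;

use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

/// Hypothetical helper: attach exact min/max statistics for a single
/// Float64 column, mirroring the `From<File>` test impl above.
fn file_with_min_max(meta: ObjectMeta, min: f64, max: f64) -> PartitionedFile {
    let stats = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics {
            min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
            max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
            ..Default::default()
        }],
    };
    PartitionedFile::new_from_meta(meta).with_statistics(Arc::new(stats))
}
```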
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 347e783c278d0..be9c8b7560be6 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -58,6 +58,7 @@ use chrono::TimeZone;
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
 use datafusion_common::{ScalarValue, Statistics};
+use datafusion_physical_expr::LexOrdering;
 use futures::{Stream, StreamExt};
 use object_store::{GetOptions, GetRange, ObjectStore};
 use object_store::{ObjectMeta, path::Path};
@@ -133,6 +134,16 @@ pub struct PartitionedFile {
    /// When set via [`Self::with_statistics`], partition column statistics are automatically
    /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
    pub statistics: Option<Arc<Statistics>>,
+    /// The known lexicographical ordering of the rows in this file, if any.
+    ///
+    /// This describes how the data within the file is sorted with respect to one or more
+    /// columns, and is used by the optimizer for planning operations that depend on input
+    /// ordering (e.g. merges, sorts, and certain aggregations).
+    ///
+    /// When available, this is typically inferred from file-level metadata exposed by the
+    /// underlying format (for example, Parquet `sorting_columns`), but it may also be set
+    /// explicitly via [`Self::with_ordering`].
+    pub ordering: Option<LexOrdering>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
@@ -153,6 +164,20 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            ordering: None,
+            extensions: None,
+            metadata_size_hint: None,
+        }
+    }
+
+    /// Create a file from a known ObjectMeta, without partition values
+    pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
+        Self {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            statistics: None,
+            ordering: None,
             extensions: None,
             metadata_size_hint: None,
         }
     }
@@ -171,12 +196,20 @@ impl PartitionedFile {
             partition_values: vec![],
             range: Some(FileRange { start, end }),
             statistics: None,
+            ordering: None,
             extensions: None,
             metadata_size_hint: None,
         }
         .with_range(start, end)
     }
 
+    /// Attach partition values to this file.
+    /// This replaces any existing partition values.
+    pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
+        self.partition_values = partition_values;
+        self
+    }
+
    /// Size of the file to be scanned (taking into account the range, if present).
    pub fn effective_size(&self) -> u64 {
        if let Some(range) = &self.range {
@@ -282,6 +315,15 @@ impl PartitionedFile {
             false
         }
     }
+
+    /// Set the known ordering of data in this file.
+    ///
+    /// The ordering represents the lexicographical sort order of the data,
+    /// typically inferred from file metadata (e.g., Parquet `sorting_columns`).
+    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
+        self.ordering = ordering;
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
@@ -291,6 +333,7 @@ impl From<ObjectMeta> for PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            ordering: None,
             extensions: None,
             metadata_size_hint: None,
         }
@@ -483,6 +526,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
             partition_values: vec![],
             range: None,
             statistics: Some(Arc::new(stats)),
+            ordering: None,
             extensions: None,
             metadata_size_hint: None,
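Note (not part of the patch): the new `ordering` field threads a known sort order through to the planner. A minimal sketch of the intended call pattern; how the `LexOrdering` is derived (e.g. from Parquet `sorting_columns`) is outside this diff, so it is taken as an input here:

```rust
use datafusion_datasource::PartitionedFile;
use datafusion_physical_expr::LexOrdering;
use object_store::ObjectMeta;

/// Hypothetical helper: record a sort order recovered elsewhere
/// (e.g. from Parquet `sorting_columns` metadata) on a file.
fn sorted_file(meta: ObjectMeta, ordering: Option<LexOrdering>) -> PartitionedFile {
    PartitionedFile::new_from_meta(meta).with_ordering(ordering)
}
```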
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -530,29 +530,27 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
     type Error = DataFusionError;
 
     fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
-        Ok(PartitionedFile {
-            object_meta: ObjectMeta {
-                location: Path::parse(val.path.as_str()).map_err(|e| {
-                    proto_error(format!("Invalid object_store path: {e}"))
-                })?,
-                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
-                size: val.size,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: val
-                .partition_values
+        let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
+            location: Path::parse(val.path.as_str())
+                .map_err(|e| proto_error(format!("Invalid object_store path: {e}")))?,
+            last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
+            size: val.size,
+            e_tag: None,
+            version: None,
+        })
+        .with_partition_values(
+            val.partition_values
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
-            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val
-                .statistics
-                .as_ref()
-                .map(|v| v.try_into().map(Arc::new))
-                .transpose()?,
-            extensions: None,
-            metadata_size_hint: None,
-        })
+        );
+        if let Some(range) = val.range.as_ref() {
+            let file_range: FileRange = range.try_into()?;
+            pf = pf.with_range(file_range.start, file_range.end);
+        }
+        if let Some(proto_stats) = val.statistics.as_ref() {
+            pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
+        }
+        Ok(pf)
     }
 }
@@ -754,20 +752,13 @@ mod tests {
     #[test]
     fn partitioned_file_path_roundtrip_percent_encoded() {
         let path_str = "foo/foo%2Fbar/baz%252Fqux";
-        let pf = PartitionedFile {
-            object_meta: ObjectMeta {
-                location: Path::parse(path_str).unwrap(),
-                last_modified: Utc.timestamp_nanos(1_000),
-                size: 42,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let pf = PartitionedFile::new_from_meta(ObjectMeta {
+            location: Path::parse(path_str).unwrap(),
+            last_modified: Utc.timestamp_nanos(1_000),
+            size: 42,
+            e_tag: None,
+            version: None,
+        });
 
         let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
         assert_eq!(proto.path, path_str);
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index ac0f26722513c..ccaf1abec4245 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -119,20 +119,14 @@ pub async fn from_substrait_rel(
                             .unwrap();
                         let size = 0;
 
-                        let partitioned_file = PartitionedFile {
-                            object_meta: ObjectMeta {
+                        let partitioned_file =
+                            PartitionedFile::new_from_meta(ObjectMeta {
                                 last_modified: last_modified.into(),
                                 location: path.into(),
                                 size,
                                 e_tag: None,
                                 version: None,
-                            },
-                            partition_values: vec![],
-                            range: None,
-                            statistics: None,
-                            extensions: None,
-                            metadata_size_hint: None,
-                        };
+                            });
 
                         let part_index = file.partition_index as usize;
                         while part_index >= file_groups.len() {