diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 35b5918d9e8bf..44c9a2393e3d8 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -50,6 +50,7 @@ mod expr_adapter;
 mod external_access_plan;
 mod file_statistics;
 mod filter_pushdown;
+mod ordering;
 mod page_pruning;
 mod row_group_pruning;
 mod schema;
diff --git a/datafusion/core/tests/parquet/ordering.rs b/datafusion/core/tests/parquet/ordering.rs
new file mode 100644
index 0000000000000..faecb4ca6a861
--- /dev/null
+++ b/datafusion/core/tests/parquet/ordering.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for ordering in Parquet sorting_columns metadata
+
+use datafusion::prelude::SessionContext;
+use datafusion_common::Result;
+use tempfile::tempdir;
+
+/// Test that CREATE TABLE ... WITH ORDER writes sorting_columns to Parquet metadata
+#[tokio::test]
+async fn test_create_table_with_order_writes_sorting_columns() -> Result<()> {
+    use parquet::file::reader::FileReader;
+    use parquet::file::serialized_reader::SerializedFileReader;
+    use std::fs::File;
+
+    let ctx = SessionContext::new();
+    let tmp_dir = tempdir()?;
+    let table_path = tmp_dir.path().join("sorted_table");
+    std::fs::create_dir_all(&table_path)?;
+
+    // Create external table with ordering
+    let create_table_sql = format!(
+        "CREATE EXTERNAL TABLE sorted_data (a INT, b VARCHAR) \
+         STORED AS PARQUET \
+         LOCATION '{}' \
+         WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)",
+        table_path.display()
+    );
+    ctx.sql(&create_table_sql).await?;
+
+    // Insert sorted data
+    ctx.sql("INSERT INTO sorted_data VALUES (1, 'x'), (2, 'y'), (3, 'z')")
+        .await?
+        .collect()
+        .await?;
+
+    // Find the parquet file that was written
+    let parquet_files: Vec<_> = std::fs::read_dir(&table_path)?
+        .filter_map(|e| e.ok())
+        .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
+        .collect();
+
+    assert!(
+        !parquet_files.is_empty(),
+        "Expected at least one parquet file in {}",
+        table_path.display()
+    );
+
+    // Read the parquet file and verify sorting_columns metadata
+    let file = File::open(parquet_files[0].path())?;
+    let reader = SerializedFileReader::new(file)?;
+    let metadata = reader.metadata();
+
+    // Check that row group has sorting_columns
+    let row_group = metadata.row_group(0);
+    let sorting_columns = row_group.sorting_columns();
+
+    assert!(
+        sorting_columns.is_some(),
+        "Expected sorting_columns in row group metadata"
+    );
+    let sorting = sorting_columns.unwrap();
+    assert_eq!(sorting.len(), 2, "Expected 2 sorting columns");
+
+    // First column: a ASC NULLS FIRST (column_idx = 0)
+    assert_eq!(sorting[0].column_idx, 0, "First sort column should be 'a'");
+    assert!(
+        !sorting[0].descending,
+        "First column should be ASC (descending=false)"
+    );
+    assert!(
+        sorting[0].nulls_first,
+        "First column should have NULLS FIRST"
+    );
+
+    // Second column: b DESC NULLS LAST (column_idx = 1)
+    assert_eq!(sorting[1].column_idx, 1, "Second sort column should be 'b'");
+    assert!(
+        sorting[1].descending,
+        "Second column should be DESC (descending=true)"
+    );
+    assert!(
+        !sorting[1].nulls_first,
+        "Second column should have NULLS LAST"
+    );
+
+    Ok(())
+}
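Note: the assertions above treat `sorting_columns` as plain per-row-group metadata in the `parquet` crate, which it is. As a reference point, here is a minimal sketch of writing and reading that metadata directly, independent of DataFusion; the `/tmp` path and one-column schema are illustrative assumptions, not part of this PR:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::SortingColumn;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Declare the sort order in the writer properties; the writer stamps it
    // onto every row group it produces.
    let props = WriterProperties::builder()
        .set_sorting_columns(Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]))
        .build();

    let file = File::create("/tmp/sorted.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back the same way the test does.
    let reader = SerializedFileReader::new(File::open("/tmp/sorted.parquet")?)?;
    let sorting = reader.metadata().row_group(0).sorting_columns().unwrap();
    assert_eq!(sorting[0].column_idx, 0);
    Ok(())
}
```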
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 5e482382be687..2109416d646fb 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -54,11 +54,11 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
-use crate::metadata::DFParquetMetadata;
+use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
 use crate::reader::CachedParquetFileReaderFactory;
 use crate::source::{ParquetSource, parse_coerce_int96_string};
 use async_trait::async_trait;
@@ -81,7 +81,7 @@ use parquet::basic::Type;
 #[cfg(feature = "parquet_encryption")]
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, SortingColumn};
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::writer::SerializedFileWriter;
 use parquet::schema::types::SchemaDescriptor;
@@ -500,7 +500,22 @@ impl FileFormat for ParquetFormat {
             return not_impl_err!("Overwrites are not implemented yet for Parquet");
         }
 
-        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
+        // Convert ordering requirements to Parquet SortingColumns for file metadata
+        let sorting_columns = if let Some(ref requirements) = order_requirements {
+            let ordering: LexOrdering = requirements.clone().into();
+            // In cases like `COPY (... ORDER BY ...) TO ...` the ORDER BY clause
+            // may not be compatible with Parquet sorting columns (e.g. ordering
+            // on `random()`). So if we cannot create a Parquet sorting column
+            // from the ordering requirement, we skip setting sorting columns
+            // on the Parquet sink.
+            lex_ordering_to_sorting_columns(&ordering).ok()
+        } else {
+            None
+        };
+
+        let sink = Arc::new(
+            ParquetSink::new(conf, self.options.clone())
+                .with_sorting_columns(sorting_columns),
+        );
 
         Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
     }
@@ -1088,6 +1103,8 @@ pub struct ParquetSink {
     /// File metadata from successfully produced parquet files. The Mutex is only used
     /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
     written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
+    /// Optional sorting columns to write to Parquet metadata
+    sorting_columns: Option<Vec<SortingColumn>>,
 }
 
 impl Debug for ParquetSink {
@@ -1119,9 +1136,19 @@ impl ParquetSink {
             config,
             parquet_options,
             written: Default::default(),
+            sorting_columns: None,
         }
     }
 
+    /// Set sorting columns for the Parquet file metadata.
+    pub fn with_sorting_columns(
+        mut self,
+        sorting_columns: Option<Vec<SortingColumn>>,
+    ) -> Self {
+        self.sorting_columns = sorting_columns;
+        self
+    }
+
     /// Retrieve the file metadata for the written files, keyed to the path
     /// which may be partitioned (in the case of hive style partitioning).
     pub fn written(&self) -> HashMap<Path, FileMetaData> {
@@ -1145,6 +1172,12 @@ impl ParquetSink {
         }
 
         let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
+
+        // Set sorting columns if configured
+        if let Some(ref sorting_columns) = self.sorting_columns {
+            builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
+        }
+
         builder = set_writer_encryption_properties(
             builder,
             runtime,
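To make the `.ok()` fallback concrete: the conversion only succeeds when every sort key is a bare column, so a `COPY` whose ORDER BY uses an expression falls back to writing no sorting columns at all. A hedged sketch of both paths through the new sink construction; the table name, output paths, the `VALUES` default column names (`column1`, `column2`), and Parquet format inference from the `.parquet` extension are assumptions, not from this PR:

```rust
use datafusion::prelude::SessionContext;
use datafusion_common::Result;

async fn copy_examples(ctx: &SessionContext) -> Result<()> {
    ctx.sql("CREATE TABLE t AS VALUES (2, 'b'), (1, 'a')")
        .await?
        .collect()
        .await?;

    // ORDER BY on plain columns: lex_ordering_to_sorting_columns succeeds and
    // the written file should carry sorting_columns metadata.
    ctx.sql("COPY (SELECT * FROM t ORDER BY column1 ASC NULLS FIRST) TO '/tmp/by_col.parquet'")
        .await?
        .collect()
        .await?;

    // ORDER BY on an expression: no SortingColumn can represent `random()`,
    // so the conversion errs, `.ok()` yields None, and the file is still
    // written, just without sorting_columns.
    ctx.sql("COPY (SELECT * FROM t ORDER BY random()) TO '/tmp/by_expr.parquet'")
        .await?
        .collect()
        .await?;

    Ok(())
}
```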
diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index 8b11ba64ae7f1..13251101b131e 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -33,6 +33,8 @@ use datafusion_common::{
 };
 use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
 use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_plan::Accumulator;
 use log::debug;
 use object_store::path::Path;
@@ -41,6 +43,7 @@ use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
 use parquet::file::metadata::{
     PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
+    SortingColumn,
 };
 use parquet::schema::types::SchemaDescriptor;
 use std::any::Any;
@@ -613,6 +616,47 @@ impl FileMetadata for CachedParquetMetaData {
     }
 }
 
+/// Convert a [`PhysicalSortExpr`] to a Parquet [`SortingColumn`].
+///
+/// Returns `Err` if the expression is not a simple column reference.
+pub(crate) fn sort_expr_to_sorting_column(
+    sort_expr: &PhysicalSortExpr,
+) -> Result<SortingColumn> {
+    let column = sort_expr
+        .expr
+        .as_any()
+        .downcast_ref::<Column>()
+        .ok_or_else(|| {
+            DataFusionError::Plan(format!(
+                "Parquet sorting_columns only supports simple column references, \
+                 but got expression: {}",
+                sort_expr.expr
+            ))
+        })?;
+
+    let column_idx: i32 = column.index().try_into().map_err(|_| {
+        DataFusionError::Plan(format!(
+            "Column index {} is too large to be represented as i32",
+            column.index()
+        ))
+    })?;
+
+    Ok(SortingColumn {
+        column_idx,
+        descending: sort_expr.options.descending,
+        nulls_first: sort_expr.options.nulls_first,
+    })
+}
+
+/// Convert a [`LexOrdering`] to `Vec<SortingColumn>` for Parquet.
+///
+/// Returns `Err` if any expression is not a simple column reference.
+pub(crate) fn lex_ordering_to_sorting_columns(
+    ordering: &LexOrdering,
+) -> Result<Vec<SortingColumn>> {
+    ordering.iter().map(sort_expr_to_sorting_column).collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
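Since both helpers are `pub(crate)`, their contract is easiest to show as it might look inside this module's own `#[cfg(test)] mod tests` (which already has `use super::*;`). A hedged sketch; the `lit` helper and the specific imports are assumptions about readily available test utilities:

```rust
#[test]
fn sort_expr_conversion_contract() {
    use std::sync::Arc;

    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::{lit, Column};
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

    // A bare column reference maps directly onto a SortingColumn,
    // preserving direction and null ordering.
    let by_column = PhysicalSortExpr::new(
        Arc::new(Column::new("a", 0)),
        SortOptions { descending: true, nulls_first: false },
    );
    let sc = sort_expr_to_sorting_column(&by_column).unwrap();
    assert_eq!((sc.column_idx, sc.descending, sc.nulls_first), (0, true, false));

    // Anything that is not a Column (here a literal) is rejected, which is
    // what lets the sink fall back to writing no sorting_columns.
    let by_literal = PhysicalSortExpr::new(lit(1i32), SortOptions::default());
    assert!(sort_expr_to_sorting_column(&by_literal).is_err());
}
```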