1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/mod.rs
@@ -50,6 +50,7 @@ mod expr_adapter;
mod external_access_plan;
mod file_statistics;
mod filter_pushdown;
mod ordering;
mod page_pruning;
mod row_group_pruning;
mod schema;
103 changes: 103 additions & 0 deletions datafusion/core/tests/parquet/ordering.rs
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests that declared orderings are written to Parquet `sorting_columns` metadata

use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use tempfile::tempdir;

/// Test that CREATE TABLE ... WITH ORDER writes sorting_columns to Parquet metadata
#[tokio::test]
async fn test_create_table_with_order_writes_sorting_columns() -> Result<()> {
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use std::fs::File;

let ctx = SessionContext::new();
let tmp_dir = tempdir()?;
let table_path = tmp_dir.path().join("sorted_table");
std::fs::create_dir_all(&table_path)?;

// Create external table with ordering
let create_table_sql = format!(
"CREATE EXTERNAL TABLE sorted_data (a INT, b VARCHAR) \
STORED AS PARQUET \
LOCATION '{}' \
WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)",
table_path.display()
);
ctx.sql(&create_table_sql).await?;

// Insert sorted data
ctx.sql("INSERT INTO sorted_data VALUES (1, 'x'), (2, 'y'), (3, 'z')")
.await?
.collect()
.await?;

// Find the parquet file that was written
let parquet_files: Vec<_> = std::fs::read_dir(&table_path)?
.filter_map(|e| e.ok())
.filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
.collect();

assert!(
!parquet_files.is_empty(),
"Expected at least one parquet file in {}",
table_path.display()
);

// Read the parquet file and verify sorting_columns metadata
let file = File::open(parquet_files[0].path())?;
let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();

// Check that row group has sorting_columns
let row_group = metadata.row_group(0);
let sorting_columns = row_group.sorting_columns();

assert!(
sorting_columns.is_some(),
"Expected sorting_columns in row group metadata"
);
let sorting = sorting_columns.unwrap();
assert_eq!(sorting.len(), 2, "Expected 2 sorting columns");

// First column: a ASC NULLS FIRST (column_idx = 0)
assert_eq!(sorting[0].column_idx, 0, "First sort column should be 'a'");
assert!(
!sorting[0].descending,
"First column should be ASC (descending=false)"
);
assert!(
sorting[0].nulls_first,
"First column should have NULLS FIRST"
);

// Second column: b DESC NULLS LAST (column_idx = 1)
assert_eq!(sorting[1].column_idx, 1, "Second sort column should be 'b'");
assert!(
sorting[1].descending,
"Second column should be DESC (descending=true)"
);
assert!(
!sorting[1].nulls_first,
"Second column should have NULLS LAST"
);

Ok(())
}
41 changes: 37 additions & 4 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -54,11 +54,11 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::DFParquetMetadata;
use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use async_trait::async_trait;
@@ -81,7 +81,7 @@ use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, SortingColumn};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
@@ -500,7 +500,22 @@ impl FileFormat for ParquetFormat {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
}

let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
// Convert ordering requirements to Parquet SortingColumns for file metadata
let sorting_columns = if let Some(ref requirements) = order_requirements {
let ordering: LexOrdering = requirements.clone().into();
// In cases like `COPY (... ORDER BY ...) TO ...` the ORDER BY clause
// may not be compatible with Parquet sorting columns (e.g. ordering on `random()`).
// So if we cannot create a Parquet sorting column from the ordering requirement,
// we skip setting sorting columns on the Parquet sink.
lex_ordering_to_sorting_columns(&ordering).ok()
} else {
None
};

let sink = Arc::new(
ParquetSink::new(conf, self.options.clone())
.with_sorting_columns(sorting_columns),
);

Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
@@ -1088,6 +1103,8 @@ pub struct ParquetSink {
/// File metadata from successfully produced parquet files. The Mutex is only used
/// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
/// Optional sorting columns to write to Parquet metadata
sorting_columns: Option<Vec<SortingColumn>>,
}

impl Debug for ParquetSink {
@@ -1119,9 +1136,19 @@ impl ParquetSink {
config,
parquet_options,
written: Default::default(),
sorting_columns: None,
}
}

/// Set sorting columns for the Parquet file metadata.
pub fn with_sorting_columns(
mut self,
sorting_columns: Option<Vec<SortingColumn>>,
) -> Self {
self.sorting_columns = sorting_columns;
self
}

/// Retrieve the file metadata for the written files, keyed to the path
/// which may be partitioned (in the case of hive style partitioning).
pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
@@ -1145,6 +1172,12 @@ impl ParquetSink {
}

let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;

// Set sorting columns if configured
if let Some(ref sorting_columns) = self.sorting_columns {
builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
}

builder = set_writer_encryption_properties(
builder,
runtime,
44 changes: 44 additions & 0 deletions datafusion/datasource-parquet/src/metadata.rs
@@ -33,6 +33,8 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
@@ -41,6 +43,7 @@ use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
use parquet::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
SortingColumn,
};
use parquet::schema::types::SchemaDescriptor;
use std::any::Any;
@@ -613,6 +616,47 @@ impl FileMetadata for CachedParquetMetaData {
}
}

/// Convert a [`PhysicalSortExpr`] to a Parquet [`SortingColumn`].
///
/// Returns `Err` if the expression is not a simple column reference.
pub(crate) fn sort_expr_to_sorting_column(
sort_expr: &PhysicalSortExpr,
) -> Result<SortingColumn> {
let column = sort_expr
.expr
.as_any()
.downcast_ref::<Column>()
.ok_or_else(|| {
DataFusionError::Plan(format!(
"Parquet sorting_columns only supports simple column references, \
but got expression: {}",
sort_expr.expr
))
})?;

let column_idx: i32 = column.index().try_into().map_err(|_| {
DataFusionError::Plan(format!(
"Column index {} is too large to be represented as i32",
column.index()
))
})?;

Ok(SortingColumn {
column_idx,
descending: sort_expr.options.descending,
nulls_first: sort_expr.options.nulls_first,
})
}

/// Convert a [`LexOrdering`] to `Vec<SortingColumn>` for Parquet.
///
/// Returns `Err` if any expression is not a simple column reference.
pub(crate) fn lex_ordering_to_sorting_columns(
ordering: &LexOrdering,
) -> Result<Vec<SortingColumn>> {
ordering.iter().map(sort_expr_to_sorting_column).collect()
}

#[cfg(test)]
mod tests {
use super::*;
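
For reference, a minimal standalone sketch (not part of this PR) of the writer-side parquet API the sink now drives: the resolved sort order is declared on `WriterProperties`, and the writer records it in each row group's metadata. It assumes the same parquet crate version the diff imports `SortingColumn` from; the function name is illustrative only.

use parquet::file::metadata::SortingColumn;
use parquet::file::properties::WriterProperties;

// Equivalent of `WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)` after the
// sort expressions have been resolved to column indices 0 and 1.
fn example_writer_properties() -> WriterProperties {
    let sorting = vec![
        SortingColumn { column_idx: 0, descending: false, nulls_first: true },
        SortingColumn { column_idx: 1, descending: true, nulls_first: false },
    ];
    WriterProperties::builder()
        .set_sorting_columns(Some(sorting))
        .build()
}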