diff --git a/Cargo.lock b/Cargo.lock index 22ec582536069..f1792bfe281ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2164,12 +2164,15 @@ dependencies = [ "async-trait", "bytes", "chrono", + "criterion", + "datafusion", "datafusion-common", "datafusion-common-runtime", "datafusion-datasource", "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-functions-nested", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", @@ -2182,6 +2185,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", + "tempfile", "tokio", ] diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5f6f56ac6f33..13322a37dc8ff 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -56,6 +56,10 @@ tokio = { workspace = true } [dev-dependencies] chrono = { workspace = true } +criterion = { workspace = true } +datafusion = { workspace = true, default-features = true } +datafusion-functions-nested = { workspace = true } +tempfile = { workspace = true } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet @@ -73,3 +77,7 @@ parquet_encryption = [ "datafusion-common/parquet_encryption", "datafusion-execution/parquet_encryption", ] + +[[bench]] +name = "parquet_nested_filter_pushdown" +harness = false diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs new file mode 100644 index 0000000000000..31a3c222c69bd --- /dev/null +++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark for Parquet nested list filter pushdown performance. +//! +//! This benchmark demonstrates the performance improvement of pushing down +//! filters on nested list columns (such as `array_has`, `array_has_all`) to +//! the Parquet decoder level, allowing row group skipping based on min/max +//! statistics. +//! +//! The benchmark creates a dataset with: +//! - 100K rows across 10 row groups (10K rows per group) +//! - A `List` column with sorted values (lexicographically ordered) +//! - A filter that matches only ~10% of row groups +//! +//! With pushdown enabled, ~90% of row groups can be skipped based on min/max +//! statistics, significantly reducing the rows that need to be decoded and +//! filtered. 
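+//!
+//! As a rough sketch (not a doctest), each benchmark iteration does the
+//! equivalent of the following, using the helpers defined later in this file
+//! (`create_pushdown_context` builds a `SessionContext` with
+//! `datafusion.execution.parquet.pushdown_filters` enabled, and `path` stands
+//! for the generated Parquet file):
+//!
+//! ```ignore
+//! let ctx = create_pushdown_context();
+//! ctx.register_parquet("test_table", path, ParquetReadOptions::default()).await?;
+//! let df = ctx
+//!     .sql("SELECT * FROM test_table WHERE array_has(list_col, 'aa0_value_a')")
+//!     .await?;
+//! // With pushdown enabled, most row groups are pruned before decoding.
+//! let results = df.collect().await?;
+//! ```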
+ +use arrow::array::{ArrayRef, ListArray, StringArray, UInt64Array}; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion::config::{ConfigOptions, SessionConfig}; +use datafusion::datasource::{file_scan_config::FileScanConfig, source::DataSourceExec}; +use datafusion::execution::context::SessionContext; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::*; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use std::fs::File; +use std::hint::black_box; +use std::path::PathBuf; +use std::sync::Arc; +use tempfile::TempDir; +use tokio::runtime::{Builder, Runtime}; + +/// Configuration for the benchmark dataset +#[derive(Clone)] +struct BenchmarkConfig { + /// Total number of rows in the dataset + total_rows: usize, + /// Target number of rows per row group + rows_per_group: usize, + /// Selectivity: percentage of row groups that match the filter (0.0 to 1.0) + selectivity: f64, +} + +impl BenchmarkConfig { + fn num_row_groups(&self) -> usize { + (self.total_rows + self.rows_per_group - 1) / self.rows_per_group + } +} + +/// Generates test data with sorted List column +/// +/// Creates a dataset where list values are lexicographically sorted across +/// row groups, enabling effective min/max filtering. For example: +/// - Row group 0: lists containing "aaa" to "bbb" +/// - Row group 1: lists containing "bbc" to "ccc" +/// - Row group 2: lists containing "ccd" to "ddd" +/// - etc. +fn generate_sorted_list_data( + config: &BenchmarkConfig, + temp_dir: &TempDir, + target_value: &str, +) -> std::io::Result<(PathBuf, usize)> { + let file_path = temp_dir.path().join("data.parquet"); + + // Define the schema with a List column and an id column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new( + "list_col", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + ])); + + let file = File::create(&file_path)?; + + // Configure writer with explicit row group size + let props = WriterProperties::builder() + .set_max_row_group_size(config.rows_per_group) + .build(); + + let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + let num_groups = config.num_row_groups(); + let matching_groups = + ((num_groups as f64 * config.selectivity).ceil() as usize).clamp(1, num_groups); + let mut row_id = 0i64; + + // Generate row groups with sorted list values + for group_idx in 0..num_groups { + let should_match = group_idx < matching_groups; + let mut batch_ids = Vec::new(); + let mut all_values = Vec::new(); + let mut offsets = vec![0i32]; + + for local_idx in 0..config.rows_per_group { + // Add row ID + batch_ids.push(row_id); + row_id += 1; + + // Create lexicographically sorted values. Matching row groups contain the + // `target_value`, while non-matching groups use a higher prefix so the + // min/max range excludes the target. + let prefix = format!("g{:02}{}", group_idx, local_idx); + if should_match { + all_values.push(format!("{}_before", prefix)); + all_values.push(target_value.to_string()); + all_values.push(format!("{}_after", prefix)); + } else { + // Keep all values lexicographically greater than `target_value` to + // allow pushdown to skip these row groups when filtering by the + // target. 
+                    all_values.push(format!("zz{}_value_a", prefix));
+                    all_values.push(format!("zz{}_value_b", prefix));
+                    all_values.push(format!("zz{}_value_c", prefix));
+                }
+
+                offsets.push((offsets.last().unwrap() + 3) as i32);
+            }
+
+            // Create arrays
+            let id_array = Arc::new(arrow::array::Int64Array::from_iter_values(
+                batch_ids.iter().copied(),
+            )) as ArrayRef;
+
+            let values_array =
+                Arc::new(StringArray::from_iter_values(all_values.iter())) as ArrayRef;
+
+            // Create offset buffer from scalar buffer
+            let scalar_buffer: ScalarBuffer<i32> = offsets.into();
+            let offset_buffer = OffsetBuffer::new(scalar_buffer);
+
+            let list_array = Arc::new(ListArray::new(
+                Arc::new(Field::new("item", DataType::Utf8, true)),
+                offset_buffer,
+                values_array,
+                None,
+            )) as ArrayRef;
+
+            let batch = RecordBatch::try_new(schema.clone(), vec![id_array, list_array])
+                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+
+            writer
+                .write(&batch)
+                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+        }
+
+        writer
+            .finish()
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+
+        Ok((file_path, matching_groups))
+}
+
+fn assert_scan_has_row_filter(plan: &Arc<dyn ExecutionPlan>) {
+    let mut stack = vec![Arc::clone(plan)];
+
+    while let Some(plan) = stack.pop() {
+        if let Some(source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            if let Some(file_scan_config) = source_exec
+                .data_source()
+                .as_any()
+                .downcast_ref::<FileScanConfig>()
+            {
+                assert!(
+                    file_scan_config.file_source().filter().is_some(),
+                    "Expected DataSourceExec to include a pushed-down row filter"
+                );
+                return;
+            }
+        }
+
+        stack.extend(plan.children().into_iter().cloned());
+    }
+
+    panic!("Expected physical plan to contain a DataSourceExec");
+}
+
+fn create_pushdown_context() -> SessionContext {
+    let mut config_options = ConfigOptions::new();
+    config_options.execution.parquet.pushdown_filters = true;
+    config_options.execution.parquet.reorder_filters = true;
+
+    let session_config = SessionConfig::new().with_options(config_options);
+    SessionContext::new_with_config(session_config)
+}
+
+/// Benchmark for array_has filter with pushdown enabled
+///
+/// This measures the performance of filtering using array_has when pushdown
+/// is active. With selective filters, this should skip ~90% of row groups,
+/// resulting in minimal row decoding.
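+///
+/// With the configuration used below (100,000 rows in 10 row groups of
+/// 10,000 rows, selectivity 0.1), `generate_sorted_list_data` places the
+/// target value in a single row group, so min/max statistics alone let the
+/// pushed-down filter skip the other nine groups (~90,000 rows).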
+fn benchmark_array_has_with_pushdown(c: &mut Criterion) { + let rt = build_runtime(); + let mut group = c.benchmark_group("parquet_array_has_pushdown"); + + // Test configuration: 100K rows, 10 row groups, selective filter (10% match) + let config = BenchmarkConfig { + total_rows: 100_000, + rows_per_group: 10_000, + selectivity: 0.1, // Only ~10% of row groups match the filter + }; + + let temp_dir = TempDir::new().expect("Failed to create temp directory"); + let (file_path, _) = generate_sorted_list_data(&config, &temp_dir, "aa0_value_a") + .expect("Failed to generate test data"); + + group.bench_function( + BenchmarkId::from_parameter(format!( + "rows={},selectivity={:.0}%", + config.total_rows, + config.selectivity * 100.0 + )), + |b| { + b.to_async(&rt).iter(|| async { + let ctx = create_pushdown_context(); + + // Register the parquet file + ctx.register_parquet( + "test_table", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .expect("Failed to register parquet"); + + // Execute query with array_has filter + // This should demonstrate pushdown benefits for selective filters + let sql = + "SELECT * FROM test_table WHERE array_has(list_col, 'aa0_value_a')"; + let df = ctx.sql(sql).await.expect("Failed to create dataframe"); + + let plan = df + .create_physical_plan() + .await + .expect("Failed to create physical plan"); + assert_scan_has_row_filter(&plan); + + // Collect results to ensure full execution + let results = df.collect().await.expect("Failed to collect results"); + + black_box(results) + }); + }, + ); + + group.finish(); +} + +/// Benchmark comparing filter selectivity impact +/// +/// Demonstrates how different selectivity levels (percentage of matching +/// row groups) affect performance with pushdown enabled. +fn benchmark_selectivity_comparison(c: &mut Criterion) { + let rt = build_runtime(); + let mut group = c.benchmark_group("parquet_selectivity_impact"); + + let temp_dir = TempDir::new().expect("Failed to create temp directory"); + + // Pre-generate all test data. Each selectivity level targets a fraction of the + // ten row groups (rounded up), and the target value is injected into every row + // within those matching groups: + // - 10% => 1 matching row group => 10,000 matching rows + // - 30% => 3 matching row groups => 30,000 matching rows + // - 50% => 5 matching row groups => 50,000 matching rows + // - 90% => 9 matching row groups => 90,000 matching rows + let test_cases = vec![ + (0.1, "aa0_value_a"), + (0.3, "ac0_value_a"), + (0.5, "ae0_value_a"), + (0.9, "ai0_value_a"), + ]; + + for (selectivity, target_value) in test_cases { + let config = BenchmarkConfig { + total_rows: 100_000, + rows_per_group: 10_000, + selectivity, + }; + + let (file_path, matching_groups) = + generate_sorted_list_data(&config, &temp_dir, target_value) + .expect("Failed to generate test data"); + + // Validate that the generated data matches the expected selectivity so each + // benchmark run measures a different pushdown rate. 
+        let expected_match_rows = matching_groups * config.rows_per_group;
+        validate_match_rate(&rt, file_path.clone(), target_value, expected_match_rows);
+
+        group.bench_function(
+            BenchmarkId::from_parameter(format!(
+                "selectivity_{:.0}%",
+                selectivity * 100.0
+            )),
+            |b| {
+                b.to_async(&rt).iter(|| async {
+                    let ctx = create_pushdown_context();
+
+                    ctx.register_parquet(
+                        "test_table",
+                        file_path.to_str().unwrap(),
+                        ParquetReadOptions::default(),
+                    )
+                    .await
+                    .expect("Failed to register parquet");
+
+                    // Use a filter that matches the selectivity level
+                    let sql = format!(
+                        "SELECT COUNT(*) FROM test_table WHERE array_has(list_col, '{}')",
+                        target_value
+                    );
+                    let df = ctx.sql(&sql).await.expect("Failed to create dataframe");
+
+                    let plan = df
+                        .create_physical_plan()
+                        .await
+                        .expect("Failed to create physical plan");
+                    assert_scan_has_row_filter(&plan);
+
+                    let results = df.collect().await.expect("Failed to collect");
+
+                    black_box(results)
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+criterion_group!(
+    benches,
+    benchmark_array_has_with_pushdown,
+    benchmark_selectivity_comparison
+);
+criterion_main!(benches);
+
+fn validate_match_rate(
+    rt: &Runtime,
+    file_path: PathBuf,
+    target_value: &str,
+    expected_match_rows: usize,
+) {
+    let actual_match_rows = rt.block_on(async {
+        let ctx = SessionContext::new();
+        ctx.register_parquet(
+            "test_table",
+            file_path.to_str().unwrap(),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .expect("Failed to register parquet");
+
+        let sql = format!(
+            "SELECT COUNT(*) FROM test_table WHERE array_has(list_col, '{}')",
+            target_value
+        );
+        let df = ctx.sql(&sql).await.expect("Failed to create dataframe");
+        let results = df.collect().await.expect("Failed to collect");
+        let count_array = results[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .expect("COUNT(*) should be UInt64");
+        count_array.value(0) as usize
+    });
+
+    assert_eq!(
+        actual_match_rows, expected_match_rows,
+        "Generated data did not match expected selectivity"
+    );
+}
+
+fn build_runtime() -> Runtime {
+    Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .expect("Failed to create runtime")
+}
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index eb4cc9e9ad5a3..d7e92f70afa99 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -32,6 +32,7 @@ mod row_filter;
 mod row_group_filter;
 mod sort;
 pub mod source;
+mod supported_predicates;
 mod writer;
 
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index ba3b29be40d74..d41008a9b47c3 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -58,6 +58,11 @@
 //! 8. Build the `RowFilter` with the sorted predicates followed by
 //!    the unsorted predicates. Within each partition, predicates are
 //!    still be sorted by size.
+//!
+//! List-aware predicates (for example, `array_has`, `array_has_all`, and
+//! `array_has_any`) can be evaluated directly during Parquet decoding. Struct
+//! columns and other nested projections that are not explicitly supported will
+//! continue to be evaluated after the batches are materialized.
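+//!
+//! As a rough illustration (mirroring this module's tests, with `schema`
+//! standing for a file schema that has a `tags` list column and a `person`
+//! struct column):
+//!
+//! ```ignore
+//! // List column + supported predicate: eligible for pushdown
+//! let expr = array_has(col("tags"), lit("rust"));
+//! assert!(can_expr_be_pushed_down_with_schemas(
+//!     &logical2physical(&expr, &schema),
+//!     &schema
+//! ));
+//!
+//! // Struct column: still evaluated after the batches are materialized
+//! let expr = col("person").is_not_null();
+//! assert!(!can_expr_be_pushed_down_with_schemas(
+//!     &logical2physical(&expr, &schema),
+//!     &schema
+//! ));
+//! ```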
 
 use std::cmp::Ordering;
 use std::collections::BTreeSet;
@@ -70,6 +75,7 @@ use arrow::record_batch::RecordBatch;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::file::metadata::ParquetMetaData;
+use parquet::schema::types::SchemaDescriptor;
 
 use datafusion_common::Result;
 use datafusion_common::cast::as_boolean_array;
@@ -81,6 +87,7 @@ use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 use datafusion_physical_plan::metrics;
 
 use super::ParquetFileMetrics;
+use super::supported_predicates::supports_list_predicates;
 
 /// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform
 /// row-level filtering during parquet decoding.
@@ -91,12 +98,14 @@ use super::ParquetFileMetrics;
 ///
 /// An expression can be evaluated as a `DatafusionArrowPredicate` if it:
 /// * Does not reference any projected columns
-/// * Does not reference columns with non-primitive types (e.g. structs / lists)
+/// * References either primitive columns or list columns used by
+///   supported predicates (such as `array_has_all` or NULL checks). Struct
+///   columns are still evaluated after decoding.
 #[derive(Debug)]
 pub(crate) struct DatafusionArrowPredicate {
     /// the filter expression
     physical_expr: Arc<dyn PhysicalExpr>,
-    /// Path to the columns in the parquet schema required to evaluate the
+    /// Path to the leaf columns in the parquet schema required to evaluate the
     /// expression
     projection_mask: ProjectionMask,
     /// how many rows were filtered out by this predicate
@@ -121,9 +130,12 @@ impl DatafusionArrowPredicate {
 
         Ok(Self {
             physical_expr,
-            projection_mask: ProjectionMask::roots(
+            // Use leaf indices: when nested columns are involved, we must specify
+            // leaf (primitive) column indices in the Parquet schema so the decoder
+            // can properly project and filter nested structures.
+            projection_mask: ProjectionMask::leaves(
                 metadata.file_metadata().schema_descr(),
-                candidate.projection,
+                candidate.projection.leaf_indices.iter().copied(),
            ),
             rows_pruned,
             rows_matched,
@@ -177,12 +189,19 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// Column indices into the parquet file schema required to evaluate this filter.
-    projection: Vec<usize>,
+    projection: LeafProjection,
     /// The Arrow schema containing only the columns required by this filter,
     /// projected from the file's Arrow schema.
     filter_schema: SchemaRef,
 }
 
+/// Tracks the projection of an expression in both root and leaf coordinates.
+#[derive(Debug, Clone)]
+struct LeafProjection {
+    /// Leaf column indices in the Parquet schema descriptor.
+    leaf_indices: Vec<usize>,
+}
+
 /// Helper to build a `FilterCandidate`.
 ///
 /// This will do several things:
@@ -212,23 +231,29 @@ impl FilterCandidateBuilder {
     ///  * `Ok(None)` if the expression cannot be used as an ArrowFilter
     ///  * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
-        let Some(required_column_indices) =
-            pushdown_columns(&self.expr, &self.file_schema)?
+        let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
         else {
             return Ok(None);
         };
 
-        let projected_schema =
-            Arc::new(self.file_schema.project(&required_column_indices)?);
+        let root_indices: Vec<_> =
+            required_columns.required_columns.into_iter().collect();
+        let leaf_indices = leaf_indices_for_roots(
+            &root_indices,
+            metadata.file_metadata().schema_descr(),
+            required_columns.nested,
+        );
 
-        let required_bytes = size_of_columns(&required_column_indices, metadata)?;
-        let can_use_index = columns_sorted(&required_column_indices, metadata)?;
+        let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
+
+        let required_bytes = size_of_columns(&leaf_indices, metadata)?;
+        let can_use_index = columns_sorted(&leaf_indices, metadata)?;
 
         Ok(Some(FilterCandidate {
             expr: self.expr,
             required_bytes,
             can_use_index,
-            projection: required_column_indices,
+            projection: LeafProjection { leaf_indices },
             filter_schema: projected_schema,
         }))
     }
 }
@@ -238,7 +263,8 @@
 /// prevent the expression from being pushed down to the parquet decoder.
 ///
 /// An expression cannot be pushed down if it references:
-/// - Non-primitive columns (like structs or lists)
+/// - Unsupported nested columns (structs or list fields that are not covered by
+///   the supported predicate set)
 /// - Columns that don't exist in the file schema
 struct PushdownChecker<'schema> {
     /// Does the expression require any non-primitive columns (like structs)?
     non_primitive_columns: bool,
     /// Does the expression reference any columns that are in the table schema but not in the file schema?
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: BTreeSet<usize>,
+    /// Tracks the nested column behavior found during traversal.
+    nested_behavior: NestedColumnSupport,
+    /// Whether nested list columns are supported by the predicate semantics.
+    allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
     file_schema: &'schema Schema,
 }
 
 impl<'schema> PushdownChecker<'schema> {
-    fn new(file_schema: &'schema Schema) -> Self {
+    fn new(file_schema: &'schema Schema, allow_list_columns: bool) -> Self {
         Self {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: BTreeSet::default(),
+            nested_behavior: NestedColumnSupport::PrimitiveOnly,
+            allow_list_columns,
             file_schema,
         }
     }
 
     fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
-        if let Ok(idx) = self.file_schema.index_of(column_name) {
-            self.required_columns.insert(idx);
-            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                self.non_primitive_columns = true;
+        let idx = match self.file_schema.index_of(column_name) {
+            Ok(idx) => idx,
+            Err(_) => {
+                // Column does not exist in the file schema, so we can't push this down.
+                self.projected_columns = true;
                 return Some(TreeNodeRecursion::Jump);
             }
+        };
+
+        self.required_columns.insert(idx);
+        let data_type = self.file_schema.field(idx).data_type();
+
+        if DataType::is_nested(data_type) {
+            self.handle_nested_type(data_type)
+        } else {
+            None
+        }
+    }
+
+    /// Determines whether a nested data type can be pushed down to Parquet decoding.
+    ///
+    /// Returns `Some(TreeNodeRecursion::Jump)` if the nested type prevents pushdown,
+    /// `None` if the type is supported and pushdown can continue.
+    fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
+        if self.is_nested_type_supported(data_type) {
+            // Update to ListsSupported if we haven't encountered unsupported types yet
+            if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
+                self.nested_behavior = NestedColumnSupport::ListsSupported;
+            }
+            None
         } else {
-            // Column does not exist in the file schema, so we can't push this down.
-            self.projected_columns = true;
-            return Some(TreeNodeRecursion::Jump);
+            // Block pushdown for unsupported nested types:
+            // - Structs (regardless of predicate support)
+            // - Lists without supported predicates
+            self.nested_behavior = NestedColumnSupport::Unsupported;
+            self.non_primitive_columns = true;
+            Some(TreeNodeRecursion::Jump)
         }
+    }
 
-        None
+    /// Checks if a nested data type is supported for list column pushdown.
+    ///
+    /// List columns are only supported if:
+    /// 1. The data type is a list variant (List, LargeList, or FixedSizeList)
+    /// 2. The expression contains supported list predicates (e.g., array_has_all)
+    fn is_nested_type_supported(&self, data_type: &DataType) -> bool {
+        let is_list = matches!(
+            data_type,
+            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
+        );
+        self.allow_list_columns && is_list
     }
 
     #[inline]
@@ -297,34 +367,137 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     }
 }
 
+/// Describes the nested column behavior for filter pushdown.
+///
+/// This enum makes explicit the different states a predicate can be in
+/// with respect to nested column handling during Parquet decoding.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum NestedColumnSupport {
+    /// Expression references only primitive (non-nested) columns.
+    /// These can always be pushed down to the Parquet decoder.
+    PrimitiveOnly,
+    /// Expression references list columns with supported predicates
+    /// (e.g., array_has, array_has_all, IS NULL).
+    /// These can be pushed down to the Parquet decoder.
+    ListsSupported,
+    /// Expression references unsupported nested types (e.g., structs)
+    /// or list columns without supported predicates.
+    /// These cannot be pushed down and must be evaluated after decoding.
+    Unsupported,
+}
+
+#[derive(Debug)]
+struct PushdownColumns {
+    required_columns: BTreeSet<usize>,
+    nested: NestedColumnSupport,
+}
+
 /// Checks if a given expression can be pushed down to the parquet decoder.
 ///
-/// Returns `Some(column_indices)` if the expression can be pushed down,
-/// where `column_indices` are the indices into the file schema of all columns
+/// Returns `Some(PushdownColumns)` if the expression can be pushed down,
+/// where the struct contains the indices into the file schema of all columns
 /// required to evaluate the expression.
 ///
 /// Returns `None` if the expression cannot be pushed down (e.g., references
-/// non-primitive types or columns not in the file).
+/// unsupported nested types or columns not in the file).
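+///
+/// For example (a sketch of the intended behaviour, not a doctest): for a
+/// filter such as `array_has(tags, 'rust')` over a `tags` list column, this
+/// returns the index of `tags` with `nested == ListsSupported`, whereas a
+/// predicate over a struct column yields `None`.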
 fn pushdown_columns(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
-) -> Result<Option<Vec<usize>>> {
-    let mut checker = PushdownChecker::new(file_schema);
+) -> Result<Option<PushdownColumns>> {
+    let allow_list_columns = supports_list_predicates(expr);
+    let mut checker = PushdownChecker::new(file_schema, allow_list_columns);
     expr.visit(&mut checker)?;
 
-    Ok((!checker.prevents_pushdown())
-        .then_some(checker.required_columns.into_iter().collect()))
+    Ok((!checker.prevents_pushdown()).then_some(PushdownColumns {
+        required_columns: checker.required_columns,
+        nested: checker.nested_behavior,
+    }))
+}
+
+fn leaf_indices_for_roots(
+    root_indices: &[usize],
+    schema_descr: &SchemaDescriptor,
+    nested: NestedColumnSupport,
+) -> Vec<usize> {
+    // For primitive-only columns, root indices ARE the leaf indices
+    if nested == NestedColumnSupport::PrimitiveOnly {
+        return root_indices.to_vec();
+    }
+
+    // For nested columns (lists or structs), we need to expand to all leaf columns
+    let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
+
+    (0..schema_descr.num_columns())
+        .filter(|leaf_idx| {
+            root_set.contains(&schema_descr.get_column_root_idx(*leaf_idx))
+        })
+        .collect()
 }
 
 /// Checks if a predicate expression can be pushed down to the parquet decoder.
 ///
 /// Returns `true` if all columns referenced by the expression:
 /// - Exist in the provided schema
-/// - Are primitive types (not structs, lists, etc.)
+/// - Are primitive types OR list columns with supported predicates
+///   (e.g., `array_has`, `array_has_all`, `array_has_any`, IS NULL, IS NOT NULL)
+/// - Struct columns are not supported and will prevent pushdown
 ///
 /// # Arguments
 /// * `expr` - The filter expression to check
 /// * `file_schema` - The Arrow schema of the parquet file (or table schema when
 ///   the file schema is not yet available during planning)
+///
+/// # Examples
+///
+/// Primitive column filters can be pushed down:
+/// ```ignore
+/// use datafusion_expr::{col, Expr};
+/// use datafusion_common::ScalarValue;
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use std::sync::Arc;
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("age", DataType::Int32, false),
+/// ]));
+///
+/// // Primitive filter: can be pushed down
+/// let expr = col("age").gt(Expr::Literal(ScalarValue::Int32(Some(30)), None));
+/// let expr = logical2physical(&expr, &schema);
+/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema));
+/// ```
+///
+/// Struct column filters cannot be pushed down:
+/// ```ignore
+/// use arrow::datatypes::Fields;
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("person", DataType::Struct(
+///         Fields::from(vec![Field::new("name", DataType::Utf8, true)])
+///     ), true),
+/// ]));
+///
+/// // Struct filter: cannot be pushed down
+/// let expr = col("person").is_not_null();
+/// let expr = logical2physical(&expr, &schema);
+/// assert!(!can_expr_be_pushed_down_with_schemas(&expr, &schema));
+/// ```
+///
+/// List column filters with supported predicates can be pushed down:
+/// ```ignore
+/// use datafusion_functions_nested::expr_fn::{array_has_all, make_array};
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("tags", DataType::List(
+///         Arc::new(Field::new("item", DataType::Utf8, true))
+///     ), true),
+/// ]));
+///
+/// // Array filter with supported predicate: can be pushed down
+/// let expr = array_has_all(col("tags"), make_array(vec![
+///     Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None)
+/// ]));
+/// let expr = logical2physical(&expr, &schema);
+///
assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema)); +/// ``` pub fn can_expr_be_pushed_down_with_schemas( expr: &Arc, file_schema: &Schema, @@ -335,7 +508,7 @@ pub fn can_expr_be_pushed_down_with_schemas( } } -/// Calculate the total compressed size of all `Column`'s required for +/// Calculate the total compressed size of all leaf columns required for /// predicate `Expr`. /// /// This value represents the total amount of IO required to evaluate the @@ -464,21 +637,27 @@ mod test { use super::*; use datafusion_common::ScalarValue; + use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{Field, TimeUnit::Nanosecond}; use datafusion_expr::{Expr, col}; + use datafusion_functions_nested::expr_fn::{ + array_has, array_has_all, array_has_any, make_array, + }; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory, }; - use datafusion_physical_plan::metrics::{Count, Time}; + use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, Time}; + use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; + use tempfile::NamedTempFile; - // We should ignore predicate that read non-primitive columns + // List predicates used by the decoder should be accepted for pushdown #[test] - fn test_filter_candidate_builder_ignore_complex_types() { + fn test_filter_candidate_builder_supports_list_types() { let testdata = datafusion_common::test_util::parquet_test_data(); let file = std::fs::File::open(format!("{testdata}/list_columns.parquet")) .expect("opening file"); @@ -496,11 +675,16 @@ mod test { let table_schema = Arc::new(table_schema.clone()); + let list_index = table_schema + .index_of("int64_list") + .expect("list column should exist"); + let candidate = FilterCandidateBuilder::new(expr, table_schema) .build(metadata) - .expect("building candidate"); + .expect("building candidate") + .expect("list pushdown should be supported"); - assert!(candidate.is_none()); + assert_eq!(candidate.projection.leaf_indices, vec![list_index]); } #[test] @@ -590,14 +774,193 @@ mod test { } #[test] - fn nested_data_structures_prevent_pushdown() { + fn struct_data_structures_prevent_pushdown() { + let table_schema = Arc::new(Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + )])); + + let expr = col("struct_col").is_not_null(); + let expr = logical2physical(&expr, &table_schema); + + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn mixed_primitive_and_struct_prevents_pushdown() { + // Even when a predicate contains both primitive and unsupported nested columns, + // the entire predicate should not be pushed down because the struct column + // cannot be evaluated during Parquet decoding. + let table_schema = Arc::new(Schema::new(vec![ + Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + ), + Field::new("int_col", DataType::Int32, false), + ])); + + // Expression: (struct_col IS NOT NULL) AND (int_col = 5) + // Even though int_col is primitive, the presence of struct_col in the + // conjunction should prevent pushdown of the entire expression. 
+ let expr = col("struct_col") + .is_not_null() + .and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None))); + let expr = logical2physical(&expr, &table_schema); + + // The entire expression should not be pushed down + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + + // However, just the int_col predicate alone should be pushable + let expr_int_only = + col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)); + let expr_int_only = logical2physical(&expr_int_only, &table_schema); + assert!(can_expr_be_pushed_down_with_schemas( + &expr_int_only, + &table_schema + )); + } + + #[test] + fn nested_lists_allow_pushdown_checks() { let table_schema = Arc::new(get_lists_table_schema()); let expr = col("utf8_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); check_expression_can_evaluate_against_schema(&expr, &table_schema); - assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn array_has_all_pushdown_filters_rows() { + // Test array_has_all: checks if array contains all of ["c"] + // Rows with "c": row 1 and row 2 + let expr = array_has_all( + col("letters"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("c".to_string())), + None, + )]), + ); + test_array_predicate_pushdown("array_has_all", expr, 1, 2); + } + + /// Helper function to test array predicate pushdown functionality. + /// + /// Creates a Parquet file with a list column, applies the given predicate, + /// and verifies that rows are correctly filtered during decoding. + fn test_array_predicate_pushdown( + func_name: &str, + predicate_expr: Expr, + expected_pruned: usize, + expected_matched: usize, + ) { + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let schema = Arc::new(Schema::new(vec![Field::new( + "letters", + DataType::List(item_field), + true, + )])); + + let mut builder = ListBuilder::new(StringBuilder::new()); + // Row 0: ["a", "b"] + builder.values().append_value("a"); + builder.values().append_value("b"); + builder.append(true); + + // Row 1: ["c"] + builder.values().append_value("c"); + builder.append(true); + + // Row 2: ["c", "d"] + builder.values().append_value("c"); + builder.values().append_value("d"); + builder.append(true); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())]) + .expect("record batch"); + + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), schema, None).expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); + + let expr = logical2physical(&predicate_expr, &file_schema); + + let metrics = ExecutionPlanMetricsSet::new(); + let file_metrics = + ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics); + + let row_filter = + build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) + .expect("building row filter") + .expect("row filter should exist"); + + let reader = parquet_reader_builder + .with_row_filter(row_filter) + .build() + .expect("build reader"); + + let mut total_rows = 0; + for batch in reader { + let batch = 
batch.expect("record batch"); + total_rows += batch.num_rows(); + } + + assert_eq!( + file_metrics.pushdown_rows_pruned.value(), + expected_pruned, + "{func_name}: expected {expected_pruned} pruned rows" + ); + assert_eq!( + file_metrics.pushdown_rows_matched.value(), + expected_matched, + "{func_name}: expected {expected_matched} matched rows" + ); + assert_eq!( + total_rows, expected_matched, + "{func_name}: expected {expected_matched} total rows" + ); + } + + #[test] + fn array_has_pushdown_filters_rows() { + // Test array_has: checks if "c" is in the array + // Rows with "c": row 1 and row 2 + let expr = array_has( + col("letters"), + Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None), + ); + test_array_predicate_pushdown("array_has", expr, 1, 2); + } + + #[test] + fn array_has_any_pushdown_filters_rows() { + // Test array_has_any: checks if array contains any of ["a", "d"] + // Row 0 has "a", row 2 has "d" - both should match + let expr = array_has_any( + col("letters"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None), + Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None), + ]), + ); + test_array_predicate_pushdown("array_has_any", expr, 1, 2); } #[test] diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs new file mode 100644 index 0000000000000..a7ae1539dea30 --- /dev/null +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Registry of physical expressions that support nested list column pushdown +//! to the Parquet decoder. +//! +//! This module provides a trait-based approach for determining which predicates +//! can be safely evaluated on nested list columns during Parquet decoding. + +use std::sync::Arc; + +use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; +use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; + +/// Trait for physical expressions that support list column pushdown during +/// Parquet decoding. +/// +/// This trait provides a type-safe mechanism for identifying expressions that +/// can be safely pushed down to the Parquet decoder for evaluation on nested +/// list columns. +/// +/// # Implementation Notes +/// +/// Expression types in external crates cannot directly implement this trait +/// due to Rust's orphan rules. Instead, we use a blanket implementation that +/// delegates to a registration mechanism. 
+///
+/// # Examples
+///
+/// ```ignore
+/// use datafusion_physical_expr::PhysicalExpr;
+/// use datafusion_datasource_parquet::SupportsListPushdown;
+///
+/// let expr: Arc<dyn PhysicalExpr> = ...;
+/// if expr.supports_list_pushdown() {
+///     // Can safely push down to Parquet decoder
+/// }
+/// ```
+pub trait SupportsListPushdown {
+    /// Returns `true` if this expression supports list column pushdown.
+    fn supports_list_pushdown(&self) -> bool;
+}
+
+/// Blanket implementation for all physical expressions.
+///
+/// This delegates to specialized predicates that check whether the concrete
+/// expression type is registered as supporting list pushdown. This design
+/// allows the trait to work with expression types defined in external crates.
+impl SupportsListPushdown for dyn PhysicalExpr {
+    fn supports_list_pushdown(&self) -> bool {
+        is_null_check(self) || is_supported_scalar_function(self)
+    }
+}
+
+/// Checks if an expression is a NULL or NOT NULL check.
+///
+/// These checks are universally supported for all column types.
+fn is_null_check(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any().downcast_ref::<IsNullExpr>().is_some()
+        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+}
+
+/// Checks if an expression is a scalar function registered for list pushdown.
+///
+/// Returns `true` if the expression is a `ScalarFunctionExpr` whose function
+/// is in the registry of supported operations.
+fn is_supported_scalar_function(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any()
+        .downcast_ref::<ScalarFunctionExpr>()
+        .is_some_and(|fun| {
+            // Registry of verified array functions
+            matches!(fun.name(), "array_has" | "array_has_all" | "array_has_any")
+        })
+}
+
+/// Checks whether the given physical expression contains a supported nested
+/// predicate (for example, `array_has_all`).
+///
+/// This function recursively traverses the expression tree to determine if
+/// any node contains predicates that support list column pushdown to the
+/// Parquet decoder.
+///
+/// # Supported predicates
+///
+/// - `IS NULL` and `IS NOT NULL` checks on any column type
+/// - Array functions: `array_has`, `array_has_all`, `array_has_any`
+///
+/// # Returns
+///
+/// `true` if the expression or any of its children contain supported predicates.
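+///
+/// # Example
+///
+/// A sketch of the intended usage (`logical2physical` and the `letters`
+/// list column mirror the `row_filter` tests):
+///
+/// ```ignore
+/// let expr = array_has(
+///     col("letters"),
+///     Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None),
+/// );
+/// assert!(supports_list_predicates(&logical2physical(&expr, &schema)));
+///
+/// // A bare column reference contains no supported list predicate
+/// assert!(!supports_list_predicates(&logical2physical(&col("letters"), &schema)));
+/// ```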
+pub fn supports_list_predicates(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    expr.supports_list_pushdown()
+        || expr
+            .children()
+            .iter()
+            .any(|child| supports_list_predicates(child))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_null_check_detection() {
+        use datafusion_physical_expr::expressions::Column;
+
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
+        assert!(!is_null_check(col_expr.as_ref()));
+
+        // IsNullExpr and IsNotNullExpr detection requires actual instances
+        // which need schema setup - tested in integration tests
+    }
+
+    #[test]
+    fn test_supported_scalar_functions() {
+        use datafusion_physical_expr::expressions::Column;
+
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
+
+        // Non-function expressions should return false
+        assert!(!is_supported_scalar_function(col_expr.as_ref()));
+
+        // Testing with actual ScalarFunctionExpr requires function setup
+        // and is better suited for integration tests
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 8bb79d576990e..aa94e2e2f2c04 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -563,3 +563,114 @@ ORDER BY start_timestamp, trace_id
 LIMIT 1;
 ----
 2024-10-01T00:00:00
+
+###
+# Array function predicate pushdown tests
+# These tests verify that array_has, array_has_all, and array_has_any predicates
+# are correctly pushed down to the DataSourceExec node
+###
+
+# Create test data with array columns
+statement ok
+COPY (
+  SELECT 1 as id, ['rust', 'performance'] as tags
+  UNION ALL
+  SELECT 2 as id, ['python', 'javascript'] as tags
+  UNION ALL
+  SELECT 3 as id, ['rust', 'webassembly'] as tags
+)
+TO 'test_files/scratch/parquet_filter_pushdown/array_data/data.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE array_test STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/array_data/';
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+# Test array_has predicate pushdown
+query I?
+SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id;
+----
+1 [rust, performance]
+3 [rust, webassembly]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_has(array_test.tags, Utf8("rust"))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has(array_test.tags, Utf8("rust"))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has(tags@1, rust)
+
+# Test array_has_all predicate pushdown
+query I?
+SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id; +---- +1 [rust, performance] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_all(array_test.tags, List([rust, performance])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust, performance]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust, performance]) + +# Test array_has_any predicate pushdown +query I? +SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +2 [python, javascript] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_any(array_test.tags, List([python, go])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_any(array_test.tags, List([python, go]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_any(tags@1, [python, go]) + +# Test complex predicate with OR +query I? +SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +1 [rust, performance] +2 [python, javascript] +3 [rust, webassembly] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust]) OR array_has_any(tags@1, [python, go]) + +# Test array function with other predicates +query I? 
+SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id; +---- +3 [rust, webassembly] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_test.id > Int64(1) AND array_has(array_test.tags, Utf8("rust")) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_test.id > Int64(1), array_has(array_test.tags, Utf8("rust"))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]