Skip to content

Commit 1353a6f

Browse files
committed
Prevent exponential planning time for Window functions
Before this change, planning time was exponential with respect to the number of columns used in the window partitioning clause. This change is a stop-gap solution that avoids the exponential planning time.
1 parent e2c2d38 commit 1353a6f

File tree

5 files changed

+76
-60
lines changed

5 files changed

+76
-60
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2696,8 +2696,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
26962696
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
26972697
],
26982698
expected_plan: vec![
2699-
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
2700-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
2699+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
2700+
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
2701+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
27012702
],
27022703
},
27032704
// Case 9:
@@ -2712,8 +2713,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
27122713
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
27132714
],
27142715
expected_plan: vec![
2715-
"WindowAggExec: wdw=[max: Ok(Field { name: \"max\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
2716-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
2716+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, max@2 DESC NULLS LAST], preserve_partitioning=[false]",
2717+
" WindowAggExec: wdw=[max: Ok(Field { name: \"max\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
2718+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
27172719
],
27182720
},
27192721
// Case 10:
@@ -2978,8 +2980,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
29782980
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
29792981
],
29802982
expected_plan: vec![
2981-
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]",
2982-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
2983+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, count@2 DESC NULLS LAST], preserve_partitioning=[false]",
2984+
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]",
2985+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
29832986
],
29842987
},
29852988
// Case 25:
@@ -3066,8 +3069,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
30663069
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
30673070
],
30683071
expected_plan: vec![
3069-
"WindowAggExec: wdw=[max: Ok(Field { name: \"max\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]",
3070-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3072+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, max@2 DESC], preserve_partitioning=[false]",
3073+
" WindowAggExec: wdw=[max: Ok(Field { name: \"max\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]",
3074+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
30713075
],
30723076
},
30733077
// Case 30:
@@ -3260,8 +3264,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
32603264
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
32613265
],
32623266
expected_plan: vec![
3263-
"BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
3264-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3267+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
3268+
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
3269+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
32653270
],
32663271
},
32673272
// Case 41:
@@ -3364,8 +3369,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
33643369
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
33653370
],
33663371
expected_plan: vec![
3367-
"BoundedWindowAggExec: wdw=[min: Field { name: \"min\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
3368-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3372+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, min@2 DESC NULLS LAST], preserve_partitioning=[false]",
3373+
" BoundedWindowAggExec: wdw=[min: Field { name: \"min\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
3374+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
33693375
],
33703376
},
33713377
// Case 47:
@@ -3400,8 +3406,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
34003406
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
34013407
],
34023408
expected_plan: vec![
3403-
"BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3404-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3409+
"SortExec: expr=[count@2 ASC NULLS LAST, nullable_col@0 ASC NULLS LAST], preserve_partitioning=[false]",
3410+
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3411+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
34053412
],
34063413
},
34073414
// Case 49:
@@ -3433,8 +3440,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
34333440
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
34343441
],
34353442
expected_plan: vec![
3436-
"BoundedWindowAggExec: wdw=[min: Field { name: \"min\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3437-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3443+
"SortExec: expr=[nullable_col@0 ASC NULLS LAST, min@2 DESC NULLS LAST], preserve_partitioning=[false]",
3444+
" BoundedWindowAggExec: wdw=[min: Field { name: \"min\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3445+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
34383446
],
34393447
},
34403448
// Case 51:
@@ -3541,8 +3549,9 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
35413549
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
35423550
],
35433551
expected_plan: vec![
3544-
"BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3545-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
3552+
"SortExec: expr=[count@2 ASC NULLS LAST, nullable_col@0 ASC NULLS LAST], preserve_partitioning=[false]",
3553+
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted]",
3554+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
35463555
],
35473556
},
35483557
// Case 57:

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ impl FileOpener for ParquetOpener {
154154
let encryption_context = self.get_encryption_context();
155155

156156
Ok(Box::pin(async move {
157+
#[cfg(feature = "parquet_encryption")]
157158
let file_decryption_properties = encryption_context
158159
.get_file_decryption_properties(&file_location)
159160
.await?;

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,8 @@ pub fn get_window_mode(
636636

637637
fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
638638
vec![
639-
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
639+
// TODO restore while avoiding exponential planning time
640+
//PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
640641
PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
641642
]
642643
}

datafusion/sqllogictest/test_files/projection.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ physical_plan
253253
statement ok
254254
drop table t;
255255

256-
# Regression test for
256+
# Regression test for
257257
# https://github.com/apache/datafusion/issues/17513
258258

259259
query I

0 commit comments

Comments
 (0)