-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Support push-down of the filter on coalesce over join keys #18848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please also write an end to end test with sqllogictests?
Given that coalesce is rewritten to CASE during simplification
datafusion/datafusion/functions/src/core/coalesce.rs
Lines 97 to 122 in 1833093
| fn simplify( | |
| &self, | |
| args: Vec<Expr>, | |
| _info: &dyn SimplifyInfo, | |
| ) -> Result<ExprSimplifyResult> { | |
| if args.is_empty() { | |
| return plan_err!("coalesce must have at least one argument"); | |
| } | |
| if args.len() == 1 { | |
| return Ok(ExprSimplifyResult::Simplified( | |
| args.into_iter().next().unwrap(), | |
| )); | |
| } | |
| let n = args.len(); | |
| let (init, last_elem) = args.split_at(n - 1); | |
| let whens = init | |
| .iter() | |
| .map(|x| x.clone().is_not_null()) | |
| .collect::<Vec<_>>(); | |
| let cases = init.to_vec(); | |
| Ok(ExprSimplifyResult::Simplified( | |
| CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone()))) | |
| .end()?, | |
| )) | |
| } |
I suspect that this optimization may not be triggered when run end to end
|
|
||
| fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> { | ||
| if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr { | ||
| if func.name() != "coalesce" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems very specific to coalesce and will likely break anyone who provides their own implementation of coalesce that overrides the built in one
Can we formualte this as some more general property of the function that allows pushing down? That way we could mark coalesce as having this property
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fact that coalesce is basically identity function over kept side of the join allows such optimization...
I will add end-to-end test tomorrow. I have a problem on my data where I need this optimization, so I will try to reproduce it in end-to-end test.
|
Thank you for this contribution @vadimpiven |
| } | ||
| } | ||
|
|
||
| fn push_full_join_coalesce_filters( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to push filters into the inputs of a FULL JOIN , you need to guarantee that the join doens't reintroduce rows (with nulls) that would have been filtered if the filter was applied beforehand
In other words, it is not clear to me that this optimization is correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, forgot to change the name. I am using this optimization in my code specifically for chains (up to 50-table long) of FULL OUTER JOINs. I am making joins with a sequence join->project with coalesce over join keys -> alias, like:
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.build()?;This way the initial data which looks like
{
"table1": {
"1": 100,
"2": 200,
"3": 300
},
"table2": {
"2": 2000,
"3": 3000,
"4": 4000
},
"table3": {
"3": 30000,
"4": 40000,
"5": 50000
}is joined into
| key | table1 | table2 | table3 |
|---|---|---|---|
| 1 | 100 | null | null |
| 2 | 200 | 2000 | null |
| 3 | 300 | 3000 | 30000 |
| 4 | null | 4000 | 40000 |
| 5 | null | null | 50000 |
instead of
| key1 | key2 | key3 | table1 | table2 | table3 |
|---|---|---|---|---|---|
| 1 | null | null | 100 | null | null |
| 2 | 2 | null | 200 | 2000 | null |
| 3 | 3 | 3 | 300 | 3000 | 30000 |
| null | 4 | 4 | null | 4000 | 40000 |
| null | null | 5 | null | null | 50000 |
You can check the illustration https://docs.platforma.bio/guides/vdj-analysis/diversity-analysis/#results-table where different sample properties are joined by Sample Id from different parquet files.
When I apply filter on Key what I effectively want is to replicate this filter to all input tables. And optimization that I provided does exactly that.
I am applying the chain join->project with coalesce over join keys -> alias for each new table, so for 50 tables I would have 49 projections with coalesce. Without my optimization, each optimizer pass has simplification which turns coalesce into CASE and then performs push-down which again turns case to coalesce. So 1 optimizer pass gives me propagation through 1 layer, and for 50 tables I would have to have 49 optimizer passes for full propagation. The optimization in this PR allows to optimize such scenario in 1 optimizer pass.
I realized that this optimization seems correct for any type of join if coalesce is applied to the join keys, so I do not have explicit check for FULL OUTER JOIN in proposed code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if you believe that nobody else has the scenario I described, this way we can simply close the PR and issue without further discussion)
Which issue does this PR close?
Rationale for this change
The standard PushDownFilter optimizer rule does not propagate filters on coalesce. But as far as I can tell, push-down of the filter on coalesce over join keys is correct. So I propose adding such optimization to DataFusion.
What changes are included in this PR?
Added test-case and PushDownCoalesceFilterHelper applying optimization.
Are these changes tested?
Yes, new test-case passes, old test-cases are not affected.
Are there any user-facing changes?
No user-facing changes.