Description
Is your feature request related to a problem or challenge?
I have a scenario where I want to perform a series of outer joins on the same key. To achieve that, I apply coalesce to the join keys:
SELECT coalesce(coalesce(t1.a, t2.a), t3.a) as a
FROM t1
FULL OUTER JOIN t2 ON t1.a = t2.a
FULL OUTER JOIN t3 ON coalesce(t1.a, t2.a) = t3.a
The problems begin when I try to filter this table:
SELECT coalesce(coalesce(t1.a, t2.a), t3.a) as a
FROM t1
FULL OUTER JOIN t2 ON t1.a = t2.a
FULL OUTER JOIN t3 ON coalesce(t1.a, t2.a) = t3.a
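WHERE coalesce(coalesce(t1.a, t2.a), t3.a) < 10
The standard PushDownFilter optimizer rule does not push down filters on coalesce. But as far as I can tell, pushing a filter on a coalesce of join keys down to both sides of a FULL OUTER JOIN is correct: in every output row the non-NULL join keys are equal, so the coalesce equals the key of every input row that produced that output row. So I propose adding such an optimization to DataFusion.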
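To make the correctness claim concrete, here is a minimal std-only Rust model (not DataFusion code; `full_outer_join`, `filter_above_join` and `filter_below_join` are hypothetical helpers). It assumes a single nullable integer join key, SQL NULL semantics and a deterministic predicate, and compares a filter on `coalesce(t1.a, t2.a)` evaluated above a FULL OUTER JOIN with the same filter pushed into both inputs.

```rust
/// One output row of the join: (left key, right key); `None` models SQL NULL.
type Row = (Option<i32>, Option<i32>);

/// Nested-loop FULL OUTER JOIN on `left.a = right.a`.
/// As in SQL, a NULL key never matches anything, including another NULL.
fn full_outer_join(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Row> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for &l in left {
        let mut matched = false;
        for (j, &r) in right.iter().enumerate() {
            if l.is_some() && l == r {
                out.push((l, r));
                right_matched[j] = true;
                matched = true;
            }
        }
        if !matched {
            out.push((l, None)); // unmatched left row, NULL-padded on the right
        }
    }
    for (j, &r) in right.iter().enumerate() {
        if !right_matched[j] {
            out.push((None, r)); // unmatched right row, NULL-padded on the left
        }
    }
    out
}

/// `WHERE pred(coalesce(left.a, right.a))` evaluated above the join (rows sorted).
fn filter_above_join(
    left: &[Option<i32>],
    right: &[Option<i32>],
    pred: impl Fn(Option<i32>) -> bool,
) -> Vec<Row> {
    let mut rows: Vec<Row> = full_outer_join(left, right)
        .into_iter()
        // `Option::or` behaves like a two-argument `coalesce`.
        .filter(|&(l, r)| pred(l.or(r)))
        .collect();
    rows.sort();
    rows
}

/// The same predicate pushed into both inputs: `pred(left.a)` and `pred(right.a)`.
fn filter_below_join(
    left: &[Option<i32>],
    right: &[Option<i32>],
    pred: impl Fn(Option<i32>) -> bool,
) -> Vec<Row> {
    let left: Vec<Option<i32>> = left.iter().copied().filter(|&k| pred(k)).collect();
    let right: Vec<Option<i32>> = right.iter().copied().filter(|&k| pred(k)).collect();
    let mut rows = full_outer_join(&left, &right);
    rows.sort();
    rows
}
```

For example, with left keys `[1, 5, 12, NULL, 7]`, right keys `[5, 20, NULL, 3, 12]` and the predicate `k < 10`, both variants return the rows `(NULL, 3)`, `(1, NULL)`, `(5, 5)` and `(7, NULL)`. They agree because an output row survives the filter above the join exactly when every input row it came from survives the pushed-down filter.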
Describe the solution you'd like
I believe the PushDownFilter optimizer rule could be extended to push a filter on coalesce over join keys down to the respective sides of the join.
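The proposed extension can be sketched on a toy expression tree (std-only Rust; `Expr`, `CmpOp`, `is_coalesced_key` and `decompose` are hypothetical stand-ins, not DataFusion's API). It splits an AND predicate into terms, rewrites each comparison on `coalesce(left_key, right_key)` into one term per join side, and keeps everything else, including OR terms, above the join. It assumes a single equi-join key pair identified by qualified column names.

```rust
/// Comparison operators supported by this toy model.
#[derive(Clone, Copy, Debug, PartialEq)]
enum CmpOp {
    Eq,
    Lt,
}

/// Toy expression tree, loosely modeled on a logical-plan expression (hypothetical).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Coalesce(Box<Expr>, Box<Expr>),
    Cmp(Box<Expr>, CmpOp, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn cmp(lhs: Expr, op: CmpOp, rhs: Expr) -> Expr {
    Expr::Cmp(Box::new(lhs), op, Box::new(rhs))
}

/// Flattens `a AND (b AND c)` into `[a, b, c]`; any other expression is a single term.
fn split_conjunction(expr: Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(a, b) => {
            split_conjunction(*a, out);
            split_conjunction(*b, out);
        }
        other => out.push(other),
    }
}

/// True if `expr` is `coalesce(x, y)` where `{x, y}` is the join-key pair (either order).
fn is_coalesced_key(expr: &Expr, left_key: &str, right_key: &str) -> bool {
    match expr {
        Expr::Coalesce(x, y) => match (x.as_ref(), y.as_ref()) {
            (Expr::Column(x), Expr::Column(y)) => {
                (x == left_key && y == right_key) || (x == right_key && y == left_key)
            }
            _ => false,
        },
        _ => false,
    }
}

/// Splits `predicate` into (left-side terms, right-side terms, terms kept above the join).
fn decompose(predicate: Expr, left_key: &str, right_key: &str) -> (Vec<Expr>, Vec<Expr>, Vec<Expr>) {
    let mut terms = Vec::new();
    split_conjunction(predicate, &mut terms);
    let (mut left, mut right, mut remaining) = (Vec::new(), Vec::new(), Vec::new());
    for term in terms {
        if let Expr::Cmp(lhs, op, rhs) = &term {
            if is_coalesced_key(lhs, left_key, right_key) {
                // coalesce(l, r) <op> value  =>  l <op> value, r <op> value
                left.push(Expr::Cmp(Box::new(col(left_key)), *op, rhs.clone()));
                right.push(Expr::Cmp(Box::new(col(right_key)), *op, rhs.clone()));
                continue;
            }
            if is_coalesced_key(rhs, left_key, right_key) {
                // value <op> coalesce(l, r)  =>  value <op> l, value <op> r
                left.push(Expr::Cmp(lhs.clone(), *op, Box::new(col(left_key))));
                right.push(Expr::Cmp(lhs.clone(), *op, Box::new(col(right_key))));
                continue;
            }
        }
        // OR terms, comparisons on other columns, etc. stay above the join.
        remaining.push(term);
    }
    (left, right, remaining)
}
```

Emitting `left_key`/`right_key` rather than the `coalesce` arguments in their written order means that `coalesce(t2.a, t1.a)` is still routed to the correct sides.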
Describe alternatives you've considered
I have developed an optimizer rule covering my narrow use case. It would be great if someone could generalize it and add it to the DataFusion codebase.
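One possible direction for such a generalization, sketched on a toy plan (std-only Rust; `Expr`, `Plan` and `push_targets` are hypothetical, not DataFusion types): the rule only matches `coalesce` calls whose arguments are plain join-key columns, but comparing the arguments structurally with the join-key expressions would also cover the query from the description, where the second join is `ON coalesce(t1.a, t2.a) = t3.a`. The sketch assumes deterministic key expressions and a single equi-join key pair per FULL OUTER JOIN.

```rust
/// Toy expressions: qualified columns and two-argument `coalesce` (hypothetical).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Coalesce(Box<Expr>, Box<Expr>),
}

/// Toy plan: scans combined by FULL OUTER JOINs on one `left_key = right_key` pair.
/// Keys may be arbitrary expressions, e.g. `coalesce(t1.a, t2.a) = t3.a`.
enum Plan {
    Scan,
    FullJoin {
        left: Box<Plan>,
        right: Box<Plan>,
        left_key: Expr,
        right_key: Expr,
    },
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn coalesce(a: Expr, b: Expr) -> Expr {
    Expr::Coalesce(Box::new(a), Box::new(b))
}

/// Returns the expressions a filter on `expr` ends up applied to after pushing it as
/// deep as possible into `plan`. Join keys are compared structurally, not as columns.
fn push_targets(expr: &Expr, plan: &Plan) -> Vec<Expr> {
    if let (Expr::Coalesce(a, b), Plan::FullJoin { left, right, left_key, right_key }) =
        (expr, plan)
    {
        let (a, b) = (a.as_ref(), b.as_ref());
        if (a == left_key && b == right_key) || (a == right_key && b == left_key) {
            // Filter on coalesce(left_key, right_key): push `left_key` into the left
            // input and `right_key` into the right input, then recurse into each.
            let mut targets = push_targets(left_key, left);
            targets.extend(push_targets(right_key, right));
            return targets;
        }
    }
    // No further decomposition: the filter is applied on top of this subtree.
    vec![expr.clone()]
}
```

For `coalesce(coalesce(t1.a, t2.a), t3.a)` over that plan, `push_targets` returns `t1.a`, `t2.a` and `t3.a`, so the filter ends up on each scan. A `coalesce` that does not match a join's keys, such as `coalesce(t1.a, t3.a)`, is returned unchanged, i.e. the filter stays where it is.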
Here is the code (with documentation and tests):
//! Coalesce-aware filter push-down optimization for FULL OUTER JOINs.
//!
//! This rule extends `DataFusion`'s standard filter push-down by handling filters on
//! `coalesce` expressions that combine join keys from FULL OUTER JOINs.
use std::sync::Arc;
use datafusion::{
    common::{Column, Result, tree_node::Transformed},
    functions::core::coalesce::CoalesceFunc,
    logical_expr::{
        BinaryExpr, Expr, Filter, LogicalPlan,
        expr::{Between, InList, ScalarFunction},
        utils::{conjunction, split_conjunction_owned},
    },
    optimizer::{
        ApplyOrder, OptimizerConfig, OptimizerRule, push_down_filter::PushDownFilter,
        simplify_expressions::simplify_predicates,
    },
};
/// Optimizer rule for pushing filters through `coalesce` expressions in FULL OUTER JOINs.
///
/// # Introduction
///
/// `DataFusion`'s standard [`PushDownFilter`] rule cannot push filters through `coalesce`
/// expressions because doing so is generally incorrect. However, in FULL OUTER JOINs,
/// `coalesce` is used to combine matching join keys (e.g., `coalesce(left.id, right.id)`)
/// where the columns either have identical values or one is NULL. In this specific case,
/// it is safe to decompose a filter on the coalesce into separate filters on each input.
///
/// This rule enables partition pruning and early data elimination by pushing such filters
/// directly to both sides of the join.
///
/// For example, given a FULL JOIN with a coalesced join key:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1
///   Projection: coalesce(left.id, right.id) AS id
///     Full Join: left.id = right.id
///       TableScan: left
///       TableScan: right
/// ```
///
/// This rule decomposes the filter on the coalesced key into separate filters for each side:
///
/// ```text
/// Projection: coalesce(left.id, right.id) AS id
///   Full Join: left.id = right.id
///     Filter: left.id = 1
///       TableScan: left
///     Filter: right.id = 1
///       TableScan: right
/// ```
///
/// `DataFusion`'s standard [`PushDownFilter`] can then push these filters further down
/// through projections and unions to eliminate partitions that don't match.
///
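/// # Handling Projections and Aliases
///
/// Filters are often applied to aliased columns from projections rather than directly
/// to the join output:
///
/// ```text
/// Filter: j1.id = 1
///   SubqueryAlias: j1
///     Projection: coalesce(left.id, right.id) AS id
///       Full Join: left.id = right.id
/// ```
///
/// The rule itself only rewrites a Filter that sits directly above a join. Because every
/// node is also passed to the standard [`PushDownFilter`], such a filter is first moved
/// below the alias and the projection, with `j1.id` replaced by
/// `coalesce(left.id, right.id)`. When the top-down traversal reaches the moved filter,
/// this rule decomposes it.
///
/// # Handling Nested Coalesce
///
/// Multi-level FULL JOINs produce nested coalesce expressions. For example:
///
/// ```text
/// coalesce(coalesce(a.id, b.id), c.id)
/// ```
///
/// Only `coalesce` calls whose two arguments are plain join-key columns are matched
/// (join keys that are not plain columns are ignored), so each level must be exposed as
/// a column, e.g. through a projection and a subquery alias, before the next join. The
/// filter is then decomposed one join at a time: the part pushed into an aliased join
/// input is moved below its projection and decomposed again at the inner join. With
/// that structure, the end result is:
///
/// ```text
/// Filters applied: a.id = 1, b.id = 1, c.id = 1
/// ```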
/// # Handling AND Conjunctions
///
/// Like [`PushDownFilter`], this rule handles AND-connected filters (conjunctions).
/// Each conjunction term is processed independently:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1 AND left.val = 2
///   Full Join: left.id = right.id
/// ```
///
/// The coalesce term is decomposed and pushed to both join inputs. The other term is
/// kept in a Filter above the join: pushing `left.val = 2` into the left input of a
/// FULL JOIN would be incorrect, because unmatched right rows (with NULL `left.val`)
/// would still appear in the output:
///
/// ```text
/// Filter: left.val = 2
///   Full Join: left.id = right.id
///     Filter: left.id = 1
///       TableScan: left
///     Filter: right.id = 1
///       TableScan: right
/// ```
///
/// # Safety: OR Filters and Non-FULL JOINs
///
/// The rule only processes AND-connected filters containing coalesce expressions on
/// FULL JOINs. OR filters are left unchanged because splitting them would be incorrect:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1 OR left.val = 2
///   Full Join
/// ```
///
/// This filter is not touched by this rule. `DataFusion`'s standard optimizer may still
/// move it below projections and aliases, but not through the FULL JOIN, so it stays
/// above the join (with aliased columns rewritten as needed).
///
/// Similarly, filters on INNER, LEFT, or RIGHT joins are not processed because
/// coalesce semantics differ for those join types.
///
/// # Implementation Notes
///
/// This rule performs a top-down traversal. For each node it:
///
/// 1. Checks whether the node is a Filter whose input is a Join
/// 2. Splits the predicate into AND terms and simplifies them
/// 3. Rewrites each supported term on `coalesce(left_key, right_key)` into one filter per
///    join side, keeping all other terms in a Filter above the join
/// 4. Attaches the per-side filters to both join inputs
/// 5. Feeds the result back into `DataFusion`'s [`PushDownFilter`] via `transform_data`;
///    this continues the standard push-down and moves filters below projections and
///    subquery aliases, so nodes visited later in the traversal see them directly above
///    a join
///
/// [`PushDownFilter`]: datafusion::optimizer::push_down_filter::PushDownFilter
#[derive(Debug, Default)]
pub struct CoalesceAwarePushDownFilter {
    push_down_filter: PushDownFilter,
}

impl CoalesceAwarePushDownFilter {
    /// Creates a new [`CoalesceAwarePushDownFilter`] optimizer rule.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
/// Collects the filter terms for each side of the join (after simplifying the predicate
/// with `simplify_predicates`), plus the terms that must stay above the join.
struct PushDownCoalesceFilterHelper {
    /// Equi-join key pairs `(left, right)` where both keys are plain columns.
    join_keys: Vec<(Column, Column)>,
    left_filters: Vec<Expr>,
    right_filters: Vec<Expr>,
    remaining_filters: Vec<Expr>,
}

impl PushDownCoalesceFilterHelper {
    fn new(join_keys: &[(Expr, Expr)]) -> Self {
        // Key pairs that are not plain columns (e.g. `coalesce(t1.a, t2.a) = t3.a`)
        // are skipped, so filters on them are never decomposed.
        let join_keys = join_keys
            .iter()
            .filter_map(|key_pair| match key_pair {
                (Expr::Column(lhs), Expr::Column(rhs)) => Some((lhs.clone(), rhs.clone())),
                _ => None,
            })
            .collect();
        Self {
            join_keys,
            left_filters: Vec::new(),
            right_filters: Vec::new(),
            remaining_filters: Vec::new(),
        }
    }

    /// Records `build_filter(left column)` for the left side and
    /// `build_filter(right column)` for the right side.
    fn push_columns<F: FnMut(Expr) -> Expr>(
        &mut self,
        columns: (Column, Column),
        mut build_filter: F,
    ) {
        self.left_filters
            .push(build_filter(Expr::Column(columns.0)));
        self.right_filters
            .push(build_filter(Expr::Column(columns.1)));
    }

    /// If `expr` is `coalesce(x, y)` where `{x, y}` is one of the join-key pairs,
    /// returns the pair ordered as `(left key, right key)`.
    fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> {
        if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr {
            let _ = func.inner().as_any().downcast_ref::<CoalesceFunc>()?;
            if let [Expr::Column(lhs), Expr::Column(rhs)] = args.as_slice() {
                for (join_lhs, join_rhs) in &self.join_keys {
                    if join_lhs == lhs && join_rhs == rhs {
                        return Some((lhs.clone(), rhs.clone()));
                    }
                    if join_lhs == rhs && join_rhs == lhs {
                        return Some((rhs.clone(), lhs.clone()));
                    }
                }
            }
        }
        None
    }

    /// Routes one AND term: a supported predicate on a coalesced join key is pushed to
    /// both sides; any other term stays above the join.
    fn push_term(&mut self, term: &Expr) {
        match term {
            Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.supports_propagation() => {
                if let Some(columns) = self.extract_join_columns(left) {
                    return self.push_columns(columns, |replacement| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(replacement),
                            op: *op,
                            right: right.clone(),
                        })
                    });
                }
                if let Some(columns) = self.extract_join_columns(right) {
                    return self.push_columns(columns, |replacement| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: left.clone(),
                            op: *op,
                            right: Box::new(replacement),
                        })
                    });
                }
            },
            Expr::IsNull(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self
                        .push_columns(columns, |replacement| Expr::IsNull(Box::new(replacement)));
                }
            },
            Expr::IsNotNull(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::IsNotNull(Box::new(replacement))
                    });
                }
            },
            Expr::IsTrue(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self
                        .push_columns(columns, |replacement| Expr::IsTrue(Box::new(replacement)));
                }
            },
            Expr::IsFalse(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self
                        .push_columns(columns, |replacement| Expr::IsFalse(Box::new(replacement)));
                }
            },
            Expr::IsUnknown(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::IsUnknown(Box::new(replacement))
                    });
                }
            },
            Expr::IsNotTrue(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::IsNotTrue(Box::new(replacement))
                    });
                }
            },
            Expr::IsNotFalse(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::IsNotFalse(Box::new(replacement))
                    });
                }
            },
            Expr::IsNotUnknown(expr) => {
                if let Some(columns) = self.extract_join_columns(expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::IsNotUnknown(Box::new(replacement))
                    });
                }
            },
            Expr::Between(between) => {
                if let Some(columns) = self.extract_join_columns(&between.expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::Between(Between {
                            expr: Box::new(replacement),
                            negated: between.negated,
                            low: between.low.clone(),
                            high: between.high.clone(),
                        })
                    });
                }
            },
            Expr::InList(in_list) => {
                if let Some(columns) = self.extract_join_columns(&in_list.expr) {
                    return self.push_columns(columns, |replacement| {
                        Expr::InList(InList {
                            expr: Box::new(replacement),
                            list: in_list.list.clone(),
                            negated: in_list.negated,
                        })
                    });
                }
            },
            _ => {},
        }
        // Not decomposable: keep the term above the join.
        self.remaining_filters.push(term.clone());
    }

    /// Splits `predicate` into AND terms, simplifies them and routes each term.
    /// Returns `(left filter, right filter, remaining filter)`.
    fn push_predicate(
        mut self,
        predicate: Expr,
    ) -> Result<(Option<Expr>, Option<Expr>, Option<Expr>)> {
        let predicates = split_conjunction_owned(predicate);
        let terms = simplify_predicates(predicates)?;
        for term in terms {
            self.push_term(&term);
        }
        Ok((
            conjunction(self.left_filters),
            conjunction(self.right_filters),
            conjunction(self.remaining_filters),
        ))
    }
}
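
/// Decomposes a Filter directly above a FULL JOIN into per-side filters on the join
/// inputs, keeping any terms that cannot be decomposed in a Filter above the join.
fn rewrite_impl(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
    let LogicalPlan::Filter(Filter {
        predicate, input, ..
    }) = &plan
    else {
        return Ok(Transformed::no(plan));
    };
    let LogicalPlan::Join(join) = input.as_ref() else {
        return Ok(Transformed::no(plan));
    };
    // Only FULL JOINs are handled, as stated in the rule documentation.
    if join.join_type != datafusion::logical_expr::JoinType::Full {
        return Ok(Transformed::no(plan));
    }
    // Per-side filters are always produced in pairs: both `Some` or both `None`.
    let (Some(left), Some(right), remaining) =
        PushDownCoalesceFilterHelper::new(&join.on).push_predicate(predicate.clone())?
    else {
        return Ok(Transformed::no(plan));
    };
    let mut join = join.clone();
    join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(
        left,
        join.left.clone(),
    )?));
    join.right = Arc::new(LogicalPlan::Filter(Filter::try_new(
        right,
        join.right.clone(),
    )?));
    let mut plan = LogicalPlan::Join(join);
    // Terms that could not be decomposed stay above the join.
    if let Some(remaining) = remaining {
        plan = LogicalPlan::Filter(Filter::try_new(remaining, Arc::new(plan))?);
    }
    Ok(Transformed::yes(plan))
}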

impl OptimizerRule for CoalesceAwarePushDownFilter {
    fn name(&self) -> &'static str {
        "coalesce_aware_push_down_filter"
    }

    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::TopDown)
    }

    fn supports_rewrite(&self) -> bool {
        true
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        rewrite_impl(plan)?.transform_data(|plan| self.push_down_filter.rewrite(plan, config))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::{
        arrow::datatypes::{DataType, Field, Schema},
        datasource::{empty::EmptyTable, provider_as_source},
        logical_expr::{JoinType, LogicalPlanBuilder, col, lit},
        optimizer::{Optimizer, OptimizerContext},
        prelude::coalesce,
    };
    use pretty_assertions::assert_eq;

    use super::*;

    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: LogicalPlan) -> Result<()> {
        let optimizer = Optimizer::with_rules(vec![Arc::new(CoalesceAwarePushDownFilter::new())]);
        let config = OptimizerContext::new().with_max_passes(1);
        let optimized = optimizer.optimize(plan, &config, |_, _| {})?;
        assert_eq!(format!("{optimized}").trim(), format!("{expected}").trim());
        Ok(())
    }

    #[test]
    fn primitive_test() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .filter(coalesce(vec![col("t1.a"), col("t2.a")]).eq(lit(1)))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("a").eq(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").eq(lit(1)))?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn simple_test() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("t2.a")]).alias("a")])?
            .alias("j1")?
            .filter(col("j1.a").eq(lit(1)))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("a").eq(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").eq(lit(1)))?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("t2.a")]).alias("a")])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn propagate_supported_test() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![
                coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
                col("t1.b").alias("b1"),
                col("t2.b").alias("b2"),
            ])?
            .alias("j1")?
            .filter(col("j1.a").gt(lit(1)).and(col("j1.b1").eq(lit(2))))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("t1.a").gt(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").gt(lit(1)))?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .filter(col("t1.b").eq(lit(2)))?
            .project(vec![
                coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
                col("t1.b").alias("b1"),
                col("t2.b").alias("b2"),
            ])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn do_not_propagate_unsupported_test() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![
                coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
                col("t1.b").alias("b1"),
                col("t2.b").alias("b2"),
            ])?
            .alias("j1")?
            .filter(col("j1.a").eq(lit(1)).or(col("j1.b1").eq(lit(2))))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .filter(
                coalesce(vec![col("t1.a"), col("t2.a")])
                    .eq(lit(1))
                    .or(col("t1.b").eq(lit(2))),
            )?
            .project(vec![
                coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
                col("t1.b").alias("b1"),
                col("t2.b").alias("b2"),
            ])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn nested_test() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .join(
                        LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                            .join(
                                LogicalPlanBuilder::scan("t4", scan.clone(), None)?.build()?,
                                JoinType::Full,
                                (vec!["a"], vec!["a"]),
                                None,
                            )?
                            .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                            .alias("j3")?
                            .build()?,
                        JoinType::Full,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![coalesce(vec![col("t2.a"), col("j3.a")]).alias("a")])?
                    .alias("j2")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("j2.a")]).alias("a")])?
            .alias("j1")?
            .filter(col("j1.a").eq(lit(1)))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("t1.a").eq(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").eq(lit(1)))?
                    .join(
                        LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                            .filter(col("t3.a").eq(lit(1)))?
                            .join(
                                LogicalPlanBuilder::scan("t4", scan.clone(), None)?
                                    .filter(col("t4.a").eq(lit(1)))?
                                    .build()?,
                                JoinType::Full,
                                (vec!["a"], vec!["a"]),
                                None,
                            )?
                            .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                            .alias("j3")?
                            .build()?,
                        JoinType::Full,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![coalesce(vec![col("t2.a"), col("j3.a")]).alias("a")])?
                    .alias("j2")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("j2.a")]).alias("a")])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn two_sided_test() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("t2.a")]).alias("a")])?
            .alias("j2")?
            .join(
                LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                    .join(
                        LogicalPlanBuilder::scan("t4", scan.clone(), None)?.build()?,
                        JoinType::Full,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                    .alias("j3")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("j2.a"), col("j3.a")]).alias("a")])?
            .alias("j1")?
            .filter(col("j1.a").eq(lit(1)))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("t1.a").eq(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").eq(lit(1)))?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("t2.a")]).alias("a")])?
            .alias("j2")?
            .join(
                LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                    .filter(col("t3.a").eq(lit(1)))?
                    .join(
                        LogicalPlanBuilder::scan("t4", scan.clone(), None)?
                            .filter(col("t4.a").eq(lit(1)))?
                            .build()?,
                        JoinType::Full,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                    .alias("j3")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("j2.a"), col("j3.a")]).alias("a")])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }

    #[test]
    fn interchanged_test() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let table = EmptyTable::new(Arc::new(schema));
        let scan = provider_as_source(Arc::new(table));
        let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .join(
                        LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                            .join(
                                LogicalPlanBuilder::scan("t4", scan.clone(), None)?.build()?,
                                JoinType::Full,
                                (vec!["a"], vec!["a"]),
                                None,
                            )?
                            .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                            .alias("j3")?
                            .build()?,
                        JoinType::Inner,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![col("j3.a").alias("a")])?
                    .alias("j2")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("j2.a")]).alias("a")])?
            .alias("j1")?
            .filter(col("j1.a").eq(lit(1)))?
            .build()?;
        let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
            .filter(col("t1.a").eq(lit(1)))?
            .join(
                LogicalPlanBuilder::scan("t2", scan.clone(), None)?
                    .filter(col("t2.a").eq(lit(1)))?
                    .join(
                        LogicalPlanBuilder::scan("t3", scan.clone(), None)?
                            .filter(col("t3.a").eq(lit(1)))?
                            .join(
                                LogicalPlanBuilder::scan("t4", scan.clone(), None)?
                                    .filter(col("t4.a").eq(lit(1)))?
                                    .build()?,
                                JoinType::Full,
                                (vec!["a"], vec!["a"]),
                                None,
                            )?
                            .project(vec![coalesce(vec![col("t3.a"), col("t4.a")]).alias("a")])?
                            .alias("j3")?
                            .build()?,
                        JoinType::Inner,
                        (vec!["a"], vec!["a"]),
                        None,
                    )?
                    .project(vec![col("j3.a").alias("a")])?
                    .alias("j2")?
                    .build()?,
                JoinType::Full,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .project(vec![coalesce(vec![col("t1.a"), col("j2.a")]).alias("a")])?
            .alias("j1")?
            .build()?;
        assert_optimized_plan_equal(plan, expected)
    }
}
Additional context
The provided implementation relies on several assumptions that hold only in my codebase:
- coalesce is used solely to combine FULL OUTER JOIN keys
- in filters, the column reference is always on the left-hand side of a binary expression
- all column references use qualified column names
- all subquery aliases are uniquely named
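The first assumption is essential: the rewrite is only valid when the `coalesce` arguments are the equi-join key pair, because only then are the non-NULL arguments guaranteed to be equal. The std-only Rust model below (hypothetical helpers, not DataFusion code) applies the same rewrite to a non-key payload column `v` of two tables joined on a key, and shows that the results differ.

```rust
/// Input row: (join key, payload value); `None` models SQL NULL.
type InRow = (i32, Option<i32>);
/// Output row: (left payload, right payload), NULL-padded for unmatched rows.
type OutRow = (Option<i32>, Option<i32>);

/// Nested-loop FULL OUTER JOIN on the (non-NULL) key column, projecting the payloads.
fn full_outer_join(left: &[InRow], right: &[InRow]) -> Vec<OutRow> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for &(lk, lv) in left {
        let mut matched = false;
        for (j, &(rk, rv)) in right.iter().enumerate() {
            if lk == rk {
                out.push((lv, rv));
                right_matched[j] = true;
                matched = true;
            }
        }
        if !matched {
            out.push((lv, None)); // unmatched left row
        }
    }
    for (j, &(_, rv)) in right.iter().enumerate() {
        if !right_matched[j] {
            out.push((None, rv)); // unmatched right row
        }
    }
    out
}

/// `WHERE coalesce(left.v, right.v) = target`, evaluated above the join.
fn filter_above(left: &[InRow], right: &[InRow], target: i32) -> Vec<OutRow> {
    full_outer_join(left, right)
        .into_iter()
        .filter(|&(lv, rv)| lv.or(rv) == Some(target))
        .collect()
}

/// The key-column rewrite misapplied to the payload column: `v = target` on both inputs.
fn filter_pushed(left: &[InRow], right: &[InRow], target: i32) -> Vec<OutRow> {
    let left: Vec<InRow> = left.iter().copied().filter(|&(_, v)| v == Some(target)).collect();
    let right: Vec<InRow> = right.iter().copied().filter(|&(_, v)| v == Some(target)).collect();
    full_outer_join(&left, &right)
}
```

With one left row `(key 1, v 1)` and one right row `(key 1, v 2)`, the joined row `(1, 2)` passes `coalesce(left.v, right.v) = 1` above the join, but pushing `v = 1` into both inputs removes the right row, so the join returns `(1, NULL)` instead.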