Skip to content
17 changes: 11 additions & 6 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,26 @@ pub fn check_subquery_expr(
}
}?;
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => {
LogicalPlan::Projection(_) | LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => {
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
"Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"
"Correlated scalar subquery in the GROUP BY clause must \
also be in the aggregate expressions"
)
} else {
Ok(())
}
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
)
"Correlated scalar subquery can only be used in Projection, \
Filter, Aggregate plan nodes"
),
}?;
}
check_correlations_in_subquery(inner_plan)
Expand Down
16 changes: 15 additions & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,13 @@ impl OptimizerRule for PushDownFilter {
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
filter_predicates
.into_iter()
.partition(|pred| pred.is_volatile());
// TODO: subquery decorrelation sometimes cannot decorrelate all the exprs
// (i.e. in the case of recursive subqueries), so this function may
// accidentally push down the subquery expr as well.
// Until then, we have to exclude these exprs here.
.partition(|pred| {
pred.is_volatile() || has_scalar_subquery(pred)
});

// Check which non-volatile filters are supported by source
let supported_filters = scan
Expand Down Expand Up @@ -1422,6 +1428,14 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
is_contain
}

/// Reports whether `expr` contains an [`Expr::ScalarSubquery`].
///
/// The search is delegated to `Expr::exists`, with a predicate that matches
/// only the `ScalarSubquery` variant; other subquery-like variants are not
/// considered.
fn has_scalar_subquery(expr: &Expr) -> bool {
    // The predicate only ever yields `Ok`, so an `Err` here would be a bug
    // rather than a recoverable condition.
    expr.exists(|e| Ok(matches!(e, Expr::ScalarSubquery(_))))
        .expect("scalar subquery predicate never returns Err")
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
62 changes: 35 additions & 27 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema() {
for outer in planner_context.outer_queries_schemas() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
Expand Down Expand Up @@ -163,35 +163,43 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
not_impl_err!("compound identifier: {ids:?}")
} else {
// Check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
let search_result = search_dfschema(&ids, outer);
match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
let outer_schemas = planner_context.outer_queries_schemas();
let mut maybe_result = None;
if !outer_schemas.is_empty() {
for outer in planner_context.outer_queries_schemas() {
let search_result = search_dfschema(&ids, &outer);
let result = match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
Column::from((qualifier, field)).quoted_flat_name()
)
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
Arc::new(field.clone()),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) =
form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
Arc::new(field.clone()),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => continue,
};
maybe_result = Some(result);
break;
}
if let Some(result) = maybe_result {
result
} else {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) = form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
} else {
let s = &ids[0..ids.len()];
Expand Down
16 changes: 7 additions & 9 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();
Ok(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -54,8 +53,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());

let mut spans = Spans::new();
if let SetExpr::Select(select) = &subquery.body.as_ref() {
Expand All @@ -70,7 +68,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -98,8 +96,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
Expand All @@ -112,7 +110,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down
44 changes: 42 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,15 @@ pub struct PlannerContext {
/// Map of CTE name to logical plan of the WITH clause.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,

/// The schemas of the outer query relations, used to resolve outer-referenced
/// columns in a subquery (also across nested subqueries)
outer_queries_schemas_stack: Vec<DFSchemaRef>,

/// The schema of the outer query plan, used to resolve the columns in a subquery.
/// This field is maintained to support the deprecated functions
/// `outer_query_schema` and `set_outer_query_schema`,
/// which are only aware of the adjacent outer relation
outer_query_schema: Option<DFSchemaRef>,
/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
Expand All @@ -281,6 +289,7 @@ impl PlannerContext {
prepare_param_data_types: Arc::new(vec![]),
ctes: HashMap::new(),
outer_query_schema: None,
outer_queries_schemas_stack: vec![],
outer_from_schema: None,
create_table_schema: None,
}
Expand All @@ -295,13 +304,22 @@ impl PlannerContext {
self
}

// Return a reference to the outer query's schema
/// Borrows the schema of the adjacent outer query, or `None` when it is unset.
///
/// This accessor reads the legacy `outer_query_schema` field rather than the
/// schema stack, so it should not be combined with
/// `outer_queries_schemas`, `append_outer_query_schema`,
/// `latest_outer_query_schema` or `pop_outer_query_schema`.
#[deprecated(note = "Use outer_queries_schemas instead")]
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
    self.outer_query_schema.as_deref()
}

/// Sets the outer query schema, returning the existing one, if
/// any
/// any, this function should not be used together with
/// `outer_queries_schemas`, `append_outer_query_schema`
/// `latest_outer_query_schema` and `pop_outer_query_schema`
#[deprecated(
note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema"
)]
pub fn set_outer_query_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
Expand All @@ -310,6 +328,28 @@ impl PlannerContext {
schema
}

/// Returns a copy of the stack of outer relations' schemas.
///
/// The outermost relation is the first entry; the most recently appended
/// schema is the last one.
pub fn outer_queries_schemas(&self) -> Vec<DFSchemaRef> {
    self.outer_queries_schemas_stack.clone()
}

/// Pushes `schema` onto the top of the outer query schema stack.
///
/// Nothing is returned and no existing entry is replaced.
pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
self.outer_queries_schemas_stack.push(schema);
}

/// Returns the schema of the adjacent outer relation (the top of the
/// stack), or `None` when the stack is empty.
///
/// The stack itself is left untouched, so a shared borrow is sufficient;
/// use `pop_outer_query_schema` to actually remove the entry.
pub fn latest_outer_query_schema(&self) -> Option<DFSchemaRef> {
    self.outer_queries_schemas_stack.last().cloned()
}

/// Removes the schema of the adjacent outer relation (the top of the
/// stack) and returns it, or returns `None` if the stack is empty.
pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
self.outer_queries_schemas_stack.pop()
}

pub fn set_table_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
Expand Down
23 changes: 14 additions & 9 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} => {
let tbl_func_ref = self.object_name_to_table_reference(name)?;
let schema = planner_context
.outer_query_schema()
.outer_queries_schemas()
.last()
.cloned()
.unwrap_or_else(DFSchema::empty);
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let func_args = args
.into_iter()
.map(|arg| match arg {
Expand Down Expand Up @@ -213,20 +214,24 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let old_from_schema = planner_context
.set_outer_from_schema(None)
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let new_query_schema = match planner_context.outer_query_schema() {
Some(old_query_schema) => {
let outer_query_schema = planner_context.pop_outer_query_schema();
let new_query_schema = match outer_query_schema {
Some(ref old_query_schema) => {
let mut new_query_schema = old_from_schema.as_ref().clone();
new_query_schema.merge(old_query_schema);
Some(Arc::new(new_query_schema))
new_query_schema.merge(old_query_schema.as_ref());
Arc::new(new_query_schema)
}
None => Some(Arc::clone(&old_from_schema)),
None => Arc::clone(&old_from_schema),
};
let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
planner_context.append_outer_query_schema(new_query_schema);

let plan = self.create_relation(subquery, planner_context)?;
let outer_ref_columns = plan.all_out_ref_exprs();

planner_context.set_outer_query_schema(old_query_schema);
planner_context.pop_outer_query_schema();
if let Some(schema) = outer_query_schema {
planner_context.append_outer_query_schema(schema);
}
planner_context.set_outer_from_schema(Some(old_from_schema));

// We can omit the subquery wrapper if there are no columns
Expand Down
20 changes: 13 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -594,12 +594,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let outer_query_schema_vec = planner_context.outer_queries_schemas();
let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;

Expand All @@ -614,9 +610,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let mut using_columns = HashSet::new();
expr_to_columns(&filter_expr, &mut using_columns)?;
let mut schema_stack: Vec<Vec<&DFSchema>> =
vec![vec![plan.schema()], fallback_schemas];
for sc in outer_query_schema_vec.iter().rev() {
schema_stack.push(vec![sc.as_ref()]);
}

let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
filter_expr,
&[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
schema_stack
.iter()
.map(|sc| sc.as_slice())
.collect::<Vec<&[&DFSchema]>>()
.as_slice(),
&[using_columns],
)?;

Expand Down
23 changes: 23 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4710,6 +4710,29 @@ logical_plan
08)----------TableScan: j3 projection=[j3_string, j3_id]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" })

# Two nested lateral joins, with the deepest join referencing the outermost relation
query TT
explain SELECT * FROM j1 j1_outer, LATERAL (
SELECT * FROM j1 j1_inner, LATERAL (
SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id
) as j2
) as j2;
----
logical_plan
01)Cross Join:
02)--SubqueryAlias: j1_outer
03)----TableScan: j1 projection=[j1_string, j1_id]
04)--SubqueryAlias: j2
05)----Subquery:
06)------Cross Join:
07)--------SubqueryAlias: j1_inner
08)----------TableScan: j1 projection=[j1_string, j1_id]
09)--------SubqueryAlias: j2
10)----------Subquery:
11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id
12)--------------TableScan: j2 projection=[j2_string, j2_id]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" })

query TT
explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;
----
Expand Down
Loading