From 2a828ede472675f5def42d9495a97689cd041d1e Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 16:10:07 +0200 Subject: [PATCH 1/9] fix: allow OuterRefColumn for non-adjacent outer relation --- datafusion/sql/src/expr/identifier.rs | 62 +++++++++++--------- datafusion/sql/src/expr/subquery.rs | 16 +++-- datafusion/sql/src/planner.rs | 29 +++++---- datafusion/sql/src/relation/mod.rs | 14 ++--- datafusion/sql/src/select.rs | 6 +- datafusion/sqllogictest/test_files/debug.slt | 52 ++++++++++++++++ 6 files changed, 121 insertions(+), 58 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 7c276ce53e35..c9eda721fada 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -69,7 +69,7 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - if let Some(outer) = planner_context.outer_query_schema() { + for outer in planner_context.outer_query_schema() { if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) { @@ -165,35 +165,43 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - if let Some(outer) = planner_context.outer_query_schema() { - let search_result = search_dfschema(&ids, outer); - match search_result { - // Found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) - if !nested_names.is_empty() => - { - // TODO: remove when can support nested identifiers for OuterReferenceColumn - not_impl_err!( + let outer_schemas = planner_context.outer_query_schema(); + let mut maybe_result = None; + if outer_schemas.len() > 0 { + for outer in planner_context.outer_query_schema() { + let search_result = search_dfschema(&ids, outer); + let result = match search_result { + // Found matching field with spare identifier(s) for nested field(s) in structure + Some((field, qualifier, nested_names)) + if !nested_names.is_empty() => + { + // TODO: remove when can support nested identifiers for OuterReferenceColumn + not_impl_err!( "Nested identifiers are not yet supported for OuterReferenceColumn {}", Column::from((qualifier, field)).quoted_flat_name() ) - } - // Found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - field.data_type().clone(), - Column::from((qualifier, field)), - )) - } - // Found no matching field, will return a default - None => { - let s = &ids[0..ids.len()]; - // safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = - form_identifier(s).unwrap(); - Ok(Expr::Column(Column::new(relation, column_name))) - } + } + // Found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + Ok(Expr::OuterReferenceColumn( + field.data_type().clone(), + Column::from((qualifier, field)), + )) + } + // Found no matching field, will return a default + None => continue, + }; + maybe_result = Some(result); + break; + } + if let Some(result) = maybe_result { + result + } else { + let s = &ids[0..ids.len()]; + // safe unwrap as s can never be empty or exceed the bounds + let (relation, column_name) = form_identifier(s).unwrap(); + Ok(Expr::Column(Column::new(relation, column_name))) } } else { let s = &ids[0..ids.len()]; diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 602d39233d58..dd4f307b2074 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,11 +31,10 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); Ok(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(sub_plan), @@ -54,8 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -70,7 +68,7 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, @@ -98,8 +96,8 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.set_outer_query_schema(input_schema.clone().into()); + let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -112,7 +110,7 @@ impl SqlToRel<'_, S> { } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + planner_context.pop_outer_query_schema(); self.validate_single_column( &sub_plan, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 73d136d7d1cc..0d33ff68c212 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -199,7 +199,7 @@ pub struct PlannerContext { /// Use `Arc` to allow cheap cloning ctes: HashMap>, /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Option, + outer_query_schema: Vec, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -219,7 +219,7 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, + outer_query_schema: vec![], outer_from_schema: None, create_table_schema: None, } @@ -235,18 +235,27 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) + pub fn outer_query_schema(&self) -> Vec<&DFSchema> { + self.outer_query_schema + .iter() + .map(|sc| sc.as_ref()) + .collect() } /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema + pub fn set_outer_query_schema(&mut self, mut schema: DFSchemaRef) { + self.outer_query_schema.push(schema); + } + + pub fn latest_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.last().clone().cloned() + } + + /// Sets the outer query schema, returning the existing one, if + /// any + pub fn pop_outer_query_schema(&mut self) -> Option { + self.outer_query_schema.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 88a32a218341..8319f213bc26 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -25,7 +25,7 @@ use datafusion_common::{ }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; -use datafusion_expr::{Subquery, SubqueryAlias}; +use datafusion_expr::{planner, Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; mod join; @@ -184,20 +184,20 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.outer_query_schema() { + let new_query_schema = match planner_context.pop_outer_query_schema() { Some(old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); - new_query_schema.merge(old_query_schema); - Some(Arc::new(new_query_schema)) + new_query_schema.merge(old_query_schema.as_ref()); + Arc::new(new_query_schema) } - None => Some(Arc::clone(&old_from_schema)), + None => Arc::clone(&old_from_schema), }; - let old_query_schema = planner_context.set_outer_query_schema(new_query_schema); + planner_context.set_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_query_schema); + planner_context.pop_outer_query_schema(); planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 9fad274b51c0..242b77a32a6f 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -506,14 +506,10 @@ impl SqlToRel<'_, S> { match selection { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); - let outer_query_schema = planner_context.outer_query_schema().cloned(); - let outer_query_schema_vec = outer_query_schema - .as_ref() - .map(|schema| vec![schema]) - .unwrap_or_else(Vec::new); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; + let outer_query_schema_vec = planner_context.outer_query_schema(); // Check for aggregation functions let aggregate_exprs = diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt new file mode 100644 index 000000000000..48fd16bc0fd9 --- /dev/null +++ b/datafusion/sqllogictest/test_files/debug.slt @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# make sure to a batch size smaller than row number of the table. +statement ok +set datafusion.execution.batch_size = 2; + +statement ok +CREATE TABLE employees ( + employee_id INTEGER, + employee_name VARCHAR, + dept_id INTEGER, + salary DECIMAL +); + +statement ok +CREATE TABLE project_assignments ( + project_id INTEGER, + employee_id INTEGER, + priority INTEGER +); + + + +query TT +explain SELECT e1.employee_name, e1.salary +FROM employees e1 +WHERE e1.salary > ( + SELECT AVG(e2.salary) + FROM employees e2 + WHERE e2.dept_id = e1.dept_id + AND e2.salary > ( + SELECT AVG(e3.salary) + FROM employees e3 + WHERE e3.dept_id = e1.dept_id + ) +); +---- \ No newline at end of file From dea0b7011ab8260850f233ea8363623346179511 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:14:53 +0200 Subject: [PATCH 2/9] fix: accidentally pushdown filter with subquery --- .../expr/src/logical_plan/invariants.rs | 16 ++++++---- datafusion/optimizer/src/push_down_filter.rs | 14 ++++++++- .../sqllogictest/test_files/subquery.slt | 29 +++++++++++++++++++ 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 0c30c9785766..e70b261a6a1f 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -200,9 +200,12 @@ pub fn check_subquery_expr( } }?; match outer_plan { - LogicalPlan::Projection(_) - | LogicalPlan::Filter(_) => Ok(()), - LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => { + LogicalPlan::Projection(_) | LogicalPlan::Filter(_) => Ok(()), + LogicalPlan::Aggregate(Aggregate { + group_expr, + aggr_expr, + .. + }) => { if group_expr.contains(expr) && !aggr_expr.contains(expr) { // TODO revisit this validation logic plan_err!( @@ -212,9 +215,12 @@ pub fn check_subquery_expr( Ok(()) } } - _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" + any => { + println!("here {any}"); + plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" ) + } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index bbf0b0dd810e..9fa45d2ad9b5 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1089,7 +1089,11 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = filter_predicates .into_iter() - .partition(|pred| pred.is_volatile()); + // TODO: subquery decorrelation sometimes cannot decorrelated all the expr + // (i.e in the case of recursive subquery) + // this function may accidentally pushdown the subquery expr as well + // until then, we have to exclude these exprs here + .partition(|pred| pred.is_volatile() || has_subquery(*pred)); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1382,6 +1386,14 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } +fn has_subquery(expr: &Expr) -> bool { + expr.exists(|e| match e { + Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + _ => Ok(false), + }) + .unwrap() +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index a0ac15b740d7..c2620404f1dc 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1482,3 +1482,32 @@ logical_plan statement count 0 drop table person; + +# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice < ( + select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_1 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < () +10)----------------Subquery: +11)------------------Projection: sum(lineitem.l_extendedprice) AS price +12)--------------------Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] +13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) +14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] +15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] From 5ed2d24a736875114209367bc9a41d7b5e8817fb Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:20:20 +0200 Subject: [PATCH 3/9] chore: clippy --- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/expr/subquery.rs | 6 +++--- datafusion/sql/src/planner.rs | 4 ++-- datafusion/sql/src/relation/mod.rs | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 9fa45d2ad9b5..9555c9d2baac 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1093,7 +1093,7 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(*pred)); + .partition(|pred| pred.is_volatile() || has_subquery(pred)); // Check which non-volatile filters are supported by source let supported_filters = scan diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index c9eda721fada..1994c9075a5c 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -167,7 +167,7 @@ impl SqlToRel<'_, S> { // Check the outer_query_schema and try to find a match let outer_schemas = planner_context.outer_query_schema(); let mut maybe_result = None; - if outer_schemas.len() > 0 { + if !outer_schemas.is_empty() { for outer in planner_context.outer_query_schema() { let search_result = search_dfschema(&ids, outer); let result = match search_result { diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index dd4f307b2074..6e10607d8533 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,7 +31,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(input_schema.clone().into()); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); @@ -53,7 +53,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -96,7 +96,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.set_outer_query_schema(input_schema.clone().into()); + planner_context.append_outer_query_schema(input_schema.clone().into()); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0d33ff68c212..771a17c16639 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -244,12 +244,12 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema(&mut self, mut schema: DFSchemaRef) { + pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) { self.outer_query_schema.push(schema); } pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().clone().cloned() + self.outer_query_schema.last().cloned() } /// Sets the outer query schema, returning the existing one, if diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 8319f213bc26..9acb3897c033 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -25,7 +25,7 @@ use datafusion_common::{ }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; -use datafusion_expr::{planner, Subquery, SubqueryAlias}; +use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; mod join; @@ -192,7 +192,7 @@ impl SqlToRel<'_, S> { } None => Arc::clone(&old_from_schema), }; - planner_context.set_outer_query_schema(new_query_schema); + planner_context.append_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); From c2caf3744520c6b218e93ed6fa6aaf3fc5e8f408 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 17:23:33 +0200 Subject: [PATCH 4/9] chore: rm debug details --- .../expr/src/logical_plan/invariants.rs | 7 +-- datafusion/sqllogictest/test_files/debug.slt | 52 ------------------- 2 files changed, 2 insertions(+), 57 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/debug.slt diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index e70b261a6a1f..bb51f9dc35db 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -215,12 +215,9 @@ pub fn check_subquery_expr( Ok(()) } } - any => { - println!("here {any}"); - plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes123 {any}" + _ => plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" ) - } }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/sqllogictest/test_files/debug.slt b/datafusion/sqllogictest/test_files/debug.slt deleted file mode 100644 index 48fd16bc0fd9..000000000000 --- a/datafusion/sqllogictest/test_files/debug.slt +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# make sure to a batch size smaller than row number of the table. -statement ok -set datafusion.execution.batch_size = 2; - -statement ok -CREATE TABLE employees ( - employee_id INTEGER, - employee_name VARCHAR, - dept_id INTEGER, - salary DECIMAL -); - -statement ok -CREATE TABLE project_assignments ( - project_id INTEGER, - employee_id INTEGER, - priority INTEGER -); - - - -query TT -explain SELECT e1.employee_name, e1.salary -FROM employees e1 -WHERE e1.salary > ( - SELECT AVG(e2.salary) - FROM employees e2 - WHERE e2.dept_id = e1.dept_id - AND e2.salary > ( - SELECT AVG(e3.salary) - FROM employees e3 - WHERE e3.dept_id = e1.dept_id - ) -); ----- \ No newline at end of file From cec566a22836cb996cc3e0681b83e0685fe7e736 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 18:50:33 +0200 Subject: [PATCH 5/9] fix: breaking changes --- .../expr/src/logical_plan/invariants.rs | 8 ++-- datafusion/optimizer/src/push_down_filter.rs | 8 ++-- datafusion/sql/src/expr/identifier.rs | 6 +-- datafusion/sql/src/planner.rs | 48 +++++++++++++++---- datafusion/sql/src/select.rs | 2 +- 5 files changed, 53 insertions(+), 19 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index bb51f9dc35db..0d425c57f55b 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -209,15 +209,17 @@ pub fn check_subquery_expr( if group_expr.contains(expr) && !aggr_expr.contains(expr) { // TODO revisit this validation logic plan_err!( - "Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions" + "Correlated scalar subquery in the GROUP BY clause must \ + also be in the aggregate expressions" ) } else { Ok(()) } } _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan" - ) + "Correlated scalar subquery can only be used in Projection, \ + Filter, Aggregate plan nodes" + ), }?; } check_correlations_in_subquery(inner_plan) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 9555c9d2baac..499cfeebe421 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1093,7 +1093,9 @@ impl OptimizerRule for PushDownFilter { // (i.e in the case of recursive subquery) // this function may accidentally pushdown the subquery expr as well // until then, we have to exclude these exprs here - .partition(|pred| pred.is_volatile() || has_subquery(pred)); + .partition(|pred| { + pred.is_volatile() || has_scalar_subquery(pred) + }); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1386,9 +1388,9 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } -fn has_subquery(expr: &Expr) -> bool { +fn has_scalar_subquery(expr: &Expr) -> bool { expr.exists(|e| match e { - Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => Ok(true), + Expr::ScalarSubquery(_) => Ok(true), _ => Ok(false), }) .unwrap() diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 1994c9075a5c..0d1ef1ca951b 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -69,7 +69,7 @@ impl SqlToRel<'_, S> { } // Check the outer query schema - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { if let Ok((qualifier, field)) = outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) { @@ -165,10 +165,10 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - let outer_schemas = planner_context.outer_query_schema(); + let outer_schemas = planner_context.outer_queries_schemas(); let mut maybe_result = None; if !outer_schemas.is_empty() { - for outer in planner_context.outer_query_schema() { + for outer in planner_context.outer_queries_schemas() { let search_result = search_dfschema(&ids, outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 771a17c16639..1b9ff438e0d2 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -198,8 +198,16 @@ pub struct PlannerContext { /// Map of CTE name to logical plan of the WITH clause. /// Use `Arc` to allow cheap cloning ctes: HashMap>, + + /// The queries schemas of outer query relations, used to resolve the outer referenced + /// columns in subquery (recursive aware) + outer_queries_schemas_stack: Vec, + /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Vec, + /// This field is maintained to support deprecated functions + /// `outer_query_schema` and `set_outer_query_schema` + /// which is only aware of the adjacent outer relation + outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, @@ -219,7 +227,8 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: vec![], + outer_query_schema: None, + outer_queries_schemas_stack: vec![], outer_from_schema: None, create_table_schema: None, } @@ -235,8 +244,29 @@ impl PlannerContext { } // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Vec<&DFSchema> { - self.outer_query_schema + // This function is only compatible with + #[deprecated(note = "Use outer_queries_schemas instead")] + pub fn outer_query_schema(&self) -> Option<&DFSchema> { + self.outer_query_schema.as_ref().map(|s| s.as_ref()) + } + + /// Sets the outer query schema, returning the existing one, if + /// any + #[deprecated( + note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" + )] + pub fn set_outer_query_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.outer_query_schema, &mut schema); + schema + } + + /// Return the stack of outer relations' schemas, the outer most + /// relation are at the first entry + pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { + self.outer_queries_schemas_stack .iter() .map(|sc| sc.as_ref()) .collect() @@ -245,17 +275,17 @@ impl PlannerContext { /// Sets the outer query schema, returning the existing one, if /// any pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) { - self.outer_query_schema.push(schema); + self.outer_queries_schemas_stack.push(schema); } + /// The schema of the adjacent outer relation pub fn latest_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.last().cloned() + self.outer_queries_schemas_stack.last().cloned() } - /// Sets the outer query schema, returning the existing one, if - /// any + /// Remove the schema of the adjacent outer relation pub fn pop_outer_query_schema(&mut self) -> Option { - self.outer_query_schema.pop() + self.outer_queries_schemas_stack.pop() } pub fn set_table_schema( diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 242b77a32a6f..2bb0c34aff78 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -509,7 +509,7 @@ impl SqlToRel<'_, S> { let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_query_schema(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = From 699424d3691c53c94eb5c5cb70879e8d8b4ab4c3 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:38:51 +0200 Subject: [PATCH 6/9] fix: lateral join losing its outer ref columns --- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/planner.rs | 7 ++----- datafusion/sql/src/relation/mod.rs | 8 ++++++-- datafusion/sql/src/select.rs | 16 +++++++++++++--- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 0d1ef1ca951b..9ee7b22e6dde 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -169,7 +169,7 @@ impl SqlToRel<'_, S> { let mut maybe_result = None; if !outer_schemas.is_empty() { for outer in planner_context.outer_queries_schemas() { - let search_result = search_dfschema(&ids, outer); + let search_result = search_dfschema(&ids, &outer); let result = match search_result { // Found matching field with spare identifier(s) for nested field(s) in structure Some((field, qualifier, nested_names)) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 1b9ff438e0d2..deef1c38ef55 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -265,11 +265,8 @@ impl PlannerContext { /// Return the stack of outer relations' schemas, the outer most /// relation are at the first entry - pub fn outer_queries_schemas(&self) -> Vec<&DFSchema> { - self.outer_queries_schemas_stack - .iter() - .map(|sc| sc.as_ref()) - .collect() + pub fn outer_queries_schemas(&self) -> Vec { + self.outer_queries_schemas_stack.to_vec() } /// Sets the outer query schema, returning the existing one, if diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 9acb3897c033..e494404a50a7 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -184,8 +184,9 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.pop_outer_query_schema() { - Some(old_query_schema) => { + let outer_query_schema = planner_context.pop_outer_query_schema(); + let new_query_schema = match outer_query_schema { + Some(ref old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); new_query_schema.merge(old_query_schema.as_ref()); Arc::new(new_query_schema) @@ -198,6 +199,9 @@ impl SqlToRel<'_, S> { let outer_ref_columns = plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); + if let Some(schema) = outer_query_schema { + planner_context.append_outer_query_schema(schema); + } planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 2bb0c34aff78..ac2fea310933 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -29,7 +29,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_common::{not_impl_err, plan_err, DFSchema, Result}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -507,9 +507,9 @@ impl SqlToRel<'_, S> { Some(predicate_expr) => { let fallback_schemas = plan.fallback_normalize_schemas(); + let outer_query_schema_vec = planner_context.outer_queries_schemas(); let filter_expr = self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?; - let outer_query_schema_vec = planner_context.outer_queries_schemas(); // Check for aggregation functions let aggregate_exprs = @@ -522,9 +522,19 @@ impl SqlToRel<'_, S> { let mut using_columns = HashSet::new(); expr_to_columns(&filter_expr, &mut using_columns)?; + let mut schema_stack: Vec> = + vec![vec![plan.schema()], fallback_schemas]; + for sc in outer_query_schema_vec.iter().rev() { + schema_stack.push(vec![sc.as_ref()]); + } + let filter_expr = normalize_col_with_schemas_and_ambiguity_check( filter_expr, - &[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec], + schema_stack + .iter() + .map(|sc| sc.as_slice()) + .collect::>() + .as_slice(), &[using_columns], )?; From 4edaf616986c19c79491b3e99e613fb9fef590f5 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 25 May 2025 22:56:11 +0200 Subject: [PATCH 7/9] test: more test case for other decorrelation --- datafusion/sqllogictest/test_files/joins.slt | 24 ++++++++ .../sqllogictest/test_files/subquery.slt | 55 ++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index ddf701ba04ef..d40e745f6b65 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4689,6 +4689,30 @@ logical_plan 08)----------TableScan: j3 projection=[j3_string, j3_id] physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +# 2 nested lateral join with the deepest join referencing the outer most relation +query TT +explain SELECT * FROM j1 j1_outer, LATERAL ( + SELECT * FROM j1 j1_inner, LATERAL ( + SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id + ) as j2 +) as j2; +---- +logical_plan +01)Cross Join: +02)--SubqueryAlias: j1_outer +03)----TableScan: j1 projection=[j1_string, j1_id] +04)--SubqueryAlias: j2 +05)----Subquery: +06)------Cross Join: +07)--------SubqueryAlias: j1_inner +08)----------TableScan: j1 projection=[j1_string, j1_id] +09)--------SubqueryAlias: j2 +10)----------Subquery: +11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id +12)--------------TableScan: j2 projection=[j2_string, j2_id] +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) + + query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index c2620404f1dc..b8d5f0e75351 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1483,7 +1483,7 @@ logical_plan statement count 0 drop table person; -# correlated_recursive_scalar_subquery_with_level_3_subquery_referencing_level1_relation +# correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation query TT explain select c_custkey from customer where c_acctbal < ( @@ -1511,3 +1511,56 @@ logical_plan 13)----------------------Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal) 14)------------------------TableScan: lineitem, partial_filters=[lineitem.l_orderkey = outer_ref(orders.o_orderkey), lineitem.l_extendedprice < outer_ref(customer.c_acctbal)] 15)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] + +# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and exists ( + select * from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] + +# correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation +query TT +explain select c_custkey from customer +where c_acctbal < ( + select sum(o_totalprice) from orders + where o_custkey = c_custkey + and o_totalprice in ( + select l_extendedprice as price from lineitem where l_orderkey = o_orderkey + and l_extendedprice < c_acctbal + ) +) order by c_custkey; +---- +logical_plan +01)Sort: customer.c_custkey ASC NULLS LAST +02)--Projection: customer.c_custkey +03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +04)------TableScan: customer projection=[c_custkey, c_acctbal] +05)------SubqueryAlias: __scalar_sq_2 +06)--------Projection: sum(orders.o_totalprice), orders.o_custkey +07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] +08)------------Projection: orders.o_custkey, orders.o_totalprice +09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < customer.c_acctbal +10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] +11)----------------SubqueryAlias: __correlated_sq_1 +12)------------------Projection: lineitem.l_extendedprice AS price, lineitem.l_extendedprice, lineitem.l_orderkey +13)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice] \ No newline at end of file From 244a77865588e64cc164472466f52080ff62e36d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 26 May 2025 06:35:10 +0200 Subject: [PATCH 8/9] doc: better comments --- datafusion/sql/src/planner.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index deef1c38ef55..c7bf248e1b1a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -243,15 +243,19 @@ impl PlannerContext { self } - // Return a reference to the outer query's schema - // This function is only compatible with + /// Return a reference to the outer query's schema + /// This function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` #[deprecated(note = "Use outer_queries_schemas instead")] pub fn outer_query_schema(&self) -> Option<&DFSchema> { self.outer_query_schema.as_ref().map(|s| s.as_ref()) } /// Sets the outer query schema, returning the existing one, if - /// any + /// any, this function should not be used together with + /// `outer_queries_schemas`, `append_outer_query_schema` + /// `latest_outer_query_schema` and `pop_outer_query_schema` #[deprecated( note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema" )] From 8a8da4028df2a023d05410c183aca972131ab8ac Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 22 Nov 2025 15:24:40 +0100 Subject: [PATCH 9/9] fix: test --- datafusion/sql/src/relation/mod.rs | 5 +++-- datafusion/sqllogictest/test_files/joins.slt | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 482816b553c3..eb2b94191f7f 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -159,9 +159,10 @@ impl SqlToRel<'_, S> { } => { let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context - .outer_query_schema() + .outer_queries_schemas() + .last() .cloned() - .unwrap_or_else(DFSchema::empty); + .unwrap_or_else(|| Arc::new(DFSchema::empty())); let func_args = args .into_iter() .map(|arg| match arg { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 4dddd837c2ab..8c631326d2cf 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4731,8 +4731,7 @@ logical_plan 10)----------Subquery: 11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id 12)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) - +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" }) query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;