Skip to content

Commit 6ed7c25

Browse files
committed
xx
1 parent 8450f36 commit 6ed7c25

File tree

2 files changed

+51
-21
lines changed

2 files changed

+51
-21
lines changed

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ pub struct HashJoin {
150150

151151
pub runtime_filter: PhysicalRuntimeFilters,
152152
pub broadcast_id: Option<u32>,
153-
pub nested_loop_filter: NestedLoopFilterInfo,
153+
pub nested_loop_filter: Option<NestedLoopFilterInfo>,
154154
}
155155

156156
#[typetag::serde]
@@ -1202,7 +1202,11 @@ impl PhysicalPlanBuilder {
12021202
probe_schema: &DataSchema,
12031203
build_schema: &DataSchema,
12041204
target_schema: &DataSchema,
1205-
) -> Result<NestedLoopFilterInfo> {
1205+
) -> Result<Option<NestedLoopFilterInfo>> {
1206+
if matches!(join.join_type, JoinType::Inner) {
1207+
return Ok(None);
1208+
}
1209+
12061210
let merged = DataSchema::new(
12071211
probe_schema
12081212
.fields
@@ -1212,19 +1216,40 @@ impl PhysicalPlanBuilder {
12121216
.collect(),
12131217
);
12141218

1215-
let predicates = join
1216-
.non_equi_conditions
1217-
.iter()
1218-
.map(|c| Ok(c.clone()))
1219-
.chain(join.equi_conditions.iter().map(condition_to_expr))
1220-
.map(|scalar| resolve_scalar(&scalar?, &merged))
1221-
.collect::<Result<Vec<_>>>()
1222-
.map_err(|err| {
1223-
err.add_message(format!(
1219+
let mut predicates =
1220+
Vec::with_capacity(join.equi_conditions.len() + join.non_equi_conditions.len());
1221+
1222+
for condition in &join.equi_conditions {
1223+
let scalar = condition_to_expr(condition)?;
1224+
match resolve_scalar(&scalar, &merged) {
1225+
Ok(expr) => predicates.push(expr),
1226+
Err(_)
1227+
if condition
1228+
.left
1229+
.data_type()
1230+
.map(|data_type| data_type.remove_nullable().is_bitmap())
1231+
.unwrap_or_default() =>
1232+
{
1233+
// no function matches signature `eq(Bitmap NULL, Bitmap NULL)
1234+
return Ok(None);
1235+
}
1236+
Err(err) => {
1237+
return Err(err.add_message(format!(
12241238
"Failed build nested loop filter schema: {merged:#?} equi_conditions: {:#?}",
12251239
join.equi_conditions
1240+
)))
1241+
}
1242+
}
1243+
}
1244+
1245+
for scalar in &join.non_equi_conditions {
1246+
predicates.push(resolve_scalar(scalar, &merged).map_err(|err|{
1247+
err.add_message(format!(
1248+
"Failed build nested loop filter schema: {merged:#?} non_equi_conditions: {:#?}",
1249+
join.non_equi_conditions
12261250
))
1227-
})?;
1251+
})?);
1252+
}
12281253

12291254
let projection = target_schema
12301255
.fields
@@ -1237,10 +1262,10 @@ impl PhysicalPlanBuilder {
12371262
))
12381263
})?;
12391264

1240-
Ok(NestedLoopFilterInfo {
1265+
Ok(Some(NestedLoopFilterInfo {
12411266
predicates,
12421267
projection,
1243-
})
1268+
}))
12441269
}
12451270

12461271
pub async fn build_hash_join(
@@ -1372,11 +1397,11 @@ fn condition_to_expr(condition: &JoinEquiCondition) -> Result<ScalarExpr> {
13721397
let right_type = condition.right.data_type()?;
13731398

13741399
let arguments = match (&left_type, &right_type) {
1375-
(DataType::Nullable(left), right) if **left == *right => vec![
1400+
(DataType::Nullable(box left), right) if left == right => vec![
13761401
condition.left.clone(),
13771402
condition.right.clone().unify_to_data_type(&left_type),
13781403
],
1379-
(left, DataType::Nullable(right)) if *left == **right => vec![
1404+
(left, DataType::Nullable(box right)) if left == right => vec![
13801405
condition.left.clone().unify_to_data_type(&right_type),
13811406
condition.right.clone(),
13821407
],

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub struct HashJoinDesc {
6666
pub(crate) probe_projection: ColumnSet,
6767
pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
6868
pub(crate) build_schema: DataSchemaRef,
69-
pub(crate) nested_loop_filter: NestedLoopFilterInfo,
69+
pub(crate) nested_loop_filter: Option<NestedLoopFilterInfo>,
7070
}
7171

7272
#[derive(Debug, Clone)]
@@ -270,9 +270,15 @@ impl HashJoinDesc {
270270
function_ctx: &FunctionContext,
271271
block_size: usize,
272272
) -> Result<Option<(FilterExecutor, Option<Vec<FieldIndex>>)>> {
273-
let predicates = self
274-
.nested_loop_filter
275-
.predicates
273+
let Some(NestedLoopFilterInfo {
274+
predicates,
275+
projection,
276+
}) = &self.nested_loop_filter
277+
else {
278+
return Ok(None);
279+
};
280+
281+
let predicates = predicates
276282
.iter()
277283
.map(|x| Ok(x.as_expr(&BUILTIN_FUNCTIONS)))
278284
.reduce(|lhs, rhs| {
@@ -283,7 +289,6 @@ impl HashJoinDesc {
283289
return Ok(None);
284290
};
285291

286-
let projection = &self.nested_loop_filter.projection;
287292
let reorder = if !projection.is_sorted() {
288293
let mut mapper = projection.iter().cloned().enumerate().collect::<Vec<_>>();
289294
mapper.sort_by_key(|(_, field)| *field);

0 commit comments

Comments
 (0)