Skip to content

Commit 17a95ab

Browse files
authored
fix: runtime filters not work when probe keys are simple casts of non-nullable columns to nullable types (#19020)
* support merge * fix * add logic test * update test * update test * update test
1 parent 6a78ba1 commit 17a95ab

File tree

7 files changed

+118
-10
lines changed

7 files changed

+118
-10
lines changed

โ€Žsrc/query/service/src/physical_plans/runtime_filter/builder.rsโ€Ž

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,12 @@ pub async fn build_runtime_filter(
102102

103103
let build_side = s_expr.build_side_child();
104104
let build_side_data_distribution = build_side.get_data_distribution()?;
105-
if build_side_data_distribution
106-
.as_ref()
107-
.is_some_and(|e| !matches!(e, Exchange::Broadcast | Exchange::NodeToNodeHash(_)))
108-
{
105+
if build_side_data_distribution.as_ref().is_some_and(|e| {
106+
!matches!(
107+
e,
108+
Exchange::Broadcast | Exchange::NodeToNodeHash(_) | Exchange::Merge
109+
)
110+
}) {
109111
return Ok(Default::default());
110112
}
111113

@@ -124,9 +126,16 @@ pub async fn build_runtime_filter(
124126
})
125127
})
126128
{
127-
// Skip if not a column reference
128-
if probe_key.as_column_ref().is_none() {
129-
continue;
129+
// Skip if the probe expression is neither a direct column reference nor a
130+
// cast from not null to nullable type (e.g. CAST(col AS Nullable(T))).
131+
match &probe_key {
132+
RemoteExpr::ColumnRef { .. } => {}
133+
RemoteExpr::Cast {
134+
expr: box RemoteExpr::ColumnRef { data_type, .. },
135+
dest_type,
136+
..
137+
} if &dest_type.remove_nullable() == data_type => {}
138+
_ => continue,
130139
}
131140

132141
let probe_targets =

โ€Žsrc/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rsโ€Ž

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,15 @@ fn build_inlist_filter(inlist: Column, probe_key: &Expr<String>) -> Result<Expr<
123123
data_type: DataType::Boolean,
124124
}));
125125
}
126-
let probe_key = probe_key.as_column_ref().unwrap();
126+
let probe_key = match probe_key {
127+
Expr::ColumnRef(col) => col,
128+
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
129+
Expr::Cast(cast) => match cast.expr.as_ref() {
130+
Expr::ColumnRef(col) => col,
131+
_ => unreachable!(),
132+
},
133+
_ => unreachable!(),
134+
};
127135

128136
let raw_probe_key = RawExpr::ColumnRef {
129137
span: probe_key.span,
@@ -249,7 +257,15 @@ async fn build_bloom_filter(
249257
probe_key: &Expr<String>,
250258
max_threads: usize,
251259
) -> Result<RuntimeFilterBloom> {
252-
let probe_key = probe_key.as_column_ref().unwrap();
260+
let probe_key = match probe_key {
261+
Expr::ColumnRef(col) => col,
262+
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
263+
Expr::Cast(cast) => match cast.expr.as_ref() {
264+
Expr::ColumnRef(col) => col,
265+
_ => unreachable!(),
266+
},
267+
_ => unreachable!(),
268+
};
253269
let column_name = probe_key.id.to_string();
254270
let total_items = bloom.len();
255271

โ€Žsrc/query/service/src/pipelines/processors/transforms/hash_join/util.rsโ€Ž

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,15 @@ pub(crate) fn min_max_filter(
213213
max: Scalar,
214214
probe_key: &Expr<String>,
215215
) -> Result<Expr<String>> {
216-
let probe_key = probe_key.as_column_ref().unwrap();
216+
let probe_key = match probe_key {
217+
Expr::ColumnRef(col) => col,
218+
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
219+
Expr::Cast(cast) => match cast.expr.as_ref() {
220+
Expr::ColumnRef(col) => col,
221+
_ => unreachable!(),
222+
},
223+
_ => unreachable!(),
224+
};
217225
let raw_probe_key = RawExpr::ColumnRef {
218226
span: probe_key.span,
219227
id: probe_key.id.to_string(),

โ€Žtests/sqllogictests/suites/mode/cluster/exchange.testโ€Ž

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ Exchange
189189
โ”œโ”€โ”€ probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
190190
โ”œโ”€โ”€ keys is null equal: [false]
191191
โ”œโ”€โ”€ filters: []
192+
โ”œโ”€โ”€ build join filters:
193+
โ”‚ โ””โ”€โ”€ filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max
192194
โ”œโ”€โ”€ estimated rows: 2.00
193195
โ”œโ”€โ”€ Exchange(Build)
194196
โ”‚ โ”œโ”€โ”€ output columns: [sum(number) (#1), numbers.number (#0)]
@@ -224,6 +226,7 @@ Exchange
224226
โ”œโ”€โ”€ partitions total: 1
225227
โ”œโ”€โ”€ partitions scanned: 1
226228
โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
229+
โ”œโ”€โ”€ apply join filters: [#0]
227230
โ””โ”€โ”€ estimated rows: 2.00
228231

229232
query T
@@ -277,6 +280,8 @@ Fragment 2:
277280
โ”œโ”€โ”€ probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
278281
โ”œโ”€โ”€ keys is null equal: [false]
279282
โ”œโ”€โ”€ filters: []
283+
โ”œโ”€โ”€ build join filters:
284+
โ”‚ โ””โ”€โ”€ filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max
280285
โ”œโ”€โ”€ estimated rows: 2.00
281286
โ”œโ”€โ”€ ExchangeSource(Build)
282287
โ”‚ โ”œโ”€โ”€ output columns: [sum(number) (#1), numbers.number (#0)]
@@ -290,6 +295,7 @@ Fragment 2:
290295
โ”œโ”€โ”€ partitions total: 1
291296
โ”œโ”€โ”€ partitions scanned: 1
292297
โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
298+
โ”œโ”€โ”€ apply join filters: [#0]
293299
โ””โ”€โ”€ estimated rows: 2.00
294300
(empty)
295301
(empty)

โ€Žtests/sqllogictests/suites/mode/standalone/explain/infer_filter.testโ€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,8 @@ HashJoin
882882
โ”œโ”€โ”€ probe keys: [CAST(t1.id (#0) AS Int64 NULL)]
883883
โ”œโ”€โ”€ keys is null equal: [false]
884884
โ”œโ”€โ”€ filters: []
885+
โ”œโ”€โ”€ build join filters:
886+
โ”‚ โ””โ”€โ”€ filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max
885887
โ”œโ”€โ”€ estimated rows: 1.00
886888
โ”œโ”€โ”€ Filter(Build)
887889
โ”‚ โ”œโ”€โ”€ output columns: [t2.id (#1)]
@@ -912,6 +914,7 @@ HashJoin
912914
โ”œโ”€โ”€ partitions scanned: 1
913915
โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
914916
โ”œโ”€โ”€ push downs: [filters: [t1.id (#0) = 869550529], limit: NONE]
917+
โ”œโ”€โ”€ apply join filters: [#0]
915918
โ””โ”€โ”€ estimated rows: 1.00
916919

917920
statement ok
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
statement ok
2+
drop table if exists rf_cast_probe;
3+
4+
statement ok
5+
drop table if exists rf_cast_build;
6+
7+
statement ok
8+
create table rf_cast_build (id Nullable(Int32));
9+
10+
statement ok
11+
create table rf_cast_probe (a Int not null);
12+
13+
statement ok
14+
insert into rf_cast_build values (1), (3), (5);
15+
16+
statement ok
17+
insert into rf_cast_probe values (1), (2), (3);
18+
19+
query T
20+
EXPLAIN SELECT *
21+
FROM rf_cast_probe p
22+
JOIN rf_cast_build b
23+
ON CAST(p.a AS Nullable(Int32)) = b.id;
24+
----
25+
HashJoin
26+
โ”œโ”€โ”€ output columns: [p.a (#0), b.id (#1)]
27+
โ”œโ”€โ”€ join type: INNER
28+
โ”œโ”€โ”€ build keys: [b.id (#1)]
29+
โ”œโ”€โ”€ probe keys: [CAST(p.a (#0) AS Int32 NULL)]
30+
โ”œโ”€โ”€ keys is null equal: [false]
31+
โ”œโ”€โ”€ filters: []
32+
โ”œโ”€โ”€ build join filters:
33+
โ”‚ โ””โ”€โ”€ filter id:0, build key:b.id (#1), probe targets:[CAST(p.a (#0) AS Int32 NULL)@scan0], filter type:bloom,inlist,min_max
34+
โ”œโ”€โ”€ estimated rows: 1.80
35+
โ”œโ”€โ”€ TableScan(Build)
36+
โ”‚ โ”œโ”€โ”€ table: default.default.rf_cast_build
37+
โ”‚ โ”œโ”€โ”€ scan id: 1
38+
โ”‚ โ”œโ”€โ”€ output columns: [id (#1)]
39+
โ”‚ โ”œโ”€โ”€ read rows: 3
40+
โ”‚ โ”œโ”€โ”€ read size: < 1 KiB
41+
โ”‚ โ”œโ”€โ”€ partitions total: 1
42+
โ”‚ โ”œโ”€โ”€ partitions scanned: 1
43+
โ”‚ โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
44+
โ”‚ โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
45+
โ”‚ โ””โ”€โ”€ estimated rows: 3.00
46+
โ””โ”€โ”€ TableScan(Probe)
47+
โ”œโ”€โ”€ table: default.default.rf_cast_probe
48+
โ”œโ”€โ”€ scan id: 0
49+
โ”œโ”€โ”€ output columns: [a (#0)]
50+
โ”œโ”€โ”€ read rows: 3
51+
โ”œโ”€โ”€ read size: < 1 KiB
52+
โ”œโ”€โ”€ partitions total: 1
53+
โ”œโ”€โ”€ partitions scanned: 1
54+
โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
55+
โ”œโ”€โ”€ push downs: [filters: [], limit: NONE]
56+
โ”œโ”€โ”€ apply join filters: [#0]
57+
โ””โ”€โ”€ estimated rows: 3.00
58+
59+
statement ok
60+
drop table rf_cast_probe;
61+
62+
statement ok
63+
drop table rf_cast_build;

โ€Žtests/sqllogictests/suites/mode/standalone/explain_native/infer_filter.testโ€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,8 @@ HashJoin
748748
โ”œโ”€โ”€ probe keys: [CAST(t1.id (#0) AS Int64 NULL)]
749749
โ”œโ”€โ”€ keys is null equal: [false]
750750
โ”œโ”€โ”€ filters: []
751+
โ”œโ”€โ”€ build join filters:
752+
โ”‚ โ””โ”€โ”€ filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max
751753
โ”œโ”€โ”€ estimated rows: 1.00
752754
โ”œโ”€โ”€ TableScan(Build)
753755
โ”‚ โ”œโ”€โ”€ table: default.default.t2
@@ -770,6 +772,7 @@ HashJoin
770772
โ”œโ”€โ”€ partitions scanned: 1
771773
โ”œโ”€โ”€ pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
772774
โ”œโ”€โ”€ push downs: [filters: [t1.id (#0) = 869550529], limit: NONE]
775+
โ”œโ”€โ”€ apply join filters: [#0]
773776
โ””โ”€โ”€ estimated rows: 1.00
774777

775778
statement ok

0 commit comments

Comments
ย (0)