Skip to content

Commit 3277343

Browse files
committed
update
1 parent e6e9bfb commit 3277343

File tree

3 files changed

+77
-9
lines changed

3 files changed

+77
-9
lines changed

src/common/base/src/runtime/perf/query_perf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl QueryPerf {
5454
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
5555
.set_filter_func(filter_closure)
5656
.build()
57-
.map_err(|_e| ErrorCode::Internal("Failed to create profiler"))?;
57+
.map_err(|e| ErrorCode::Internal(format!("Failed to create profiler, {e}")))?;
5858
debug!("starting perf with frequency: {}", frequency);
5959
let mut payload = ThreadTracker::new_tracking_payload();
6060
payload.perf_enabled = true;

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,7 +1219,21 @@ impl PhysicalPlanBuilder {
12191219
let mut predicates =
12201220
Vec::with_capacity(join.equi_conditions.len() + join.non_equi_conditions.len());
12211221

1222+
let is_simple_expr = |expr| {
1223+
matches!(
1224+
expr,
1225+
ScalarExpr::BoundColumnRef(_)
1226+
| ScalarExpr::ConstantExpr(_)
1227+
| ScalarExpr::TypedConstantExpr(_, _)
1228+
)
1229+
};
1230+
12221231
for condition in &join.equi_conditions {
1232+
if !is_simple_expr(&condition.left) || !is_simple_expr(&condition.right) {
1233+
// TODO: filtering after a cross join causes the expression to be evaluated multiple times
1234+
return Ok(None);
1235+
}
1236+
12231237
let scalar = condition_to_expr(condition)?;
12241238
match resolve_scalar(&scalar, &merged) {
12251239
Ok(expr) => predicates.push(expr),

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ impl<T: Join> Join for NestedLoopJoin<T> {
6969
return self.inner.probe_block(data);
7070
};
7171

72-
let nested = Box::new(LoopJoinStream::new(data, build_blocks));
72+
let nested: Box<dyn JoinStream + '_> = if data.num_rows() <= *self.basic_state.build_rows {
73+
Box::new(ConstProbeLoopJoinStream::new(data, build_blocks))
74+
} else {
75+
Box::new(ConstBuildLoopJoinStream::new(data, build_blocks))
76+
};
77+
7378
Ok(InnerHashJoinFilterStream::create(
7479
nested,
7580
&mut self.desc.filter,
@@ -78,15 +83,15 @@ impl<T: Join> Join for NestedLoopJoin<T> {
7883
}
7984
}
8085

81-
struct LoopJoinStream<'a> {
86+
struct ConstProbeLoopJoinStream<'a> {
8287
probe_rows: VecDeque<Vec<Scalar>>,
8388
probe_types: Vec<DataType>,
8489
build_blocks: &'a [DataBlock],
8590
build_index: usize,
8691
}
8792

88-
impl<'a> LoopJoinStream<'a> {
89-
pub fn new(probe: DataBlock, build_blocks: &'a [DataBlock]) -> Self {
93+
impl<'a> ConstProbeLoopJoinStream<'a> {
94+
fn new(probe: DataBlock, build_blocks: &'a [DataBlock]) -> Self {
9095
let mut probe_rows = vec![Vec::new(); probe.num_rows()];
9196
for entry in probe.columns().iter() {
9297
match entry {
@@ -103,22 +108,22 @@ impl<'a> LoopJoinStream<'a> {
103108
}
104109
}
105110

106-
let left_types = probe
111+
let probe_types = probe
107112
.columns()
108113
.iter()
109114
.map(|entry| entry.data_type())
110115
.collect();
111116

112-
LoopJoinStream {
117+
ConstProbeLoopJoinStream {
113118
probe_rows: probe_rows.into(),
114-
probe_types: left_types,
119+
probe_types,
115120
build_blocks,
116121
build_index: 0,
117122
}
118123
}
119124
}
120125

121-
impl<'a> JoinStream for LoopJoinStream<'a> {
126+
impl<'a> JoinStream for ConstProbeLoopJoinStream<'a> {
122127
fn next(&mut self) -> Result<Option<DataBlock>> {
123128
let Some(probe_entries) = self.probe_rows.front() else {
124129
return Ok(None);
@@ -144,3 +149,52 @@ impl<'a> JoinStream for LoopJoinStream<'a> {
144149
Ok(Some(DataBlock::new(entries, build_block.num_rows())))
145150
}
146151
}
152+
153+
struct ConstBuildLoopJoinStream<'a> {
154+
probe: DataBlock,
155+
build_blocks: &'a [DataBlock],
156+
block_index: usize,
157+
row_index: usize,
158+
}
159+
160+
impl<'a> ConstBuildLoopJoinStream<'a> {
161+
fn new(probe: DataBlock, build_blocks: &'a [DataBlock]) -> Self {
162+
ConstBuildLoopJoinStream {
163+
probe,
164+
build_blocks,
165+
block_index: 0,
166+
row_index: 0,
167+
}
168+
}
169+
}
170+
171+
impl<'a> JoinStream for ConstBuildLoopJoinStream<'a> {
172+
fn next(&mut self) -> Result<Option<DataBlock>> {
173+
let Some(build_block) = self.build_blocks.get(self.block_index) else {
174+
return Ok(None);
175+
};
176+
177+
let num_rows = self.probe.num_rows();
178+
let entries = self
179+
.probe
180+
.columns()
181+
.iter()
182+
.cloned()
183+
.chain(build_block.columns().iter().map(|entry| {
184+
BlockEntry::Const(
185+
entry.index(self.row_index).unwrap().to_owned(),
186+
entry.data_type(),
187+
num_rows,
188+
)
189+
}))
190+
.collect();
191+
192+
self.row_index += 1;
193+
if self.row_index >= build_block.num_rows() {
194+
self.row_index = 0;
195+
self.block_index += 1;
196+
}
197+
198+
Ok(Some(DataBlock::new(entries, num_rows)))
199+
}
200+
}

0 commit comments

Comments
 (0)