diff --git a/src/common/base/src/runtime/perf/query_perf.rs b/src/common/base/src/runtime/perf/query_perf.rs index 1d20c387fab9d..15084a3350b0c 100644 --- a/src/common/base/src/runtime/perf/query_perf.rs +++ b/src/common/base/src/runtime/perf/query_perf.rs @@ -54,7 +54,7 @@ impl QueryPerf { .blocklist(&["libc", "libgcc", "pthread", "vdso"]) .set_filter_func(filter_closure) .build() - .map_err(|_e| ErrorCode::Internal("Failed to create profiler"))?; + .map_err(|e| ErrorCode::Internal(format!("Failed to create profiler, {e}")))?; debug!("starting perf with frequency: {}", frequency); let mut payload = ThreadTracker::new_tracking_payload(); payload.perf_enabled = true; diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index cbeede2fec333..2f4bb95062ca7 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -191,6 +191,18 @@ impl BlockEntry { BlockEntry::Column(column) => Ok(ColumnView::Column(T::try_downcast_column(column)?)), } } + + pub fn into_nullable(self) -> BlockEntry { + match self { + BlockEntry::Const(scalar, data_type, n) if !data_type.is_nullable_or_null() => { + BlockEntry::Const(scalar, DataType::Nullable(Box::new(data_type)), n) + } + entry @ BlockEntry::Const(_, _, _) + | entry @ BlockEntry::Column(Column::Nullable(_)) + | entry @ BlockEntry::Column(Column::Null { .. }) => entry, + BlockEntry::Column(column) => column.wrap_nullable(None).into(), + } + } } impl From for BlockEntry { @@ -846,10 +858,9 @@ impl DataBlock { pub fn project(mut self, projections: &ColumnSet) -> Self { let mut entries = Vec::with_capacity(projections.len()); for (index, column) in self.entries.into_iter().enumerate() { - if !projections.contains(&index) { - continue; + if projections.contains(&index) { + entries.push(column); } - entries.push(column); } self.entries = entries; self diff --git a/src/query/expression/src/filter/filter_executor.rs b/src/query/expression/src/filter/filter_executor.rs index e9011f72ea03d..ce361f1bc3a64 100644 --- a/src/query/expression/src/filter/filter_executor.rs +++ b/src/query/expression/src/filter/filter_executor.rs @@ -81,6 +81,7 @@ impl FilterExecutor { // Filter a DataBlock, return the filtered DataBlock. pub fn filter(&mut self, data_block: DataBlock) -> Result { if self.func_ctx.enable_selector_executor { + debug_assert!(data_block.num_rows() <= self.max_block_size); let origin_count = data_block.num_rows(); let result_count = self.select(&data_block)?; self.take(data_block, origin_count, result_count) @@ -100,6 +101,7 @@ impl FilterExecutor { // Store the filtered indices of data_block in `true_selection` and return the number of filtered indices. pub fn select(&mut self, data_block: &DataBlock) -> Result { + debug_assert!(data_block.num_rows() <= self.max_block_size); let evaluator = Evaluator::new(data_block, &self.func_ctx, self.fn_registry); let selector = Selector::new(evaluator, data_block.num_rows()); selector.select( @@ -208,4 +210,8 @@ impl FilterExecutor { pub fn mutable_true_selection(&mut self) -> &mut [u32] { &mut self.true_selection } + + pub fn max_block_size(&self) -> usize { + self.max_block_size + } } diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index e11ae845560a6..b214965dfae37 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -60,6 +60,21 @@ impl DataBlock { self.get_meta().cloned(), )) } + + pub fn merge_indices_to_ranges(indices: &[u32]) -> Vec> { + debug_assert!(indices.is_sorted()); + let mut ranges: Vec> = Vec::with_capacity(indices.len() / 2); + for &index in indices { + if let Some(cur) = ranges.last_mut() + && cur.end == index + { + cur.end += 1; + } else { + ranges.push(index..index + 1) + } + } + ranges + } } struct TakeRangeVisitor<'a> { diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index 11bc9b66f9f78..e5fe75c97dbc4 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -362,6 +362,10 @@ impl DataSchema { } } + pub fn new_ref(fields: Vec) -> Arc { + Self::new(fields).into() + } + pub fn new_from(fields: Vec, metadata: BTreeMap) -> Self { Self { fields, metadata } } @@ -416,8 +420,7 @@ impl DataSchema { let mut valid_fields: Vec = self.fields.iter().map(|f| f.name().clone()).collect(); valid_fields.truncate(16); Err(ErrorCode::BadArguments(format!( - "Unable to get field named \"{}\". Valid fields: {:?} ...", - name, valid_fields + "Unable to get field named {name:?}. Valid fields: {valid_fields:?} ...", ))) } diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index d7684a3d87788..949b678e04414 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -41,6 +41,7 @@ #![allow(clippy::diverging_sub_expression)] #![allow(clippy::arc_with_non_send_sync)] #![feature(debug_closure_helpers)] +#![feature(stmt_expr_attributes)] extern crate core; diff --git a/src/query/service/src/physical_plans/mod.rs b/src/query/service/src/physical_plans/mod.rs index a6008acbace85..9a0d5f7e4a2e4 100644 --- a/src/query/service/src/physical_plans/mod.rs +++ b/src/query/service/src/physical_plans/mod.rs @@ -81,7 +81,7 @@ pub use physical_exchange::Exchange; pub use physical_exchange_sink::ExchangeSink; pub use physical_exchange_source::ExchangeSource; pub use physical_filter::Filter; -pub use physical_hash_join::HashJoin; +pub use physical_hash_join::*; pub use physical_limit::Limit; pub use physical_materialized_cte::*; pub use physical_multi_table_insert::*; diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 553a462ba688e..873e48d923c86 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -25,6 +25,7 @@ use databend_common_expression::types::DataType; use databend_common_expression::ConstantFolder; use databend_common_expression::DataBlock; use databend_common_expression::DataField; +use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; @@ -35,7 +36,9 @@ use databend_common_pipeline::core::Pipe; use databend_common_pipeline::core::PipeItem; use databend_common_pipeline::core::ProcessorPtr; use databend_common_sql::optimizer::ir::SExpr; +use databend_common_sql::plans::FunctionCall; use databend_common_sql::plans::Join; +use databend_common_sql::plans::JoinEquiCondition; use databend_common_sql::plans::JoinType; use databend_common_sql::ColumnEntry; use databend_common_sql::ColumnSet; @@ -52,6 +55,7 @@ use crate::physical_plans::format::PhysicalFormat; use crate::physical_plans::physical_plan::IPhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlanMeta; +use crate::physical_plans::resolve_scalar; use crate::physical_plans::runtime_filter::build_runtime_filter; use crate::physical_plans::Exchange; use crate::physical_plans::PhysicalPlanBuilder; @@ -99,6 +103,12 @@ type MergedFieldsResult = ( Vec<(usize, (bool, bool))>, ); +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct NestedLoopFilterInfo { + pub predicates: Vec, + pub projection: Vec, +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct HashJoin { pub meta: PhysicalPlanMeta, @@ -140,6 +150,7 @@ pub struct HashJoin { pub runtime_filter: PhysicalRuntimeFilters, pub broadcast_id: Option, + pub nested_loop_filter: Option, } #[typetag::serde] @@ -261,6 +272,7 @@ impl IPhysicalPlan for HashJoin { build_side_cache_info: self.build_side_cache_info.clone(), runtime_filter: self.runtime_filter.clone(), broadcast_id: self.broadcast_id, + nested_loop_filter: self.nested_loop_filter.clone(), }) } @@ -527,13 +539,15 @@ impl PhysicalPlanBuilder { required: &mut ColumnSet, others_required: &mut ColumnSet, ) -> (Vec, Vec) { - let retained_columns = self.metadata.read().get_retained_column().clone(); - *required = required.union(&retained_columns).cloned().collect(); - let column_projections = required.clone().into_iter().collect::>(); - - *others_required = others_required.union(&retained_columns).cloned().collect(); - let pre_column_projections = others_required.clone().into_iter().collect::>(); + { + let metadata = self.metadata.read(); + let retained_columns = metadata.get_retained_column(); + required.extend(retained_columns); + others_required.extend(retained_columns); + } + let column_projections = required.iter().copied().collect(); + let pre_column_projections = others_required.iter().copied().collect(); (column_projections, pre_column_projections) } @@ -1029,7 +1043,7 @@ impl PhysicalPlanBuilder { } // Add tail fields - build_fields.extend(tail_fields.clone()); + build_fields.extend_from_slice(&tail_fields); merged_fields.extend(tail_fields); Ok((merged_fields, probe_fields, build_fields, probe_to_build)) @@ -1137,7 +1151,7 @@ impl PhysicalPlanBuilder { // Create projections and output schema let mut projections = ColumnSet::new(); - let projected_schema = DataSchemaRefExt::create(merged_fields.clone()); + let projected_schema = DataSchema::new(merged_fields.clone()); for column in column_projections.iter() { if let Some((index, _)) = projected_schema.column_with_name(&column.to_string()) { @@ -1182,79 +1196,89 @@ impl PhysicalPlanBuilder { .collect::>() } - /// Creates a HashJoin physical plan - /// - /// # Arguments - /// * `join` - Join operation - /// * `probe_side` - Probe side physical plan - /// * `build_side` - Build side physical plan - /// * `is_broadcast` - Whether this is a broadcast join - /// * `projections` - Column projections - /// * `probe_projections` - Probe side projections - /// * `build_projections` - Build side projections - /// * `left_join_conditions` - Left join conditions - /// * `right_join_conditions` - Right join conditions - /// * `is_null_equal` - Null equality flags - /// * `non_equi_conditions` - Non-equi conditions - /// * `probe_to_build` - Probe to build mapping - /// * `output_schema` - Output schema - /// * `build_side_cache_info` - Build side cache info - /// * `runtime_filter` - Runtime filter - /// * `stat_info` - Statistics info - /// - /// # Returns - /// * `Result` - The HashJoin physical plan - #[allow(clippy::too_many_arguments)] - fn create_hash_join( + fn build_nested_loop_filter_info( &self, - s_expr: &SExpr, join: &Join, - probe_side: PhysicalPlan, - build_side: PhysicalPlan, - projections: ColumnSet, - probe_projections: ColumnSet, - build_projections: ColumnSet, - left_join_conditions: Vec, - right_join_conditions: Vec, - is_null_equal: Vec, - non_equi_conditions: Vec, - probe_to_build: Vec<(usize, (bool, bool))>, - output_schema: DataSchemaRef, - build_side_cache_info: Option<(usize, HashMap)>, - runtime_filter: PhysicalRuntimeFilters, - stat_info: PlanStatsInfo, - ) -> Result { - let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?; - let broadcast_id = if build_side_data_distribution - .as_ref() - .is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_))) - { - Some(self.ctx.get_next_broadcast_id()) - } else { - None + probe_schema: &DataSchema, + build_schema: &DataSchema, + target_schema: &DataSchema, + ) -> Result> { + if !matches!(join.join_type, JoinType::Inner) { + return Ok(None); + } + + let merged = DataSchema::new( + probe_schema + .fields + .iter() + .cloned() + .chain(build_schema.fields.iter().cloned()) + .collect(), + ); + + let mut predicates = + Vec::with_capacity(join.equi_conditions.len() + join.non_equi_conditions.len()); + + let is_simple_expr = |expr: &ScalarExpr| { + matches!( + expr, + ScalarExpr::BoundColumnRef(_) + | ScalarExpr::ConstantExpr(_) + | ScalarExpr::TypedConstantExpr(_, _) + ) }; - Ok(PhysicalPlan::new(HashJoin { - projections, - build_projections, - probe_projections, - build: build_side, - probe: probe_side, - join_type: join.join_type, - build_keys: right_join_conditions, - probe_keys: left_join_conditions, - is_null_equal, - non_equi_conditions, - marker_index: join.marker_index, - meta: PhysicalPlanMeta::new("HashJoin"), - from_correlated_subquery: join.from_correlated_subquery, - probe_to_build, - output_schema, - need_hold_hash_table: join.need_hold_hash_table, - stat_info: Some(stat_info), - single_to_inner: join.single_to_inner, - build_side_cache_info, - runtime_filter, - broadcast_id, + + for condition in &join.equi_conditions { + if !is_simple_expr(&condition.left) || !is_simple_expr(&condition.right) { + // todo: Filtering after cross join cause expression to be evaluated multiple times + return Ok(None); + } + + let scalar = condition_to_expr(condition)?; + match resolve_scalar(&scalar, &merged) { + Ok(expr) => predicates.push(expr), + Err(_) + if condition + .left + .data_type() + .map(|data_type| data_type.remove_nullable().is_bitmap()) + .unwrap_or_default() => + { + // no function matches signature `eq(Bitmap NULL, Bitmap NULL) + return Ok(None); + } + Err(err) => { + return Err(err.add_message(format!( + "Failed build nested loop filter schema: {merged:#?} equi_conditions: {:#?}", + join.equi_conditions + ))) + } + } + } + + for scalar in &join.non_equi_conditions { + predicates.push(resolve_scalar(scalar, &merged).map_err(|err|{ + err.add_message(format!( + "Failed build nested loop filter schema: {merged:#?} non_equi_conditions: {:#?}", + join.non_equi_conditions + )) + })?); + } + + let projection = target_schema + .fields + .iter() + .map(|column| merged.index_of(column.name())) + .collect::>>() + .map_err(|err| { + err.add_message(format!( + "Failed build nested loop filter schema: {merged:#?} target: {target_schema:#?}", + )) + })?; + + Ok(Some(NestedLoopFilterInfo { + predicates, + projection, })) } @@ -1342,24 +1366,67 @@ impl PhysicalPlanBuilder { ) .await?; + let nested_loop_filter = + self.build_nested_loop_filter_info(join, &probe_schema, &build_schema, &merged_schema)?; + // Step 12: Create and return the HashJoin - self.create_hash_join( - s_expr, - join, - probe_side, - build_side, + let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?; + let broadcast_id = if build_side_data_distribution + .as_ref() + .is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_))) + { + Some(self.ctx.get_next_broadcast_id()) + } else { + None + }; + Ok(PhysicalPlan::new(HashJoin { projections, - probe_projections, build_projections, - left_join_conditions, - right_join_conditions, + probe_projections, + build: build_side, + probe: probe_side, + join_type: join.join_type, + build_keys: right_join_conditions, + probe_keys: left_join_conditions, is_null_equal, non_equi_conditions, + marker_index: join.marker_index, + meta: PhysicalPlanMeta::new("HashJoin"), + from_correlated_subquery: join.from_correlated_subquery, probe_to_build, output_schema, + need_hold_hash_table: join.need_hold_hash_table, + stat_info: Some(stat_info), + single_to_inner: join.single_to_inner, build_side_cache_info, runtime_filter, - stat_info, - ) + broadcast_id, + nested_loop_filter, + })) + } +} + +fn condition_to_expr(condition: &JoinEquiCondition) -> Result { + let left_type = condition.left.data_type()?; + let right_type = condition.right.data_type()?; + + let arguments = match (&left_type, &right_type) { + (DataType::Nullable(box left), right) if left == right => vec![ + condition.left.clone(), + condition.right.clone().unify_to_data_type(&left_type), + ], + (left, DataType::Nullable(box right)) if left == right => vec![ + condition.left.clone().unify_to_data_type(&right_type), + condition.right.clone(), + ], + _ => vec![condition.left.clone(), condition.right.clone()], + }; + + Ok(FunctionCall { + span: condition.left.span(), + func_name: "eq".to_string(), + params: vec![], + arguments, } + .into()) } diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index bc48246312e38..b8910faf02664 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -30,7 +30,10 @@ use crate::physical_plans::PhysicalPlanBuilder; enum PhysicalJoinType { Hash, // The first arg is range conditions, the second arg is other conditions - RangeJoin(Vec, Vec), + RangeJoin { + range: Vec, + other: Vec, + }, } // Choose physical join type by join conditions @@ -41,6 +44,10 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { )); } + let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child()); + let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child()); + let right_stat_info = right_rel_expr.derive_cardinality()?; + if !join.equi_conditions.is_empty() { // Contain equi condition, use hash join return Ok(PhysicalJoinType::Hash); @@ -51,9 +58,6 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { return Ok(PhysicalJoinType::Hash); } - let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?); - let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?); - let right_stat_info = right_rel_expr.derive_cardinality()?; if matches!(right_stat_info.statistics.precise_cardinality, Some(1)) || right_stat_info.cardinality == 1.0 { @@ -61,22 +65,22 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { return Ok(PhysicalJoinType::Hash); } - let left_prop = left_rel_expr.derive_relational_prop()?; - let right_prop = right_rel_expr.derive_relational_prop()?; - let (range_conditions, other_conditions) = join - .non_equi_conditions - .iter() - .cloned() - .partition::, _>(|condition| { - is_range_join_condition(condition, &left_prop, &right_prop).is_some() - }); - - if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) { - return Ok(PhysicalJoinType::RangeJoin( - range_conditions, - other_conditions, - )); + if matches!(join.join_type, JoinType::Inner | JoinType::Cross) { + let left_prop = left_rel_expr.derive_relational_prop()?; + let right_prop = right_rel_expr.derive_relational_prop()?; + let (range, other) = join + .non_equi_conditions + .iter() + .cloned() + .partition::, _>(|condition| { + is_range_join_condition(condition, &left_prop, &right_prop).is_some() + }); + + if !range.is_empty() { + return Ok(PhysicalJoinType::RangeJoin { range, other }); + } } + // Leverage hash join to execute nested loop join Ok(PhysicalJoinType::Hash) } @@ -170,7 +174,7 @@ impl PhysicalPlanBuilder { ) .await } - PhysicalJoinType::RangeJoin(range, other) => { + PhysicalJoinType::RangeJoin { range, other } => { self.build_range_join( join.join_type, s_expr, diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs index 9c9723edf6e73..1be812e3f26d0 100644 --- a/src/query/service/src/physical_plans/physical_range_join.rs +++ b/src/query/service/src/physical_plans/physical_range_join.rs @@ -20,7 +20,6 @@ use databend_common_exception::Result; use databend_common_expression::type_check::common_super_type; use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; -use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline::core::ProcessorPtr; @@ -232,10 +231,7 @@ impl PhysicalPlanBuilder { let left_schema = self.prepare_probe_schema(join_type, &left_side)?; let right_schema = self.prepare_build_schema(join_type, &right_side)?; - let mut output_schema = Vec::clone(left_schema.fields()); - output_schema.extend_from_slice(right_schema.fields()); - - let merged_schema = DataSchemaRefExt::create( + let output_schema = DataSchema::new_ref( left_schema .fields() .iter() @@ -262,11 +258,11 @@ impl PhysicalPlanBuilder { .collect::>()?, other_conditions: other_conditions .iter() - .map(|scalar| resolve_scalar(scalar, &merged_schema)) + .map(|scalar| resolve_scalar(scalar, &output_schema)) .collect::>()?, join_type, range_join_type, - output_schema: Arc::new(DataSchema::new(output_schema)), + output_schema, stat_info: Some(self.build_plan_stat_info(s_expr)?), })) } @@ -343,9 +339,9 @@ fn resolve_range_condition( } } -fn resolve_scalar(scalar: &ScalarExpr, schema: &DataSchemaRef) -> Result { +pub fn resolve_scalar(scalar: &ScalarExpr, schema: &DataSchema) -> Result { let expr = scalar - .type_check(schema.as_ref())? + .type_check(schema)? .project_column_ref(|index| schema.index_of(&index.to_string()))?; Ok(expr.as_remote_expr()) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index ae2a25d06733b..5eee7bd77acc7 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -22,14 +22,18 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::Evaluator; use databend_common_expression::Expr; +use databend_common_expression::FieldIndex; +use databend_common_expression::FilterExecutor; use databend_common_expression::FunctionContext; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_settings::Settings; use databend_common_sql::executor::cast_expr_to_non_null_boolean; use databend_common_sql::ColumnSet; use parking_lot::RwLock; use crate::physical_plans::HashJoin; +use crate::physical_plans::NestedLoopFilterInfo; use crate::physical_plans::PhysicalRuntimeFilter; use crate::physical_plans::PhysicalRuntimeFilters; use crate::pipelines::processors::transforms::wrap_true_validity; @@ -39,11 +43,13 @@ pub const MARKER_KIND_TRUE: u8 = 0; pub const MARKER_KIND_FALSE: u8 = 1; pub const MARKER_KIND_NULL: u8 = 2; +#[derive(Debug)] pub struct MarkJoinDesc { // pub(crate) marker_index: Option, pub(crate) has_null: RwLock, } +#[derive(Debug)] pub struct HashJoinDesc { pub(crate) build_keys: Vec, pub(crate) probe_keys: Vec, @@ -60,9 +66,10 @@ pub struct HashJoinDesc { pub(crate) runtime_filter: RuntimeFiltersDesc, pub(crate) build_projection: ColumnSet, - pub(crate) probe_projections: ColumnSet, + pub(crate) probe_projection: ColumnSet, pub(crate) probe_to_build: Vec<(usize, (bool, bool))>, pub(crate) build_schema: DataSchemaRef, + pub(crate) nested_loop_filter: Option, } #[derive(Debug, Clone)] @@ -76,7 +83,8 @@ pub struct RuntimeFilterDesc { pub enable_min_max_runtime_filter: bool, } -pub struct RuntimeFiltersDesc { +#[derive(Debug)] +pub(crate) struct RuntimeFiltersDesc { pub filters: Vec, } @@ -136,8 +144,9 @@ impl HashJoinDesc { runtime_filter: (&join.runtime_filter).into(), probe_to_build: join.probe_to_build.clone(), build_projection: join.build_projections.clone(), - probe_projections: join.probe_projections.clone(), + probe_projection: join.probe_projections.clone(), build_schema: join.build.output_schema()?, + nested_loop_filter: join.nested_loop_filter.clone(), }) } @@ -259,4 +268,67 @@ impl HashJoinDesc { } } } + + pub fn create_nested_loop_desc( + &self, + settings: &Settings, + function_ctx: &FunctionContext, + ) -> Result> { + let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()? as usize; + let block_size = settings.get_max_block_size()? as usize; + if nested_loop_join_threshold == 0 { + return Ok(None); + } + + let Some(NestedLoopFilterInfo { + predicates, + projection, + }) = &self.nested_loop_filter + else { + return Ok(None); + }; + + let predicates = predicates + .iter() + .map(|x| Ok(x.as_expr(&BUILTIN_FUNCTIONS))) + .reduce(|lhs, rhs| { + check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS) + }) + .transpose()?; + let Some(predicates) = predicates else { + return Ok(None); + }; + + let field_reorder = if !projection.is_sorted() { + let mut mapper = projection.iter().cloned().enumerate().collect::>(); + mapper.sort_by_key(|(_, field)| *field); + let reorder = (0..projection.len()) + .map(|j| mapper.iter().position(|(i, _)| *i == j).unwrap()) + .collect::>(); + Some(reorder) + } else { + None + }; + + Ok(Some(NestedLoopDesc { + filter: FilterExecutor::new( + predicates, + function_ctx.clone(), + block_size, + None, + &BUILTIN_FUNCTIONS, + false, + ), + projections: projection.iter().copied().collect(), + field_reorder, + nested_loop_join_threshold, + })) + } +} + +pub struct NestedLoopDesc { + pub filter: FilterExecutor, + pub projections: ColumnSet, + pub field_reorder: Option>, + pub nested_loop_join_threshold: usize, } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 380977cee104e..78380489e3fdf 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -744,6 +744,7 @@ impl HashJoinBuildState { "Aborted query, because the hash table is uninitialized.", )); } + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::SkipDuplicatesSerializer(table) => insert_binary_key! { &mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, }, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 1632c7dab8371..0dd15dd68df6f 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -165,6 +165,7 @@ impl HashJoinProbeState { // Continue to probe hash table and process data blocks. self.result_blocks(probe_state, keys, &table.hash_table) } + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( "Aborted query, because the hash table is uninitialized.", )), @@ -376,6 +377,7 @@ impl HashJoinProbeState { // Continue to probe hash table and process data blocks. self.result_blocks(probe_state, keys, &table.hash_table) } + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( "Aborted query, because the hash table is uninitialized.", )), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index d696ca2304e7b..9107d9ea12965 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -78,6 +78,7 @@ pub struct FixedKeyHashJoinHashTable< pub enum HashJoinHashTable { Null, + NestedLoop(Vec), Serializer(SerializerHashJoinHashTable), SkipDuplicatesSerializer(SkipDuplicatesSerializerHashJoinHashTable), SingleBinary(SingleBinaryHashJoinHashTable), @@ -348,9 +349,9 @@ impl HashJoinState { } impl HashJoinHashTable { - pub fn len(&self) -> usize { - match self { - HashJoinHashTable::Null => 0, + pub fn size(&self) -> Option { + let n = match self { + HashJoinHashTable::Null | HashJoinHashTable::NestedLoop(_) => return None, HashJoinHashTable::Serializer(table) => table.hash_table.len(), HashJoinHashTable::SingleBinary(table) => table.hash_table.len(), HashJoinHashTable::KeysU8(table) => table.hash_table.len(), @@ -367,11 +368,7 @@ impl HashJoinHashTable { HashJoinHashTable::SkipDuplicatesKeysU64(table) => table.hash_table.len(), HashJoinHashTable::SkipDuplicatesKeysU128(table) => table.hash_table.len(), HashJoinHashTable::SkipDuplicatesKeysU256(table) => table.hash_table.len(), - } - } - - #[allow(dead_code)] - pub fn is_empty(&self) -> bool { - self.len() == 0 + }; + Some(n) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs index 32b46295ccb1f..2f5caba0e8474 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs @@ -32,6 +32,7 @@ mod util; pub use common::wrap_true_validity; pub use desc::HashJoinDesc; +pub use desc::NestedLoopDesc; pub use desc::RuntimeFilterDesc; pub use hash_join_build_state::HashJoinBuildState; pub use hash_join_probe_state::HashJoinProbeState; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs index 114f019669287..f7950e3948366 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -54,7 +54,9 @@ impl HashJoinProbeState { JoinType::InnerAny | JoinType::RightAny ) { let hash_table = unsafe { &*self.hash_join_state.hash_table.get() }; - probe_state.used_once = Some(MutableBitmap::from_len_zeroed(hash_table.len())) + probe_state.used_once = Some(MutableBitmap::from_len_zeroed( + hash_table.size().unwrap_or_default(), + )) } let no_other_predicate = self .hash_join_state diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs index bc7aad3e4a36f..ad75c32761383 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs @@ -26,9 +26,10 @@ use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; use databend_common_sql::plans::JoinType; -use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; -use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; -use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; +use super::common::CStyleCell; +use super::grace::GraceHashJoinState; +use super::memory::outer_left_join::OuterLeftHashJoin; +use super::memory::NestedLoopJoin; use crate::pipelines::processors::transforms::BasicHashJoinState; use crate::pipelines::processors::transforms::GraceHashJoin; use crate::pipelines::processors::transforms::InnerHashJoin; @@ -126,13 +127,29 @@ impl HashJoinFactory { } match typ { - JoinType::Inner => Ok(Box::new(InnerHashJoin::create( - &self.ctx, - self.function_ctx.clone(), - self.hash_method.clone(), - self.desc.clone(), - self.create_basic_state(id)?, - )?)), + JoinType::Inner => { + let state = self.create_basic_state(id)?; + let nested_loop_desc = self + .desc + .create_nested_loop_desc(&settings, &self.function_ctx)?; + + let inner = InnerHashJoin::create( + &settings, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + state.clone(), + nested_loop_desc + .as_ref() + .map(|desc| desc.nested_loop_join_threshold) + .unwrap_or_default(), + )?; + + match nested_loop_desc { + Some(desc) => Ok(Box::new(NestedLoopJoin::new(inner, state, desc))), + None => Ok(Box::new(inner)), + } + } JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create( &self.ctx, self.function_ctx.clone(), @@ -148,11 +165,12 @@ impl HashJoinFactory { match typ { JoinType::Inner => { let inner_hash_join = InnerHashJoin::create( - &self.ctx, + &self.ctx.get_settings(), self.function_ctx.clone(), self.hash_method.clone(), self.desc.clone(), self.create_basic_state(id)?, + 0, )?; Ok(Box::new(GraceHashJoin::create( diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs index d1a44c0992b28..a4006838900f7 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs @@ -24,16 +24,22 @@ pub trait JoinStream: Send + Sync { } pub trait Join: Send + Sync + 'static { + /// Push one block into the build side. `None` signals the end of input. fn add_block(&mut self, data: Option) -> Result<()>; + /// Finalize build phase in chunks; each call processes the next pending build batch and + /// returns its progress. Once all batches are consumed it returns `None` to signal completion. fn final_build(&mut self) -> Result>; + /// Generate runtime filter packet for the given filter description. fn build_runtime_filter(&self, _: &RuntimeFiltersDesc) -> Result { Ok(JoinRuntimeFilterPacket::default()) } + /// Probe with a single block and return a streaming iterator over results. fn probe_block(&mut self, data: DataBlock) -> Result>; + /// Final steps after probing all blocks; used when more output is pending. fn final_probe(&mut self) -> Result>> { Ok(None) } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs index 8168fbc081173..e2d755d8425ae 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use std::sync::PoisonError; use databend_common_base::base::ProgressValues; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::Column; use databend_common_expression::DataBlock; @@ -27,6 +26,7 @@ use databend_common_expression::HashMethodSerializer; use databend_common_expression::HashMethodSingleBinary; use databend_common_hashtable::BinaryHashJoinHashMap; use databend_common_hashtable::HashJoinHashMap; +use databend_common_settings::Settings; use databend_common_sql::plans::JoinType; use ethnum::U256; @@ -40,7 +40,6 @@ use crate::pipelines::processors::transforms::SkipDuplicatesFixedKeyHashJoinHash use crate::pipelines::processors::transforms::SkipDuplicatesSerializerHashJoinHashTable; use crate::pipelines::processors::transforms::SkipDuplicatesSingleBinaryHashJoinHashTable; use crate::pipelines::processors::HashJoinDesc; -use crate::sessions::QueryContext; pub struct BasicHashJoin { pub(crate) desc: Arc, @@ -49,28 +48,33 @@ pub struct BasicHashJoin { pub(crate) method: HashMethodKind, pub(crate) function_ctx: FunctionContext, pub(crate) state: Arc, + nested_loop_join_threshold: usize, } impl BasicHashJoin { pub fn create( - ctx: &QueryContext, + settings: &Settings, function_ctx: FunctionContext, method: HashMethodKind, desc: Arc, state: Arc, + nested_loop_join_threshold: usize, ) -> Result { - let settings = ctx.get_settings(); - let block_size = settings.get_max_block_size()? as usize; - let block_bytes = settings.get_max_block_size()? as usize; + let squash_block = SquashBlocks::new( + settings.get_max_block_size()? as _, + settings.get_max_block_bytes()? as _, + ); Ok(BasicHashJoin { desc, state, method, function_ctx, - squash_block: SquashBlocks::new(block_size, block_bytes), + squash_block, + nested_loop_join_threshold, }) } + pub(crate) fn add_block(&mut self, mut data: Option) -> Result<()> { let mut squashed_block = match data.take() { None => self.squash_block.finalize()?, @@ -78,22 +82,24 @@ impl BasicHashJoin { }; if let Some(squashed_block) = squashed_block.take() { - let locked = self.state.mutex.lock(); - let _locked = locked.unwrap_or_else(PoisonError::into_inner); - - *self.state.build_rows.as_mut() += squashed_block.num_rows(); - let chunk_index = self.state.chunks.len(); - self.state.chunks.as_mut().push(squashed_block); - self.state.build_queue.as_mut().push_back(chunk_index); + self.state.push_chunk(squashed_block); } Ok(()) } pub(crate) fn final_build(&mut self) -> Result> { - self.init_memory_hash_table(); + match self.state.hash_table.deref() { + HashJoinHashTable::Null => match self.init_memory_hash_table() { + Some(true) => return Ok(Some(self.build_nested_loop())), + Some(false) => return Ok(None), + None => {} + }, + HashJoinHashTable::NestedLoop(_) => return Ok(None), + _ => {} + } - let Some(chunk_index) = self.steal_chunk_index() else { + let Some(chunk_index) = self.state.steal_chunk_index() else { return Ok(None); }; @@ -135,14 +141,6 @@ impl BasicHashJoin { bytes: num_bytes, })) } -} - -impl BasicHashJoin { - fn steal_chunk_index(&self) -> Option { - let locked = self.state.mutex.lock(); - let _locked = locked.unwrap_or_else(PoisonError::into_inner); - self.state.build_queue.as_mut().pop_front() - } pub(crate) fn finalize_chunks(&mut self) { if self.desc.build_projection.is_empty() || !self.state.columns.is_empty() { @@ -152,16 +150,21 @@ impl BasicHashJoin { let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); - if self.state.chunks.is_empty() || !self.state.columns.is_empty() { + debug_assert!(!matches!( + self.state.hash_table.deref(), + HashJoinHashTable::NestedLoop(_) + )); + + if !self.state.columns.is_empty() { return; } - if let Some(block) = self.state.chunks.first() { - for offset in 0..self.desc.build_projection.len() { - let column_type = self.state.column_types.as_mut(); - column_type.push(block.get_by_offset(offset).data_type()); - } - } + *self.state.column_types.as_mut() = (0..self.desc.build_projection.len()) + .map(|offset| block.get_by_offset(offset).data_type()) + .collect(); + } else { + return; + }; let mut columns = Vec::with_capacity(self.desc.build_projection.len()); for offset in 0..self.desc.build_projection.len() { @@ -175,135 +178,138 @@ impl BasicHashJoin { columns.push(Column::take_downcast_column_vec(&full_columns)); } - std::mem::swap(&mut columns, self.state.columns.as_mut()); + *self.state.columns.as_mut() = columns; } - fn init_memory_hash_table(&mut self) { - if !matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { - return; - } + fn init_memory_hash_table(&mut self) -> Option { let skip_duplicates = matches!(self.desc.join_type, JoinType::InnerAny | JoinType::LeftAny); let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); - if matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { - let build_num_rows = *self.state.build_rows.deref(); - *self.state.hash_table.as_mut() = match (self.method.clone(), skip_duplicates) { - (HashMethodKind::Serializer(_), false) => { - HashJoinHashTable::Serializer(SerializerHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSerializer::default(), - )) - } - (HashMethodKind::Serializer(_), true) => { - HashJoinHashTable::SkipDuplicatesSerializer( - SkipDuplicatesSerializerHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSerializer::default(), - ), - ) - } - (HashMethodKind::SingleBinary(_), false) => { - HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable::new( + match self.state.hash_table.deref() { + HashJoinHashTable::Null => {} + HashJoinHashTable::NestedLoop(_) => return Some(false), + _ => return None, + } + + let build_num_rows = *self.state.build_rows.deref(); + if build_num_rows < self.nested_loop_join_threshold { + *self.state.hash_table.as_mut() = HashJoinHashTable::NestedLoop(vec![]); + return Some(true); + } + + *self.state.hash_table.as_mut() = match (self.method.clone(), skip_duplicates) { + (HashMethodKind::Serializer(_), false) => { + HashJoinHashTable::Serializer(SerializerHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSerializer::default(), + )) + } + (HashMethodKind::Serializer(_), true) => HashJoinHashTable::SkipDuplicatesSerializer( + SkipDuplicatesSerializerHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSerializer::default(), + ), + ), + (HashMethodKind::SingleBinary(_), false) => { + HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSingleBinary::default(), + )) + } + (HashMethodKind::SingleBinary(_), true) => { + HashJoinHashTable::SkipDuplicatesSingleBinary( + SkipDuplicatesSingleBinaryHashJoinHashTable::new( BinaryHashJoinHashMap::with_build_row_num(build_num_rows), HashMethodSingleBinary::default(), - )) - } - (HashMethodKind::SingleBinary(_), true) => { - HashJoinHashTable::SkipDuplicatesSingleBinary( - SkipDuplicatesSingleBinaryHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSingleBinary::default(), - ), - ) - } - (HashMethodKind::KeysU8(hash_method), false) => { - HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - )) - } - (HashMethodKind::KeysU8(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU8( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU16(hash_method), false) => { - HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU8(hash_method), false) => { + HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU8(hash_method), true) => HashJoinHashTable::SkipDuplicatesKeysU8( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + ), + ), + (HashMethodKind::KeysU16(hash_method), false) => { + HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU16(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU16( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU16(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU16( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU32(hash_method), false) => { - HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU32(hash_method), false) => { + HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU32(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU32( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU32(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU32( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU64(hash_method), false) => { - HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU64(hash_method), false) => { + HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU64(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU64( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU64(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU64( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU128(hash_method), false) => { - HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU128(hash_method), false) => { + HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU128(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU128( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU128(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU128( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU256(hash_method), false) => { - HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU256(hash_method), false) => { + HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU256(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU256( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU256(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU256( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - }; - } + ), + ) + } + }; + None } fn build_hash_table(&self, keys: DataBlock, chunk_idx: usize) -> Result<()> { @@ -311,6 +317,7 @@ impl BasicHashJoin { match self.state.hash_table.deref() { HashJoinHashTable::Null => (), + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::Serializer(v) => v.insert(keys, chunk_idx, &mut arena)?, HashJoinHashTable::SingleBinary(v) => v.insert(keys, chunk_idx, &mut arena)?, HashJoinHashTable::KeysU8(v) => v.insert(keys, chunk_idx, &mut arena)?, @@ -345,4 +352,21 @@ impl BasicHashJoin { Ok(()) } + + fn build_nested_loop(&self) -> ProgressValues { + let mut progress = ProgressValues::default(); + let mut plain = vec![]; + while let Some(chunk_index) = self.state.steal_chunk_index() { + let chunk_block = &self.state.chunks[chunk_index]; + progress.rows += chunk_block.num_rows(); + progress.bytes += chunk_block.memory_size(); + plain.push(chunk_block.clone()); + } + debug_assert!(matches!( + *self.state.hash_table, + HashJoinHashTable::NestedLoop(_) + )); + *self.state.hash_table.as_mut() = HashJoinHashTable::NestedLoop(plain); + progress + } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs index 897ed42c27eee..40fc675bc7834 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex; +use std::sync::PoisonError; use databend_common_expression::types::DataType; use databend_common_expression::ColumnVec; @@ -54,6 +55,22 @@ impl BasicHashJoinState { hash_table: CStyleCell::new(HashJoinHashTable::Null), } } + + pub(super) fn push_chunk(&self, chunk: DataBlock) { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + *self.build_rows.as_mut() += chunk.num_rows(); + let chunk_index = self.chunks.len(); + self.chunks.as_mut().push(chunk); + self.build_queue.as_mut().push_back(chunk_index); + } + + pub(super) fn steal_chunk_index(&self) -> Option { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.build_queue.as_mut().pop_front() + } } impl Drop for BasicHashJoinState { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs index 3b1dab84b39a7..a5bebc663062f 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs @@ -16,7 +16,6 @@ use std::ops::Deref; use std::sync::Arc; use databend_common_base::base::ProgressValues; -use databend_common_catalog::table_context::TableContext; use databend_common_column::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -27,7 +26,10 @@ use databend_common_expression::DataBlock; use databend_common_expression::FilterExecutor; use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; +use databend_common_settings::Settings; +use super::basic::BasicHashJoin; +use super::basic_state::BasicHashJoinState; use crate::pipelines::processors::transforms::build_runtime_filter_packet; use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; @@ -35,14 +37,11 @@ use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeDat use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; use crate::pipelines::processors::transforms::new_hash_join::join::Join; use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; -use crate::pipelines::processors::transforms::new_hash_join::memory::basic::BasicHashJoin; -use crate::pipelines::processors::transforms::new_hash_join::memory::basic_state::BasicHashJoinState; use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; use crate::pipelines::processors::transforms::HashJoinHashTable; use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; use crate::pipelines::processors::transforms::RuntimeFiltersDesc; use crate::pipelines::processors::HashJoinDesc; -use crate::sessions::QueryContext; pub struct InnerHashJoin { pub(crate) basic_hash_join: BasicHashJoin, @@ -55,23 +54,24 @@ pub struct InnerHashJoin { impl InnerHashJoin { pub fn create( - ctx: &QueryContext, + settings: &Settings, function_ctx: FunctionContext, method: HashMethodKind, desc: Arc, state: Arc, + nested_loop_join_threshold: usize, ) -> Result { - let settings = ctx.get_settings(); let block_size = settings.get_max_block_size()? as usize; let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); let basic_hash_join = BasicHashJoin::create( - ctx, + settings, function_ctx.clone(), method, desc.clone(), state.clone(), + nested_loop_join_threshold, )?; Ok(InnerHashJoin { @@ -123,35 +123,38 @@ impl Join for InnerHashJoin { }; self.desc.remove_keys_nullable(&mut keys); - let probe_block = data.project(&self.desc.probe_projections); - - let joined_stream = - with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { - HashJoinHashTable::T(table) => { - let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; - probe_hash_statistics.clear(probe_block.num_rows()); - - let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); - let probe_keys_stream = table.probe_matched(probe_data)?; - - Ok(InnerHashJoinStream::create( - probe_block, - self.basic_state.clone(), - probe_keys_stream, - self.desc.clone(), - &mut self.performance_context.probe_result, - )) - } - HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + let probe_block = data.project(&self.desc.probe_projection); + + let joined_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); + let probe_keys_stream = table.probe_matched(probe_data)?; + + InnerHashJoinStream::create( + probe_block, + self.basic_state.clone(), + probe_keys_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + ) + } + HashJoinHashTable::Null => { + return Err(ErrorCode::AbortedQuery( "Aborted query, because the hash table is uninitialized.", - )), - })?; + )); + } + HashJoinHashTable::NestedLoop(_) => unreachable!(), + }); match &mut self.performance_context.filter_executor { None => Ok(joined_stream), Some(filter_executor) => Ok(InnerHashJoinFilterStream::create( joined_stream, filter_executor, + None, )), } } @@ -232,31 +235,27 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> { (None, None) => DataBlock::new(vec![], self.probed_rows.matched_build.len()), }; - if !self.desc.probe_to_build.is_empty() { - for (index, (is_probe_nullable, is_build_nullable)) in - self.desc.probe_to_build.iter() - { - let entry = match (is_probe_nullable, is_build_nullable) { - (true, true) | (false, false) => result_block.get_by_offset(*index).clone(), - (true, false) => { - result_block.get_by_offset(*index).clone().remove_nullable() + for (index, (is_probe_nullable, is_build_nullable)) in + self.desc.probe_to_build.iter().cloned() + { + let entry = match (is_probe_nullable, is_build_nullable) { + (true, true) | (false, false) => result_block.get_by_offset(index).clone(), + (true, false) => result_block.get_by_offset(index).clone().remove_nullable(), + (false, true) => { + let entry = result_block.get_by_offset(index); + let col = entry.to_column(); + + match col.is_null() || col.is_nullable() { + true => entry.clone(), + false => BlockEntry::from(NullableColumn::new_column( + col, + Bitmap::new_constant(true, result_block.num_rows()), + )), } - (false, true) => { - let entry = result_block.get_by_offset(*index); - let col = entry.to_column(); - - match col.is_null() || col.is_nullable() { - true => entry.clone(), - false => BlockEntry::from(NullableColumn::new_column( - col, - Bitmap::new_constant(true, result_block.num_rows()), - )), - } - } - }; + } + }; - result_block.add_entry(entry); - } + result_block.add_entry(entry); } return Ok(Some(result_block)); @@ -264,19 +263,22 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> { } } -struct InnerHashJoinFilterStream<'a> { +pub(super) struct InnerHashJoinFilterStream<'a> { inner: Box, filter_executor: &'a mut FilterExecutor, + field_reorder: Option<&'a [usize]>, } impl<'a> InnerHashJoinFilterStream<'a> { pub fn create( inner: Box, filter_executor: &'a mut FilterExecutor, + field_reorder: Option<&'a [usize]>, ) -> Box { Box::new(InnerHashJoinFilterStream { inner, filter_executor, + field_reorder, }) } } @@ -298,6 +300,16 @@ impl<'a> JoinStream for InnerHashJoinFilterStream<'a> { continue; } + let data_block = if let Some(field_reorder) = self.field_reorder { + DataBlock::from_iter( + field_reorder + .iter() + .map(|offset| data_block.get_by_offset(*offset).clone()), + data_block.num_rows(), + ) + } else { + data_block + }; return Ok(Some(data_block)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs index 4979c37245fca..e8eb9fba13b79 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs @@ -15,7 +15,9 @@ mod basic; mod basic_state; mod inner_join; +mod nested_loop; pub mod outer_left_join; pub use basic_state::BasicHashJoinState; pub use inner_join::InnerHashJoin; +pub use nested_loop::*; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs new file mode 100644 index 0000000000000..d432791e44019 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs @@ -0,0 +1,268 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::sync::PoisonError; + +use databend_common_base::base::ProgressValues; +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::SELECTIVITY_THRESHOLD; +use databend_common_hashtable::RowPtr; + +use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::JoinStream; +use crate::pipelines::processors::transforms::NestedLoopDesc; +use crate::pipelines::processors::transforms::RuntimeFiltersDesc; + +pub struct NestedLoopJoin { + inner: T, + state: Arc, + desc: NestedLoopDesc, +} + +impl NestedLoopJoin { + pub fn new(inner: T, state: Arc, desc: NestedLoopDesc) -> Self { + Self { inner, state, desc } + } + + fn finalize_chunks(&self) { + if !self.state.columns.is_empty() { + return; + } + + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + if !self.state.columns.is_empty() { + return; + } + let num_columns = if let Some(block) = self.state.chunks.first() { + *self.state.column_types.as_mut() = block + .columns() + .iter() + .map(|entry| entry.data_type()) + .collect(); + block.num_columns() + } else { + return; + }; + + *self.state.columns.as_mut() = (0..num_columns) + .map(|offset| { + let full_columns = self + .state + .chunks + .iter() + .map(|block| block.get_by_offset(offset).to_column()) + .collect::>(); + + Column::take_downcast_column_vec(&full_columns) + }) + .collect(); + } +} + +impl Join for NestedLoopJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.inner.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.inner.final_build() + } + + fn build_runtime_filter(&self, desc: &RuntimeFiltersDesc) -> Result { + self.inner.build_runtime_filter(desc) + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + if data.is_empty() || *self.state.build_rows == 0 { + return Ok(Box::new(EmptyJoinStream)); + } + + let HashJoinHashTable::NestedLoop(build_blocks) = &*self.state.hash_table else { + return self.inner.probe_block(data); + }; + self.finalize_chunks(); + + let max_block_size = self.desc.filter.max_block_size(); + Ok(Box::new(NestedLoopJoinStream { + probe_block: data, + build_blocks, + state: &self.state, + max_block_size, + desc: &mut self.desc, + matches: Vec::with_capacity(max_block_size), + build_block_index: 0, + build_row_index: 0, + use_range: false, + })) + } +} + +struct NestedLoopJoinStream<'a> { + probe_block: DataBlock, + build_blocks: &'a [DataBlock], + state: &'a BasicHashJoinState, + desc: &'a mut NestedLoopDesc, + max_block_size: usize, + build_block_index: usize, + build_row_index: usize, + matches: Vec<(u32, RowPtr)>, + use_range: bool, +} + +impl<'a> NestedLoopJoinStream<'a> { + fn process_next_row(&mut self) -> Result<()> { + let build_block = &self.build_blocks[self.build_block_index]; + + let probe_rows = self.probe_block.num_rows(); + let entries = self + .probe_block + .columns() + .iter() + .cloned() + .chain(build_block.columns().iter().map(|entry| { + BlockEntry::Const( + entry.index(self.build_row_index).unwrap().to_owned(), + entry.data_type(), + probe_rows, + ) + })) + .collect(); + let merged = DataBlock::new(entries, probe_rows); + let row_ptr = RowPtr { + chunk_index: self.build_block_index as u32, + row_index: self.build_row_index as u32, + }; + + let max_block_size = self.desc.filter.max_block_size(); + if merged.num_rows() <= max_block_size { + let result_count = self.desc.filter.select(&merged)?; + self.matches.extend( + self.desc.filter.true_selection()[..result_count] + .iter() + .map(|probe| (*probe, row_ptr)), + ); + } else { + for (i, block) in merged + .split_by_rows_no_tail(max_block_size) + .into_iter() + .enumerate() + { + let offset = (i * max_block_size) as u32; + let result_count = self.desc.filter.select(&block)?; + self.matches.extend( + self.desc.filter.true_selection()[..result_count] + .iter() + .map(|probe| (*probe + offset, row_ptr)), + ); + } + } + + self.build_row_index += 1; + if self.build_row_index >= build_block.num_rows() { + self.build_row_index = 0; + self.build_block_index += 1; + } + + Ok(()) + } + + fn emit_block(&mut self, count: usize) -> Result { + if !self.use_range + && self.matches.len() as f64 + > SELECTIVITY_THRESHOLD * self.probe_block.num_rows() as f64 + { + // Need to test the scenario where a probe matches multiple builds + self.use_range = true; + } + + let block = { + if self.use_range { + self.matches.sort_unstable_by_key(|(probe, _)| *probe); + } + let (probe_indices, build_indices): (Vec<_>, Vec<_>) = + self.matches.drain(..count).unzip(); + + let probe = self.probe_block.clone().project(&self.desc.projections); + let probe = if self.use_range { + let ranges = DataBlock::merge_indices_to_ranges(&probe_indices); + probe.take_ranges(&ranges, count)? + } else { + probe.take_with_optimize_size(&probe_indices)? + }; + + let build_entries = self + .state + .columns + .iter() + .zip(self.state.column_types.as_slice()) + .enumerate() + .filter_map(|(i, x)| { + let i = self.probe_block.num_columns() + i; + self.desc.projections.contains(&i).then_some(x) + }) + .map(|(columns, data_type)| { + Column::take_column_vec_indices( + columns, + data_type.clone(), + &build_indices, + count, + ) + .into() + }); + + DataBlock::from_iter(probe.take_columns().into_iter().chain(build_entries), count) + }; + + if let Some(field_reorder) = &self.desc.field_reorder { + Ok(DataBlock::from_iter( + field_reorder + .iter() + .map(|offset| block.get_by_offset(*offset).clone()), + block.num_rows(), + )) + } else { + Ok(block) + } + } +} + +impl<'a> JoinStream for NestedLoopJoinStream<'a> { + fn next(&mut self) -> Result> { + loop { + if self.matches.len() >= self.max_block_size { + return Ok(Some(self.emit_block(self.max_block_size)?)); + } + + if self.build_block_index >= self.build_blocks.len() { + return if self.matches.is_empty() { + Ok(None) + } else { + Ok(Some(self.emit_block(self.matches.len())?)) + }; + } + + self.process_next_row()?; + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs index 8210b495bfca3..f6faa9db4f60c 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs @@ -68,11 +68,12 @@ impl OuterLeftHashJoin { let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); let basic_hash_join = BasicHashJoin::create( - ctx, + &settings, function_ctx.clone(), method, desc.clone(), state.clone(), + 0, )?; Ok(OuterLeftHashJoin { @@ -111,7 +112,7 @@ impl Join for OuterLeftHashJoin { .collect::>(); let build_block = null_build_block(&types, data.num_rows()); - let probe_block = Some(data.project(&self.desc.probe_projections)); + let probe_block = Some(data.project(&self.desc.probe_projection)); let result_block = final_result_block(&self.desc, probe_block, build_block, num_rows); return Ok(Box::new(OneBlockJoinStream(Some(result_block)))); } @@ -127,7 +128,7 @@ impl Join for OuterLeftHashJoin { }; self.desc.remove_keys_nullable(&mut keys); - let probe_block = data.project(&self.desc.probe_projections); + let probe_block = data.project(&self.desc.probe_projection); let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { HashJoinHashTable::T(table) => { @@ -137,6 +138,7 @@ impl Join for OuterLeftHashJoin { let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); table.probe(probe_data) } + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( "Aborted query, because the hash table is uninitialized.", )), diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs index 002e21b578204..30a71a4a0ee72 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs @@ -26,6 +26,7 @@ mod transform_hash_join; pub use grace::GraceHashJoin; pub use hash_join_factory::HashJoinFactory; pub use join::Join; +pub use join::JoinStream; pub use memory::BasicHashJoinState; pub use memory::InnerHashJoin; pub use runtime_filter::RuntimeFiltersDesc; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index dce2ca6ecceb7..b9d50285c2a6a 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -15,6 +15,7 @@ use std::any::Any; use std::fmt::Debug; use std::fmt::Formatter; +use std::marker::PhantomPinned; use std::sync::Arc; use databend_common_exception::Result; @@ -42,6 +43,7 @@ pub struct TransformHashJoin { stage_sync_barrier: Arc, projection: ColumnSet, rf_desc: Arc, + _p: PhantomPinned, } impl TransformHashJoin { @@ -67,6 +69,7 @@ impl TransformHashJoin { finished: false, build_data: None, }), + _p: PhantomPinned, })) } } @@ -117,7 +120,6 @@ impl Processor for TransformHashJoin { } } - #[allow(clippy::missing_transmute_annotations)] fn process(&mut self) -> Result<()> { match &mut self.stage { Stage::Finished => Ok(()), @@ -144,7 +146,9 @@ impl Processor for TransformHashJoin { if let Some(probe_data) = state.input_data.take() { let stream = self.join.probe_block(probe_data)?; // This is safe because both join and stream are properties of the struct. - state.stream = Some(unsafe { std::mem::transmute(stream) }); + state.stream = Some(unsafe { + std::mem::transmute::, Box>(stream) + }); } if let Some(mut stream) = state.stream.take() { @@ -161,7 +165,11 @@ impl Processor for TransformHashJoin { if let Some(final_stream) = self.join.final_probe()? { state.initialize = true; // This is safe because both join and stream are properties of the struct. - state.stream = Some(unsafe { std::mem::transmute(final_stream) }); + state.stream = Some(unsafe { + std::mem::transmute::, Box>( + final_stream, + ) + }); } else { state.finished = true; } diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs index 45b5acea21e5e..19b88f044cd7d 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs @@ -17,12 +17,12 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use databend_common_catalog::table_context::TableContext; -use databend_common_column::bitmap::Bitmap; use databend_common_column::bitmap::MutableBitmap; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberScalar; +use databend_common_expression::BlockEntry; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataBlock; use databend_common_expression::Evaluator; @@ -39,7 +39,6 @@ use crate::physical_plans::RangeJoinCondition; use crate::physical_plans::RangeJoinType; use crate::pipelines::executor::WatchNotify; use crate::pipelines::processors::transforms::range_join::IEJoinState; -use crate::pipelines::processors::transforms::wrap_true_validity; use crate::sessions::QueryContext; pub struct RangeJoinState { @@ -109,16 +108,17 @@ impl RangeJoinState { pub(crate) fn sink_right(&self, block: DataBlock) -> Result<()> { // Sink block to right table let mut right_table = self.right_table.write(); - let mut right_block = block; - if matches!(self.join_type, JoinType::Left | JoinType::LeftAsof) { - let validity = Bitmap::new_constant(true, right_block.num_rows()); - let nullable_right_columns = right_block - .columns() - .iter() - .map(|c| wrap_true_validity(c, right_block.num_rows(), &validity)) - .collect::>(); - right_block = DataBlock::new(nullable_right_columns, right_block.num_rows()); - } + let right_block = if matches!(self.join_type, JoinType::Left | JoinType::LeftAsof) { + let rows = block.num_rows(); + let nullable_right_columns = block + .take_columns() + .into_iter() + .map(BlockEntry::into_nullable) + .collect(); + DataBlock::new(nullable_right_columns, rows) + } else { + block + }; right_table.push(right_block); Ok(()) } @@ -126,16 +126,17 @@ impl RangeJoinState { pub(crate) fn sink_left(&self, block: DataBlock) -> Result<()> { // Sink block to left table let mut left_table = self.left_table.write(); - let mut left_block = block; - if matches!(self.join_type, JoinType::Right | JoinType::RightAsof) { - let validity = Bitmap::new_constant(true, left_block.num_rows()); - let nullable_left_columns = left_block - .columns() - .iter() - .map(|c| wrap_true_validity(c, left_block.num_rows(), &validity)) - .collect::>(); - left_block = DataBlock::new(nullable_left_columns, left_block.num_rows()); - } + let left_block = if matches!(self.join_type, JoinType::Right | JoinType::RightAsof) { + let rows = block.num_rows(); + let nullable_left_columns = block + .take_columns() + .into_iter() + .map(BlockEntry::into_nullable) + .collect(); + DataBlock::new(nullable_left_columns, rows) + } else { + block + }; left_table.push(left_block); Ok(()) } diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs b/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs index bda89435842c7..d62382fcd3e05 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs @@ -24,7 +24,7 @@ use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline::sinks::Sink; -use crate::pipelines::processors::transforms::range_join::RangeJoinState; +use super::RangeJoinState; enum RangeJoinStep { Sink, diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 795c82339870d..29c2ac09fe272 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -495,6 +495,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("nested_loop_join_threshold", DefaultSettingValue { + value: UserSettingValue::UInt64(10000), + desc: "Set the threshold for use nested loop join. Setting it to 0 disable nested loop join.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_bloom_runtime_filter", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enables bloom runtime filter optimization for JOIN.", @@ -1491,7 +1498,7 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("enable_experimental_new_join", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "Enables the experimental new join implement", mode: SettingMode::Both, scope: SettingScope::Both, diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 6057058270e25..e0cb508822177 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -380,6 +380,10 @@ impl Settings { Ok(self.try_get_u64("inlist_to_join_threshold")? as usize) } + pub fn get_nested_loop_join_threshold(&self) -> Result { + self.try_get_u64("nested_loop_join_threshold") + } + pub fn get_bloom_runtime_filter(&self) -> Result { Ok(self.try_get_u64("enable_bloom_runtime_filter")? != 0) } diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index 7cc07b8b3909d..c7d6147379bc8 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -222,7 +222,11 @@ impl JoinEquiCondition { left.into_iter() .zip(right) .enumerate() - .map(|(index, (l, r))| JoinEquiCondition::new(l, r, is_null_equal.contains(&index))) + .map(|(index, (left, right))| Self { + left, + right, + is_null_equal: is_null_equal.contains(&index), + }) .collect() } } diff --git a/tests/sqllogictests/suites/crdb/join_small_block_size.test b/tests/sqllogictests/suites/crdb/join_small_block_size.test index e615a6616f191..9606b5d7a94df 100644 --- a/tests/sqllogictests/suites/crdb/join_small_block_size.test +++ b/tests/sqllogictests/suites/crdb/join_small_block_size.test @@ -207,7 +207,7 @@ SELECT * FROM empty AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y statement ok SELECT * FROM empty AS a JOIN onecolumn AS b USING(x) -query IT +query II SELECT * FROM onecolumn AS a(x) LEFT OUTER JOIN empty AS b(y) ON a.x = b.y ORDER BY a.x ---- 42 NULL @@ -291,7 +291,7 @@ SELECT o.x, t.y FROM onecolumn o INNER JOIN twocolumn t ON (o.x=t.x AND t.y=53) ---- 42 53 -query IT +query II SELECT o.x, t.y FROM onecolumn o LEFT OUTER JOIN twocolumn t ON (o.x=t.x AND t.y=53) order by o.x ---- 42 53 @@ -333,27 +333,27 @@ CREATE TABLE b (i int, b bool) statement ok INSERT INTO b VALUES (2, true), (3, true), (4, false) -query III +query IIB SELECT * FROM a INNER JOIN b ON a.i = b.i ---- 2 2 1 3 3 1 -query ITT +query IIB SELECT * FROM a LEFT OUTER JOIN b ON a.i = b.i ---- 1 NULL NULL 2 2 1 3 3 1 -query III +query IIB SELECT * FROM a RIGHT OUTER JOIN b ON a.i = b.i order by b ---- 2 2 1 3 3 1 NULL 4 0 -query III +query IIB SELECT * FROM a FULL OUTER JOIN b ON a.i = b.i order by b ---- 1 NULL NULL @@ -361,7 +361,7 @@ SELECT * FROM a FULL OUTER JOIN b ON a.i = b.i order by b 3 3 1 NULL 4 0 -query III +query IIB SELECT * FROM a FULL OUTER JOIN b ON (a.i = b.i and a.i>2) ORDER BY a.i, b.i ---- 1 NULL NULL @@ -374,7 +374,7 @@ NULL 4 0 statement ok INSERT INTO b VALUES (3, false) -query III +query IIB SELECT * FROM a RIGHT OUTER JOIN b ON a.i=b.i ORDER BY b.i, b.b ---- 2 2 1 @@ -382,7 +382,7 @@ SELECT * FROM a RIGHT OUTER JOIN b ON a.i=b.i ORDER BY b.i, b.b 3 3 1 NULL 4 0 -query III +query IIB SELECT * FROM a FULL OUTER JOIN b ON a.i=b.i ORDER BY b.i, b.b ---- 1 NULL NULL @@ -414,7 +414,7 @@ SELECT * FROM onecolumn JOIN (SELECT x + 2 AS x FROM onecolumn) USING(x) ---- 44 -query IIIII +query IIII SELECT * FROM (twocolumn AS a JOIN twocolumn AS b USING(x) JOIN twocolumn AS c USING(x)) ORDER BY x LIMIT 1 ---- 42 53 53 53 @@ -548,7 +548,7 @@ SELECT * FROM pairs FULL OUTER JOIN square ON pairs.a + pairs.b = square.sq WHER 1 3 2 4 3 6 3 9 -query IITT +query IIII SELECT * FROM (SELECT * FROM pairs LEFT JOIN square ON b = sq AND a > 1 AND n < 6) WHERE b > 1 AND (n IS NULL OR n > 1) AND (n IS NULL OR a < sq) ---- 1 2 NULL NULL @@ -772,7 +772,7 @@ SELECT * FROM xyu INNER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = ---- 1 1 1 1 1 1 -query IIITTT +query IIIIII SELECT * FROM xyu LEFT OUTER JOIN xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ---- 0 0 0 NULL NULL NULL @@ -807,7 +807,7 @@ NULL 5 5 55 # query # SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu FULL OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv USING(x, y) WHERE x > 2 -query IIITTT +query IIIIII SELECT * FROM (SELECT * FROM xyu ORDER BY x, y) AS xyu LEFT OUTER JOIN (SELECT * FROM xyv ORDER BY x, y) AS xyv ON xyu.x = xyv.x AND xyu.y = xyv.y AND xyu.x = 1 AND xyu.y < 10 ---- 0 0 0 NULL NULL NULL @@ -888,7 +888,7 @@ SELECT * FROM foo NATURAL JOIN bar 2 2 2.0 2.0 3 3 3.0 3.0 -query II??I?I +query IIRRIRI SELECT * FROM foo JOIN bar USING (b) ---- 1 1 1.0 1.0 1 1.0 1 @@ -944,7 +944,7 @@ SELECT * FROM foo JOIN bar USING (a, b) WHERE foo.c = bar.c AND foo.d = bar.d 2 2 2.0 2.0 2.0 2 3 3 3.0 3.0 3.0 3 -query TII +query III SELECT * FROM onecolumn AS a(x) RIGHT JOIN twocolumn ON false order by y ---- NULL 42 53 diff --git a/tests/sqllogictests/suites/duckdb/join/full_outer/test_full_outer_join_issue_4252.test b/tests/sqllogictests/suites/duckdb/join/full_outer/test_full_outer_join_issue_4252.test index 37c4131f55009..66195dfe18dce 100644 --- a/tests/sqllogictests/suites/duckdb/join/full_outer/test_full_outer_join_issue_4252.test +++ b/tests/sqllogictests/suites/duckdb/join/full_outer/test_full_outer_join_issue_4252.test @@ -48,7 +48,7 @@ INSERT INTO df3 VALUES ('2022-02-03', 2000, 'org2'), ('2022-04-01', 3000, 'org3'); -query II +query ?R SELECT coalesce(anon_1.month, anon_2.month) AS month, coalesce(coalesce(CAST(anon_1.value AS float), 0.0) + coalesce(CAST(anon_2.value AS float), 0.0), 0.0) AS value @@ -93,7 +93,7 @@ FULL OUTER JOIN ( ---- 2022-01-01 1100.0 -query II +query ?R SELECT coalesce(anon_1.month, anon_2.month) AS month, coalesce(coalesce(CAST(anon_1.value AS float), 0.0) + coalesce(CAST(anon_2.value AS float), 0.0), 0.0) AS value diff --git a/tests/sqllogictests/suites/duckdb/join/iejoin/test_iejoin_east_west.test b/tests/sqllogictests/suites/duckdb/join/iejoin/test_iejoin_east_west.test index b193a7c74faf3..f7f9918cad0fc 100644 --- a/tests/sqllogictests/suites/duckdb/join/iejoin/test_iejoin_east_west.test +++ b/tests/sqllogictests/suites/duckdb/join/iejoin/test_iejoin_east_west.test @@ -21,7 +21,7 @@ CREATE TABLE west AS SELECT * FROM (VALUES ) west(rid, t_id, time, cost, cores) # Qs -query II +query TT SELECT s1.rid, s2.rid FROM west s1, west s2 WHERE s1.time > s2.time @@ -35,7 +35,7 @@ s2 s4 s4 s3 # Qp -query II +query TT SELECT s1.rid, s2.rid FROM west s1, west s2 WHERE s1.time > s2.time AND s1.cost < s2.cost @@ -45,7 +45,7 @@ s1 s3 s4 s3 # Qt -query II +query TT SELECT east.rid, west.rid FROM east, west WHERE east.dur < west.time AND east.rev > west.cost @@ -54,7 +54,7 @@ ORDER BY 1, 2 r2 s2 # Test string comparisons -query II +query TT WITH weststr AS ( SELECT rid, time::VARCHAR AS time, cost::VARCHAR as cost FROM west diff --git a/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test b/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test index 70be64638f413..1ef504ff4fe9d 100644 --- a/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test +++ b/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test @@ -1,18 +1,12 @@ -statement ok -drop table if exists t; - -statement ok -drop table if exists u; - # TINYINT limits statement ok -CREATE TABLE t(t_k0 TINYINT); +CREATE OR REPLACE TABLE t(t_k0 TINYINT); statement ok INSERT INTO t VALUES (-128), (127); statement ok -CREATE TABLE u(u_k0 TINYINT); +CREATE OR REPLACE TABLE u(u_k0 TINYINT); statement ok INSERT INTO u VALUES (-128), (127); diff --git a/tests/sqllogictests/suites/query/join/join.test b/tests/sqllogictests/suites/query/join/join.test index 5b58536f71ba1..b68c0cfa5e47a 100644 --- a/tests/sqllogictests/suites/query/join/join.test +++ b/tests/sqllogictests/suites/query/join/join.test @@ -15,17 +15,17 @@ select * from (select * from numbers(100)) n join t1 on n.number = t1.a; ---- # right semi with empty build side -query II +query I select * from (select * from numbers(100)) n right semi join t1 on n.number = t1.a; ---- # right anti with empty build side -query II +query I select * from (select * from numbers(100)) n right anti join t1 on n.number = t1.a; ---- # left semi with empty build side -query II +query I select * from (select * from numbers(100)) n left semi join t1 on n.number = t1.a; ---- @@ -97,7 +97,7 @@ insert into t1 values(2, 2); statement ok insert into t2 values(2, 6), (2, 8); -query I +query III select t1.a, t2.b, t1.b from t1 inner join t2 on t1.a = t2.a order by t1.a, t2.b, t1.b; ---- 1 2 2 @@ -139,7 +139,7 @@ select t1.id from t1 left join t2 on t1.id = t2.id where t1.val >= t2.val; 2 2 -query I +query II select t1.id, t1.val from t1 left join t2 on t1.id = t2.id and t1.val = t2.val where t1.val >= t2.val; ---- 2 1696549154013 @@ -162,7 +162,7 @@ create or replace table t1(id int, col1 varchar); statement ok insert into t1 values(1, 'c'), (3, 'd'); -query I rowsort +query IITIIT rowsort SELECT * FROM t JOIN t1, t as t2 JOIN t1 as t3; ---- 1 1 c 1 1 c @@ -265,7 +265,7 @@ select * from (select a, 'A' as name from t) t1 full outer join (select number f 2 A 2 3 A 3 -query ITI +query IIT select * from (select number from numbers(5)) t2 full outer join (select a, 'A' as name from t) t1 on t1.a = t2.number where t1.name is not null order by a; ---- 1 1 A @@ -291,15 +291,15 @@ select * from t join t1 on t.a = t1.a 1 1.0 0 2 2.0 1 -query ITB +query ITB rowsort select * from t join t1 on t.a = t1.b ---- -3 1 1 -2 1 1 1 1 1 -3 2.0 1 -2 2.0 1 1 2.0 1 +2 1 1 +2 2.0 1 +3 1 1 +3 2.0 1 statement ok drop table if exists t; diff --git a/tests/sqllogictests/suites/query/join/large_query.test b/tests/sqllogictests/suites/query/join/large_query.test index 14e35ed68676a..6ddac845443ad 100644 --- a/tests/sqllogictests/suites/query/join/large_query.test +++ b/tests/sqllogictests/suites/query/join/large_query.test @@ -1,242 +1,122 @@ statement ok -drop table if exists t1; +create or replace table t1 (id int, c1 int); statement ok -drop table if exists t2; +create or replace table t2 (id int, c1 int); statement ok -drop table if exists t3; +create or replace table t3 (id int, c1 int); statement ok -drop table if exists t4; +create or replace table t4 (id int, c1 int); statement ok -drop table if exists t5; +create or replace table t5 (id int, c1 int); statement ok -drop table if exists t6; +create or replace table t6 (id int, c1 int); statement ok -drop table if exists t7; +create or replace table t7 (id int, c1 int); statement ok -drop table if exists t8; +create or replace table t8 (id int, c1 int); statement ok -drop table if exists t9; +create or replace table t9 (id int, c1 int); statement ok -drop table if exists t10; +create or replace table t10 (id int, c1 int); statement ok -drop table if exists t11; +create or replace table t11 (id int, c1 int); statement ok -drop table if exists t12; +create or replace table t12 (id int, c1 int); statement ok -drop table if exists t13; +create or replace table t13 (id int, c1 int); statement ok -drop table if exists t14; +create or replace table t14 (id int, c1 int); statement ok -drop table if exists t15; +create or replace table t15 (id int, c1 int); statement ok -drop table if exists t16; +create or replace table t16 (id int, c1 int); statement ok -drop table if exists t17; +create or replace table t17 (id int, c1 int); statement ok -drop table if exists t18; +create or replace table t18 (id int, c1 int); statement ok -drop table if exists t19; +create or replace table t19 (id int, c1 int); statement ok -drop table if exists t20; +create or replace table t20 (id int, c1 int); statement ok -drop table if exists t21; +create or replace table t21 (id int, c1 int); statement ok -drop table if exists t22; +create or replace table t22 (id int, c1 int); statement ok -drop table if exists t23; +create or replace table t23 (id int, c1 int); statement ok -drop table if exists t24; +create or replace table t24 (id int, c1 int); statement ok -drop table if exists t25; +create or replace table t25 (id int, c1 int); statement ok -drop table if exists t26; +create or replace table t26 (id int, c1 int); statement ok -drop table if exists t27; +create or replace table t27 (id int, c1 int); statement ok -drop table if exists t28; +create or replace table t28 (id int, c1 int); statement ok -drop table if exists t29; +create or replace table t29 (id int, c1 int); statement ok -drop table if exists t30; +create or replace table t30 (id int, c1 int); statement ok -drop table if exists t31; +create or replace table t31 (id int, c1 int); statement ok -drop table if exists t32; +create or replace table t32 (id int, c1 int); statement ok -drop table if exists t33; +create or replace table t33 (id int, c1 int); statement ok -drop table if exists t34; +create or replace table t34 (id int, c1 int); statement ok -drop table if exists t35; +create or replace table t35 (id int, c1 int); statement ok -drop table if exists t36; +create or replace table t36 (id int, c1 int); statement ok -drop table if exists t37; +create or replace table t37 (id int, c1 int); statement ok -drop table if exists t38; +create or replace table t38 (id int, c1 int); statement ok -drop table if exists t39; +create or replace table t39 (id int, c1 int); statement ok -drop table if exists t40; - -statement ok -create table t1 (id int, c1 int); - -statement ok -create table t2 (id int, c1 int); - -statement ok -create table t3 (id int, c1 int); - -statement ok -create table t4 (id int, c1 int); - -statement ok -create table t5 (id int, c1 int); - -statement ok -create table t6 (id int, c1 int); - -statement ok -create table t7 (id int, c1 int); - -statement ok -create table t8 (id int, c1 int); - -statement ok -create table t9 (id int, c1 int); - -statement ok -create table t10 (id int, c1 int); - -statement ok -create table t11 (id int, c1 int); - -statement ok -create table t12 (id int, c1 int); - -statement ok -create table t13 (id int, c1 int); - -statement ok -create table t14 (id int, c1 int); - -statement ok -create table t15 (id int, c1 int); - -statement ok -create table t16 (id int, c1 int); - -statement ok -create table t17 (id int, c1 int); - -statement ok -create table t18 (id int, c1 int); - -statement ok -create table t19 (id int, c1 int); - -statement ok -create table t20 (id int, c1 int); - -statement ok -create table t21 (id int, c1 int); - -statement ok -create table t22 (id int, c1 int); - -statement ok -create table t23 (id int, c1 int); - -statement ok -create table t24 (id int, c1 int); - -statement ok -create table t25 (id int, c1 int); - -statement ok -create table t26 (id int, c1 int); - -statement ok -create table t27 (id int, c1 int); - -statement ok -create table t28 (id int, c1 int); - -statement ok -create table t29 (id int, c1 int); - -statement ok -create table t30 (id int, c1 int); - -statement ok -create table t31 (id int, c1 int); - -statement ok -create table t32 (id int, c1 int); - -statement ok -create table t33 (id int, c1 int); - -statement ok -create table t34 (id int, c1 int); - -statement ok -create table t35 (id int, c1 int); - -statement ok -create table t36 (id int, c1 int); - -statement ok -create table t37 (id int, c1 int); - -statement ok -create table t38 (id int, c1 int); - -statement ok -create table t39 (id int, c1 int); - -statement ok -create table t40 (id int, c1 int); +create or replace table t40 (id int, c1 int); statement ok insert into t1 (id, c1) VALUES (0, 36); @@ -299,7 +179,7 @@ statement ok insert into t20 (id, c1) VALUES (0, 1); # Large query -query I +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII select * from t1 join t2 on t1.id = t2.id join t3 on t2.id = t3.id diff --git a/tests/sqllogictests/suites/query/join_small_block_size.test b/tests/sqllogictests/suites/query/join_small_block_size.test index 789e04c831741..fa71d57cfa89e 100644 --- a/tests/sqllogictests/suites/query/join_small_block_size.test +++ b/tests/sqllogictests/suites/query/join_small_block_size.test @@ -26,7 +26,7 @@ create table t2(a int, b int) statement ok insert into t2 values(1, 2), (1, 4), (2, 6), (2, 8), (3, 10); -query I +query IIII select * from t1 left join t2 on t1.a = t2.a order by t2.b, t1.b; ---- 1 NULL 1 2 @@ -64,19 +64,13 @@ statement ok use default statement ok -drop table if exists t1 - -statement ok -create table t1(a int, b int) +create or replace table t1(a int, b int) statement ok insert into t1 values(7, 8), (3, 4), (5, 6) statement ok -drop table if exists t2 - -statement ok -create table t2(a int, d int) +create or replace table t2(a int, d int) statement ok insert into t2 values(1, 2), (3, 4), (5, 6) @@ -205,17 +199,17 @@ select * from (select * from numbers(100)) n join t1 on n.number = t1.a; ---- # right semi with empty build side -query II +query I select * from (select * from numbers(100)) n right semi join t1 on n.number = t1.a; ---- # right anti with empty build side -query II +query I select * from (select * from numbers(100)) n right anti join t1 on n.number = t1.a; ---- # left semi with empty build side -query II +query I select * from (select * from numbers(100)) n left semi join t1 on n.number = t1.a; ---- @@ -293,7 +287,7 @@ insert into t1 values(2, 2); statement ok insert into t2 values(2, 6), (2, 8); -query I +query III select t1.a, t2.b, t1.b from t1 inner join t2 on t1.a = t2.a order by t1.a, t2.b, t1.b; ---- 1 2 2 @@ -331,7 +325,7 @@ select t1.id from t1 left join t2 on t1.id = t2.id where t1.val >= t2.val; 2 2 -query I +query II select t1.id, t1.val from t1 left join t2 on t1.id = t2.id and t1.val = t2.val where t1.val >= t2.val; ---- 2 1696549154013 @@ -354,7 +348,7 @@ create table t1(id int, col1 varchar); statement ok insert into t1 values(1, 'c'), (3, 'd'); -query I rowsort +query IITIIT rowsort SELECT * FROM t JOIN t1, t as t2 JOIN t1 as t3; ---- 1 1 c 1 1 c @@ -554,7 +548,7 @@ INSERT INTO t2(c1float) VALUES (0.9702655076980591); statement ok INSERT INTO t2(c1float, c2varchar) VALUES (0.5340723991394043, '02'), (0.4661566913127899, '1261837'); -query IIR +query BBR SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean order by t0.c0boolean; ---- 0 NULL NULL @@ -600,7 +594,7 @@ create table t2(a int, b int) statement ok insert into t2 values(1, 2), (1, 4), (1, 6), (1, 8), (1, 10); -query I +query IIII select * from t1 left join t2 on t1.a = t2.a order by t2.b, t1.b; ---- 1 2 1 2 @@ -614,7 +608,7 @@ select * from t1 left join t2 on t1.a = t2.a order by t2.b, t1.b; 5 10 NULL NULL # left join with conjunct -query II +query IIII select * from t1 left join t2 on t1.a = t2.a and t1.b > t2.b order by t1.a, t2.a ---- 1 2 NULL NULL @@ -642,7 +636,7 @@ INSERT INTO t1 VALUES (1, 2), (2, 3), (3, 4); statement ok INSERT INTO t2 VALUES (1, 10), (2, 20); -query I +query IIII SELECT * FROM t1 LEFT OUTER JOIN t2 ON TRUE AND t1.i=t2.k AND FALSE order by i, j; ---- 1 2 NULL NULL @@ -706,14 +700,14 @@ create table t2(b int null); statement ok insert into t2 values(1), (NULL), (3); -query I +query II select * from t1 join t2 on t1.a < t2.b order by t1.a; ---- 1 3 2 3 -query I +query II select * from t1 join t2 on t1.a <= t2.b order by t1.a, t2.b desc; ---- 1 3 @@ -744,7 +738,7 @@ CREATE TABLE twocolumn (x INT NULL, y INT NULL); statement ok INSERT INTO twocolumn(x, y) VALUES (44,51), (NULL,52), (42,53), (45,45); -query I +query II select * from onecolumn as a right anti join twocolumn as b on a.x = b.x and a.x > 42 order by b.x; ---- 42 53 diff --git a/tests/sqllogictests/suites/tpch/join.test b/tests/sqllogictests/suites/tpch/join.test index 63db102dbc85a..2ebb83b6dab07 100644 --- a/tests/sqllogictests/suites/tpch/join.test +++ b/tests/sqllogictests/suites/tpch/join.test @@ -4,7 +4,7 @@ set sandbox_tenant = 'test_tenant'; statement ok use tpch_test; -query I +query II select c_custkey, count(o_orderkey) as c_count from @@ -71,7 +71,7 @@ from 103 103 -query I +query II select c_custkey, count(o_orderkey) as c_count from @@ -107,7 +107,7 @@ order by c_custkey 20 0 -query I +query II select c_custkey, count(o_orderkey) as c_count from @@ -330,7 +330,7 @@ select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) a 3 # LEFT OUTER / LEFT SINGEL / FULL -query I +query I?I select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate, l_orderkey limit 5; ---- 3271 1992-01-01 0