From c991d51f907430ce04a55aab33b3319ed80a63a2 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Mon, 22 Sep 2025 14:03:23 -0700 Subject: [PATCH 1/2] fix slj logic with empty list of points after the predicate extraction (#25548) --- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 78 ++++++++++++++++--- .../kqp/ut/join/kqp_index_lookup_join_ut.cpp | 4 - 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 88b9a497dd65..c022122f530c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -556,6 +556,8 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext TSet deduplicateLeftColumns; TVector prefixFilters; + TVector nullLookupMembersItems; + for (auto& rightColumnName : rightTableDesc.Metadata->KeyColumnNames) { TExprNode::TPtr member; @@ -648,11 +650,22 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Add(member) .Done()); + auto rightType = rightTableDesc.GetColumnType(rightColumnName); + YQL_ENSURE(rightType); + if (rightType->GetKind() == ETypeAnnotationKind::Data) { + rightType = ctx.MakeType(rightType); + } + + nullLookupMembersItems.emplace_back( + ctx.MakeType(rightColumnName, rightType)); + if (leftColumn) { skipNullColumns.emplace_back(ctx.NewAtom(join.Pos(), rightColumnName)); } } + const TTypeAnnotationNode* nullLookupMembers = ctx.MakeType(ctx.MakeType(nullLookupMembersItems)); + if (lookupMembers.size() <= fixedPrefix) { return {}; } @@ -763,21 +776,62 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Add(leftRowArg) .Done(); - auto leftInput = Build(ctx, join.Pos()) - .Input(leftData) - .Lambda() - .Args({leftRowArg}) - .Body() - .Input(rightPrefixExpr.Cast()) - .Lambda() - .Args({prefixRowArg}) - .Body(leftRowTuple) + TMaybeNode leftInput; + if (fixedPrefix > 0) { + /* + When `fixedPrefix > 0`, it indicates a point 
predicate on the right table (e.g., `SELECT ... FROM b WHERE PK = $param`). + + However, the predicate extraction logic cannot guarantee that the `rightPrefixExpr` will produce a + non-empty list of point or range conditions. The list can be empty if the predicate contains NULLs (e.g. PK = NULL is always false). + + This guarantee is critical for a LEFT JOIN. The join's correctness depends on knowing + if there is at least one matching row on the right side to determine if the result should be extended or filled with NULLs. + Therefore, we must explicitly check if the generated list from `rightPrefixExpr` is empty or not. + */ + leftInput = Build(ctx, join.Pos()) + .Input(leftData) + .Lambda() + .Args({leftRowArg}) + .Body() + .Predicate() + .Left().List(rightPrefixExpr.Cast()).Build() + .Right().Literal().Value("0").Build().Build() + .Build() + .ThenValue() + .Add() + .Add() + .OptionalType(NCommon::BuildTypeExpr(join.Pos(), *nullLookupMembers, ctx)) + .Build() + .Add(leftRowArg) + .Build() + .Build() + .ElseValue() + .Input(rightPrefixExpr.Cast()) + .Lambda() + .Args({prefixRowArg}) + .Body(leftRowTuple) + .Build() + .Build() .Build() .Build() - .Build() - .Done(); + .Done(); + } else { + leftInput = Build(ctx, join.Pos()) + .Input(leftData) + .Lambda() + .Args({leftRowArg}) + .Body() + .Input(rightPrefixExpr.Cast()) + .Lambda() + .Args({prefixRowArg}) + .Body(leftRowTuple) + .Build() + .Build() + .Build() + .Done(); + } - return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, rightTableUnmatchedJoinKeys, ctx); + return BuildKqpStreamIndexLookupJoin(join, leftInput.Cast(), indexName, *prefixLookup, *rightReadMatch, rightTableUnmatchedJoinKeys, ctx); } auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos()); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index b3078a8ecceb..8bf7c269c048 100644 --- 
a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -1169,10 +1169,6 @@ Y_UNIT_TEST_TWIN(LeftJoinNonPkJoinConditions, StreamLookupJoin) { } Y_UNIT_TEST_TWIN(LeftJoinPointPredicateAndJoinAfterThat, StreamLookupJoin) { - if (StreamLookupJoin) { - return; - } - auto tester = TTester{ .Query=R"( DECLARE $idx_a AS List; From 73a697c0705148b5be9bf86b1e35a5f4a697d9d2 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Wed, 24 Sep 2025 05:10:34 -0700 Subject: [PATCH 2/2] swap join key and left row arg in stream lookup join input (#23678) --- ydb/core/kqp/opt/kqp_query_plan.cpp | 4 +-- ydb/core/kqp/opt/kqp_type_ann.cpp | 16 +++++----- .../kqp/opt/logical/kqp_opt_log_indexes.cpp | 31 +------------------ ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 4 +-- .../effects/kqp_opt_phy_vector_index.cpp | 10 +++--- .../opt/physical/kqp_opt_phy_build_stage.cpp | 12 +++---- .../kqp/query_compiler/kqp_query_compiler.cpp | 4 +-- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 4 +-- 8 files changed, 28 insertions(+), 57 deletions(-) diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 339c92d971b3..c6881209d8cf 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -571,8 +571,8 @@ class TxPlanSerializer { } else if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) { planNode.TypeName = "TableLookupJoin"; const auto inputTupleType = inputItemType->Cast(); - YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional); - const auto joinKeyType = inputTupleType->GetItems()[0]->Cast()->GetItemType(); + YQL_ENSURE(inputTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional); + const auto joinKeyType = inputTupleType->GetItems()[1]->Cast()->GetItemType(); YQL_ENSURE(joinKeyType->GetKind() == ETypeAnnotationKind::Struct); lookupKeyColumnsStruct = joinKeyType->Cast(); } diff --git 
a/ydb/core/kqp/opt/kqp_type_ann.cpp b/ydb/core/kqp/opt/kqp_type_ann.cpp index 0e3d75b4becf..79839da99f85 100644 --- a/ydb/core/kqp/opt/kqp_type_ann.cpp +++ b/ydb/core/kqp/opt/kqp_type_ann.cpp @@ -522,21 +522,21 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons return TStatus::Error; } - if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[0], ctx)) { + if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[1], ctx)) { return TStatus::Error; } - auto joinKeyType = tupleType->GetItems()[0]->Cast()->GetItemType(); + auto joinKeyType = tupleType->GetItems()[1]->Cast()->GetItemType(); if (!EnsureStructType(node->Pos(), *joinKeyType, ctx)) { return TStatus::Error; } - if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[1], ctx)) { + if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[0], ctx)) { return TStatus::Error; } structType = joinKeyType->Cast(); - auto leftRowType = tupleType->GetItems()[1]->Cast(); + auto leftRowType = tupleType->GetItems()[0]->Cast(); TVector outputTypes; outputTypes.push_back(leftRowType); @@ -1882,21 +1882,21 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext return TStatus::Error; } - if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) { + if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) { return TStatus::Error; } - auto joinKeyType = inputTupleType->GetItems()[0]->Cast()->GetItemType(); + auto joinKeyType = inputTupleType->GetItems()[1]->Cast()->GetItemType(); if (!EnsureStructType(node->Pos(), *joinKeyType, ctx)) { return TStatus::Error; } - if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) { + if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) { return TStatus::Error; } const TStructExprType* joinKeys = joinKeyType->Cast(); - const TStructExprType* leftRowType = inputTupleType->GetItems()[1]->Cast(); + const TStructExprType* leftRowType = 
inputTupleType->GetItems()[0]->Cast(); for (const auto& inputKey : joinKeys->GetItems()) { if (!table.second->GetKeyColumnIndex(TString(inputKey->GetName()))) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index 84d1a699f489..062c6772db02 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -855,41 +855,12 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Settings(streamLookupIndex.Settings()) .Done(); - TMaybeNode lookupKeys; - if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) { - // Result type of lookupIndexTable: list, rowMeta>>, - // expected input type for main table stream join: list, left_row, rowMeta>>, - // so we should transform list>> to list, left_row, rowMeta>> - lookupKeys = Build(ctx, node.Pos()) - .Input(lookupIndexTable) - .Lambda() - .Args({"tuple"}) - .Body() - .Add() - .Tuple("tuple") - .Index().Value("1").Build() - .Build() - .Add() - .Tuple("tuple") - .Index().Value("0").Build() - .Build() - .Add() - .Tuple("tuple") - .Index().Value("2").Build() - .Build() - .Build() - .Build() - .Done(); - } else { - lookupKeys = lookupIndexTable; - } - // We should allow lookup by null keys here, // because main table pk can contain nulls and we don't want to lose these rows settings.AllowNullKeysPrefixSize = keyColumnsList.Size(); return Build(ctx, node.Pos()) .Table(streamLookupIndex.Table()) - .LookupKeys(lookupKeys.Cast()) + .LookupKeys(lookupIndexTable) .Columns(streamLookupIndex.Columns()) .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index c022122f530c..4e8b75f4de76 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ 
-767,13 +767,13 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext YQL_ENSURE(joinKeyPredicate.IsValid()); auto leftRowTuple = Build(ctx, join.Pos()) + .Add(leftRowArg) .Add() .Predicate(joinKeyPredicate.Cast()) .Value() .Add(lookupMembers) .Build() .Build() - .Add(leftRowArg) .Done(); TMaybeNode leftInput; @@ -799,10 +799,10 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Build() .ThenValue() .Add() + .Add(leftRowArg) .Add() .OptionalType(NCommon::BuildTypeExpr(join.Pos(), *nullLookupMembers, ctx)) .Build() - .Add(leftRowArg) .Build() .Build() .ElseValue() diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_vector_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_vector_index.cpp index d0fbfef64107..748198e630f9 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_vector_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_vector_index.cpp @@ -193,8 +193,8 @@ TExprBase BuildVectorIndexPrefixRows(const TKikimrTableDescription& table, const auto postingType = ctx.MakeType(postingItems); TVector joinItemItems; - joinItemItems.push_back(ctx.MakeType(prefixType)); joinItemItems.push_back(postingType); + joinItemItems.push_back(ctx.MakeType(prefixType)); auto joinItemType = ctx.MakeType(joinItemItems); auto joinInputType = ctx.MakeType(joinItemType); @@ -223,16 +223,16 @@ TExprBase BuildVectorIndexPrefixRows(const TKikimrTableDescription& table, const .Input(rowsArg) .Lambda() .Args({rowArg}) - // Join StreamLookup takes tuples as input - build them + // Join StreamLookup takes tuples as input - build them .Body() + .Add() + .Add(MakeColumnGetters(rowArg, postingColumns, pos, ctx)) + .Build() .Add() .Input() .Add(MakeColumnGetters(rowArg, prefixColumns, pos, ctx)) .Build() .Build() - .Add() - .Add(MakeColumnGetters(rowArg, postingColumns, pos, ctx)) - .Build() .Build() .Build() .Build() diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp 
b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 5689bb2b8097..a9ecb0c74fe4 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -668,12 +668,12 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase } NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - TTypeAnnotationContext& typeCtx, bool ruleEnabled) + TTypeAnnotationContext& typeCtx, bool ruleEnabled) { if (!ruleEnabled) { return node; } - + if (!node.Maybe()) { return node; } @@ -711,9 +711,9 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes TExprNodeList args; args.push_back(arg.Ptr()); - auto rightStruct = tupleType.Arg(1).Cast(); + auto leftStruct = tupleType.Arg(0).Cast(); - for (auto structContent : rightStruct ) { + for (auto structContent : leftStruct) { auto attrName = structContent.Ptr()->Child(0); auto field = Build(ctx, node.Pos()) .Name(attrName) @@ -859,10 +859,10 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSortedFSM( TExprNodeList args; args.push_back(arg.Ptr()); - auto rightStruct = tupleType.Arg(1).Cast(); + auto leftStruct = tupleType.Arg(0).Cast(); THashSet passthroughColumns; - for (const auto& structContent : rightStruct ) { + for (const auto& structContent : leftStruct) { auto attrName = structContent.Ptr()->Child(0); passthroughColumns.insert(TString(attrName->Content())); auto field = Build(ctx, node.Pos()) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 692fb66088ab..76b7c3a63f43 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1671,8 +1671,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { const auto inputTupleType = inputItemType->Cast(); YQL_ENSURE(inputTupleType->GetSize() == 2 || 
inputTupleType->GetSize() == 3); - YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional); - const auto joinKeyType = inputTupleType->GetItems()[0]->Cast()->GetItemType(); + YQL_ENSURE(inputTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional); + const auto joinKeyType = inputTupleType->GetItems()[1]->Cast()->GetItemType(); YQL_ENSURE(joinKeyType->GetKind() == ETypeAnnotationKind::Struct); const auto& joinKeyColumns = joinKeyType->Cast()->GetItems(); for (const auto keyColumn : joinKeyColumns) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 857d58403425..4cb6d4f58cb0 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -518,7 +518,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { }; void AddInputRow(NUdf::TUnboxedValue inputRow) final { - auto joinKey = inputRow.GetElement(0); + auto joinKey = inputRow.GetElement(1); std::vector joinKeyCells(Settings.LookupKeyColumns.size()); NMiniKQL::TStringProviderBackend backend; @@ -546,7 +546,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { } } - TSizedUnboxedValue row{.Data=std::move(std::move(inputRow.GetElement(1))), .StorageBytes=0}; + TSizedUnboxedValue row{.Data=std::move(std::move(inputRow.GetElement(0))), .StorageBytes=0}; row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType()); ui64 joinKeyId = JoinKeySeqNo++; TOwnedCellVec cellVec(std::move(joinKeyCells));