Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,8 @@ class TxPlanSerializer {
} else if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) {
planNode.TypeName = "TableLookupJoin";
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional);
const auto joinKeyType = inputTupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType();
YQL_ENSURE(inputTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional);
const auto joinKeyType = inputTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
YQL_ENSURE(joinKeyType->GetKind() == ETypeAnnotationKind::Struct);
lookupKeyColumnsStruct = joinKeyType->Cast<TStructExprType>();
}
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/kqp/opt/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,21 +522,21 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
return TStatus::Error;
}

if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[0], ctx)) {
if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[1], ctx)) {
return TStatus::Error;
}

auto joinKeyType = tupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType();
auto joinKeyType = tupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
if (!EnsureStructType(node->Pos(), *joinKeyType, ctx)) {
return TStatus::Error;
}

if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[1], ctx)) {
if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}

structType = joinKeyType->Cast<TStructExprType>();
auto leftRowType = tupleType->GetItems()[1]->Cast<TStructExprType>();
auto leftRowType = tupleType->GetItems()[0]->Cast<TStructExprType>();

TVector<const TTypeAnnotationNode*> outputTypes;
outputTypes.push_back(leftRowType);
Expand Down Expand Up @@ -1882,21 +1882,21 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
return TStatus::Error;
}

if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) {
if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) {
return TStatus::Error;
}

auto joinKeyType = inputTupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType();
auto joinKeyType = inputTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
if (!EnsureStructType(node->Pos(), *joinKeyType, ctx)) {
return TStatus::Error;
}

if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) {
if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}

const TStructExprType* joinKeys = joinKeyType->Cast<TStructExprType>();
const TStructExprType* leftRowType = inputTupleType->GetItems()[1]->Cast<TStructExprType>();
const TStructExprType* leftRowType = inputTupleType->GetItems()[0]->Cast<TStructExprType>();

for (const auto& inputKey : joinKeys->GetItems()) {
if (!table.second->GetKeyColumnIndex(TString(inputKey->GetName()))) {
Expand Down
31 changes: 1 addition & 30 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,41 +855,12 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Settings(streamLookupIndex.Settings())
.Done();

TMaybeNode<TExprBase> lookupKeys;
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>, rowMeta>>,
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row, rowMeta>>,
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row, rowMeta>>
lookupKeys = Build<TCoMap>(ctx, node.Pos())
.Input(lookupIndexTable)
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("1").Build()
.Build()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("0").Build()
.Build()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("2").Build()
.Build()
.Build()
.Build()
.Done();
} else {
lookupKeys = lookupIndexTable;
}

// We should allow lookup by null keys here,
// because main table pk can contain nulls and we don't want to lose these rows
settings.AllowNullKeysPrefixSize = keyColumnsList.Size();
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupKeys.Cast())
.LookupKeys(lookupIndexTable)
.Columns(streamLookupIndex.Columns())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
Expand Down
80 changes: 67 additions & 13 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
TSet<TString> deduplicateLeftColumns;
TVector<TExprBase> prefixFilters;

TVector<const TItemExprType*> nullLookupMembersItems;

for (auto& rightColumnName : rightTableDesc.Metadata->KeyColumnNames) {
TExprNode::TPtr member;

Expand Down Expand Up @@ -648,11 +650,22 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Add(member)
.Done());

auto rightType = rightTableDesc.GetColumnType(rightColumnName);
YQL_ENSURE(rightType);
if (rightType->GetKind() == ETypeAnnotationKind::Data) {
rightType = ctx.MakeType<TOptionalExprType>(rightType);
}

nullLookupMembersItems.emplace_back(
ctx.MakeType<TItemExprType>(rightColumnName, rightType));

if (leftColumn) {
skipNullColumns.emplace_back(ctx.NewAtom(join.Pos(), rightColumnName));
}
}

const TTypeAnnotationNode* nullLookupMembers = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TStructExprType>(nullLookupMembersItems));

if (lookupMembers.size() <= fixedPrefix) {
return {};
}
Expand Down Expand Up @@ -754,30 +767,71 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
YQL_ENSURE(joinKeyPredicate.IsValid());

auto leftRowTuple = Build<TExprList>(ctx, join.Pos())
.Add(leftRowArg)
.Add<TCoOptionalIf>()
.Predicate(joinKeyPredicate.Cast())
.Value<TCoAsStruct>()
.Add(lookupMembers)
.Build()
.Build()
.Add(leftRowArg)
.Done();

auto leftInput = Build<TCoFlatMap>(ctx, join.Pos())
.Input(leftData)
.Lambda()
.Args({leftRowArg})
.Body<TCoMap>()
.Input(rightPrefixExpr.Cast())
.Lambda()
.Args({prefixRowArg})
.Body(leftRowTuple)
TMaybeNode<TExprBase> leftInput;
if (fixedPrefix > 0) {
/*
When `fixedPrefix > 0`, it indicates a point predicate on the right table (e.g., `SELECT ... FROM b WHERE PK = $param`).
However, the predicate extraction logic cannot guarantee that the `rightPrefixExpr` will produce a
non-empty list of point or range conditions. It might happen if the predicate cointains NULLs (e.g. PK = NULL is always false)
This guarantee is critical for a LEFT JOIN. The join's correctness depends on knowing
if there is at least one matching row on the right side to determine if the result should be extended or filled with NULLs.
Therefore, we must explicitly check if the generated list from `rightPrefixExpr` is empty or not.
*/
leftInput = Build<TCoFlatMap>(ctx, join.Pos())
.Input(leftData)
.Lambda()
.Args({leftRowArg})
.Body<TCoIf>()
.Predicate<TCoCmpEqual>()
.Left<TCoLength>().List(rightPrefixExpr.Cast()).Build()
.Right<TCoUint64>().Literal().Value("0").Build().Build()
.Build()
.ThenValue<TCoAsList>()
.Add<TExprList>()
.Add(leftRowArg)
.Add<TCoNothing>()
.OptionalType(NCommon::BuildTypeExpr(join.Pos(), *nullLookupMembers, ctx))
.Build()
.Build()
.Build()
.ElseValue<TCoMap>()
.Input(rightPrefixExpr.Cast())
.Lambda()
.Args({prefixRowArg})
.Body(leftRowTuple)
.Build()
.Build()
.Build()
.Build()
.Build()
.Done();
.Done();
} else {
leftInput = Build<TCoFlatMap>(ctx, join.Pos())
.Input(leftData)
.Lambda()
.Args({leftRowArg})
.Body<TCoMap>()
.Input(rightPrefixExpr.Cast())
.Lambda()
.Args({prefixRowArg})
.Body(leftRowTuple)
.Build()
.Build()
.Build()
.Done();
}

return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, rightTableUnmatchedJoinKeys, ctx);
return BuildKqpStreamIndexLookupJoin(join, leftInput.Cast(), indexName, *prefixLookup, *rightReadMatch, rightTableUnmatchedJoinKeys, ctx);
}

auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_vector_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ TExprBase BuildVectorIndexPrefixRows(const TKikimrTableDescription& table, const
auto postingType = ctx.MakeType<TStructExprType>(postingItems);

TVector<const TTypeAnnotationNode*> joinItemItems;
joinItemItems.push_back(ctx.MakeType<TOptionalExprType>(prefixType));
joinItemItems.push_back(postingType);
joinItemItems.push_back(ctx.MakeType<TOptionalExprType>(prefixType));
auto joinItemType = ctx.MakeType<TTupleExprType>(joinItemItems);
auto joinInputType = ctx.MakeType<TListExprType>(joinItemType);

Expand Down Expand Up @@ -223,16 +223,16 @@ TExprBase BuildVectorIndexPrefixRows(const TKikimrTableDescription& table, const
.Input(rowsArg)
.Lambda()
.Args({rowArg})
// Join StreamLookup takes <key, left row> tuples as input - build them
// Join StreamLookup takes <left row, key> tuples as input - build them
.Body<TExprList>()
.Add<TCoAsStruct>()
.Add(MakeColumnGetters(rowArg, postingColumns, pos, ctx))
.Build()
.Add<TCoJust>()
.Input<TCoAsStruct>()
.Add(MakeColumnGetters(rowArg, prefixColumns, pos, ctx))
.Build()
.Build()
.Add<TCoAsStruct>()
.Add(MakeColumnGetters(rowArg, postingColumns, pos, ctx))
.Build()
.Build()
.Build()
.Build()
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,12 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
}

NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
TTypeAnnotationContext& typeCtx, bool ruleEnabled)
TTypeAnnotationContext& typeCtx, bool ruleEnabled)
{
if (!ruleEnabled) {
return node;
}

if (!node.Maybe<TKqlIndexLookupJoin>()) {
return node;
}
Expand Down Expand Up @@ -711,9 +711,9 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes
TExprNodeList args;
args.push_back(arg.Ptr());

auto rightStruct = tupleType.Arg(1).Cast<TCoStructType>();
auto leftStruct = tupleType.Arg(0).Cast<TCoStructType>();

for (auto structContent : rightStruct ) {
for (auto structContent : leftStruct) {
auto attrName = structContent.Ptr()->Child(0);
auto field = Build<TCoNameValueTuple>(ctx, node.Pos())
.Name(attrName)
Expand Down Expand Up @@ -859,10 +859,10 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSortedFSM(
TExprNodeList args;
args.push_back(arg.Ptr());

auto rightStruct = tupleType.Arg(1).Cast<TCoStructType>();
auto leftStruct = tupleType.Arg(0).Cast<TCoStructType>();

THashSet<TString> passthroughColumns;
for (const auto& structContent : rightStruct ) {
for (const auto& structContent : leftStruct) {
auto attrName = structContent.Ptr()->Child(0);
passthroughColumns.insert(TString(attrName->Content()));
auto field = Build<TCoNameValueTuple>(ctx, node.Pos())
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1671,8 +1671,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
YQL_ENSURE(inputTupleType->GetSize() == 2 || inputTupleType->GetSize() == 3);

YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional);
const auto joinKeyType = inputTupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType();
YQL_ENSURE(inputTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional);
const auto joinKeyType = inputTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
YQL_ENSURE(joinKeyType->GetKind() == ETypeAnnotationKind::Struct);
const auto& joinKeyColumns = joinKeyType->Cast<TStructExprType>()->GetItems();
for (const auto keyColumn : joinKeyColumns) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
};

void AddInputRow(NUdf::TUnboxedValue inputRow) final {
auto joinKey = inputRow.GetElement(0);
auto joinKey = inputRow.GetElement(1);
std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size());
NMiniKQL::TStringProviderBackend backend;

Expand Down Expand Up @@ -546,7 +546,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
}
}

TSizedUnboxedValue row{.Data=std::move(std::move(inputRow.GetElement(1))), .StorageBytes=0};
TSizedUnboxedValue row{.Data=std::move(std::move(inputRow.GetElement(0))), .StorageBytes=0};
row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType());
ui64 joinKeyId = JoinKeySeqNo++;
TOwnedCellVec cellVec(std::move(joinKeyCells));
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1169,10 +1169,6 @@ Y_UNIT_TEST_TWIN(LeftJoinNonPkJoinConditions, StreamLookupJoin) {
}

Y_UNIT_TEST_TWIN(LeftJoinPointPredicateAndJoinAfterThat, StreamLookupJoin) {
if (StreamLookupJoin) {
return;
}

auto tester = TTester{
.Query=R"(
DECLARE $idx_a AS List<String>;
Expand Down
Loading