Skip to content

Commit 0eb6597

Browse files
committed
Supported stream lookup join
1 parent 38df75e commit 0eb6597

File tree

23 files changed

+747
-331
lines changed

23 files changed

+747
-331
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -392,6 +392,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
392392
case NKqpProto::TKqpPhyConnection::kResult:
393393
case NKqpProto::TKqpPhyConnection::kValue:
394394
case NKqpProto::TKqpPhyConnection::kMerge:
395+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
395396
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
396397
break;
397398
}

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
9696
} else {
9797
Config->_ResultRowsLimit.Clear();
9898
}
99+
100+
if (UserRequestContext && UserRequestContext->IsStreamingQuery) {
101+
Config->HashJoinMode = NYql::NDq::EHashJoinMode::Map;
102+
}
99103
}
100104
PerStatementResult = perStatementResult && Config->EnablePerStatementQueryExecution;
101105

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,19 +6,20 @@
66
#include <ydb/core/base/appdata.h>
77
#include <ydb/core/kqp/runtime/kqp_compute.h>
88
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
9-
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
109
#include <ydb/core/kqp/runtime/kqp_read_table.h>
1110
#include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
1211
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
1312
#include <ydb/core/kqp/runtime/kqp_vector_actor.h>
14-
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
13+
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
1514
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
15+
#include <ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup_factory.h>
16+
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
17+
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
1618
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
17-
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
19+
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
1820
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
1921
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
20-
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
21-
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
22+
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
2223

2324
namespace NKikimr {
2425
namespace NMiniKQL {
@@ -90,6 +91,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
9091
RegisterKqpWriteActor(*factory, counters);
9192
RegisterSequencerActorFactory(*factory, counters);
9293
RegisterKqpVectorResolveActor(*factory, counters);
94+
NYql::NDq::RegisterDqInputTransformLookupActorFactory(*factory);
9395

9496
if (federatedQuerySetup) {
9597
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});

ydb/core/kqp/compute_actor/ya.make

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,13 @@ PEERDIR(
2525
ydb/library/formats/arrow/protos
2626
ydb/library/formats/arrow/common
2727
ydb/library/yql/dq/actors/compute
28+
ydb/library/yql/dq/actors/input_transforms
29+
ydb/library/yql/dq/comp_nodes
2830
ydb/library/yql/providers/generic/actors
2931
ydb/library/yql/providers/pq/async_io
3032
ydb/library/yql/providers/s3/actors_factory
3133
ydb/library/yql/providers/solomon/actors
3234
yql/essentials/public/issue
33-
ydb/library/yql/dq/comp_nodes
3435
)
3536

3637
GENERATE_ENUM_SERIALIZATION(kqp_compute_state.h)

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -517,6 +517,47 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3
517517
inputStageInfo, outputIndex, enableSpilling, logFunc);
518518
}
519519

520+
void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
521+
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const TChannelLogFunc& logFunc) {
522+
YQL_ENSURE(stageInfo.Tasks.size() == 1);
523+
524+
auto* settings = GetMeta().Allocate<NDqProto::TDqInputTransformLookupSettings>();
525+
settings->SetLeftLabel(dqSourceStreamLookup.GetLeftLabel());
526+
settings->SetRightLabel(dqSourceStreamLookup.GetRightLabel());
527+
settings->SetJoinType(dqSourceStreamLookup.GetJoinType());
528+
settings->SetNarrowInputRowType(dqSourceStreamLookup.GetConnectionInputRowType());
529+
settings->SetNarrowOutputRowType(dqSourceStreamLookup.GetConnectionOutputRowType());
530+
settings->SetCacheLimit(dqSourceStreamLookup.GetCacheLimit());
531+
settings->SetCacheTtlSeconds(dqSourceStreamLookup.GetCacheTtlSeconds());
532+
settings->SetMaxDelayedRows(dqSourceStreamLookup.GetMaxDelayedRows());
533+
settings->SetIsMultiget(dqSourceStreamLookup.GetIsMultiGet());
534+
535+
const auto& leftJointKeys = dqSourceStreamLookup.GetLeftJoinKeyNames();
536+
settings->MutableLeftJoinKeyNames()->Assign(leftJointKeys.begin(), leftJointKeys.end());
537+
538+
const auto& rightJointKeys = dqSourceStreamLookup.GetRightJoinKeyNames();
539+
settings->MutableRightJoinKeyNames()->Assign(rightJointKeys.begin(), rightJointKeys.end());
540+
541+
auto& streamLookupSource = *settings->MutableRightSource();
542+
streamLookupSource.SetSerializedRowType(dqSourceStreamLookup.GetLookupRowType());
543+
const auto& compiledSource = dqSourceStreamLookup.GetLookupSource();
544+
streamLookupSource.SetProviderName(compiledSource.GetType());
545+
*streamLookupSource.MutableLookupSource() = compiledSource.GetSettings();
546+
547+
TTransform dqSourceStreamLookupTransform = {
548+
.Type = "StreamLookupInputTransform",
549+
.InputType = dqSourceStreamLookup.GetInputStageRowType(),
550+
.OutputType = dqSourceStreamLookup.GetOutputStageRowType(),
551+
};
552+
YQL_ENSURE(dqSourceStreamLookupTransform.Settings.PackFrom(*settings));
553+
554+
for (const auto taskId : stageInfo.Tasks) {
555+
GetTask(taskId).Inputs[inputIndex].Transform = dqSourceStreamLookupTransform;
556+
}
557+
558+
BuildUnionAllChannels(*this, stageInfo, inputIndex, inputStageInfo, outputIndex, /* enableSpilling */ false, logFunc);
559+
}
560+
520561
void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, bool enableSpilling, bool enableShuffleElimination) {
521562
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
522563

@@ -709,6 +750,12 @@ void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, boo
709750
break;
710751
}
711752

753+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup: {
754+
BuildDqSourceStreamLookupChannels(stageInfo, inputIdx, inputStageInfo, outputIdx,
755+
input.GetDqSourceStreamLookup(), log);
756+
break;
757+
}
758+
712759
default:
713760
YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase());
714761
}
@@ -1369,6 +1416,8 @@ void TKqpTasksGraph::FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const
13691416
}
13701417

13711418
transformProto->MutableSettings()->PackFrom(*input.Meta.VectorResolveSettings);
1419+
} else {
1420+
*transformProto->MutableSettings() = input.Transform->Settings;
13721421
}
13731422
}
13741423
}
@@ -1724,6 +1773,7 @@ bool TKqpTasksGraph::BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCo
17241773
case NKqpProto::TKqpPhyConnection::kMap:
17251774
case NKqpProto::TKqpPhyConnection::kParallelUnionAll:
17261775
case NKqpProto::TKqpPhyConnection::kVectorResolve:
1776+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
17271777
break;
17281778
default:
17291779
YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase() << Endl);

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -422,6 +422,8 @@ class TKqpTasksGraph : public NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMet
422422
void BuildVectorResolveChannels(const TStageInfo& stageInfo, ui32 inputIndex,
423423
const TStageInfo& inputStageInfo, ui32 outputIndex,
424424
const NKqpProto::TKqpPhyCnVectorResolve& vectorResolve, bool enableSpilling, const NYql::NDq::TChannelLogFunc& logFunc);
425+
void BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
426+
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const NYql::NDq::TChannelLogFunc& logFunc);
425427

426428
void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, ui32 outputIdx,
427429
bool enableSpilling, const TStageInfo& stageInfo) const;

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1985,6 +1985,7 @@ class TKqpHost : public IKqpHost {
19851985
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
19861986
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
19871987
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();
1988+
TypesCtx->StreamLookupJoin = true;
19881989

19891990
YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));
19901991

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
4141
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort));
4242
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInToEquiJoin));
4343
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInCompactToJoin));
44+
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteStreamEquiJoinWithLookup));
4445
AddHandler(0, &TCoEquiJoin::Match, HNDL(OptimizeEquiJoinWithCosts));
4546
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteEquiJoin));
4647
AddHandler(0, &TDqJoin::Match, HNDL(JoinToIndexLookup));
@@ -167,6 +168,12 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
167168
return output;
168169
}
169170

171+
// Optimizer handler: delegates the rewrite to DqRewriteStreamEquiJoinWithLookup
// and passes the original and resulting nodes to DumpAppliedRule under the
// rule name "KqpRewriteStreamEquiJoinWithLookup".
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
    TExprBase rewritten = DqRewriteStreamEquiJoinWithLookup(node, ctx, TypesCtx);

    DumpAppliedRule("KqpRewriteStreamEquiJoinWithLookup", node.Ptr(), rewritten.Ptr(), ctx);

    return rewritten;
}
176+
170177
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
171178
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
172179
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
7171
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
7272
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
7373
AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
74+
AddHandler(0, &TDqJoin::Match, HNDL(RewriteStreamLookupJoin));
7475
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
7576
AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute));
7677
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
@@ -507,6 +508,14 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
507508
return output;
508509
}
509510

511+
// Optimizer handler: delegates the rewrite to DqRewriteStreamLookupJoin.
// DumpAppliedRule is called only when the rewrite produced a node; an empty
// result is returned to the caller unchanged.
TMaybeNode<TExprBase> RewriteStreamLookupJoin(TExprBase node, TExprContext& ctx) {
    TMaybeNode<TExprBase> rewritten = DqRewriteStreamLookupJoin(node, ctx);
    if (!rewritten) {
        return rewritten;
    }

    DumpAppliedRule("RewriteStreamLookupJoin", node.Ptr(), rewritten.Cast().Ptr(), ctx);
    return rewritten;
}
518+
510519
template <bool IsGlobal>
511520
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
512521
IOptimizationContext& optCtx, const TGetParents& getParents)

ydb/core/kqp/provider/yql_kikimr_datasource.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -666,7 +666,8 @@ class TKikimrDataSource : public TDataProviderBase {
666666
node.IsCallable(TDqReadWrap::CallableName()) ||
667667
node.IsCallable(TDqReadWideWrap::CallableName()) ||
668668
node.IsCallable(TDqReadBlockWideWrap::CallableName()) ||
669-
node.IsCallable(TDqSource::CallableName())
669+
node.IsCallable(TDqSource::CallableName()) ||
670+
node.IsCallable(TDqLookupSourceWrap::CallableName())
670671
)
671672
)
672673
{

0 commit comments

Comments
 (0)