Skip to content

Commit 7f6c3bc

Browse files
committed
Supported join with external ydb
1 parent 24faad9 commit 7f6c3bc

File tree

4 files changed

+216
-27
lines changed

4 files changed

+216
-27
lines changed

ydb/core/kqp/gateway/behaviour/streaming_query/optimization.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ bool ExploreStreamingQueryNode(TExprNode::TPtr node, TStreamingExploreCtx& res)
3939
const auto providerArg = node->ChildPtr(1);
4040
if (const auto maybeDataSource = TMaybeNode<TCoDataSource>(providerArg)) {
4141
const auto dataSourceCategory = maybeDataSource.Cast().Category().Value();
42-
if (IsIn({NYql::PqProviderName, NYql::S3ProviderName}, dataSourceCategory)) {
42+
if (IsIn({NYql::PqProviderName, NYql::S3ProviderName, NYql::GenericProviderName}, dataSourceCategory)) {
4343
res.StreamingReads += dataSourceCategory == NYql::PqProviderName;
4444
return true;
4545
}

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 183 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
77
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
88
#include <ydb/core/kqp/ut/federated_query/common/common.h>
9-
10-
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
11-
129
#include <ydb/library/testlib/pq_helpers/mock_pq_gateway.h>
1310
#include <ydb/library/testlib/s3_recipe_helper/s3_recipe_helper.h>
11+
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
12+
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
1413
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
14+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
1515

1616
#include <fmt/format.h>
1717

@@ -22,6 +22,8 @@ using namespace NYdb::NQuery;
2222
using namespace NKikimr::NKqp::NFederatedQueryTest;
2323
using namespace fmt::literals;
2424
using namespace NTestUtils;
25+
using namespace NYql::NConnector::NTest;
26+
using namespace NYql::NConnector::NApi;
2527

2628
namespace {
2729

@@ -65,6 +67,16 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
6567
return mockPqGateway;
6668
}
6769

70+
std::shared_ptr<TConnectorClientMock> SetupMockConnectorClient() {
71+
UNIT_ASSERT_C(!ConnectorClient, "ConnectorClient is already initialized");
72+
EnsureNotInitialized("ConnectorClient");
73+
74+
auto mockConnectorClient = std::make_shared<TConnectorClientMock>();
75+
ConnectorClient = mockConnectorClient;
76+
77+
return mockConnectorClient;
78+
}
79+
6880
// Local kikimr test cluster
6981

7082
std::shared_ptr<TKikimrRunner> GetKikimrRunner() {
@@ -79,7 +91,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
7991
queryServiceConfig.SetEnableMatchRecognize(true);
8092
queryServiceConfig.SetProgressStatsPeriodMs(1000);
8193

82-
Kikimr = MakeKikimrRunner(true, nullptr, nullptr, AppConfig, NYql::NDq::CreateS3ActorsFactory(), {
94+
Kikimr = MakeKikimrRunner(true, ConnectorClient, nullptr, AppConfig, NYql::NDq::CreateS3ActorsFactory(), {
8395
.PqGateway = PqGateway,
8496
.CheckpointPeriod = CheckpointPeriod,
8597
});
@@ -608,6 +620,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
608620
private:
609621
std::optional<NKikimrConfig::TAppConfig> AppConfig;
610622
TIntrusivePtr<NYql::IPqGateway> PqGateway;
623+
NYql::NConnector::IClient::TPtr ConnectorClient;
611624
std::shared_ptr<TKikimrRunner> Kikimr;
612625

613626
std::shared_ptr<TDriver> InternalDriver;
@@ -1565,6 +1578,172 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
15651578
WaitMockPqReadSession(pqGateway, inputTopicName)->AddDataReceivedEvent(sampleMessages);
15661579
ReadMockPqMessages(WaitMockPqWriteSession(pqGateway, outputTopicName), sampleResult);
15671580
}
1581+
1582+
Y_UNIT_TEST_F(StreamingQueryWithYdbJoin, TStreamingTestFixture) {
1583+
// Test that defaults are overridden for streaming queries
1584+
auto& setting = *SetupAppConfig().MutableKQPConfig()->AddSettings();
1585+
setting.SetName("HashJoinMode");
1586+
setting.SetValue("grace");
1587+
1588+
const auto connectorClient = SetupMockConnectorClient();
1589+
const auto pqGateway = SetupMockPqGateway();
1590+
1591+
constexpr char inputTopicName[] = "inputTopicName";
1592+
constexpr char outputTopicName[] = "outputTopicName";
1593+
CreateTopic(inputTopicName);
1594+
CreateTopic(outputTopicName);
1595+
1596+
constexpr char pqSourceName[] = "pqSourceName";
1597+
CreatePqSource(pqSourceName);
1598+
1599+
constexpr char ydbSourceName[] = "ydbSourceName";
1600+
ExecQuery(fmt::format(R"(
1601+
CREATE OBJECT secret_name (TYPE SECRET) WITH (value = "{token}");
1602+
CREATE EXTERNAL DATA SOURCE `{ydb_source}` WITH (
1603+
SOURCE_TYPE = "Ydb",
1604+
LOCATION = "{ydb_location}",
1605+
DATABASE_NAME = "{ydb_database_name}",
1606+
AUTH_METHOD = "TOKEN",
1607+
TOKEN_SECRET_NAME = "secret_name",
1608+
USE_TLS = "FALSE"
1609+
);)",
1610+
"ydb_source"_a = ydbSourceName,
1611+
"ydb_location"_a = YDB_ENDPOINT,
1612+
"ydb_database_name"_a = YDB_DATABASE,
1613+
"token"_a = BUILTIN_ACL_ROOT
1614+
));
1615+
1616+
constexpr char ydbTable[] = "lookup";
1617+
ExecExternalQuery(fmt::format(R"(
1618+
CREATE TABLE `{table}` (
1619+
fqdn String,
1620+
payload String,
1621+
PRIMARY KEY (fqdn)
1622+
))",
1623+
"table"_a = ydbTable
1624+
));
1625+
1626+
{ // Prepare connector mock
1627+
NYql::TGenericDataSourceInstance dataSourceInstance;
1628+
dataSourceInstance.set_kind(NYql::YDB);
1629+
dataSourceInstance.set_database(YDB_DATABASE);
1630+
dataSourceInstance.set_use_tls(false);
1631+
dataSourceInstance.set_protocol(NYql::NATIVE);
1632+
1633+
auto& endpoint = *dataSourceInstance.mutable_endpoint();
1634+
TIpPort port;
1635+
NHttp::CrackAddress(YDB_ENDPOINT, *endpoint.mutable_host(), port);
1636+
endpoint.set_port(port);
1637+
1638+
auto& iamToken = *dataSourceInstance.mutable_credentials()->mutable_token();
1639+
iamToken.set_type("IAM");
1640+
iamToken.set_value(BUILTIN_ACL_ROOT);
1641+
1642+
TTypeMappingSettings typeMappingSettings;
1643+
typeMappingSettings.set_date_time_format(STRING_FORMAT);
1644+
1645+
auto describeTableBuilder = connectorClient->ExpectDescribeTable();
1646+
describeTableBuilder
1647+
.Table(ydbTable)
1648+
.DataSourceInstance(dataSourceInstance)
1649+
.TypeMappingSettings(typeMappingSettings);
1650+
1651+
auto listSplitsBuilder = connectorClient->ExpectListSplits();
1652+
listSplitsBuilder.Select()
1653+
.DataSourceInstance(dataSourceInstance)
1654+
.Table(ydbTable);
1655+
1656+
const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"};
1657+
const std::vector<std::string> payloadColumn = {"P1", "P2", "P3"};
1658+
auto readSplitsBuilder = connectorClient->ExpectReadSplits();
1659+
readSplitsBuilder
1660+
.Filtering(TReadSplitsRequest::FILTERING_OPTIONAL)
1661+
.Split()
1662+
.Description("some binary description")
1663+
.Select()
1664+
.Table(ydbTable)
1665+
.DataSourceInstance(dataSourceInstance)
1666+
.What()
1667+
.Column("fqdn", Ydb::Type::STRING)
1668+
.Column("payload", Ydb::Type::STRING);
1669+
1670+
const auto builtResults = [&]() {
1671+
describeTableBuilder.Response()
1672+
.Column("fqdn", Ydb::Type::STRING)
1673+
.Column("payload", Ydb::Type::STRING);
1674+
1675+
listSplitsBuilder.Result()
1676+
.AddResponse(NYql::NConnector::NewSuccess())
1677+
.Description("some binary description")
1678+
.Select()
1679+
.DataSourceInstance(dataSourceInstance)
1680+
.What()
1681+
.Column("fqdn", Ydb::Type::STRING)
1682+
.Column("payload", Ydb::Type::STRING);
1683+
1684+
readSplitsBuilder.Result()
1685+
.AddResponse(
1686+
MakeRecordBatch(
1687+
MakeArray<arrow::BinaryBuilder>("fqdn", fqdnColumn, arrow::binary()),
1688+
MakeArray<arrow::BinaryBuilder>("payload", payloadColumn, arrow::binary())
1689+
),
1690+
NYql::NConnector::NewSuccess()
1691+
);
1692+
};
1693+
1694+
builtResults();
1695+
builtResults(); // Streaming queries compiled twice, also in test results requested twice due to retry
1696+
}
1697+
1698+
constexpr char queryName[] = "streamingQuery";
1699+
ExecQuery(fmt::format(R"(
1700+
CREATE STREAMING QUERY `{query_name}` AS
1701+
DO BEGIN
1702+
$ydb_lookup = SELECT * FROM `{ydb_source}`.`{ydb_table}`;
1703+
1704+
$pq_source = SELECT * FROM `{pq_source}`.`{input_topic}` WITH (
1705+
FORMAT = "json_each_row",
1706+
SCHEMA (
1707+
time Int32 NOT NULL,
1708+
event String,
1709+
host String
1710+
)
1711+
);
1712+
1713+
$joined = SELECT l.payload AS payload, p.* FROM $pq_source AS p
1714+
LEFT JOIN $ydb_lookup AS l
1715+
ON (l.fqdn = p.host);
1716+
1717+
INSERT INTO `{pq_source}`.`{output_topic}`
1718+
SELECT Unwrap(event || "-" || payload) FROM $joined
1719+
END DO;)",
1720+
"query_name"_a = queryName,
1721+
"pq_source"_a = pqSourceName,
1722+
"ydb_source"_a = ydbSourceName,
1723+
"ydb_table"_a = ydbTable,
1724+
"input_topic"_a = inputTopicName,
1725+
"output_topic"_a = outputTopicName
1726+
));
1727+
1728+
CheckScriptExecutionsCount(1, 1);
1729+
1730+
auto readSession = WaitMockPqReadSession(pqGateway, inputTopicName);
1731+
const std::vector<IMockPqReadSession::TMessage> sampleMessages = {
1732+
{0, R"({"time": 0, "event": "A", "host": "host1.example.com"})"},
1733+
{1, R"({"time": 1, "event": "B", "host": "host3.example.com"})"},
1734+
{2, R"({"time": 2, "event": "A", "host": "host1.example.com"})"},
1735+
};
1736+
readSession->AddDataReceivedEvent(sampleMessages);
1737+
1738+
auto writeSession = WaitMockPqWriteSession(pqGateway, outputTopicName);
1739+
const std::vector<TString> sampleResult = {"A-P1", "B-P3", "A-P1"};
1740+
ReadMockPqMessages(writeSession, sampleResult);
1741+
1742+
readSession->AddCloseSessionEvent(EStatus::UNAVAILABLE, {NIssue::TIssue("Test pq session failure")});
1743+
1744+
WaitMockPqReadSession(pqGateway, inputTopicName)->AddDataReceivedEvent(sampleMessages);
1745+
ReadMockPqMessages(WaitMockPqWriteSession(pqGateway, outputTopicName), sampleResult);
1746+
}
15681747
}
15691748

15701749
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/federated_query/datastreams/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ PEERDIR(
1616
ydb/core/kqp/ut/federated_query/common
1717
ydb/library/testlib/pq_helpers
1818
ydb/library/testlib/s3_recipe_helper
19+
ydb/library/yql/providers/generic/connector/libcpp
20+
ydb/library/yql/providers/generic/connector/libcpp/ut_helpers
1921
yql/essentials/sql/pg
2022
yql/essentials/parser/pg_wrapper
2123
)

ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -373,12 +373,11 @@ namespace NYql::NConnector::NTest {
373373
DATA_SOURCE_INSTANCE_SUBBUILDER();
374374

375375
TDescribeTableResultBuilder<TBuilder> Response() {
376-
return TDescribeTableResultBuilder<TBuilder>(ResponseResult_, this);
376+
return TDescribeTableResultBuilder<TBuilder>(ResponseResults_.emplace_back(std::make_shared<NApi::TDescribeTableResponse>()), this);
377377
}
378378

379379
void FillWithDefaults() {
380380
Table(DEFAULT_TABLE);
381-
Response();
382381
}
383382

384383
TBuilder& TypeMappingSettings(const NApi::TTypeMappingSettings& proto) {
@@ -388,16 +387,19 @@ namespace NYql::NConnector::NTest {
388387

389388
private:
390389
void SetExpectation() {
391-
EXPECT_CALL(*Mock_, DescribeTableImpl(ProtobufRequestMatcher(*Result_)))
392-
.WillOnce(Return(
393-
TResult<NApi::TDescribeTableResponse>(
394-
{NYdbGrpc::TGrpcStatus(),
395-
*ResponseResult_})));
390+
if (ResponseResults_.empty()) {
391+
Response();
392+
}
393+
394+
auto& expectBuilder = EXPECT_CALL(*Mock_, DescribeTableImpl(ProtobufRequestMatcher(*Result_)));
395+
for (auto result : ResponseResults_) {
396+
expectBuilder.WillOnce(Return(TResult<NApi::TDescribeTableResponse>({NYdbGrpc::TGrpcStatus(), *result})));
397+
}
396398
}
397399

398400
private:
399401
TConnectorClientMock* Mock_ = nullptr;
400-
std::shared_ptr<NApi::TDescribeTableResponse> ResponseResult_ = std::make_shared<NApi::TDescribeTableResponse>();
402+
std::vector<std::shared_ptr<NApi::TDescribeTableResponse>> ResponseResults_;
401403
};
402404

403405
template <class TParent = void /* no parent by default */>
@@ -663,9 +665,7 @@ namespace NYql::NConnector::NTest {
663665
explicit TListSplitsExpectationBuilder(NApi::TListSplitsRequest* result = nullptr, TConnectorClientMock* mock = nullptr)
664666
: TProtoBuilder<int, NApi::TListSplitsRequest>(result)
665667
, Mock_(mock)
666-
667668
{
668-
FillWithDefaults();
669669
}
670670

671671
explicit TListSplitsExpectationBuilder(TConnectorClientMock* mock)
@@ -681,27 +681,29 @@ namespace NYql::NConnector::NTest {
681681
SETTER(MaxSplitCount, max_split_count);
682682

683683
TListSplitsResultBuilder<TBuilder> Result() {
684-
return TListSplitsResultBuilder<TBuilder>(ResponseResult_, this);
684+
return TListSplitsResultBuilder<TBuilder>(ResponseResults_.emplace_back(std::make_shared<TListSplitsStreamIteratorMock>()), this);
685685
}
686686

687687
auto& Status(const NYdbGrpc::TGrpcStatus& status) {
688688
ResponseStatus_ = status;
689689
return *this;
690690
}
691691

692-
void FillWithDefaults() {
693-
Result();
694-
}
695-
696692
private:
697693
void SetExpectation() {
698-
EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_)))
699-
.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
694+
if (ResponseResults_.empty()) {
695+
Result();
696+
}
697+
698+
auto& expectBuilder = EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_)));
699+
for (auto response : ResponseResults_) {
700+
expectBuilder.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{ResponseStatus_, response}));
701+
}
700702
}
701703

702704
private:
703705
TConnectorClientMock* Mock_ = nullptr;
704-
TListSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TListSplitsStreamIteratorMock>();
706+
std::vector<TListSplitsStreamIteratorMock::TPtr> ResponseResults_;
705707
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
706708
};
707709

@@ -757,7 +759,7 @@ namespace NYql::NConnector::NTest {
757759
SETTER(Filtering, filtering);
758760

759761
TReadSplitsResultBuilder<TBuilder> Result() {
760-
return TReadSplitsResultBuilder<TBuilder>(ResponseResult_, this);
762+
return TReadSplitsResultBuilder<TBuilder>(ResponseResults_.emplace_back(std::make_shared<TReadSplitsStreamIteratorMock>()), this);
761763
}
762764

763765
auto& Status(const NYdbGrpc::TGrpcStatus& status) {
@@ -771,13 +773,19 @@ namespace NYql::NConnector::NTest {
771773

772774
private:
773775
void SetExpectation() {
774-
EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_)))
775-
.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
776+
if (ResponseResults_.empty()) {
777+
Result();
778+
}
779+
780+
auto& expectBuilder = EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_)));
781+
for (auto response : ResponseResults_) {
782+
expectBuilder.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{ResponseStatus_, response}));
783+
}
776784
}
777785

778786
private:
779787
TConnectorClientMock* Mock_ = nullptr;
780-
TReadSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TReadSplitsStreamIteratorMock>();
788+
std::vector<TReadSplitsStreamIteratorMock::TPtr> ResponseResults_;
781789
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
782790
};
783791

0 commit comments

Comments
 (0)