From e26f2baec1b1194960dde280514c964ad8580faa Mon Sep 17 00:00:00 2001 From: Slusarenko Igor Date: Mon, 7 Apr 2025 21:03:19 +0300 Subject: [PATCH 1/8] Add ddl to create an iceberg data source (#16652) --- .../external_source_builder.cpp | 205 +++++++++++++++ .../external_source_builder.h | 118 +++++++++ .../external_source_builder_ut.cpp | 207 +++++++++++++++ .../external_source_factory.cpp | 51 +++- ydb/core/external_sources/iceberg_ddl_ut.cpp | 84 ++++++ ydb/core/external_sources/iceberg_fields.h | 35 +++ ydb/core/external_sources/ut/ya.make | 4 +- ydb/core/external_sources/ya.make | 1 + .../external_data_source/manager.cpp | 14 +- .../kqp/ut/federated_query/common/common.cpp | 5 +- .../kqp/ut/federated_query/common/common.h | 5 +- .../generic_ut/iceberg_ut_data.cpp | 247 ++++++++++++++++++ .../generic_ut/iceberg_ut_data.h | 66 +++++ .../generic_ut/kqp_generic_provider_ut.cpp | 171 +++++++++++- .../kqp/ut/federated_query/generic_ut/ya.make | 2 + .../db_id_async_resolver/database_type.cpp | 21 +- .../db_id_async_resolver/database_type.h | 3 +- .../actors/yql_generic_provider_factories.cpp | 3 +- .../provider/yql_generic_cluster_config.cpp | 42 +++ .../provider/yql_generic_dq_integration.cpp | 9 +- .../provider/yql_generic_load_meta.cpp | 57 ++++ .../kqprun/configuration/app_config.conf | 1 + 22 files changed, 1317 insertions(+), 34 deletions(-) create mode 100644 ydb/core/external_sources/external_source_builder.cpp create mode 100644 ydb/core/external_sources/external_source_builder.h create mode 100644 ydb/core/external_sources/external_source_builder_ut.cpp create mode 100644 ydb/core/external_sources/iceberg_ddl_ut.cpp create mode 100644 ydb/core/external_sources/iceberg_fields.h create mode 100644 ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp create mode 100644 ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h diff --git a/ydb/core/external_sources/external_source_builder.cpp b/ydb/core/external_sources/external_source_builder.cpp new file mode 100644 index 000000000000..e773f052cc58 --- /dev/null +++ b/ydb/core/external_sources/external_source_builder.cpp @@ -0,0 +1,205 @@ +#include "external_source_builder.h" +#include "validation_functions.h" + +#include +#include + +namespace NKikimr::NExternalSource { +namespace { + +class TValidatedExternalDataSource final : public IExternalSource { +public: + TValidatedExternalDataSource( + const TString& name, + const std::vector& authMethods, + const std::unordered_map& availableProperties, + const std::vector& hostnamePatterns) + : Name_(name) + , AuthMethodsForCheck_(authMethods) + , AvailableProperties_(availableProperties) + , HostnamePatterns_(hostnamePatterns) + { + + } + + virtual TString Pack(const NKikimrExternalSources::TSchema&, + const NKikimrExternalSources::TGeneral&) const override { + ythrow TExternalSourceException() << "Internal error. Only external table supports pack operation"; + } + + virtual TString GetName() const override { + return Name_; + } + + virtual bool HasExternalTable() const override { + return false; + } + + virtual TVector GetAuthMethods() const override { + TVector result; + + for (auto a : AuthMethodsForCheck_) { + result.push_back(a.Auth); + } + + return result; + } + + TVector GetAuthMethods(const TString& externalDataSourceDescription) const { + NKikimrSchemeOp::TExternalDataSourceDescription proto; + + if (!proto.ParseFromString(externalDataSourceDescription)) { + ythrow TExternalSourceException() + << "Internal error. " + << "Couldn't parse protobuf with external data source description"; + } + + TVector result; + + for (auto a : AuthMethodsForCheck_) { + if (a.UseCondition(proto.GetProperties().GetProperties())) { + result.push_back(a.Auth); + } + } + + return result; + } + + virtual TMap> GetParameters(const TString&) const override { + ythrow TExternalSourceException() << "Internal error. Only external table supports parameters"; + } + + virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override { + NKikimrSchemeOp::TExternalDataSourceDescription proto; + + if (!proto.ParseFromString(externalDataSourceDescription)) { + ythrow TExternalSourceException() + << "Internal error. " + << "Couldn't parse protobuf with external data source description"; + } + + auto properties = proto.GetProperties().GetProperties(); + std::unordered_set validatedProperties; + + for (const auto& [key, value] : properties) { + auto p = AvailableProperties_.find(key); + + if (AvailableProperties_.end() == p) { + throw TExternalSourceException() << "Unsupported property: " << key; + } + + // validate property value + if (p->second.ApplyCondition(properties)) { + p->second.Validator(key, value); + } + + validatedProperties.emplace(key); + } + + // validate properties that has been left + for (const auto& [property, validator] : AvailableProperties_) { + if (validatedProperties.contains(property)) { + continue; + } + + if (validator.ApplyCondition(properties)) { + validator.Validator(property, ""); + } + } + + ValidateHostname(HostnamePatterns_, proto.GetLocation()); + } + + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { + return NThreading::MakeFuture(std::move(meta)); + } + + virtual bool CanLoadDynamicMetadata() const override { + return false; + } + +private: + const TString Name_; + const std::vector AuthMethodsForCheck_; + const std::unordered_map AvailableProperties_; + const std::vector HostnamePatterns_; +}; + +} // unnamed + +TExternalSourceBuilder::TExternalSourceBuilder(const TString& name) + : Name_(name) +{ +} + +TExternalSourceBuilder& TExternalSourceBuilder::Auth(const TVector& authMethods, TCondition condition) { + for (auto a : authMethods) { + AuthMethodsForCheck_.push_back(TExternalSourceBuilder::TAuthHolder{a, condition}); + } + + return *this; +} + +TExternalSourceBuilder& TExternalSourceBuilder::Property(TString name, TValidator validator, TCondition condition) { + AvailableProperties_.emplace(name, TExternalSourceBuilder::TConditionalValidator{validator, condition}); + return *this; +} + +TExternalSourceBuilder& TExternalSourceBuilder::Properties(const TSet& availableProperties, TValidator validator, TCondition condition) { + for (auto p : availableProperties) { + Property(p, validator, condition); + } + + return *this; +} + +TExternalSourceBuilder& TExternalSourceBuilder::HostnamePatterns(const std::vector& patterns) { + HostnamePatterns_.insert( + HostnamePatterns_.end(), patterns.begin(), patterns.end()); + return *this; +} + +IExternalSource::TPtr TExternalSourceBuilder::Build() { + return MakeIntrusive( + std::move(Name_), std::move(AuthMethodsForCheck_), std::move(AvailableProperties_), std::move(HostnamePatterns_)); +} + +TCondition GetHasSettingCondition(const TString& property, const TString& value) { + return [property, value](const ::google::protobuf::Map& properties) -> bool { + auto it = properties.find(property); + return properties.end() != it && value == it->second; + }; +} + +TValidator GetRequiredValidator() { + return [](const TString& property, const TString& value){ + if (!value.empty()) { + return; + } + + throw TExternalSourceException() << "required property: " << property << " is not set"; + }; +} + +TValidator GetIsInListValidator(const std::unordered_set& values, bool required) { + auto joinedValues = JoinSeq(", ", values); + + return [values, required, joinedValues](const TString& property, const TString& value){ + if (value.empty() && required) { + throw TExternalSourceException() << " required property: " << property << " is not set"; + } + + if (value.empty()) { + return; + } + + if (!values.contains(value)) { + throw TExternalSourceException() + << " property: " << property + << " has wrong value: " << value + << " allowed values: " << joinedValues; + } + }; +} + +} // NKikimr::NExternalSource diff --git a/ydb/core/external_sources/external_source_builder.h b/ydb/core/external_sources/external_source_builder.h new file mode 100644 index 000000000000..2da604086e69 --- /dev/null +++ b/ydb/core/external_sources/external_source_builder.h @@ -0,0 +1,118 @@ +#pragma once + +#include "external_source.h" + +#include +#include + +namespace NKikimr::NExternalSource { + +typedef std::function TValidator; +typedef std::function&)> TCondition; + +/// +/// Builder to create an external data source with validations +/// +class TExternalSourceBuilder { +public: + struct TAuthHolder { + TString Auth; + + // When auth has to be used + TCondition UseCondition; + }; + + struct TConditionalValidator { + TValidator Validator; + + // When validator has to be applied + TCondition ApplyCondition; + }; + +public: + explicit TExternalSourceBuilder(const TString& name); + + ~TExternalSourceBuilder() = default; + + /// + /// Add auth methods which are returned from the "source" only if a condition is true. + /// A condition is applied to source's ddl in @sa IExternalSource::GetAuthMethods + /// call. + /// + TExternalSourceBuilder& Auth(const TVector& authMethods, TCondition condition); + + TExternalSourceBuilder& Auth(const TVector& authMethods) { + return Auth(authMethods, [](const ::google::protobuf::Map&){ + return true; + }); + } + + /// + /// Add property which can be in a "source". + /// + /// @param name name of a property + /// @param validator validator which is applied to a property from a source's ddl + /// in @sa IExternalSource::ValidateExternalDataSource call + /// @param condition condition that defines to use validator or not, if condition returns true + /// for source's ddl then validator is applied; otherwise, validator is skiped; + /// condition is executed in @sa IExternalSource::ValidateExternalDataSource call + /// before validator + /// + TExternalSourceBuilder& Property(const TString name, TValidator validator, TCondition condition); + + TExternalSourceBuilder& Properties(const TSet& properties, TValidator validator, TCondition condition); + + TExternalSourceBuilder& HostnamePatterns(const std::vector& patterns); + + /// + /// Create external data source + /// + IExternalSource::TPtr Build(); + + TExternalSourceBuilder& Property(const TString name, TValidator validator) { + return Property(name, validator, [](const ::google::protobuf::Map&){ + return true; + }); + } + + TExternalSourceBuilder& Property(const TString name) { + return Property(name, [](const TString&, const TString&){}); + } + + TExternalSourceBuilder& Properties(const TSet& properties, TValidator validator) { + return Properties(properties, validator, [](const ::google::protobuf::Map&){ + return true; + }); + } + + TExternalSourceBuilder& Properties(const TSet& properties) { + return Properties(properties, [](const TString&, const TString&){}); + } + + private: + TString Name_; + std::vector AuthMethodsForCheck_; + std::unordered_map AvailableProperties_; + std::vector HostnamePatterns_; +}; + +/// +/// Create a condition that returns "true" if a source's ddl has +/// property "p" with value equals to "v" +/// +TCondition GetHasSettingCondition(const TString& p, const TString& v); + +/// +/// Create a validator which check that source's ddl has a property with non empty value +/// +TValidator GetRequiredValidator(); + +/// +/// Create a validator which check that source's ddl has a property with a value from list +/// +/// @param values list of allowed values +/// @param required allow property without value +/// +TValidator GetIsInListValidator(const std::unordered_set& values, bool required); + +} // NKikimr::NExternalSource diff --git a/ydb/core/external_sources/external_source_builder_ut.cpp b/ydb/core/external_sources/external_source_builder_ut.cpp new file mode 100644 index 000000000000..0242e6f8273e --- /dev/null +++ b/ydb/core/external_sources/external_source_builder_ut.cpp @@ -0,0 +1,207 @@ +#include "external_source_builder.h" + +#include +#include +#include +#include + +namespace NKikimr { + +namespace { + +class TTestFixture : public NUnitTest::TBaseFixture { +public: + TTestFixture() + : Builder("Test") + , Props(*Proto.MutableProperties()->MutableProperties()) + { + } + +public: + void SetUp(NUnitTest::TTestContext& context) override { + NUnitTest::TBaseFixture::SetUp(context); + } + +protected: + NExternalSource::TExternalSourceBuilder Builder; + NKikimrSchemeOp::TExternalDataSourceDescription Proto; + ::google::protobuf::Map& Props; +}; + +} + +Y_UNIT_TEST_SUITE(ExternalSourceBuilderTest) { + + Y_UNIT_TEST_F(ValidateName, TTestFixture) { + auto source = Builder.Build(); + UNIT_ASSERT_VALUES_EQUAL(source->GetName(), "Test"); + } + + // Test returned auth methods when conditions are not set + Y_UNIT_TEST_F(ValidateAuthWithoutCondition, TTestFixture) { + auto source = Builder + .Auth({"auth1", "auth2"}) + .Build(); + + const auto authMethods = source->GetAuthMethods(); + + UNIT_ASSERT_VALUES_EQUAL(authMethods.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(authMethods[0], "auth1"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[1], "auth2"); + } + + // Test returned auth methods when conditions are set + Y_UNIT_TEST_F(ValidateAuthWithCondition, TTestFixture) { + auto source = Builder + .Auth( + {"auth1", "auth2"}, + // check that ddl has "property1" equals to "value" + NExternalSource::GetHasSettingCondition("property1", "value") + ) + .Auth( + {"auth3", "auth4"}, + // check that ddl has "property2" equals to "value" + NExternalSource::GetHasSettingCondition("property2", "value") + ) + .Build(); + + // ddl without any value + auto authMethods = source->GetAuthMethods(); + + UNIT_ASSERT_VALUES_EQUAL(authMethods.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(authMethods[0], "auth1"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[1], "auth2"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[2], "auth3"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[3], "auth4"); + } + + // Test validation when ddl has property which is not supported by source + // i.e. source does not contain this property in a list of available properties + Y_UNIT_TEST_F(ValidateUnsupportedField, TTestFixture) { + // source has property "field" + auto source = Builder + .Property("field") + .Build(); + + // ddl with "field1" + Props["field1"] = "value"; + + UNIT_ASSERT_EXCEPTION_CONTAINS( + source->ValidateExternalDataSource(Proto.SerializeAsString()), + NExternalSource::TExternalSourceException, + "Unsupported property: field1" + ); + + // ddl with "field" + Props.clear(); + Props["field"] = "value"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + } + + // Test validation for non required property + Y_UNIT_TEST_F(ValidateNonRequiredField, TTestFixture) { + auto source = Builder + .Property("field") + .Build(); + + // ddl without "field" + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + + // ddl with "field" + Props["field"] = "value"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + } + + // Test validation for required property + Y_UNIT_TEST_F(ValidateRequiredField, TTestFixture) { + auto source = Builder + .Property("field", NExternalSource::GetRequiredValidator()) + .Build(); + + // ddl without "field" + UNIT_ASSERT_EXCEPTION_CONTAINS( + source->ValidateExternalDataSource(Proto.SerializeAsString()), + NExternalSource::TExternalSourceException, + "required property: field is not set" + ); + + // ddl with "field" + Props["field"] = "value"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + } + + // Test validation for non required property with allowed list of values + Y_UNIT_TEST_F(ValidateNonRequiredFieldValues, TTestFixture) { + auto source = Builder + .Property("field", NExternalSource::GetIsInListValidator({"v1", "v2", "v3"}, false)) + .Build(); + + // ddl without "field" + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource("")); + } + + // Test validation for required property with allowed list of values + Y_UNIT_TEST_F(ValidateRequiredFieldValues, TTestFixture) { + auto source = Builder + .Property("field", NExternalSource::GetIsInListValidator({"v1", "v2", "v3"}, true)) + .Build(); + + // ddl without "field" + UNIT_ASSERT_EXCEPTION_CONTAINS( + source->ValidateExternalDataSource(Proto.SerializeAsString()), + NExternalSource::TExternalSourceException, + "required property: field is not set" + ); + + // ddl with "field" equals to value not in allowed list + Props["field"] = "value"; + UNIT_ASSERT_EXCEPTION_CONTAINS( + source->ValidateExternalDataSource(Proto.SerializeAsString()), + NExternalSource::TExternalSourceException, + "property: field has wrong value: value allowed values: v3, v2, v1" + ); + + // ddl with "field" equals to "v1" + Props["field"] = "v1"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + + // ddl with "field" equals to "v2" + Props["field"] = "v2"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + + // ddl with "field" equals to "v3" + Props["field"] = "v3"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + } + + // Test validation for required property with condition + Y_UNIT_TEST_F(ValidateRequiredFieldOnCondition, TTestFixture) { + auto source = Builder + .Property("field1") + .Property( + "field", + NExternalSource::GetRequiredValidator(), + // apply validator if ddl has "field1" equals to "v" + NExternalSource::GetHasSettingCondition("field1", "v") + ) + .Build(); + + // ddl without "field1" + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + + // ddl with "field1" but without "field" + Props["field1"] = "v"; + + UNIT_ASSERT_EXCEPTION_CONTAINS( + source->ValidateExternalDataSource(Proto.SerializeAsString()), + NExternalSource::TExternalSourceException, + "required property: field is not set" + ); + + // ddl with "field1" and "field" + Props["field"] = "q"; + UNIT_ASSERT_NO_EXCEPTION(source->ValidateExternalDataSource(Proto.SerializeAsString())); + } +} + +} // NKikimr diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 854f8b84fc45..47bcdfb2d8a6 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -1,6 +1,8 @@ #include "external_source_factory.h" #include "object_storage.h" #include "external_data_source.h" +#include "iceberg_fields.h" +#include "external_source_builder.h" #include #include @@ -8,7 +10,6 @@ #include #include - namespace NKikimr::NExternalSource { namespace { @@ -39,6 +40,50 @@ struct TExternalSourceFactory : public IExternalSourceFactory { } + +IExternalSource::TPtr BuildIcebergSource(const std::vector& hostnamePatternsRegEx) { + using namespace NKikimr::NExternalSource::NIceberg; + + return TExternalSourceBuilder(TString{NYql::GenericProviderName}) + // Basic, Token and SA Auth are available only if warehouse type is set to s3 + .Auth( + {"BASIC", "TOKEN", "SERVICE_ACCOUNT"}, + GetHasSettingCondition(WAREHOUSE_TYPE, VALUE_S3) + ) + // DataBase is a required field + .Property(WAREHOUSE_DB, GetRequiredValidator()) + // Tls is an optional field + .Property(WAREHOUSE_TLS) + // Warehouse type is a required field and can be equal only to "s3" + .Property( + WAREHOUSE_TYPE, + GetIsInListValidator({VALUE_S3}, true) + ) + // If a warehouse type is equal to "s3", fields "s3_endpoint", "s3_region" and "s3_uri" are required + .Properties( + { + WAREHOUSE_S3_ENDPOINT, + WAREHOUSE_S3_REGION, + WAREHOUSE_S3_URI + }, + GetRequiredValidator(), + GetHasSettingCondition(WAREHOUSE_TYPE, VALUE_S3) + ) + // Catalog type is a required field and can be equal only to "hive" or "hadoop" + .Property( + CATALOG_TYPE, + GetIsInListValidator({VALUE_HIVE, VALUE_HADOOP}, true) + ) + // If catalog type is equal to "hive" the field "hive_uri" is required + .Property( + CATALOG_HIVE_URI, + GetRequiredValidator(), + GetHasSettingCondition(CATALOG_TYPE,VALUE_HIVE) + ) + .HostnamePatterns(hostnamePatternsRegEx) + .Build(); +} + IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit, @@ -87,6 +132,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector +#include +#include +#include +#include + +namespace NKikimr { + +namespace { + +class TTestFixture : public NUnitTest::TBaseFixture { +public: + TTestFixture() + : Props(*Proto.MutableProperties()->MutableProperties()) + { + using namespace NKikimr::NExternalSource::NIceberg; + + auto type = ToString(NYql::EDatabaseType::Iceberg); + auto factory = NExternalSource::CreateExternalSourceFactory( + {}, nullptr, 50000, nullptr, false, false, {type}); + + Source = factory->GetOrCreate(type); + + Props[WAREHOUSE_TYPE] = VALUE_S3; + Props[WAREHOUSE_DB] = "db"; + Props[WAREHOUSE_S3_REGION] = "region"; + Props[WAREHOUSE_S3_ENDPOINT] = "endpoint"; + Props[WAREHOUSE_S3_URI] = "uri"; + } + +public: + void SetUp(NUnitTest::TTestContext& context) override { + NUnitTest::TBaseFixture::SetUp(context); + } + +protected: + NExternalSource::IExternalSource::TPtr Source; + NKikimrSchemeOp::TExternalDataSourceDescription Proto; + ::google::protobuf::Map& Props; +}; + +} // unnamed + +Y_UNIT_TEST_SUITE(IcebergDdlTest) { + + // Test ddl for an iceberg table in s3 storage with the hive catalog + Y_UNIT_TEST_F(HiveCatalogWithS3Test, TTestFixture) { + using namespace NKikimr::NExternalSource::NIceberg; + + Props[CATALOG_TYPE] = VALUE_HIVE; + Props[CATALOG_HIVE_URI] = "hive_uri"; + + UNIT_ASSERT_NO_EXCEPTION(Source->ValidateExternalDataSource(Proto.SerializeAsString())); + + auto authMethods = Source->GetAuthMethods(); + + UNIT_ASSERT_VALUES_EQUAL(authMethods.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(authMethods[0], "BASIC"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[1], "TOKEN"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[2], "SERVICE_ACCOUNT"); + } + + // Test ddl for an iceberg table in s3 storage with the hadoop catalog + Y_UNIT_TEST_F(HadoopCatalogWithS3Test, TTestFixture) { + using namespace NKikimr::NExternalSource::NIceberg; + + Props[CATALOG_TYPE] = VALUE_HADOOP; + + UNIT_ASSERT_NO_EXCEPTION(Source->ValidateExternalDataSource(Proto.SerializeAsString())); + + auto authMethods = Source->GetAuthMethods(); + + UNIT_ASSERT_VALUES_EQUAL(authMethods.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(authMethods[0], "BASIC"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[1], "TOKEN"); + UNIT_ASSERT_VALUES_EQUAL(authMethods[2], "SERVICE_ACCOUNT"); + } + +} + +} // NKikimr diff --git a/ydb/core/external_sources/iceberg_fields.h b/ydb/core/external_sources/iceberg_fields.h new file mode 100644 index 000000000000..5a00f8e607f5 --- /dev/null +++ b/ydb/core/external_sources/iceberg_fields.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace NKikimr::NExternalSource::NIceberg { + +// Fields that belongs to a warehouse +constexpr char WAREHOUSE_TYPE[] = "warehouse_type"; +constexpr char WAREHOUSE_S3_ENDPOINT[] = "warehouse_s3_endpoint"; +constexpr char WAREHOUSE_S3_URI[] = "warehouse_s3_uri"; +constexpr char WAREHOUSE_S3_REGION[] = "warehouse_s3_region"; +constexpr char WAREHOUSE_TLS[] = "use_tls"; +constexpr char WAREHOUSE_DB[] = "database_name"; + +// Fields that belongs to a catalog +constexpr char CATALOG_TYPE[] = "catalog_type"; +constexpr char CATALOG_HIVE_URI[] = "catalog_hive_uri"; + +// Some values +constexpr char VALUE_S3[] = "s3"; +constexpr char VALUE_HIVE[] = "hive"; +constexpr char VALUE_HADOOP[] = "hadoop"; + +// List of fields which is pass to a connector +constexpr std::array FieldsToConnector = { + WAREHOUSE_TYPE, + WAREHOUSE_S3_ENDPOINT, + WAREHOUSE_S3_REGION, + WAREHOUSE_S3_URI, + CATALOG_TYPE, + CATALOG_HIVE_URI +}; + +} // NKikimr::NExternalSource::NIceberg diff --git a/ydb/core/external_sources/ut/ya.make b/ydb/core/external_sources/ut/ya.make index 094cc49c12ca..047399cf823a 100644 --- a/ydb/core/external_sources/ut/ya.make +++ b/ydb/core/external_sources/ut/ya.make @@ -6,8 +6,10 @@ PEERDIR( ) SRCS( - object_storage_ut.cpp external_data_source_ut.cpp + external_source_builder_ut.cpp + iceberg_ddl_ut.cpp + object_storage_ut.cpp ) END() diff --git a/ydb/core/external_sources/ya.make b/ydb/core/external_sources/ya.make index 2ca381af561a..089d5dfe4114 100644 --- a/ydb/core/external_sources/ya.make +++ b/ydb/core/external_sources/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( external_data_source.cpp + external_source_builder.cpp external_source_factory.cpp object_storage.cpp validation_functions.cpp diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index b9d76ce063e5..415df834fc21 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -10,6 +10,7 @@ #include #include +#include namespace NKikimr::NKqp { @@ -80,8 +81,17 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri "folder_id" // logging }; - for (const auto& property: properties) { - if (auto value = settings.GetFeaturesExtractor().Extract(property)) { + auto& featuresExtractor = settings.GetFeaturesExtractor(); + + for (const auto& property : properties) { + if (const auto value = featuresExtractor.Extract(property)) { + externaDataSourceDesc.MutableProperties()->MutableProperties()->insert({property, *value}); + } + } + + // Iceberg properties for connector + for (const auto& property : NKikimr::NExternalSource::NIceberg::FieldsToConnector) { + if (const auto value = featuresExtractor.Extract(property)) { externaDataSourceDesc.MutableProperties()->MutableProperties()->insert({property, *value}); } } diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index 0fc2a2d58e06..e5125b4c125d 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -32,7 +32,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest { NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver, std::optional appConfig, std::shared_ptr s3ActorsFactory, - const TString& domainRoot) + const TString& domainRoot, + NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableExternalDataSources(true); @@ -58,7 +59,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest { auto federatedQuerySetupFactory = std::make_shared( httpGateway, connectorClient, - nullptr, + credentialsFactory, databaseAsyncResolver, appConfig->GetQueryServiceConfig().GetS3(), appConfig->GetQueryServiceConfig().GetGeneric(), diff --git a/ydb/core/kqp/ut/federated_query/common/common.h b/ydb/core/kqp/ut/federated_query/common/common.h index 02d8783f6608..55ec6feb8a7f 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.h +++ b/ydb/core/kqp/ut/federated_query/common/common.h @@ -20,5 +20,6 @@ namespace NKikimr::NKqp::NFederatedQueryTest { NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = nullptr, std::optional appConfig = std::nullopt, std::shared_ptr s3ActorsFactory = nullptr, - const TString& domainRoot = "Root"); -} + const TString& domainRoot = "Root", + NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr); +} // namespace NKikimr::NKqp::NFederatedQueryTest diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp new file mode 100644 index 000000000000..7c4f0e65b31a --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp @@ -0,0 +1,247 @@ +#include "iceberg_ut_data.h" + +#include +#include + +namespace NTestUtils { + +constexpr char VALUE_HIVE_URI[] = "hive_uri"; +constexpr char VALUE_S3_URI[] = "s3_uri"; +constexpr char VALUE_S3_ENDPOINT[] = "s3_endpoint"; +constexpr char VALUE_S3_REGION[] = "s3_region"; + +constexpr char VALUE_IAM[] = "IAM"; + +struct TTestData { + TTestData(TIcebergTestData* data) + : Credentials_(*Result_.mutable_credentials()) + , Options_(*Result_.mutable_iceberg_options()) + , Warehouse_(*Options_.mutable_warehouse()) + , Catalog_(*Options_.mutable_catalog()) + { + assert(data); + + switch (data->Auth_.Type) { + case TIcebergTestData::AuthBasic: { + auto& auth = *Credentials_.mutable_basic(); + auth.set_username(data->Auth_.Id); + auth.set_password(data->Auth_.Value); + break; + } + case TIcebergTestData::AuthSa: + case TIcebergTestData::AuthToken: { + auto& auth = *Credentials_.mutable_token(); + auth.set_type(data->Auth_.Id); + auth.set_value(data->Auth_.Value); + break; + } + } + + Result_.mutable_endpoint(); + Result_.set_kind(::NYql::EGenericDataSourceKind::ICEBERG); + Result_.set_database(data->Database_); + Result_.set_use_tls(data->UseTls_); + Result_.set_protocol(::NYql::EGenericProtocol::NATIVE); + + auto& s3 = *Warehouse_.mutable_s3(); + + s3.set_uri(VALUE_S3_URI); + s3.set_endpoint(VALUE_S3_ENDPOINT); + s3.set_region(VALUE_S3_REGION); + } + + NYql::TGenericDataSourceInstance Result_; + NYql::TGenericCredentials& Credentials_; + NYql::TIcebergDataSourceOptions& Options_; + NYql::TIcebergWarehouse& Warehouse_; + NYql::TIcebergCatalog& Catalog_; +}; + +TIcebergTestData::TIcebergTestData( + TAuth auth, + const TString& dataSourceName, + const TString& database, + bool useTls) + : Auth_(auth) + , DataSourceName_(dataSourceName) + , Database_(database) + , UseTls_(useTls) +{} + +NYql::TGenericDataSourceInstance TIcebergTestData::CreateDataSourceForHadoop() { + TTestData data(this); + data.Catalog_.mutable_hadoop(); + return data.Result_; +} + +NYql::TGenericDataSourceInstance TIcebergTestData::CreateDataSourceForHive() { + TTestData data(this); + auto& hive = *data.Catalog_.mutable_hive(); + hive.set_uri(VALUE_HIVE_URI); + return data.Result_; +} + +TString TIcebergTestData::CreateAuthSection() { + using namespace fmt::literals; + + switch (Auth_.Type) { + case TIcebergTestData::AuthBasic: + return fmt::format(R"( + AUTH_METHOD="BASIC", + LOGIN="{login}", + PASSWORD_SECRET_NAME="{data_source_name}_p" + )", + "data_source_name"_a = DataSourceName_, + "login"_a = Auth_.Id, + "password"_a = Auth_.Value + ); + case TIcebergTestData::AuthToken: + return fmt::format(R"( + AUTH_METHOD="TOKEN", + TOKEN_SECRET_NAME="{data_source_name}_p" + )", + "data_source_name"_a = DataSourceName_ + ); + case TIcebergTestData::AuthSa: + return fmt::format(R"( + AUTH_METHOD="SERVICE_ACCOUNT", + SERVICE_ACCOUNT_ID="my_sa", + SERVICE_ACCOUNT_SECRET_NAME="{data_source_name}_p" + )", + "data_source_name"_a = DataSourceName_ + ); + }; +} + +TString TIcebergTestData::CreateQuery(const TString& catalogSection) { + using namespace fmt::literals; + + return fmt::format( + R"( + CREATE OBJECT {data_source_name}_p (TYPE SECRET) WITH (value={secret}); + + CREATE EXTERNAL DATA SOURCE {data_source_name} WITH ( + SOURCE_TYPE="{source_type}", + DATABASE_NAME="{database}", + WAREHOUSE_TYPE="{s3}", + WAREHOUSE_S3_REGION="{s3_region}", + WAREHOUSE_S3_ENDPOINT="{s3_endpoint}", + WAREHOUSE_S3_URI="{s3_uri}", + {auth_section}, + {catalog_section}, + USE_TLS="{use_tls}" + ); + )", + "auth_section"_a = CreateAuthSection(), + "s3"_a = NKikimr::NExternalSource::NIceberg::VALUE_S3, + "s3_region"_a = VALUE_S3_REGION, + "s3_endpoint"_a = VALUE_S3_ENDPOINT, + "s3_uri"_a = VALUE_S3_URI, + "data_source_name"_a = DataSourceName_, + "catalog_section"_a = catalogSection, + "secret"_a = Auth_.Value, + "use_tls"_a = UseTls_ ? "TRUE" : "FALSE", + "source_type"_a = ToString(NYql::EDatabaseType::Iceberg), + "database"_a = Database_ + ); +} + +void TIcebergTestData::ExecuteQuery(const std::shared_ptr& kikimr, + const TString& query) +{ + auto c = kikimr->GetTableClient(); + auto session = c.CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); +} + + +void TIcebergTestData::ExecuteCreateHiveExternalDataSource(const std::shared_ptr& kikimr) { + using namespace fmt::literals; + + TString hiveCatalog = fmt::format(R"( + CATALOG_TYPE="{type}", + CATALOG_HIVE_URI="{uri}" + )", + "type"_a = NKikimr::NExternalSource::NIceberg::VALUE_HIVE, + "uri"_a = VALUE_HIVE_URI + ); + + ExecuteQuery(kikimr, CreateQuery(hiveCatalog)); +} + +void TIcebergTestData::ExecuteCreateHadoopExternalDataSource(const std::shared_ptr& kikimr) { + using namespace fmt::literals; + + TString hadoopCatalog = fmt::format(R"( + CATALOG_TYPE="{type}" + )", + "type"_a = NKikimr::NExternalSource::NIceberg::VALUE_HADOOP + ); + + ExecuteQuery(kikimr, CreateQuery(hadoopCatalog)); +} + +class TStaticCredentialsProvider : public NYdb::ICredentialsProvider { +public: + TStaticCredentialsProvider(const TString& yqlToken) + : YqlToken_(yqlToken) + {} + + std::string GetAuthInfo() const override { + return YqlToken_; + } + + bool IsValid() const override { + return true; + } + +private: + std::string YqlToken_; +}; + +class TStaticCredentialsProviderFactory : public NYdb::ICredentialsProviderFactory { +public: + TStaticCredentialsProviderFactory(const TString& yqlToken) + : YqlToken_(yqlToken) + {} + + std::shared_ptr CreateProvider() const override { + return std::make_shared(YqlToken_); + } + +private: + TString YqlToken_; +}; + +class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCredentialsFactory { +public: + TStaticSecuredCredentialsFactory(const TString& yqlToken) + : YqlToken_(yqlToken) + {} + + std::shared_ptr Create(const TString&, const TString&) override { + return std::make_shared(YqlToken_); + } + +private: + TString YqlToken_; +}; + +TIcebergTestData CreateIcebergBasic(const TString& dataSourceName, const TString& database, const TString& userName, const TString& password){ + return TIcebergTestData({TIcebergTestData::EAuthType::AuthBasic, userName, password}, dataSourceName, database, false); +} + +TIcebergTestData CreateIcebergToken(const TString& dataSourceName, const TString& database, const TString& token) { + return TIcebergTestData({TIcebergTestData::EAuthType::AuthToken, VALUE_IAM , token}, dataSourceName, database, false); +} + +TIcebergTestData CreateIcebergSa(const TString& dataSourceName, const TString& database, const TString& token) { + return TIcebergTestData({TIcebergTestData::EAuthType::AuthSa,VALUE_IAM, token}, dataSourceName, database, false); +} + +std::shared_ptr CreateCredentialProvider(const TString& token) { + return std::make_shared(token); +} + +} // NTestUtils diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h new file mode 100644 index 000000000000..ac7bfa74e612 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include + +namespace NTestUtils { + +struct TTestData; +class TIcebergTestData final { + friend struct TTestData; + +public: + enum EAuthType : int { + AuthBasic = 1, + AuthSa = 2, + AuthToken = 3 + }; + + struct TAuth { + EAuthType Type; + TString Id; + TString Value; + }; + +public: + TIcebergTestData(TAuth auth, const TString& dataSourceName, const TString& database, bool UseTls); + + NYql::TGenericDataSourceInstance CreateDataSourceForHadoop(); + + NYql::TGenericDataSourceInstance CreateDataSourceForHive(); + + void ExecuteCreateHiveExternalDataSource(const std::shared_ptr& kikimr); + + void ExecuteCreateHadoopExternalDataSource(const std::shared_ptr& kikimr); + +private: + TString CreateAuthSection(); + + TString CreateQuery(const TString& catalogSection); + + void ExecuteQuery(const std::shared_ptr& kikimr, const TString& query); + +private: + const TAuth Auth_; + const TString DataSourceName_; + const TString Database_; + const bool UseTls_; +}; + +TIcebergTestData CreateIcebergBasic(const TString& dataSourceName = NYql::NConnector::NTest::DEFAULT_DATA_SOURCE_NAME, + const TString& database = NYql::NConnector::NTest::DEFAULT_DATABASE, + const TString& userName = NYql::NConnector::NTest::DEFAULT_LOGIN, + const TString& password = NYql::NConnector::NTest::DEFAULT_PASSWORD); + +TIcebergTestData CreateIcebergToken(const TString& dataSourceName = NYql::NConnector::NTest::DEFAULT_DATA_SOURCE_NAME, + const TString& database = NYql::NConnector::NTest::DEFAULT_DATABASE, + const TString& token = NYql::NConnector::NTest::DEFAULT_PASSWORD); + +TIcebergTestData CreateIcebergSa(const TString& dataSourceName = NYql::NConnector::NTest::DEFAULT_DATA_SOURCE_NAME, + const TString& database = NYql::NConnector::NTest::DEFAULT_DATABASE, + const TString& token = NYql::NConnector::NTest::DEFAULT_PASSWORD); + +std::shared_ptr CreateCredentialProvider(const TString& token = NYql::NConnector::NTest::DEFAULT_PASSWORD); + +} // NTestUtils diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 8b27afb3660a..62521042eec0 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -1,3 +1,5 @@ +#include "iceberg_ut_data.h" + #include #include #include @@ -33,6 +35,12 @@ namespace NKikimr::NKqp { PostgreSQL, ClickHouse, Ydb, + IcebergHiveBasic, + IcebergHiveSa, + IcebergHiveToken, + IcebergHadoopBasic, + IcebergHadoopSa, + IcebergHadoopToken, }; NYql::TGenericDataSourceInstance MakeDataSourceInstance(EProviderType providerType) { @@ -43,6 +51,18 @@ namespace NKikimr::NKqp { return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult(); case EProviderType::Ydb: return TConnectorClientMock::TYdbDataSourceInstanceBuilder<>().GetResult(); + case EProviderType::IcebergHiveBasic: + return NTestUtils::CreateIcebergBasic().CreateDataSourceForHive(); + case EProviderType::IcebergHiveSa: + return NTestUtils::CreateIcebergSa().CreateDataSourceForHive(); + case EProviderType::IcebergHiveToken: + return NTestUtils::CreateIcebergToken().CreateDataSourceForHive(); + case EProviderType::IcebergHadoopBasic: + return NTestUtils::CreateIcebergBasic().CreateDataSourceForHadoop(); + case EProviderType::IcebergHadoopSa: + return NTestUtils::CreateIcebergSa().CreateDataSourceForHadoop(); + case EProviderType::IcebergHadoopToken: + return NTestUtils::CreateIcebergToken().CreateDataSourceForHadoop(); } } @@ -54,6 +74,24 @@ namespace NKikimr::NKqp { return CreateClickHouseExternalDataSource(kikimr); case EProviderType::Ydb: return CreateYdbExternalDataSource(kikimr); + case EProviderType::IcebergHiveBasic: + return NTestUtils::CreateIcebergBasic() + .ExecuteCreateHiveExternalDataSource(kikimr); + case EProviderType::IcebergHiveSa: + return NTestUtils::CreateIcebergSa() + .ExecuteCreateHiveExternalDataSource(kikimr); + case EProviderType::IcebergHiveToken: + return NTestUtils::CreateIcebergToken() + .ExecuteCreateHiveExternalDataSource(kikimr); + case EProviderType::IcebergHadoopBasic: + return NTestUtils::CreateIcebergBasic() + .ExecuteCreateHadoopExternalDataSource(kikimr); + case EProviderType::IcebergHadoopSa: + return NTestUtils::CreateIcebergSa() + .ExecuteCreateHadoopExternalDataSource(kikimr); + case EProviderType::IcebergHadoopToken: + return NTestUtils::CreateIcebergToken() + .ExecuteCreateHadoopExternalDataSource(kikimr); } } @@ -62,15 +100,21 @@ namespace NKikimr::NKqp { NYql::TAttr dateTimeFormat; dateTimeFormat.SetName("DateTimeFormat"); dateTimeFormat.SetValue("string"); - appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableConnector()->SetUseSsl(false); - appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableConnector()->MutableEndpoint()->set_host("localhost"); - appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableConnector()->MutableEndpoint()->set_port(1234); - appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableDefaultSettings()->Add(std::move(dateTimeFormat)); - appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage"); - appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ClickHouse"); - appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL"); - appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("MySQL"); - appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("Ydb"); + + auto& config = *appConfig.MutableQueryServiceConfig(); + auto& connector = *config.MutableGeneric()->MutableConnector(); + + connector.SetUseSsl(false); + connector.MutableEndpoint()->set_host("localhost"); + connector.MutableEndpoint()->set_port(1234); + + config.MutableGeneric()->MutableDefaultSettings()->Add(std::move(dateTimeFormat)); + config.AddAvailableExternalDataSources("ObjectStorage"); + config.AddAvailableExternalDataSources("ClickHouse"); + config.AddAvailableExternalDataSources("PostgreSQL"); + config.AddAvailableExternalDataSources("MySQL"); + config.AddAvailableExternalDataSources("Ydb"); + config.AddAvailableExternalDataSources("Iceberg"); return appConfig; } @@ -102,7 +146,6 @@ namespace NKikimr::NKqp { auto clientMock = std::make_shared(); const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); - // step 1: DescribeTable // clang-format off clientMock->ExpectDescribeTable() @@ -149,7 +192,8 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory); + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory,{}, + NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -190,6 +234,30 @@ namespace NKikimr::NKqp { TestSelectAllFields(EProviderType::Ydb); } + Y_UNIT_TEST(IcebergHiveBasicSelectAll) { + TestSelectAllFields(EProviderType::IcebergHiveBasic); + } + + Y_UNIT_TEST(IcebergHiveSaSelectAll) { + TestSelectAllFields(EProviderType::IcebergHiveSa); + } + + Y_UNIT_TEST(IcebergHiveTokenSelectAll) { + TestSelectAllFields(EProviderType::IcebergHiveToken); + } + + Y_UNIT_TEST(IcebergHadoopBasicSelectAll) { + TestSelectAllFields(EProviderType::IcebergHadoopBasic); + } + + Y_UNIT_TEST(IcebergHadoopSaSelectAll) { + TestSelectAllFields(EProviderType::IcebergHadoopSa); + } + + Y_UNIT_TEST(IcebergHadoopTokenSelectAll) { + TestSelectAllFields(EProviderType::IcebergHadoopToken); + } + void TestSelectConstant(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -240,7 +308,8 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory); + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -280,6 +349,30 @@ namespace NKikimr::NKqp { TestSelectConstant(EProviderType::Ydb); } + Y_UNIT_TEST(IcebergHiveBasicSelectConstant) { + TestSelectConstant(EProviderType::IcebergHiveBasic); + } + + Y_UNIT_TEST(IcebergHiveSaSelectConstant) { + TestSelectConstant(EProviderType::IcebergHiveSa); + } + + Y_UNIT_TEST(IcebergHiveTokenSelectConstant) { + TestSelectConstant(EProviderType::IcebergHiveToken); + } + + Y_UNIT_TEST(IcebergHadoopBasicSelectConstant) { + TestSelectConstant(EProviderType::IcebergHadoopBasic); + } + + Y_UNIT_TEST(IcebergHadoopSaSelectConstant) { + TestSelectConstant(EProviderType::IcebergHadoopSa); + } + + Y_UNIT_TEST(IcebergHadoopTokenSelectConstant) { + TestSelectConstant(EProviderType::IcebergHadoopToken); + } + void TestSelectCount(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -330,7 +423,8 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory); + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -366,6 +460,30 @@ namespace NKikimr::NKqp { TestSelectCount(EProviderType::Ydb); } + Y_UNIT_TEST(IcebergHiveBasicSelectCount) { + TestSelectCount(EProviderType::IcebergHiveBasic); + } + + Y_UNIT_TEST(IcebergHiveSaSelectCount) { + TestSelectCount(EProviderType::IcebergHiveSa); + } + + Y_UNIT_TEST(IcebergHiveTokenSelectCount) { + TestSelectCount(EProviderType::IcebergHiveToken); + } + + Y_UNIT_TEST(IcebergHadoopBasicSelectCount) { + TestSelectCount(EProviderType::IcebergHadoopBasic); + } + + Y_UNIT_TEST(IcebergHadoopSaSelectCount) { + TestSelectCount(EProviderType::IcebergHadoopSa); + } + + Y_UNIT_TEST(IcebergHadoopTokenSelectCount) { + TestSelectCount(EProviderType::IcebergHadoopToken); + } + void TestFilterPushdown(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -439,7 +557,8 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory); + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -476,5 +595,29 @@ namespace NKikimr::NKqp { Y_UNIT_TEST(YdbFilterPushdown) { TestFilterPushdown(EProviderType::Ydb); } + + Y_UNIT_TEST(IcebergHiveBasicFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHiveBasic); + } + + Y_UNIT_TEST(IcebergHiveSaFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHiveSa); + } + + Y_UNIT_TEST(IcebergHiveTokenFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHiveToken); + } + + Y_UNIT_TEST(IcebergHadoopBasicFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHadoopBasic); + } + + Y_UNIT_TEST(IcebergHadoopSaFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHadoopSa); + } + + Y_UNIT_TEST(IcebergHadoopTokenFilterPushdown) { + TestFilterPushdown(EProviderType::IcebergHadoopToken); + } } } diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/ya.make b/ydb/core/kqp/ut/federated_query/generic_ut/ya.make index ce1a18603799..8f6c9efdaa49 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/ya.make +++ b/ydb/core/kqp/ut/federated_query/generic_ut/ya.make @@ -4,6 +4,8 @@ FORK_SUBTESTS() SRCS( kqp_generic_provider_ut.cpp + iceberg_ut_data.cpp + iceberg_ut_data.h ) PEERDIR( diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp index 557d4d227a3c..4dee5b2c025d 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp +++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp @@ -17,7 +17,8 @@ std::set GetAllExternalDataSourceTypes() { ToString(NYql::EDatabaseType::MsSQLServer), ToString(NYql::EDatabaseType::Oracle), ToString(NYql::EDatabaseType::Logging), - ToString(NYql::EDatabaseType::Solomon) + ToString(NYql::EDatabaseType::Solomon), + ToString(NYql::EDatabaseType::Iceberg) }; return allTypes; } @@ -35,11 +36,13 @@ EDatabaseType DatabaseTypeFromDataSourceKind(NYql::EGenericDataSourceKind dataSo case NYql::EGenericDataSourceKind::GREENPLUM: return EDatabaseType::Greenplum; case NYql::EGenericDataSourceKind::MS_SQL_SERVER: - return EDatabaseType::MsSQLServer; + return EDatabaseType::MsSQLServer; case NYql::EGenericDataSourceKind::ORACLE: - return EDatabaseType::Oracle; + return EDatabaseType::Oracle; case NYql::EGenericDataSourceKind::LOGGING: - return EDatabaseType::Logging; + return EDatabaseType::Logging; + case NYql::EGenericDataSourceKind::ICEBERG: + return EDatabaseType::Iceberg; default: ythrow yexception() << "Unknown data source kind: " << NYql::EGenericDataSourceKind_Name(dataSourceKind); } @@ -48,21 +51,23 @@ EDatabaseType DatabaseTypeFromDataSourceKind(NYql::EGenericDataSourceKind dataSo NYql::EGenericDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseType databaseType) { switch (databaseType) { case EDatabaseType::PostgreSQL: - return NYql::EGenericDataSourceKind::POSTGRESQL; + return NYql::EGenericDataSourceKind::POSTGRESQL; case EDatabaseType::ClickHouse: - return NYql::EGenericDataSourceKind::CLICKHOUSE; + return NYql::EGenericDataSourceKind::CLICKHOUSE; case EDatabaseType::Ydb: - return NYql::EGenericDataSourceKind::YDB; + return NYql::EGenericDataSourceKind::YDB; case EDatabaseType::MySQL: return NYql::EGenericDataSourceKind::MYSQL; case EDatabaseType::Greenplum: - return NYql::EGenericDataSourceKind::GREENPLUM; + return NYql::EGenericDataSourceKind::GREENPLUM; case EDatabaseType::MsSQLServer: return NYql::EGenericDataSourceKind::MS_SQL_SERVER; case EDatabaseType::Oracle: return NYql::EGenericDataSourceKind::ORACLE; case EDatabaseType::Logging: return NYql::EGenericDataSourceKind::LOGGING; + case EDatabaseType::Iceberg: + return NYql::EGenericDataSourceKind::ICEBERG; default: ythrow yexception() << "Unknown database type: " << ToString(databaseType); } diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h index 9532e3df7d8e..fee3fd5d97f7 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h @@ -18,7 +18,8 @@ enum class EDatabaseType { MsSQLServer, Oracle, Logging, - Solomon + Solomon, + Iceberg }; std::set GetAllExternalDataSourceTypes(); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index f24eceeac5e0..722265b9fc29 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -50,7 +50,8 @@ namespace NYql::NDq { "GreenplumGeneric", "MsSQLServerGeneric", "OracleGeneric", - "LoggingGeneric"}) { + "LoggingGeneric", + "IcebergGeneric"}) { factory.RegisterSource(name, readActorFactory); factory.RegisterLookupSource(name, lookupActorFactory); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index 05acded094d5..8a6642b1e29b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -8,6 +8,7 @@ #include #include +#include #include "yql_generic_cluster_config.h" @@ -199,6 +200,7 @@ namespace NYql { EGenericDataSourceKind::MYSQL, EGenericDataSourceKind::MS_SQL_SERVER, EGenericDataSourceKind::ORACLE, + EGenericDataSourceKind::ICEBERG }, clusterConfig.GetKind() )) { @@ -276,6 +278,43 @@ namespace NYql { clusterConfig.mutable_datasourceoptions()->insert({"folder_id", TString(it->second)}); } + /// + /// Extract token from properties and copy it to a cluster's config + /// + void ParseToken(const THashMap& properties, + NYql::TGenericClusterConfig& clusterConfig) { + auto it = properties.find("token"); + + if (it == properties.cend()) { + return; + } + + if (!it->second) { + return; + } + + clusterConfig.SetToken(it->second); + } + + /// + /// Fill properties for an iceberg data source + /// + void ParseIcebergFields(const THashMap& properties, + NYql::TGenericClusterConfig& clusterConfig) { + + if (clusterConfig.GetKind() != NYql::EGenericDataSourceKind::ICEBERG) { + return; + } + + for (auto f : NKikimr::NExternalSource::NIceberg::FieldsToConnector) { + auto it = properties.find(f); + + if (properties.end() != it) { + clusterConfig.MutableDataSourceOptions()->insert({f, it->second}); + } + } + } + using TProtoProperties = google::protobuf::Map; TString GetPropertyWithDefault(const TProtoProperties& properties, const TString& key) { @@ -302,8 +341,11 @@ namespace NYql { ParseProtocol(properties, clusterConfig); ParseServiceAccountId(properties, clusterConfig); ParseServiceAccountIdSignature(properties, clusterConfig); + ParseToken(properties, clusterConfig); ParseFolderId(properties, clusterConfig); + ParseIcebergFields(properties, clusterConfig); + return clusterConfig; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 408e2a9ae3d7..164cc131eea0 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -38,6 +38,8 @@ namespace NYql { return "OracleGeneric"; case NYql::EGenericDataSourceKind::LOGGING: return "LoggingGeneric"; + case NYql::EGenericDataSourceKind::ICEBERG: + return "IcebergGeneric"; default: throw yexception() << "Data source kind is unknown or not specified"; } @@ -201,10 +203,10 @@ namespace NYql { } } - // Managed YDB (including YDB underlying Logging) supports access via IAM token. + // Iceberg/Managed YDB (including YDB underlying Logging) supports access via IAM token. // If exist, copy service account creds to obtain tokens during request execution phase. // If exists, copy previously created token. - if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING}, clusterConfig.kind())) { + if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING, NYql::EGenericDataSourceKind::ICEBERG}, clusterConfig.kind())) { source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); source.SetToken(State_->Types->Credentials->FindCredentialContent( @@ -261,6 +263,9 @@ namespace NYql { case NYql::EGenericDataSourceKind::LOGGING: properties["SourceType"] = "Logging"; break; + case NYql::EGenericDataSourceKind::ICEBERG: + properties["SourceType"] = "Iceberg"; + break; case NYql::EGenericDataSourceKind::DATA_SOURCE_KIND_UNSPECIFIED: break; default: diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index e44a413484a8..1e98fc12b44d 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace NYql { using namespace NNodes; @@ -455,6 +456,58 @@ namespace NYql { } } + TString GetOptionValue(const ::google::protobuf::Map& options, TString option) { + auto it = options.find(option); + + if (options.end() == it) { + throw yexception() + << "Cluster config for an Iceberg data source" + << " is missing option: " + << option; + } + + return it->second; + } + + /// + /// Fill options into DatSourceOptions specific for an iceberg data type + /// + void SetIcebergOptions(NYql::TIcebergDataSourceOptions& dataSourceOptions, const TGenericClusterConfig& clusterConfig) { + using namespace NKikimr::NExternalSource::NIceberg; + + const auto& clusterOptions = clusterConfig.GetDataSourceOptions(); + auto warehouseType = GetOptionValue(clusterOptions, WAREHOUSE_TYPE); + + if (VALUE_S3 != warehouseType) { + throw yexception() << "Unexpected warehouse type: " << warehouseType; + } + + auto endpoint = GetOptionValue(clusterOptions, WAREHOUSE_S3_ENDPOINT); + auto region = GetOptionValue(clusterOptions, WAREHOUSE_S3_REGION); + auto uri = GetOptionValue(clusterOptions, WAREHOUSE_S3_URI); + auto& s3 = *dataSourceOptions.mutable_warehouse()->mutable_s3(); + + s3.set_endpoint(endpoint); + s3.set_region(region); + s3.set_uri(uri); + + auto catalogType = GetOptionValue(clusterOptions, CATALOG_TYPE); + auto& catalog = *dataSourceOptions.mutable_catalog(); + + // set catalog options + if (VALUE_HADOOP == catalogType) { + // hadoop nothing yet + catalog.mutable_hadoop(); + } else if (VALUE_HIVE == catalogType) { + auto hiveUri = GetOptionValue(clusterOptions, CATALOG_HIVE_URI); + + catalog.mutable_hive()->set_uri(hiveUri); + } else { + throw yexception() << "Unexpected catalog type: " << catalogType; + } + } + void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { const auto dataSourceKind = clusterConfig.GetKind(); @@ -483,6 +536,10 @@ namespace NYql { auto* options = request.mutable_data_source_instance()->mutable_logging_options(); SetLoggingFolderId(*options, clusterConfig); } break; + case NYql::EGenericDataSourceKind::ICEBERG: { + auto* options = request.mutable_data_source_instance()->mutable_iceberg_options(); + SetIcebergOptions(*options, clusterConfig); + } break; default: throw yexception() << "Unexpected data source kind: '" << NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'"; diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 11d405f57bac..27398db25b91 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -41,6 +41,7 @@ QueryServiceConfig { AvailableExternalDataSources: "Oracle" AvailableExternalDataSources: "Logging" AvailableExternalDataSources: "Solomon" + AvailableExternalDataSources: "Iceberg" FileStorage { MaxFiles: 1000 From 7b32d33949f1f02babcdd44990beb09f2935ea33 Mon Sep 17 00:00:00 2001 From: s2m1 Date: Tue, 1 Apr 2025 15:26:36 +0300 Subject: [PATCH 2/8] =?UTF-8?q?=D0=BE=D0=BF=D0=B8=D1=81=D0=B0=D0=BD=D0=B8?= =?UTF-8?q?=D0=B5=20=D0=B8=D1=81=D1=82=D0=BE=D1=87=D0=BD=D0=B8=D0=BA=D0=B0?= =?UTF-8?q?=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=B4=D0=BB=D1=8F=20?= =?UTF-8?q?Iceberg=20commit=5Fhash:f4f6b9142bae2e70559b0ee6ded54c577ffbad1?= =?UTF-8?q?0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../generic_ut/iceberg_ut_data.cpp | 4 +- .../common/proto/gateways_config.proto | 53 +++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp index 7c4f0e65b31a..24278205d67a 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp @@ -188,7 +188,7 @@ class TStaticCredentialsProvider : public NYdb::ICredentialsProvider { : YqlToken_(yqlToken) {} - std::string GetAuthInfo() const override { + TString GetAuthInfo() const override { return YqlToken_; } @@ -197,7 +197,7 @@ class TStaticCredentialsProvider : public NYdb::ICredentialsProvider { } private: - std::string YqlToken_; + TString YqlToken_; }; class TStaticCredentialsProviderFactory : public NYdb::ICredentialsProviderFactory { diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 144e1fd9bcb2..d17e239936b5 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -588,6 +588,7 @@ enum EGenericDataSourceKind { GREENPLUM = 7; ORACLE = 8; LOGGING = 9; + ICEBERG = 13; } // EGenericProtocol generalizes various kinds of network protocols supported by different databases. @@ -634,6 +635,57 @@ message TLoggingDataSourceOptions { optional string folder_id = 1; } +// TIcebergCatalog represents settings specific to iceberg catalog +message TIcebergCatalog { + // Hadoop Iceberg Catalog which is built on top of a storage + message THadoop { + } + + // Hive Iceberg Catalog which is based on a Hive Metastore + message THive { + // Location of a hive metastore + // e.g., thrift://host:9083/ + optional string uri = 1; + } + + oneof payload { + THadoop hadoop = 1; + THive hive = 2; + } +} + +// TIcebergWarehouse represents settings specific to iceberg warehouse +message TIcebergWarehouse { + // Iceberg data located in a S3 storage + message TS3 { + // Data location in a storage + // e.g., s3a://iceberg-bucket/storage + optional string uri = 1; + + // Endpoint to access a storage + // e.g., https://storage.yandexcloud.net + optional string endpoint = 2; + + // Region where a storage is located + // e.g., ru-central1 + optional string region = 3; + } + + oneof payload { + TS3 s3 = 1; + } +} + +// TIcebergDataSourceOptions represents settings specific +// to Iceberg data source +message TIcebergDataSourceOptions { + // Iceberg catalog + optional TIcebergCatalog catalog = 1; + + // Iceberg warehouse + optional TIcebergWarehouse warehouse = 2; +} + // TGenericDataSourceInstance helps to identify the instance of a data source to redirect request to. message TGenericDataSourceInstance { // Data source kind @@ -658,6 +710,7 @@ message TGenericDataSourceInstance { TGreenplumDataSourceOptions gp_options = 10; TOracleDataSourceOptions oracle_options = 11; TLoggingDataSourceOptions logging_options = 12; + TIcebergDataSourceOptions iceberg_options = 14; } } From 681c507f8b8d8935ba84cb47889dad3bf3d82f22 Mon Sep 17 00:00:00 2001 From: s2m1 Date: Fri, 18 Apr 2025 14:20:08 +0300 Subject: [PATCH 3/8] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB?= =?UTF-8?q?=20HiveMetastore,=20=D1=87=D1=82=D0=BE=D0=B1=D1=8B=20=D0=B2?= =?UTF-8?q?=D1=8B=D0=BF=D0=BE=D0=BB=D0=BD=D0=B8=D1=82=D1=8C=20=D0=BF=D0=B5?= =?UTF-8?q?=D1=80=D0=B5=D0=B8=D0=BC=D0=B5=D0=BD=D0=BE=D0=B2=D0=B0=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Необходимо в gateways\_config.proto переименовать Hive в HiveMetastore так как это заезжает в github через аркадию, то сделаю в три этапа: 1. Этот PR добавляет HiveMetastore 2. После того как он попадет в Github перепишу код 3. Удалю Hive и создан новый PR в arcadia commit_hash:deb8a0de7d7e39f31d968874d9e5c689eed03934 --- .../yql/providers/common/proto/gateways_config.proto | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index d17e239936b5..43ee6db5f177 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -641,8 +641,13 @@ message TIcebergCatalog { message THadoop { } - // Hive Iceberg Catalog which is based on a Hive Metastore + // todo: remove message THive { + optional string uri = 1; + } + + // Hive Iceberg Catalog which is based on a Hive Metastore + message THiveMetastore { // Location of a hive metastore // e.g., thrift://host:9083/ optional string uri = 1; @@ -650,7 +655,9 @@ message TIcebergCatalog { oneof payload { THadoop hadoop = 1; + // todo: remove THive hive = 2; + THiveMetastore hive_metastore = 3; } } From b6ad7f445d0d651909ac7668b0bc4a69bf830389 Mon Sep 17 00:00:00 2001 From: Slusarenko Igor Date: Mon, 21 Apr 2025 21:51:24 +0300 Subject: [PATCH 4/8] Use HiveMetastore instead of Hive (#17483) --- .../external_source_factory.cpp | 10 ++-- ydb/core/external_sources/iceberg_ddl_ut.cpp | 4 +- ydb/core/external_sources/iceberg_fields.h | 12 ++--- .../generic_ut/iceberg_ut_data.cpp | 26 ++++----- .../generic_ut/iceberg_ut_data.h | 4 +- .../generic_ut/kqp_generic_provider_ut.cpp | 54 +++++++++---------- .../provider/yql_generic_load_meta.cpp | 6 +-- 7 files changed, 58 insertions(+), 58 deletions(-) diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 47bcdfb2d8a6..d0c7ed16cdea 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -69,16 +69,16 @@ IExternalSource::TPtr BuildIcebergSource(const std::vector& hostnam GetRequiredValidator(), GetHasSettingCondition(WAREHOUSE_TYPE, VALUE_S3) ) - // Catalog type is a required field and can be equal only to "hive" or "hadoop" + // Catalog type is a required field and can be equal only to "hive_metastore" or "hadoop" .Property( CATALOG_TYPE, - GetIsInListValidator({VALUE_HIVE, VALUE_HADOOP}, true) + GetIsInListValidator({VALUE_HIVE_METASTORE, VALUE_HADOOP}, true) ) - // If catalog type is equal to "hive" the field "hive_uri" is required + // If catalog type is equal to "hive_metastore" the field "catalog_hive_metastore_uri" is required .Property( - CATALOG_HIVE_URI, + CATALOG_HIVE_METASTORE_URI, GetRequiredValidator(), - GetHasSettingCondition(CATALOG_TYPE,VALUE_HIVE) + GetHasSettingCondition(CATALOG_TYPE, VALUE_HIVE_METASTORE) ) .HostnamePatterns(hostnamePatternsRegEx) .Build(); diff --git a/ydb/core/external_sources/iceberg_ddl_ut.cpp b/ydb/core/external_sources/iceberg_ddl_ut.cpp index 8ed578204d10..7c5b2c772136 100644 --- a/ydb/core/external_sources/iceberg_ddl_ut.cpp +++ b/ydb/core/external_sources/iceberg_ddl_ut.cpp @@ -50,8 +50,8 @@ Y_UNIT_TEST_SUITE(IcebergDdlTest) { Y_UNIT_TEST_F(HiveCatalogWithS3Test, TTestFixture) { using namespace NKikimr::NExternalSource::NIceberg; - Props[CATALOG_TYPE] = VALUE_HIVE; - Props[CATALOG_HIVE_URI] = "hive_uri"; + Props[CATALOG_TYPE] = VALUE_HIVE_METASTORE; + Props[CATALOG_HIVE_METASTORE_URI] = "hive_metastore_uri"; UNIT_ASSERT_NO_EXCEPTION(Source->ValidateExternalDataSource(Proto.SerializeAsString())); diff --git a/ydb/core/external_sources/iceberg_fields.h b/ydb/core/external_sources/iceberg_fields.h index 5a00f8e607f5..ee1d70d452b2 100644 --- a/ydb/core/external_sources/iceberg_fields.h +++ b/ydb/core/external_sources/iceberg_fields.h @@ -14,13 +14,13 @@ constexpr char WAREHOUSE_TLS[] = "use_tls"; constexpr char WAREHOUSE_DB[] = "database_name"; // Fields that belongs to a catalog -constexpr char CATALOG_TYPE[] = "catalog_type"; -constexpr char CATALOG_HIVE_URI[] = "catalog_hive_uri"; +constexpr char CATALOG_TYPE[] = "catalog_type"; +constexpr char CATALOG_HIVE_METASTORE_URI[] = "catalog_hive_metastore_uri"; // Some values -constexpr char VALUE_S3[] = "s3"; -constexpr char VALUE_HIVE[] = "hive"; -constexpr char VALUE_HADOOP[] = "hadoop"; +constexpr char VALUE_S3[] = "s3"; +constexpr char VALUE_HIVE_METASTORE[] = "hive_metastore"; +constexpr char VALUE_HADOOP[] = "hadoop"; // List of fields which is pass to a connector constexpr std::array FieldsToConnector = { @@ -29,7 +29,7 @@ constexpr std::array FieldsToConnector = { WAREHOUSE_S3_REGION, WAREHOUSE_S3_URI, CATALOG_TYPE, - CATALOG_HIVE_URI + CATALOG_HIVE_METASTORE_URI }; } // NKikimr::NExternalSource::NIceberg diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp index 24278205d67a..a9df3b03919f 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.cpp @@ -5,10 +5,10 @@ namespace NTestUtils { -constexpr char VALUE_HIVE_URI[] = "hive_uri"; -constexpr char VALUE_S3_URI[] = "s3_uri"; -constexpr char VALUE_S3_ENDPOINT[] = "s3_endpoint"; -constexpr char VALUE_S3_REGION[] = "s3_region"; +constexpr char VALUE_HIVE_METASTORE_URI[] = "hive_metastore_uri"; +constexpr char VALUE_S3_URI[] = "s3_uri"; +constexpr char VALUE_S3_ENDPOINT[] = "s3_endpoint"; +constexpr char VALUE_S3_REGION[] = "s3_region"; constexpr char VALUE_IAM[] = "IAM"; @@ -74,10 +74,10 @@ NYql::TGenericDataSourceInstance TIcebergTestData::CreateDataSourceForHadoop() { return data.Result_; } -NYql::TGenericDataSourceInstance TIcebergTestData::CreateDataSourceForHive() { +NYql::TGenericDataSourceInstance TIcebergTestData::CreateDataSourceForHiveMetastore() { TTestData data(this); - auto& hive = *data.Catalog_.mutable_hive(); - hive.set_uri(VALUE_HIVE_URI); + auto& h = *data.Catalog_.mutable_hive_metastore(); + h.set_uri(VALUE_HIVE_METASTORE_URI); return data.Result_; } @@ -156,18 +156,18 @@ void TIcebergTestData::ExecuteQuery(const std::shared_ptr& kikimr) { +void TIcebergTestData::ExecuteCreateHiveMetastoreExternalDataSource(const std::shared_ptr& kikimr) { using namespace fmt::literals; - TString hiveCatalog = fmt::format(R"( + TString hiveMetastoreCatalog = fmt::format(R"( CATALOG_TYPE="{type}", - CATALOG_HIVE_URI="{uri}" + CATALOG_HIVE_METASTORE_URI="{uri}" )", - "type"_a = NKikimr::NExternalSource::NIceberg::VALUE_HIVE, - "uri"_a = VALUE_HIVE_URI + "type"_a = NKikimr::NExternalSource::NIceberg::VALUE_HIVE_METASTORE, + "uri"_a = VALUE_HIVE_METASTORE_URI ); - ExecuteQuery(kikimr, CreateQuery(hiveCatalog)); + ExecuteQuery(kikimr, CreateQuery(hiveMetastoreCatalog)); } void TIcebergTestData::ExecuteCreateHadoopExternalDataSource(const std::shared_ptr& kikimr) { diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h index ac7bfa74e612..75353a6b4571 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h +++ b/ydb/core/kqp/ut/federated_query/generic_ut/iceberg_ut_data.h @@ -28,9 +28,9 @@ class TIcebergTestData final { NYql::TGenericDataSourceInstance CreateDataSourceForHadoop(); - NYql::TGenericDataSourceInstance CreateDataSourceForHive(); + NYql::TGenericDataSourceInstance CreateDataSourceForHiveMetastore(); - void ExecuteCreateHiveExternalDataSource(const std::shared_ptr& kikimr); + void ExecuteCreateHiveMetastoreExternalDataSource(const std::shared_ptr& kikimr); void ExecuteCreateHadoopExternalDataSource(const std::shared_ptr& kikimr); diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 62521042eec0..a92e01b36d2b 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -35,9 +35,9 @@ namespace NKikimr::NKqp { PostgreSQL, ClickHouse, Ydb, - IcebergHiveBasic, - IcebergHiveSa, - IcebergHiveToken, + IcebergHiveMetastoreBasic, + IcebergHiveMetastoreSa, + IcebergHiveMetastoreToken, IcebergHadoopBasic, IcebergHadoopSa, IcebergHadoopToken, @@ -51,12 +51,12 @@ namespace NKikimr::NKqp { return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult(); case EProviderType::Ydb: return TConnectorClientMock::TYdbDataSourceInstanceBuilder<>().GetResult(); - case EProviderType::IcebergHiveBasic: - return NTestUtils::CreateIcebergBasic().CreateDataSourceForHive(); - case EProviderType::IcebergHiveSa: - return NTestUtils::CreateIcebergSa().CreateDataSourceForHive(); - case EProviderType::IcebergHiveToken: - return NTestUtils::CreateIcebergToken().CreateDataSourceForHive(); + case EProviderType::IcebergHiveMetastoreBasic: + return NTestUtils::CreateIcebergBasic().CreateDataSourceForHiveMetastore(); + case EProviderType::IcebergHiveMetastoreSa: + return NTestUtils::CreateIcebergSa().CreateDataSourceForHiveMetastore(); + case EProviderType::IcebergHiveMetastoreToken: + return NTestUtils::CreateIcebergToken().CreateDataSourceForHiveMetastore(); case EProviderType::IcebergHadoopBasic: return NTestUtils::CreateIcebergBasic().CreateDataSourceForHadoop(); case EProviderType::IcebergHadoopSa: @@ -74,15 +74,15 @@ namespace NKikimr::NKqp { return CreateClickHouseExternalDataSource(kikimr); case EProviderType::Ydb: return CreateYdbExternalDataSource(kikimr); - case EProviderType::IcebergHiveBasic: + case EProviderType::IcebergHiveMetastoreBasic: return NTestUtils::CreateIcebergBasic() - .ExecuteCreateHiveExternalDataSource(kikimr); - case EProviderType::IcebergHiveSa: + .ExecuteCreateHiveMetastoreExternalDataSource(kikimr); + case EProviderType::IcebergHiveMetastoreSa: return NTestUtils::CreateIcebergSa() - .ExecuteCreateHiveExternalDataSource(kikimr); - case EProviderType::IcebergHiveToken: + .ExecuteCreateHiveMetastoreExternalDataSource(kikimr); + case EProviderType::IcebergHiveMetastoreToken: return NTestUtils::CreateIcebergToken() - .ExecuteCreateHiveExternalDataSource(kikimr); + .ExecuteCreateHiveMetastoreExternalDataSource(kikimr); case EProviderType::IcebergHadoopBasic: return NTestUtils::CreateIcebergBasic() .ExecuteCreateHadoopExternalDataSource(kikimr); @@ -235,15 +235,15 @@ namespace NKikimr::NKqp { } Y_UNIT_TEST(IcebergHiveBasicSelectAll) { - TestSelectAllFields(EProviderType::IcebergHiveBasic); + TestSelectAllFields(EProviderType::IcebergHiveMetastoreBasic); } Y_UNIT_TEST(IcebergHiveSaSelectAll) { - TestSelectAllFields(EProviderType::IcebergHiveSa); + TestSelectAllFields(EProviderType::IcebergHiveMetastoreSa); } Y_UNIT_TEST(IcebergHiveTokenSelectAll) { - TestSelectAllFields(EProviderType::IcebergHiveToken); + TestSelectAllFields(EProviderType::IcebergHiveMetastoreToken); } Y_UNIT_TEST(IcebergHadoopBasicSelectAll) { @@ -350,15 +350,15 @@ namespace NKikimr::NKqp { } Y_UNIT_TEST(IcebergHiveBasicSelectConstant) { - TestSelectConstant(EProviderType::IcebergHiveBasic); + TestSelectConstant(EProviderType::IcebergHiveMetastoreBasic); } Y_UNIT_TEST(IcebergHiveSaSelectConstant) { - TestSelectConstant(EProviderType::IcebergHiveSa); + TestSelectConstant(EProviderType::IcebergHiveMetastoreSa); } Y_UNIT_TEST(IcebergHiveTokenSelectConstant) { - TestSelectConstant(EProviderType::IcebergHiveToken); + TestSelectConstant(EProviderType::IcebergHiveMetastoreToken); } Y_UNIT_TEST(IcebergHadoopBasicSelectConstant) { @@ -461,15 +461,15 @@ namespace NKikimr::NKqp { } Y_UNIT_TEST(IcebergHiveBasicSelectCount) { - TestSelectCount(EProviderType::IcebergHiveBasic); + TestSelectCount(EProviderType::IcebergHiveMetastoreBasic); } Y_UNIT_TEST(IcebergHiveSaSelectCount) { - TestSelectCount(EProviderType::IcebergHiveSa); + TestSelectCount(EProviderType::IcebergHiveMetastoreSa); } Y_UNIT_TEST(IcebergHiveTokenSelectCount) { - TestSelectCount(EProviderType::IcebergHiveToken); + TestSelectCount(EProviderType::IcebergHiveMetastoreToken); } Y_UNIT_TEST(IcebergHadoopBasicSelectCount) { @@ -597,15 +597,15 @@ namespace NKikimr::NKqp { } Y_UNIT_TEST(IcebergHiveBasicFilterPushdown) { - TestFilterPushdown(EProviderType::IcebergHiveBasic); + TestFilterPushdown(EProviderType::IcebergHiveMetastoreBasic); } Y_UNIT_TEST(IcebergHiveSaFilterPushdown) { - TestFilterPushdown(EProviderType::IcebergHiveSa); + TestFilterPushdown(EProviderType::IcebergHiveMetastoreSa); } Y_UNIT_TEST(IcebergHiveTokenFilterPushdown) { - TestFilterPushdown(EProviderType::IcebergHiveToken); + TestFilterPushdown(EProviderType::IcebergHiveMetastoreToken); } Y_UNIT_TEST(IcebergHadoopBasicFilterPushdown) { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 1e98fc12b44d..159d7d20ddd9 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -499,10 +499,10 @@ namespace NYql { if (VALUE_HADOOP == catalogType) { // hadoop nothing yet catalog.mutable_hadoop(); - } else if (VALUE_HIVE == catalogType) { - auto hiveUri = GetOptionValue(clusterOptions, CATALOG_HIVE_URI); + } else if (VALUE_HIVE_METASTORE == catalogType) { + auto uri = GetOptionValue(clusterOptions, CATALOG_HIVE_METASTORE_URI); - catalog.mutable_hive()->set_uri(hiveUri); + catalog.mutable_hive_metastore()->set_uri(uri); } else { throw yexception() << "Unexpected catalog type: " << catalogType; } From 47d3fbad04e5cbe5a96c816d5e7fef18942b435d Mon Sep 17 00:00:00 2001 From: s2m1 Date: Fri, 25 Apr 2025 13:02:38 +0300 Subject: [PATCH 5/8] =?UTF-8?q?=D1=83=D0=B4=D0=B0=D0=BB=D0=B8=D0=BB=20depr?= =?UTF-8?q?ecated=20=D0=BF=D0=BE=D0=BB=D1=8F=20commit=5Fhash:99750816a3346?= =?UTF-8?q?58ecd2f306d4a496865e7fcc4d8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yql/providers/common/proto/gateways_config.proto | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 43ee6db5f177..4ccd351b7469 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -641,11 +641,6 @@ message TIcebergCatalog { message THadoop { } - // todo: remove - message THive { - optional string uri = 1; - } - // Hive Iceberg Catalog which is based on a Hive Metastore message THiveMetastore { // Location of a hive metastore @@ -655,10 +650,10 @@ message TIcebergCatalog { oneof payload { THadoop hadoop = 1; - // todo: remove - THive hive = 2; THiveMetastore hive_metastore = 3; } + + reserved 2; } // TIcebergWarehouse represents settings specific to iceberg warehouse From a1ed77148ef2af6c70ed62ec3be30c8ad82987b6 Mon Sep 17 00:00:00 2001 From: Slusarenko Igor Date: Tue, 22 Apr 2025 12:39:15 +0300 Subject: [PATCH 6/8] Add iceberg connector into fqrun, kqprun configs (#17522) --- .../tools/kqprun/configuration/app_config.conf | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 27398db25b91..b21d32312027 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -58,9 +58,20 @@ QueryServiceConfig { Endpoint { host: "localhost" - port: 50051 + port: 2130 } } + + Connectors { + UseSsl: false + + Endpoint { + host: "localhost" + port: 21301 + } + + ForKinds: ICEBERG + } } HttpGateway { From 33ef239af76ab73921fadd1927705c97bf02ec23 Mon Sep 17 00:00:00 2001 From: Slusarenko Igor Date: Wed, 30 Apr 2025 11:52:32 +0300 Subject: [PATCH 7/8] Add java gprc stub compilation to a generic connector proto (#17903) --- ydb/library/yql/providers/generic/connector/api/service/ya.make | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/library/yql/providers/generic/connector/api/service/ya.make b/ydb/library/yql/providers/generic/connector/api/service/ya.make index 4febd9bac987..2cd37eaa77cc 100644 --- a/ydb/library/yql/providers/generic/connector/api/service/ya.make +++ b/ydb/library/yql/providers/generic/connector/api/service/ya.make @@ -11,6 +11,7 @@ ONLY_TAGS( CPP_PROTO PY_PROTO PY3_PROTO + JAVA_PROTO ) PEERDIR( From 679a67511970cba65034dddf4959ec24f6f20f3e Mon Sep 17 00:00:00 2001 From: Igor Sliusarenko Date: Sun, 18 May 2025 11:43:55 +0300 Subject: [PATCH 8/8] Fix GenericFederatedQuery tests --- .../generic_ut/kqp_generic_provider_ut.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index a92e01b36d2b..e642274fbcd0 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -31,6 +31,8 @@ namespace NKikimr::NKqp { using namespace testing; using namespace fmt::literals; + constexpr char DEFAULT_DOMAIN_ROOT[] = "Root"; + enum class EProviderType { PostgreSQL, ClickHouse, @@ -192,7 +194,7 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory,{}, + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, DEFAULT_DOMAIN_ROOT, NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -308,7 +310,7 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, DEFAULT_DOMAIN_ROOT, NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -423,7 +425,7 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, DEFAULT_DOMAIN_ROOT, NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr); @@ -557,7 +559,7 @@ namespace NKikimr::NKqp { // run test auto appConfig = CreateDefaultAppConfig(); auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, {}, + auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, DEFAULT_DOMAIN_ROOT, NTestUtils::CreateCredentialProvider()); CreateExternalDataSource(providerType, kikimr);