From da135e487b278230b88709790257ae0577ff9bc3 Mon Sep 17 00:00:00 2001 From: Yury Kiselev Date: Thu, 25 Sep 2025 13:34:38 +0300 Subject: [PATCH 1/2] Remove items from cache --- .../kqp_federated_query_actors.cpp | 27 ++++++++++++++++++- .../kqp_federated_query_actors.h | 15 ++++++++--- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp index b1dcc2810682..9ba21537c71a 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp @@ -1,6 +1,7 @@ #include "kqp_federated_query_actors.h" #include +#include #include #include #include @@ -173,6 +174,10 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS return; } + if (const auto it = SchemeBoardSubscribers.find(secretName); it == SchemeBoardSubscribers.end()) { + SchemeBoardSubscribers[secretName] = Register(CreateSchemeBoardSubscriber(SelfId(), secretName)); + } + const auto& secretValue = rec.GetPathDescription().GetSecretDescription().GetValue(); const auto& secretVersion = rec.GetPathDescription().GetSecretDescription().GetVersion(); VersionedSecrets[secretName] = TVersionedSecret{ @@ -181,6 +186,7 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS .Name = secretName, .Value = secretValue, }; + ++respIt->second.FilledSecretsCnt; FillResponseIfFinished(ev->Cookie, respIt->second); @@ -262,8 +268,13 @@ void TDescribeSchemaSecretsService::FillResponseIfFinished(const ui64& requestId std::vector secretValues; secretValues.resize(responseCtx.Secrets.size()); for (const auto& secret : responseCtx.Secrets) { + const auto& secretPath = secret.first; auto it = VersionedSecrets.find(secret.first); - Y_ENSURE(it != VersionedSecrets.end(), "Secrets values were not retrieved for response"); + if (it == VersionedSecrets.end()) { + LOG_N("FillResponseIfFinished: request cookie=" << requestId << ", secret `" << secretPath << "` was dropped during request"); + FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret `" + secretPath + "` not found") })); + return; + } Y_ENSURE(secret.second < secretValues.size()); secretValues[secret.second] = it->second.Value; @@ -271,6 +282,20 @@ void TDescribeSchemaSecretsService::FillResponseIfFinished(const ui64& requestId FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(secretValues)); } +void TDescribeSchemaSecretsService::HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) { + Y_UNUSED(ev); +} + +void TDescribeSchemaSecretsService::HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev) { + const TString& secretName = CanonizePath(ev->Get()->Path); + VersionedSecrets.erase(secretName); + + auto subscriberIt = SchemeBoardSubscribers.find(secretName); + Y_ENSURE(subscriberIt != SchemeBoardSubscribers.end()); + Send(subscriberIt->second, new TEvents::TEvPoisonPill()); + SchemeBoardSubscribers.erase(subscriberIt); +} + NThreading::TFuture DescribeSecret(const TVector& secretNames, const TString& ownerUserId, TActorSystem* actorSystem) { auto promise = NThreading::NewPromise(); if (actorSystem->AppData()->FeatureFlags.GetEnableSchemaSecrets()) { diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h index a9e4a205b95a..56ce9905ea7e 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h @@ -2,15 +2,16 @@ #include #include +#include +#include +#include +#include #include #include #include -#include -#include -#include -#include +#include namespace NKikimr::NKqp { @@ -60,12 +61,17 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped& secretNames, const NACLib::TUserToken& userToken); @@ -83,6 +89,7 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped ResolveInFlight; THashMap VersionedSecrets; + THashMap SchemeBoardSubscribers; }; IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector& secretIds, NThreading::TPromise promise); From d43c20bc5913f000ae2d9199761e4c3e334e6f2b Mon Sep 17 00:00:00 2001 From: Yury Kiselev Date: Fri, 26 Sep 2025 12:59:37 +0300 Subject: [PATCH 2/2] Better user token handling --- .../kqp_federated_query_actors.cpp | 34 ++++++---- .../kqp_federated_query_actors.h | 14 ++-- .../kqp_federated_query_actors_ut.cpp | 67 ++++++++++++++++--- .../kqp/federated_query/ut_service/ya.make | 2 + .../external_data_source/manager.cpp | 2 +- ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 2 +- 6 files changed, 91 insertions(+), 30 deletions(-) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp index 9ba21537c71a..caf6760e0770 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp @@ -212,19 +212,19 @@ void TDescribeSchemaSecretsService::SaveIncomingRequestInfo(const TEvResolveSecr ResolveInFlight[LastCookie] = std::move(ctx); } -void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector& secretNames, const NACLib::TUserToken& userToken) { +void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector& secretNames, const TIntrusiveConstPtr userToken) { TAutoPtr request(new NSchemeCache::TSchemeCacheNavigate()); for (const auto& secretName : secretNames) { NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; entry.Path = SplitPath(secretName); - if (userToken.GetUserSID()) { + if (userToken && userToken->GetUserSID()) { entry.Access = NACLib::SelectRow; } request->ResultSet.emplace_back(entry); } - if (userToken.GetUserSID()) { - request->UserToken = new NACLib::TUserToken(userToken); + if (userToken && userToken->GetUserSID()) { + request->UserToken = userToken; } Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, LastCookie++); @@ -296,7 +296,11 @@ void TDescribeSchemaSecretsService::HandleNotifyDelete(TSchemeBoardEvents::TEvNo SchemeBoardSubscribers.erase(subscriberIt); } -NThreading::TFuture DescribeSecret(const TVector& secretNames, const TString& ownerUserId, TActorSystem* actorSystem) { +NThreading::TFuture DescribeSecret( + const TVector& secretNames, + const TIntrusiveConstPtr userToken, + TActorSystem* actorSystem +) { auto promise = NThreading::NewPromise(); if (actorSystem->AppData()->FeatureFlags.GetEnableSchemaSecrets()) { bool schemaSecrets = false; @@ -309,12 +313,12 @@ NThreading::TFuture DescribeSecret(con if (schemaSecrets) { actorSystem->Send( MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId), - new TDescribeSchemaSecretsService::TEvResolveSecret(ownerUserId, secretNames, promise)); + new TDescribeSchemaSecretsService::TEvResolveSecret(userToken, secretNames, promise)); return promise.GetFuture(); } } - actorSystem->Register(CreateDescribeSecretsActor(ownerUserId, secretNames, promise)); + actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", secretNames, promise)); return promise.GetFuture(); } @@ -331,11 +335,15 @@ void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const T }); } -NThreading::TFuture DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem) { +NThreading::TFuture DescribeExternalDataSourceSecrets( + const NKikimrSchemeOp::TAuth& authDescription, + const TIntrusiveConstPtr& userToken, + TActorSystem* actorSystem +) { switch (authDescription.identity_case()) { case NKikimrSchemeOp::TAuth::kServiceAccount: { const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName(); - return DescribeSecret({saSecretId}, ownerUserId, actorSystem); + return DescribeSecret({saSecretId}, userToken, actorSystem); } case NKikimrSchemeOp::TAuth::kNone: @@ -343,24 +351,24 @@ NThreading::TFuture DescribeExternalDa case NKikimrSchemeOp::TAuth::kBasic: { const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName(); - return DescribeSecret({passwordSecretId}, ownerUserId, actorSystem); + return DescribeSecret({passwordSecretId}, userToken, actorSystem); } case NKikimrSchemeOp::TAuth::kMdbBasic: { const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName(); const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName(); - return DescribeSecret({saSecretId, passwordSecreId}, ownerUserId, actorSystem); + return DescribeSecret({saSecretId, passwordSecreId}, userToken, actorSystem); } case NKikimrSchemeOp::TAuth::kAws: { const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName(); const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName(); - return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, ownerUserId, actorSystem); + return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, userToken, actorSystem); } case NKikimrSchemeOp::TAuth::kToken: { const TString& tokenSecretId = authDescription.GetToken().GetTokenSecretName(); - return DescribeSecret({tokenSecretId}, ownerUserId, actorSystem); + return DescribeSecret({tokenSecretId}, userToken, actorSystem); } case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h index 56ce9905ea7e..3c51bc273640 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h @@ -25,18 +25,18 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped { public: TEvResolveSecret( - const TString& ownerUserId, + const TIntrusiveConstPtr userToken, const TVector& secretNames, NThreading::TPromise promise ) - : UserToken(NACLib::TUserToken{ownerUserId, TVector{}}) + : UserToken(userToken) , SecretNames(secretNames) , Promise(promise) { } public: - const NACLib::TUserToken UserToken; + const TIntrusiveConstPtr UserToken; const TVector SecretNames; NThreading::TPromise Promise; }; @@ -74,7 +74,7 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped& secretNames, const NACLib::TUserToken& userToken); + void SendSchemeCacheRequests(const TVector& secretNames, const TIntrusiveConstPtr userToken); bool LocalCacheHasActualVersion(const TVersionedSecret& secret, const ui64& cacheSecretVersion); bool LocalCacheHasActualObject(const TVersionedSecret& secret, const ui64& cacheSecretPathId); bool HandleSchemeCacheErrorsIfAny(const ui64& requestId, NSchemeCache::TSchemeCacheNavigate& result); @@ -96,7 +96,11 @@ IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector void RegisterDescribeSecretsActor(const TActorId& replyActorId, const TString& ownerUserId, const std::vector& secretIds, TActorSystem* actorSystem); -NThreading::TFuture DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem); +NThreading::TFuture DescribeExternalDataSourceSecrets( + const NKikimrSchemeOp::TAuth& authDescription, + const TIntrusiveConstPtr& userToken, + TActorSystem* actorSystem +); IActor* CreateDescribeSchemaSecretsService(); diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors_ut.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_actors_ut.cpp index bb3bdc15e4e8..7a01702164fc 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_actors_ut.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors_ut.cpp @@ -26,17 +26,17 @@ namespace { } NThreading::TPromise - ResolveSecret(const TVector& secretNames, NKikimr::NKqp::TKikimrRunner& kikimr, const TString& userId = "") { + ResolveSecret(const TVector& secretNames, NKikimr::NKqp::TKikimrRunner& kikimr, const TIntrusiveConstPtr userToken = nullptr) { auto promise = NThreading::NewPromise(); - const auto evResolveSecret = new NKikimr::NKqp::TDescribeSchemaSecretsService::TEvResolveSecret(userId, secretNames, promise); + const auto evResolveSecret = new NKikimr::NKqp::TDescribeSchemaSecretsService::TEvResolveSecret(userToken, secretNames, promise); auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetActorSystem(0); actorSystem->Send(NKikimr::NKqp::MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId), evResolveSecret); return promise; } NThreading::TPromise - ResolveSecret(const TString& secretName, NKikimr::NKqp::TKikimrRunner& kikimr, const TString& userId = "") { - return ResolveSecret(TVector{secretName}, kikimr, userId); + ResolveSecret(const TString& secretName, NKikimr::NKqp::TKikimrRunner& kikimr, const TIntrusiveConstPtr userToken = nullptr) { + return ResolveSecret(TVector{secretName}, kikimr, userToken); } void AssertBadRequest(NThreading::TPromise promise, const TString& err) { @@ -168,11 +168,11 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) { CreateSchemaSecret(secretName, secretValue, adminSession); - auto promise = ResolveSecret(secretName, kikimr, "root@builtin"); + auto promise = ResolveSecret(secretName, kikimr, new NACLib::TUserToken("root@builtin", {})); UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]); { // assert no grants by default - auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin"); + auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {})); AssertBadRequest(promise, "
: Error: secret `/Root/secret-name` not found\n"); } @@ -183,7 +183,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) { UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); { // assert grants are ok - auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin"); + auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {})); UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]); } @@ -194,7 +194,54 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) { UNIT_ASSERT_C(revokeResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); { // assert no grants after revoking - auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin"); + auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {})); + AssertBadRequest(promise, "
: Error: secret `/Root/secret-name` not found\n"); + } + } + + Y_UNIT_TEST(GroupGrants) { + NKikimr::NKqp::TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableSchemaSecrets(true); + + const TString secretName = "/Root/secret-name"; + const TString secretValue = "secret-value"; + auto adminSession = kikimr.GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin")) + .CreateSession().GetValueSync().GetSession(); + + CreateSchemaSecret(secretName, secretValue, adminSession); + + auto promise = ResolveSecret(secretName, kikimr, new NACLib::TUserToken("root@builtin", {})); + UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]); + + const TIntrusiveConstPtr userToken = new NACLib::TUserToken("user@builtin", {"group"}); + { // assert no grants by default + auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken); + AssertBadRequest(promise, "
: Error: secret `/Root/secret-name` not found\n"); + } + + const auto createGroupResult = adminSession.ExecuteSchemeQuery( + Sprintf("CREATE GROUP `group` WITH USER `user@builtin`;") + ).GetValueSync(); + UNIT_ASSERT_C(createGroupResult.GetStatus() == NYdb::EStatus::SUCCESS, createGroupResult.GetIssues().ToString()); + + const auto grantResult = adminSession.ExecuteSchemeQuery( + Sprintf("GRANT 'ydb.granular.select_row' ON `%s` TO `%s`;", secretName.data(), "group") + ).GetValueSync(); + UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); + + { // assert group grants are ok + auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken); + UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]); + } + + // revoke grants + const auto revokeResult = adminSession.ExecuteSchemeQuery( + Sprintf("REVOKE 'ydb.granular.select_row' ON `%s` FROM `%s`;", secretName.data(), "group") + ).GetValueSync(); + UNIT_ASSERT_C(revokeResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); + + { // assert no grants after revoking + auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken); AssertBadRequest(promise, "
: Error: secret `/Root/secret-name` not found\n"); } } @@ -299,7 +346,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) { UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); { // user has grants for names[0], has no grants for names[1] - auto promise = ResolveSecret({names[0], names[1]}, kikimr, "user@builtin"); + auto promise = ResolveSecret({names[0], names[1]}, kikimr, new NACLib::TUserToken("user@builtin", {})); AssertBadRequest(promise, "
: Error: secret `/Root/secret-name-1` not found\n"); } @@ -309,7 +356,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) { UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString()); { // user has grants for all names[0] - auto promise = ResolveSecret({names[0], names[1]}, kikimr, "user@builtin"); + auto promise = ResolveSecret({names[0], names[1]}, kikimr, new NACLib::TUserToken("user@builtin", {})); for (size_t i = 0; i < values.size(); ++i) { UNIT_ASSERT_VALUES_EQUAL(values[i], promise.GetFuture().GetValueSync().SecretValues[i]); } diff --git a/ydb/core/kqp/federated_query/ut_service/ya.make b/ydb/core/kqp/federated_query/ut_service/ya.make index 54e96db48023..9ae593231431 100644 --- a/ydb/core/kqp/federated_query/ut_service/ya.make +++ b/ydb/core/kqp/federated_query/ut_service/ya.make @@ -1,5 +1,7 @@ UNITTEST_FOR(ydb/core/kqp/federated_query) +SIZE(MEDIUM) + PEERDIR( ydb/core/kqp/federated_query ydb/core/kqp/ut/common diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index d325abd4ced3..9be3752dbc71 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -25,7 +25,7 @@ using TYqlConclusion = TConclusionImpl; TAsyncStatus ValidateExternalDatasourceSecrets(const NKikimrSchemeOp::TExternalDataSourceDescription& externalDataSourceDesc, const TExternalDataSourceManager::TInternalModificationContext& context) { const auto& externalData = context.GetExternalData(); const auto& userToken = externalData.GetUserToken(); - auto describeFuture = DescribeExternalDataSourceSecrets(externalDataSourceDesc.GetAuth(), userToken ? userToken->GetUserSID() : "", externalData.GetActorSystem()); + auto describeFuture = DescribeExternalDataSourceSecrets(externalDataSourceDesc.GetAuth(), userToken ? new NACLib::TUserToken(*userToken) : nullptr, externalData.GetActorSystem()); return describeFuture.Apply([](const NThreading::TFuture& f) { if (const auto& value = f.GetValue(); value.Status != Ydb::StatusIds::SUCCESS) { diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index da4511e1777c..170d1394397f 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -574,7 +574,7 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour NThreading::TFuture LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr& userToken, TActorSystem* actorSystem) { const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth(); - return DescribeExternalDataSourceSecrets(authDescription, userToken ? userToken->GetUserSID() : "", actorSystem); + return DescribeExternalDataSourceSecrets(authDescription, userToken, actorSystem); } } // anonymous namespace