Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_federated_query_actors.h"

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/tx/scheme_board/subscriber.h>
#include <ydb/services/metadata/secret/fetcher.h>
#include <ydb/services/metadata/secret/snapshot.h>
#include <ydb/library/actors/core/log.h>
Expand Down Expand Up @@ -173,6 +174,10 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS
return;
}

if (const auto it = SchemeBoardSubscribers.find(secretName); it == SchemeBoardSubscribers.end()) {
SchemeBoardSubscribers[secretName] = Register(CreateSchemeBoardSubscriber(SelfId(), secretName));
}

const auto& secretValue = rec.GetPathDescription().GetSecretDescription().GetValue();
const auto& secretVersion = rec.GetPathDescription().GetSecretDescription().GetVersion();
VersionedSecrets[secretName] = TVersionedSecret{
Expand All @@ -181,6 +186,7 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS
.Name = secretName,
.Value = secretValue,
};

++respIt->second.FilledSecretsCnt;

FillResponseIfFinished(ev->Cookie, respIt->second);
Expand All @@ -206,19 +212,19 @@ void TDescribeSchemaSecretsService::SaveIncomingRequestInfo(const TEvResolveSecr
ResolveInFlight[LastCookie] = std::move(ctx);
}

void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector<TString>& secretNames, const NACLib::TUserToken& userToken) {
void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector<TString>& secretNames, const TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
for (const auto& secretName : secretNames) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.Path = SplitPath(secretName);
if (userToken.GetUserSID()) {
if (userToken && userToken->GetUserSID()) {
entry.Access = NACLib::SelectRow;
}
request->ResultSet.emplace_back(entry);
}
if (userToken.GetUserSID()) {
request->UserToken = new NACLib::TUserToken(userToken);
if (userToken && userToken->GetUserSID()) {
request->UserToken = userToken;
}

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, LastCookie++);
Expand Down Expand Up @@ -262,16 +268,39 @@ void TDescribeSchemaSecretsService::FillResponseIfFinished(const ui64& requestId
std::vector<TString> secretValues;
secretValues.resize(responseCtx.Secrets.size());
for (const auto& secret : responseCtx.Secrets) {
const auto& secretPath = secret.first;
auto it = VersionedSecrets.find(secret.first);
Y_ENSURE(it != VersionedSecrets.end(), "Secrets values were not retrieved for response");
if (it == VersionedSecrets.end()) {
LOG_N("FillResponseIfFinished: request cookie=" << requestId << ", secret `" << secretPath << "` was dropped during request");
FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret `" + secretPath + "` not found") }));
return;
}

Y_ENSURE(secret.second < secretValues.size());
secretValues[secret.second] = it->second.Value;
}
FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(secretValues));
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(const TVector<TString>& secretNames, const TString& ownerUserId, TActorSystem* actorSystem) {
void TDescribeSchemaSecretsService::HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) {
Y_UNUSED(ev);
}

void TDescribeSchemaSecretsService::HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev) {
const TString& secretName = CanonizePath(ev->Get()->Path);
VersionedSecrets.erase(secretName);

auto subscriberIt = SchemeBoardSubscribers.find(secretName);
Y_ENSURE(subscriberIt != SchemeBoardSubscribers.end());
Send(subscriberIt->second, new TEvents::TEvPoisonPill());
SchemeBoardSubscribers.erase(subscriberIt);
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(
const TVector<TString>& secretNames,
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TActorSystem* actorSystem
) {
auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>();
if (actorSystem->AppData<TAppData>()->FeatureFlags.GetEnableSchemaSecrets()) {
bool schemaSecrets = false;
Expand All @@ -284,12 +313,12 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(con
if (schemaSecrets) {
actorSystem->Send(
MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId),
new TDescribeSchemaSecretsService::TEvResolveSecret(ownerUserId, secretNames, promise));
new TDescribeSchemaSecretsService::TEvResolveSecret(userToken, secretNames, promise));
return promise.GetFuture();
}
}

actorSystem->Register(CreateDescribeSecretsActor(ownerUserId, secretNames, promise));
actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", secretNames, promise));
return promise.GetFuture();
}

Expand All @@ -306,36 +335,40 @@ void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const T
});
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem) {
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(
const NKikimrSchemeOp::TAuth& authDescription,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TActorSystem* actorSystem
) {
switch (authDescription.identity_case()) {
case NKikimrSchemeOp::TAuth::kServiceAccount: {
const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName();
return DescribeSecret({saSecretId}, ownerUserId, actorSystem);
return DescribeSecret({saSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kNone:
return NThreading::MakeFuture(TEvDescribeSecretsResponse::TDescription({}));

case NKikimrSchemeOp::TAuth::kBasic: {
const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName();
return DescribeSecret({passwordSecretId}, ownerUserId, actorSystem);
return DescribeSecret({passwordSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kMdbBasic: {
const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName();
const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName();
return DescribeSecret({saSecretId, passwordSecreId}, ownerUserId, actorSystem);
return DescribeSecret({saSecretId, passwordSecreId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kAws: {
const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName();
const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName();
return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, ownerUserId, actorSystem);
return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kToken: {
const TString& tokenSecretId = authDescription.GetToken().GetTokenSecretName();
return DescribeSecret({tokenSecretId}, ownerUserId, actorSystem);
return DescribeSecret({tokenSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
Expand Down
29 changes: 20 additions & 9 deletions ydb/core/kqp/federated_query/kqp_federated_query_actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

#include <ydb/core/kqp/common/events/script_executions.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/scheme_board/events.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/aclib/aclib.h>
#include <library/cpp/threading/future/future.h>

#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <library/cpp/threading/future/future.h>

namespace NKikimr::NKqp {

Expand All @@ -24,18 +25,18 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
struct TEvResolveSecret : public NActors::TEventLocal<TEvResolveSecret, EvResolveSecret> {
public:
TEvResolveSecret(
const TString& ownerUserId,
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
const TVector<TString>& secretNames,
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise
)
: UserToken(NACLib::TUserToken{ownerUserId, TVector<NACLib::TSID>{}})
: UserToken(userToken)
, SecretNames(secretNames)
, Promise(promise)
{
}

public:
const NACLib::TUserToken UserToken;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const TVector<TString> SecretNames;
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Promise;
};
Expand All @@ -60,15 +61,20 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
hFunc(TEvResolveSecret, HandleIncomingRequest);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse);
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeShardResponse);
hFunc(TSchemeBoardEvents::TEvNotifyDelete, HandleNotifyDelete);
hFunc(TSchemeBoardEvents::TEvNotifyUpdate, HandleNotifyUpdate);
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
)

void HandleIncomingRequest(TEvResolveSecret::TPtr& ev);
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void HandleSchemeShardResponse(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev);
void HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
void HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev);

void FillResponse(const ui64& requestId, const TEvDescribeSecretsResponse::TDescription& response);
void SaveIncomingRequestInfo(const TEvResolveSecret& req);
void SendSchemeCacheRequests(const TVector<TString>& secretNames, const NACLib::TUserToken& userToken);
void SendSchemeCacheRequests(const TVector<TString>& secretNames, const TIntrusiveConstPtr<NACLib::TUserToken> userToken);
bool LocalCacheHasActualVersion(const TVersionedSecret& secret, const ui64& cacheSecretVersion);
bool LocalCacheHasActualObject(const TVersionedSecret& secret, const ui64& cacheSecretPathId);
bool HandleSchemeCacheErrorsIfAny(const ui64& requestId, NSchemeCache::TSchemeCacheNavigate& result);
Expand All @@ -83,13 +89,18 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
ui64 LastCookie = 0;
THashMap<ui64, TResponseContext> ResolveInFlight;
THashMap<TString, TVersionedSecret> VersionedSecrets;
THashMap<TString, TActorId> SchemeBoardSubscribers;
};

IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise);

void RegisterDescribeSecretsActor(const TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, TActorSystem* actorSystem);

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem);
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(
const NKikimrSchemeOp::TAuth& authDescription,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TActorSystem* actorSystem
);

IActor* CreateDescribeSchemaSecretsService();

Expand Down
67 changes: 57 additions & 10 deletions ydb/core/kqp/federated_query/kqp_federated_query_actors_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ namespace {
}

NThreading::TPromise<NKikimr::NKqp::TEvDescribeSecretsResponse::TDescription>
ResolveSecret(const TVector<TString>& secretNames, NKikimr::NKqp::TKikimrRunner& kikimr, const TString& userId = "") {
ResolveSecret(const TVector<TString>& secretNames, NKikimr::NKqp::TKikimrRunner& kikimr, const TIntrusiveConstPtr<NACLib::TUserToken> userToken = nullptr) {
auto promise = NThreading::NewPromise<NKikimr::NKqp::TEvDescribeSecretsResponse::TDescription>();
const auto evResolveSecret = new NKikimr::NKqp::TDescribeSchemaSecretsService::TEvResolveSecret(userId, secretNames, promise);
const auto evResolveSecret = new NKikimr::NKqp::TDescribeSchemaSecretsService::TEvResolveSecret(userToken, secretNames, promise);
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetActorSystem(0);
actorSystem->Send(NKikimr::NKqp::MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId), evResolveSecret);
return promise;
}

NThreading::TPromise<NKikimr::NKqp::TEvDescribeSecretsResponse::TDescription>
ResolveSecret(const TString& secretName, NKikimr::NKqp::TKikimrRunner& kikimr, const TString& userId = "") {
return ResolveSecret(TVector<TString>{secretName}, kikimr, userId);
ResolveSecret(const TString& secretName, NKikimr::NKqp::TKikimrRunner& kikimr, const TIntrusiveConstPtr<NACLib::TUserToken> userToken = nullptr) {
return ResolveSecret(TVector<TString>{secretName}, kikimr, userToken);
}

void AssertBadRequest(NThreading::TPromise<NKikimr::NKqp::TEvDescribeSecretsResponse::TDescription> promise, const TString& err) {
Expand Down Expand Up @@ -168,11 +168,11 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) {

CreateSchemaSecret(secretName, secretValue, adminSession);

auto promise = ResolveSecret(secretName, kikimr, "root@builtin");
auto promise = ResolveSecret(secretName, kikimr, new NACLib::TUserToken("root@builtin", {}));
UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]);

{ // assert no grants by default
auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin");
auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {}));
AssertBadRequest(promise, "<main>: Error: secret `/Root/secret-name` not found\n");
}

Expand All @@ -183,7 +183,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) {
UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // assert grants are ok
auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin");
auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {}));
UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]);
}

Expand All @@ -194,7 +194,54 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) {
UNIT_ASSERT_C(revokeResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // assert no grants after revoking
auto promise = ResolveSecret("/Root/secret-name", kikimr, "user@builtin");
auto promise = ResolveSecret("/Root/secret-name", kikimr, new NACLib::TUserToken("user@builtin", {}));
AssertBadRequest(promise, "<main>: Error: secret `/Root/secret-name` not found\n");
}
}

Y_UNIT_TEST(GroupGrants) {
NKikimr::NKqp::TKikimrRunner kikimr;
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableSchemaSecrets(true);

const TString secretName = "/Root/secret-name";
const TString secretValue = "secret-value";
auto adminSession = kikimr.GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin"))
.CreateSession().GetValueSync().GetSession();

CreateSchemaSecret(secretName, secretValue, adminSession);

auto promise = ResolveSecret(secretName, kikimr, new NACLib::TUserToken("root@builtin", {}));
UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]);

const TIntrusiveConstPtr<NACLib::TUserToken> userToken = new NACLib::TUserToken("user@builtin", {"group"});
{ // assert no grants by default
auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken);
AssertBadRequest(promise, "<main>: Error: secret `/Root/secret-name` not found\n");
}

const auto createGroupResult = adminSession.ExecuteSchemeQuery(
Sprintf("CREATE GROUP `group` WITH USER `user@builtin`;")
).GetValueSync();
UNIT_ASSERT_C(createGroupResult.GetStatus() == NYdb::EStatus::SUCCESS, createGroupResult.GetIssues().ToString());

const auto grantResult = adminSession.ExecuteSchemeQuery(
Sprintf("GRANT 'ydb.granular.select_row' ON `%s` TO `%s`;", secretName.data(), "group")
).GetValueSync();
UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // assert group grants are ok
auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken);
UNIT_ASSERT_VALUES_EQUAL(secretValue, promise.GetFuture().GetValueSync().SecretValues[0]);
}

// revoke grants
const auto revokeResult = adminSession.ExecuteSchemeQuery(
Sprintf("REVOKE 'ydb.granular.select_row' ON `%s` FROM `%s`;", secretName.data(), "group")
).GetValueSync();
UNIT_ASSERT_C(revokeResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // assert no grants after revoking
auto promise = ResolveSecret("/Root/secret-name", kikimr, userToken);
AssertBadRequest(promise, "<main>: Error: secret `/Root/secret-name` not found\n");
}
}
Expand Down Expand Up @@ -299,7 +346,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) {
UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // user has grants for names[0], has no grants for names[1]
auto promise = ResolveSecret({names[0], names[1]}, kikimr, "user@builtin");
auto promise = ResolveSecret({names[0], names[1]}, kikimr, new NACLib::TUserToken("user@builtin", {}));
AssertBadRequest(promise, "<main>: Error: secret `/Root/secret-name-1` not found\n");
}

Expand All @@ -309,7 +356,7 @@ Y_UNIT_TEST_SUITE(DescribeSchemaSecretsService) {
UNIT_ASSERT_C(grantResult.GetStatus() == NYdb::EStatus::SUCCESS, grantResult.GetIssues().ToString());

{ // user has grants for all names[0]
auto promise = ResolveSecret({names[0], names[1]}, kikimr, "user@builtin");
auto promise = ResolveSecret({names[0], names[1]}, kikimr, new NACLib::TUserToken("user@builtin", {}));
for (size_t i = 0; i < values.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(values[i], promise.GetFuture().GetValueSync().SecretValues[i]);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/federated_query/ut_service/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
UNITTEST_FOR(ydb/core/kqp/federated_query)

SIZE(MEDIUM)

PEERDIR(
ydb/core/kqp/federated_query
ydb/core/kqp/ut/common
Expand Down
Loading
Loading