Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,51 @@ void TKafkaOffsetCommitActor::Die(const TActorContext& ctx) {
void TKafkaOffsetCommitActor::Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_CRIT("Auth failed. reason# " << ev->Get()->Reason);
Error = ConvertErrorCode(ev->Get()->ErrorCode);
SendFailedForAllPartitions(Error, ctx);
if (Error == GROUP_ID_NOT_FOUND && Context->Config.GetAutoCreateConsumersEnable()) {
for (auto topicReq: Message->Topics) {
TString topicPath = NormalizePath(Context->DatabasePath, *topicReq.Name);
CreateConsumerGroupIfNecessary(*topicReq.Name, topicPath, *Message->GroupId);
}
if (PendingResponses == 0) { // case when AlterTopic requests have already sent and returned an unsuccessful response
SendFailedForAllPartitions(Error, ctx);
}
} else {
SendFailedForAllPartitions(Error, ctx);
}
}

void TKafkaOffsetCommitActor::CreateConsumerGroupIfNecessary(const TString& topicName,
const TString& topicPath,
const TString& groupId) {
TTopicGroupIdAndPath consumerTopicRequest = TTopicGroupIdAndPath{groupId, topicPath};
if (ConsumerTopicAlterRequestAttempts.find(consumerTopicRequest) == ConsumerTopicAlterRequestAttempts.end()) {
ConsumerTopicAlterRequestAttempts.insert(consumerTopicRequest);
} else {
// it is enough to send a consumer addition request only once for a particular topic
return;
}
PendingResponses++;

auto request = std::make_unique<Ydb::Topic::AlterTopicRequest>();
request.get()->set_path(topicPath);
auto* consumer = request->add_add_consumers();
consumer->set_name(groupId);
AlterTopicCookie++;
AlterTopicCookieToName[AlterTopicCookie] = topicName;
auto callback = [replyTo = SelfId(), cookie = AlterTopicCookie, path = topicName, this]
(Ydb::StatusIds::StatusCode statusCode, const google::protobuf::Message*) {
NYdb::NIssue::TIssues issues;
NYdb::TStatus status(static_cast<NYdb::EStatus>(statusCode), std::move(issues));
Send(replyTo,
new NKikimr::NReplication::TEvYdbProxy::TEvAlterTopicResponse(std::move(status)),
0,
cookie);
};
NKikimr::NGRpcService::DoAlterTopicRequest(
std::make_unique<NKikimr::NReplication::TLocalProxyRequest>(
topicName, Context->DatabasePath, std::move(request), callback, Context->UserToken),
NKikimr::NReplication::TLocalProxyActor(Context->DatabasePath));

}

void TKafkaOffsetCommitActor::SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx) {
Expand All @@ -42,6 +86,22 @@ void TKafkaOffsetCommitActor::SendFailedForAllPartitions(EKafkaErrors error, con
Die(ctx);
}

void TKafkaOffsetCommitActor::Handle(NKikimr::NReplication::TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev, const TActorContext& ctx) {
NYdb::TStatus& result = ev->Get()->Result;
if (result.GetStatus() == NYdb::EStatus::SUCCESS) {
KAFKA_LOG_D("Handling TEvAlterTopicResponse. Status: " << result.GetStatus() << "\n");
} else {
KAFKA_LOG_I("Handling TEvAlterTopicResponse. Status: " << result.GetStatus() << "\n");
}
PendingResponses--;
if (result.GetStatus() != NYdb::EStatus::ALREADY_EXISTS && result.GetStatus() != NYdb::EStatus::SUCCESS) {
SendFailedForAllPartitions(Error, ctx);
} else if (PendingResponses == 0) {
SendAuthRequest(ctx);
return;
}
}

void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();

Expand Down Expand Up @@ -173,7 +233,7 @@ void TKafkaOffsetCommitActor::AddPartitionResponse(EKafkaErrors error, const TSt
}
}

void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
void TKafkaOffsetCommitActor::SendAuthRequest(const NActors::TActorContext& ctx) {
THashSet<TString> topicsToResolve;
for (auto topicReq: Message->Topics) {
topicsToResolve.insert(NormalizePath(Context->DatabasePath, topicReq.Name.value()));
Expand All @@ -200,7 +260,9 @@ void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
NKikimr::NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), NKikimr::MakeSchemeCacheID(), nullptr, Context->UserToken, topicsToConverter,
topicHandler->GetLocalCluster(), false)
);

}
void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
SendAuthRequest(ctx);
Become(&TKafkaOffsetCommitActor::StateWork);
}

Expand Down
18 changes: 17 additions & 1 deletion ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

#include "ydb/core/base/tablet_pipe.h"
#include "ydb/core/grpc_services/local_rpc/local_rpc.h"
#include <ydb/core/grpc_services/service_scheme.h>
#include <ydb/core/grpc_services/service_topic.h>
#include "ydb/core/kafka_proxy/actors/actors.h"
#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/core/kafka_proxy/actors/kafka_topic_group_path_struct.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/core/tx/replication/ydb_proxy/local_proxy/local_proxy.h>
#include <ydb/core/tx/replication/ydb_proxy/local_proxy/local_proxy_request.h>
#include <ydb/core/persqueue/events/internal.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/actor.h>
Expand Down Expand Up @@ -51,6 +58,7 @@ struct TRequestInfo {
HFunc(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
HFunc(NKikimr::NReplication::TEvYdbProxy::TEvAlterTopicResponse, Handle);
}
}

Expand All @@ -59,7 +67,12 @@ struct TRequestInfo {
void Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NReplication::TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev, const TActorContext& ctx);

void SendAuthRequest(const NActors::TActorContext& ctx);
void CreateConsumerGroupIfNecessary(const TString& topicName,
const TString& topicPath,
const TString& groupId);
void AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx);
void ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx);
void SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx);
Expand All @@ -70,13 +83,16 @@ struct TRequestInfo {
const TMessagePtr<TOffsetCommitRequestData> Message;
const TOffsetCommitResponseData::TPtr Response;

ui64 PendingResponses = 0;
ui64 NextCookie = 0;
ui32 AlterTopicCookie = 0;
ui64 PendingResponses = 0;
std::unordered_map<ui64, TVector<ui64>> TabletIdToCookies;
std::unordered_map<ui64, TRequestInfo> CookieToRequestInfo;
std::unordered_map<TString, ui64> ResponseTopicIds;
NKikimr::NGRpcProxy::TTopicInitInfoMap TopicAndTablets;
std::unordered_map<ui64, TActorId> TabletIdToPipe;
std::unordered_map<ui32, TString> AlterTopicCookieToName;
std::unordered_set<NKafka::TTopicGroupIdAndPath, NKafka::TTopicGroupIdAndPathHash> ConsumerTopicAlterRequestAttempts;
TActorId AuthInitActor;
EKafkaErrors Error = NONE_ERROR;

Expand Down
61 changes: 45 additions & 16 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
const TActorId& requester,
std::shared_ptr<TSet<ui32>> partitions,
const TString& originalTopicName,
const TString& userSID)
const TString& userSID,
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool requireAuthentication = false)
: TBase(request, requester)
, TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions))
, Requester(requester)
, OriginalTopicName(originalTopicName)
, UserSID(userSID)
, UserToken(userToken)
, RequireAuthentication(requireAuthentication)
{
};

Expand Down Expand Up @@ -148,7 +152,22 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
TActorBootstrapped::PassAway();
return;
}

auto path = CanonizePath(NKikimr::JoinPath(response.Path));
bool hasRights = true;
if (UserToken && UserToken->GetSerializedToken()) {
hasRights = response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, *UserToken);
} else if (RequireAuthentication) {
hasRights = false;
}
if (!hasRights) {
RaiseError(
"unauthenticated access is forbidden",
Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
Ydb::StatusIds::StatusCode::StatusIds_StatusCode_UNAUTHORIZED,
ActorContext()
);
return;
}
const auto& pqDescr = response.PQGroupInfo->Description;
ProcessTablets(pqDescr, ActorContext());
}
Expand All @@ -169,6 +188,8 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
const TActorId Requester;
const TString OriginalTopicName;
const TString UserSID;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
bool RequireAuthentication;
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, TEvKafka::PartitionConsumerOffset>>> PartitionIdToOffsets = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, TEvKafka::PartitionConsumerOffset>>>();
};

Expand Down Expand Up @@ -224,7 +245,9 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
SelfId(),
topicToEntities.second.Partitions,
topicToEntities.first,
GetUsernameOrAnonymous(Context)
GetUsernameOrAnonymous(Context),
Context->UserToken,
Context->RequireAuthentication
));
InflyTopics++;
}
Expand Down Expand Up @@ -252,8 +275,7 @@ void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr&
TString topicName = GetTopicNameWithoutDb(DatabasePath, *topicRequest.Name);
TString topicPath = NormalizePath(DatabasePath, topicName);
if (topicPartition.ErrorCode == EKafkaErrors::RESOURCE_NOT_FOUND &&
(Context->Config.GetAutoCreateConsumersEnable() ||
Context->Config.GetAutoCreateTopicsEnable())) {
Context->Config.GetAutoCreateConsumersEnable()) {
// consumer is not assigned to the topic case
TKafkaOffsetFetchActor::CreateConsumerGroupIfNecessary(topicName,
topicPath,
Expand Down Expand Up @@ -303,13 +325,19 @@ void TKafkaOffsetFetchActor::Handle(const TEvKafka::TEvResponse::TPtr& ev, const
SelfId(),
TopicToEntities[createdTopicName].Partitions,
createdTopicName,
GetUsernameOrAnonymous(Context)
GetUsernameOrAnonymous(Context),
Context->UserToken,
Context->RequireAuthentication
));
}

void TKafkaOffsetFetchActor::Handle(NKikimr::NReplication::TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev, const TActorContext& ctx) {
NYdb::TStatus& result = ev->Get()->Result;
KAFKA_LOG_D("Handling TEvAlterTopicResponse. Status: " << result.GetStatus() << "\n");
if (result.GetStatus() == NYdb::EStatus::SUCCESS) {
KAFKA_LOG_D("Handling TEvAlterTopicResponse. Status: " << result.GetStatus() << "\n");
} else {
KAFKA_LOG_I("Handling TEvAlterTopicResponse. Status: " << result.GetStatus() << "\n");
}
if (result.GetStatus() != NYdb::EStatus::SUCCESS) {
InflyTopics--;
if (InflyTopics == 0) {
Expand All @@ -333,7 +361,9 @@ void TKafkaOffsetFetchActor::Handle(NKikimr::NReplication::TEvYdbProxy::TEvAlter
actorId,
TopicToEntities[alteredTopicName].Partitions,
alteredTopicName,
GetUsernameOrAnonymous(Context)
GetUsernameOrAnonymous(Context),
Context->UserToken,
Context->RequireAuthentication
));

}
Expand Down Expand Up @@ -391,7 +421,9 @@ void NKafka::TKafkaOffsetFetchActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr
SelfId(),
topicToEntities.second.Partitions,
topicToEntities.first,
GetUsernameOrAnonymous(Context)
GetUsernameOrAnonymous(Context),
Context->UserToken,
Context->RequireAuthentication
));
InflyTopics++;
}
Expand Down Expand Up @@ -452,14 +484,10 @@ void TKafkaOffsetFetchActor::CreateConsumerGroupIfNecessary(const TString& topic
}
InflyTopics++;

auto topicSettings = NYdb::NTopic::TAlterTopicSettings();
topicSettings.BeginAddConsumer(groupId).EndAddConsumer();
auto request = std::make_unique<Ydb::Topic::AlterTopicRequest>();
request.get()->set_path(topicPath);
for (auto& c : topicSettings.AddConsumers_) {
auto* consumer = request.get()->add_add_consumers();
consumer->set_name(c.ConsumerName_);
}
auto* consumer = request->add_add_consumers();
consumer->set_name(groupId);
AlterTopicCookie++;
AlterTopicCookieToName[AlterTopicCookie] = originalTopicName;
auto callback = [replyTo = SelfId(), cookie = AlterTopicCookie, path = topicName, this]
Expand All @@ -473,7 +501,7 @@ void TKafkaOffsetFetchActor::CreateConsumerGroupIfNecessary(const TString& topic
};
NKikimr::NGRpcService::DoAlterTopicRequest(
std::make_unique<NKikimr::NReplication::TLocalProxyRequest>(
topicName, DatabasePath, std::move(request), callback),
topicName, DatabasePath, std::move(request), callback, Context->UserToken),
NKikimr::NReplication::TLocalProxyActor(DatabasePath));

}
Expand All @@ -499,6 +527,7 @@ void TKafkaOffsetFetchActor::CreateTopicIfNecessary(const TString& topicName,
ContextForTopicCreation->UserToken = Context->UserToken;
ContextForTopicCreation->DatabasePath = Context->DatabasePath;
ContextForTopicCreation->ResourceDatabasePath = Context->ResourceDatabasePath;
ContextForTopicCreation->RequireAuthentication = Context->RequireAuthentication;

TActorId actorId = ctx.Register(new TKafkaCreateTopicsActor(ContextForTopicCreation,
1,
Expand Down
15 changes: 3 additions & 12 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "../kqp_helper.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/core/kafka_proxy/actors/kafka_topic_group_path_struct.h>
#include <ydb/core/kafka_proxy/actors/control_plane_common.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/core/tx/replication/ydb_proxy/local_proxy/local_proxy.h>
#include <ydb/core/tx/replication/ydb_proxy/local_proxy/local_proxy_request.h>
Expand Down Expand Up @@ -45,17 +47,6 @@ struct TTopicGroupRequest {
TString GroupId;
};

struct TTopicGroupIdAndPath {
TString GroupId;
TString TopicPath;

bool operator==(const TTopicGroupIdAndPath& topicGroupIdAndPath) const {
return GroupId == topicGroupIdAndPath.GroupId && TopicPath == topicGroupIdAndPath.TopicPath;
}
};

struct TStructHash { size_t operator()(const TTopicGroupIdAndPath& alterTopicRequest) const { return CombineHashes(std::hash<TString>()(alterTopicRequest.GroupId), std::hash<TString>()(alterTopicRequest.TopicPath)); } };


class TKafkaOffsetFetchActor: public NActors::TActorBootstrapped<TKafkaOffsetFetchActor> {

Expand Down Expand Up @@ -122,7 +113,7 @@ class TKafkaOffsetFetchActor: public NActors::TActorBootstrapped<TKafkaOffsetFet
std::unordered_map<ui32, TString> AlterTopicCookieToName;
std::unordered_map<TString, std::vector<TTopicGroupRequest>> GroupRequests;
std::unordered_map<TActorId, TString> CreateTopicActorIdToName;
std::unordered_set<TTopicGroupIdAndPath, TStructHash> ConsumerTopicAlterRequestAttempts;
std::unordered_set<NKafka::TTopicGroupIdAndPath, NKafka::TTopicGroupIdAndPathHash> ConsumerTopicAlterRequestAttempts;
std::unordered_set<TString> TopicCreateRequestAttempts;
std::unordered_set<TActorId> DependantActors;
std::unique_ptr<NKafka::TKqpTxHelper> Kqp;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "kafka_topic_group_path_struct.h"

namespace NKafka {
struct TTopicGroupIdAndPath;
struct TTopicGroupIdAndPathHash;
}
18 changes: 18 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_topic_group_path_struct.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <util/generic/cast.h>
#include <util/digest/numeric.h>

namespace NKafka {
struct TTopicGroupIdAndPath {
TString GroupId;
TString TopicPath;

bool operator==(const TTopicGroupIdAndPath& topicGroupIdAndPath) const {
return GroupId == topicGroupIdAndPath.GroupId && TopicPath == topicGroupIdAndPath.TopicPath;
}
};

struct TTopicGroupIdAndPathHash { size_t operator()(const TTopicGroupIdAndPath& alterTopicRequest) const { return CombineHashes(std::hash<TString>()(alterTopicRequest.GroupId), std::hash<TString>()(alterTopicRequest.TopicPath)); } };

}
Loading
Loading