Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions ydb/core/fq/libs/private_client/private_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,20 @@ class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> {
{
#ifndef YDB_GRPC_BYPASS_CHANNEL_POOL
auto weakConnections = std::weak_ptr<TGRpcConnectionsImpl>(Connections_);
std::weak_ptr<TPrivateClient::TImpl> self = weak_from_this();
auto channelPoolUpdateWrapper = [weakConnections, self]
auto channelPoolUpdateWrapper = [weakConnections, knownEndpoints = KnownEndpoints]
(NYdb::NIssue::TIssues&&, EStatus status) mutable {
if (status != EStatus::SUCCESS) {
return false;
}
auto ptr = self.lock();
auto connections = weakConnections.lock();
if (!ptr || !connections) {
if (!connections) {
return false;
}
}
std::vector<std::string> endpoints;
{
std::lock_guard lock(ptr->KnownEndpointsLock);
endpoints.assign(ptr->KnownEndpoints.begin(), ptr->KnownEndpoints.end());
ptr->KnownEndpoints.clear();
std::lock_guard lock(knownEndpoints->Lock);
endpoints.assign(knownEndpoints->Endpoints.begin(), knownEndpoints->Endpoints.end());
knownEndpoints->Endpoints.clear();
}
connections->DeleteChannels(endpoints);
return true;
Expand Down Expand Up @@ -221,11 +219,17 @@ class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> {
private:
void UpdateKnownEndpoints([[maybe_unused]] const std::string& endpoint) {
#ifndef YDB_GRPC_BYPASS_CHANNEL_POOL
std::lock_guard lock(KnownEndpointsLock);
KnownEndpoints.emplace(endpoint);
std::lock_guard lock(KnownEndpoints->Lock);
KnownEndpoints->Endpoints.emplace(endpoint);
#endif
}

private:
struct TKnownEndpoints {
std::unordered_set<std::string> Endpoints;
std::mutex Lock;
};

private:
const NMonitoring::TDynamicCounterPtr Counters;
const NMonitoring::THistogramPtr PingTaskTime;
Expand All @@ -234,8 +238,7 @@ class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> {
const NMonitoring::THistogramPtr NodesHealthCheckTime;
const NMonitoring::THistogramPtr CreateRateLimiterResourceTime;
const NMonitoring::THistogramPtr DeleteRateLimiterResourceTime;
std::unordered_set<std::string> KnownEndpoints;
std::mutex KnownEndpointsLock;
std::shared_ptr<TKnownEndpoints> KnownEndpoints = std::make_shared<TKnownEndpoints>();
};

TPrivateClient::TPrivateClient(
Expand Down
Loading