From cd9abc7f9c38f7703d6b98d9ef5460c00a245f5f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 11:27:08 +0000 Subject: [PATCH 01/16] WIP --- ydb/core/persqueue/pqrb/read_balancer.cpp | 178 ++---------- ydb/core/persqueue/pqrb/read_balancer.h | 22 +- .../persqueue/pqrb/read_balancer__metrics.cpp | 254 ++++++++++++++++++ .../persqueue/pqrb/read_balancer__metrics.h | 56 ++++ .../persqueue/pqrb/read_balancer__txinit.h | 3 +- ydb/core/persqueue/pqrb/ya.make | 1 + ydb/core/persqueue/public/config.h | 2 + 7 files changed, 347 insertions(+), 169 deletions(-) create mode 100644 ydb/core/persqueue/pqrb/read_balancer__metrics.cpp create mode 100644 ydb/core/persqueue/pqrb/read_balancer__metrics.h diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index f9e9d41ed6c9..4deac4251f28 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -1,15 +1,17 @@ #include "read_balancer.h" #include "read_balancer__balancing.h" +#include "read_balancer__metrics.h" #include "read_balancer__mlp_balancing.h" #include "read_balancer__txpreinit.h" #include "read_balancer__txwrite.h" #include "read_balancer_log.h" #include "mirror_describer_factory.h" +#include #include #include -#include #include + #include #include #include @@ -47,6 +49,7 @@ TPersQueueReadBalancer::TPersQueueReadBalancer(const TActorId &tablet, TTabletSt , StartPartitionIdForWrite(0) , TotalGroups(0) , ResourceMetrics(nullptr) + , TopicMetrics(std::make_unique()) , StatsReportRound(0) { Balancer = std::make_unique(*this); @@ -284,7 +287,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr if (SplitMergeEnabled(TabletConfig)) { if (!PartitionsScaleManager) { - PartitionsScaleManager = std::make_unique(Topic, Path, DatabasePath, PathId, Version, TabletConfig, PartitionGraph); + PartitionsScaleManager = std::make_unique(Topic, Path, DatabaseInfo.DatabasePath, PathId, Version, TabletConfig, PartitionGraph); } else { PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig); } @@ -457,7 +460,7 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; + auto& record = ev->Get()->Record; ui64 tabletId = record.GetTabletId(); ui64 cookie = ev->Cookie; @@ -467,7 +470,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c AggregatedStats.Cookies.erase(tabletId); - for (const auto& partRes : record.GetPartResult()) { + for (auto& partRes : *record.MutablePartResult()) { ui32 partitionId = partRes.GetPartition(); if (!PartitionsInfo.contains(partitionId)) { continue; @@ -483,6 +486,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c ); } + TopicMetrics->Handle(std::move(partRes)); + + // TODO delete AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); AggregatedStats.AggrStats(partRes.GetAvgWriteSpeedPerSec(), partRes.GetAvgWriteSpeedPerMin(), partRes.GetAvgWriteSpeedPerHour(), partRes.GetAvgWriteSpeedPerDay()); @@ -568,161 +574,19 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { } void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) { - if (!DatabasePath) { - return; - } - - if (DynamicCounters) { + if (!DatabaseInfo.DatabasePath.empty()) { return; } - TStringBuf name = TStringBuf(Path); - name.SkipPrefix(DatabasePath); - name.SkipPrefix("/"); - - bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe - DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics") - ->GetSubgroup("host", "") - ->GetSubgroup("database", DatabasePath) - ->GetSubgroup("cloud_id", CloudId) - ->GetSubgroup("folder_id", FolderId) - ->GetSubgroup("database_id", DatabaseId) - ->GetSubgroup("topic", TString(name)); - - ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false); - InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); + TopicMetrics->Initialize(TabletConfig, DatabaseInfo, Path, ctx); } void TPersQueueReadBalancer::UpdateConfigCounters() { - if (!DynamicCounters) { - return; - } - - size_t inactiveCount = std::count_if(TabletConfig.GetAllPartitions().begin(), TabletConfig.GetAllPartitions().end(), [](auto& p) { - return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive; - }); - - ActivePartitionCountCounter->Set(PartitionsInfo.size() - inactiveCount); - InactivePartitionCountCounter->Set(inactiveCount); + TopicMetrics->UpdateConfig(TabletConfig, DatabaseInfo, Path, ActorContext()); } -void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) { - if (!AggregatedStats.Stats.size()) { - return; - } - - if (!DynamicCounters) { - return; - } - - auto ensureCounters = [&](auto& counters, auto& config, const std::vector>& subgroups = {}, bool skipPrefix = true) { - auto group = DynamicCounters; - if (counters.empty()) { - for (const auto& subgroup : subgroups) { - group = group->GetSubgroup(subgroup.first, subgroup.second); - } - - for (size_t i = 0; i < config->GetCounters().Size(); ++i) { - TString name = config->GetNames()[i]; - if (skipPrefix) { - TStringBuf nameBuf = name; - nameBuf.SkipPrefix("PQ/"); - name = nameBuf; - } - counters.push_back(name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false)); - } - } - }; - - using TPartitionLabeledCounters = TProtobufTabletLabeledCounters; - auto labeledCounters = std::make_unique("topic", 0, DatabasePath); - ensureCounters(AggregatedCounters, labeledCounters); - - using TPartitionExtendedLabeledCounters = TProtobufTabletLabeledCounters; - auto extendedLabeledCounters = std::make_unique("topic", 0, DatabasePath); - ensureCounters(AggregatedExtendedCounters, extendedLabeledCounters, {}, false); - - using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters; - auto compactionCounters = std::make_unique("topic", 0, DatabasePath); - if (TabletConfig.GetEnableCompactification()) { - ensureCounters(AggregatedCompactionCounters, compactionCounters); - } else { - AggregatedCompactionCounters.clear(); - } - - using TConsumerLabeledCounters = TProtobufTabletLabeledCounters; - auto labeledConsumerCounters = std::make_unique("topic|x|consumer", 0, DatabasePath); - for (auto& [consumer, info]: Consumers) { - ensureCounters(info.AggregatedCounters, labeledConsumerCounters, {{"consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx)}}); - info.Aggr.Reset(new TTabletLabeledCountersBase{}); - } - - /*** apply counters ****/ - - ui64 milliSeconds = TAppData::TimeProvider->Now().MilliSeconds(); - - auto aggr = std::make_unique(); - auto aggrExtended = std::make_unique(); - auto compactionAggr = std::make_unique(); - - auto setCounters = [](auto& counters, const auto& state) { - for (size_t i = 0; i < counters->GetCounters().Size() && i < state.ValuesSize(); ++i) { - counters->GetCounters()[i] = state.GetValues(i); - } - }; - - for (auto it = AggregatedStats.Stats.begin(); it != AggregatedStats.Stats.end(); ++it) { - auto& partitionStats = it->second; - - if (!partitionStats.HasCounters) { - continue; - } - - setCounters(labeledCounters, partitionStats.Counters); - aggr->AggregateWith(*labeledCounters); - - setCounters(extendedLabeledCounters, partitionStats.Counters.GetExtendedCounters()); - aggrExtended->AggregateWith(*extendedLabeledCounters); - - if (TabletConfig.GetEnableCompactification()) { - setCounters(compactionCounters, partitionStats.Counters.GetCompactionCounters()); - compactionAggr->AggregateWith(*compactionCounters); - } - - for (const auto& consumerStats : partitionStats.Counters.GetConsumerAggregatedCounters()) { - auto jt = Consumers.find(consumerStats.GetConsumer()); - if (jt == Consumers.end()) { - continue; - } - auto& consumerInfo = jt->second; - - setCounters(labeledConsumerCounters, consumerStats); - consumerInfo.Aggr->AggregateWith(*labeledConsumerCounters); - } - } - - auto processAggregators = [milliSeconds](auto& aggregator, auto& counters) { - for (size_t i = 0; aggregator->HasCounters() && i < aggregator->GetCounters().Size(); ++i) { - if (!counters[i]) { - continue; - } - const auto& type = aggregator->GetCounterType(i); - auto val = aggregator->GetCounters()[i].Get(); - if (type == TLabeledCounterOptions::CT_TIMELAG) { - val = val <= milliSeconds ? milliSeconds - val : 0; - } - counters[i]->Set(val); - } - }; - - /*** show counters ***/ - processAggregators(aggr, AggregatedCounters); - processAggregators(aggrExtended, AggregatedExtendedCounters); - processAggregators(compactionAggr, AggregatedCompactionCounters); - - for (auto& [consumer, info] : Consumers) { - processAggregators(info.Aggr, info.AggregatedCounters); - } +void TPersQueueReadBalancer::UpdateCounters(const TActorContext&) { + TopicMetrics->UpdateMetrics(); } TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { @@ -916,12 +780,12 @@ void TPersQueueReadBalancer::StartWatchingSubDomainPathId() { void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { const auto* msg = ev->Get(); - if (DatabasePath.empty()) { - DatabasePath = msg->Result->GetPath(); + if (DatabaseInfo.DatabasePath.empty()) { + DatabaseInfo.DatabasePath = msg->Result->GetPath(); for (const auto& attr : msg->Result->GetPathDescription().GetUserAttributes()) { - if (attr.GetKey() == "folder_id") FolderId = attr.GetValue(); - if (attr.GetKey() == "cloud_id") CloudId = attr.GetValue(); - if (attr.GetKey() == "database_id") DatabaseId = attr.GetValue(); + if (attr.GetKey() == "folder_id") DatabaseInfo.FolderId = attr.GetValue(); + if (attr.GetKey() == "cloud_id") DatabaseInfo.CloudId = attr.GetValue(); + if (attr.GetKey() == "database_id") DatabaseInfo.DatabaseId = attr.GetValue(); } InitCounters(ctx); @@ -929,7 +793,7 @@ void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated } if (PartitionsScaleManager) { - PartitionsScaleManager->UpdateDatabasePath(DatabasePath); + PartitionsScaleManager->UpdateDatabasePath(DatabaseInfo.DatabasePath); } if (SubDomainPathId && msg->PathId == *SubDomainPathId) { diff --git a/ydb/core/persqueue/pqrb/read_balancer.h b/ydb/core/persqueue/pqrb/read_balancer.h index 14f22bb5dad4..43ce094cf2b3 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.h +++ b/ydb/core/persqueue/pqrb/read_balancer.h @@ -30,6 +30,15 @@ class TBalancer; class TMLPBalancer; } +class TTopicMetrics; + +struct TDatabaseInfo { + TString DatabasePath; + TString DatabaseId; + TString FolderId; + TString CloudId; +}; + class TMetricsTimeKeeper { public: @@ -213,18 +222,9 @@ class TPersQueueReadBalancer : public TActor, std::unordered_map TabletPipes; std::unordered_set PipesRequested; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedExtendedCounters; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCompactionCounters; - - NMonitoring::TDynamicCounterPtr DynamicCounters; - NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; - NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; + TDatabaseInfo DatabaseInfo; - TString DatabasePath; - TString DatabaseId; - TString FolderId; - TString CloudId; + std::unique_ptr TopicMetrics; struct TPartitionStats { ui64 DataSize = 0; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp new file mode 100644 index 000000000000..3f816ed17645 --- /dev/null +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -0,0 +1,254 @@ +#include "read_balancer.h" +#include "read_balancer__metrics.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NPQ { + + +namespace { + +struct TMetricCollector { + TMetricCollector(size_t size) + : Counters(size) + { + } + + void Collect(const auto& values) { + Collect(values.begin(), values.end()); + } + + void Collect(auto begin, auto end) { + ssize_t in_size = std::distance(begin, end); + AFL_ENSURE(in_size >= 0)("in_size", in_size); + size_t count = std::min(Counters.size(), static_cast(in_size)); + + for (size_t i = 0; i < count; ++i) { + Counters[i] += *begin; + ++begin; + } + } + + std::vector Counters; +}; + +struct TConsumerMetricCollector { + TConsumerMetricCollector() + : ClientLabeledCounters(EClientLabeledCounters_descriptor()->value_count()) + //, MLPConsumerLabeledCounters(EMLPConsumerLabeledCounters_descriptor()->value_count()) + //, MLPMessageLockAttemptsCounter(std::size(MLP_LOCKS_INTERVALS)) + { + } + + TMetricCollector ClientLabeledCounters; + //TMetricCollector MLPConsumerLabeledCounters; + //TMetricCollector MLPMessageLockAttemptsCounter; +}; + +struct TTopicMetricCollector { + TTopicMetricCollector() + : PartitionLabeledCounters(EPartitionLabeledCounters_descriptor()->value_count()) + , PartitionExtendedLabeledCounters(EPartitionExtendedLabeledCounters_descriptor()->value_count()) + , PartitionKeyCompactionLabeledCounters(EPartitionKeyCompactionLabeledCounters_descriptor()->value_count()) + { + } + + TMetricCollector PartitionLabeledCounters; + TMetricCollector PartitionExtendedLabeledCounters; + TMetricCollector PartitionKeyCompactionLabeledCounters; + + std::unordered_map Consumers; + + void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) { + Collect(partitionStatus.GetAggregatedCounters()); + } + + void Collect(const NKikimrPQ::TAggregatedCounters& counters) { + PartitionLabeledCounters.Collect(counters.GetValues()); + PartitionExtendedLabeledCounters.Collect(counters.GetExtendedCounters().GetValues()); + PartitionKeyCompactionLabeledCounters.Collect(counters.GetCompactionCounters().GetValues()); + + for (const auto& consumer : counters.GetConsumerAggregatedCounters()) { + Consumers[consumer.GetConsumer()].ClientLabeledCounters.Collect(consumer.GetValues()); + } + + // for (const auto& consumer : counters.GetMLPConsumerCounters()) { + // auto& collector = Consumers[consumer.GetConsumer()]; + // collector.MLPConsumerLabeledCounters.Collect(consumer.GetCountersValues()); + // collector.MLPMessageLockAttemptsCounter.Collect(consumer.GetMessageLocksValues()); + // } + } +}; + +template +TCounters InitializeCounters( + NMonitoring::TDynamicCounterPtr root, + const TString& databasePath, + const TString labels = "topic", + const std::vector>& subgroups = {}, + bool skipPrefix = true +) { + auto group = root; + + for (const auto& subgroup : subgroups) { + group = group->GetSubgroup(subgroup.first, subgroup.second); + } + + using TConfig = TProtobufTabletLabeledCounters; + auto config = std::make_unique(labels, 0, databasePath); + + std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; + for (size_t i = 0; i < config->GetCounters().Size(); ++i) { + TString name = config->GetNames()[i]; + if (skipPrefix) { + TStringBuf nameBuf = name; + nameBuf.SkipPrefix("PQ/"); + name = nameBuf; + } + result.push_back(name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false)); + } + + return { + .Config = std::move(config), + .Counters = std::move(result) + }; +} + +void SetCounters(TCounters& counters, const TMetricCollector& metrics) { + ui64 now = TAppData::TimeProvider->Now().MilliSeconds(); + + for (size_t i = 0; i < counters.Counters.size(); ++i) { + auto value = metrics.Counters[i]; + + const auto& type = counters.Config->GetCounterType(i); + if (type == TLabeledCounterOptions::CT_TIMELAG) { + value = value < now ? now - value : 0; + } + + counters.Counters[i]->Set(value); + } +} + +} + + + +void TTopicMetrics::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { + if (DynamicCounters) { + return; + } + + TStringBuf name = TStringBuf(topicPath); + name.SkipPrefix(database.DatabasePath); + name.SkipPrefix("/"); + + bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe + DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics") + ->GetSubgroup("host", "") + ->GetSubgroup("database", database.DatabasePath) + ->GetSubgroup("cloud_id", database.CloudId) + ->GetSubgroup("folder_id", database.FolderId) + ->GetSubgroup("database_id", database.DatabaseId) + ->GetSubgroup("topic", TString(name)); + + ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false); + InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); + + PartitionLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath); + PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath, "topic", {}, true); + InitializeKeyCompactionCounters(database.DatabasePath, tabletConfig); + InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); +} + +void TTopicMetrics::InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { + for (const auto& consumer : tabletConfig.GetConsumers()) { + auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); + + auto& counters = ConsumerCounters[consumer.GetName()]; + counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic|x|consumer",{{"consumer", metricsConsumerName}}); + + if (consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) { + //metrics.MLPClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic|consumer", {{"consumer", metricsConsumerName}}); + //metrics.MLPMessageLockAttemptsCounter = InitializeCounters(DynamicCounters, databasePath, {{"consumer", metricsConsumerName}}); + } + } + + std::unordered_set existedConsumers; + for (const auto& consumer : tabletConfig.GetConsumers()) { + existedConsumers.insert(consumer.GetName()); + } + for (auto it = ConsumerCounters.begin(); it != ConsumerCounters.end();) { + if (!existedConsumers.contains(it->first)) { + it = ConsumerCounters.erase(it); + } else { + ++it; + } + } +} + +void TTopicMetrics::InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) { + if (tabletConfig.GetEnableCompactification()) { + PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic", {}, true); + } else { + PartitionKeyCompactionLabeledCounters.Counters.clear(); + } +} + +void TTopicMetrics::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { + Y_UNUSED(topicPath); + + if (!DynamicCounters) { + return; + } + + InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); + + size_t inactiveCount = std::count_if(tabletConfig.GetAllPartitions().begin(), tabletConfig.GetAllPartitions().end(), [](auto& p) { + return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive; + }); + + ActivePartitionCountCounter->Set(tabletConfig.GetAllPartitions().size() - inactiveCount); + InactivePartitionCountCounter->Set(inactiveCount); +} + +void TTopicMetrics::Handle(NKikimrPQ::TStatusResponse::TPartResult&& partitionStatus) { + PartitionStatuses[partitionStatus.GetPartition()] = std::move(partitionStatus); +} + +void TTopicMetrics::UpdateMetrics() { + if (PartitionStatuses.empty()) { + return; + } + + TTopicMetricCollector collector; + for (auto& [_, partitionStatus] : PartitionStatuses) { + collector.Collect(partitionStatus); + } + + SetCounters(PartitionLabeledCounters, collector.PartitionLabeledCounters); + SetCounters(PartitionExtendedLabeledCounters, collector.PartitionExtendedLabeledCounters); + SetCounters(PartitionKeyCompactionLabeledCounters, collector.PartitionKeyCompactionLabeledCounters); + + for (auto& [consumer, consumerCounters] : ConsumerCounters) { + auto it = collector.Consumers.find(consumer); + if (it == collector.Consumers.end()) { + continue; + } + + auto& consumerMetrics = it->second; + + SetCounters(consumerCounters.ClientLabeledCounters, consumerMetrics.ClientLabeledCounters); + if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) { + // SetCounters(consumerCounters.MLPClientLabeledCounters, consumerMetrics.MLPConsumerLabeledCounters); + // TODO MLPMessageLockAttemptsCounter + } + } +} + +} \ No newline at end of file diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h new file mode 100644 index 000000000000..c9ec70e1284c --- /dev/null +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include +#include + +namespace NKikimr::NPQ { + +struct TDatabaseInfo; + +struct TCounters { + std::unique_ptr Config; + std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> Counters; +}; + +class TTopicMetrics { +public: + TTopicMetrics(); + ~TTopicMetrics(); + + void Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); + void UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); + + void Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus); + void UpdateMetrics(); + +protected: + void InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig); + void InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx); + +private: + NMonitoring::TDynamicCounterPtr DynamicCounters; + + NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; + NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; + + TCounters PartitionLabeledCounters; + TCounters PartitionExtendedLabeledCounters; + TCounters PartitionKeyCompactionLabeledCounters; + + struct TConsumerCounters { + TCounters ClientLabeledCounters; + TCounters MLPClientLabeledCounters; + ::NMonitoring::TDynamicCounters::TCounterPtr MLPMessageLockAttemptsCounter; + }; + std::unordered_map ConsumerCounters; + + std::unordered_map PartitionStatuses; +}; +} diff --git a/ydb/core/persqueue/pqrb/read_balancer__txinit.h b/ydb/core/persqueue/pqrb/read_balancer__txinit.h index 4ca469b4c3e6..a563d82d933e 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__txinit.h +++ b/ydb/core/persqueue/pqrb/read_balancer__txinit.h @@ -60,7 +60,8 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); if (SplitMergeEnabled(Self->TabletConfig)) { - Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph); + // TODO DatabasePath is not initialized yet + Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->Path, Self->DatabaseInfo.DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph); } Self->UpdateConfigCounters(); } diff --git a/ydb/core/persqueue/pqrb/ya.make b/ydb/core/persqueue/pqrb/ya.make index 9824d5544b4c..96ffab820467 100644 --- a/ydb/core/persqueue/pqrb/ya.make +++ b/ydb/core/persqueue/pqrb/ya.make @@ -7,6 +7,7 @@ SRCS( partition_scale_manager_graph_cmp.cpp read_balancer__balancing_app.cpp read_balancer__balancing.cpp + read_balancer__metrics.cpp read_balancer__mlp_balancing.cpp read_balancer_app.cpp read_balancer.cpp diff --git a/ydb/core/persqueue/public/config.h b/ydb/core/persqueue/public/config.h index 2cda9747e779..08d1f4fc3e2e 100644 --- a/ydb/core/persqueue/public/config.h +++ b/ydb/core/persqueue/public/config.h @@ -9,6 +9,8 @@ class TPQTabletConfig_TConsumer; class TMirrorPartitionConfig; class TMLPStorageSnapshot; class TMLPStorageWAL; +class TStatusResponse; +class TStatusResponse_TPartResult; }; From 919d1b355db43a1c3cf60ddc361282777469e0eb Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 11:47:11 +0000 Subject: [PATCH 02/16] rename --- ydb/core/persqueue/pqrb/read_balancer.cpp | 10 +++++----- ydb/core/persqueue/pqrb/read_balancer.h | 4 ++-- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 12 ++++++------ ydb/core/persqueue/pqrb/read_balancer__metrics.h | 8 +++++--- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index 4deac4251f28..1bcce258cf4b 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -49,7 +49,7 @@ TPersQueueReadBalancer::TPersQueueReadBalancer(const TActorId &tablet, TTabletSt , StartPartitionIdForWrite(0) , TotalGroups(0) , ResourceMetrics(nullptr) - , TopicMetrics(std::make_unique()) + , TopicMetricsHandler(std::make_unique()) , StatsReportRound(0) { Balancer = std::make_unique(*this); @@ -486,7 +486,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c ); } - TopicMetrics->Handle(std::move(partRes)); + TopicMetricsHandler->Handle(std::move(partRes)); // TODO delete AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); @@ -578,15 +578,15 @@ void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) { return; } - TopicMetrics->Initialize(TabletConfig, DatabaseInfo, Path, ctx); + TopicMetricsHandler->Initialize(TabletConfig, DatabaseInfo, Path, ctx); } void TPersQueueReadBalancer::UpdateConfigCounters() { - TopicMetrics->UpdateConfig(TabletConfig, DatabaseInfo, Path, ActorContext()); + TopicMetricsHandler->UpdateConfig(TabletConfig, DatabaseInfo, Path, ActorContext()); } void TPersQueueReadBalancer::UpdateCounters(const TActorContext&) { - TopicMetrics->UpdateMetrics(); + TopicMetricsHandler->UpdateMetrics(); } TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { diff --git a/ydb/core/persqueue/pqrb/read_balancer.h b/ydb/core/persqueue/pqrb/read_balancer.h index 43ce094cf2b3..2d6096f8fd93 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.h +++ b/ydb/core/persqueue/pqrb/read_balancer.h @@ -30,7 +30,7 @@ class TBalancer; class TMLPBalancer; } -class TTopicMetrics; +class TTopicMetricsHandler; struct TDatabaseInfo { TString DatabasePath; @@ -224,7 +224,7 @@ class TPersQueueReadBalancer : public TActor, TDatabaseInfo DatabaseInfo; - std::unique_ptr TopicMetrics; + std::unique_ptr TopicMetricsHandler; struct TPartitionStats { ui64 DataSize = 0; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 3f816ed17645..4a31d2ebdc8a 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -139,7 +139,7 @@ void SetCounters(TCounters& counters, const TMetricCollector& metrics) { -void TTopicMetrics::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { +void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { if (DynamicCounters) { return; } @@ -166,7 +166,7 @@ void TTopicMetrics::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, c InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); } -void TTopicMetrics::InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { +void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { for (const auto& consumer : tabletConfig.GetConsumers()) { auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); @@ -192,7 +192,7 @@ void TTopicMetrics::InitializeConsumerCounters(const TString& databasePath, cons } } -void TTopicMetrics::InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) { +void TTopicMetricsHandler::InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) { if (tabletConfig.GetEnableCompactification()) { PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic", {}, true); } else { @@ -200,7 +200,7 @@ void TTopicMetrics::InitializeKeyCompactionCounters(const TString& databasePath, } } -void TTopicMetrics::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { +void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { Y_UNUSED(topicPath); if (!DynamicCounters) { @@ -217,11 +217,11 @@ void TTopicMetrics::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, InactivePartitionCountCounter->Set(inactiveCount); } -void TTopicMetrics::Handle(NKikimrPQ::TStatusResponse::TPartResult&& partitionStatus) { +void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse::TPartResult&& partitionStatus) { PartitionStatuses[partitionStatus.GetPartition()] = std::move(partitionStatus); } -void TTopicMetrics::UpdateMetrics() { +void TTopicMetricsHandler::UpdateMetrics() { if (PartitionStatuses.empty()) { return; } diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index c9ec70e1284c..1c5c9d8f6145 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -14,15 +14,17 @@ namespace NKikimr::NPQ { struct TDatabaseInfo; + + struct TCounters { std::unique_ptr Config; std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> Counters; }; -class TTopicMetrics { +class TTopicMetricsHandler { public: - TTopicMetrics(); - ~TTopicMetrics(); + TTopicMetricsHandler(); + ~TTopicMetricsHandler(); void Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); void UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); From 13fe00ac5cfb7715ab81513f7cd74ca1a44d27b4 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 12:34:11 +0000 Subject: [PATCH 03/16] WIP --- ydb/core/persqueue/pqrb/read_balancer.cpp | 60 ++++--------------- ydb/core/persqueue/pqrb/read_balancer.h | 28 --------- .../persqueue/pqrb/read_balancer__metrics.cpp | 56 ++++++++++++++++- .../persqueue/pqrb/read_balancer__metrics.h | 25 +++++++- .../persqueue/pqrb/read_balancer__txinit.h | 5 +- ydb/core/persqueue/pqrb/read_balancer_app.cpp | 11 ++-- 6 files changed, 96 insertions(+), 89 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index 1bcce258cf4b..6f6647b134d4 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -66,11 +66,10 @@ struct TPersQueueReadBalancer::TTxWritePartitionStats : public ITransaction { bool Execute(TTransactionContext& txc, const TActorContext&) override { Self->TTxWritePartitionStatsScheduled = false; - NIceDb::TNiceDb db(txc.DB); - for (auto& s : Self->AggregatedStats.Stats) { - auto partition = s.first; - auto& stats = s.second; + auto& metrics = Self->TopicMetricsHandler->GetTopicMetrics(); + NIceDb::TNiceDb db(txc.DB); + for (auto& [partition, stats] : metrics.PartitionMetrics) { auto it = Self->PartitionsInfo.find(partition); if (it == Self->PartitionsInfo.end()) { continue; @@ -487,13 +486,6 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c } TopicMetricsHandler->Handle(std::move(partRes)); - - // TODO delete - AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); - AggregatedStats.AggrStats(partRes.GetAvgWriteSpeedPerSec(), partRes.GetAvgWriteSpeedPerMin(), - partRes.GetAvgWriteSpeedPerHour(), partRes.GetAvgWriteSpeedPerDay()); - AggregatedStats.Stats[partitionId].Counters = partRes.GetAggregatedCounters(); - AggregatedStats.Stats[partitionId].HasCounters = true; } Balancer->Handle(ev, ctx); @@ -521,34 +513,6 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TAc Send(ev.Get()->Sender, GetStatsEvent()); } -void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize) { - AFL_ENSURE(dataSize >= usedReserveSize); - - auto& oldValue = Stats[partition]; - - TPartitionStats newValue; - newValue.DataSize = dataSize; - newValue.UsedReserveSize = usedReserveSize; - - TotalDataSize += (newValue.DataSize - oldValue.DataSize); - TotalUsedReserveSize += (newValue.UsedReserveSize - oldValue.UsedReserveSize); - - AFL_ENSURE(TotalDataSize >= TotalUsedReserveSize); - - oldValue = newValue; -} - -void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay) { - NewMetrics.TotalAvgWriteSpeedPerSec += avgWriteSpeedPerSec; - NewMetrics.MaxAvgWriteSpeedPerSec = Max(NewMetrics.MaxAvgWriteSpeedPerSec, avgWriteSpeedPerSec); - NewMetrics.TotalAvgWriteSpeedPerMin += avgWriteSpeedPerMin; - NewMetrics.MaxAvgWriteSpeedPerMin = Max(NewMetrics.MaxAvgWriteSpeedPerMin, avgWriteSpeedPerMin); - NewMetrics.TotalAvgWriteSpeedPerHour += avgWriteSpeedPerHour; - NewMetrics.MaxAvgWriteSpeedPerHour = Max(NewMetrics.MaxAvgWriteSpeedPerHour, avgWriteSpeedPerHour); - NewMetrics.TotalAvgWriteSpeedPerDay += avgWriteSpeedPerDay; - NewMetrics.MaxAvgWriteSpeedPerDay = Max(NewMetrics.MaxAvgWriteSpeedPerDay, avgWriteSpeedPerDay); -} - void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Y_UNUSED(ctx); //TODO: Deside about changing number of partitions and send request to SchemeShard @@ -559,18 +523,17 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Execute(new TTxWritePartitionStats(this)); } - AggregatedStats.Metrics = AggregatedStats.NewMetrics; + UpdateCounters(ctx); TEvPersQueue::TEvPeriodicTopicStats* ev = GetStatsEvent(); PQ_LOG_D("Send TEvPeriodicTopicStats PathId: " << PathId << " Generation: " << Generation << " StatsReportRound: " << StatsReportRound - << " DataSize: " << AggregatedStats.TotalDataSize - << " UsedReserveSize: " << AggregatedStats.TotalUsedReserveSize); + << " DataSize: " << TopicMetricsHandler->GetTopicMetrics().TotalDataSize + << " UsedReserveSize: " << TopicMetricsHandler->GetTopicMetrics().TotalUsedReserveSize); NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev); - UpdateCounters(ctx); } void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) { @@ -590,14 +553,16 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext&) { } TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { + auto& metrics = TopicMetricsHandler->GetTopicMetrics(); + TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); auto& rec = ev->Record; rec.SetPathId(PathId); rec.SetGeneration(Generation); rec.SetRound(++StatsReportRound); - rec.SetDataSize(AggregatedStats.TotalDataSize); - rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize); + rec.SetDataSize(metrics.TotalDataSize); + rec.SetUsedReserveSize(metrics.TotalUsedReserveSize); rec.SetSubDomainOutOfSpace(SubDomainOutOfSpace); return ev; @@ -609,12 +574,7 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { CheckStat(ctx); } - TPartitionMetrics newMetrics; - AggregatedStats.NewMetrics = newMetrics; - for (auto& p : PartitionsInfo) { - AggregatedStats.Stats[p.first].HasCounters = false; - const ui64& tabletId = p.second.TabletId; if (AggregatedStats.Cookies.contains(tabletId)) { //already asked stat continue; diff --git a/ydb/core/persqueue/pqrb/read_balancer.h b/ydb/core/persqueue/pqrb/read_balancer.h index 2d6096f8fd93..7d4db3bc13be 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.h +++ b/ydb/core/persqueue/pqrb/read_balancer.h @@ -226,39 +226,11 @@ class TPersQueueReadBalancer : public TActor, std::unique_ptr TopicMetricsHandler; - struct TPartitionStats { - ui64 DataSize = 0; - ui64 UsedReserveSize = 0; - NKikimrPQ::TAggregatedCounters Counters; - bool HasCounters = false; - }; - - struct TPartitionMetrics { - ui64 TotalAvgWriteSpeedPerSec = 0; - ui64 MaxAvgWriteSpeedPerSec = 0; - ui64 TotalAvgWriteSpeedPerMin = 0; - ui64 MaxAvgWriteSpeedPerMin = 0; - ui64 TotalAvgWriteSpeedPerHour = 0; - ui64 MaxAvgWriteSpeedPerHour = 0; - ui64 TotalAvgWriteSpeedPerDay = 0; - ui64 MaxAvgWriteSpeedPerDay = 0; - }; - struct TAggregatedStats { - std::unordered_map Stats; std::unordered_map Cookies; - ui64 TotalDataSize = 0; - ui64 TotalUsedReserveSize = 0; - - TPartitionMetrics Metrics; - TPartitionMetrics NewMetrics; - ui64 Round = 0; ui64 NextCookie = 0; - - void AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize); - void AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay); }; TAggregatedStats AggregatedStats; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 4a31d2ebdc8a..ca645c3a77e3 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -52,13 +52,18 @@ struct TConsumerMetricCollector { }; struct TTopicMetricCollector { - TTopicMetricCollector() - : PartitionLabeledCounters(EPartitionLabeledCounters_descriptor()->value_count()) + TTopicMetricCollector(absl::flat_hash_map partitionMetrics) + : ExistedPartitionMetrics(std::move(partitionMetrics)) + , PartitionLabeledCounters(EPartitionLabeledCounters_descriptor()->value_count()) , PartitionExtendedLabeledCounters(EPartitionExtendedLabeledCounters_descriptor()->value_count()) , PartitionKeyCompactionLabeledCounters(EPartitionKeyCompactionLabeledCounters_descriptor()->value_count()) { } + absl::flat_hash_map ExistedPartitionMetrics; + + TTopicMetrics TopicMetrics; + TMetricCollector PartitionLabeledCounters; TMetricCollector PartitionExtendedLabeledCounters; TMetricCollector PartitionKeyCompactionLabeledCounters; @@ -66,7 +71,25 @@ struct TTopicMetricCollector { std::unordered_map Consumers; void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) { + TopicMetrics.TotalDataSize += partitionStatus.GetPartitionSize(); + TopicMetrics.TotalUsedReserveSize += partitionStatus.GetUsedReserveSize(); + + TopicMetrics.TotalAvgWriteSpeedPerSec += partitionStatus.GetAvgWriteSpeedPerSec(); + TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionStatus.GetAvgWriteSpeedPerSec()); + TopicMetrics.TotalAvgWriteSpeedPerMin += partitionStatus.GetAvgWriteSpeedPerMin(); + TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionStatus.GetAvgWriteSpeedPerMin()); + TopicMetrics.TotalAvgWriteSpeedPerHour += partitionStatus.GetAvgWriteSpeedPerHour(); + TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionStatus.GetAvgWriteSpeedPerHour()); + TopicMetrics.TotalAvgWriteSpeedPerDay += partitionStatus.GetAvgWriteSpeedPerDay(); + TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionStatus.GetAvgWriteSpeedPerDay()); + + auto& partitionMetrics = TopicMetrics.PartitionMetrics[partitionStatus.GetPartition()]; + partitionMetrics.DataSize = partitionStatus.GetPartitionSize(); + partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize(); + Collect(partitionStatus.GetAggregatedCounters()); + + ExistedPartitionMetrics.erase(partitionStatus.GetPartition()); } void Collect(const NKikimrPQ::TAggregatedCounters& counters) { @@ -78,12 +101,19 @@ struct TTopicMetricCollector { Consumers[consumer.GetConsumer()].ClientLabeledCounters.Collect(consumer.GetValues()); } + // TODO MLP // for (const auto& consumer : counters.GetMLPConsumerCounters()) { // auto& collector = Consumers[consumer.GetConsumer()]; // collector.MLPConsumerLabeledCounters.Collect(consumer.GetCountersValues()); // collector.MLPMessageLockAttemptsCounter.Collect(consumer.GetMessageLocksValues()); // } } + + void Finish() { + for (auto& [partitionId, partitionMetrics] : ExistedPartitionMetrics) { + TopicMetrics.PartitionMetrics[partitionId] = partitionMetrics; + } + } }; template @@ -137,7 +167,12 @@ void SetCounters(TCounters& counters, const TMetricCollector& metrics) { } +TTopicMetricsHandler::TTopicMetricsHandler() = default; +TTopicMetricsHandler::~TTopicMetricsHandler() = default; +const TTopicMetrics& TTopicMetricsHandler::GetTopicMetrics() const { + return TopicMetrics; +} void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { if (DynamicCounters) { @@ -207,6 +242,7 @@ void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tablet return; } + InitializeKeyCompactionCounters(database.DatabasePath, tabletConfig); InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); size_t inactiveCount = std::count_if(tabletConfig.GetAllPartitions().begin(), tabletConfig.GetAllPartitions().end(), [](auto& p) { @@ -217,6 +253,16 @@ void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tablet InactivePartitionCountCounter->Set(inactiveCount); } +void TTopicMetricsHandler::InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize) { + TopicMetrics.PartitionMetrics[partitionId] = { + .DataSize = dataSize, + .UsedReserveSize = usedReserveSize + }; + + TopicMetrics.TotalDataSize += dataSize; + TopicMetrics.TotalUsedReserveSize += usedReserveSize; +} + void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse::TPartResult&& partitionStatus) { PartitionStatuses[partitionStatus.GetPartition()] = std::move(partitionStatus); } @@ -226,10 +272,14 @@ void TTopicMetricsHandler::UpdateMetrics() { return; } - TTopicMetricCollector collector; + TTopicMetricCollector collector(TopicMetrics.PartitionMetrics); for (auto& [_, partitionStatus] : PartitionStatuses) { collector.Collect(partitionStatus); } + collector.Finish(); + PartitionStatuses.clear(); + + TopicMetrics = collector.TopicMetrics; SetCounters(PartitionLabeledCounters, collector.PartitionLabeledCounters); SetCounters(PartitionExtendedLabeledCounters, collector.PartitionExtendedLabeledCounters); diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index 1c5c9d8f6145..54ce6d880014 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -14,7 +14,25 @@ namespace NKikimr::NPQ { struct TDatabaseInfo; - +struct TTopicMetrics { + ui64 TotalDataSize = 0; + ui64 TotalUsedReserveSize = 0; + + ui64 TotalAvgWriteSpeedPerSec = 0; + ui64 MaxAvgWriteSpeedPerSec = 0; + ui64 TotalAvgWriteSpeedPerMin = 0; + ui64 MaxAvgWriteSpeedPerMin = 0; + ui64 TotalAvgWriteSpeedPerHour = 0; + ui64 MaxAvgWriteSpeedPerHour = 0; + ui64 TotalAvgWriteSpeedPerDay = 0; + ui64 MaxAvgWriteSpeedPerDay = 0; + + struct TPartitionMetrics { + ui64 DataSize = 0; + ui64 UsedReserveSize = 0; + }; + absl::flat_hash_map PartitionMetrics; +}; struct TCounters { std::unique_ptr Config; @@ -26,8 +44,11 @@ class TTopicMetricsHandler { TTopicMetricsHandler(); ~TTopicMetricsHandler(); + const TTopicMetrics& GetTopicMetrics() const; + void Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); void UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); + void InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize); void Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus); void UpdateMetrics(); @@ -39,6 +60,8 @@ class TTopicMetricsHandler { private: NMonitoring::TDynamicCounterPtr DynamicCounters; + TTopicMetrics TopicMetrics; + NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; diff --git a/ydb/core/persqueue/pqrb/read_balancer__txinit.h b/ydb/core/persqueue/pqrb/read_balancer__txinit.h index a563d82d933e..7a7694f4657c 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__txinit.h +++ b/ydb/core/persqueue/pqrb/read_balancer__txinit.h @@ -1,6 +1,7 @@ #pragma once #include "read_balancer.h" +#include "read_balancer__metrics.h" #include "read_balancer__schema.h" #include @@ -77,8 +78,8 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { ui64 tabletId = partsRowset.GetValue(); partitionsInfo[part] = {tabletId}; - Self->AggregatedStats.AggrStats(part, partsRowset.GetValue(), - partsRowset.GetValue()); + Self->TopicMetricsHandler->InitializePartitions(part, partsRowset.GetValue(), + partsRowset.GetValue()); if (!partsRowset.Next()) return false; diff --git a/ydb/core/persqueue/pqrb/read_balancer_app.cpp b/ydb/core/persqueue/pqrb/read_balancer_app.cpp index 5abe979a3d98..6d6ebc2ada4a 100644 --- a/ydb/core/persqueue/pqrb/read_balancer_app.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer_app.cpp @@ -1,6 +1,7 @@ #include "read_balancer.h" #include "read_balancer__balancing.h" +#include "read_balancer__metrics.h" #include @@ -17,7 +18,7 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e } TString TPersQueueReadBalancer::GenerateStat() { - auto& metrics = AggregatedStats.Metrics; + auto& metrics = TopicMetricsHandler->GetTopicMetrics(); TStringStream str; HTML_APP_PAGE(str, "PersQueueReadBalancer " << TabletID() << " (" << Path << ")") { @@ -45,9 +46,9 @@ TString TPersQueueReadBalancer::GenerateStat() { PROPERTIES("Statistics") { PROPERTY("Active pipes", Balancer->GetSessions().size()); PROPERTY("Active partitions", NumActiveParts); - PROPERTY("Total data size", AggregatedStats.TotalDataSize); + PROPERTY("Total data size", metrics.TotalDataSize); PROPERTY("Reserve size", PartitionReserveSize()); - PROPERTY("Used reserve size", AggregatedStats.TotalUsedReserveSize); + PROPERTY("Used reserve size", metrics.TotalUsedReserveSize); PROPERTY("[Total/Max/Avg]WriteSpeedSec", metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts); PROPERTY("[Total/Max/Avg]WriteSpeedMin", metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts); PROPERTY("[Total/Max/Avg]WriteSpeedHour", metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts); @@ -75,7 +76,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } TABLEBODY() { for (auto& [partitionId, partitionInfo] : PartitionsInfo) { - const auto& stats = AggregatedStats.Stats[partitionId]; + const auto& stats = metrics.PartitionMetrics.find(partitionId); const auto* node = PartitionGraph.GetPartition(partitionId); TString style = node && node->DirectChildren.empty() ? "text-success" : "text-muted"; @@ -110,7 +111,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } } } - TABLED() { str << stats.DataSize; } + TABLED() { str << (stats == metrics.PartitionMetrics.end() ? 0 : stats->second.DataSize); } } } } From d39c5e8294f9306ecc22aa0ad8bcf9adfa2bedf9 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 12:43:01 +0000 Subject: [PATCH 04/16] fix --- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index ca645c3a77e3..faff0097ae48 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -111,6 +111,9 @@ struct TTopicMetricCollector { void Finish() { for (auto& [partitionId, partitionMetrics] : ExistedPartitionMetrics) { + TopicMetrics.TotalDataSize += partitionMetrics.DataSize; + TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize; + TopicMetrics.PartitionMetrics[partitionId] = partitionMetrics; } } From 5bc996ba6c9a3e58e734cd6b7e7c2c06e1ac506b Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 13:44:11 +0000 Subject: [PATCH 05/16] fix --- .../persqueue/pqrb/read_balancer__metrics.cpp | 69 +++++++++++-------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index faff0097ae48..d6bf8166f1a4 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -14,12 +14,27 @@ namespace NKikimr::NPQ { namespace { -struct TMetricCollector { - TMetricCollector(size_t size) - : Counters(size) - { +template +TString GetLables() { + auto desc = NAux::GetLabeledCounterOpts(); + auto groupNames = desc->GetGroupNames(); + + TStringBuilder labels; + for (size_t i = 0; i < desc->GetGroupNamesSize(); ++i) { + if (i) { + labels << "|"; + } + labels << groupNames[i]; } + return labels; +} + + +template +struct TMetricCollector { + using TConfig = TProtobufTabletLabeledCounters; + void Collect(const auto& values) { Collect(values.begin(), values.end()); } @@ -27,36 +42,33 @@ struct TMetricCollector { void Collect(auto begin, auto end) { ssize_t in_size = std::distance(begin, end); AFL_ENSURE(in_size >= 0)("in_size", in_size); - size_t count = std::min(Counters.size(), static_cast(in_size)); + size_t count = std::min(static_cast(Counters.GetCounters().Size()), static_cast(in_size)); for (size_t i = 0; i < count; ++i) { - Counters[i] += *begin; + Counters.GetCounters()[i] += *begin; ++begin; } + + Aggregator.AggregateWith(Counters); } - std::vector Counters; + TConfig Counters = TConfig(GetLables(), 0, ""); + TTabletLabeledCountersBase Aggregator; }; struct TConsumerMetricCollector { TConsumerMetricCollector() - : ClientLabeledCounters(EClientLabeledCounters_descriptor()->value_count()) - //, MLPConsumerLabeledCounters(EMLPConsumerLabeledCounters_descriptor()->value_count()) - //, MLPMessageLockAttemptsCounter(std::size(MLP_LOCKS_INTERVALS)) { } - TMetricCollector ClientLabeledCounters; - //TMetricCollector MLPConsumerLabeledCounters; + TMetricCollector ClientLabeledCounters; + //TMetricCollector MLPConsumerLabeledCounters; //TMetricCollector MLPMessageLockAttemptsCounter; }; struct TTopicMetricCollector { TTopicMetricCollector(absl::flat_hash_map partitionMetrics) : ExistedPartitionMetrics(std::move(partitionMetrics)) - , PartitionLabeledCounters(EPartitionLabeledCounters_descriptor()->value_count()) - , PartitionExtendedLabeledCounters(EPartitionExtendedLabeledCounters_descriptor()->value_count()) - , PartitionKeyCompactionLabeledCounters(EPartitionKeyCompactionLabeledCounters_descriptor()->value_count()) { } @@ -64,9 +76,9 @@ struct TTopicMetricCollector { TTopicMetrics TopicMetrics; - TMetricCollector PartitionLabeledCounters; - TMetricCollector PartitionExtendedLabeledCounters; - TMetricCollector PartitionKeyCompactionLabeledCounters; + TMetricCollector PartitionLabeledCounters; + TMetricCollector PartitionExtendedLabeledCounters; + TMetricCollector PartitionKeyCompactionLabeledCounters; std::unordered_map Consumers; @@ -123,7 +135,6 @@ template TCounters InitializeCounters( NMonitoring::TDynamicCounterPtr root, const TString& databasePath, - const TString labels = "topic", const std::vector>& subgroups = {}, bool skipPrefix = true ) { @@ -134,7 +145,7 @@ TCounters InitializeCounters( } using TConfig = TProtobufTabletLabeledCounters; - auto config = std::make_unique(labels, 0, databasePath); + auto config = std::make_unique(GetLables(), 0, databasePath); std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; for (size_t i = 0; i < config->GetCounters().Size(); ++i) { @@ -153,12 +164,16 @@ TCounters InitializeCounters( }; } -void SetCounters(TCounters& counters, const TMetricCollector& metrics) { +void SetCounters(TCounters& counters, const auto& metrics) { ui64 now = TAppData::TimeProvider->Now().MilliSeconds(); + auto& aggregatedCounters = metrics.Aggregator.GetCounters(); for (size_t i = 0; i < counters.Counters.size(); ++i) { - auto value = metrics.Counters[i]; + if (aggregatedCounters.Size() == i) { + break; + } + auto value = aggregatedCounters[i].Get(); const auto& type = counters.Config->GetCounterType(i); if (type == TLabeledCounterOptions::CT_TIMELAG) { value = value < now ? now - value : 0; @@ -199,7 +214,7 @@ void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletCo InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); PartitionLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath); - PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath, "topic", {}, true); + PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath, {}, true); InitializeKeyCompactionCounters(database.DatabasePath, tabletConfig); InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); } @@ -209,7 +224,7 @@ void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePat auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); auto& counters = ConsumerCounters[consumer.GetName()]; - counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic|x|consumer",{{"consumer", metricsConsumerName}}); + counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, {{"consumer", metricsConsumerName}}); if (consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) { //metrics.MLPClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic|consumer", {{"consumer", metricsConsumerName}}); @@ -232,7 +247,7 @@ void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePat void TTopicMetricsHandler::InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) { if (tabletConfig.GetEnableCompactification()) { - PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic", {}, true); + PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, databasePath, {}, true); } else { PartitionKeyCompactionLabeledCounters.Counters.clear(); } @@ -297,10 +312,10 @@ void TTopicMetricsHandler::UpdateMetrics() { auto& consumerMetrics = it->second; SetCounters(consumerCounters.ClientLabeledCounters, consumerMetrics.ClientLabeledCounters); - if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) { + //if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) { // SetCounters(consumerCounters.MLPClientLabeledCounters, consumerMetrics.MLPConsumerLabeledCounters); // TODO MLPMessageLockAttemptsCounter - } + //} } } From 42a787a11c58dc6398a8df039b1c91ccabeb0d0c Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 15:04:12 +0000 Subject: [PATCH 06/16] fix --- ydb/core/persqueue/pqrb/read_balancer.cpp | 4 ++-- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index 6f6647b134d4..96ebdad2dee6 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -515,7 +515,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TAc void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Y_UNUSED(ctx); - //TODO: Deside about changing number of partitions and send request to SchemeShard + //TODO: Decide about changing number of partitions and send request to SchemeShard //TODO: make AlterTopic request via TX_PROXY if (!TTxWritePartitionStatsScheduled) { @@ -537,7 +537,7 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { } void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) { - if (!DatabaseInfo.DatabasePath.empty()) { + if (DatabaseInfo.DatabasePath.empty()) { return; } diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index d6bf8166f1a4..f72ba310cd06 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -15,7 +15,7 @@ namespace NKikimr::NPQ { namespace { template -TString GetLables() { +TString GetLabels() { auto desc = NAux::GetLabeledCounterOpts(); auto groupNames = desc->GetGroupNames(); @@ -52,7 +52,7 @@ struct TMetricCollector { Aggregator.AggregateWith(Counters); } - TConfig Counters = TConfig(GetLables(), 0, ""); + TConfig Counters = TConfig(GetLabels(), 0, ""); TTabletLabeledCountersBase Aggregator; }; @@ -145,7 +145,7 @@ TCounters InitializeCounters( } using TConfig = TProtobufTabletLabeledCounters; - auto config = std::make_unique(GetLables(), 0, databasePath); + auto config = std::make_unique(GetLabels(), 0, databasePath); std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; for (size_t i = 0; i < config->GetCounters().Size(); ++i) { @@ -178,7 +178,7 @@ void SetCounters(TCounters& counters, const auto& metrics) { if (type == TLabeledCounterOptions::CT_TIMELAG) { value = value < now ? now - value : 0; } - + counters.Counters[i]->Set(value); } } @@ -281,7 +281,7 @@ void TTopicMetricsHandler::InitializePartitions(ui32 partitionId, ui64 dataSize, TopicMetrics.TotalUsedReserveSize += usedReserveSize; } -void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse::TPartResult&& partitionStatus) { +void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus) { PartitionStatuses[partitionStatus.GetPartition()] = std::move(partitionStatus); } From 65ae051f0330464fceecc44b0fd0fbbc6076f51b Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 15:39:00 +0000 Subject: [PATCH 07/16] fix --- ydb/core/persqueue/pqrb/read_balancer.cpp | 26 +++++++++---------- ydb/core/persqueue/pqrb/read_balancer.h | 4 +-- .../persqueue/pqrb/read_balancer__metrics.cpp | 13 ++++++---- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index 96ebdad2dee6..eda1950bf53a 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -442,14 +442,14 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace)); - auto it = AggregatedStats.Cookies.find(tabletId); - if (!pipeReconnected || it != AggregatedStats.Cookies.end()) { + auto it = StatsRequestTracker.Cookies.find(tabletId); + if (!pipeReconnected || it != StatsRequestTracker.Cookies.end()) { ui64 cookie; if (pipeReconnected) { cookie = it->second; } else { - cookie = ++AggregatedStats.NextCookie; - AggregatedStats.Cookies[tabletId] = cookie; + cookie = ++StatsRequestTracker.NextCookie; + StatsRequestTracker.Cookies[tabletId] = cookie; } PQ_LOG_D("Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie); @@ -463,11 +463,11 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c ui64 tabletId = record.GetTabletId(); ui64 cookie = ev->Cookie; - if ((0 != cookie && cookie != AggregatedStats.Cookies[tabletId]) || (0 == cookie && !AggregatedStats.Cookies.contains(tabletId))) { + if ((0 != cookie && cookie != StatsRequestTracker.Cookies[tabletId]) || (0 == cookie && !StatsRequestTracker.Cookies.contains(tabletId))) { return; } - AggregatedStats.Cookies.erase(tabletId); + StatsRequestTracker.Cookies.erase(tabletId); for (auto& partRes : *record.MutablePartResult()) { ui32 partitionId = partRes.GetPartition(); @@ -490,19 +490,19 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c Balancer->Handle(ev, ctx); - if (AggregatedStats.Cookies.empty()) { + if (StatsRequestTracker.Cookies.empty()) { CheckStat(ctx); Balancer->ProcessPendingStats(ctx); } } void TPersQueueReadBalancer::Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx) { - if (AggregatedStats.Round != ev->Get()->Round) { + if (StatsRequestTracker.Round != ev->Get()->Round) { // old message return; } - if (AggregatedStats.Cookies.empty()) { + if (StatsRequestTracker.Cookies.empty()) { return; } @@ -569,14 +569,14 @@ TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { } void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { - if (!AggregatedStats.Cookies.empty()) { - AggregatedStats.Cookies.clear(); + if (!StatsRequestTracker.Cookies.empty()) { + StatsRequestTracker.Cookies.clear(); CheckStat(ctx); } for (auto& p : PartitionsInfo) { const ui64& tabletId = p.second.TabletId; - if (AggregatedStats.Cookies.contains(tabletId)) { //already asked stat + if (StatsRequestTracker.Cookies.contains(tabletId)) { //already asked stat continue; } RequestTabletIfNeeded(tabletId, ctx); @@ -588,7 +588,7 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { auto stateWakeupInterval = std::max(config.GetBalancerWakeupIntervalSec(), 1); ui64 delayMs = std::min(stateWakeupInterval * 1000, wakeupInterval * 500); if (0 < delayMs) { - Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++AggregatedStats.Round)); + Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++StatsRequestTracker.Round)); } } diff --git a/ydb/core/persqueue/pqrb/read_balancer.h b/ydb/core/persqueue/pqrb/read_balancer.h index 7d4db3bc13be..6f89d213f33a 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.h +++ b/ydb/core/persqueue/pqrb/read_balancer.h @@ -226,13 +226,13 @@ class TPersQueueReadBalancer : public TActor, std::unique_ptr TopicMetricsHandler; - struct TAggregatedStats { + struct TStatsRequestTracker { std::unordered_map Cookies; ui64 Round = 0; ui64 NextCookie = 0; }; - TAggregatedStats AggregatedStats; + TStatsRequestTracker StatsRequestTracker; struct TTxWritePartitionStats; bool TTxWritePartitionStatsScheduled = false; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index f72ba310cd06..a3f117bc1dd1 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -15,16 +15,16 @@ namespace NKikimr::NPQ { namespace { template -TString GetLabels() { +constexpr std::string GetLabels() { auto desc = NAux::GetLabeledCounterOpts(); auto groupNames = desc->GetGroupNames(); - TStringBuilder labels; + std::string labels; for (size_t i = 0; i < desc->GetGroupNamesSize(); ++i) { if (i) { - labels << "|"; + labels.push_back('|'); } - labels << groupNames[i]; + labels.append(groupNames[i]); } return labels; @@ -169,6 +169,9 @@ void SetCounters(TCounters& counters, const auto& metrics) { auto& aggregatedCounters = metrics.Aggregator.GetCounters(); for (size_t i = 0; i < counters.Counters.size(); ++i) { + if (!counters.Counters[i]) { + continue; + } if (aggregatedCounters.Size() == i) { break; } @@ -286,7 +289,7 @@ void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse_TPartResult&& parti } void TTopicMetricsHandler::UpdateMetrics() { - if (PartitionStatuses.empty()) { + if (!DynamicCounters || PartitionStatuses.empty()) { return; } From fde8e9119f64f1c2bd53b3c8595bd3ad82e844f6 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 12 Dec 2025 15:56:45 +0000 Subject: [PATCH 08/16] fix --- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index a3f117bc1dd1..e6d4f1ecb96b 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -42,13 +42,14 @@ struct TMetricCollector { void Collect(auto begin, auto end) { ssize_t in_size = std::distance(begin, end); AFL_ENSURE(in_size >= 0)("in_size", in_size); - size_t count = std::min(static_cast(Counters.GetCounters().Size()), static_cast(in_size)); - for (size_t i = 0; i < count; ++i) { - Counters.GetCounters()[i] += *begin; + ssize_t count = std::min(Counters.GetCounters().Size(), in_size); + for (ssize_t i = 0; i < count; ++i) { + Counters.GetCounters()[i] = *begin; ++begin; } + // here, the aggregation function configured in the protofile for each counter is used for each counter. Aggregator.AggregateWith(Counters); } @@ -57,10 +58,6 @@ struct TMetricCollector { }; struct TConsumerMetricCollector { - TConsumerMetricCollector() - { - } - TMetricCollector ClientLabeledCounters; //TMetricCollector MLPConsumerLabeledCounters; //TMetricCollector MLPMessageLockAttemptsCounter; From b4c8f1d743c00c4930cfde67e6855473dcf6ef55 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 04:39:43 +0000 Subject: [PATCH 09/16] fix --- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 10 +++++----- ydb/core/persqueue/pqrb/read_balancer__metrics.h | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index e6d4f1ecb96b..eb50ce0b8f13 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -77,7 +77,7 @@ struct TTopicMetricCollector { TMetricCollector PartitionExtendedLabeledCounters; TMetricCollector PartitionKeyCompactionLabeledCounters; - std::unordered_map Consumers; + absl::flat_hash_map Consumers; void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) { TopicMetrics.TotalDataSize += partitionStatus.GetPartitionSize(); @@ -232,13 +232,13 @@ void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePat } } - std::unordered_set existedConsumers; + absl::flat_hash_set existedConsumers; for (const auto& consumer : tabletConfig.GetConsumers()) { existedConsumers.insert(consumer.GetName()); } for (auto it = ConsumerCounters.begin(); it != ConsumerCounters.end();) { if (!existedConsumers.contains(it->first)) { - it = ConsumerCounters.erase(it); + ConsumerCounters.erase(it++); } else { ++it; } @@ -295,7 +295,7 @@ void TTopicMetricsHandler::UpdateMetrics() { collector.Collect(partitionStatus); } collector.Finish(); - PartitionStatuses.clear(); + PartitionStatuses.erase(PartitionStatuses.begin(), PartitionStatuses.end()); TopicMetrics = collector.TopicMetrics; @@ -319,4 +319,4 @@ void TTopicMetricsHandler::UpdateMetrics() { } } -} \ No newline at end of file +} diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index 54ce6d880014..917900961bad 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -7,7 +7,6 @@ #include -#include #include namespace NKikimr::NPQ { @@ -74,8 +73,8 @@ class TTopicMetricsHandler { TCounters MLPClientLabeledCounters; ::NMonitoring::TDynamicCounters::TCounterPtr MLPMessageLockAttemptsCounter; }; - std::unordered_map ConsumerCounters; + absl::flat_hash_map ConsumerCounters; - std::unordered_map PartitionStatuses; + absl::flat_hash_map PartitionStatuses; }; } From d028ea87322e634e5531a9fea9d66a52ec19e05e Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 08:21:34 +0000 Subject: [PATCH 10/16] fix --- .../persqueue/pqrb/read_balancer__metrics.cpp | 19 ++++--------------- ydb/core/tablet/tablet_counters_protobuf.h | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index eb50ce0b8f13..44ee96f7e414 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -15,19 +15,8 @@ namespace NKikimr::NPQ { namespace { template -constexpr std::string GetLabels() { - auto desc = NAux::GetLabeledCounterOpts(); - auto groupNames = desc->GetGroupNames(); - - std::string labels; - for (size_t i = 0; i < desc->GetGroupNamesSize(); ++i) { - if (i) { - labels.push_back('|'); - } - labels.append(groupNames[i]); - } - - return labels; +const TString& GetLabels() { + return NAux::GetLabeledCounterOpts()->GetGroups(); } @@ -53,7 +42,7 @@ struct TMetricCollector { Aggregator.AggregateWith(Counters); } - TConfig Counters = TConfig(GetLabels(), 0, ""); + TConfig Counters; TTabletLabeledCountersBase Aggregator; }; @@ -142,7 +131,7 @@ TCounters InitializeCounters( } using TConfig = TProtobufTabletLabeledCounters; - auto config = std::make_unique(GetLabels(), 0, databasePath); + auto config = std::make_unique(databasePath); std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; for (size_t i = 0; i < config->GetCounters().Size(); ++i) { diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index 54a4230bf380..8dc547c8cae3 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -3,8 +3,9 @@ #include "tablet_counters.h" #include "tablet_counters_aggregator.h" #include -#include +#include #include +#include namespace NKikimr { @@ -275,6 +276,7 @@ struct TLabeledCounterParsedOpts { TVector Types; TVector GroupNamesStrings; TVector GroupNames; + TString Groups; public: explicit TLabeledCounterParsedOpts() : Size(LabeledCountersDesc()->value_count()) @@ -319,7 +321,9 @@ struct TLabeledCounterParsedOpts { std::transform(GroupNamesStrings.begin(), GroupNamesStrings.end(), std::back_inserter(GroupNames), [](auto& string) { return string.data(); } ); + Groups = JoinRange("/", GroupNamesStrings.begin(), GroupNamesStrings.end()); } + virtual ~TLabeledCounterParsedOpts() {} @@ -353,6 +357,11 @@ struct TLabeledCounterParsedOpts { return AggregateFuncs.begin(); } + const TString& GetGroups() const + { + return Groups; + } + protected: TString GetFilePrefix(const NProtoBuf::FileDescriptor* desc) { if (desc->options().HasExtension(TabletTypeName)) { @@ -635,6 +644,13 @@ class TProtobufTabletLabeledCounters : public TTabletLabeledCountersBase { return NAux::GetLabeledCounterOpts(); } + TProtobufTabletLabeledCounters(TMaybe databasePath = Nothing(), const ui64 id = 0) + : TTabletLabeledCountersBase( + SimpleOpts()->Size, SimpleOpts()->GetNames(), SimpleOpts()->GetCounterTypes(), + SimpleOpts()->GetAggregateFuncs(), SimpleOpts()->GetGroups(), SimpleOpts()->GetGroupNames(), id, std::move(databasePath)) + { + } + TProtobufTabletLabeledCounters(const TString& group, const ui64 id) : TTabletLabeledCountersBase( SimpleOpts()->Size, SimpleOpts()->GetNames(), SimpleOpts()->GetCounterTypes(), From 1abb6684e31efdb15846dccaf9ef1f8fb4da0683 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 10:41:31 +0000 Subject: [PATCH 11/16] fix --- ydb/core/tablet/tablet_counters_protobuf.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index 8dc547c8cae3..bdcba1a33eb5 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -321,7 +321,7 @@ struct TLabeledCounterParsedOpts { std::transform(GroupNamesStrings.begin(), GroupNamesStrings.end(), std::back_inserter(GroupNames), [](auto& string) { return string.data(); } ); - Groups = JoinRange("/", GroupNamesStrings.begin(), GroupNamesStrings.end()); + Groups = JoinRange("|", GroupNamesStrings.begin(), GroupNamesStrings.end()); } virtual ~TLabeledCounterParsedOpts() From 70f394fddc23888fde0457d2596ba994e4f19bdc Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 10:57:44 +0000 Subject: [PATCH 12/16] fix --- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 44ee96f7e414..7803ab14f467 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -14,12 +14,6 @@ namespace NKikimr::NPQ { namespace { -template -const TString& GetLabels() { - return NAux::GetLabeledCounterOpts()->GetGroups(); -} - - template struct TMetricCollector { using TConfig = TProtobufTabletLabeledCounters; From 4a6ec76877ea98a774e0bb0c9486d90b48328a7d Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 14:16:37 +0000 Subject: [PATCH 13/16] fix --- ydb/core/persqueue/pqrb/read_balancer__metrics.cpp | 8 ++++---- ydb/core/persqueue/pqrb/read_balancer__metrics.h | 2 +- ydb/core/tablet/tablet_counters_protobuf.h | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 7803ab14f467..5609f7e3cf94 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -125,11 +125,11 @@ TCounters InitializeCounters( } using TConfig = TProtobufTabletLabeledCounters; - auto config = std::make_unique(databasePath); + TConfig config(databasePath); std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; - for (size_t i = 0; i < config->GetCounters().Size(); ++i) { - TString name = config->GetNames()[i]; + for (size_t i = 0; i < config.GetCounters().Size(); ++i) { + TString name = config.GetNames()[i]; if (skipPrefix) { TStringBuf nameBuf = name; nameBuf.SkipPrefix("PQ/"); @@ -157,7 +157,7 @@ void SetCounters(TCounters& counters, const auto& metrics) { } auto value = aggregatedCounters[i].Get(); - const auto& type = counters.Config->GetCounterType(i); + const auto& type = counters.Config.GetCounterType(i); if (type == TLabeledCounterOptions::CT_TIMELAG) { value = value < now ? now - value : 0; } diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index 917900961bad..cfdead87b7eb 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -34,7 +34,7 @@ struct TTopicMetrics { }; struct TCounters { - std::unique_ptr Config; + TTabletLabeledCountersBase Config; std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> Counters; }; diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index bdcba1a33eb5..6b33855dd3e5 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -646,8 +646,8 @@ class TProtobufTabletLabeledCounters : public TTabletLabeledCountersBase { TProtobufTabletLabeledCounters(TMaybe databasePath = Nothing(), const ui64 id = 0) : TTabletLabeledCountersBase( - SimpleOpts()->Size, SimpleOpts()->GetNames(), SimpleOpts()->GetCounterTypes(), - SimpleOpts()->GetAggregateFuncs(), SimpleOpts()->GetGroups(), SimpleOpts()->GetGroupNames(), id, std::move(databasePath)) + SimpleOpts()->Size, SimpleOpts()->GetSVNames(), SimpleOpts()->GetCounterTypes(), + SimpleOpts()->GetAggregateFuncs(), SimpleOpts()->GetGroups(), SimpleOpts()->GetGroupNames(), id, std::move(databasePath)) { } From 828cf54b1846ce89cd9f56acee86ce78ccb38d96 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 13 Dec 2025 14:39:54 +0000 Subject: [PATCH 14/16] WIP --- ydb/core/persqueue/pqrb/read_balancer.cpp | 4 +- .../persqueue/pqrb/read_balancer__metrics.cpp | 46 ++++++++++--------- .../persqueue/pqrb/read_balancer__metrics.h | 16 +++++-- ydb/core/persqueue/pqrb/read_balancer_app.cpp | 5 +- 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index eda1950bf53a..258400ffdc43 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -66,10 +66,10 @@ struct TPersQueueReadBalancer::TTxWritePartitionStats : public ITransaction { bool Execute(TTransactionContext& txc, const TActorContext&) override { Self->TTxWritePartitionStatsScheduled = false; - auto& metrics = Self->TopicMetricsHandler->GetTopicMetrics(); + auto& metrics = Self->TopicMetricsHandler->GetPartitionMetrics(); NIceDb::TNiceDb db(txc.DB); - for (auto& [partition, stats] : metrics.PartitionMetrics) { + for (auto& [partition, stats] : metrics) { auto it = Self->PartitionsInfo.find(partition); if (it == Self->PartitionsInfo.end()) { continue; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 5609f7e3cf94..3fbd41d054c2 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -47,12 +47,12 @@ struct TConsumerMetricCollector { }; struct TTopicMetricCollector { - TTopicMetricCollector(absl::flat_hash_map partitionMetrics) - : ExistedPartitionMetrics(std::move(partitionMetrics)) + TTopicMetricCollector(absl::flat_hash_map& partitionMetrics) + : PartitionMetrics(partitionMetrics) { } - absl::flat_hash_map ExistedPartitionMetrics; + absl::flat_hash_map& PartitionMetrics; TTopicMetrics TopicMetrics; @@ -63,25 +63,16 @@ struct TTopicMetricCollector { absl::flat_hash_map Consumers; void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) { - TopicMetrics.TotalDataSize += partitionStatus.GetPartitionSize(); - TopicMetrics.TotalUsedReserveSize += partitionStatus.GetUsedReserveSize(); - - TopicMetrics.TotalAvgWriteSpeedPerSec += partitionStatus.GetAvgWriteSpeedPerSec(); - TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionStatus.GetAvgWriteSpeedPerSec()); - TopicMetrics.TotalAvgWriteSpeedPerMin += partitionStatus.GetAvgWriteSpeedPerMin(); - TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionStatus.GetAvgWriteSpeedPerMin()); - TopicMetrics.TotalAvgWriteSpeedPerHour += partitionStatus.GetAvgWriteSpeedPerHour(); - TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionStatus.GetAvgWriteSpeedPerHour()); - TopicMetrics.TotalAvgWriteSpeedPerDay += partitionStatus.GetAvgWriteSpeedPerDay(); - TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionStatus.GetAvgWriteSpeedPerDay()); - - auto& partitionMetrics = TopicMetrics.PartitionMetrics[partitionStatus.GetPartition()]; + auto& partitionMetrics = PartitionMetrics[partitionStatus.GetPartition()]; partitionMetrics.DataSize = partitionStatus.GetPartitionSize(); partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize(); - Collect(partitionStatus.GetAggregatedCounters()); + partitionMetrics.AvgWriteSpeedPerSec = partitionStatus.GetAvgWriteSpeedPerSec(); + partitionMetrics.AvgWriteSpeedPerMin = partitionStatus.GetAvgWriteSpeedPerMin(); + partitionMetrics.AvgWriteSpeedPerHour = partitionStatus.GetAvgWriteSpeedPerHour(); + partitionMetrics.AvgWriteSpeedPerDay = partitionStatus.GetAvgWriteSpeedPerDay(); - ExistedPartitionMetrics.erase(partitionStatus.GetPartition()); + Collect(partitionStatus.GetAggregatedCounters()); } void Collect(const NKikimrPQ::TAggregatedCounters& counters) { @@ -102,11 +93,18 @@ struct TTopicMetricCollector { } void Finish() { - for (auto& [partitionId, partitionMetrics] : ExistedPartitionMetrics) { + for (auto& [_, partitionMetrics] : PartitionMetrics) { TopicMetrics.TotalDataSize += partitionMetrics.DataSize; TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize; - TopicMetrics.PartitionMetrics[partitionId] = partitionMetrics; + TopicMetrics.TotalAvgWriteSpeedPerSec += partitionMetrics.AvgWriteSpeedPerSec; + TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionMetrics.AvgWriteSpeedPerSec); + TopicMetrics.TotalAvgWriteSpeedPerMin += partitionMetrics.AvgWriteSpeedPerMin; + TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionMetrics.AvgWriteSpeedPerMin); + TopicMetrics.TotalAvgWriteSpeedPerHour += partitionMetrics.AvgWriteSpeedPerHour; + TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionMetrics.AvgWriteSpeedPerHour); + TopicMetrics.TotalAvgWriteSpeedPerDay += partitionMetrics.AvgWriteSpeedPerDay; + TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionMetrics.AvgWriteSpeedPerDay); } } }; @@ -175,6 +173,10 @@ const TTopicMetrics& TTopicMetricsHandler::GetTopicMetrics() const { return TopicMetrics; } +const absl::flat_hash_map& TTopicMetricsHandler::GetPartitionMetrics() const { + return PartitionMetrics; +} + void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { if (DynamicCounters) { return; @@ -255,7 +257,7 @@ void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tablet } void TTopicMetricsHandler::InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize) { - TopicMetrics.PartitionMetrics[partitionId] = { + PartitionMetrics[partitionId] = { .DataSize = dataSize, .UsedReserveSize = usedReserveSize }; @@ -273,7 +275,7 @@ void TTopicMetricsHandler::UpdateMetrics() { return; } - TTopicMetricCollector collector(TopicMetrics.PartitionMetrics); + TTopicMetricCollector collector(PartitionMetrics); for (auto& [_, partitionStatus] : PartitionStatuses) { collector.Collect(partitionStatus); } diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index cfdead87b7eb..44ec0c3d9df7 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -25,12 +25,16 @@ struct TTopicMetrics { ui64 MaxAvgWriteSpeedPerHour = 0; ui64 TotalAvgWriteSpeedPerDay = 0; ui64 MaxAvgWriteSpeedPerDay = 0; +}; - struct TPartitionMetrics { - ui64 DataSize = 0; - ui64 UsedReserveSize = 0; - }; - absl::flat_hash_map PartitionMetrics; +struct TPartitionMetrics { + ui64 DataSize = 0; + ui64 UsedReserveSize = 0; + + ui64 AvgWriteSpeedPerSec = 0; + ui64 AvgWriteSpeedPerMin = 0; + ui64 AvgWriteSpeedPerHour = 0; + ui64 AvgWriteSpeedPerDay = 0; }; struct TCounters { @@ -44,6 +48,7 @@ class TTopicMetricsHandler { ~TTopicMetricsHandler(); const TTopicMetrics& GetTopicMetrics() const; + const absl::flat_hash_map& GetPartitionMetrics() const; void Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); void UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); @@ -60,6 +65,7 @@ class TTopicMetricsHandler { NMonitoring::TDynamicCounterPtr DynamicCounters; TTopicMetrics TopicMetrics; + absl::flat_hash_map PartitionMetrics; NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; diff --git a/ydb/core/persqueue/pqrb/read_balancer_app.cpp b/ydb/core/persqueue/pqrb/read_balancer_app.cpp index 6d6ebc2ada4a..4d7fa164d40d 100644 --- a/ydb/core/persqueue/pqrb/read_balancer_app.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer_app.cpp @@ -19,6 +19,7 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e TString TPersQueueReadBalancer::GenerateStat() { auto& metrics = TopicMetricsHandler->GetTopicMetrics(); + auto& partitionMetrics = TopicMetricsHandler->GetPartitionMetrics(); TStringStream str; HTML_APP_PAGE(str, "PersQueueReadBalancer " << TabletID() << " (" << Path << ")") { @@ -76,7 +77,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } TABLEBODY() { for (auto& [partitionId, partitionInfo] : PartitionsInfo) { - const auto& stats = metrics.PartitionMetrics.find(partitionId); + const auto& stats = partitionMetrics.find(partitionId); const auto* node = PartitionGraph.GetPartition(partitionId); TString style = node && node->DirectChildren.empty() ? "text-success" : "text-muted"; @@ -111,7 +112,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } } } - TABLED() { str << (stats == metrics.PartitionMetrics.end() ? 0 : stats->second.DataSize); } + TABLED() { str << (stats == partitionMetrics.end() ? 0 : stats->second.DataSize); } } } } From 37c4ce473e906490312134c784649881c196b5d3 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sun, 14 Dec 2025 17:25:18 +0000 Subject: [PATCH 15/16] fix --- .../persqueue/pqrb/read_balancer__metrics.cpp | 21 +++++++------------ .../persqueue/pqrb/read_balancer__metrics.h | 5 ----- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 3fbd41d054c2..5ea62bac2033 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -67,10 +67,14 @@ struct TTopicMetricCollector { partitionMetrics.DataSize = partitionStatus.GetPartitionSize(); partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize(); - partitionMetrics.AvgWriteSpeedPerSec = partitionStatus.GetAvgWriteSpeedPerSec(); - partitionMetrics.AvgWriteSpeedPerMin = partitionStatus.GetAvgWriteSpeedPerMin(); - partitionMetrics.AvgWriteSpeedPerHour = partitionStatus.GetAvgWriteSpeedPerHour(); - partitionMetrics.AvgWriteSpeedPerDay = partitionStatus.GetAvgWriteSpeedPerDay(); + TopicMetrics.TotalAvgWriteSpeedPerSec += partitionStatus.GetAvgWriteSpeedPerSec(); + TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionStatus.GetAvgWriteSpeedPerSec()); + TopicMetrics.TotalAvgWriteSpeedPerMin += partitionStatus.GetAvgWriteSpeedPerMin(); + TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionStatus.GetAvgWriteSpeedPerMin()); + TopicMetrics.TotalAvgWriteSpeedPerHour += partitionStatus.GetAvgWriteSpeedPerHour(); + TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionStatus.GetAvgWriteSpeedPerHour()); + TopicMetrics.TotalAvgWriteSpeedPerDay += partitionStatus.GetAvgWriteSpeedPerDay(); + TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionStatus.GetAvgWriteSpeedPerDay()); Collect(partitionStatus.GetAggregatedCounters()); } @@ -96,15 +100,6 @@ struct TTopicMetricCollector { for (auto& [_, partitionMetrics] : PartitionMetrics) { TopicMetrics.TotalDataSize += partitionMetrics.DataSize; TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize; - - TopicMetrics.TotalAvgWriteSpeedPerSec += partitionMetrics.AvgWriteSpeedPerSec; - TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionMetrics.AvgWriteSpeedPerSec); - TopicMetrics.TotalAvgWriteSpeedPerMin += partitionMetrics.AvgWriteSpeedPerMin; - TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionMetrics.AvgWriteSpeedPerMin); - TopicMetrics.TotalAvgWriteSpeedPerHour += partitionMetrics.AvgWriteSpeedPerHour; - TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionMetrics.AvgWriteSpeedPerHour); - TopicMetrics.TotalAvgWriteSpeedPerDay += partitionMetrics.AvgWriteSpeedPerDay; - TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionMetrics.AvgWriteSpeedPerDay); } } }; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index 44ec0c3d9df7..8c9ff8d3d31c 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -30,11 +30,6 @@ struct TTopicMetrics { struct TPartitionMetrics { ui64 DataSize = 0; ui64 UsedReserveSize = 0; - - ui64 AvgWriteSpeedPerSec = 0; - ui64 AvgWriteSpeedPerMin = 0; - ui64 AvgWriteSpeedPerHour = 0; - ui64 AvgWriteSpeedPerDay = 0; }; struct TCounters { From 26f3cd0af43af032e3b4393aeb3ebedcb7ab28ac Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sun, 14 Dec 2025 18:30:57 +0000 Subject: [PATCH 16/16] fix --- .../persqueue/pqrb/read_balancer__metrics.cpp | 39 +++++++++---------- .../persqueue/pqrb/read_balancer__metrics.h | 6 +-- ydb/core/tablet/tablet_counters_protobuf.h | 5 +++ 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp index 5ea62bac2033..bda49aefd468 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -32,7 +32,7 @@ struct TMetricCollector { ++begin; } - // here, the aggregation function configured in the protofile for each counter is used for each counter. + // The aggregation function configured in the protofile is used for each counter. Aggregator.AggregateWith(Counters); } @@ -107,7 +107,6 @@ struct TTopicMetricCollector { template TCounters InitializeCounters( NMonitoring::TDynamicCounterPtr root, - const TString& databasePath, const std::vector>& subgroups = {}, bool skipPrefix = true ) { @@ -117,12 +116,11 @@ TCounters InitializeCounters( group = group->GetSubgroup(subgroup.first, subgroup.second); } - using TConfig = TProtobufTabletLabeledCounters; - TConfig config(databasePath); + const auto* config = NAux::GetLabeledCounterOpts(); std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; - for (size_t i = 0; i < config.GetCounters().Size(); ++i) { - TString name = config.GetNames()[i]; + for (size_t i = 0; i < config->Size; ++i) { + TString name = config->GetSVNames()[i]; if (skipPrefix) { TStringBuf nameBuf = name; nameBuf.SkipPrefix("PQ/"); @@ -132,7 +130,7 @@ TCounters InitializeCounters( } return { - .Config = std::move(config), + .Types = config->GetTypes(), .Counters = std::move(result) }; } @@ -150,7 +148,7 @@ void SetCounters(TCounters& counters, const auto& metrics) { } auto value = aggregatedCounters[i].Get(); - const auto& type = counters.Config.GetCounterType(i); + const auto& type = counters.Types[i]; if (type == TLabeledCounterOptions::CT_TIMELAG) { value = value < now ? now - value : 0; } @@ -193,22 +191,22 @@ void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletCo ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false); InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); - PartitionLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath); - PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, database.DatabasePath, {}, true); - InitializeKeyCompactionCounters(database.DatabasePath, tabletConfig); - InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); + PartitionLabeledCounters = InitializeCounters(DynamicCounters); + PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, {}, true); + InitializeKeyCompactionCounters(tabletConfig); + InitializeConsumerCounters(tabletConfig, ctx); } -void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { +void TTopicMetricsHandler::InitializeConsumerCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { for (const auto& consumer : tabletConfig.GetConsumers()) { auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); auto& counters = ConsumerCounters[consumer.GetName()]; - counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, {{"consumer", metricsConsumerName}}); + counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); if (consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) { - //metrics.MLPClientLabeledCounters = InitializeCounters(DynamicCounters, databasePath, "topic|consumer", {{"consumer", metricsConsumerName}}); - //metrics.MLPMessageLockAttemptsCounter = InitializeCounters(DynamicCounters, databasePath, {{"consumer", metricsConsumerName}}); + //metrics.MLPClientLabeledCounters = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); + //metrics.MLPMessageLockAttemptsCounter = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); } } @@ -225,23 +223,24 @@ void TTopicMetricsHandler::InitializeConsumerCounters(const TString& databasePat } } -void TTopicMetricsHandler::InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) { +void TTopicMetricsHandler::InitializeKeyCompactionCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig) { if (tabletConfig.GetEnableCompactification()) { - PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, databasePath, {}, true); + PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, {}, true); } else { PartitionKeyCompactionLabeledCounters.Counters.clear(); } } void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { + Y_UNUSED(database); Y_UNUSED(topicPath); if (!DynamicCounters) { return; } - InitializeKeyCompactionCounters(database.DatabasePath, tabletConfig); - InitializeConsumerCounters(database.DatabasePath, tabletConfig, ctx); + InitializeKeyCompactionCounters(tabletConfig); + InitializeConsumerCounters(tabletConfig, ctx); size_t inactiveCount = std::count_if(tabletConfig.GetAllPartitions().begin(), tabletConfig.GetAllPartitions().end(), [](auto& p) { return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h index 8c9ff8d3d31c..b84b0039aeb6 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__metrics.h +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -33,7 +33,7 @@ struct TPartitionMetrics { }; struct TCounters { - TTabletLabeledCountersBase Config; + std::vector Types; std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> Counters; }; @@ -53,8 +53,8 @@ class TTopicMetricsHandler { void UpdateMetrics(); protected: - void InitializeKeyCompactionCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig); - void InitializeConsumerCounters(const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx); + void InitializeKeyCompactionCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig); + void InitializeConsumerCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx); private: NMonitoring::TDynamicCounterPtr DynamicCounters; diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index 6b33855dd3e5..d9933c88ed29 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -362,6 +362,11 @@ struct TLabeledCounterParsedOpts { return Groups; } + const TVector& GetTypes() const + { + return Types; + } + protected: TString GetFilePrefix(const NProtoBuf::FileDescriptor* desc) { if (desc->options().HasExtension(TabletTypeName)) {