diff --git a/ydb/core/persqueue/pqrb/read_balancer.cpp b/ydb/core/persqueue/pqrb/read_balancer.cpp index f9e9d41ed6c9..258400ffdc43 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer.cpp @@ -1,15 +1,17 @@ #include "read_balancer.h" #include "read_balancer__balancing.h" +#include "read_balancer__metrics.h" #include "read_balancer__mlp_balancing.h" #include "read_balancer__txpreinit.h" #include "read_balancer__txwrite.h" #include "read_balancer_log.h" #include "mirror_describer_factory.h" +#include #include #include -#include #include + #include #include #include @@ -47,6 +49,7 @@ TPersQueueReadBalancer::TPersQueueReadBalancer(const TActorId &tablet, TTabletSt , StartPartitionIdForWrite(0) , TotalGroups(0) , ResourceMetrics(nullptr) + , TopicMetricsHandler(std::make_unique()) , StatsReportRound(0) { Balancer = std::make_unique(*this); @@ -63,11 +66,10 @@ struct TPersQueueReadBalancer::TTxWritePartitionStats : public ITransaction { bool Execute(TTransactionContext& txc, const TActorContext&) override { Self->TTxWritePartitionStatsScheduled = false; - NIceDb::TNiceDb db(txc.DB); - for (auto& s : Self->AggregatedStats.Stats) { - auto partition = s.first; - auto& stats = s.second; + auto& metrics = Self->TopicMetricsHandler->GetPartitionMetrics(); + NIceDb::TNiceDb db(txc.DB); + for (auto& [partition, stats] : metrics) { auto it = Self->PartitionsInfo.find(partition); if (it == Self->PartitionsInfo.end()) { continue; @@ -284,7 +286,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr if (SplitMergeEnabled(TabletConfig)) { if (!PartitionsScaleManager) { - PartitionsScaleManager = std::make_unique(Topic, Path, DatabasePath, PathId, Version, TabletConfig, PartitionGraph); + PartitionsScaleManager = std::make_unique(Topic, Path, DatabaseInfo.DatabasePath, PathId, Version, TabletConfig, PartitionGraph); } else { PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig); } @@ -440,14 +442,14 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace)); - auto it = AggregatedStats.Cookies.find(tabletId); - if (!pipeReconnected || it != AggregatedStats.Cookies.end()) { + auto it = StatsRequestTracker.Cookies.find(tabletId); + if (!pipeReconnected || it != StatsRequestTracker.Cookies.end()) { ui64 cookie; if (pipeReconnected) { cookie = it->second; } else { - cookie = ++AggregatedStats.NextCookie; - AggregatedStats.Cookies[tabletId] = cookie; + cookie = ++StatsRequestTracker.NextCookie; + StatsRequestTracker.Cookies[tabletId] = cookie; } PQ_LOG_D("Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie); @@ -457,17 +459,17 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; + auto& record = ev->Get()->Record; ui64 tabletId = record.GetTabletId(); ui64 cookie = ev->Cookie; - if ((0 != cookie && cookie != AggregatedStats.Cookies[tabletId]) || (0 == cookie && !AggregatedStats.Cookies.contains(tabletId))) { + if ((0 != cookie && cookie != StatsRequestTracker.Cookies[tabletId]) || (0 == cookie && !StatsRequestTracker.Cookies.contains(tabletId))) { return; } - AggregatedStats.Cookies.erase(tabletId); + StatsRequestTracker.Cookies.erase(tabletId); - for (const auto& partRes : record.GetPartResult()) { + for (auto& partRes : *record.MutablePartResult()) { ui32 partitionId = partRes.GetPartition(); if (!PartitionsInfo.contains(partitionId)) { continue; @@ -483,28 +485,24 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c ); } - AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); - AggregatedStats.AggrStats(partRes.GetAvgWriteSpeedPerSec(), partRes.GetAvgWriteSpeedPerMin(), - partRes.GetAvgWriteSpeedPerHour(), partRes.GetAvgWriteSpeedPerDay()); - AggregatedStats.Stats[partitionId].Counters = partRes.GetAggregatedCounters(); - AggregatedStats.Stats[partitionId].HasCounters = true; + TopicMetricsHandler->Handle(std::move(partRes)); } Balancer->Handle(ev, ctx); - if (AggregatedStats.Cookies.empty()) { + if (StatsRequestTracker.Cookies.empty()) { CheckStat(ctx); Balancer->ProcessPendingStats(ctx); } } void TPersQueueReadBalancer::Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx) { - if (AggregatedStats.Round != ev->Get()->Round) { + if (StatsRequestTracker.Round != ev->Get()->Round) { // old message return; } - if (AggregatedStats.Cookies.empty()) { + if (StatsRequestTracker.Cookies.empty()) { return; } @@ -515,37 +513,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TAc Send(ev.Get()->Sender, GetStatsEvent()); } -void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize) { - AFL_ENSURE(dataSize >= usedReserveSize); - - auto& oldValue = Stats[partition]; - - TPartitionStats newValue; - newValue.DataSize = dataSize; - newValue.UsedReserveSize = usedReserveSize; - - TotalDataSize += (newValue.DataSize - oldValue.DataSize); - TotalUsedReserveSize += (newValue.UsedReserveSize - oldValue.UsedReserveSize); - - AFL_ENSURE(TotalDataSize >= TotalUsedReserveSize); - - oldValue = newValue; -} - -void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay) { - NewMetrics.TotalAvgWriteSpeedPerSec += avgWriteSpeedPerSec; - NewMetrics.MaxAvgWriteSpeedPerSec = Max(NewMetrics.MaxAvgWriteSpeedPerSec, avgWriteSpeedPerSec); - NewMetrics.TotalAvgWriteSpeedPerMin += avgWriteSpeedPerMin; - NewMetrics.MaxAvgWriteSpeedPerMin = Max(NewMetrics.MaxAvgWriteSpeedPerMin, avgWriteSpeedPerMin); - NewMetrics.TotalAvgWriteSpeedPerHour += avgWriteSpeedPerHour; - NewMetrics.MaxAvgWriteSpeedPerHour = Max(NewMetrics.MaxAvgWriteSpeedPerHour, avgWriteSpeedPerHour); - NewMetrics.TotalAvgWriteSpeedPerDay += avgWriteSpeedPerDay; - NewMetrics.MaxAvgWriteSpeedPerDay = Max(NewMetrics.MaxAvgWriteSpeedPerDay, avgWriteSpeedPerDay); -} - void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Y_UNUSED(ctx); - //TODO: Deside about changing number of partitions and send request to SchemeShard + //TODO: Decide about changing number of partitions and send request to SchemeShard //TODO: make AlterTopic request via TX_PROXY if (!TTxWritePartitionStatsScheduled) { @@ -553,206 +523,60 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Execute(new TTxWritePartitionStats(this)); } - AggregatedStats.Metrics = AggregatedStats.NewMetrics; + UpdateCounters(ctx); TEvPersQueue::TEvPeriodicTopicStats* ev = GetStatsEvent(); PQ_LOG_D("Send TEvPeriodicTopicStats PathId: " << PathId << " Generation: " << Generation << " StatsReportRound: " << StatsReportRound - << " DataSize: " << AggregatedStats.TotalDataSize - << " UsedReserveSize: " << AggregatedStats.TotalUsedReserveSize); + << " DataSize: " << TopicMetricsHandler->GetTopicMetrics().TotalDataSize + << " UsedReserveSize: " << TopicMetricsHandler->GetTopicMetrics().TotalUsedReserveSize); NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev); - UpdateCounters(ctx); } void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) { - if (!DatabasePath) { - return; - } - - if (DynamicCounters) { + if (DatabaseInfo.DatabasePath.empty()) { return; } - TStringBuf name = TStringBuf(Path); - name.SkipPrefix(DatabasePath); - name.SkipPrefix("/"); - - bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe - DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics") - ->GetSubgroup("host", "") - ->GetSubgroup("database", DatabasePath) - ->GetSubgroup("cloud_id", CloudId) - ->GetSubgroup("folder_id", FolderId) - ->GetSubgroup("database_id", DatabaseId) - ->GetSubgroup("topic", TString(name)); - - ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false); - InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); + TopicMetricsHandler->Initialize(TabletConfig, DatabaseInfo, Path, ctx); } void TPersQueueReadBalancer::UpdateConfigCounters() { - if (!DynamicCounters) { - return; - } - - size_t inactiveCount = std::count_if(TabletConfig.GetAllPartitions().begin(), TabletConfig.GetAllPartitions().end(), [](auto& p) { - return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive; - }); - - ActivePartitionCountCounter->Set(PartitionsInfo.size() - inactiveCount); - InactivePartitionCountCounter->Set(inactiveCount); + TopicMetricsHandler->UpdateConfig(TabletConfig, DatabaseInfo, Path, ActorContext()); } -void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) { - if (!AggregatedStats.Stats.size()) { - return; - } - - if (!DynamicCounters) { - return; - } - - auto ensureCounters = [&](auto& counters, auto& config, const std::vector>& subgroups = {}, bool skipPrefix = true) { - auto group = DynamicCounters; - if (counters.empty()) { - for (const auto& subgroup : subgroups) { - group = group->GetSubgroup(subgroup.first, subgroup.second); - } - - for (size_t i = 0; i < config->GetCounters().Size(); ++i) { - TString name = config->GetNames()[i]; - if (skipPrefix) { - TStringBuf nameBuf = name; - nameBuf.SkipPrefix("PQ/"); - name = nameBuf; - } - counters.push_back(name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false)); - } - } - }; - - using TPartitionLabeledCounters = TProtobufTabletLabeledCounters; - auto labeledCounters = std::make_unique("topic", 0, DatabasePath); - ensureCounters(AggregatedCounters, labeledCounters); - - using TPartitionExtendedLabeledCounters = TProtobufTabletLabeledCounters; - auto extendedLabeledCounters = std::make_unique("topic", 0, DatabasePath); - ensureCounters(AggregatedExtendedCounters, extendedLabeledCounters, {}, false); - - using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters; - auto compactionCounters = std::make_unique("topic", 0, DatabasePath); - if (TabletConfig.GetEnableCompactification()) { - ensureCounters(AggregatedCompactionCounters, compactionCounters); - } else { - AggregatedCompactionCounters.clear(); - } - - using TConsumerLabeledCounters = TProtobufTabletLabeledCounters; - auto labeledConsumerCounters = std::make_unique("topic|x|consumer", 0, DatabasePath); - for (auto& [consumer, info]: Consumers) { - ensureCounters(info.AggregatedCounters, labeledConsumerCounters, {{"consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx)}}); - info.Aggr.Reset(new TTabletLabeledCountersBase{}); - } - - /*** apply counters ****/ - - ui64 milliSeconds = TAppData::TimeProvider->Now().MilliSeconds(); - - auto aggr = std::make_unique(); - auto aggrExtended = std::make_unique(); - auto compactionAggr = std::make_unique(); - - auto setCounters = [](auto& counters, const auto& state) { - for (size_t i = 0; i < counters->GetCounters().Size() && i < state.ValuesSize(); ++i) { - counters->GetCounters()[i] = state.GetValues(i); - } - }; - - for (auto it = AggregatedStats.Stats.begin(); it != AggregatedStats.Stats.end(); ++it) { - auto& partitionStats = it->second; - - if (!partitionStats.HasCounters) { - continue; - } - - setCounters(labeledCounters, partitionStats.Counters); - aggr->AggregateWith(*labeledCounters); - - setCounters(extendedLabeledCounters, partitionStats.Counters.GetExtendedCounters()); - aggrExtended->AggregateWith(*extendedLabeledCounters); - - if (TabletConfig.GetEnableCompactification()) { - setCounters(compactionCounters, partitionStats.Counters.GetCompactionCounters()); - compactionAggr->AggregateWith(*compactionCounters); - } - - for (const auto& consumerStats : partitionStats.Counters.GetConsumerAggregatedCounters()) { - auto jt = Consumers.find(consumerStats.GetConsumer()); - if (jt == Consumers.end()) { - continue; - } - auto& consumerInfo = jt->second; - - setCounters(labeledConsumerCounters, consumerStats); - consumerInfo.Aggr->AggregateWith(*labeledConsumerCounters); - } - } - - auto processAggregators = [milliSeconds](auto& aggregator, auto& counters) { - for (size_t i = 0; aggregator->HasCounters() && i < aggregator->GetCounters().Size(); ++i) { - if (!counters[i]) { - continue; - } - const auto& type = aggregator->GetCounterType(i); - auto val = aggregator->GetCounters()[i].Get(); - if (type == TLabeledCounterOptions::CT_TIMELAG) { - val = val <= milliSeconds ? milliSeconds - val : 0; - } - counters[i]->Set(val); - } - }; - - /*** show counters ***/ - processAggregators(aggr, AggregatedCounters); - processAggregators(aggrExtended, AggregatedExtendedCounters); - processAggregators(compactionAggr, AggregatedCompactionCounters); - - for (auto& [consumer, info] : Consumers) { - processAggregators(info.Aggr, info.AggregatedCounters); - } +void TPersQueueReadBalancer::UpdateCounters(const TActorContext&) { + TopicMetricsHandler->UpdateMetrics(); } TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { + auto& metrics = TopicMetricsHandler->GetTopicMetrics(); + TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); auto& rec = ev->Record; rec.SetPathId(PathId); rec.SetGeneration(Generation); rec.SetRound(++StatsReportRound); - rec.SetDataSize(AggregatedStats.TotalDataSize); - rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize); + rec.SetDataSize(metrics.TotalDataSize); + rec.SetUsedReserveSize(metrics.TotalUsedReserveSize); rec.SetSubDomainOutOfSpace(SubDomainOutOfSpace); return ev; } void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { - if (!AggregatedStats.Cookies.empty()) { - AggregatedStats.Cookies.clear(); + if (!StatsRequestTracker.Cookies.empty()) { + StatsRequestTracker.Cookies.clear(); CheckStat(ctx); } - TPartitionMetrics newMetrics; - AggregatedStats.NewMetrics = newMetrics; - for (auto& p : PartitionsInfo) { - AggregatedStats.Stats[p.first].HasCounters = false; - const ui64& tabletId = p.second.TabletId; - if (AggregatedStats.Cookies.contains(tabletId)) { //already asked stat + if (StatsRequestTracker.Cookies.contains(tabletId)) { //already asked stat continue; } RequestTabletIfNeeded(tabletId, ctx); @@ -764,7 +588,7 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { auto stateWakeupInterval = std::max(config.GetBalancerWakeupIntervalSec(), 1); ui64 delayMs = std::min(stateWakeupInterval * 1000, wakeupInterval * 500); if (0 < delayMs) { - Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++AggregatedStats.Round)); + Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++StatsRequestTracker.Round)); } } @@ -916,12 +740,12 @@ void TPersQueueReadBalancer::StartWatchingSubDomainPathId() { void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { const auto* msg = ev->Get(); - if (DatabasePath.empty()) { - DatabasePath = msg->Result->GetPath(); + if (DatabaseInfo.DatabasePath.empty()) { + DatabaseInfo.DatabasePath = msg->Result->GetPath(); for (const auto& attr : msg->Result->GetPathDescription().GetUserAttributes()) { - if (attr.GetKey() == "folder_id") FolderId = attr.GetValue(); - if (attr.GetKey() == "cloud_id") CloudId = attr.GetValue(); - if (attr.GetKey() == "database_id") DatabaseId = attr.GetValue(); + if (attr.GetKey() == "folder_id") DatabaseInfo.FolderId = attr.GetValue(); + if (attr.GetKey() == "cloud_id") DatabaseInfo.CloudId = attr.GetValue(); + if (attr.GetKey() == "database_id") DatabaseInfo.DatabaseId = attr.GetValue(); } InitCounters(ctx); @@ -929,7 +753,7 @@ void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated } if (PartitionsScaleManager) { - PartitionsScaleManager->UpdateDatabasePath(DatabasePath); + PartitionsScaleManager->UpdateDatabasePath(DatabaseInfo.DatabasePath); } if (SubDomainPathId && msg->PathId == *SubDomainPathId) { diff --git a/ydb/core/persqueue/pqrb/read_balancer.h b/ydb/core/persqueue/pqrb/read_balancer.h index 14f22bb5dad4..6f89d213f33a 100644 --- a/ydb/core/persqueue/pqrb/read_balancer.h +++ b/ydb/core/persqueue/pqrb/read_balancer.h @@ -30,6 +30,15 @@ class TBalancer; class TMLPBalancer; } +class TTopicMetricsHandler; + +struct TDatabaseInfo { + TString DatabasePath; + TString DatabaseId; + TString FolderId; + TString CloudId; +}; + class TMetricsTimeKeeper { public: @@ -213,54 +222,17 @@ class TPersQueueReadBalancer : public TActor, std::unordered_map TabletPipes; std::unordered_set PipesRequested; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedExtendedCounters; - std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCompactionCounters; + TDatabaseInfo DatabaseInfo; - NMonitoring::TDynamicCounterPtr DynamicCounters; - NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; - NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; + std::unique_ptr TopicMetricsHandler; - TString DatabasePath; - TString DatabaseId; - TString FolderId; - TString CloudId; - - struct TPartitionStats { - ui64 DataSize = 0; - ui64 UsedReserveSize = 0; - NKikimrPQ::TAggregatedCounters Counters; - bool HasCounters = false; - }; - - struct TPartitionMetrics { - ui64 TotalAvgWriteSpeedPerSec = 0; - ui64 MaxAvgWriteSpeedPerSec = 0; - ui64 TotalAvgWriteSpeedPerMin = 0; - ui64 MaxAvgWriteSpeedPerMin = 0; - ui64 TotalAvgWriteSpeedPerHour = 0; - ui64 MaxAvgWriteSpeedPerHour = 0; - ui64 TotalAvgWriteSpeedPerDay = 0; - ui64 MaxAvgWriteSpeedPerDay = 0; - }; - - struct TAggregatedStats { - std::unordered_map Stats; + struct TStatsRequestTracker { std::unordered_map Cookies; - ui64 TotalDataSize = 0; - ui64 TotalUsedReserveSize = 0; - - TPartitionMetrics Metrics; - TPartitionMetrics NewMetrics; - ui64 Round = 0; ui64 NextCookie = 0; - - void AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize); - void AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay); }; - TAggregatedStats AggregatedStats; + TStatsRequestTracker StatsRequestTracker; struct TTxWritePartitionStats; bool TTxWritePartitionStatsScheduled = false; diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp new file mode 100644 index 000000000000..bda49aefd468 --- /dev/null +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.cpp @@ -0,0 +1,301 @@ +#include "read_balancer.h" +#include "read_balancer__metrics.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NPQ { + + +namespace { + +template +struct TMetricCollector { + using TConfig = TProtobufTabletLabeledCounters; + + void Collect(const auto& values) { + Collect(values.begin(), values.end()); + } + + void Collect(auto begin, auto end) { + ssize_t in_size = std::distance(begin, end); + AFL_ENSURE(in_size >= 0)("in_size", in_size); + + ssize_t count = std::min(Counters.GetCounters().Size(), in_size); + for (ssize_t i = 0; i < count; ++i) { + Counters.GetCounters()[i] = *begin; + ++begin; + } + + // The aggregation function configured in the protofile is used for each counter. + Aggregator.AggregateWith(Counters); + } + + TConfig Counters; + TTabletLabeledCountersBase Aggregator; +}; + +struct TConsumerMetricCollector { + TMetricCollector ClientLabeledCounters; + //TMetricCollector MLPConsumerLabeledCounters; + //TMetricCollector MLPMessageLockAttemptsCounter; +}; + +struct TTopicMetricCollector { + TTopicMetricCollector(absl::flat_hash_map& partitionMetrics) + : PartitionMetrics(partitionMetrics) + { + } + + absl::flat_hash_map& PartitionMetrics; + + TTopicMetrics TopicMetrics; + + TMetricCollector PartitionLabeledCounters; + TMetricCollector PartitionExtendedLabeledCounters; + TMetricCollector PartitionKeyCompactionLabeledCounters; + + absl::flat_hash_map Consumers; + + void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) { + auto& partitionMetrics = PartitionMetrics[partitionStatus.GetPartition()]; + partitionMetrics.DataSize = partitionStatus.GetPartitionSize(); + partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize(); + + TopicMetrics.TotalAvgWriteSpeedPerSec += partitionStatus.GetAvgWriteSpeedPerSec(); + TopicMetrics.MaxAvgWriteSpeedPerSec = Max(TopicMetrics.MaxAvgWriteSpeedPerSec, partitionStatus.GetAvgWriteSpeedPerSec()); + TopicMetrics.TotalAvgWriteSpeedPerMin += partitionStatus.GetAvgWriteSpeedPerMin(); + TopicMetrics.MaxAvgWriteSpeedPerMin = Max(TopicMetrics.MaxAvgWriteSpeedPerMin, partitionStatus.GetAvgWriteSpeedPerMin()); + TopicMetrics.TotalAvgWriteSpeedPerHour += partitionStatus.GetAvgWriteSpeedPerHour(); + TopicMetrics.MaxAvgWriteSpeedPerHour = Max(TopicMetrics.MaxAvgWriteSpeedPerHour, partitionStatus.GetAvgWriteSpeedPerHour()); + TopicMetrics.TotalAvgWriteSpeedPerDay += partitionStatus.GetAvgWriteSpeedPerDay(); + TopicMetrics.MaxAvgWriteSpeedPerDay = Max(TopicMetrics.MaxAvgWriteSpeedPerDay, partitionStatus.GetAvgWriteSpeedPerDay()); + + Collect(partitionStatus.GetAggregatedCounters()); + } + + void Collect(const NKikimrPQ::TAggregatedCounters& counters) { + PartitionLabeledCounters.Collect(counters.GetValues()); + PartitionExtendedLabeledCounters.Collect(counters.GetExtendedCounters().GetValues()); + PartitionKeyCompactionLabeledCounters.Collect(counters.GetCompactionCounters().GetValues()); + + for (const auto& consumer : counters.GetConsumerAggregatedCounters()) { + Consumers[consumer.GetConsumer()].ClientLabeledCounters.Collect(consumer.GetValues()); + } + + // TODO MLP + // for (const auto& consumer : counters.GetMLPConsumerCounters()) { + // auto& collector = Consumers[consumer.GetConsumer()]; + // collector.MLPConsumerLabeledCounters.Collect(consumer.GetCountersValues()); + // collector.MLPMessageLockAttemptsCounter.Collect(consumer.GetMessageLocksValues()); + // } + } + + void Finish() { + for (auto& [_, partitionMetrics] : PartitionMetrics) { + TopicMetrics.TotalDataSize += partitionMetrics.DataSize; + TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize; + } + } +}; + +template +TCounters InitializeCounters( + NMonitoring::TDynamicCounterPtr root, + const std::vector>& subgroups = {}, + bool skipPrefix = true +) { + auto group = root; + + for (const auto& subgroup : subgroups) { + group = group->GetSubgroup(subgroup.first, subgroup.second); + } + + const auto* config = NAux::GetLabeledCounterOpts(); + + std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result; + for (size_t i = 0; i < config->Size; ++i) { + TString name = config->GetSVNames()[i]; + if (skipPrefix) { + TStringBuf nameBuf = name; + nameBuf.SkipPrefix("PQ/"); + name = nameBuf; + } + result.push_back(name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false)); + } + + return { + .Types = config->GetTypes(), + .Counters = std::move(result) + }; +} + +void SetCounters(TCounters& counters, const auto& metrics) { + ui64 now = TAppData::TimeProvider->Now().MilliSeconds(); + auto& aggregatedCounters = metrics.Aggregator.GetCounters(); + + for (size_t i = 0; i < counters.Counters.size(); ++i) { + if (!counters.Counters[i]) { + continue; + } + if (aggregatedCounters.Size() == i) { + break; + } + + auto value = aggregatedCounters[i].Get(); + const auto& type = counters.Types[i]; + if (type == TLabeledCounterOptions::CT_TIMELAG) { + value = value < now ? now - value : 0; + } + + counters.Counters[i]->Set(value); + } +} + +} + +TTopicMetricsHandler::TTopicMetricsHandler() = default; +TTopicMetricsHandler::~TTopicMetricsHandler() = default; + +const TTopicMetrics& TTopicMetricsHandler::GetTopicMetrics() const { + return TopicMetrics; +} + +const absl::flat_hash_map& TTopicMetricsHandler::GetPartitionMetrics() const { + return PartitionMetrics; +} + +void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { + if (DynamicCounters) { + return; + } + + TStringBuf name = TStringBuf(topicPath); + name.SkipPrefix(database.DatabasePath); + name.SkipPrefix("/"); + + bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe + DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics") + ->GetSubgroup("host", "") + ->GetSubgroup("database", database.DatabasePath) + ->GetSubgroup("cloud_id", database.CloudId) + ->GetSubgroup("folder_id", database.FolderId) + ->GetSubgroup("database_id", database.DatabaseId) + ->GetSubgroup("topic", TString(name)); + + ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false); + InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false); + + PartitionLabeledCounters = InitializeCounters(DynamicCounters); + PartitionExtendedLabeledCounters = InitializeCounters(DynamicCounters, {}, true); + InitializeKeyCompactionCounters(tabletConfig); + InitializeConsumerCounters(tabletConfig, ctx); +} + +void TTopicMetricsHandler::InitializeConsumerCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) { + for (const auto& consumer : tabletConfig.GetConsumers()) { + auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); + + auto& counters = ConsumerCounters[consumer.GetName()]; + counters.ClientLabeledCounters = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); + + if (consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) { + //metrics.MLPClientLabeledCounters = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); + //metrics.MLPMessageLockAttemptsCounter = InitializeCounters(DynamicCounters, {{"consumer", metricsConsumerName}}); + } + } + + absl::flat_hash_set existedConsumers; + for (const auto& consumer : tabletConfig.GetConsumers()) { + existedConsumers.insert(consumer.GetName()); + } + for (auto it = ConsumerCounters.begin(); it != ConsumerCounters.end();) { + if (!existedConsumers.contains(it->first)) { + ConsumerCounters.erase(it++); + } else { + ++it; + } + } +} + +void TTopicMetricsHandler::InitializeKeyCompactionCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig) { + if (tabletConfig.GetEnableCompactification()) { + PartitionKeyCompactionLabeledCounters = InitializeCounters(DynamicCounters, {}, true); + } else { + PartitionKeyCompactionLabeledCounters.Counters.clear(); + } +} + +void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) { + Y_UNUSED(database); + Y_UNUSED(topicPath); + + if (!DynamicCounters) { + return; + } + + InitializeKeyCompactionCounters(tabletConfig); + InitializeConsumerCounters(tabletConfig, ctx); + + size_t inactiveCount = std::count_if(tabletConfig.GetAllPartitions().begin(), tabletConfig.GetAllPartitions().end(), [](auto& p) { + return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive; + }); + + ActivePartitionCountCounter->Set(tabletConfig.GetAllPartitions().size() - inactiveCount); + InactivePartitionCountCounter->Set(inactiveCount); +} + +void TTopicMetricsHandler::InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize) { + PartitionMetrics[partitionId] = { + .DataSize = dataSize, + .UsedReserveSize = usedReserveSize + }; + + TopicMetrics.TotalDataSize += dataSize; + TopicMetrics.TotalUsedReserveSize += usedReserveSize; +} + +void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus) { + PartitionStatuses[partitionStatus.GetPartition()] = std::move(partitionStatus); +} + +void TTopicMetricsHandler::UpdateMetrics() { + if (!DynamicCounters || PartitionStatuses.empty()) { + return; + } + + TTopicMetricCollector collector(PartitionMetrics); + for (auto& [_, partitionStatus] : PartitionStatuses) { + collector.Collect(partitionStatus); + } + collector.Finish(); + PartitionStatuses.erase(PartitionStatuses.begin(), PartitionStatuses.end()); + + TopicMetrics = collector.TopicMetrics; + + SetCounters(PartitionLabeledCounters, collector.PartitionLabeledCounters); + SetCounters(PartitionExtendedLabeledCounters, collector.PartitionExtendedLabeledCounters); + SetCounters(PartitionKeyCompactionLabeledCounters, collector.PartitionKeyCompactionLabeledCounters); + + for (auto& [consumer, consumerCounters] : ConsumerCounters) { + auto it = collector.Consumers.find(consumer); + if (it == collector.Consumers.end()) { + continue; + } + + auto& consumerMetrics = it->second; + + SetCounters(consumerCounters.ClientLabeledCounters, consumerMetrics.ClientLabeledCounters); + //if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) { + // SetCounters(consumerCounters.MLPClientLabeledCounters, consumerMetrics.MLPConsumerLabeledCounters); + // TODO MLPMessageLockAttemptsCounter + //} + } +} + +} diff --git a/ydb/core/persqueue/pqrb/read_balancer__metrics.h b/ydb/core/persqueue/pqrb/read_balancer__metrics.h new file mode 100644 index 000000000000..b84b0039aeb6 --- /dev/null +++ b/ydb/core/persqueue/pqrb/read_balancer__metrics.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include + +namespace NKikimr::NPQ { + +struct TDatabaseInfo; + +struct TTopicMetrics { + ui64 TotalDataSize = 0; + ui64 TotalUsedReserveSize = 0; + + ui64 TotalAvgWriteSpeedPerSec = 0; + ui64 MaxAvgWriteSpeedPerSec = 0; + ui64 TotalAvgWriteSpeedPerMin = 0; + ui64 MaxAvgWriteSpeedPerMin = 0; + ui64 TotalAvgWriteSpeedPerHour = 0; + ui64 MaxAvgWriteSpeedPerHour = 0; + ui64 TotalAvgWriteSpeedPerDay = 0; + ui64 MaxAvgWriteSpeedPerDay = 0; +}; + +struct TPartitionMetrics { + ui64 DataSize = 0; + ui64 UsedReserveSize = 0; +}; + +struct TCounters { + std::vector Types; + std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> Counters; +}; + +class TTopicMetricsHandler { +public: + TTopicMetricsHandler(); + ~TTopicMetricsHandler(); + + const TTopicMetrics& GetTopicMetrics() const; + const absl::flat_hash_map& GetPartitionMetrics() const; + + void Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); + void UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx); + void InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize); + + void Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus); + void UpdateMetrics(); + +protected: + void InitializeKeyCompactionCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig); + void InitializeConsumerCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx); + +private: + NMonitoring::TDynamicCounterPtr DynamicCounters; + + TTopicMetrics TopicMetrics; + absl::flat_hash_map PartitionMetrics; + + NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter; + NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter; + + TCounters PartitionLabeledCounters; + TCounters PartitionExtendedLabeledCounters; + TCounters PartitionKeyCompactionLabeledCounters; + + struct TConsumerCounters { + TCounters ClientLabeledCounters; + TCounters MLPClientLabeledCounters; + ::NMonitoring::TDynamicCounters::TCounterPtr MLPMessageLockAttemptsCounter; + }; + absl::flat_hash_map ConsumerCounters; + + absl::flat_hash_map PartitionStatuses; +}; +} diff --git a/ydb/core/persqueue/pqrb/read_balancer__txinit.h b/ydb/core/persqueue/pqrb/read_balancer__txinit.h index 4ca469b4c3e6..7a7694f4657c 100644 --- a/ydb/core/persqueue/pqrb/read_balancer__txinit.h +++ b/ydb/core/persqueue/pqrb/read_balancer__txinit.h @@ -1,6 +1,7 @@ #pragma once #include "read_balancer.h" +#include "read_balancer__metrics.h" #include "read_balancer__schema.h" #include @@ -60,7 +61,8 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); if (SplitMergeEnabled(Self->TabletConfig)) { - Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph); + // TODO DatabasePath is not initialized yet + Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->Path, Self->DatabaseInfo.DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph); } Self->UpdateConfigCounters(); } @@ -76,8 +78,8 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { ui64 tabletId = partsRowset.GetValue(); partitionsInfo[part] = {tabletId}; - Self->AggregatedStats.AggrStats(part, partsRowset.GetValue(), - partsRowset.GetValue()); + Self->TopicMetricsHandler->InitializePartitions(part, partsRowset.GetValue(), + partsRowset.GetValue()); if (!partsRowset.Next()) return false; diff --git a/ydb/core/persqueue/pqrb/read_balancer_app.cpp b/ydb/core/persqueue/pqrb/read_balancer_app.cpp index 5abe979a3d98..4d7fa164d40d 100644 --- a/ydb/core/persqueue/pqrb/read_balancer_app.cpp +++ b/ydb/core/persqueue/pqrb/read_balancer_app.cpp @@ -1,6 +1,7 @@ #include "read_balancer.h" #include "read_balancer__balancing.h" +#include "read_balancer__metrics.h" #include @@ -17,7 +18,8 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e } TString TPersQueueReadBalancer::GenerateStat() { - auto& metrics = AggregatedStats.Metrics; + auto& metrics = TopicMetricsHandler->GetTopicMetrics(); + auto& partitionMetrics = TopicMetricsHandler->GetPartitionMetrics(); TStringStream str; HTML_APP_PAGE(str, "PersQueueReadBalancer " << TabletID() << " (" << Path << ")") { @@ -45,9 +47,9 @@ TString TPersQueueReadBalancer::GenerateStat() { PROPERTIES("Statistics") { PROPERTY("Active pipes", Balancer->GetSessions().size()); PROPERTY("Active partitions", NumActiveParts); - PROPERTY("Total data size", AggregatedStats.TotalDataSize); + PROPERTY("Total data size", metrics.TotalDataSize); PROPERTY("Reserve size", PartitionReserveSize()); - PROPERTY("Used reserve size", AggregatedStats.TotalUsedReserveSize); + PROPERTY("Used reserve size", metrics.TotalUsedReserveSize); PROPERTY("[Total/Max/Avg]WriteSpeedSec", metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts); PROPERTY("[Total/Max/Avg]WriteSpeedMin", metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts); PROPERTY("[Total/Max/Avg]WriteSpeedHour", metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts); @@ -75,7 +77,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } TABLEBODY() { for (auto& [partitionId, partitionInfo] : PartitionsInfo) { - const auto& stats = AggregatedStats.Stats[partitionId]; + const auto& stats = partitionMetrics.find(partitionId); const auto* node = PartitionGraph.GetPartition(partitionId); TString style = node && node->DirectChildren.empty() ? "text-success" : "text-muted"; @@ -110,7 +112,7 @@ TString TPersQueueReadBalancer::GenerateStat() { } } } - TABLED() { str << stats.DataSize; } + TABLED() { str << (stats == partitionMetrics.end() ? 0 : stats->second.DataSize); } } } } diff --git a/ydb/core/persqueue/pqrb/ya.make b/ydb/core/persqueue/pqrb/ya.make index 9824d5544b4c..96ffab820467 100644 --- a/ydb/core/persqueue/pqrb/ya.make +++ b/ydb/core/persqueue/pqrb/ya.make @@ -7,6 +7,7 @@ SRCS( partition_scale_manager_graph_cmp.cpp read_balancer__balancing_app.cpp read_balancer__balancing.cpp + read_balancer__metrics.cpp read_balancer__mlp_balancing.cpp read_balancer_app.cpp read_balancer.cpp diff --git a/ydb/core/persqueue/public/config.h b/ydb/core/persqueue/public/config.h index 2cda9747e779..08d1f4fc3e2e 100644 --- a/ydb/core/persqueue/public/config.h +++ b/ydb/core/persqueue/public/config.h @@ -9,6 +9,8 @@ class TPQTabletConfig_TConsumer; class TMirrorPartitionConfig; class TMLPStorageSnapshot; class TMLPStorageWAL; +class TStatusResponse; +class TStatusResponse_TPartResult; }; diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index 54a4230bf380..d9933c88ed29 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -3,8 +3,9 @@ #include "tablet_counters.h" #include "tablet_counters_aggregator.h" #include -#include +#include #include +#include namespace NKikimr { @@ -275,6 +276,7 @@ struct TLabeledCounterParsedOpts { TVector Types; TVector GroupNamesStrings; TVector GroupNames; + TString Groups; public: explicit TLabeledCounterParsedOpts() : Size(LabeledCountersDesc()->value_count()) @@ -319,7 +321,9 @@ struct TLabeledCounterParsedOpts { std::transform(GroupNamesStrings.begin(), GroupNamesStrings.end(), std::back_inserter(GroupNames), [](auto& string) { return string.data(); } ); + Groups = JoinRange("|", GroupNamesStrings.begin(), GroupNamesStrings.end()); } + virtual ~TLabeledCounterParsedOpts() {} @@ -353,6 +357,16 @@ struct TLabeledCounterParsedOpts { return AggregateFuncs.begin(); } + const TString& GetGroups() const + { + return Groups; + } + + const TVector& GetTypes() const + { + return Types; + } + protected: TString GetFilePrefix(const NProtoBuf::FileDescriptor* desc) { if (desc->options().HasExtension(TabletTypeName)) { @@ -635,6 +649,13 @@ class TProtobufTabletLabeledCounters : public TTabletLabeledCountersBase { return NAux::GetLabeledCounterOpts(); } + TProtobufTabletLabeledCounters(TMaybe databasePath = Nothing(), const ui64 id = 0) + : TTabletLabeledCountersBase( + SimpleOpts()->Size, SimpleOpts()->GetSVNames(), SimpleOpts()->GetCounterTypes(), + SimpleOpts()->GetAggregateFuncs(), SimpleOpts()->GetGroups(), SimpleOpts()->GetGroupNames(), id, std::move(databasePath)) + { + } + TProtobufTabletLabeledCounters(const TString& group, const ui64 id) : TTabletLabeledCountersBase( SimpleOpts()->Size, SimpleOpts()->GetNames(), SimpleOpts()->GetCounterTypes(),