Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 43 additions & 219 deletions ydb/core/persqueue/pqrb/read_balancer.cpp

Large diffs are not rendered by default.

54 changes: 13 additions & 41 deletions ydb/core/persqueue/pqrb/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class TBalancer;
class TMLPBalancer;
}

class TTopicMetricsHandler;

// Plain aggregate that bundles the identifiers of the database a topic belongs to.
struct TDatabaseInfo {
// Path of the database.
TString DatabasePath;
// Identifier of the database.
TString DatabaseId;
// Identifier of the folder.
TString FolderId;
// Identifier of the cloud.
TString CloudId;
};


class TMetricsTimeKeeper {
public:
Expand Down Expand Up @@ -213,54 +222,17 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
std::unordered_map<ui64, TPipeLocation> TabletPipes;
std::unordered_set<ui64> PipesRequested;

std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedExtendedCounters;
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCompactionCounters;
TDatabaseInfo DatabaseInfo;

NMonitoring::TDynamicCounterPtr DynamicCounters;
NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter;
NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter;
std::unique_ptr<TTopicMetricsHandler> TopicMetricsHandler;

TString DatabasePath;
TString DatabaseId;
TString FolderId;
TString CloudId;

struct TPartitionStats {
ui64 DataSize = 0;
ui64 UsedReserveSize = 0;
NKikimrPQ::TAggregatedCounters Counters;
bool HasCounters = false;
};

struct TPartitionMetrics {
ui64 TotalAvgWriteSpeedPerSec = 0;
ui64 MaxAvgWriteSpeedPerSec = 0;
ui64 TotalAvgWriteSpeedPerMin = 0;
ui64 MaxAvgWriteSpeedPerMin = 0;
ui64 TotalAvgWriteSpeedPerHour = 0;
ui64 MaxAvgWriteSpeedPerHour = 0;
ui64 TotalAvgWriteSpeedPerDay = 0;
ui64 MaxAvgWriteSpeedPerDay = 0;
};

struct TAggregatedStats {
std::unordered_map<ui32, TPartitionStats> Stats;
struct TStatsRequestTracker {
std::unordered_map<ui64, ui64> Cookies;

ui64 TotalDataSize = 0;
ui64 TotalUsedReserveSize = 0;

TPartitionMetrics Metrics;
TPartitionMetrics NewMetrics;

ui64 Round = 0;
ui64 NextCookie = 0;

void AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize);
void AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay);
};
TAggregatedStats AggregatedStats;
TStatsRequestTracker StatsRequestTracker;

struct TTxWritePartitionStats;
bool TTxWritePartitionStatsScheduled = false;
Expand Down
301 changes: 301 additions & 0 deletions ydb/core/persqueue/pqrb/read_balancer__metrics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
#include "read_balancer.h"
#include "read_balancer__metrics.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/persqueue/common/percentiles.h>
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>

namespace NKikimr::NPQ {


namespace {

// Folds the labeled-counter values reported by individual sources into a single
// aggregated set. SimpleDesc selects the protobuf enum that describes the counters.
template<const NProtoBuf::EnumDescriptor* SimpleDesc()>
struct TMetricCollector {
    using TConfig = TProtobufTabletLabeledCounters<SimpleDesc>;

    // Convenience overload: collects every element of an iterable container.
    void Collect(const auto& values) {
        Collect(values.begin(), values.end());
    }

    // Copies the values from [begin, end) into the scratch counter set and merges
    // that set into Aggregator.
    //
    // At most Counters.GetCounters().Size() values are consumed; surplus input is
    // ignored. When fewer values are supplied, the remaining scratch slots are not
    // written by this call and keep whatever they held before.
    void Collect(auto begin, auto end) {
        const ssize_t in_size = std::distance(begin, end);
        AFL_ENSURE(in_size >= 0)("in_size", in_size);

        const ssize_t writable = std::min<ssize_t>(Counters.GetCounters().Size(), in_size);
        auto source = begin;
        for (ssize_t slot = 0; slot < writable; ++slot, ++source) {
            Counters.GetCounters()[slot] = *source;
        }

        // The aggregation function configured in the protofile is used for each counter.
        Aggregator.AggregateWith(Counters);
    }

    // Scratch counter set that is overwritten on every Collect() call.
    TConfig Counters;
    // Running aggregate over everything collected so far.
    TTabletLabeledCountersBase Aggregator;
};

// Per-consumer set of metric collectors. Only the client labeled counters are
// collected at the moment; the MLP collectors below are still commented out.
struct TConsumerMetricCollector {
TMetricCollector<EClientLabeledCounters_descriptor> ClientLabeledCounters;
//TMetricCollector<EMLPConsumerLabeledCounters_descriptor> MLPConsumerLabeledCounters;
//TMetricCollector MLPMessageLockAttemptsCounter;
};

// Accumulates partition status responses into topic-level metrics and aggregated
// labeled counters. Usage: construct, call Collect() for every partition status,
// then call Finish() once to compute the size totals.
struct TTopicMetricCollector {
    // Binds the collector to the caller-owned per-partition metrics map.
    // Marked explicit so that a map can never be converted into a collector implicitly.
    explicit TTopicMetricCollector(absl::flat_hash_map<ui32, TPartitionMetrics>& partitionMetrics)
        : PartitionMetrics(partitionMetrics)
    {
    }

    // Caller-owned map; entries are created or updated in place by Collect().
    absl::flat_hash_map<ui32, TPartitionMetrics>& PartitionMetrics;

    // Topic-wide values accumulated by Collect() and Finish().
    TTopicMetrics TopicMetrics;

    TMetricCollector<EPartitionLabeledCounters_descriptor> PartitionLabeledCounters;
    TMetricCollector<EPartitionExtendedLabeledCounters_descriptor> PartitionExtendedLabeledCounters;
    TMetricCollector<EPartitionKeyCompactionLabeledCounters_descriptor> PartitionKeyCompactionLabeledCounters;

    // Collectors keyed by the consumer name found in the aggregated counters.
    absl::flat_hash_map<TString, TConsumerMetricCollector> Consumers;

    // Records the sizes of one partition, folds its average write speeds into the
    // topic totals and maxima, and collects its aggregated labeled counters.
    void Collect(const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) {
        auto& partitionMetrics = PartitionMetrics[partitionStatus.GetPartition()];
        partitionMetrics.DataSize = partitionStatus.GetPartitionSize();
        partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize();

        // Each speed is read once and used for both the running total and the maximum.
        const auto perSec = partitionStatus.GetAvgWriteSpeedPerSec();
        TopicMetrics.TotalAvgWriteSpeedPerSec += perSec;
        TopicMetrics.MaxAvgWriteSpeedPerSec = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerSec, perSec);

        const auto perMin = partitionStatus.GetAvgWriteSpeedPerMin();
        TopicMetrics.TotalAvgWriteSpeedPerMin += perMin;
        TopicMetrics.MaxAvgWriteSpeedPerMin = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerMin, perMin);

        const auto perHour = partitionStatus.GetAvgWriteSpeedPerHour();
        TopicMetrics.TotalAvgWriteSpeedPerHour += perHour;
        TopicMetrics.MaxAvgWriteSpeedPerHour = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerHour, perHour);

        const auto perDay = partitionStatus.GetAvgWriteSpeedPerDay();
        TopicMetrics.TotalAvgWriteSpeedPerDay += perDay;
        TopicMetrics.MaxAvgWriteSpeedPerDay = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerDay, perDay);

        Collect(partitionStatus.GetAggregatedCounters());
    }

    // Feeds the partition-level, extended, key-compaction and per-consumer counter
    // values into their respective collectors.
    void Collect(const NKikimrPQ::TAggregatedCounters& counters) {
        PartitionLabeledCounters.Collect(counters.GetValues());
        PartitionExtendedLabeledCounters.Collect(counters.GetExtendedCounters().GetValues());
        PartitionKeyCompactionLabeledCounters.Collect(counters.GetCompactionCounters().GetValues());

        for (const auto& consumer : counters.GetConsumerAggregatedCounters()) {
            Consumers[consumer.GetConsumer()].ClientLabeledCounters.Collect(consumer.GetValues());
        }

        // TODO MLP
        // for (const auto& consumer : counters.GetMLPConsumerCounters()) {
        //     auto& collector = Consumers[consumer.GetConsumer()];
        //     collector.MLPConsumerLabeledCounters.Collect(consumer.GetCountersValues());
        //     collector.MLPMessageLockAttemptsCounter.Collect(consumer.GetMessageLocksValues());
        // }
    }

    // Sums the sizes over every entry of PartitionMetrics, including entries that
    // were not touched by Collect() on this collector.
    void Finish() {
        for (const auto& [_, partitionMetrics] : PartitionMetrics) {
            TopicMetrics.TotalDataSize += partitionMetrics.DataSize;
            TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize;
        }
    }
};

// Creates one expiring named counter per entry of the labeled-counter config
// selected by SimpleDesc.
//
// root       - counter group to start from.
// subgroups  - (label, value) pairs applied in order to descend from root.
// skipPrefix - when true, a leading "PQ/" is stripped from each counter name.
//
// Returns the config's type table together with the created counters. A slot is
// nullptr when the (possibly stripped) counter name is empty.
template<const NProtoBuf::EnumDescriptor* SimpleDesc()>
TCounters InitializeCounters(
    NMonitoring::TDynamicCounterPtr root,
    const std::vector<std::pair<TString, TString>>& subgroups = {},
    bool skipPrefix = true
) {
    auto group = root;
    for (const auto& [label, value] : subgroups) {
        group = group->GetSubgroup(label, value);
    }

    const auto* config = NAux::GetLabeledCounterOpts<SimpleDesc>();

    std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result;
    for (size_t index = 0; index < config->Size; ++index) {
        TString name = config->GetSVNames()[index];
        if (skipPrefix) {
            TStringBuf trimmed = name;
            trimmed.SkipPrefix("PQ/");
            name = trimmed;
        }

        ::NMonitoring::TDynamicCounters::TCounterPtr counter =
            name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false);
        result.push_back(counter);
    }

    return {
        .Types = config->GetTypes(),
        .Counters = std::move(result)
    };
}

// Publishes the aggregated values held by metrics.Aggregator into the monitoring
// counters of `counters`, index by index.
//
// Slots whose counter is nullptr are skipped. Values of type CT_TIMELAG are
// converted to "now - value" in milliseconds, clamped at 0.
void SetCounters(TCounters& counters, const auto& metrics) {
    const ui64 now = TAppData::TimeProvider->Now().MilliSeconds();
    const auto& aggregatedCounters = metrics.Aggregator.GetCounters();

    for (size_t i = 0; i < counters.Counters.size(); ++i) {
        // The bounds check must come before the null-slot skip. Otherwise a null
        // slot located exactly at index Size() would step over an equality test,
        // and every later index would read past the end of aggregatedCounters.
        if (i >= aggregatedCounters.Size()) {
            break;
        }
        if (!counters.Counters[i]) {
            continue;
        }

        auto value = aggregatedCounters[i].Get();
        const auto& type = counters.Types[i];
        if (type == TLabeledCounterOptions::CT_TIMELAG) {
            value = value < now ? now - value : 0;
        }

        counters.Counters[i]->Set(value);
    }
}

}

// Constructor and destructor are defaulted here, out of line, in this translation unit.
TTopicMetricsHandler::TTopicMetricsHandler() = default;
TTopicMetricsHandler::~TTopicMetricsHandler() = default;

// Returns a read-only reference to the stored topic-level metrics.
const TTopicMetrics& TTopicMetricsHandler::GetTopicMetrics() const {
return TopicMetrics;
}

// Returns a read-only reference to the per-partition metrics, keyed by partition id.
const absl::flat_hash_map<ui32, TPartitionMetrics>& TTopicMetricsHandler::GetPartitionMetrics() const {
return PartitionMetrics;
}

// Builds the monitoring counter tree for the topic and creates all counters.
// Does nothing if the counters have already been created.
//
// The topic label is topicPath with the database path and a following "/" stripped.
void TTopicMetricsHandler::Initialize(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) {
    if (DynamicCounters) {
        return;
    }

    TStringBuf topicLabel = TStringBuf(topicPath);
    topicLabel.SkipPrefix(database.DatabasePath);
    topicLabel.SkipPrefix("/");

    const bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
    auto countersRoot = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics");
    DynamicCounters = countersRoot
        ->GetSubgroup("host", "")
        ->GetSubgroup("database", database.DatabasePath)
        ->GetSubgroup("cloud_id", database.CloudId)
        ->GetSubgroup("folder_id", database.FolderId)
        ->GetSubgroup("database_id", database.DatabaseId)
        ->GetSubgroup("topic", TString(topicLabel));

    ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false);
    InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false);

    // Both calls rely on the default arguments: no extra subgroups, "PQ/" prefix stripped.
    PartitionLabeledCounters = InitializeCounters<EPartitionLabeledCounters_descriptor>(DynamicCounters);
    PartitionExtendedLabeledCounters = InitializeCounters<EPartitionExtendedLabeledCounters_descriptor>(DynamicCounters);

    InitializeKeyCompactionCounters(tabletConfig);
    InitializeConsumerCounters(tabletConfig, ctx);
}

// Synchronizes ConsumerCounters with the consumers listed in tabletConfig:
// (re)creates the client labeled counters for every configured consumer and
// removes the entries of consumers that are no longer configured.
void TTopicMetricsHandler::InitializeConsumerCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) {
    // Names of the configured consumers; the views point into tabletConfig.
    absl::flat_hash_set<std::string_view> configuredConsumers;

    for (const auto& consumer : tabletConfig.GetConsumers()) {
        configuredConsumers.insert(consumer.GetName());

        // The "consumer" label uses the converted name, while the map key is the raw one.
        auto metricsConsumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx);

        auto& consumerEntry = ConsumerCounters[consumer.GetName()];
        consumerEntry.ClientLabeledCounters = InitializeCounters<EClientLabeledCounters_descriptor>(DynamicCounters, {{"consumer", metricsConsumerName}});

        if (consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) {
            //metrics.MLPClientLabeledCounters = InitializeCounters<EMLPConsumerLabeledCounters_descriptor>(DynamicCounters, {{"consumer", metricsConsumerName}});
            //metrics.MLPMessageLockAttemptsCounter = InitializeCounters<EMLPMessageLockAttemptsLabeledCounters_descriptor>(DynamicCounters, {{"consumer", metricsConsumerName}});
        }
    }

    // Advance the iterator before erasing so that it never refers to a removed element.
    auto it = ConsumerCounters.begin();
    while (it != ConsumerCounters.end()) {
        const auto current = it++;
        if (!configuredConsumers.contains(current->first)) {
            ConsumerCounters.erase(current);
        }
    }
}

// Creates the key-compaction counters when compactification is enabled in
// tabletConfig; otherwise drops any counters that were created earlier.
void TTopicMetricsHandler::InitializeKeyCompactionCounters(const NKikimrPQ::TPQTabletConfig& tabletConfig) {
    if (!tabletConfig.GetEnableCompactification()) {
        PartitionKeyCompactionLabeledCounters.Counters.clear();
        return;
    }

    // Default arguments apply: no extra subgroups, "PQ/" prefix stripped.
    PartitionKeyCompactionLabeledCounters = InitializeCounters<EPartitionKeyCompactionLabeledCounters_descriptor>(DynamicCounters);
}

// Applies a new tablet config to already initialized counters: refreshes the
// key-compaction and consumer counters and publishes the active and inactive
// partition counts. Does nothing until Initialize() has created the counters.
// `database` and `topicPath` are currently unused.
void TTopicMetricsHandler::UpdateConfig(const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) {
    Y_UNUSED(database);
    Y_UNUSED(topicPath);

    if (!DynamicCounters) {
        return;
    }

    InitializeKeyCompactionCounters(tabletConfig);
    InitializeConsumerCounters(tabletConfig, ctx);

    const auto& partitions = tabletConfig.GetAllPartitions();

    size_t inactiveCount = 0;
    for (const auto& partition : partitions) {
        if (partition.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive) {
            ++inactiveCount;
        }
    }

    // Every partition that is not Inactive is reported as active.
    ActivePartitionCountCounter->Set(partitions.size() - inactiveCount);
    InactivePartitionCountCounter->Set(inactiveCount);
}

// Stores the given data size and used reserve size as the metrics of partition
// `partitionId` (replacing any previous entry) and adds both values to the topic totals.
// The totals are incremented unconditionally, so a repeated call for the same
// partition is counted again until UpdateMetrics() overwrites TopicMetrics.
void TTopicMetricsHandler::InitializePartitions(ui32 partitionId, ui64 dataSize, ui64 usedReserveSize) {
PartitionMetrics[partitionId] = {
.DataSize = dataSize,
.UsedReserveSize = usedReserveSize
};

TopicMetrics.TotalDataSize += dataSize;
TopicMetrics.TotalUsedReserveSize += usedReserveSize;
Comment on lines +258 to +260
Copy link

Copilot AI Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InitializePartitions accumulates DataSize and UsedReserveSize into TopicMetrics (lines 265-266), but these values will be completely replaced when UpdateMetrics is called (line 285 in UpdateMetrics method), which recalculates TopicMetrics from scratch. This means the initial accumulation in InitializePartitions is unnecessary and could be misleading. Consider removing the accumulation here or documenting that TopicMetrics will be recalculated on the first UpdateMetrics call.

Suggested change
TopicMetrics.TotalDataSize += dataSize;
TopicMetrics.TotalUsedReserveSize += usedReserveSize;

Copilot uses AI. Check for mistakes.
}

// Stores the latest status of a partition, replacing any status stored earlier
// for the same partition id. The stored statuses are consumed by UpdateMetrics().
void TTopicMetricsHandler::Handle(NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus) {
    // Read the key before the status object is moved into the map.
    const auto partitionId = partitionStatus.GetPartition();
    PartitionStatuses[partitionId] = std::move(partitionStatus);
}

// Aggregates all partition statuses received since the previous call, replaces
// TopicMetrics with the freshly collected values and publishes the aggregated
// labeled counters. Does nothing when the counters are not initialized or when
// no partition status is pending.
void TTopicMetricsHandler::UpdateMetrics() {
if (!DynamicCounters || PartitionStatuses.empty()) {
return;
}

// The collector updates PartitionMetrics in place while accumulating topic-level values.
TTopicMetricCollector collector(PartitionMetrics);
for (auto& [_, partitionStatus] : PartitionStatuses) {
collector.Collect(partitionStatus);
}
collector.Finish();
// Drop the processed statuses so that the next round starts empty.
PartitionStatuses.erase(PartitionStatuses.begin(), PartitionStatuses.end());
Copy link

Copilot AI Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using erase with range [begin, end) clears all elements from the map, but the standard approach is to simply call clear(). The current code is unnecessarily verbose and potentially confusing. Consider replacing with PartitionStatuses.clear() for better readability.

Suggested change
PartitionStatuses.erase(PartitionStatuses.begin(), PartitionStatuses.end());
PartitionStatuses.clear();

Copilot uses AI. Check for mistakes.

TopicMetrics = collector.TopicMetrics;

SetCounters(PartitionLabeledCounters, collector.PartitionLabeledCounters);
SetCounters(PartitionExtendedLabeledCounters, collector.PartitionExtendedLabeledCounters);
SetCounters(PartitionKeyCompactionLabeledCounters, collector.PartitionKeyCompactionLabeledCounters);

// Consumers that reported nothing in this round keep their previous counter values.
for (auto& [consumer, consumerCounters] : ConsumerCounters) {
auto it = collector.Consumers.find(consumer);
if (it == collector.Consumers.end()) {
continue;
}

auto& consumerMetrics = it->second;

SetCounters(consumerCounters.ClientLabeledCounters, consumerMetrics.ClientLabeledCounters);
//if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) {
// SetCounters(consumerCounters.MLPClientLabeledCounters, consumerMetrics.MLPConsumerLabeledCounters);
// TODO MLPMessageLockAttemptsCounter
//}
}
}

}
Loading
Loading