Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update In-progress CS Stats + Y_ASSERTs #16050

Merged
merged 2 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 147 additions & 45 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
}

void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
NKikimr::NKqp::ExportAggStats(Values, stats);
}

void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
ExportAggStats(stats);
ExportHistory(baseTimeMs, stats);
}

Expand All @@ -28,16 +32,21 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
}
}

void TTimeSeriesStats::Resize(ui32 taskCount) {
Values.resize(taskCount);
void TTimeSeriesStats::Resize(ui32 count) {
Values.resize(count);
}

void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
if (value) {
Y_ASSERT(index < Values.size());
Sum += value;
Sum -= Values[taskIndex];
Values[taskIndex] = value;
Sum -= Values[index];
Values[index] = value;
AppendHistory();
}
}

void TTimeSeriesStats::AppendHistory() {
if (HistorySampleCount) {
auto nowMs = Now().MilliSeconds();

Expand Down Expand Up @@ -97,6 +106,69 @@ void TTimeSeriesStats::Pack() {
}
}

void TPartitionedStats::ResizeByTasks(ui32 taskCount) {
for (auto& p : Parts) {
p.resize(taskCount);
}
}

void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
auto oldPartCount = Parts.size();
Parts.resize(partCount);
for(auto i = oldPartCount; i < partCount; i++) {
Parts[i].resize(taskCount);
}
Resize(partCount);
}

void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
if (value) {
Y_ASSERT(partIndex < Parts.size());
auto& part = Parts[partIndex];
auto delta = value - part[taskIndex];
Y_ASSERT(taskIndex < part.size());
part[taskIndex] = value;
Y_ASSERT(partIndex < Values.size());
Values[partIndex] += delta;
Sum += delta;
if (recordTimeSeries) {
AppendHistory();
}
}
}

void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
auto [it, inserted] = Indices.try_emplace(key);
if (inserted) {
it->second = Indices.size() - 1;
if (PartCount < Indices.size()) {
PartCount += 4;
}
}
if (stats.Parts.size() < PartCount) {
stats.ResizeByParts(PartCount, TaskCount);
}
stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
}

void TExternalStats::Resize(ui32 taskCount) {
ExternalRows.ResizeByTasks(taskCount);
ExternalBytes.ResizeByTasks(taskCount);
FirstMessageMs.ResizeByTasks(taskCount);
LastMessageMs.ResizeByTasks(taskCount);
TaskCount = taskCount;
}

void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) {
ExternalBytes.HistorySampleCount = historySampleCount;
}

void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
if (stats.HasExternalBytes()) {
ExternalBytes.ExportHistory(baseTimeMs, *stats.MutableExternalBytes());
}
}

void TAsyncStats::Resize(ui32 taskCount) {
Bytes.Resize(taskCount);
DecompressedBytes.resize(taskCount);
Expand Down Expand Up @@ -127,20 +199,25 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
}

void TAsyncBufferStats::Resize(ui32 taskCount) {
External.Resize(taskCount);
Ingress.Resize(taskCount);
Push.Resize(taskCount);
Pop.Resize(taskCount);
Egress.Resize(taskCount);
}

void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
External.SetHistorySampleCount(historySampleCount);
Ingress.SetHistorySampleCount(historySampleCount);
Push.SetHistorySampleCount(historySampleCount);
Pop.SetHistorySampleCount(historySampleCount);
Egress.SetHistorySampleCount(historySampleCount);
}

void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
if (stats.HasExternal()) {
External.ExportHistory(baseTimeMs, *stats.MutableExternal());
}
if (stats.HasIngress()) {
Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress());
}
Expand Down Expand Up @@ -281,40 +358,46 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
}
}

void SetNonZero(ui64& target, ui64 source) {
inline void SetNonZero(ui64& target, ui64 source) {
if (source) {
target = source;
}
}

inline void SetNonZero(std::vector<ui64>& vector, ui32 index, ui64 value) {
Y_ASSERT(index < vector.size());
SetNonZero(vector[index], value);
}

ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
ui64 baseTimeMs = 0;

aggrAsyncStats.Bytes.SetNonZero(index, asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());
SetNonZero(aggrAsyncStats.DecompressedBytes, index, asyncStats.GetDecompressedBytes());
SetNonZero(aggrAsyncStats.Rows, index, asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks, index, asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits, index, asyncStats.GetSplits());

auto firstMessageMs = asyncStats.GetFirstMessageMs();
SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs);
SetNonZero(aggrAsyncStats.FirstMessageMs, index, firstMessageMs);
baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs);

auto pauseMessageMs = asyncStats.GetPauseMessageMs();
SetNonZero(aggrAsyncStats.PauseMessageMs[index], pauseMessageMs);
SetNonZero(aggrAsyncStats.PauseMessageMs, index, pauseMessageMs);
baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs);

auto resumeMessageMs = asyncStats.GetResumeMessageMs();
SetNonZero(aggrAsyncStats.ResumeMessageMs[index], resumeMessageMs);
SetNonZero(aggrAsyncStats.ResumeMessageMs, index, resumeMessageMs);
baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs);

auto lastMessageMs = asyncStats.GetLastMessageMs();
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs);
SetNonZero(aggrAsyncStats.LastMessageMs, index, lastMessageMs);
baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs);

aggrAsyncStats.WaitTimeUs.SetNonZero(index, asyncStats.GetWaitTimeUs());
SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods());
SetNonZero(aggrAsyncStats.WaitPeriods, index, asyncStats.GetWaitPeriods());
if (firstMessageMs && lastMessageMs > firstMessageMs) {
Y_ASSERT(index < aggrAsyncStats.ActiveTimeUs.size());
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
}

Expand Down Expand Up @@ -348,29 +431,29 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
}

CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());

SetNonZero(InputRows[index], taskStats.GetInputRows());
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
SetNonZero(OutputRows[index], taskStats.GetOutputRows());
SetNonZero(OutputBytes[index], taskStats.GetOutputBytes());
SetNonZero(ResultRows[index], taskStats.GetResultRows());
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
SetNonZero(IngressDecompressedBytes[index], taskStats.GetIngressDecompressedBytes());
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());
SetNonZero(SourceCpuTimeUs, index, taskStats.GetSourceCpuTimeUs());

SetNonZero(InputRows, index, taskStats.GetInputRows());
SetNonZero(InputBytes, index, taskStats.GetInputBytes());
SetNonZero(OutputRows, index, taskStats.GetOutputRows());
SetNonZero(OutputBytes, index, taskStats.GetOutputBytes());
SetNonZero(ResultRows, index, taskStats.GetResultRows());
SetNonZero(ResultBytes, index, taskStats.GetResultBytes());
SetNonZero(IngressRows, index, taskStats.GetIngressRows());
SetNonZero(IngressBytes, index, taskStats.GetIngressBytes());
SetNonZero(IngressDecompressedBytes, index, taskStats.GetIngressDecompressedBytes());
SetNonZero(EgressRows, index, taskStats.GetEgressRows());
SetNonZero(EgressBytes, index, taskStats.GetEgressBytes());

auto startTimeMs = taskStats.GetStartTimeMs();
SetNonZero(StartTimeMs[index], startTimeMs);
SetNonZero(StartTimeMs, index, startTimeMs);
baseTimeMs = NonZeroMin(baseTimeMs, startTimeMs);

auto finishTimeMs = taskStats.GetFinishTimeMs();
SetNonZero(FinishTimeMs[index], finishTimeMs);
SetNonZero(FinishTimeMs, index, finishTimeMs);
baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs);

SetNonZero(DurationUs[index], durationUs);
SetNonZero(DurationUs, index, durationUs);
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());

Expand All @@ -383,13 +466,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
auto tablePath = tableStat.GetTablePath();
auto [it, inserted] = Tables.try_emplace(tablePath, TaskCount);
auto& aggrTableStats = it->second;
SetNonZero(aggrTableStats.ReadRows[index], tableStat.GetReadRows());
SetNonZero(aggrTableStats.ReadBytes[index], tableStat.GetReadBytes());
SetNonZero(aggrTableStats.WriteRows[index], tableStat.GetWriteRows());
SetNonZero(aggrTableStats.WriteBytes[index], tableStat.GetWriteBytes());
SetNonZero(aggrTableStats.EraseRows[index], tableStat.GetEraseRows());
SetNonZero(aggrTableStats.EraseBytes[index], tableStat.GetEraseBytes());
SetNonZero(aggrTableStats.AffectedPartitions[index], tableStat.GetAffectedPartitions());
SetNonZero(aggrTableStats.ReadRows, index, tableStat.GetReadRows());
SetNonZero(aggrTableStats.ReadBytes, index, tableStat.GetReadBytes());
SetNonZero(aggrTableStats.WriteRows, index, tableStat.GetWriteRows());
SetNonZero(aggrTableStats.WriteBytes, index, tableStat.GetWriteBytes());
SetNonZero(aggrTableStats.EraseRows, index, tableStat.GetEraseRows());
SetNonZero(aggrTableStats.EraseBytes, index, tableStat.GetEraseBytes());
SetNonZero(aggrTableStats.AffectedPartitions, index, tableStat.GetAffectedPartitions());
}

for (auto& sourceStat : taskStats.GetSources()) {
Expand All @@ -403,6 +486,17 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
auto key = partitionStat.GetPartitionId();
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
index, key, partitionStat.GetExternalRows(), false);
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
index, key, partitionStat.GetExternalBytes(), true);
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
index, key, partitionStat.GetFirstMessageMs(), false);
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
index, key, partitionStat.GetLastMessageMs(), false);
}
}
}

Expand Down Expand Up @@ -449,22 +543,22 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
case NYql::NDqProto::TDqOperatorStats::kJoin: {
auto [it, inserted] = Joins.try_emplace(operatorId, TaskCount);
auto& joinStats = it->second;
SetNonZero(joinStats.Rows[index], operatorStat.GetRows());
SetNonZero(joinStats.Bytes[index], operatorStat.GetBytes());
SetNonZero(joinStats.Rows, index, operatorStat.GetRows());
SetNonZero(joinStats.Bytes, index, operatorStat.GetBytes());
break;
}
case NYql::NDqProto::TDqOperatorStats::kFilter: {
auto [it, inserted] = Filters.try_emplace(operatorId, TaskCount);
auto& filterStats = it->second;
SetNonZero(filterStats.Rows[index], operatorStat.GetRows());
SetNonZero(filterStats.Bytes[index], operatorStat.GetBytes());
SetNonZero(filterStats.Rows, index, operatorStat.GetRows());
SetNonZero(filterStats.Bytes, index, operatorStat.GetBytes());
break;
}
case NYql::NDqProto::TDqOperatorStats::kAggregation: {
auto [it, inserted] = Aggregations.try_emplace(operatorId, TaskCount);
auto& aggStats = it->second;
SetNonZero(aggStats.Rows[index], operatorStat.GetRows());
SetNonZero(aggStats.Bytes[index], operatorStat.GetBytes());
SetNonZero(aggStats.Rows, index, operatorStat.GetRows());
SetNonZero(aggStats.Bytes, index, operatorStat.GetBytes());
break;
}
default:
Expand Down Expand Up @@ -1074,6 +1168,8 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
}

// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {

Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
Expand Down Expand Up @@ -1215,6 +1311,12 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
}

void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
auto& external = *stats.MutableExternal();
data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows());
data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes());
ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs);
ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs);
external.SetPartitionCount(data.External.Indices.size());
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
ExportAggAsyncStats(data.Push, *stats.MutablePush());
ExportAggAsyncStats(data.Pop, *stats.MutablePop());
Expand Down
38 changes: 34 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,39 @@ struct TTimeSeriesStats {
std::vector<std::pair<ui64, ui64>> History;

void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats);
void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
void Resize(ui32 taskCount);
void SetNonZero(ui32 taskIndex, ui64 value);
void Resize(ui32 count);
void SetNonZero(ui32 index, ui64 value);
void Pack();
void AppendHistory();
};

struct TPartitionedStats : public TTimeSeriesStats {
std::vector<std::vector<ui64>> Parts;

void ResizeByTasks(ui32 taskCount);
void ResizeByParts(ui32 partCount, ui32 taskCount);
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
};

struct TTimeMultiSeriesStats {
std::unordered_map<TString, ui32> Indices;
ui32 TaskCount = 0;
ui32 PartCount = 0;

void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
};

struct TExternalStats : public TTimeMultiSeriesStats {
TPartitionedStats ExternalRows;
TPartitionedStats ExternalBytes;
TPartitionedStats FirstMessageMs;
TPartitionedStats LastMessageMs;

void Resize(ui32 taskCount);
void SetHistorySampleCount(ui32 historySampleCount);
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats);
};

struct TMetricInfo {
Expand Down Expand Up @@ -80,6 +109,7 @@ struct TAsyncBufferStats {
Resize(taskCount);
}

TExternalStats External;
TAsyncStats Ingress;
TAsyncStats Push;
TAsyncStats Pop;
Expand Down Expand Up @@ -179,8 +209,8 @@ struct TStageExecutionStats {
std::map<TString, TTableStats> Tables;
std::map<TString, TAsyncBufferStats> Ingress;
std::map<TString, TAsyncBufferStats> Egress;
std::map<ui32, TAsyncBufferStats> Input;
std::map<ui32, TAsyncBufferStats> Output;
std::unordered_map<ui32, TAsyncBufferStats> Input;
std::unordered_map<ui32, TAsyncBufferStats> Output;

std::map<TString, TOperatorStats> Joins;
std::map<TString, TOperatorStats> Filters;
Expand Down
Loading