Skip to content

Commit

Permalink
metrics: fix windowed histogram merging approach
Browse files Browse the repository at this point in the history
Fixes #103814.
Fixes #98266.

This commit updates the windowed histogram merging approach
to add the previous window's histogram bucket counts and sample
count to those of the current one. As a result of this change,
the histograms will no longer report under-sampled quantile values,
and timeseries metrics-derived charts (e.g., the quantile-based
SQL service latency charts on the DB console's Metrics page) will
more accurately display metrics.

Release note (bug fix): Updated the histogram window merge calculation
to more accurately interpolate quantile values. This change will result
in smoother, more accurate Metrics charts on the DB Console.

Co-authored-by: Aaditya Sondhi <[email protected]>
  • Loading branch information
ericharmeling and aadityasondhi committed Jun 2, 2023
1 parent 24a32f2 commit 16408b7
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 19 deletions.
16 changes: 9 additions & 7 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {

// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
h.mu.Lock()
defer h.mu.Unlock()
hist := h.mu.sliding.Merge()
totalSum := float64(hist.TotalCount()) * hist.Mean()
return hist.TotalCount(), totalSum
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}

maybeTick(h.mu.tickHelper)
bars := h.mu.sliding.Current.Distribution()
mergedHist := h.mu.sliding.Merge()
bars := mergedHist.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))

var cumCount uint64
Expand All @@ -202,7 +206,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop

return &prometheusgo.Metric{
Histogram: hist,
}
Expand Down Expand Up @@ -233,13 +236,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 {
// Mean returns the mean reported by the cumulative histogram. The value is
// read while holding h.mu, and the lock is released on return.
func (h *HdrHistogram) Mean() float64 {
	h.mu.Lock()
	defer h.mu.Unlock()

	mean := h.mu.cumulative.Mean()
	return mean
}

func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
hist := h.mu.sliding.Merge()
return hist.Mean()
}
66 changes: 56 additions & 10 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ type WindowedHistogram interface {
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
// Mean returns the average of the sample in the cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// Methods implementing this interface should merge the buckets, sums,
// and counts of the previous and current windows.
ValueAtQuantileWindowed(q float64) float64
}

Expand Down Expand Up @@ -269,7 +271,7 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64
}
h.windowed.tickHelper = &tickHelper{
nextT: now(),
tickInterval: windowDuration,
tickInterval: windowDuration / 2,
onTick: func() {
h.windowed.prev = h.windowed.cur
h.windowed.cur = prometheus.NewHistogram(opts)
Expand Down Expand Up @@ -368,15 +370,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
panic(err)
}
return m
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
}
return cur
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -428,7 +438,8 @@ func (h *Histogram) MeanWindowed() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram,
q)
}

var _ PrometheusExportable = (*ManualWindowHistogram)(nil)
Expand Down Expand Up @@ -592,26 +603,46 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a freshly allocated prometheus metric
// holding the windowed view of the histogram: the histogram written out from
// mwh.mu.cum, with the bucket counts, sample count, and sample sum of
// mwh.mu.prev added in when a previous window exists.
//
// This method does not acquire mwh.mu itself; it panics if writing the
// histogram fails.
//
// NOTE(review): the base histogram is taken from mwh.mu.cum rather than from a
// current-window field — confirm this is the intended source for a windowed
// view.
func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
	merged := new(prometheusgo.Metric)
	if err := mwh.mu.cum.Write(merged); err != nil {
		panic(err)
	}
	if prev := mwh.mu.prev; prev != nil {
		MergeWindowedHistogram(merged.Histogram, prev)
	}
	return merged
}

// TotalWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum()
pHist := mwh.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Total implements the WindowedHistogram interface. It returns the sample
// count and sample sum of the histogram produced by ToPrometheusMetric,
// reading them while holding the read lock on mwh.mu.
func (mwh *ManualWindowHistogram) Total() (int64, float64) {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()

	cumulative := mwh.ToPrometheusMetric().Histogram
	sampleCount, sampleSum := cumulative.GetSampleCount(), cumulative.GetSampleSum()
	return int64(sampleCount), sampleSum
}

func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount())
pHist := mwh.ToPrometheusMetricWindowed().Histogram
return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

// Mean returns the sample sum divided by the sample count of the histogram
// produced by ToPrometheusMetric, computed while holding the read lock on
// mwh.mu. A zero sample count is not special-cased, so the floating-point
// division then yields NaN (or an infinity for a non-zero sum).
func (mwh *ManualWindowHistogram) Mean() float64 {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()

	cumulative := mwh.ToPrometheusMetric().Histogram
	sampleSum := cumulative.GetSampleSum()
	sampleCount := cumulative.GetSampleCount()
	return sampleSum / float64(sampleCount)
}
Expand All @@ -626,7 +657,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
if mwh.mu.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.mu.cur, q)
return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowed().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -881,6 +912,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
return baseMetadata
}

// MergeWindowedHistogram folds the previous window into the current one. It
// adds each of prev's cumulative bucket counts, its sample count, and its
// sample sum to the corresponding fields of cur. cur is modified in place;
// prev is only read.
//
// NB: Both histograms must have the same bucket layout. Buckets are paired
// purely by index, and every pointer field touched here is dereferenced
// without a nil check, so prev must have at least as many buckets as cur and
// both must have their counts and sums populated.
func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
	for i := range cur.Bucket {
		*cur.Bucket[i].CumulativeCount += *prev.Bucket[i].CumulativeCount
	}
	*cur.SampleCount += *prev.SampleCount
	*cur.SampleSum += *prev.SampleSum
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
Expand Down
92 changes: 90 additions & 2 deletions pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"math"
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -285,11 +286,10 @@ func TestNewHistogramRotate(t *testing.T) {
// But cumulative histogram has history (if i > 0).
count, _ := h.Total()
require.EqualValues(t, i, count)

// Add a measurement and verify it's there.
{
h.RecordValue(12345)
f := float64(12345)
f := float64(12345) + sum
_, wSum := h.TotalWindowed()
require.Equal(t, wSum, f)
}
Expand All @@ -298,3 +298,91 @@ func TestNewHistogramRotate(t *testing.T) {
// Go to beginning.
}
}

// TestHistogramWindowed records the same set of measurements into a
// prometheus-mode histogram across several window rotations and checks that
// the windowed mean and windowed quantiles stay the same, both right after a
// rotation (before new observations) and after the observations are recorded.
func TestHistogramWindowed(t *testing.T) {
// Install a controllable clock for the duration of the test and start at 0.
defer TestingSetNow(nil)()
setNow(0)

duration := 10 * time.Second

h := NewHistogram(HistogramOptions{
Mode: HistogramModePrometheus,
Metadata: Metadata{},
Duration: duration,
Buckets: IOLatencyBuckets,
})

measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000,
25000000, 30000000, 40000000, 90000000}

// Here we sort the measurements so we can calculate the expected quantile
// values for a given windowed histogram. These should be the same for all
// windows w > 0 before observations and for all windows w after observations.
sortedMeasurements := make([]int64, len(measurements))
copy(sortedMeasurements, measurements)
sort.Slice(sortedMeasurements, func(i, j int) bool {
return sortedMeasurements[i] < sortedMeasurements[j]
})

// Walk the bucket upper bounds once. Whenever a bound exceeds the next
// unmatched sorted measurement, record that bound as the expected quantile
// value for that measurement. At most one measurement is matched per bucket,
// so expQuantileValues[k] is the bound matched to the k-th smallest
// measurement. NOTE(review): count is incremented but never read afterwards.
count := 0
j := 0
var expQuantileValues []float64
for i := range IOLatencyBuckets {
if j < len(sortedMeasurements) && IOLatencyBuckets[i] > float64(
sortedMeasurements[j]) {
count += 1
j += 1
expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i])
}
}

// w is the number of window iterations to run.
w := 4
var expHist []prometheusgo.Histogram
// expSum and expCount are declared once, outside the loop. Every expHist
// entry appended below stores pointers to these same two variables, so
// reading *expHist[i-1].SampleSum / SampleCount yields the current running
// totals rather than a per-window snapshot. Only the ratio expSum/expCount is
// asserted, and each iteration adds the same measurements, so that ratio does
// not change between iterations.
var expSum float64
var expCount uint64
for i := 0; i < w; i++ {
h.Inspect(func(interface{}) {}) // triggers ticking
// We want to check the sum, count, and quantile values of the current
// window after it has merged with the previous one, and before any
// observations have been recorded.
if i > 0 {
expSum = *expHist[i-1].SampleSum
expCount = *expHist[i-1].SampleCount
require.Equal(t, expSum/float64(expCount), h.MeanWindowed())
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10))
require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50))
require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80))
require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99))
} else {
// If there is no previous window, we should be unable to calculate mean
// or quantile without any observations.
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(99.99))
if !math.IsNaN(h.MeanWindowed()) {
t.Fatalf("mean should be undefined with no observations")
}
}
// Record the full measurement set into the current window and update the
// running expected totals.
for _, m := range measurements {
h.RecordValue(m)
expCount += 1
expSum += float64(m)
}
expHist = append(expHist, prometheusgo.Histogram{
SampleCount: &expCount,
SampleSum: &expSum,
})

// Values should all be the same, since we have observed and counted the
// same values on each window iteration.
require.Equal(t, expSum/float64(expCount), h.MeanWindowed())
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10))
require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50))
require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80))
require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99))

// Tick. This rotates the histogram.
// The fake clock moves to (i+1) half-durations; presumably this matches the
// histogram's tick interval of half the window duration — confirm against
// newHistogram.
setNow(time.Duration(i+1) * (duration / 2))
// Go to beginning.
}
}

0 comments on commit 16408b7

Please sign in to comment.