diff --git a/prometheus/observer.go b/prometheus/observer.go index 03773b21f..b666a90ae 100644 --- a/prometheus/observer.go +++ b/prometheus/observer.go @@ -17,6 +17,7 @@ package prometheus // Histogram and Summary to add observations. type Observer interface { Observe(float64) + Collector } // The ObserverFunc type is an adapter to allow the use of ordinary diff --git a/prometheus/timer.go b/prometheus/timer.go index 52344fef5..dfc11f2b8 100644 --- a/prometheus/timer.go +++ b/prometheus/timer.go @@ -13,7 +13,10 @@ package prometheus -import "time" +import ( + "sync" + "time" +) // Timer is a helper type to time functions. Use NewTimer to create new // instances. @@ -79,3 +82,257 @@ func (t *Timer) ObserveDurationWithExemplar(exemplar Labels) time.Duration { } return d } + +type TimerCounter struct { + Counter + updateInterval time.Duration +} + +func NewTimerCounter(cnt Counter, updateInterval time.Duration) *TimerCounter { + t := &TimerCounter{ + Counter: cnt, + updateInterval: updateInterval, + } + + return t +} + +// Observe starts a prom.Timer that records into the embedded Histogram and +// returns a “stop” callback. Best used with defer: +// +// defer timer.Observe()() +// +// The inner closure calls ObserveDuration on the hidden prom.Timer, recording +// the elapsed seconds into the histogram’s current bucket. +func (t *TimerCounter) Observe() (stop func()) { + start := time.Now() + ch := make(chan struct{}) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + added := float64(0) + var updateChan <-chan time.Time + if t.updateInterval > 0 { + ticker := time.NewTicker(t.updateInterval) + defer ticker.Stop() + updateChan = ticker.C + } + for { + select { + case <-ch: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.Counter.Add(diff) + } + return + case <-updateChan: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.Counter.Add(diff) + added += diff + } + } + } + }() + + return func() { + ch <- struct{}{} + wg.Wait() + } +} + +func (t *TimerCounter) Wrap(fn func()) { + defer t.Observe()() + fn() +} + +func (t *TimerCounter) Add(dur time.Duration) { + t.Counter.Add(dur.Seconds()) +} + +type TimerCounterVec struct { + *CounterVec + updateInterval time.Duration +} + +func NewTimerCounterVec(cnt *CounterVec, updateInterval time.Duration) *TimerCounterVec { + t := &TimerCounterVec{ + CounterVec: cnt, + updateInterval: updateInterval, + } + + return t +} + +func (t *TimerCounterVec) Observe(labels map[string]string) (stop func()) { + start := time.Now() + ch := make(chan struct{}) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + added := float64(0) + var updateChan <-chan time.Time + if t.updateInterval > 0 { + ticker := time.NewTicker(t.updateInterval) + defer ticker.Stop() + updateChan = ticker.C + } + for { + select { + case <-ch: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.CounterVec.With(labels).Add(diff) + } + return + case <-updateChan: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.CounterVec.With(labels).Add(diff) + added += diff + } + } + } + }() + + return func() { + ch <- struct{}{} + wg.Wait() + } +} + +func (t *TimerCounterVec) ObserveLabelValues(values ...string) (stop func()) { + start := time.Now() + ch := make(chan struct{}) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + added := float64(0) + var updateChan <-chan time.Time + if t.updateInterval > 0 { + ticker := time.NewTicker(t.updateInterval) + defer ticker.Stop() + updateChan = ticker.C + } + for { + select { + case <-ch: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.CounterVec.WithLabelValues(values...).Add(diff) + } + return + case <-updateChan: + d := time.Since(start) + if diff := d.Seconds() - added; diff > 0 { + t.CounterVec.WithLabelValues(values...).Add(diff) + added += diff + } + } + } + }() + + return func() { + ch <- struct{}{} + wg.Wait() + } +} + +func (t *TimerCounterVec) Wrap(labels map[string]string, fn func()) { + defer t.Observe(labels)() + fn() +} + +func (t *TimerCounterVec) WrapLabelValues(values []string, fn func()) { + defer t.ObserveLabelValues(values...)() + fn() +} + +func (t *TimerCounterVec) Add(dur time.Duration, labels map[string]string) { + t.CounterVec.With(labels).Add(dur.Seconds()) +} + +func (t *TimerCounterVec) AddLabelValues(dur time.Duration, values ...string) { + t.CounterVec.WithLabelValues(values...).Add(dur.Seconds()) +} + +type TimerObserver struct { + Observer +} + +func NewTimerObserver(obs Observer) *TimerObserver { + t := &TimerObserver{ + Observer: obs, + } + + return t +} + +func (t *TimerObserver) Observe() (stop func()) { + start := time.Now() + return func() { + d := time.Since(start) + t.Observer.Observe(d.Seconds()) + } +} + +func (t *TimerObserver) Wrap(fn func()) { + defer t.Observe()() + fn() +} + +func (t *TimerObserver) Add(dur time.Duration) { + t.Observer.Observe(dur.Seconds()) +} + +type TimerObserverVec struct { + ObserverVec +} + +func NewTimerObserverVec(obs ObserverVec) *TimerObserverVec { + t := &TimerObserverVec{ + ObserverVec: obs, + } + + return t +} + +func (t *TimerObserverVec) Observe(labels map[string]string) func() { + start := time.Now() + return func() { + d := time.Since(start) + t.ObserverVec.With(labels).Observe(d.Seconds()) + } +} + +func (t *TimerObserverVec) ObserveLabelValues(values ...string) func() { + start := time.Now() + return func() { + d := time.Since(start) + t.ObserverVec.WithLabelValues(values...).Observe(d.Seconds()) + } +} + +func (t *TimerObserverVec) Wrap(labels map[string]string, fn func()) { + defer t.Observe(labels)() + fn() +} + +func (t *TimerObserverVec) WrapLabelValues(values []string, fn func()) { + defer t.ObserveLabelValues(values...)() + fn() +} + +func (t *TimerObserverVec) Add(dur time.Duration, labels map[string]string) { + t.ObserverVec.With(labels).Observe(dur.Seconds()) +} + +func (t *TimerObserverVec) AddLabelValues(dur time.Duration, values ...string) { + t.ObserverVec.WithLabelValues(values...).Observe(dur.Seconds()) +}