Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions prometheus/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package prometheus
// Histogram and Summary to add observations.
type Observer interface {
Observe(float64)
Collector
}

// The ObserverFunc type is an adapter to allow the use of ordinary
Expand Down
259 changes: 258 additions & 1 deletion prometheus/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package prometheus

import "time"
import (
"sync"
"time"
)

// Timer is a helper type to time functions. Use NewTimer to create new
// instances.
Expand Down Expand Up @@ -79,3 +82,257 @@ func (t *Timer) ObserveDurationWithExemplar(exemplar Labels) time.Duration {
}
return d
}

type TimerCounter struct {
Counter
updateInterval time.Duration
}

func NewTimerCounter(cnt Counter, updateInterval time.Duration) *TimerCounter {
t := &TimerCounter{
Counter: cnt,
updateInterval: updateInterval,
}

return t
}

// Observe starts a prom.Timer that records into the embedded Histogram and
// returns a “stop” callback. Best used with defer:
//
// defer timer.Observe()()
//
// The inner closure calls ObserveDuration on the hidden prom.Timer, recording
// the elapsed seconds into the histogram’s current bucket.
func (t *TimerCounter) Observe() (stop func()) {
start := time.Now()
ch := make(chan struct{})
wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
added := float64(0)
var updateChan <-chan time.Time
if t.updateInterval > 0 {
ticker := time.NewTicker(t.updateInterval)
defer ticker.Stop()
updateChan = ticker.C
}
for {
select {
case <-ch:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.Counter.Add(diff)
}
return
case <-updateChan:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.Counter.Add(diff)
added += diff
}
}
}
}()

return func() {
ch <- struct{}{}
wg.Wait()
}
}

func (t *TimerCounter) Wrap(fn func()) {
defer t.Observe()()
fn()
}

func (t *TimerCounter) Add(dur time.Duration) {
t.Counter.Add(dur.Seconds())
}

type TimerCounterVec struct {
*CounterVec
updateInterval time.Duration
}

func NewTimerCounterVec(cnt *CounterVec, updateInterval time.Duration) *TimerCounterVec {
t := &TimerCounterVec{
CounterVec: cnt,
updateInterval: updateInterval,
}

return t
}

func (t *TimerCounterVec) Observe(labels map[string]string) (stop func()) {
start := time.Now()
ch := make(chan struct{})
wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
added := float64(0)
var updateChan <-chan time.Time
if t.updateInterval > 0 {
ticker := time.NewTicker(t.updateInterval)
defer ticker.Stop()
updateChan = ticker.C
}
for {
select {
case <-ch:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.CounterVec.With(labels).Add(diff)
}
return
case <-updateChan:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.CounterVec.With(labels).Add(diff)
added += diff
}
}
}
}()

return func() {
ch <- struct{}{}
wg.Wait()
}
}

func (t *TimerCounterVec) ObserveLabelValues(values ...string) (stop func()) {
start := time.Now()
ch := make(chan struct{})
wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
added := float64(0)
var updateChan <-chan time.Time
if t.updateInterval > 0 {
ticker := time.NewTicker(t.updateInterval)
defer ticker.Stop()
updateChan = ticker.C
}
for {
select {
case <-ch:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.CounterVec.WithLabelValues(values...).Add(diff)
}
return
case <-updateChan:
d := time.Since(start)
if diff := d.Seconds() - added; diff > 0 {
t.CounterVec.WithLabelValues(values...).Add(diff)
added += diff
}
}
}
}()

return func() {
ch <- struct{}{}
wg.Wait()
}
}

func (t *TimerCounterVec) Wrap(labels map[string]string, fn func()) {
defer t.Observe(labels)()
fn()
}

func (t *TimerCounterVec) WrapLabelValues(values []string, fn func()) {
defer t.ObserveLabelValues(values...)()
fn()
}

func (t *TimerCounterVec) Add(dur time.Duration, labels map[string]string) {
t.CounterVec.With(labels).Add(dur.Seconds())
}

func (t *TimerCounterVec) AddLabelValues(dur time.Duration, values ...string) {
t.CounterVec.WithLabelValues(values...).Add(dur.Seconds())
}

type TimerObserver struct {
Observer
}

func NewTimerObserver(obs Observer) *TimerObserver {
t := &TimerObserver{
Observer: obs,
}

return t
}

func (t *TimerObserver) Observe() (stop func()) {
start := time.Now()
return func() {
d := time.Since(start)
t.Observer.Observe(d.Seconds())
}
}

func (t *TimerObserver) Wrap(fn func()) {
defer t.Observe()()
fn()
}

func (t *TimerObserver) Add(dur time.Duration) {
t.Observer.Observe(dur.Seconds())
}

type TimerObserverVec struct {
ObserverVec
}

func NewTimerObserverVec(obs ObserverVec) *TimerObserverVec {
t := &TimerObserverVec{
ObserverVec: obs,
}

return t
}

func (t *TimerObserverVec) Observe(labels map[string]string) func() {
start := time.Now()
return func() {
d := time.Since(start)
t.ObserverVec.With(labels).Observe(d.Seconds())
}
}

func (t *TimerObserverVec) ObserveLabelValues(values ...string) func() {
start := time.Now()
return func() {
d := time.Since(start)
t.ObserverVec.WithLabelValues(values...).Observe(d.Seconds())
}
}

func (t *TimerObserverVec) Wrap(labels map[string]string, fn func()) {
defer t.Observe(labels)()
fn()
}

func (t *TimerObserverVec) WrapLabelValues(values []string, fn func()) {
defer t.ObserveLabelValues(values...)()
fn()
}

func (t *TimerObserverVec) Add(dur time.Duration, labels map[string]string) {
t.ObserverVec.With(labels).Observe(dur.Seconds())
}

func (t *TimerObserverVec) AddLabelValues(dur time.Duration, values ...string) {
t.ObserverVec.WithLabelValues(values...).Observe(dur.Seconds())
}
Loading