Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/openmetrics"
parsers_prometheus "github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/selfstat"
)

//go:embed sample.conf
Expand Down Expand Up @@ -80,6 +81,7 @@ type Prometheus struct {
PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`
CacheRefreshInterval int `toml:"cache_refresh_interval"`
Statistics *selfstat.Collector `toml:"-"`

// Consul discovery
ConsulConfig consulConfig `toml:"consul"`
Expand Down Expand Up @@ -118,6 +120,10 @@ type Prometheus struct {

// list of http services to scrape
httpServices map[string]urlAndAddress

connectStat map[string]selfstat.Stat
gathersTotalSuccessStat map[string]selfstat.Stat
gathersTotalFailureStat map[string]selfstat.Stat
}

type urlAndAddress struct {
Expand Down Expand Up @@ -256,6 +262,14 @@ func (p *Prometheus) Init() error {

p.kubernetesPods = make(map[podID]urlAndAddress)

// Initialize Internal Metrics
// if p.Statistics == nil {
// p.Statistics = selfstat.NewCollector(nil)
// }
p.connectStat = make(map[string]selfstat.Stat)
p.gathersTotalSuccessStat = make(map[string]selfstat.Stat)
p.gathersTotalFailureStat = make(map[string]selfstat.Stat)

return nil
}

Expand Down Expand Up @@ -531,6 +545,13 @@ func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[s

var err error
var resp *http.Response
urlStr := u.url.String()

// Create internal metrics for each URL
p.connectStat[urlStr] = p.Statistics.Register("prometheus", "connection_status", map[string]string{"url": urlStr})
p.gathersTotalSuccessStat[urlStr] = p.Statistics.Register("prometheus", "gathers_total", map[string]string{"url": urlStr, "status": "success"})
p.gathersTotalFailureStat[urlStr] = p.Statistics.Register("prometheus", "gathers_total", map[string]string{"url": urlStr, "status": "failure"})

var start time.Time
if u.url.Scheme != "unix" {
start = time.Now()
Expand All @@ -541,13 +562,17 @@ func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[s
}
end := time.Since(start).Seconds()
if err != nil {
p.connectStat[urlStr].Set(0)
p.gathersTotalFailureStat[urlStr].Incr(1)
return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.url, err)
}
requestFields["response_time"] = end

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
p.connectStat[urlStr].Set(0)
p.gathersTotalFailureStat[urlStr].Incr(1)
return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.url, resp.Status)
}

Expand All @@ -562,19 +587,26 @@ func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[s

body, err = io.ReadAll(lr)
if err != nil {
p.connectStat[urlStr].Set(0)
p.gathersTotalFailureStat[urlStr].Incr(1)
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
if int64(len(body)) > limit {
p.connectStat[urlStr].Set(0)
p.gathersTotalFailureStat[urlStr].Incr(1)
p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.url, limit)
return requestFields, tags, nil
}
} else {
body, err = io.ReadAll(resp.Body)
if err != nil {
p.connectStat[urlStr].Set(0)
p.gathersTotalFailureStat[urlStr].Incr(1)
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
}
requestFields["content_length"] = len(body)
p.connectStat[urlStr].Set(1)

// Override the response format if the user requested it
if p.contentType != "" {
Expand All @@ -600,8 +632,10 @@ func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[s
}
metrics, err := metricParser.Parse(body)
if err != nil {
p.gathersTotalFailureStat[urlStr].Incr(1)
return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.url, err)
}
p.gathersTotalSuccessStat[urlStr].Incr(1)

for _, metric := range metrics {
tags := metric.Tags()
Expand Down
43 changes: 33 additions & 10 deletions plugins/inputs/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -62,9 +63,10 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
defer ts.Close()

p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
Log: testutil.Logger{},
URLs: []string{ts.URL},
Statistics: selfstat.NewCollector(nil),
URLTag: "url",
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -139,6 +141,7 @@ func TestPrometheusCustomHeader(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
HTTPHeaders: test.headers,
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand All @@ -162,6 +165,7 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
Statistics: selfstat.NewCollector(nil),
URLTag: "url",
}
err := p.Init()
Expand Down Expand Up @@ -200,6 +204,7 @@ test_counter{label="test"} 1 1685443805885`
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -240,6 +245,7 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFailsIntegration(t *testing.T
Log: testutil.Logger{},
URLs: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -273,6 +279,7 @@ func TestPrometheusGeneratesMetricsSlowEndpoint(t *testing.T) {
client: &http.Client{
Timeout: time.Second * 5,
},
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -308,6 +315,7 @@ func TestPrometheusGeneratesMetricsSlowEndpointHitTheTimeout(t *testing.T) {
client: &http.Client{
Timeout: time.Second * 5,
},
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand All @@ -334,9 +342,10 @@ func TestPrometheusGeneratesMetricsSlowEndpointNewConfigParameter(t *testing.T)
defer ts.Close()

p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -367,9 +376,10 @@ func TestPrometheusGeneratesMetricsSlowEndpointHitTheTimeoutNewConfigParameter(t
defer ts.Close()

p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand All @@ -396,6 +406,7 @@ func TestPrometheusContentLengthLimit(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
ContentLengthLimit: 1,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand All @@ -419,6 +430,7 @@ func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -457,6 +469,7 @@ go_gc_duration_seconds_count 42`
URLTag: "",
MetricVersion: 2,
EnableRequestMetrics: true,
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -528,6 +541,7 @@ func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -557,6 +571,7 @@ func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
IgnoreTimestamp: true,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand All @@ -571,7 +586,7 @@ func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) {

func TestUnsupportedFieldSelector(t *testing.T) {
fieldSelectorString := "spec.containerName=container"
prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString, Statistics: selfstat.NewCollector(nil)}

fieldSelector, err := fields.ParseSelector(prom.KubernetesFieldSelector)
require.NoError(t, err)
Expand All @@ -589,6 +604,7 @@ func TestInitConfigErrors(t *testing.T) {
MonitorPods: true,
PodScrapeScope: "node",
PodScrapeInterval: 60,
Statistics: selfstat.NewCollector(nil),
}

// Both invalid IP addresses
Expand Down Expand Up @@ -634,6 +650,7 @@ func TestInitConfigSelectors(t *testing.T) {
PodScrapeInterval: 60,
KubernetesLabelSelector: "app=test",
KubernetesFieldSelector: "spec.nodeName=node-0",
Statistics: selfstat.NewCollector(nil),
}
err := p.Init()
require.NoError(t, err)
Expand All @@ -659,6 +676,7 @@ test_counter{label="test"} 1 1685443805885`
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -703,6 +721,7 @@ func TestPrometheusInternalContentBadFormat(t *testing.T) {
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -738,6 +757,7 @@ func TestPrometheusInternalNoWeb(t *testing.T) {
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -793,6 +813,7 @@ go_memstats_heap_alloc_bytes 1.581062048e+09
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -845,6 +866,7 @@ func TestOpenmetricsProtobuf(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down Expand Up @@ -908,6 +930,7 @@ go_memstats_heap_alloc_bytes 1.581062048e+09
URLTag: "",
MetricVersion: 2,
ContentTypeOverride: "openmetrics-text",
Statistics: selfstat.NewCollector(nil),
}
require.NoError(t, p.Init())

Expand Down
Loading