diff --git a/cfg/envconfig/envconfig.go b/cfg/envconfig/envconfig.go index 4b26e8833d2..8885e6c83b0 100644 --- a/cfg/envconfig/envconfig.go +++ b/cfg/envconfig/envconfig.go @@ -37,6 +37,7 @@ const ( CWOtelConfigContent = "CW_OTEL_CONFIG_CONTENT" CWAgentMergedOtelConfig = "CWAGENT_MERGED_OTEL_CONFIG" CWAgentLogsBackpressureMode = "CWAGENT_LOGS_BACKPRESSURE_MODE" + SystemMetricsEnabled = "SYSTEM_METRICS_ENABLED" // confused deputy prevention related headers AmzSourceAccount = "AMZ_SOURCE_ACCOUNT" // populates the "x-amz-source-account" header diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index e4cf1c6cb1f..6d73247ec57 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -45,7 +45,7 @@ const ( maxConcurrentPublisher = 10 // the number of CloudWatch clients send request concurrently defaultForceFlushInterval = time.Minute highResolutionTagKey = "aws:StorageResolution" - defaultRetryCount = 5 // this is the retry count, the total attempts would be retry count + 1 at most. + defaultRetryCount = 5 // total number of PutMetricData attempts per batch. backoffRetryBase = 200 * time.Millisecond MaxDimensions = 30 ) @@ -93,7 +93,7 @@ func (c *CloudWatch) Capabilities() consumer.Capabilities { func (c *CloudWatch) Start(_ context.Context, host component.Host) error { c.publisher, _ = publisher.NewPublisher( publisher.NewNonBlockingFifoQueue(metricChanBufferSize), - maxConcurrentPublisher, + int64(c.config.MaxConcurrentPublishers), 2*time.Second, c.WriteToCloudWatch) credentialConfig := &configaws.CredentialConfig{ @@ -363,8 +363,8 @@ func (c *CloudWatch) pushMetricDatumBatch() { // backoffSleep sleeps some amount of time based on number of retries done. func (c *CloudWatch) backoffSleep() { d := 1 * time.Minute - if c.retries <= defaultRetryCount { - d = backoffRetryBase * time.Duration(1< 0 { + curStats[iface.Name] = cur + } + + // First scrape seeds baseline — no deltas to emit yet. + prev, hasPrev := s.prevStats[iface.Name] + if !hasPrev { + continue + } + + for enaStat := range enaMetricNames { + curVal, okCur := cur[enaStat] + prevVal, okPrev := prev[enaStat] + if !okCur || !okPrev { + continue + } + if curVal < prevVal { + continue // counter reset — drop this interface's contribution + } + aggDeltas[enaStat] += curVal - prevVal + } + } + + s.prevStats = curStats + + if len(aggDeltas) == 0 { + return nil + } + + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + for enaStat, metricName := range enaMetricNames { + if delta, ok := aggDeltas[enaStat]; ok { + addGaugeDP(sm.Metrics().AppendEmpty(), metricName, "None", float64(delta), now) + } + } + return nil +} + +// skipInterface returns true for loopback and veth* interfaces. +func skipInterface(iface net.Interface) bool { + if iface.Flags&net.FlagLoopback != 0 { + return true + } + return strings.HasPrefix(iface.Name, "veth") +} diff --git a/receiver/systemmetricsreceiver/scraper_ethtool_linux_test.go b/receiver/systemmetricsreceiver/scraper_ethtool_linux_test.go new file mode 100644 index 00000000000..90ad1d3842c --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_ethtool_linux_test.go @@ -0,0 +1,219 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +// fakeIfaces returns a listIfaces func that returns the given interfaces. +func fakeIfaces(ifaces ...net.Interface) func() ([]net.Interface, error) { + return func() ([]net.Interface, error) { return ifaces, nil } +} + +func fakeIfacesErr(err error) func() ([]net.Interface, error) { + return func() ([]net.Interface, error) { return nil, err } +} + +func iface(name string, flags net.Flags) net.Interface { + return net.Interface{Name: name, Flags: flags} +} + +func TestEthtoolScraperName(t *testing.T) { + s := newEthtoolScraper(zap.NewNop(), &MockPS{}) + assert.Equal(t, "ethtool", s.Name()) +} + +func TestEthtoolScraperFirstScrapeSeeds(t *testing.T) { + ps := &MockPS{EthtoolStatsData: map[string]uint64{ + "bw_in_allowance_exceeded": 42, + "bw_out_allowance_exceeded": 7, + "pps_allowance_exceeded": 3, + }} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces(iface("eth0", net.FlagUp)) + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + // First scrape seeds baseline — no metrics emitted. + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + // But prevStats should be populated. + assert.NotNil(t, s.prevStats) + assert.Contains(t, s.prevStats, "eth0") +} + +func TestEthtoolScraperDeltaValues(t *testing.T) { + ps := &MockPS{} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces(iface("eth0", net.FlagUp)) + + // Seed baseline. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 10, + "bw_out_allowance_exceeded": 20, + "pps_allowance_exceeded": 30, + } + require.NoError(t, s.Scrape(context.Background(), pmetric.NewMetrics())) + + // Second scrape — should emit deltas. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 15, + "bw_out_allowance_exceeded": 25, + "pps_allowance_exceeded": 33, + } + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + sm := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Equal(t, 3, sm.Metrics().Len()) + + emitted := make(map[string]float64) + for i := 0; i < sm.Metrics().Len(); i++ { + m := sm.Metrics().At(i) + assert.Equal(t, "None", m.Unit()) + emitted[m.Name()] = m.Gauge().DataPoints().At(0).DoubleValue() + } + + assert.Equal(t, 5.0, emitted["aggregate_bw_in_allowance_exceeded"]) + assert.Equal(t, 5.0, emitted["aggregate_bw_out_allowance_exceeded"]) + assert.Equal(t, 3.0, emitted["aggregate_pps_allowance_exceeded"]) +} + +func TestEthtoolScraperPerInterfaceDeltas(t *testing.T) { + ps := &MockPS{} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces( + iface("eth0", net.FlagUp), + iface("eth1", net.FlagUp), + ) + + // Seed baseline for both interfaces. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 100, + } + require.NoError(t, s.Scrape(context.Background(), pmetric.NewMetrics())) + + // Second scrape — deltas summed across interfaces. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 110, + } + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + // 1 ResourceMetrics with summed delta: (110-100) + (110-100) = 20 + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + sm := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0) + assert.Equal(t, 1, sm.Metrics().Len()) + assert.Equal(t, "aggregate_bw_in_allowance_exceeded", sm.Metrics().At(0).Name()) + assert.Equal(t, 20.0, sm.Metrics().At(0).Gauge().DataPoints().At(0).DoubleValue()) +} + +func TestEthtoolScraperCounterResetDropsDelta(t *testing.T) { + ps := &MockPS{} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces(iface("eth0", net.FlagUp)) + + // Seed baseline. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 100, + } + require.NoError(t, s.Scrape(context.Background(), pmetric.NewMetrics())) + + // Counter reset — current < previous. + ps.EthtoolStatsData = map[string]uint64{ + "bw_in_allowance_exceeded": 5, + } + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + // Negative delta dropped — no metrics emitted. + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestEthtoolScraperSkipsLoopbackAndVeth(t *testing.T) { + ps := &MockPS{} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces( + iface("lo", net.FlagLoopback|net.FlagUp), + iface("veth1234", net.FlagUp), + iface("eth0", net.FlagUp), + ) + + // Seed. + ps.EthtoolStatsData = map[string]uint64{"bw_in_allowance_exceeded": 10} + require.NoError(t, s.Scrape(context.Background(), pmetric.NewMetrics())) + + // Second scrape. + ps.EthtoolStatsData = map[string]uint64{"bw_in_allowance_exceeded": 20} + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + // Only eth0 should produce metrics. + require.Equal(t, 1, metrics.ResourceMetrics().Len()) +} + +func TestEthtoolScraperFiltersNonAllowanceStats(t *testing.T) { + ps := &MockPS{} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces(iface("eth0", net.FlagUp)) + + // Seed with non-allowance stats only. + ps.EthtoolStatsData = map[string]uint64{"tx_bytes": 999999, "rx_packets": 200} + require.NoError(t, s.Scrape(context.Background(), pmetric.NewMetrics())) + + // Second scrape — still no allowance stats. + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestEthtoolScraperErrorSkips(t *testing.T) { + ps := &MockPS{EthtoolStatsErr: errors.New("no ENA driver")} + s := newEthtoolScraper(zap.NewNop(), ps) + s.listIfaces = fakeIfaces(iface("eth0", net.FlagUp)) + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestEthtoolScraperListIfacesError(t *testing.T) { + s := newEthtoolScraper(zap.NewNop(), &MockPS{}) + s.listIfaces = fakeIfacesErr(errors.New("permission denied")) + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestEthtoolScraperNoInterfaces(t *testing.T) { + s := newEthtoolScraper(zap.NewNop(), &MockPS{}) + s.listIfaces = fakeIfaces() + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestSkipInterface(t *testing.T) { + assert.True(t, skipInterface(iface("lo", net.FlagLoopback|net.FlagUp))) + assert.True(t, skipInterface(iface("veth1234", net.FlagUp))) + assert.True(t, skipInterface(iface("vethABC", net.FlagUp))) + assert.False(t, skipInterface(iface("eth0", net.FlagUp))) + assert.False(t, skipInterface(iface("eth1", net.FlagUp))) + assert.False(t, skipInterface(iface("ens5", net.FlagUp))) + assert.False(t, skipInterface(iface("docker0", net.FlagUp))) +} diff --git a/receiver/systemmetricsreceiver/scraper_jvm_linux.go b/receiver/systemmetricsreceiver/scraper_jvm_linux.go new file mode 100644 index 00000000000..9119cec69cc --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_jvm_linux.go @@ -0,0 +1,283 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "bufio" + "bytes" + "context" + "fmt" + "math" + "net" + "os" + "regexp" + "strconv" + "strings" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +const ( + jvmScrapeTimeout = 5 * time.Second + jvmMaxResponse = 131072 // 128 KB + jvmScrapeCommand = "GET /metrics" + + // Socket discovery + jvmSocketPrefix = "@aws-jvm-metrics-" + sockDgram = "0002" + procNetUnix = "/proc/net/unix" + maxJVMSockets = 100 + + // Metric names from the socket + jvmHeapMax = "jvm_heap_max_bytes" + jvmHeapCommitted = "jvm_heap_committed_bytes" + jvmHeapAfterGC = "jvm_heap_after_gc_bytes" + + // Published metric names + metricHeapMax = "heap_max_bytes" + metricHeapCommitted = "heap_committed_bytes" + metricHeapAfterGC = "heap_after_gc_bytes" + metricHeapFree = "heap_free_after_gc_bytes" + metricHeapUtilized = "aggregate_heap_after_gc_utilized" + metricAggHeapMax = "aggregate_heap_max_bytes" + metricAggHeapFree = "aggregate_heap_free_after_gc_bytes" + metricAggJVMCount = "aggregate_jvm_count" +) + +var pidRegex = regexp.MustCompile(`^\d+$`) + +// discoveredSocket represents a Java agent socket found via /proc/net/unix. +type discoveredSocket struct { + pid string + addr string // abstract socket address with \x00 prefix +} + +type jvmScraper struct { + logger *zap.Logger + seq uint64 + procNetUnixPath string +} + +func newJVMScraper(logger *zap.Logger) *jvmScraper { + return &jvmScraper{logger: logger, procNetUnixPath: procNetUnix} +} + +func (s *jvmScraper) Name() string { + return "jvm" +} + +// heapData holds extracted heap values for a single JVM. +type heapData struct { + pid string + maxBytes float64 + committedBytes float64 + usedBytes float64 +} + +func (s *jvmScraper) Scrape(_ context.Context, metrics pmetric.Metrics) error { + sockets := s.discoverSockets() + if len(sockets) == 0 { + return nil + } + + var allHeap []heapData + now := pcommon.NewTimestampFromTime(time.Now()) + + for _, sock := range sockets { + data, err := s.scrapeSocket(sock.addr) + if err != nil { + s.logger.Debug("Failed to scrape JVM socket", zap.String("pid", sock.pid), zap.Error(err)) + continue + } + hd, err := s.parseHeap(data, sock.pid) + if err != nil { + s.logger.Warn("Failed to parse JVM metrics", zap.String("pid", sock.pid), zap.Error(err)) + continue + } + if hd == nil { + continue + } + allHeap = append(allHeap, *hd) + s.emitPerJVM(metrics, *hd, now) + } + + s.emitAggregate(metrics, allHeap, now) + return nil +} + +// parseHeap extracts heap max, committed, and after-GC bytes from metric text. +// All three are optional — we emit whatever is available. +// Returns nil only if none of the heap metrics are present. +func (s *jvmScraper) parseHeap(data []byte, pid string) (*heapData, error) { + metrics := parseMetricsText(data) + hd := &heapData{pid: pid, maxBytes: -1, committedBytes: -1, usedBytes: -1} + + if v, ok := metrics[jvmHeapMax]; ok { + hd.maxBytes = v + } + if v, ok := metrics[jvmHeapCommitted]; ok { + hd.committedBytes = v + } + if v, ok := metrics[jvmHeapAfterGC]; ok { + hd.usedBytes = v + } + + if hd.maxBytes < 0 && hd.committedBytes < 0 && hd.usedBytes < 0 { + return nil, nil + } + return hd, nil +} + +// parseMetricsText parses flat "name value" text into metric name → value. +// Skips # comments, empty lines, and lines that fail to parse. +func parseMetricsText(data []byte) map[string]float64 { + metrics := make(map[string]float64) + scanner := bufio.NewScanner(bytes.NewReader(data)) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) == 0 || line[0] == '#' { + continue + } + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + v, err := strconv.ParseFloat(fields[1], 64) + if err != nil || math.IsNaN(v) || math.IsInf(v, 0) { + continue + } + metrics[fields[0]] = v + } + return metrics +} + +// emitPerJVM adds available heap metrics for one JVM. Each metric is independent. +func (s *jvmScraper) emitPerJVM(metrics pmetric.Metrics, hd heapData, now pcommon.Timestamp) { + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + + if hd.maxBytes >= 0 { + addGaugeDP(sm.Metrics().AppendEmpty(), metricHeapMax, "Bytes", hd.maxBytes, now) + } + if hd.committedBytes >= 0 { + addGaugeDP(sm.Metrics().AppendEmpty(), metricHeapCommitted, "Bytes", hd.committedBytes, now) + } + if hd.usedBytes >= 0 { + addGaugeDP(sm.Metrics().AppendEmpty(), metricHeapAfterGC, "Bytes", hd.usedBytes, now) + } + if hd.maxBytes >= 0 && hd.usedBytes >= 0 { + addGaugeDP(sm.Metrics().AppendEmpty(), metricHeapFree, "Bytes", hd.maxBytes-hd.usedBytes, now) + } +} + +// emitAggregate adds per-box aggregate heap metrics across all JVMs. +func (s *jvmScraper) emitAggregate(metrics pmetric.Metrics, allHeap []heapData, now pcommon.Timestamp) { + if len(allHeap) == 0 { + return + } + + var totalMax, totalUsed, totalFree float64 + var hasMax, hasUtilized bool + for _, hd := range allHeap { + if hd.maxBytes >= 0 { + totalMax += hd.maxBytes + hasMax = true + } + if hd.maxBytes >= 0 && hd.usedBytes >= 0 { + totalUsed += hd.usedBytes + totalFree += hd.maxBytes - hd.usedBytes + hasUtilized = true + } + } + + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + + addGaugeDP(sm.Metrics().AppendEmpty(), metricAggJVMCount, "Count", float64(len(allHeap)), now) + if hasMax { + addGaugeDP(sm.Metrics().AppendEmpty(), metricAggHeapMax, "Bytes", totalMax, now) + } + if hasUtilized { + addGaugeDP(sm.Metrics().AppendEmpty(), metricAggHeapFree, "Bytes", totalFree, now) + } + if hasUtilized && totalMax > 0 { + addGaugeDP(sm.Metrics().AppendEmpty(), metricHeapUtilized, "Percent", totalUsed/totalMax*100, now) + } +} + +func (s *jvmScraper) discoverSockets() []discoveredSocket { + f, err := os.Open(s.procNetUnixPath) + if err != nil { + s.logger.Debug("Failed to read /proc/net/unix", zap.Error(err)) + return nil + } + defer f.Close() + + var sockets []discoveredSocket + scanner := bufio.NewScanner(f) + scanner.Scan() // skip header + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + if len(fields) < 8 { + continue + } + if fields[4] != sockDgram { + continue + } + path := fields[len(fields)-1] + if !strings.HasPrefix(path, jvmSocketPrefix) { + continue + } + pid := path[len(jvmSocketPrefix):] + if !pidRegex.MatchString(pid) { + continue + } + sockets = append(sockets, discoveredSocket{ + pid: pid, + addr: "\x00" + path[1:], + }) + if len(sockets) >= maxJVMSockets { + s.logger.Warn("Socket discovery cap reached", zap.Int("max", maxJVMSockets)) + break + } + } + if err := scanner.Err(); err != nil { + s.logger.Warn("Error reading /proc/net/unix", zap.Error(err)) + } + return sockets +} + +func (s *jvmScraper) scrapeSocket(serverAddr string) ([]byte, error) { + s.seq++ + clientAddr := fmt.Sprintf("\x00cwagent-scraper-%d", s.seq) + + local := &net.UnixAddr{Name: clientAddr, Net: "unixgram"} + remote := &net.UnixAddr{Name: serverAddr, Net: "unixgram"} + + conn, err := net.DialUnix("unixgram", local, remote) + if err != nil { + return nil, fmt.Errorf("dial: %w", err) + } + defer conn.Close() + + if err := conn.SetDeadline(time.Now().Add(jvmScrapeTimeout)); err != nil { + return nil, fmt.Errorf("set deadline: %w", err) + } + + if _, err := conn.Write([]byte(jvmScrapeCommand)); err != nil { + return nil, fmt.Errorf("send: %w", err) + } + + buf := make([]byte, jvmMaxResponse) + n, err := conn.Read(buf) + if err != nil { + return nil, fmt.Errorf("recv: %w", err) + } + return buf[:n], nil +} diff --git a/receiver/systemmetricsreceiver/scraper_jvm_linux_test.go b/receiver/systemmetricsreceiver/scraper_jvm_linux_test.go new file mode 100644 index 00000000000..2a8cff37b59 --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_jvm_linux_test.go @@ -0,0 +1,280 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +func TestJVMScraperName(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + assert.Equal(t, "jvm", s.Name()) +} + +func TestJVMScraperNoSocketsIsNoop(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + metrics := pmetric.NewMetrics() + err := s.Scrape(context.Background(), metrics) + require.NoError(t, err) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestParseHeap(t *testing.T) { + tests := map[string]struct { + input []byte + pid string + wantNil bool + wantMax float64 + wantCommitted float64 + wantUsed float64 + }{ + "full prometheus output with HELP/TYPE": { + input: []byte(`# HELP jvm_heap_max_bytes Maximum heap size (Xmx) +# TYPE jvm_heap_max_bytes gauge +jvm_heap_max_bytes 536870912 +# HELP jvm_heap_committed_bytes Committed heap size +# TYPE jvm_heap_committed_bytes gauge +jvm_heap_committed_bytes 402653184 +# HELP jvm_heap_after_gc_bytes Heap memory used after GC +# TYPE jvm_heap_after_gc_bytes gauge +jvm_heap_after_gc_bytes 157286400 +# HELP jvm_gc_count_total Garbage collection count +# TYPE jvm_gc_count_total counter +jvm_gc_count_total 42 +# HELP jvm_allocated_bytes Total allocated bytes +# TYPE jvm_allocated_bytes counter +jvm_allocated_bytes 8589934592 +`), + pid: "1234", wantMax: 536870912, wantCommitted: 402653184, wantUsed: 157286400, + }, + "partial metrics (committed only)": { + input: []byte("# TYPE jvm_heap_committed_bytes gauge\njvm_heap_committed_bytes 400000000\n"), + pid: "1", wantMax: -1, wantCommitted: 400000000, wantUsed: -1, + }, + "bare lines without TYPE/HELP": { + input: []byte("jvm_heap_max_bytes 536870912\njvm_heap_committed_bytes 402653184\njvm_heap_after_gc_bytes 157286400\n"), + pid: "99", wantMax: 536870912, wantCommitted: 402653184, wantUsed: 157286400, + }, + "empty input": { + input: []byte(""), pid: "1", wantNil: true, + }, + "malformed input (no valid metrics)": { + input: []byte("not valid {{{"), pid: "1", wantNil: true, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + hd, err := s.parseHeap(tc.input, tc.pid) + require.NoError(t, err) + if tc.wantNil { + assert.Nil(t, hd) + } else { + require.NotNil(t, hd) + assert.Equal(t, tc.pid, hd.pid) + assert.Equal(t, tc.wantMax, hd.maxBytes) + assert.Equal(t, tc.wantCommitted, hd.committedBytes) + assert.Equal(t, tc.wantUsed, hd.usedBytes) + } + }) + } +} + +func TestParseMetricsTextSkipsNaNInf(t *testing.T) { + metricsText := []byte(`jvm_heap_max_bytes 536870912 +jvm_heap_committed_bytes NaN +jvm_heap_after_gc_bytes +Inf +jvm_gc_count_total -Inf +jvm_allocated_bytes 1024 +`) + m := parseMetricsText(metricsText) + assert.Equal(t, 536870912.0, m["jvm_heap_max_bytes"]) + assert.Equal(t, 1024.0, m["jvm_allocated_bytes"]) + assert.NotContains(t, m, "jvm_heap_committed_bytes") + assert.NotContains(t, m, "jvm_heap_after_gc_bytes") + assert.NotContains(t, m, "jvm_gc_count_total") +} + +func TestEmitPerJVM(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + metrics := pmetric.NewMetrics() + now := pcommon.Timestamp(0) + + hd := heapData{pid: "42", maxBytes: 536870912, committedBytes: 402653184, usedBytes: 157286400} + s.emitPerJVM(metrics, hd, now) + + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + + sm := rm.ScopeMetrics().At(0) + assert.Equal(t, 4, sm.Metrics().Len()) + + // heap_max_bytes = 536870912 (raw bytes, from jvm_heap_max_bytes) + m0 := sm.Metrics().At(0) + assert.Equal(t, "heap_max_bytes", m0.Name()) + assert.Equal(t, "Bytes", m0.Unit()) + assert.InDelta(t, 536870912.0, m0.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // heap_committed_bytes = 402653184 (raw bytes, from jvm_heap_committed_bytes) + m1 := sm.Metrics().At(1) + assert.Equal(t, "heap_committed_bytes", m1.Name()) + assert.Equal(t, "Bytes", m1.Unit()) + assert.InDelta(t, 402653184.0, m1.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // heap_after_gc_bytes = 157286400 (raw bytes) + m2 := sm.Metrics().At(2) + assert.Equal(t, "heap_after_gc_bytes", m2.Name()) + assert.Equal(t, "Bytes", m2.Unit()) + assert.InDelta(t, 157286400.0, m2.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // heap_free_after_gc_bytes = 536870912 - 157286400 = 379584512 (max - afterGC) + m3 := sm.Metrics().At(3) + assert.Equal(t, "heap_free_after_gc_bytes", m3.Name()) + assert.Equal(t, "Bytes", m3.Unit()) + assert.InDelta(t, 379584512.0, m3.Gauge().DataPoints().At(0).DoubleValue(), 0.01) +} + +func TestEmitAggregate(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + metrics := pmetric.NewMetrics() + now := pcommon.Timestamp(0) + + allHeap := []heapData{ + {pid: "1", maxBytes: 1048576000, usedBytes: 524288000}, // 50% utilized + {pid: "2", maxBytes: 1048576000, usedBytes: 786432000}, // 75% utilized + } + s.emitAggregate(metrics, allHeap, now) + + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + sm := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Equal(t, 4, sm.Metrics().Len()) + + // aggregate_jvm_count = 2 + m0 := sm.Metrics().At(0) + assert.Equal(t, "aggregate_jvm_count", m0.Name()) + assert.Equal(t, "Count", m0.Unit()) + assert.InDelta(t, 2.0, m0.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // aggregate_heap_max_bytes = 1048576000 + 1048576000 = 2097152000 + m1 := sm.Metrics().At(1) + assert.Equal(t, "aggregate_heap_max_bytes", m1.Name()) + assert.Equal(t, "Bytes", m1.Unit()) + assert.InDelta(t, 2097152000.0, m1.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // aggregate_heap_free_after_gc_bytes = (1048576000-524288000) + (1048576000-786432000) = 786432000 + m2 := sm.Metrics().At(2) + assert.Equal(t, "aggregate_heap_free_after_gc_bytes", m2.Name()) + assert.Equal(t, "Bytes", m2.Unit()) + assert.InDelta(t, 786432000.0, m2.Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + // aggregate_heap_after_gc_utilized = (524288000 + 786432000) / (1048576000 + 1048576000) * 100 = 62.5% + m3 := sm.Metrics().At(3) + assert.Equal(t, "aggregate_heap_after_gc_utilized", m3.Name()) + assert.Equal(t, "Percent", m3.Unit()) + assert.InDelta(t, 62.5, m3.Gauge().DataPoints().At(0).DoubleValue(), 0.01) +} + +func TestEmitAggregateMaxOnly(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + metrics := pmetric.NewMetrics() + now := pcommon.Timestamp(0) + + // JVM with max but no after-GC data — should still contribute to count and aggregate max + allHeap := []heapData{ + {pid: "1", maxBytes: 1048576000, usedBytes: -1}, + } + s.emitAggregate(metrics, allHeap, now) + + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + sm := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Equal(t, 2, sm.Metrics().Len()) // count + max only, no free/utilized + + assert.Equal(t, "aggregate_jvm_count", sm.Metrics().At(0).Name()) + assert.InDelta(t, 1.0, sm.Metrics().At(0).Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + assert.Equal(t, "aggregate_heap_max_bytes", sm.Metrics().At(1).Name()) + assert.InDelta(t, 1048576000.0, sm.Metrics().At(1).Gauge().DataPoints().At(0).DoubleValue(), 0.01) +} + +func TestEmitAggregateEmpty(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + metrics := pmetric.NewMetrics() + now := pcommon.Timestamp(0) + + s.emitAggregate(metrics, nil, now) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} + +func TestDiscoverSockets(t *testing.T) { + header := "Num RefCount Protocol Flags Type St Inode Path\n" + tests := map[string]struct { + content string + usePath string // if set, use this path instead of a temp file + wantPIDs []string + }{ + "two matching sockets": { + content: header + "0000000000000000: 00000002 00000000 00010000 0002 01 12345 @aws-jvm-metrics-1234\n" + "0000000000000000: 00000002 00000000 00010000 0002 01 12346 @aws-jvm-metrics-5678\n", + wantPIDs: []string{"1234", "5678"}, + }, + "skips non-DGRAM (SOCK_STREAM 0001)": { + content: header + "0000000000000000: 00000002 00000000 00010000 0001 01 12345 @aws-jvm-metrics-1234\n", + wantPIDs: nil, + }, + "skips non-JVM prefix": { + content: header + "0000000000000000: 00000002 00000000 00010000 0002 01 12345 @some-other-socket\n", + wantPIDs: nil, + }, + "skips invalid PID (non-digits)": { + content: header + "0000000000000000: 00000002 00000000 00010000 0002 01 12345 @aws-jvm-metrics-abc\n" + "0000000000000000: 00000002 00000000 00010000 0002 01 12346 @aws-jvm-metrics-12-34\n", + wantPIDs: nil, + }, + "header only (no sockets)": { + content: header, + wantPIDs: nil, + }, + "missing file returns nil": { + usePath: "/nonexistent/proc/net/unix", + wantPIDs: nil, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + s := newJVMScraper(zap.NewNop()) + if tc.usePath != "" { + s.procNetUnixPath = tc.usePath + } else { + s.procNetUnixPath = writeTempFile(t, tc.content) + } + sockets := s.discoverSockets() + if tc.wantPIDs == nil { + assert.Empty(t, sockets) + } else { + require.Len(t, sockets, len(tc.wantPIDs)) + for i, pid := range tc.wantPIDs { + assert.Equal(t, pid, sockets[i].pid) + } + } + }) + } +} + +func writeTempFile(t *testing.T, content string) string { + t.Helper() + f, err := os.CreateTemp(t.TempDir(), "proc_net_unix") + require.NoError(t, err) + _, err = f.WriteString(content) + require.NoError(t, err) + f.Close() + return f.Name() +} diff --git a/receiver/systemmetricsreceiver/scraper_linux.go b/receiver/systemmetricsreceiver/scraper_linux.go new file mode 100644 index 00000000000..5e520c06241 --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_linux.go @@ -0,0 +1,78 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +// MetricScraper defines the interface for individual metric scrapers. +// Each scraper is responsible for collecting a specific type of metrics. +type MetricScraper interface { + // Name returns the scraper identifier for logging/debugging. + Name() string + // Scrape collects metrics and appends them to the provided pmetric.Metrics. + Scrape(ctx context.Context, metrics pmetric.Metrics) error +} + +// hostScraper orchestrates multiple MetricScrapers. +type hostScraper struct { + logger *zap.Logger + scrapers []MetricScraper + ps *SystemPS +} + +func newScraper(logger *zap.Logger) *hostScraper { + stats := &SystemPS{} + return &hostScraper{ + logger: logger, + scrapers: []MetricScraper{ + newCPUScraper(logger, stats), + newMemScraper(logger, stats), + newDiskScraper(logger, stats), + newEthtoolScraper(logger, stats), + newJVMScraper(logger), + }, + ps: stats, + } +} + +func (s *hostScraper) start(_ context.Context, _ component.Host) error { + s.logger.Info("Starting system metrics scraper", zap.Int("scraper_count", len(s.scrapers))) + return nil +} + +func (s *hostScraper) shutdown(_ context.Context) error { + s.logger.Info("Shutting down system metrics scraper") + s.ps.Close() + return nil +} + +func (s *hostScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { + metrics := pmetric.NewMetrics() + + for _, scraper := range s.scrapers { + if err := scraper.Scrape(ctx, metrics); err != nil { + s.logger.Warn("Scraper failed", zap.String("scraper", scraper.Name()), zap.Error(err)) + } + } + + return metrics, nil +} + +func addGaugeDP(m pmetric.Metric, name string, unit string, value float64, now pcommon.Timestamp) { + m.SetName(name) + m.SetUnit(unit) + g := m.SetEmptyGauge() + dp := g.DataPoints().AppendEmpty() + dp.SetTimestamp(now) + dp.SetDoubleValue(value) +} diff --git a/receiver/systemmetricsreceiver/scraper_linux_test.go b/receiver/systemmetricsreceiver/scraper_linux_test.go new file mode 100644 index 00000000000..d588d790286 --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_linux_test.go @@ -0,0 +1,36 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "testing" + + "github.com/shirou/gopsutil/v3/mem" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestSystemScraperOrchestration(t *testing.T) { + s := newScraper(zap.NewNop()) + assert.NotNil(t, s.scrapers) + assert.Equal(t, 5, len(s.scrapers), "should have cpu, mem, disk, ethtool, jvm scrapers") +} + +func TestSystemScraperAlwaysEmitsSystemMetrics(t *testing.T) { + s := &hostScraper{ + logger: zap.NewNop(), + scrapers: []MetricScraper{newMemScraper(zap.NewNop(), &MockPS{ + VMStatData: &mem.VirtualMemoryStat{Total: 1000, Available: 500, Cached: 200, Active: 300}, + })}, + ps: &SystemPS{}, + } + + metrics, err := s.scrape(context.Background()) + require.NoError(t, err) + assert.Greater(t, metrics.ResourceMetrics().Len(), 0, "system metrics should always be emitted") +} diff --git a/receiver/systemmetricsreceiver/scraper_mem_linux.go b/receiver/systemmetricsreceiver/scraper_mem_linux.go new file mode 100644 index 00000000000..235bf540292 --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_mem_linux.go @@ -0,0 +1,51 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +const ( + metricMemTotal = "mem_total" + metricMemAvailable = "mem_available" + metricMemCached = "mem_cached" + metricMemActive = "mem_active" +) + +type memScraper struct { + logger *zap.Logger + ps PS +} + +func newMemScraper(logger *zap.Logger, ps PS) *memScraper { + return &memScraper{logger: logger, ps: ps} +} + +func (s *memScraper) Name() string { return "mem" } + +func (s *memScraper) Scrape(ctx context.Context, metrics pmetric.Metrics) error { + vm, err := s.ps.VMStat(ctx) + if err != nil { + s.logger.Debug("Failed to read memory stats", zap.Error(err)) + return nil + } + + now := pcommon.NewTimestampFromTime(time.Now()) + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + + addGaugeDP(sm.Metrics().AppendEmpty(), metricMemTotal, "Bytes", float64(vm.Total), now) + addGaugeDP(sm.Metrics().AppendEmpty(), metricMemAvailable, "Bytes", float64(vm.Available), now) + addGaugeDP(sm.Metrics().AppendEmpty(), metricMemCached, "Bytes", float64(vm.Cached), now) + addGaugeDP(sm.Metrics().AppendEmpty(), metricMemActive, "Bytes", float64(vm.Active), now) + return nil +} diff --git a/receiver/systemmetricsreceiver/scraper_mem_linux_test.go b/receiver/systemmetricsreceiver/scraper_mem_linux_test.go new file mode 100644 index 00000000000..0707137aa1f --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_mem_linux_test.go @@ -0,0 +1,65 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "errors" + "testing" + + "github.com/shirou/gopsutil/v3/mem" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +func TestMemScraperName(t *testing.T) { + s := newMemScraper(zap.NewNop(), &MockPS{}) + assert.Equal(t, "mem", s.Name()) +} + +func TestMemScraperMetrics(t *testing.T) { + ps := &MockPS{VMStatData: &mem.VirtualMemoryStat{ + Total: 8 * 1048576 * 1024, // 8 GB + Available: 6 * 1048576 * 1024, // 6 GB + Cached: 2 * 1048576 * 1024, // 2 GB + Active: 3 * 1048576 * 1024, // 3 GB + }} + s := newMemScraper(zap.NewNop(), ps) + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + sm := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0) + require.Equal(t, 4, sm.Metrics().Len()) + + assert.Equal(t, "mem_total", sm.Metrics().At(0).Name()) + assert.Equal(t, "Bytes", sm.Metrics().At(0).Unit()) + assert.InDelta(t, 8*1048576*1024.0, sm.Metrics().At(0).Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + assert.Equal(t, "mem_available", sm.Metrics().At(1).Name()) + assert.Equal(t, "Bytes", sm.Metrics().At(1).Unit()) + assert.InDelta(t, 6*1048576*1024.0, sm.Metrics().At(1).Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + assert.Equal(t, "mem_cached", sm.Metrics().At(2).Name()) + assert.Equal(t, "Bytes", sm.Metrics().At(2).Unit()) + assert.InDelta(t, 2*1048576*1024.0, sm.Metrics().At(2).Gauge().DataPoints().At(0).DoubleValue(), 0.01) + + assert.Equal(t, "mem_active", sm.Metrics().At(3).Name()) + assert.Equal(t, "Bytes", sm.Metrics().At(3).Unit()) + assert.InDelta(t, 3*1048576*1024.0, sm.Metrics().At(3).Gauge().DataPoints().At(0).DoubleValue(), 0.01) +} + +func TestMemScraperErrorSkips(t *testing.T) { + ps := &MockPS{VMStatErr: errors.New("permission denied")} + s := newMemScraper(zap.NewNop(), ps) + + metrics := pmetric.NewMetrics() + require.NoError(t, s.Scrape(context.Background(), metrics)) + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) +} diff --git a/receiver/systemmetricsreceiver/scraper_other.go b/receiver/systemmetricsreceiver/scraper_other.go new file mode 100644 index 00000000000..82bec7b3211 --- /dev/null +++ b/receiver/systemmetricsreceiver/scraper_other.go @@ -0,0 +1,35 @@ +//go:build !linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type hostScraper struct { + logger *zap.Logger +} + +func newScraper(logger *zap.Logger) *hostScraper { + return &hostScraper{logger: logger} +} + +func (s *hostScraper) start(_ context.Context, _ component.Host) error { + s.logger.Info("System metrics scraper not supported on this platform") + return nil +} + +func (s *hostScraper) shutdown(_ context.Context) error { + return nil +} + +func (s *hostScraper) scrape(_ context.Context) (pmetric.Metrics, error) { + return pmetric.NewMetrics(), nil +} diff --git a/receiver/systemmetricsreceiver/system_stats_linux.go b/receiver/systemmetricsreceiver/system_stats_linux.go new file mode 100644 index 00000000000..3fd01412a21 --- /dev/null +++ b/receiver/systemmetricsreceiver/system_stats_linux.go @@ -0,0 +1,80 @@ +//go:build linux + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetricsreceiver + +import ( + "context" + "strings" + "sync" + + "github.com/safchain/ethtool" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/mem" +) + +// PS abstracts OS-level data sources for testing. +type PS interface { + CPUTimes(ctx context.Context) ([]cpu.TimesStat, error) + VMStat(ctx context.Context) (*mem.VirtualMemoryStat, error) + DiskUsage(ctx context.Context) ([]*disk.UsageStat, error) + EthtoolStats(ctx context.Context, iface string) (map[string]uint64, error) +} + +type SystemPS struct { + ethtool *ethtool.Ethtool + ethtoolOnce sync.Once + ethtoolErr error +} + +func (s *SystemPS) CPUTimes(ctx context.Context) ([]cpu.TimesStat, error) { + return cpu.TimesWithContext(ctx, false) +} + +func (s *SystemPS) VMStat(ctx context.Context) (*mem.VirtualMemoryStat, error) { + return mem.VirtualMemoryWithContext(ctx) +} + +func (s *SystemPS) DiskUsage(ctx context.Context) ([]*disk.UsageStat, error) { + partitions, err := disk.PartitionsWithContext(ctx, true) + if err != nil { + return nil, err + } + seen := make(map[string]struct{}) + var stats []*disk.UsageStat + for _, p := range partitions { + if !strings.HasPrefix(p.Device, "/dev/") { + continue + } + if _, ok := seen[p.Mountpoint]; ok { + continue + } + seen[p.Mountpoint] = struct{}{} + usage, err := disk.UsageWithContext(ctx, p.Mountpoint) + if err != nil || usage.Total == 0 { + continue + } + stats = append(stats, usage) + } + return stats, nil +} + +func (s *SystemPS) EthtoolStats(_ context.Context, iface string) (map[string]uint64, error) { + s.ethtoolOnce.Do(func() { + s.ethtool, s.ethtoolErr = ethtool.NewEthtool() + }) + if s.ethtoolErr != nil { + return nil, s.ethtoolErr + } + return s.ethtool.Stats(iface) +} + +func (s *SystemPS) Close() { + if s.ethtool != nil { + s.ethtool.Close() + s.ethtool = nil + } +} diff --git a/service/defaultcomponents/components.go b/service/defaultcomponents/components.go index acee94c653c..ededf1b051f 100644 --- a/service/defaultcomponents/components.go +++ b/service/defaultcomponents/components.go @@ -68,6 +68,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/plugins/processors/kueueattributes" "github.com/aws/amazon-cloudwatch-agent/processor/rollupprocessor" "github.com/aws/amazon-cloudwatch-agent/receiver/awsnvmereceiver" + "github.com/aws/amazon-cloudwatch-agent/receiver/systemmetricsreceiver" ) func Factories() (otelcol.Factories, error) { @@ -88,6 +89,7 @@ func Factories() (otelcol.Factories, error) { otlpreceiver.NewFactory(), prometheusreceiver.NewFactory(), statsdreceiver.NewFactory(), + systemmetricsreceiver.NewFactory(), tcplogreceiver.NewFactory(), udplogreceiver.NewFactory(), zipkinreceiver.NewFactory(), diff --git a/service/defaultcomponents/components_test.go b/service/defaultcomponents/components_test.go index c77d7e43da4..0cd980d7184 100644 --- a/service/defaultcomponents/components_test.go +++ b/service/defaultcomponents/components_test.go @@ -30,6 +30,7 @@ func TestComponents(t *testing.T) { "otlp", "prometheus", "statsd", + "systemmetrics", "tcplog", "udplog", "zipkin", diff --git a/translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml b/translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml index 3fd273bc3dd..72e34587957 100644 --- a/translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml +++ b/translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/advanced_config_linux.yaml b/translator/tocwconfig/sampleConfig/advanced_config_linux.yaml index b7dd24e2f5a..a6557b00c16 100644 --- a/translator/tocwconfig/sampleConfig/advanced_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/advanced_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/advanced_config_windows.yaml b/translator/tocwconfig/sampleConfig/advanced_config_windows.yaml index bead2892e8a..f853ff115a6 100644 --- a/translator/tocwconfig/sampleConfig/advanced_config_windows.yaml +++ b/translator/tocwconfig/sampleConfig/advanced_config_windows.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/amp_config_linux.yaml b/translator/tocwconfig/sampleConfig/amp_config_linux.yaml index c7223c9523c..93fe5478330 100644 --- a/translator/tocwconfig/sampleConfig/amp_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/amp_config_linux.yaml @@ -1,10 +1,13 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms drop_original_metrics: CPU_USAGE_IDLE: true cpu_time_active: true force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/append_dimensions_host_metrics.yaml b/translator/tocwconfig/sampleConfig/append_dimensions_host_metrics.yaml index 5a7682502c0..c89a0cd2428 100644 --- a/translator/tocwconfig/sampleConfig/append_dimensions_host_metrics.yaml +++ b/translator/tocwconfig/sampleConfig/append_dimensions_host_metrics.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/basic_config_linux.yaml b/translator/tocwconfig/sampleConfig/basic_config_linux.yaml index 2c288905dff..4f7161f5a3a 100644 --- a/translator/tocwconfig/sampleConfig/basic_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/basic_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/basic_config_windows.yaml b/translator/tocwconfig/sampleConfig/basic_config_windows.yaml index 5f8b6a5f076..09e0adbe80b 100644 --- a/translator/tocwconfig/sampleConfig/basic_config_windows.yaml +++ b/translator/tocwconfig/sampleConfig/basic_config_windows.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/collectd_append_dimensions_linux.yaml b/translator/tocwconfig/sampleConfig/collectd_append_dimensions_linux.yaml index 3291238c43c..740359213e2 100644 --- a/translator/tocwconfig/sampleConfig/collectd_append_dimensions_linux.yaml +++ b/translator/tocwconfig/sampleConfig/collectd_append_dimensions_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/collectd_config_linux.yaml b/translator/tocwconfig/sampleConfig/collectd_config_linux.yaml index 3291238c43c..740359213e2 100644 --- a/translator/tocwconfig/sampleConfig/collectd_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/collectd_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/compass_linux_config.yaml b/translator/tocwconfig/sampleConfig/compass_linux_config.yaml index 9e1bc8e675f..6fc2066026c 100644 --- a/translator/tocwconfig/sampleConfig/compass_linux_config.yaml +++ b/translator/tocwconfig/sampleConfig/compass_linux_config.yaml @@ -1,11 +1,14 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms drop_original_metrics: collectd_drop: true statsd_drop: true endpoint_override: https://monitoring-fips.us-west-2.amazonaws.com force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 5000 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/complete_darwin_config.yaml b/translator/tocwconfig/sampleConfig/complete_darwin_config.yaml index e23f731b05a..adfa705a9d9 100644 --- a/translator/tocwconfig/sampleConfig/complete_darwin_config.yaml +++ b/translator/tocwconfig/sampleConfig/complete_darwin_config.yaml @@ -1,8 +1,11 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms endpoint_override: https://monitoring-fips.us-west-2.amazonaws.com force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 5000 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml index a48f08dc2df..562a9010e7b 100644 --- a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml +++ b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml @@ -1,5 +1,6 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms drop_original_metrics: CPU_USAGE_IDLE: true collectd_drop: true @@ -7,7 +8,9 @@ exporters: statsd_drop: true endpoint_override: https://monitoring-fips.us-west-2.amazonaws.com force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 5000 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/complete_windows_config.yaml b/translator/tocwconfig/sampleConfig/complete_windows_config.yaml index 563d3db6092..c1ff635a608 100644 --- a/translator/tocwconfig/sampleConfig/complete_windows_config.yaml +++ b/translator/tocwconfig/sampleConfig/complete_windows_config.yaml @@ -1,8 +1,11 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms endpoint_override: https://monitoring-fips.us-west-2.amazonaws.com force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 5000 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/delta_config_linux.yaml b/translator/tocwconfig/sampleConfig/delta_config_linux.yaml index e6696733c01..8eed2509267 100644 --- a/translator/tocwconfig/sampleConfig/delta_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/delta_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/delta_net_config_linux.yaml b/translator/tocwconfig/sampleConfig/delta_net_config_linux.yaml index 33a2a06947d..c38fcd47978 100644 --- a/translator/tocwconfig/sampleConfig/delta_net_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/delta_net_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/diskio_mixed_config_linux.yaml b/translator/tocwconfig/sampleConfig/diskio_mixed_config_linux.yaml index 4a2611c1074..c45ff298ab9 100644 --- a/translator/tocwconfig/sampleConfig/diskio_mixed_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/diskio_mixed_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/diskio_nvme_config_linux.yaml b/translator/tocwconfig/sampleConfig/diskio_nvme_config_linux.yaml index ea294dc7e27..a3d1417ed1e 100644 --- a/translator/tocwconfig/sampleConfig/diskio_nvme_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/diskio_nvme_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/diskio_telegraf_config_linux.yaml b/translator/tocwconfig/sampleConfig/diskio_telegraf_config_linux.yaml index 0936394b3aa..82d8f05792f 100644 --- a/translator/tocwconfig/sampleConfig/diskio_telegraf_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/diskio_telegraf_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/drop_origin_linux.yaml b/translator/tocwconfig/sampleConfig/drop_origin_linux.yaml index a4d7ef73c04..9b6c876e150 100644 --- a/translator/tocwconfig/sampleConfig/drop_origin_linux.yaml +++ b/translator/tocwconfig/sampleConfig/drop_origin_linux.yaml @@ -1,12 +1,15 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms drop_original_metrics: CPU_USAGE_IDLE: true cpu_time_active: true nvidia_smi_temperature_gpu: true nvidia_smi_utilization_gpu: true force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/dualstack_config.yaml b/translator/tocwconfig/sampleConfig/dualstack_config.yaml index 93fca2b3e0d..b028f62e7fb 100644 --- a/translator/tocwconfig/sampleConfig/dualstack_config.yaml +++ b/translator/tocwconfig/sampleConfig/dualstack_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/ignore_append_dimensions.yaml b/translator/tocwconfig/sampleConfig/ignore_append_dimensions.yaml index 54b819a0e7e..b05b4f1d3b9 100644 --- a/translator/tocwconfig/sampleConfig/ignore_append_dimensions.yaml +++ b/translator/tocwconfig/sampleConfig/ignore_append_dimensions.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/invalid_input_linux.yaml b/translator/tocwconfig/sampleConfig/invalid_input_linux.yaml index bcb976cc6ff..722928413b3 100644 --- a/translator/tocwconfig/sampleConfig/invalid_input_linux.yaml +++ b/translator/tocwconfig/sampleConfig/invalid_input_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml index a754cd83042..2c3fdc2ea1b 100644 --- a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml @@ -1,10 +1,13 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms drop_original_metrics: CPU_USAGE_IDLE: true cpu_time_active: true force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml index a157037dd16..cc42b793fa8 100644 --- a/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_config.yaml b/translator/tocwconfig/sampleConfig/otlp_metrics_config.yaml index 69b2db084a1..89bf2c3bec0 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_config.yaml +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml b/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml index eb55b34c17b..c8892099fc4 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/procstat_memory_swap_config.yaml b/translator/tocwconfig/sampleConfig/procstat_memory_swap_config.yaml index 1e9ab52fda5..845c1b2ed9a 100644 --- a/translator/tocwconfig/sampleConfig/procstat_memory_swap_config.yaml +++ b/translator/tocwconfig/sampleConfig/procstat_memory_swap_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/shared_otlp_config.yaml b/translator/tocwconfig/sampleConfig/shared_otlp_config.yaml index 0a99c080000..1019ec5525c 100644 --- a/translator/tocwconfig/sampleConfig/shared_otlp_config.yaml +++ b/translator/tocwconfig/sampleConfig/shared_otlp_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/standard_config_darwin.yaml b/translator/tocwconfig/sampleConfig/standard_config_darwin.yaml index 32c2e131242..d7adc01d739 100644 --- a/translator/tocwconfig/sampleConfig/standard_config_darwin.yaml +++ b/translator/tocwconfig/sampleConfig/standard_config_darwin.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/standard_config_linux.yaml b/translator/tocwconfig/sampleConfig/standard_config_linux.yaml index be36065cf36..8e303cd0422 100644 --- a/translator/tocwconfig/sampleConfig/standard_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/standard_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/standard_config_linux_with_common_config.yaml b/translator/tocwconfig/sampleConfig/standard_config_linux_with_common_config.yaml index 1e348a59a1a..de247e92b5d 100644 --- a/translator/tocwconfig/sampleConfig/standard_config_linux_with_common_config.yaml +++ b/translator/tocwconfig/sampleConfig/standard_config_linux_with_common_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/standard_config_windows.yaml b/translator/tocwconfig/sampleConfig/standard_config_windows.yaml index 15057e2d419..183fce390bd 100644 --- a/translator/tocwconfig/sampleConfig/standard_config_windows.yaml +++ b/translator/tocwconfig/sampleConfig/standard_config_windows.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/standard_config_windows_with_common_config.yaml b/translator/tocwconfig/sampleConfig/standard_config_windows_with_common_config.yaml index 9d3f4c77dd8..b69ba3a7a32 100644 --- a/translator/tocwconfig/sampleConfig/standard_config_windows_with_common_config.yaml +++ b/translator/tocwconfig/sampleConfig/standard_config_windows_with_common_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/statsd_config_linux.yaml b/translator/tocwconfig/sampleConfig/statsd_config_linux.yaml index 86b25a8869d..41cba713199 100644 --- a/translator/tocwconfig/sampleConfig/statsd_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/statsd_config_linux.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/statsd_config_windows.yaml b/translator/tocwconfig/sampleConfig/statsd_config_windows.yaml index e0eda775cca..c9cbd1a4f23 100644 --- a/translator/tocwconfig/sampleConfig/statsd_config_windows.yaml +++ b/translator/tocwconfig/sampleConfig/statsd_config_windows.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml b/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml index 5569675b98b..c96a060d2d2 100644 --- a/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml +++ b/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml b/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml index 1ed2d5355a5..c2ef001830a 100644 --- a/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml +++ b/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml @@ -1,7 +1,10 @@ exporters: awscloudwatch: + backoff_retry_base: 200ms force_flush_interval: 1m0s + max_concurrent_publishers: 10 max_datums_per_call: 1000 + max_retry_count: 5 max_values_per_datum: 150 middleware: agenthealth/metrics namespace: CWAgent diff --git a/translator/tocwconfig/sampleConfig/system_metrics_config.conf b/translator/tocwconfig/sampleConfig/system_metrics_config.conf new file mode 100644 index 00000000000..cdad5897cfa --- /dev/null +++ b/translator/tocwconfig/sampleConfig/system_metrics_config.conf @@ -0,0 +1,25 @@ +[agent] + collection_jitter = "0s" + debug = false + flush_interval = "1s" + flush_jitter = "0s" + hostname = "" + interval = "60s" + logfile = "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log" + logtarget = "lumberjack" + metric_batch_size = 1000 + metric_buffer_limit = 10000 + omit_hostname = false + precision = "" + quiet = false + round_interval = false + +[inputs] + + [[inputs.cpu]] + fieldpass = ["usage_idle"] + totalcpu = true + +[outputs] + + [[outputs.cloudwatch]] diff --git a/translator/tocwconfig/sampleConfig/system_metrics_config.json b/translator/tocwconfig/sampleConfig/system_metrics_config.json new file mode 100644 index 00000000000..443adadf0b5 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/system_metrics_config.json @@ -0,0 +1,14 @@ +{ + "agent": { + "region": "us-east-1", + "system_metrics_enabled": true + }, + "metrics": { + "metrics_collected": { + "cpu": { + "measurement": ["cpu_usage_idle"], + "totalcpu": true + } + } + } +} diff --git a/translator/tocwconfig/sampleConfig/system_metrics_config.yaml b/translator/tocwconfig/sampleConfig/system_metrics_config.yaml new file mode 100644 index 00000000000..82f8aa04a93 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/system_metrics_config.yaml @@ -0,0 +1,108 @@ +exporters: + awscloudwatch: + backoff_retry_base: 200ms + force_flush_interval: 1m0s + max_concurrent_publishers: 10 + max_datums_per_call: 1000 + max_retry_count: 5 + max_values_per_datum: 150 + middleware: agenthealth/metrics + namespace: CWAgent + region: us-east-1 + resource_to_telemetry_conversion: + enabled: true + awscloudwatch/systemmetrics: + backoff_retry_base: 1m0s + force_flush_interval: 1m0s + max_concurrent_publishers: 1 + max_datums_per_call: 1000 + max_retry_count: 2 + max_values_per_datum: 150 + middleware: agenthealth/metrics + namespace: CWAgent/System + region: us-east-1 + resource_to_telemetry_conversion: + enabled: true + rollup_dimensions: + - - InstanceId + - [ ] +extensions: + agenthealth/metrics: + is_usage_data_enabled: true + stats: + operations: + - PutMetricData + usage_flags: + mode: EC2 + region_type: ACJ + agenthealth/statuscode: + is_status_code_enabled: true + is_usage_data_enabled: true + stats: + usage_flags: + mode: EC2 + region_type: ACJ + entitystore: + mode: ec2 + region: us-east-1 +processors: + awsentity/resource: + entity_type: Resource + platform: ec2 + batch/systemmetrics: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 15m0s + ec2tagger/systemmetrics: + ec2_metadata_tags: + - InstanceId + imds_retries: 1 + middleware: agenthealth/statuscode + refresh_tags_interval: 0s + refresh_volumes_interval: 0s +receivers: + systemmetrics: + collection_interval: 1m0s + initial_delay: 1s + timeout: 0s + telegraf_cpu: + collection_interval: 1m0s + initial_delay: 1s + timeout: 0s +service: + extensions: + - agenthealth/metrics + - agenthealth/statuscode + - entitystore + pipelines: + metrics/systemmetrics: + exporters: + - awscloudwatch/systemmetrics + processors: + - ec2tagger/systemmetrics + - batch/systemmetrics + receivers: + - systemmetrics + metrics/host: + exporters: + - awscloudwatch + processors: + - awsentity/resource + receivers: + - telegraf_cpu + telemetry: + logs: + encoding: console + level: info + output_paths: + - /opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log + sampling: + enabled: true + initial: 2 + thereafter: 500 + tick: 10s + metrics: + level: None + traces: + level: None diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go index 5314fe4b07e..0850ed0bf49 100644 --- a/translator/tocwconfig/tocwconfig_test.go +++ b/translator/tocwconfig/tocwconfig_test.go @@ -448,6 +448,14 @@ func TestDiskIOMixedConfig(t *testing.T) { checkTranslation(t, "diskio_mixed_config_linux", "linux", expectedEnvVars, "") } +func TestSystemMetricsConfig(t *testing.T) { + resetContext(t) + t.Setenv(envconfig.SystemMetricsEnabled, "true") + context.CurrentContext().SetMode(config.ModeEC2) + expectedEnvVars := map[string]string{} + checkTranslation(t, "system_metrics_config", "linux", expectedEnvVars, "") +} + // prometheus func TestPrometheusConfig(t *testing.T) { resetContext(t) @@ -952,6 +960,7 @@ func readCommonConfig(t *testing.T, commonConfigFilePath string) { func resetContext(t *testing.T) { t.Setenv(envconfig.IMDS_NUMBER_RETRY, strconv.Itoa(retryer.DefaultImdsRetries)) + t.Setenv(envconfig.SystemMetricsEnabled, "false") util.DetectRegion = func(string, map[string]string) (string, string) { return "us-west-2", "ACJ" } diff --git a/translator/translate/otel/common/common.go b/translator/translate/otel/common/common.go index bb69626e96b..c31dd0dda0d 100644 --- a/translator/translate/otel/common/common.go +++ b/translator/translate/otel/common/common.go @@ -86,18 +86,19 @@ const ( ) const ( - CollectDMetricKey = "collectd" - CollectDPluginKey = "socket_listener" - CPUMetricKey = "cpu" - DiskMetricKey = "disk" - DiskIoMetricKey = "diskio" - StatsDMetricKey = "statsd" - SwapMetricKey = "swap" - MemMetricKey = "mem" - NetMetricKey = "net" - NetStatMetricKey = "netstat" - ProcessMetricKey = "process" - ProcStatMetricKey = "procstat" + CollectDMetricKey = "collectd" + CollectDPluginKey = "socket_listener" + CPUMetricKey = "cpu" + DiskMetricKey = "disk" + DiskIoMetricKey = "diskio" + StatsDMetricKey = "statsd" + SwapMetricKey = "swap" + MemMetricKey = "mem" + NetMetricKey = "net" + NetStatMetricKey = "netstat" + ProcessMetricKey = "process" + ProcStatMetricKey = "procstat" + SystemMetricsEnabledKey = "system_metrics_enabled" //Windows Plugins MemMetricKeyWindows = "Memory" @@ -122,6 +123,7 @@ const ( PipelineNameEmfLogs = "emf_logs" PipelineNamePrometheus = "prometheus" PipelineNameKueue = "kueueContainerInsights" + PipelineNameSystemMetrics = "systemmetrics" AppSignals = "application_signals" AppSignalsFallback = "app_signals" AppSignalsRules = "rules" @@ -141,8 +143,9 @@ var ( pipeline.SignalTraces: {AppSignalsTraces, AppSignalsTracesFallback}, pipeline.SignalMetrics: {AppSignalsMetrics, AppSignalsMetricsFallback}, } - JmxConfigKey = ConfigKey(MetricsKey, MetricsCollectedKey, JmxKey) - ContainerInsightsConfigKey = ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey) + SystemMetricsEnabledConfigKey = ConfigKey(AgentKey, SystemMetricsEnabledKey) + JmxConfigKey = ConfigKey(MetricsKey, MetricsCollectedKey, JmxKey) + ContainerInsightsConfigKey = ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey) JmxTargets = []string{"activemq", "cassandra", "hbase", "hadoop", "jetty", "jvm", "kafka", "kafka-consumer", "kafka-producer", "solr", "tomcat", "wildfly"} diff --git a/translator/translate/otel/pipeline/systemmetrics/cloudwatch.go b/translator/translate/otel/pipeline/systemmetrics/cloudwatch.go new file mode 100644 index 00000000000..365a162e835 --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/cloudwatch.go @@ -0,0 +1,47 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter" + + "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatch" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" +) + +const systemMetricsNamespace = "CWAgent/System" + +type cloudWatchTranslator struct { + factory exporter.Factory +} + +func newCloudWatchTranslator() common.ComponentTranslator { + return &cloudWatchTranslator{factory: cloudwatch.NewFactory()} +} + +func (t *cloudWatchTranslator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), common.PipelineNameSystemMetrics) +} + +func (t *cloudWatchTranslator) Translate(_ *confmap.Conf) (component.Config, error) { + cfg := t.factory.CreateDefaultConfig().(*cloudwatch.Config) + credentials := confmap.NewFromStringMap(agent.Global_Config.Credentials) + _ = credentials.Unmarshal(cfg) + + cfg.Region = agent.Global_Config.Region + cfg.Namespace = systemMetricsNamespace + cfg.RollupDimensions = [][]string{{"InstanceId"}, {}} + cfg.MiddlewareID = &agenthealth.MetricsID + cfg.MaxRetryCount = 2 + cfg.BackoffRetryBase = time.Minute + cfg.MaxConcurrentPublishers = 1 + + return cfg, nil +} diff --git a/translator/translate/otel/pipeline/systemmetrics/cloudwatch_test.go b/translator/translate/otel/pipeline/systemmetrics/cloudwatch_test.go new file mode 100644 index 00000000000..df5fbfdab39 --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/cloudwatch_test.go @@ -0,0 +1,32 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatch" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" +) + +func TestCloudWatchTranslator(t *testing.T) { + tt := newCloudWatchTranslator() + assert.Equal(t, "awscloudwatch/"+common.PipelineNameSystemMetrics, tt.ID().String()) + + got, err := tt.Translate(nil) + require.NoError(t, err) + cfg, ok := got.(*cloudwatch.Config) + require.True(t, ok) + assert.Equal(t, "CWAgent/System", cfg.Namespace) + assert.Equal(t, [][]string{{"InstanceId"}, {}}, cfg.RollupDimensions) + assert.Equal(t, &agenthealth.MetricsID, cfg.MiddlewareID) + assert.Equal(t, 2, cfg.MaxRetryCount) + assert.Equal(t, time.Minute, cfg.BackoffRetryBase) + assert.Equal(t, 1, cfg.MaxConcurrentPublishers) +} diff --git a/translator/translate/otel/pipeline/systemmetrics/ec2tagger.go b/translator/translate/otel/pipeline/systemmetrics/ec2tagger.go new file mode 100644 index 00000000000..a243e425ece --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/ec2tagger.go @@ -0,0 +1,41 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/processor" + + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" + "github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" +) + +type ec2TaggerTranslator struct { + factory processor.Factory +} + +func newEc2TaggerTranslator() common.ComponentTranslator { + return &ec2TaggerTranslator{factory: ec2tagger.NewFactory()} +} + +func (t *ec2TaggerTranslator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), common.PipelineNameSystemMetrics) +} + +func (t *ec2TaggerTranslator) Translate(_ *confmap.Conf) (component.Config, error) { + cfg := t.factory.CreateDefaultConfig().(*ec2tagger.Config) + credentials := confmap.NewFromStringMap(agent.Global_Config.Credentials) + _ = credentials.Unmarshal(cfg) + + // Always add InstanceId + cfg.EC2MetadataTags = []string{"InstanceId"} + cfg.MiddlewareID = &agenthealth.StatusCodeID + cfg.IMDSRetries = retryer.GetDefaultRetryNumber() + + return cfg, nil +} diff --git a/translator/translate/otel/pipeline/systemmetrics/ec2tagger_test.go b/translator/translate/otel/pipeline/systemmetrics/ec2tagger_test.go new file mode 100644 index 00000000000..4e3dc4d4dac --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/ec2tagger_test.go @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" +) + +func TestEc2TaggerTranslator(t *testing.T) { + tt := newEc2TaggerTranslator() + assert.Equal(t, "ec2tagger/"+common.PipelineNameSystemMetrics, tt.ID().String()) + + got, err := tt.Translate(nil) + require.NoError(t, err) + cfg, ok := got.(*ec2tagger.Config) + require.True(t, ok) + assert.Equal(t, []string{"InstanceId"}, cfg.EC2MetadataTags) + assert.Equal(t, &agenthealth.StatusCodeID, cfg.MiddlewareID) + assert.Greater(t, cfg.IMDSRetries, 0) +} diff --git a/translator/translate/otel/pipeline/systemmetrics/translator.go b/translator/translate/otel/pipeline/systemmetrics/translator.go new file mode 100644 index 00000000000..1ace327fc4f --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/translator.go @@ -0,0 +1,187 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "bufio" + "math" + "os" + "strconv" + "strings" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/pipeline" + + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" + "github.com/aws/amazon-cloudwatch-agent/translator/context" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/systemmetrics" +) + +const batchTimeout = 15 * time.Minute + +const ( + cgroupV2MemoryMaxPath = "/sys/fs/cgroup/memory.max" + cgroupV1MemoryLimitPath = "/sys/fs/cgroup/memory/memory.limit_in_bytes" + apolloDir = "/apollo" + imageIDFile = "/etc/image-id" + osReleaseFile = "/etc/os-release" +) + +var amznMarkers = []string{"naws", "internal"} +var amznPrefixes = []string{"amzn2", "al2023"} + +type translator struct{} + +var _ common.PipelineTranslator = (*translator)(nil) + +func NewTranslator() common.PipelineTranslator { + return &translator{} +} + +func (t *translator) ID() pipeline.ID { + return pipeline.NewIDWithName(pipeline.SignalMetrics, common.PipelineNameSystemMetrics) +} + +func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) { + if !isEnabled(conf) { + return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.SystemMetricsEnabledConfigKey} + } + + translators := common.ComponentTranslators{ + Receivers: common.NewTranslatorMap[component.Config, component.ID](systemmetrics.NewTranslator()), + Processors: common.NewTranslatorMap[component.Config, component.ID]( + newEc2TaggerTranslator(), + batchprocessor.NewTranslator( + common.WithName(common.PipelineNameSystemMetrics), + batchprocessor.WithTimeout(batchTimeout), + ), + ), + Exporters: common.NewTranslatorMap[component.Config, component.ID](newCloudWatchTranslator()), + Extensions: common.NewTranslatorMap[component.Config, component.ID]( + agenthealth.NewTranslator(agenthealth.MetricsName, []string{agenthealth.OperationPutMetricData}), + ), + } + + return &translators, nil +} + +func isKubernetes() bool { + _, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") + return ok +} + +func isCgroupMemoryConstrained() bool { + // cgroup v2: memory.max defaults to "max" (unlimited) + if data, err := os.ReadFile(cgroupV2MemoryMaxPath); err == nil { + if val := strings.TrimSpace(string(data)); val != "max" { + return true + } + } + // cgroup v1: memory.limit_in_bytes defaults to ~MaxInt64 when unlimited + if data, err := os.ReadFile(cgroupV1MemoryLimitPath); err == nil { + if limit, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64); err == nil { + if limit < math.MaxInt64/2 { + return true + } + } + } + return false +} + +// isEnabled determines whether the systemmetrics pipeline should be created. +func isEnabled(conf *confmap.Conf) bool { + if val, ok := os.LookupEnv(envconfig.SystemMetricsEnabled); ok { + return strings.EqualFold(val, "true") + } + + if conf != nil && conf.IsSet(common.SystemMetricsEnabledConfigKey) { + enabled, ok := conf.Get(common.SystemMetricsEnabledConfigKey).(bool) + if !ok { + return false + } + return enabled + } + + if context.CurrentContext().RunInContainer() || isKubernetes() || isCgroupMemoryConstrained() { + return false + } + + return isSystemMetricsHost() +} + +// isSystemMetricsHost checks whether this host should collect system metrics. +func isSystemMetricsHost() bool { + return hasApollo() || isRecognizedAMI() +} + +func hasApollo() bool { + info, err := os.Stat(apolloDir) + if err != nil { + return false + } + return info.IsDir() +} + +func isRecognizedAMI() bool { + return checkImageID() || checkOSRelease() +} + +func checkImageID() bool { + imageName := parseKeyFromFile(imageIDFile, "image_name") + return matchesImageNameMarker(imageName) +} + +func matchesImageNameMarker(imageName string) bool { + lower := strings.ToLower(imageName) + + if strings.HasPrefix(lower, "al2-unified") { + return true + } + hasPrefix := false + for _, prefix := range amznPrefixes { + if strings.HasPrefix(lower, prefix) { + hasPrefix = true + break + } + } + if !hasPrefix { + return false + } + for _, marker := range amznMarkers { + if strings.Contains(lower, marker) { + return true + } + } + return false +} + +func checkOSRelease() bool { + return parseKeyFromFile(osReleaseFile, "NAME") == "Amazon Linux" && + parseKeyFromFile(osReleaseFile, "VARIANT") == "internal" +} + +// parseKeyFromFile reads a KEY=VALUE or KEY="VALUE" file and returns the value for the given key. +func parseKeyFromFile(path string, key string) string { + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + + prefix := key + "=" + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if strings.HasPrefix(line, prefix) { + val := line[len(prefix):] + return strings.Trim(val, "\"") + } + } + return "" +} diff --git a/translator/translate/otel/pipeline/systemmetrics/translator_test.go b/translator/translate/otel/pipeline/systemmetrics/translator_test.go new file mode 100644 index 00000000000..ac16751ee3f --- /dev/null +++ b/translator/translate/otel/pipeline/systemmetrics/translator_test.go @@ -0,0 +1,191 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/translator/context" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +func TestTranslator(t *testing.T) { + type want struct { + receivers []string + processors []string + exporters []string + extensions []string + } + tt := NewTranslator() + assert.Equal(t, "metrics/systemmetrics", tt.ID().String()) + + testCases := map[string]struct { + input map[string]interface{} + runInContainer bool + kubernetes bool + envEnabled string // "true", "false", or "" (not set) + want *want + wantErr error + }{ + "WithEnvDisabled": { + input: map[string]interface{}{}, + envEnabled: "false", + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: common.SystemMetricsEnabledConfigKey}, + }, + "WithEnvEnabled": { + input: map[string]interface{}{}, + envEnabled: "true", + want: &want{ + receivers: []string{"systemmetrics"}, + processors: []string{"ec2tagger/systemmetrics", "batch/systemmetrics"}, + exporters: []string{"awscloudwatch/systemmetrics"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, + "WithEnvEnabledOverridesContainer": { + input: map[string]interface{}{}, + envEnabled: "true", + runInContainer: true, + want: &want{ + receivers: []string{"systemmetrics"}, + processors: []string{"ec2tagger/systemmetrics", "batch/systemmetrics"}, + exporters: []string{"awscloudwatch/systemmetrics"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, + "WithJsonDisabled": { + input: map[string]interface{}{ + "agent": map[string]interface{}{ + "system_metrics_enabled": false, + }, + }, + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: common.SystemMetricsEnabledConfigKey}, + }, + "WithJsonEnabled": { + input: map[string]interface{}{ + "agent": map[string]interface{}{ + "system_metrics_enabled": true, + }, + }, + want: &want{ + receivers: []string{"systemmetrics"}, + processors: []string{"ec2tagger/systemmetrics", "batch/systemmetrics"}, + exporters: []string{"awscloudwatch/systemmetrics"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, + "WithRunInContainer": { + input: map[string]interface{}{}, + runInContainer: true, + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: common.SystemMetricsEnabledConfigKey}, + }, + "WithKubernetes": { + input: map[string]interface{}{}, + kubernetes: true, + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: common.SystemMetricsEnabledConfigKey}, + }, + // TODO: add WithUnrecognizedHost once host detection paths (/apollo, /etc/image-id, + // /etc/os-release) are injectable, so the test doesn't depend on the environment it runs in. + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + context.ResetContext() + if tc.envEnabled != "" { + t.Setenv(envconfig.SystemMetricsEnabled, tc.envEnabled) + } + if tc.runInContainer { + context.CurrentContext().SetRunInContainer(true) + } + if tc.kubernetes { + t.Setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1") + } + conf := confmap.NewFromStringMap(tc.input) + got, err := tt.Translate(conf) + assert.Equal(t, tc.wantErr, err) + if tc.want == nil { + assert.Nil(t, got) + } else { + require.NotNil(t, got) + assert.Equal(t, tc.want.receivers, collections.MapSlice(got.Receivers.Keys(), component.ID.String)) + assert.Equal(t, tc.want.processors, collections.MapSlice(got.Processors.Keys(), component.ID.String)) + assert.Equal(t, tc.want.exporters, collections.MapSlice(got.Exporters.Keys(), component.ID.String)) + assert.Equal(t, tc.want.extensions, collections.MapSlice(got.Extensions.Keys(), component.ID.String)) + } + }) + } +} + +func TestParseKeyFromFile(t *testing.T) { + tmp := t.TempDir() + + f1 := filepath.Join(tmp, "noquotes") + require.NoError(t, os.WriteFile(f1, []byte("image_name=amzn2-foo-naws-bar\nimage_version=2.0\n"), 0600)) + assert.Equal(t, "amzn2-foo-naws-bar", parseKeyFromFile(f1, "image_name")) + assert.Equal(t, "2.0", parseKeyFromFile(f1, "image_version")) + assert.Equal(t, "", parseKeyFromFile(f1, "missing_key")) + + f2 := filepath.Join(tmp, "quoted") + require.NoError(t, os.WriteFile(f2, []byte("VARIANT=\"internal\"\nNAME=\"Amazon Linux\"\n"), 0600)) + assert.Equal(t, "internal", parseKeyFromFile(f2, "VARIANT")) + assert.Equal(t, "Amazon Linux", parseKeyFromFile(f2, "NAME")) + + assert.Equal(t, "", parseKeyFromFile(filepath.Join(tmp, "nonexistent"), "key")) +} + +func TestCheckImageIDWithMarkers(t *testing.T) { + tests := []struct { + name string + imageName string + expected bool + }{ + {"amzn2 naws", "amzn2-foo-naws-bar", true}, + {"amzn2 internal", "amzn2-internal-baz", true}, + {"al2023 naws", "al2023-naws-qux", true}, + {"al2023 internal", "al2023-internal-quux", true}, + {"al2-unified", "al2-unified-corge", true}, + {"al2023 no marker", "al2023-foo-bar", false}, + {"amzn2 no marker", "amzn2-foo-bar", false}, + // wrong prefix with valid marker — should NOT match + {"ubuntu with naws", "ubuntu-naws-22.04", false}, + {"centos with internal", "centos-internal-7", false}, + // unified without al2 prefix — should NOT match + {"random unified", "fedora-unified-39", false}, + // empty + {"empty", "", false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, matchesImageNameMarker(tc.imageName)) + }) + } +} + +func TestCheckOSReleaseVariant(t *testing.T) { + tmp := t.TempDir() + + f1 := filepath.Join(tmp, "internal") + require.NoError(t, os.WriteFile(f1, []byte("NAME=\"Amazon Linux\"\nVARIANT=\"internal\"\n"), 0600)) + assert.Equal(t, "Amazon Linux", parseKeyFromFile(f1, "NAME")) + assert.Equal(t, "internal", parseKeyFromFile(f1, "VARIANT")) + + f2 := filepath.Join(tmp, "public") + require.NoError(t, os.WriteFile(f2, []byte("NAME=\"Amazon Linux\"\nVERSION=\"2023\"\n"), 0600)) + assert.Equal(t, "", parseKeyFromFile(f2, "VARIANT")) + + f3 := filepath.Join(tmp, "wrong-name") + require.NoError(t, os.WriteFile(f3, []byte("NAME=\"Ubuntu\"\nVARIANT=\"internal\"\n"), 0600)) + assert.Equal(t, "Ubuntu", parseKeyFromFile(f3, "NAME")) + assert.Equal(t, "internal", parseKeyFromFile(f3, "VARIANT")) +} diff --git a/translator/translate/otel/receiver/systemmetrics/translator.go b/translator/translate/otel/receiver/systemmetrics/translator.go new file mode 100644 index 00000000000..64c5d19ace3 --- /dev/null +++ b/translator/translate/otel/receiver/systemmetrics/translator.go @@ -0,0 +1,31 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/receiver" + + "github.com/aws/amazon-cloudwatch-agent/receiver/systemmetricsreceiver" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +type translator struct { + factory receiver.Factory +} + +var _ common.ComponentTranslator = (*translator)(nil) + +func NewTranslator() common.ComponentTranslator { + return &translator{factory: systemmetricsreceiver.NewFactory()} +} + +func (t *translator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), "") +} + +func (t *translator) Translate(_ *confmap.Conf) (component.Config, error) { + return t.factory.CreateDefaultConfig(), nil +} diff --git a/translator/translate/otel/receiver/systemmetrics/translator_test.go b/translator/translate/otel/receiver/systemmetrics/translator_test.go new file mode 100644 index 00000000000..354c968f503 --- /dev/null +++ b/translator/translate/otel/receiver/systemmetrics/translator_test.go @@ -0,0 +1,26 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package systemmetrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent/receiver/systemmetricsreceiver" +) + +func TestTranslator(t *testing.T) { + tt := NewTranslator() + assert.Equal(t, "systemmetrics", tt.ID().String()) + + got, err := tt.Translate(nil) + require.NoError(t, err) + require.NotNil(t, got) + cfg, ok := got.(*systemmetricsreceiver.Config) + require.True(t, ok) + assert.Equal(t, 60*time.Second, cfg.CollectionInterval) +} diff --git a/translator/translate/otel/translate_otel.go b/translator/translate/otel/translate_otel.go index 7c55a6f1752..154e35210b8 100644 --- a/translator/translate/otel/translate_otel.go +++ b/translator/translate/otel/translate_otel.go @@ -33,6 +33,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/jmx" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/nop" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/prometheus" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/systemmetrics" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/xray" "github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil" ) @@ -77,6 +78,7 @@ func Translate(jsonConfig interface{}, os string) (*otelcol.Config, error) { translators.Set(xray.NewTranslator()) translators.Set(containerinsightsjmx.NewTranslator()) translators.Merge(jmx.NewTranslators(conf)) + translators.Set(systemmetrics.NewTranslator()) translators.Merge(registry) pipelines, err := pipelinetranslator.NewTranslator(translators).Translate(conf) if err != nil { diff --git a/translator/translate/otel/translate_otel_test.go b/translator/translate/otel/translate_otel_test.go index b82acab7e0c..6b2e98c8fa4 100644 --- a/translator/translate/otel/translate_otel_test.go +++ b/translator/translate/otel/translate_otel_test.go @@ -21,6 +21,7 @@ import ( ) func TestTranslator(t *testing.T) { + t.Setenv("SYSTEM_METRICS_ENABLED", "false") agent.Global_Config.Region = "us-east-1" testutil.SetPrometheusRemoteWriteTestingEnv(t) testCases := map[string]struct {