Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cfg/envconfig/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
CWOtelConfigContent = "CW_OTEL_CONFIG_CONTENT"
CWAgentMergedOtelConfig = "CWAGENT_MERGED_OTEL_CONFIG"
CWAgentLogsBackpressureMode = "CWAGENT_LOGS_BACKPRESSURE_MODE"
SystemMetricsEnabled = "SYSTEM_METRICS_ENABLED"

// confused deputy prevention related headers
AmzSourceAccount = "AMZ_SOURCE_ACCOUNT" // populates the "x-amz-source-account" header
Expand Down
10 changes: 5 additions & 5 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
maxConcurrentPublisher = 10 // the number of CloudWatch clients send request concurrently
defaultForceFlushInterval = time.Minute
highResolutionTagKey = "aws:StorageResolution"
defaultRetryCount = 5 // this is the retry count, the total attempts would be retry count + 1 at most.
defaultRetryCount = 5 // total number of PutMetricData attempts per batch.
backoffRetryBase = 200 * time.Millisecond
MaxDimensions = 30
)
Expand Down Expand Up @@ -93,7 +93,7 @@ func (c *CloudWatch) Capabilities() consumer.Capabilities {
func (c *CloudWatch) Start(_ context.Context, host component.Host) error {
c.publisher, _ = publisher.NewPublisher(
publisher.NewNonBlockingFifoQueue(metricChanBufferSize),
maxConcurrentPublisher,
int64(c.config.MaxConcurrentPublishers),
2*time.Second,
c.WriteToCloudWatch)
credentialConfig := &configaws.CredentialConfig{
Expand Down Expand Up @@ -363,8 +363,8 @@ func (c *CloudWatch) pushMetricDatumBatch() {
// backoffSleep sleeps some amount of time based on number of retries done.
func (c *CloudWatch) backoffSleep() {
d := 1 * time.Minute
if c.retries <= defaultRetryCount {
d = backoffRetryBase * time.Duration(1<<c.retries)
if c.retries <= c.config.MaxRetryCount {
d = c.config.BackoffRetryBase * time.Duration(1<<c.retries)
}
d = (d / 2) + publishJitter(d/2)
log.Printf("W! cloudwatch: %v retries, going to sleep %v ms before retrying.",
Expand Down Expand Up @@ -405,7 +405,7 @@ func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
}

var err error
for i := 0; i < defaultRetryCount; i++ {
for i := 0; i < c.config.MaxRetryCount; i++ {
_, err = c.svc.PutMetricData(params)
if err != nil {
awsErr, ok := err.(awserr.Error)
Expand Down
36 changes: 17 additions & 19 deletions plugins/outputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,11 @@ func newCloudWatchClient(
svc cloudwatchiface.CloudWatchAPI,
forceFlushInterval time.Duration,
) *CloudWatch {
cfg := createDefaultConfig().(*Config)
cfg.ForceFlushInterval = forceFlushInterval
cloudwatch := &CloudWatch{
svc: svc,
config: &Config{
ForceFlushInterval: forceFlushInterval,
MaxDatumsPerCall: defaultMaxDatumsPerCall,
MaxValuesPerDatum: defaultMaxValuesPerDatum,
},
svc: svc,
config: cfg,
}
cloudwatch.startRoutines()
return cloudwatch
Expand Down Expand Up @@ -528,14 +526,14 @@ func TestMiddleware(t *testing.T) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
cfg := createDefaultConfig().(*Config)
cfg.Region = "test-region"
cfg.Namespace = "test-namespace"
cfg.ForceFlushInterval = time.Second
cfg.EndpointOverride = server.URL
cfg.MiddlewareID = &id
cw := &CloudWatch{
config: &Config{
Region: "test-region",
Namespace: "test-namespace",
ForceFlushInterval: time.Second,
EndpointOverride: server.URL,
MiddlewareID: &id,
},
config: cfg,
logger: zap.NewNop(),
}
ctx := context.Background()
Expand All @@ -560,7 +558,7 @@ func TestMiddleware(t *testing.T) {
}

func TestBackoffRetries(t *testing.T) {
c := &CloudWatch{}
c := &CloudWatch{config: createDefaultConfig().(*Config)}
sleeps := []time.Duration{
time.Millisecond * 200,
time.Millisecond * 400,
Expand Down Expand Up @@ -640,7 +638,7 @@ func TestCreateEntityMetricData(t *testing.T) {
func TestWriteToCloudWatchEntity(t *testing.T) {
timestampNow := aws.Time(time.Now())
expectedPMDInput := &cloudwatch.PutMetricDataInput{
Namespace: aws.String(""),
Namespace: aws.String("CWAgent"),
StrictEntityValidation: aws.Bool(false),
EntityMetricData: []*cloudwatch.EntityMetricData{
{
Expand Down Expand Up @@ -748,11 +746,11 @@ func TestUserAgentFeatureFlags(t *testing.T) {
Endpoint: aws.String("http://localhost:12345"),
}))
realSvc := cloudwatch.New(sess)
cfg := createDefaultConfig().(*Config)
cfg.ForceFlushInterval = time.Second
cw := &CloudWatch{
svc: realSvc,
config: &Config{
ForceFlushInterval: time.Second,
},
svc: realSvc,
config: cfg,
logger: zap.NewNop(),
}

Expand Down
7 changes: 7 additions & 0 deletions plugins/outputs/cloudwatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ type Config struct {
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`
// MiddlewareID is an ID for an extension that can be used to configure the AWS client.
MiddlewareID *component.ID `mapstructure:"middleware,omitempty"`

// MaxRetryCount is the number of retries on PutMetricData failure. Defaults to 5.
MaxRetryCount int `mapstructure:"max_retry_count,omitempty"`
// BackoffRetryBase is the base duration for exponential backoff on retries. Defaults to 200ms.
BackoffRetryBase time.Duration `mapstructure:"backoff_retry_base,omitempty"`
// MaxConcurrentPublishers is the number of concurrent workers making PMD calls. Defaults to 10.
MaxConcurrentPublishers int `mapstructure:"max_concurrent_publishers,omitempty"`
}

var _ component.Config = (*Config)(nil)
Expand Down
11 changes: 7 additions & 4 deletions plugins/outputs/cloudwatch/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ func NewFactory() exporter.Factory {

func createDefaultConfig() component.Config {
return &Config{
Namespace: "CWAgent",
MaxDatumsPerCall: defaultMaxDatumsPerCall,
MaxValuesPerDatum: defaultMaxValuesPerDatum,
ForceFlushInterval: defaultForceFlushInterval,
Namespace: "CWAgent",
MaxDatumsPerCall: defaultMaxDatumsPerCall,
MaxValuesPerDatum: defaultMaxValuesPerDatum,
ForceFlushInterval: defaultForceFlushInterval,
MaxRetryCount: defaultRetryCount,
BackoffRetryBase: backoffRetryBase,
MaxConcurrentPublishers: maxConcurrentPublisher,
ResourceToTelemetrySettings: resourcetotelemetry.Settings{
Enabled: true,
},
Expand Down
15 changes: 15 additions & 0 deletions receiver/systemmetricsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package systemmetricsreceiver

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/scraper/scraperhelper"
)

// Config defines the configuration for the systemmetrics receiver.
// It embeds the standard scraper controller settings and currently
// exposes no additional options of its own.
type Config struct {
	scraperhelper.ControllerConfig `mapstructure:",squash"`
}

// Compile-time check that Config satisfies component.Config.
var _ component.Config = (*Config)(nil)
61 changes: 61 additions & 0 deletions receiver/systemmetricsreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package systemmetricsreceiver

import (
"context"
"math/rand"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
otelscraper "go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scraperhelper"
)

const (
	// typeStr is the component type string used to reference this receiver in configs.
	typeStr = "systemmetrics"
	// defaultCollectionInterval is the scrape period used when none is configured.
	defaultCollectionInterval = 60 * time.Second
	// maxInitialJitter bounds the random delay added before the first scrape.
	maxInitialJitter = 60 * time.Second
	// stability is the declared maturity level of this receiver.
	stability = component.StabilityLevelAlpha
)

// Type is the component type identifier for this receiver.
var Type = component.MustNewType(typeStr)

// NewFactory creates a receiver.Factory for the systemmetrics receiver.
func NewFactory() receiver.Factory {
	metricsOption := receiver.WithMetrics(createMetricsReceiver, stability)
	return receiver.NewFactory(Type, createDefaultConfig, metricsOption)
}

// createDefaultConfig builds the receiver's default configuration: the
// standard scraper controller settings with a 60s collection interval.
func createDefaultConfig() component.Config {
	controllerCfg := scraperhelper.NewDefaultControllerConfig()
	controllerCfg.CollectionInterval = defaultCollectionInterval
	return &Config{ControllerConfig: controllerCfg}
}

// createMetricsReceiver builds a scraper-controller-backed metrics receiver
// for the systemmetrics component.
//
// The initial delay is jittered to stagger scrape starts across hosts. The
// jitter is applied to a local copy of the controller config rather than to
// the shared *Config: baseCfg is owned by the collector, and mutating it
// would accumulate additional jitter every time this factory runs (e.g. on
// pipeline restarts) and make the stored config diverge from user settings.
func createMetricsReceiver(
	_ context.Context,
	settings receiver.Settings,
	baseCfg component.Config,
	consumer consumer.Metrics,
) (receiver.Metrics, error) {
	cfg := baseCfg.(*Config)

	// Jitter the initial delay on a copy; see the function comment above.
	controllerCfg := cfg.ControllerConfig
	controllerCfg.InitialDelay += time.Duration(rand.Int63n(int64(maxInitialJitter))) //nolint:gosec

	s := newScraper(settings.Logger)
	sc, err := otelscraper.NewMetrics(s.scrape, otelscraper.WithStart(s.start), otelscraper.WithShutdown(s.shutdown))
	if err != nil {
		return nil, err
	}
	return scraperhelper.NewMetricsController(
		&controllerCfg, settings, consumer,
		scraperhelper.AddScraper(Type, sc),
	)
}
38 changes: 38 additions & 0 deletions receiver/systemmetricsreceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package systemmetricsreceiver

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

// TestCreateDefaultConfig verifies the default collection interval is one minute.
func TestCreateDefaultConfig(t *testing.T) {
	defaultCfg := createDefaultConfig().(*Config)
	assert.NotNil(t, defaultCfg)
	assert.Equal(t, time.Minute, defaultCfg.CollectionInterval)
}

func TestCreateMetricsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
receiver, err := createMetricsReceiver(
context.Background(),
receivertest.NewNopSettings(Type),
cfg,
consumertest.NewNop(),
)
require.NoError(t, err)
require.NotNil(t, receiver)
}

// TestNewFactory verifies the factory reports the expected component type.
func TestNewFactory(t *testing.T) {
	factory := NewFactory()
	assert.Equal(t, Type, factory.Type())
}
42 changes: 42 additions & 0 deletions receiver/systemmetricsreceiver/mock_ps_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build linux

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package systemmetricsreceiver

import (
"context"

"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
)

// MockPS implements PS for testing. Each getter returns the corresponding
// canned Data/Err field pair, letting tests inject both successful samples
// and failures without touching the real system.
type MockPS struct {
	CPUTimesData []cpu.TimesStat
	CPUTimesErr error
	VMStatData *mem.VirtualMemoryStat
	VMStatErr error
	DiskUsageData []*disk.UsageStat
	DiskUsageErr error
	EthtoolStatsData map[string]uint64
	EthtoolStatsErr error
}

// CPUTimes returns the canned CPU time samples and error.
func (m *MockPS) CPUTimes(_ context.Context) ([]cpu.TimesStat, error) {
	return m.CPUTimesData, m.CPUTimesErr
}

// VMStat returns the canned virtual memory stats and error.
func (m *MockPS) VMStat(_ context.Context) (*mem.VirtualMemoryStat, error) {
	return m.VMStatData, m.VMStatErr
}

// DiskUsage returns the canned disk usage samples and error.
func (m *MockPS) DiskUsage(_ context.Context) ([]*disk.UsageStat, error) {
	return m.DiskUsageData, m.DiskUsageErr
}

// EthtoolStats returns the canned ethtool counters and error; the
// interface-name argument is ignored.
func (m *MockPS) EthtoolStats(_ context.Context, _ string) (map[string]uint64, error) {
	return m.EthtoolStatsData, m.EthtoolStatsErr
}
78 changes: 78 additions & 0 deletions receiver/systemmetricsreceiver/scraper_cpu_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//go:build linux

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package systemmetricsreceiver

import (
"context"
"time"

"github.com/shirou/gopsutil/v3/cpu"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// metricCPUIOWaitTime is the name of the emitted iowait-percentage gauge.
const metricCPUIOWaitTime = "cpu_time_iowait"

// cpuScraper computes the CPU iowait share from successive cumulative
// CPU time samples read via ps.
type cpuScraper struct {
	logger *zap.Logger
	ps PS
	// prevStat holds the previous cumulative sample; nil until the first scrape.
	prevStat *cpu.TimesStat
}

// newCPUScraper returns a cpuScraper that reads CPU times through ps and
// logs through logger. The previous-sample state starts empty.
func newCPUScraper(logger *zap.Logger, ps PS) *cpuScraper {
	return &cpuScraper{ps: ps, logger: logger}
}

func (s *cpuScraper) Name() string { return "cpu" }

// Scrape samples cumulative CPU times and, once a previous sample exists,
// emits the iowait share of the elapsed CPU time as a gauge in percent.
// Read errors are logged at debug and swallowed so one bad read does not
// fail the whole scrape cycle; the first call only primes the state.
func (s *cpuScraper) Scrape(ctx context.Context, metrics pmetric.Metrics) error {
	times, err := s.ps.CPUTimes(ctx)
	if err != nil {
		s.logger.Debug("Failed to read CPU times", zap.Error(err))
		return nil
	}
	if len(times) == 0 {
		return nil
	}

	cur := times[0]
	prev := s.prevStat
	s.prevStat = &cur
	// First scrape: nothing to diff against yet.
	if prev == nil {
		return nil
	}

	deltaTotal := cpuTotal(cur) - cpuTotal(*prev)
	iowaitDelta := cur.Iowait - prev.Iowait
	// Skip emission when the counters did not advance or went backwards
	// (e.g. a counter reset); the fresh sample has already been stored.
	if deltaTotal <= 0 || iowaitDelta < 0 {
		return nil
	}

	now := pcommon.NewTimestampFromTime(time.Now())
	sm := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
	addGaugeDP(sm.Metrics().AppendEmpty(), metricCPUIOWaitTime, "Percent", 100*iowaitDelta/deltaTotal, now)
	return nil
}

// cpuTotal sums the 8 base CPU states (excludes Guest/GuestNice to avoid double-counting).
// NOTE(review): the summation order is deliberate; reordering these float
// additions could perturb the result slightly and change emitted deltas.
func cpuTotal(t cpu.TimesStat) float64 {
	return t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle
}
Loading
Loading