Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/aws/amazon-cloudwatch-agent/cmd/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent"
"github.com/aws/amazon-cloudwatch-agent/internal/mapstructure"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
"github.com/aws/amazon-cloudwatch-agent/internal/version"
cwaLogger "github.com/aws/amazon-cloudwatch-agent/logger"
"github.com/aws/amazon-cloudwatch-agent/logs"
Expand Down Expand Up @@ -348,11 +347,16 @@ func runAgent(ctx context.Context,

otelConfigs := fOtelConfigs
// try merging configs together, will return nil if nothing to merge
merged, err := mergeConfigs(otelConfigs)
merged, err := mergeConfigs(otelConfigs, envconfig.IsUsageDataEnabled())
if err != nil {
return err
}
if merged != nil {
if _, err = os.Stat(paths.YamlConfigPath); err == nil {
useragent.Get().AddFeatureFlags(featureFlagOtelMergeJSON)
} else {
useragent.Get().AddFeatureFlags(featureFlagOtelMergeYAML)
}
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap()))
otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
} else {
Expand Down Expand Up @@ -418,37 +422,6 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co
}
}

// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error.
func mergeConfigs(configPaths []string) (*confmap.Conf, error) {
var loaders []confmap.Loader
if envconfig.IsRunningInContainer() {
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
if ok && len(content) > 0 {
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
}
}
// If using environment variable or passing in more than one config
if len(loaders) > 0 || len(configPaths) > 1 {
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
loaders = append(loaders, confmap.NewFileLoader(configPath))
}
result := confmap.New()
for _, loader := range loaders {
conf, err := loader.Load()
if err != nil {
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
}
}
return result, nil
}
return nil, nil
}

func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
telegrafAdapter := adapter.NewAdapter(telegrafConfig)

Expand Down
96 changes: 0 additions & 96 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
"github.com/aws/amazon-cloudwatch-agent/logger"
"github.com/aws/amazon-cloudwatch-agent/tool/paths"
)
Expand Down Expand Up @@ -67,94 +65,6 @@ func Test_getCollectorParams(t *testing.T) {
}
}

func TestMergeConfigs(t *testing.T) {
testEnvValue := `receivers:
nop/1:
exporters:
nop:
extensions:
nop:
service:
extensions: [nop]
pipelines:
traces/1:
receivers: [nop/1]
exporters: [nop]
`
testCases := map[string]struct {
input []string
isContainer bool
envValue string
want *confmap.Conf
wantErr bool
}{
"WithInvalidFile": {
input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")},
wantErr: true,
},
"WithNoMerge": {
input: []string{filepath.Join("testdata", "base.yaml")},
wantErr: false,
},
"WithoutEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/NonContainer": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: false,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
},
"WithEmptyEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "",
want: nil,
wantErr: false,
},
"WithInvalidEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "test",
wantErr: true,
},
"WithEnv/Container/MultipleFiles": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge+env.yaml")),
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
if testCase.isContainer {
t.Setenv(envconfig.RunInContainer, envconfig.TrueValue)
}
t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue)
got, err := mergeConfigs(testCase.input)
if testCase.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
} else if testCase.want == nil {
assert.NoError(t, err)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
assert.Equal(t, testCase.want.ToStringMap(), got.ToStringMap())
}
})
}
}

func TestFallbackOtelConfig(t *testing.T) {
defaultYamlRelativePath := filepath.Join("default", paths.YAML)
testCases := map[string]struct {
Expand Down Expand Up @@ -200,9 +110,3 @@ func TestFallbackOtelConfig(t *testing.T) {
})
}
}

func mustLoadFromFile(t *testing.T, path string) *confmap.Conf {
conf, err := confmap.NewFileLoader(path).Load()
require.NoError(t, err)
return conf
}
215 changes: 215 additions & 0 deletions cmd/amazon-cloudwatch-agent/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package main

import (
"errors"
"fmt"
"log"
"os"
"slices"
"strings"

"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
agenthealthtranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
)

const (
featureFlagOtelMergeYAML = "otel_merge_yaml"
featureFlagOtelMergeJSON = "otel_merge_json"
)

// mergeConfigs merges multiple OTEL configs together, including any config
// provided via the CW_CONFIG_CONTENT environment variable when running in a
// container. Returns nil without an error if there is nothing to merge (i.e.
// a single config path with no env override). In practice, a single config
// means no custom YAML was provided — the default agent YAML is always
// accompanied by at least one custom YAML when custom configs are in use.
func mergeConfigs(configPaths []string, isUsageDataEnabled bool) (*confmap.Conf, error) {
var loaders []confmap.Loader
if envconfig.IsRunningInContainer() {
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
if ok && len(content) > 0 {
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
}
}
// If using environment variable or passing in more than one config
if len(loaders) > 0 || len(configPaths) > 1 {
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
loaders = append(loaders, confmap.NewFileLoader(configPath))
}
var result *confmap.Conf
for _, loader := range loaders {
conf, err := loader.Load()
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
continue
}
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if result == nil {
result = confmap.New()
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
}
}
return mergeAgentHealth(result, isUsageDataEnabled), nil
}
return nil, nil
}

type exporterInfo struct {
middlewareID string
operations []any
}

var logsExporterInfo = exporterInfo{middlewareID: agenthealthtranslator.LogsID.String(), operations: []any{agenthealthtranslator.OperationPutLogEvents}}

// supportedExporters maps exporter type names to their agenthealth middleware ID and operations.
var supportedExporters = map[string]exporterInfo{
"awscloudwatch": {middlewareID: agenthealthtranslator.MetricsID.String(), operations: []any{agenthealthtranslator.OperationPutMetricData}},
"awsemf": logsExporterInfo,
"awscloudwatchlogs": logsExporterInfo,
"awsxray": {middlewareID: agenthealthtranslator.TracesID.String(), operations: []any{agenthealthtranslator.OperationPutTraceSegments}},
}

// mergeAgentHealth scans the exporters in the config for supported AWS exporters
// and adds the appropriate agenthealth extension with a middleware reference to each.
// It also detects otlphttp exporters and sets their auth.authenticator to an
// agenthealth extension, chaining with any existing auth extension.
func mergeAgentHealth(conf *confmap.Conf, isUsageDataEnabled bool) *confmap.Conf {
if conf == nil || !isUsageDataEnabled {
return conf
}

cfgMap := conf.ToStringMap()

exporters, ok := cfgMap["exporters"].(map[string]any)
if !ok {
return conf
}

// Track which agenthealth extensions are needed for AWS exporters
neededExtensions := make(map[string]exporterInfo)
for key := range exporters {
typeName, _, _ := strings.Cut(key, "/")
info, ok := supportedExporters[typeName]
if !ok {
continue
}
exporterCfg, ok := exporters[key].(map[string]any)
if !ok || exporterCfg == nil {
exporterCfg = make(map[string]any)
exporters[key] = exporterCfg
}
if _, alreadySet := exporterCfg["middleware"]; !alreadySet {
exporterCfg["middleware"] = info.middlewareID
neededExtensions[info.middlewareID] = info
}
}

// Detect otlphttp exporters for auth-based agenthealth integration
type otlphttpAuthEntry struct {
exporterKey string
ahExtName string
additionalAuth string
}
var otlphttpEntries []otlphttpAuthEntry
for key := range exporters {
typeName, suffix, hasSuffix := strings.Cut(key, "/")
if typeName != "otlphttp" {
continue
}
ahName := "agenthealth/otlphttp"
if hasSuffix {
ahName = "agenthealth/otlphttp_" + suffix
}
exporterCfg, ok := exporters[key].(map[string]any)
if !ok || exporterCfg == nil {
exporterCfg = make(map[string]any)
exporters[key] = exporterCfg
}
// Skip if already using an agenthealth auth extension
var additionalAuth string
if authMap, ok := exporterCfg["auth"].(map[string]any); ok {
if authn, ok := authMap["authenticator"].(string); ok {
if strings.HasPrefix(authn, "agenthealth/") {
continue
}
additionalAuth = authn
}
}
otlphttpEntries = append(otlphttpEntries, otlphttpAuthEntry{
exporterKey: key,
ahExtName: ahName,
additionalAuth: additionalAuth,
})
}

if len(neededExtensions) == 0 && len(otlphttpEntries) == 0 {
return conf
}

// Ensure extensions section exists
extensions, _ := cfgMap["extensions"].(map[string]any)
if extensions == nil {
extensions = make(map[string]any)
cfgMap["extensions"] = extensions
}

// Ensure service section exists
service, _ := cfgMap["service"].(map[string]any)
if service == nil {
service = make(map[string]any)
cfgMap["service"] = service
}

var svcExtensions []any
if existing, ok := service["extensions"].([]any); ok {
svcExtensions = existing
}

for middlewareID, info := range neededExtensions {
if _, exists := extensions[middlewareID]; !exists {
extensions[middlewareID] = map[string]any{
"is_usage_data_enabled": true,
"stats": map[string]any{
"operations": info.operations,
},
}
}
if !slices.Contains(svcExtensions, any(middlewareID)) {
svcExtensions = append(svcExtensions, middlewareID)
}
}

// Configure agenthealth auth for otlphttp exporters
for _, entry := range otlphttpEntries {
exporterCfg := exporters[entry.exporterKey].(map[string]any)
exporterCfg["auth"] = map[string]any{"authenticator": entry.ahExtName}
if _, exists := extensions[entry.ahExtName]; !exists {
extCfg := map[string]any{
"is_usage_data_enabled": true,
"stats": map[string]any{
"operations": []any{"*"},
},
}
if entry.additionalAuth != "" {
extCfg["additional_auth"] = entry.additionalAuth
}
extensions[entry.ahExtName] = extCfg
}
if !slices.Contains(svcExtensions, any(entry.ahExtName)) {
svcExtensions = append(svcExtensions, entry.ahExtName)
}
}

service["extensions"] = svcExtensions
return confmap.NewFromStringMap(cfgMap)
}
Loading
Loading