Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/PR-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
branches:
- main*
- feature*
- feature/**
types:
- opened
- synchronize
Expand Down
46 changes: 6 additions & 40 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/aws/amazon-cloudwatch-agent/cmd/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent"
"github.com/aws/amazon-cloudwatch-agent/internal/mapstructure"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
"github.com/aws/amazon-cloudwatch-agent/internal/version"
cwaLogger "github.com/aws/amazon-cloudwatch-agent/logger"
"github.com/aws/amazon-cloudwatch-agent/logs"
Expand Down Expand Up @@ -348,11 +347,16 @@ func runAgent(ctx context.Context,

otelConfigs := fOtelConfigs
// try merging configs together, will return nil if nothing to merge
merged, err := mergeConfigs(otelConfigs)
merged, err := mergeConfigs(otelConfigs, envconfig.IsUsageDataEnabled())
if err != nil {
return err
}
if merged != nil {
if _, err = os.Stat(paths.YamlConfigPath); err == nil {
useragent.Get().AddFeatureFlags(featureFlagOtelMergeJSON)
} else {
useragent.Get().AddFeatureFlags(featureFlagOtelMergeYAML)
}
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap()))
otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
} else {
Expand Down Expand Up @@ -418,44 +422,6 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co
}
}

// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error.
func mergeConfigs(configPaths []string) (*confmap.Conf, error) {
var loaders []confmap.Loader
if envconfig.IsRunningInContainer() {
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
if ok && len(content) > 0 {
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
}
}
// If using environment variable or passing in more than one config
if len(loaders) > 0 || len(configPaths) > 1 {
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
loaders = append(loaders, confmap.NewFileLoader(configPath))
}
var result *confmap.Conf
for _, loader := range loaders {
conf, err := loader.Load()
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
continue
}
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if result == nil {
result = confmap.New()
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
}
}
return result, nil
}
return nil, nil
}

func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
telegrafAdapter := adapter.NewAdapter(telegrafConfig)

Expand Down
104 changes: 0 additions & 104 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
"github.com/aws/amazon-cloudwatch-agent/logger"
"github.com/aws/amazon-cloudwatch-agent/tool/paths"
)
Expand Down Expand Up @@ -67,102 +65,6 @@ func Test_getCollectorParams(t *testing.T) {
}
}

func TestMergeConfigs(t *testing.T) {
testEnvValue := `receivers:
nop/1:
exporters:
nop:
extensions:
nop:
service:
extensions: [nop]
pipelines:
traces/1:
receivers: [nop/1]
exporters: [nop]
`
testCases := map[string]struct {
input []string
isContainer bool
envValue string
want *confmap.Conf
wantErr bool
}{
"WithInvalidFile": {
input: []string{filepath.Join("testdata", "invalid.yaml"), filepath.Join("testdata", "base.yaml")},
wantErr: true,
},
"WithAllMissingFiles": {
input: []string{filepath.Join("not", "a", "file"), filepath.Join("also", "not", "a", "file")},
want: nil,
},
"WithMissingFile": {
input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")},
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
},
"WithNoMerge": {
input: []string{filepath.Join("testdata", "base.yaml")},
wantErr: false,
},
"WithoutEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/NonContainer": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: false,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
},
"WithEmptyEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "",
want: nil,
wantErr: false,
},
"WithInvalidEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "test",
wantErr: true,
},
"WithEnv/Container/MultipleFiles": {
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge+env.yaml")),
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
if testCase.isContainer {
t.Setenv(envconfig.RunInContainer, envconfig.TrueValue)
}
t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue)
got, err := mergeConfigs(testCase.input)
if testCase.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
} else if testCase.want == nil {
assert.NoError(t, err)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
assert.Equal(t, testCase.want.ToStringMap(), got.ToStringMap())
}
})
}
}

func TestFallbackOtelConfig(t *testing.T) {
defaultYamlRelativePath := filepath.Join("default", paths.YAML)
testCases := map[string]struct {
Expand Down Expand Up @@ -208,9 +110,3 @@ func TestFallbackOtelConfig(t *testing.T) {
})
}
}

func mustLoadFromFile(t *testing.T, path string) *confmap.Conf {
conf, err := confmap.NewFileLoader(path).Load()
require.NoError(t, err)
return conf
}
154 changes: 154 additions & 0 deletions cmd/amazon-cloudwatch-agent/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package main

import (
"errors"
"fmt"
"log"
"os"
"slices"
"strings"

"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
agenthealthtranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
)

const (
featureFlagOtelMergeYAML = "otel_merge_yaml"
featureFlagOtelMergeJSON = "otel_merge_json"
)

// mergeConfigs merges multiple OTEL configs together, including any config
// provided via the CW_CONFIG_CONTENT environment variable when running in a
// container. Returns nil without an error if there is nothing to merge (i.e.
// a single config path with no env override). In practice, a single config
// means no custom YAML was provided — the default agent YAML is always
// accompanied by at least one custom YAML when custom configs are in use.
func mergeConfigs(configPaths []string, isUsageDataEnabled bool) (*confmap.Conf, error) {
var loaders []confmap.Loader
if envconfig.IsRunningInContainer() {
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
if ok && len(content) > 0 {
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
}
}
// If using environment variable or passing in more than one config
if len(loaders) > 0 || len(configPaths) > 1 {
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
loaders = append(loaders, confmap.NewFileLoader(configPath))
}
var result *confmap.Conf
for _, loader := range loaders {
conf, err := loader.Load()
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
continue
}
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if result == nil {
result = confmap.New()
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
}
}
return mergeAgentHealth(result, isUsageDataEnabled), nil
}
return nil, nil
}

type exporterInfo struct {
middlewareID string
operations []any
}

var logsExporterInfo = exporterInfo{middlewareID: agenthealthtranslator.LogsID.String(), operations: []any{agenthealthtranslator.OperationPutLogEvents}}

// supportedExporters maps exporter type names to their agenthealth middleware ID and operations.
var supportedExporters = map[string]exporterInfo{
"awscloudwatch": {middlewareID: agenthealthtranslator.MetricsID.String(), operations: []any{agenthealthtranslator.OperationPutMetricData}},
"awsemf": logsExporterInfo,
"awscloudwatchlogs": logsExporterInfo,
"awsxray": {middlewareID: agenthealthtranslator.TracesID.String(), operations: []any{agenthealthtranslator.OperationPutTraceSegments}},
}

// mergeAgentHealth scans the exporters in the config for supported AWS exporters
// and adds the appropriate agenthealth extension with a middleware reference to each.
func mergeAgentHealth(conf *confmap.Conf, isUsageDataEnabled bool) *confmap.Conf {
if conf == nil || !isUsageDataEnabled {
return conf
}

cfgMap := conf.ToStringMap()

exporters, ok := cfgMap["exporters"].(map[string]any)
if !ok {
return conf
}

// Track which agenthealth extensions are needed
neededExtensions := make(map[string]exporterInfo)
for key := range exporters {
typeName, _, _ := strings.Cut(key, "/")
info, ok := supportedExporters[typeName]
if !ok {
continue
}
exporterCfg, ok := exporters[key].(map[string]any)
if !ok || exporterCfg == nil {
exporterCfg = make(map[string]any)
exporters[key] = exporterCfg
}
if _, alreadySet := exporterCfg["middleware"]; !alreadySet {
exporterCfg["middleware"] = info.middlewareID
neededExtensions[info.middlewareID] = info
}
}

if len(neededExtensions) == 0 {
return conf
}

// Ensure extensions section exists
extensions, _ := cfgMap["extensions"].(map[string]any)
if extensions == nil {
extensions = make(map[string]any)
cfgMap["extensions"] = extensions
}

// Ensure service section exists
service, _ := cfgMap["service"].(map[string]any)
if service == nil {
service = make(map[string]any)
cfgMap["service"] = service
}

var svcExtensions []any
if existing, ok := service["extensions"].([]any); ok {
svcExtensions = existing
}

for middlewareID, info := range neededExtensions {
if _, exists := extensions[middlewareID]; !exists {
extensions[middlewareID] = map[string]any{
"is_usage_data_enabled": true,
"stats": map[string]any{
"operations": info.operations,
},
}
}
if !slices.Contains(svcExtensions, any(middlewareID)) {
svcExtensions = append(svcExtensions, middlewareID)
}
}

service["extensions"] = svcExtensions
return confmap.NewFromStringMap(cfgMap)
}
Loading