Skip to content

Commit 7ccd4f6

Browse files
committed
Add agent health during merge (#2039)
1 parent 2958697 commit 7ccd4f6

6 files changed

Lines changed: 540 additions & 144 deletions

File tree

cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go

Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/aws/amazon-cloudwatch-agent/cmd/amazon-cloudwatch-agent/internal"
3838
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent"
3939
"github.com/aws/amazon-cloudwatch-agent/internal/mapstructure"
40-
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
4140
"github.com/aws/amazon-cloudwatch-agent/internal/version"
4241
cwaLogger "github.com/aws/amazon-cloudwatch-agent/logger"
4342
"github.com/aws/amazon-cloudwatch-agent/logs"
@@ -348,11 +347,16 @@ func runAgent(ctx context.Context,
348347

349348
otelConfigs := fOtelConfigs
350349
// try merging configs together, will return nil if nothing to merge
351-
merged, err := mergeConfigs(otelConfigs)
350+
merged, err := mergeConfigs(otelConfigs, envconfig.IsUsageDataEnabled())
352351
if err != nil {
353352
return err
354353
}
355354
if merged != nil {
355+
if _, err = os.Stat(paths.YamlConfigPath); err == nil {
356+
useragent.Get().AddFeatureFlags(featureFlagOtelMergeJSON)
357+
} else {
358+
useragent.Get().AddFeatureFlags(featureFlagOtelMergeYAML)
359+
}
356360
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap()))
357361
otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
358362
} else {
@@ -418,44 +422,6 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co
418422
}
419423
}
420424

421-
// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error.
422-
func mergeConfigs(configPaths []string) (*confmap.Conf, error) {
423-
var loaders []confmap.Loader
424-
if envconfig.IsRunningInContainer() {
425-
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
426-
if ok && len(content) > 0 {
427-
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
428-
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
429-
}
430-
}
431-
// If using environment variable or passing in more than one config
432-
if len(loaders) > 0 || len(configPaths) > 1 {
433-
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
434-
for _, configPath := range configPaths {
435-
loaders = append(loaders, confmap.NewFileLoader(configPath))
436-
}
437-
var result *confmap.Conf
438-
for _, loader := range loaders {
439-
conf, err := loader.Load()
440-
if err != nil {
441-
if errors.Is(err, os.ErrNotExist) {
442-
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
443-
continue
444-
}
445-
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
446-
}
447-
if result == nil {
448-
result = confmap.New()
449-
}
450-
if err = result.Merge(conf); err != nil {
451-
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
452-
}
453-
}
454-
return result, nil
455-
}
456-
return nil, nil
457-
}
458-
459425
func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
460426
telegrafAdapter := adapter.NewAdapter(telegrafConfig)
461427

cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
"go.uber.org/zap"
1818
"go.uber.org/zap/zapcore"
1919

20-
"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
21-
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
2220
"github.com/aws/amazon-cloudwatch-agent/logger"
2321
"github.com/aws/amazon-cloudwatch-agent/tool/paths"
2422
)
@@ -67,102 +65,6 @@ func Test_getCollectorParams(t *testing.T) {
6765
}
6866
}
6967

70-
func TestMergeConfigs(t *testing.T) {
71-
testEnvValue := `receivers:
72-
nop/1:
73-
exporters:
74-
nop:
75-
extensions:
76-
nop:
77-
service:
78-
extensions: [nop]
79-
pipelines:
80-
traces/1:
81-
receivers: [nop/1]
82-
exporters: [nop]
83-
`
84-
testCases := map[string]struct {
85-
input []string
86-
isContainer bool
87-
envValue string
88-
want *confmap.Conf
89-
wantErr bool
90-
}{
91-
"WithInvalidFile": {
92-
input: []string{filepath.Join("testdata", "invalid.yaml"), filepath.Join("testdata", "base.yaml")},
93-
wantErr: true,
94-
},
95-
"WithAllMissingFiles": {
96-
input: []string{filepath.Join("not", "a", "file"), filepath.Join("also", "not", "a", "file")},
97-
want: nil,
98-
},
99-
"WithMissingFile": {
100-
input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")},
101-
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
102-
},
103-
"WithNoMerge": {
104-
input: []string{filepath.Join("testdata", "base.yaml")},
105-
wantErr: false,
106-
},
107-
"WithoutEnv/Container": {
108-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
109-
isContainer: true,
110-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
111-
},
112-
"WithEnv/NonContainer": {
113-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
114-
isContainer: false,
115-
envValue: testEnvValue,
116-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
117-
},
118-
"WithEnv/Container": {
119-
input: []string{filepath.Join("testdata", "base.yaml")},
120-
isContainer: true,
121-
envValue: testEnvValue,
122-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
123-
},
124-
"WithEmptyEnv/Container": {
125-
input: []string{filepath.Join("testdata", "base.yaml")},
126-
isContainer: true,
127-
envValue: "",
128-
want: nil,
129-
wantErr: false,
130-
},
131-
"WithInvalidEnv/Container": {
132-
input: []string{filepath.Join("testdata", "base.yaml")},
133-
isContainer: true,
134-
envValue: "test",
135-
wantErr: true,
136-
},
137-
"WithEnv/Container/MultipleFiles": {
138-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
139-
isContainer: true,
140-
envValue: testEnvValue,
141-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge+env.yaml")),
142-
},
143-
}
144-
for name, testCase := range testCases {
145-
t.Run(name, func(t *testing.T) {
146-
if testCase.isContainer {
147-
t.Setenv(envconfig.RunInContainer, envconfig.TrueValue)
148-
}
149-
t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue)
150-
got, err := mergeConfigs(testCase.input)
151-
if testCase.wantErr {
152-
assert.Error(t, err)
153-
assert.Nil(t, got)
154-
} else if testCase.want == nil {
155-
assert.NoError(t, err)
156-
assert.Nil(t, got)
157-
} else {
158-
assert.NoError(t, err)
159-
assert.NotNil(t, got)
160-
assert.Equal(t, testCase.want.ToStringMap(), got.ToStringMap())
161-
}
162-
})
163-
}
164-
}
165-
16668
func TestFallbackOtelConfig(t *testing.T) {
16769
defaultYamlRelativePath := filepath.Join("default", paths.YAML)
16870
testCases := map[string]struct {
@@ -208,9 +110,3 @@ func TestFallbackOtelConfig(t *testing.T) {
208110
})
209111
}
210112
}
211-
212-
func mustLoadFromFile(t *testing.T, path string) *confmap.Conf {
213-
conf, err := confmap.NewFileLoader(path).Load()
214-
require.NoError(t, err)
215-
return conf
216-
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package main
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"log"
10+
"os"
11+
"slices"
12+
"strings"
13+
14+
"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
15+
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
16+
agenthealthtranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
17+
)
18+
19+
const (
20+
featureFlagOtelMergeYAML = "otel_merge_yaml"
21+
featureFlagOtelMergeJSON = "otel_merge_json"
22+
)
23+
24+
// mergeConfigs merges multiple OTEL configs together, including any config
25+
// provided via the CW_CONFIG_CONTENT environment variable when running in a
26+
// container. Returns nil without an error if there is nothing to merge (i.e.
27+
// a single config path with no env override). In practice, a single config
28+
// means no custom YAML was provided — the default agent YAML is always
29+
// accompanied by at least one custom YAML when custom configs are in use.
30+
func mergeConfigs(configPaths []string, isUsageDataEnabled bool) (*confmap.Conf, error) {
31+
var loaders []confmap.Loader
32+
if envconfig.IsRunningInContainer() {
33+
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
34+
if ok && len(content) > 0 {
35+
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
36+
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
37+
}
38+
}
39+
// If using environment variable or passing in more than one config
40+
if len(loaders) > 0 || len(configPaths) > 1 {
41+
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
42+
for _, configPath := range configPaths {
43+
loaders = append(loaders, confmap.NewFileLoader(configPath))
44+
}
45+
var result *confmap.Conf
46+
for _, loader := range loaders {
47+
conf, err := loader.Load()
48+
if err != nil {
49+
if errors.Is(err, os.ErrNotExist) {
50+
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
51+
continue
52+
}
53+
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
54+
}
55+
if result == nil {
56+
result = confmap.New()
57+
}
58+
if err = result.Merge(conf); err != nil {
59+
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
60+
}
61+
}
62+
return mergeAgentHealth(result, isUsageDataEnabled), nil
63+
}
64+
return nil, nil
65+
}
66+
67+
type exporterInfo struct {
68+
middlewareID string
69+
operations []any
70+
}
71+
72+
var logsExporterInfo = exporterInfo{middlewareID: agenthealthtranslator.LogsID.String(), operations: []any{agenthealthtranslator.OperationPutLogEvents}}
73+
74+
// supportedExporters maps exporter type names to their agenthealth middleware ID and operations.
75+
var supportedExporters = map[string]exporterInfo{
76+
"awscloudwatch": {middlewareID: agenthealthtranslator.MetricsID.String(), operations: []any{agenthealthtranslator.OperationPutMetricData}},
77+
"awsemf": logsExporterInfo,
78+
"awscloudwatchlogs": logsExporterInfo,
79+
"awsxray": {middlewareID: agenthealthtranslator.TracesID.String(), operations: []any{agenthealthtranslator.OperationPutTraceSegments}},
80+
}
81+
82+
// mergeAgentHealth scans the exporters in the config for supported AWS exporters
83+
// and adds the appropriate agenthealth extension with a middleware reference to each.
84+
func mergeAgentHealth(conf *confmap.Conf, isUsageDataEnabled bool) *confmap.Conf {
85+
if conf == nil || !isUsageDataEnabled {
86+
return conf
87+
}
88+
89+
cfgMap := conf.ToStringMap()
90+
91+
exporters, ok := cfgMap["exporters"].(map[string]any)
92+
if !ok {
93+
return conf
94+
}
95+
96+
// Track which agenthealth extensions are needed
97+
neededExtensions := make(map[string]exporterInfo)
98+
for key := range exporters {
99+
typeName, _, _ := strings.Cut(key, "/")
100+
info, ok := supportedExporters[typeName]
101+
if !ok {
102+
continue
103+
}
104+
exporterCfg, ok := exporters[key].(map[string]any)
105+
if !ok || exporterCfg == nil {
106+
exporterCfg = make(map[string]any)
107+
exporters[key] = exporterCfg
108+
}
109+
if _, alreadySet := exporterCfg["middleware"]; !alreadySet {
110+
exporterCfg["middleware"] = info.middlewareID
111+
neededExtensions[info.middlewareID] = info
112+
}
113+
}
114+
115+
if len(neededExtensions) == 0 {
116+
return conf
117+
}
118+
119+
// Ensure extensions section exists
120+
extensions, _ := cfgMap["extensions"].(map[string]any)
121+
if extensions == nil {
122+
extensions = make(map[string]any)
123+
cfgMap["extensions"] = extensions
124+
}
125+
126+
// Ensure service section exists
127+
service, _ := cfgMap["service"].(map[string]any)
128+
if service == nil {
129+
service = make(map[string]any)
130+
cfgMap["service"] = service
131+
}
132+
133+
var svcExtensions []any
134+
if existing, ok := service["extensions"].([]any); ok {
135+
svcExtensions = existing
136+
}
137+
138+
for middlewareID, info := range neededExtensions {
139+
if _, exists := extensions[middlewareID]; !exists {
140+
extensions[middlewareID] = map[string]any{
141+
"is_usage_data_enabled": true,
142+
"stats": map[string]any{
143+
"operations": info.operations,
144+
},
145+
}
146+
}
147+
if !slices.Contains(svcExtensions, any(middlewareID)) {
148+
svcExtensions = append(svcExtensions, middlewareID)
149+
}
150+
}
151+
152+
service["extensions"] = svcExtensions
153+
return confmap.NewFromStringMap(cfgMap)
154+
}

0 commit comments

Comments
 (0)