Skip to content

Commit e8f5001

Browse files
JayPolancomitali-salvijefchien
authored
Add OTLP export support with receivers, endpoint validation, and agent health integration (#2062)
Co-authored-by: Mitali Salvi <44349099+mitali-salvi@users.noreply.github.com> Co-authored-by: Jeffrey Chien <chienjef@amazon.com>
1 parent b3c2664 commit e8f5001

25 files changed

Lines changed: 1644 additions & 237 deletions

cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/aws/amazon-cloudwatch-agent/cmd/amazon-cloudwatch-agent/internal"
3838
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent"
3939
"github.com/aws/amazon-cloudwatch-agent/internal/mapstructure"
40-
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
4140
"github.com/aws/amazon-cloudwatch-agent/internal/version"
4241
cwaLogger "github.com/aws/amazon-cloudwatch-agent/logger"
4342
"github.com/aws/amazon-cloudwatch-agent/logs"
@@ -348,11 +347,16 @@ func runAgent(ctx context.Context,
348347

349348
otelConfigs := fOtelConfigs
350349
// try merging configs together, will return nil if nothing to merge
351-
merged, err := mergeConfigs(otelConfigs)
350+
merged, err := mergeConfigs(otelConfigs, envconfig.IsUsageDataEnabled())
352351
if err != nil {
353352
return err
354353
}
355354
if merged != nil {
355+
if _, err = os.Stat(paths.YamlConfigPath); err == nil {
356+
useragent.Get().AddFeatureFlags(featureFlagOtelMergeJSON)
357+
} else {
358+
useragent.Get().AddFeatureFlags(featureFlagOtelMergeYAML)
359+
}
356360
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap()))
357361
otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
358362
} else {
@@ -418,37 +422,6 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co
418422
}
419423
}
420424

421-
// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error.
422-
func mergeConfigs(configPaths []string) (*confmap.Conf, error) {
423-
var loaders []confmap.Loader
424-
if envconfig.IsRunningInContainer() {
425-
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
426-
if ok && len(content) > 0 {
427-
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
428-
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
429-
}
430-
}
431-
// If using environment variable or passing in more than one config
432-
if len(loaders) > 0 || len(configPaths) > 1 {
433-
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
434-
for _, configPath := range configPaths {
435-
loaders = append(loaders, confmap.NewFileLoader(configPath))
436-
}
437-
result := confmap.New()
438-
for _, loader := range loaders {
439-
conf, err := loader.Load()
440-
if err != nil {
441-
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
442-
}
443-
if err = result.Merge(conf); err != nil {
444-
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
445-
}
446-
}
447-
return result, nil
448-
}
449-
return nil, nil
450-
}
451-
452425
func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
453426
telegrafAdapter := adapter.NewAdapter(telegrafConfig)
454427

cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go

Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
"go.uber.org/zap"
1818
"go.uber.org/zap/zapcore"
1919

20-
"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
21-
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
2220
"github.com/aws/amazon-cloudwatch-agent/logger"
2321
"github.com/aws/amazon-cloudwatch-agent/tool/paths"
2422
)
@@ -67,94 +65,6 @@ func Test_getCollectorParams(t *testing.T) {
6765
}
6866
}
6967

70-
func TestMergeConfigs(t *testing.T) {
71-
testEnvValue := `receivers:
72-
nop/1:
73-
exporters:
74-
nop:
75-
extensions:
76-
nop:
77-
service:
78-
extensions: [nop]
79-
pipelines:
80-
traces/1:
81-
receivers: [nop/1]
82-
exporters: [nop]
83-
`
84-
testCases := map[string]struct {
85-
input []string
86-
isContainer bool
87-
envValue string
88-
want *confmap.Conf
89-
wantErr bool
90-
}{
91-
"WithInvalidFile": {
92-
input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")},
93-
wantErr: true,
94-
},
95-
"WithNoMerge": {
96-
input: []string{filepath.Join("testdata", "base.yaml")},
97-
wantErr: false,
98-
},
99-
"WithoutEnv/Container": {
100-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
101-
isContainer: true,
102-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
103-
},
104-
"WithEnv/NonContainer": {
105-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
106-
isContainer: false,
107-
envValue: testEnvValue,
108-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
109-
},
110-
"WithEnv/Container": {
111-
input: []string{filepath.Join("testdata", "base.yaml")},
112-
isContainer: true,
113-
envValue: testEnvValue,
114-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
115-
},
116-
"WithEmptyEnv/Container": {
117-
input: []string{filepath.Join("testdata", "base.yaml")},
118-
isContainer: true,
119-
envValue: "",
120-
want: nil,
121-
wantErr: false,
122-
},
123-
"WithInvalidEnv/Container": {
124-
input: []string{filepath.Join("testdata", "base.yaml")},
125-
isContainer: true,
126-
envValue: "test",
127-
wantErr: true,
128-
},
129-
"WithEnv/Container/MultipleFiles": {
130-
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
131-
isContainer: true,
132-
envValue: testEnvValue,
133-
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge+env.yaml")),
134-
},
135-
}
136-
for name, testCase := range testCases {
137-
t.Run(name, func(t *testing.T) {
138-
if testCase.isContainer {
139-
t.Setenv(envconfig.RunInContainer, envconfig.TrueValue)
140-
}
141-
t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue)
142-
got, err := mergeConfigs(testCase.input)
143-
if testCase.wantErr {
144-
assert.Error(t, err)
145-
assert.Nil(t, got)
146-
} else if testCase.want == nil {
147-
assert.NoError(t, err)
148-
assert.Nil(t, got)
149-
} else {
150-
assert.NoError(t, err)
151-
assert.NotNil(t, got)
152-
assert.Equal(t, testCase.want.ToStringMap(), got.ToStringMap())
153-
}
154-
})
155-
}
156-
}
157-
15868
func TestFallbackOtelConfig(t *testing.T) {
15969
defaultYamlRelativePath := filepath.Join("default", paths.YAML)
16070
testCases := map[string]struct {
@@ -200,9 +110,3 @@ func TestFallbackOtelConfig(t *testing.T) {
200110
})
201111
}
202112
}
203-
204-
func mustLoadFromFile(t *testing.T, path string) *confmap.Conf {
205-
conf, err := confmap.NewFileLoader(path).Load()
206-
require.NoError(t, err)
207-
return conf
208-
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package main
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"log"
10+
"os"
11+
"slices"
12+
"strings"
13+
14+
"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
15+
"github.com/aws/amazon-cloudwatch-agent/internal/merge/confmap"
16+
agenthealthtranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
17+
)
18+
19+
const (
20+
featureFlagOtelMergeYAML = "otel_merge_yaml"
21+
featureFlagOtelMergeJSON = "otel_merge_json"
22+
)
23+
24+
// mergeConfigs merges multiple OTEL configs together, including any config
25+
// provided via the CW_CONFIG_CONTENT environment variable when running in a
26+
// container. Returns nil without an error if there is nothing to merge (i.e.
27+
// a single config path with no env override). In practice, a single config
28+
// means no custom YAML was provided — the default agent YAML is always
29+
// accompanied by at least one custom YAML when custom configs are in use.
30+
func mergeConfigs(configPaths []string, isUsageDataEnabled bool) (*confmap.Conf, error) {
31+
var loaders []confmap.Loader
32+
if envconfig.IsRunningInContainer() {
33+
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
34+
if ok && len(content) > 0 {
35+
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
36+
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)))
37+
}
38+
}
39+
// If using environment variable or passing in more than one config
40+
if len(loaders) > 0 || len(configPaths) > 1 {
41+
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
42+
for _, configPath := range configPaths {
43+
loaders = append(loaders, confmap.NewFileLoader(configPath))
44+
}
45+
var result *confmap.Conf
46+
for _, loader := range loaders {
47+
conf, err := loader.Load()
48+
if err != nil {
49+
if errors.Is(err, os.ErrNotExist) {
50+
log.Printf("D! Skipping non-existent OTEL config: %s", loader.ID())
51+
continue
52+
}
53+
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
54+
}
55+
if result == nil {
56+
result = confmap.New()
57+
}
58+
if err = result.Merge(conf); err != nil {
59+
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
60+
}
61+
}
62+
return mergeAgentHealth(result, isUsageDataEnabled), nil
63+
}
64+
return nil, nil
65+
}
66+
67+
type exporterInfo struct {
68+
middlewareID string
69+
operations []any
70+
}
71+
72+
var logsExporterInfo = exporterInfo{middlewareID: agenthealthtranslator.LogsID.String(), operations: []any{agenthealthtranslator.OperationPutLogEvents}}
73+
74+
// supportedExporters maps exporter type names to their agenthealth middleware ID and operations.
75+
var supportedExporters = map[string]exporterInfo{
76+
"awscloudwatch": {middlewareID: agenthealthtranslator.MetricsID.String(), operations: []any{agenthealthtranslator.OperationPutMetricData}},
77+
"awsemf": logsExporterInfo,
78+
"awscloudwatchlogs": logsExporterInfo,
79+
"awsxray": {middlewareID: agenthealthtranslator.TracesID.String(), operations: []any{agenthealthtranslator.OperationPutTraceSegments}},
80+
}
81+
82+
// mergeAgentHealth scans the exporters in the config for supported AWS exporters
83+
// and adds the appropriate agenthealth extension with a middleware reference to each.
84+
// It also detects otlphttp exporters and sets their auth.authenticator to an
85+
// agenthealth extension, chaining with any existing auth extension.
86+
func mergeAgentHealth(conf *confmap.Conf, isUsageDataEnabled bool) *confmap.Conf {
87+
if conf == nil || !isUsageDataEnabled {
88+
return conf
89+
}
90+
91+
cfgMap := conf.ToStringMap()
92+
93+
exporters, ok := cfgMap["exporters"].(map[string]any)
94+
if !ok {
95+
return conf
96+
}
97+
98+
// Track which agenthealth extensions are needed for AWS exporters
99+
neededExtensions := make(map[string]exporterInfo)
100+
for key := range exporters {
101+
typeName, _, _ := strings.Cut(key, "/")
102+
info, ok := supportedExporters[typeName]
103+
if !ok {
104+
continue
105+
}
106+
exporterCfg, ok := exporters[key].(map[string]any)
107+
if !ok || exporterCfg == nil {
108+
exporterCfg = make(map[string]any)
109+
exporters[key] = exporterCfg
110+
}
111+
if _, alreadySet := exporterCfg["middleware"]; !alreadySet {
112+
exporterCfg["middleware"] = info.middlewareID
113+
neededExtensions[info.middlewareID] = info
114+
}
115+
}
116+
117+
// Detect otlphttp exporters for auth-based agenthealth integration
118+
type otlphttpAuthEntry struct {
119+
exporterKey string
120+
ahExtName string
121+
additionalAuth string
122+
}
123+
var otlphttpEntries []otlphttpAuthEntry
124+
for key := range exporters {
125+
typeName, suffix, hasSuffix := strings.Cut(key, "/")
126+
if typeName != "otlphttp" {
127+
continue
128+
}
129+
ahName := "agenthealth/otlphttp"
130+
if hasSuffix {
131+
ahName = "agenthealth/otlphttp_" + suffix
132+
}
133+
exporterCfg, ok := exporters[key].(map[string]any)
134+
if !ok || exporterCfg == nil {
135+
exporterCfg = make(map[string]any)
136+
exporters[key] = exporterCfg
137+
}
138+
// Skip if already using an agenthealth auth extension
139+
var additionalAuth string
140+
if authMap, ok := exporterCfg["auth"].(map[string]any); ok {
141+
if authn, ok := authMap["authenticator"].(string); ok {
142+
if strings.HasPrefix(authn, "agenthealth/") {
143+
continue
144+
}
145+
additionalAuth = authn
146+
}
147+
}
148+
otlphttpEntries = append(otlphttpEntries, otlphttpAuthEntry{
149+
exporterKey: key,
150+
ahExtName: ahName,
151+
additionalAuth: additionalAuth,
152+
})
153+
}
154+
155+
if len(neededExtensions) == 0 && len(otlphttpEntries) == 0 {
156+
return conf
157+
}
158+
159+
// Ensure extensions section exists
160+
extensions, _ := cfgMap["extensions"].(map[string]any)
161+
if extensions == nil {
162+
extensions = make(map[string]any)
163+
cfgMap["extensions"] = extensions
164+
}
165+
166+
// Ensure service section exists
167+
service, _ := cfgMap["service"].(map[string]any)
168+
if service == nil {
169+
service = make(map[string]any)
170+
cfgMap["service"] = service
171+
}
172+
173+
var svcExtensions []any
174+
if existing, ok := service["extensions"].([]any); ok {
175+
svcExtensions = existing
176+
}
177+
178+
for middlewareID, info := range neededExtensions {
179+
if _, exists := extensions[middlewareID]; !exists {
180+
extensions[middlewareID] = map[string]any{
181+
"is_usage_data_enabled": true,
182+
"stats": map[string]any{
183+
"operations": info.operations,
184+
},
185+
}
186+
}
187+
if !slices.Contains(svcExtensions, any(middlewareID)) {
188+
svcExtensions = append(svcExtensions, middlewareID)
189+
}
190+
}
191+
192+
// Configure agenthealth auth for otlphttp exporters
193+
for _, entry := range otlphttpEntries {
194+
exporterCfg := exporters[entry.exporterKey].(map[string]any)
195+
exporterCfg["auth"] = map[string]any{"authenticator": entry.ahExtName}
196+
if _, exists := extensions[entry.ahExtName]; !exists {
197+
extCfg := map[string]any{
198+
"is_usage_data_enabled": true,
199+
"stats": map[string]any{
200+
"operations": []any{"*"},
201+
},
202+
}
203+
if entry.additionalAuth != "" {
204+
extCfg["additional_auth"] = entry.additionalAuth
205+
}
206+
extensions[entry.ahExtName] = extCfg
207+
}
208+
if !slices.Contains(svcExtensions, any(entry.ahExtName)) {
209+
svcExtensions = append(svcExtensions, entry.ahExtName)
210+
}
211+
}
212+
213+
service["extensions"] = svcExtensions
214+
return confmap.NewFromStringMap(cfgMap)
215+
}

0 commit comments

Comments
 (0)