diff --git a/comp/logs/agent/config/config.go b/comp/logs/agent/config/config.go index 891dc18b2eb3..69062a1ea779 100644 --- a/comp/logs/agent/config/config.go +++ b/comp/logs/agent/config/config.go @@ -349,6 +349,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig defaultNoSSL := logsConfig.logsNoSSL() main := newHTTPEndpoint(logsConfig, registerCallback) + main.ExtraHTTPHeaders = logsConfig.additionalHTTPHeaders() if logsConfig.useV2API() && intakeTrackType != "" { main.Version = EPIntakeVersion2 @@ -426,6 +427,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig e.Protocol = intakeProtocol e.Origin = intakeOrigin e.OriginVersion = main.OriginVersion + e.ExtraHTTPHeaders = cloneExtraHTTPHeaders(main.ExtraHTTPHeaders) e.onConfigUpdateFromReaderMainEndpoint(coreConfig) additionals = append(additionals, e) diff --git a/comp/logs/agent/config/config_keys.go b/comp/logs/agent/config/config_keys.go index 18927166c642..201fef1d09f0 100644 --- a/comp/logs/agent/config/config_keys.go +++ b/comp/logs/agent/config/config_keys.go @@ -109,6 +109,19 @@ func (l *LogsConfigKeys) ddEVPOriginVersion() string { return l.getConfig().GetString(l.getConfigKey("dd_evp_origin_version")) } +func (l *LogsConfigKeys) additionalHTTPHeaders() map[string]string { + headers := l.getConfig().GetStringMapString(l.getConfigKey("additional_http_headers")) + if len(headers) == 0 { + return nil + } + + copiedHeaders := make(map[string]string, len(headers)) + for k, v := range headers { + copiedHeaders[k] = v + } + return copiedHeaders +} + func (l *LogsConfigKeys) logsNoSSL() bool { return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl")) } diff --git a/comp/logs/agent/config/config_test.go b/comp/logs/agent/config/config_test.go index bdc763106285..9ac5b17b1e42 100644 --- a/comp/logs/agent/config/config_test.go +++ b/comp/logs/agent/config/config_test.go @@ -79,6 +79,8 @@ func compareEndpoint(t *testing.T, expected Endpoint, actual Endpoint) { assert.Equal(t, expected.TrackType, actual.TrackType, "TrackType is not Equal") assert.Equal(t, expected.Protocol, actual.Protocol, "Protocol is not Equal") assert.Equal(t, expected.Origin, actual.Origin, "Origin is not Equal") + assert.Equal(t, expected.OriginVersion, actual.OriginVersion, "OriginVersion is not Equal") + assert.Equal(t, expected.ExtraHTTPHeaders, actual.ExtraHTTPHeaders, "ExtraHTTPHeaders is not Equal") } func (suite *ConfigTestSuite) compareEndpoints(expected *Endpoints, actual *Endpoints) { @@ -540,6 +542,24 @@ func (suite *ConfigTestSuite) TestLogsEVPOriginVersionOverride() { suite.Equal("cluster-a", endpoints.Main.OriginVersion) } +func (suite *ConfigTestSuite) TestLogsAdditionalHTTPHeaders() { + suite.config.SetWithoutSource("api_key", "123") + suite.config.SetWithoutSource("logs_config.additional_http_headers", map[string]string{ + "x-custom-routing": "cluster-a", + "x-experiment": "stateful", + }) + + endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source") + + expectedHeaders := map[string]string{ + "x-custom-routing": "cluster-a", + "x-experiment": "stateful", + } + suite.Nil(err) + suite.Equal(expectedHeaders, endpoints.Main.ExtraHTTPHeaders) + suite.Equal(expectedHeaders, endpoints.Endpoints[0].ExtraHTTPHeaders) +} + func (suite *ConfigTestSuite) TestMultipleTCPEndpointsInConf() { suite.config.SetWithoutSource("api_key", "123") suite.config.SetWithoutSource("logs_config.logs_dd_url", "agent-http-intake.logs.datadoghq.com:443") diff --git a/comp/logs/agent/config/endpoints.go b/comp/logs/agent/config/endpoints.go index d9350a8c49a1..2b9557909886 100644 --- a/comp/logs/agent/config/endpoints.go +++ b/comp/logs/agent/config/endpoints.go @@ -211,6 +211,7 @@ func loadTCPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, registerCallba newE.Protocol = e.Protocol newE.Origin = e.Origin newE.OriginVersion = e.OriginVersion + newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(e.ExtraHTTPHeaders) newE.UseGRPC = e.UseGRPC if e.UseSSL != nil { @@ -259,6 +260,10 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy newE.Protocol = e.Protocol newE.Origin = e.Origin newE.OriginVersion = e.OriginVersion + newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(e.ExtraHTTPHeaders) + if newE.ExtraHTTPHeaders == nil { + newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(main.ExtraHTTPHeaders) + } if e.UseSSL != nil { newE.useSSL = *e.UseSSL @@ -286,6 +291,18 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy return newEndpoints } +func cloneExtraHTTPHeaders(headers map[string]string) map[string]string { + if len(headers) == 0 { + return nil + } + + clonedHeaders := make(map[string]string, len(headers)) + for k, v := range headers { + clonedHeaders[k] = v + } + return clonedHeaders +} + // GetAPIKey returns the latest API Key for the Endpoint, including when the configuration gets updated at runtime func (e *Endpoint) GetAPIKey() string { return e.apiKey.Load() diff --git a/pkg/config/config_template.yaml b/pkg/config/config_template.yaml index ce349451ca52..cc17ca2d25d5 100644 --- a/pkg/config/config_template.yaml +++ b/pkg/config/config_template.yaml @@ -1226,6 +1226,13 @@ api_key: # # dd_evp_origin_version: "" +# # @param additional_http_headers - object - optional - default: {} +# # @env DD_LOGS_CONFIG_ADDITIONAL_HTTP_HEADERS - JSON object of string to string - optional - default: {} +# # Additional headers to send with logs HTTP and gRPC requests. +# # Required Agent headers are set after these values and cannot be overridden. +# +# additional_http_headers: {} + # # @param grpc - custom object - optional # # Configuration for the logs gRPC transport. # diff --git a/pkg/config/schema/compressed/core_schema.yaml.zstd b/pkg/config/schema/compressed/core_schema.yaml.zstd index ff562734496f..e2ea4f9a5057 100644 Binary files a/pkg/config/schema/compressed/core_schema.yaml.zstd and b/pkg/config/schema/compressed/core_schema.yaml.zstd differ diff --git a/pkg/config/schema/core_schema.yaml b/pkg/config/schema/core_schema.yaml index 37914c88e66d..0bf67b063e0a 100644 --- a/pkg/config/schema/core_schema.yaml +++ b/pkg/config/schema/core_schema.yaml @@ -2017,6 +2017,19 @@ properties: tags: - golang_type:int - template_section:LogsAgent + additional_http_headers: + node_type: setting + type: object + default: {} + visibility: public + description: |- + Additional headers to send with logs HTTP and gRPC requests. + Required Agent headers are set after these values and cannot be overridden. + example: '{}' + additionalProperties: + type: string + tags: + - template_section:LogsAgent force_use_tcp: node_type: setting type: boolean diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index e5d552297e94..eb9a8b9f4486 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1515,6 +1515,7 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) { config.BindEnvAndSetDefault(prefix+"use_rust_tokenizer", false) config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime) config.BindEnvAndSetDefault(prefix+"dd_evp_origin_version", "") + config.BindEnvAndSetDefault(prefix+"additional_http_headers", map[string]string{}) config.BindEnvAndSetDefault("logs_config.grpc.dual_send_uuids_enabled", true) config.BindEnvAndSetDefault("logs_config.grpc.max_inflight_payloads", DefaultMaxInflightPayloads) config.BindEnvAndSetDefault("logs_config.message_channel_size", 100) diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index 0837489e500f..ddb54c3d97fe 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -365,6 +365,10 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { return err } + for k, v := range d.endpoint.ExtraHTTPHeaders { + req.Header.Set(k, v) + } + req.Header.Set("DD-API-KEY", d.endpoint.GetAPIKey()) req.Header.Set("Content-Type", d.contentType) req.Header.Set("User-Agent", "datadog-agent/"+version.AgentVersion) @@ -382,9 +386,6 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { req.Header.Set("dd-message-timestamp", strconv.FormatInt(getMessageTimestamp(payload.MessageMetas), 10)) then := time.Now() req.Header.Set("dd-current-timestamp", strconv.FormatInt(then.UnixMilli(), 10)) - for k, v := range d.endpoint.ExtraHTTPHeaders { - req.Header.Set(k, v) - } req = req.WithContext(ctx) resp, err := d.client.Do(req) diff --git a/pkg/logs/client/http/destination_test.go b/pkg/logs/client/http/destination_test.go index d5cd42bd43e6..b8907f482fae 100644 --- a/pkg/logs/client/http/destination_test.go +++ b/pkg/logs/client/http/destination_test.go @@ -390,6 +390,31 @@ func TestDestinationSendsOriginVersionOverride(t *testing.T) { assert.Equal(t, "cluster-a", server.request.Header.Get("dd-evp-origin-version")) } +func TestDestinationSendsAdditionalHeadersWithoutOverridingRequiredHeaders(t *testing.T) { + cfg := configmock.New(t) + server := NewTestServer(200, cfg) + defer server.httpServer.Close() + + server.Destination.protocol = "test-proto" + server.Destination.origin = "test-origin" + server.Destination.endpoint.ExtraHTTPHeaders = map[string]string{ + "x-custom-routing": "cluster-a", + "dd-api-key": "bad-key", + "dd-protocol": "bad-proto", + "dd-evp-origin": "bad-origin", + "dd-evp-origin-version": "bad-version", + } + + err := server.Destination.unconditionalSend(&message.Payload{Encoded: []byte("payload")}) + + assert.Nil(t, err) + assert.Equal(t, "cluster-a", server.request.Header.Get("x-custom-routing")) + assert.Equal(t, server.Destination.endpoint.GetAPIKey(), server.request.Header.Get("dd-api-key")) + assert.Equal(t, "test-proto", server.request.Header.Get("dd-protocol")) + assert.Equal(t, "test-origin", server.request.Header.Get("dd-evp-origin")) + assert.Equal(t, originVersionOrDefault(""), server.request.Header.Get("dd-evp-origin-version")) +} + func TestDestinationDoesntSendEmptyV2Protocol(t *testing.T) { cfg := configmock.New(t) server := NewTestServer(200, cfg) diff --git a/pkg/logs/sender/grpc/sender.go b/pkg/logs/sender/grpc/sender.go index 2e95090862ea..d7a1b92d440c 100644 --- a/pkg/logs/sender/grpc/sender.go +++ b/pkg/logs/sender/grpc/sender.go @@ -46,10 +46,13 @@ type headerCredentials struct { // GetRequestMetadata adds required headers to each RPC call func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) { - headers := map[string]string{ - "dd-api-key": h.endpoint.GetAPIKey(), + headers := make(map[string]string, 4+len(h.endpoint.ExtraHTTPHeaders)) + for k, v := range h.endpoint.ExtraHTTPHeaders { + headers[k] = v } + headers["dd-api-key"] = h.endpoint.GetAPIKey() + // Add protocol header if specified if h.endpoint.Protocol != "" { headers["dd-protocol"] = string(h.endpoint.Protocol) @@ -67,10 +70,6 @@ func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) ( headers["dd-content-encoding"] = "identity" } - for k, v := range h.endpoint.ExtraHTTPHeaders { - headers[k] = v - } - return headers, nil } diff --git a/pkg/logs/sender/grpc/stream_worker_test.go b/pkg/logs/sender/grpc/stream_worker_test.go index 026a61f30ada..02fa36574dd3 100644 --- a/pkg/logs/sender/grpc/stream_worker_test.go +++ b/pkg/logs/sender/grpc/stream_worker_test.go @@ -95,6 +95,32 @@ func TestHeaderCredentialsSendsOriginVersionOverride(t *testing.T) { require.Equal(t, "cluster-a", headers["dd-evp-origin-version"]) } +func TestHeaderCredentialsSendsAdditionalHeadersWithoutOverridingRequiredHeaders(t *testing.T) { + endpoint := config.NewEndpoint("test-api-key", "", "test-host", 443, config.EmptyPathPrefix, true) + endpoint.Protocol = "test-proto" + endpoint.Origin = "test-origin" + endpoint.UseCompression = true + endpoint.CompressionKind = "zstd" + endpoint.ExtraHTTPHeaders = map[string]string{ + "x-custom-routing": "cluster-a", + "dd-api-key": "bad-key", + "dd-protocol": "bad-proto", + "dd-evp-origin": "bad-origin", + "dd-evp-origin-version": "bad-version", + "dd-content-encoding": "bad-encoding", + } + + headers, err := (&headerCredentials{endpoint: endpoint}).GetRequestMetadata(context.Background()) + + require.NoError(t, err) + require.Equal(t, "cluster-a", headers["x-custom-routing"]) + require.Equal(t, "test-api-key", headers["dd-api-key"]) + require.Equal(t, "test-proto", headers["dd-protocol"]) + require.Equal(t, "test-origin", headers["dd-evp-origin"]) + require.Equal(t, originVersionOrDefault(""), headers["dd-evp-origin-version"]) + require.Equal(t, "zstd", headers["dd-content-encoding"]) +} + func (m *mockLogsStream) Send(batch *statefulpb.StatefulBatch) error { m.mu.Lock() if m.sendErr != nil {