Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
defaultNoSSL := logsConfig.logsNoSSL()

main := newHTTPEndpoint(logsConfig, registerCallback)
main.ExtraHTTPHeaders = logsConfig.additionalHTTPHeaders()

if logsConfig.useV2API() && intakeTrackType != "" {
main.Version = EPIntakeVersion2
Expand Down Expand Up @@ -426,6 +427,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
e.Protocol = intakeProtocol
e.Origin = intakeOrigin
e.OriginVersion = main.OriginVersion
e.ExtraHTTPHeaders = cloneExtraHTTPHeaders(main.ExtraHTTPHeaders)
e.onConfigUpdateFromReaderMainEndpoint(coreConfig)

additionals = append(additionals, e)
Expand Down
13 changes: 13 additions & 0 deletions comp/logs/agent/config/config_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (l *LogsConfigKeys) ddEVPOriginVersion() string {
return l.getConfig().GetString(l.getConfigKey("dd_evp_origin_version"))
}

func (l *LogsConfigKeys) additionalHTTPHeaders() map[string]string {
headers := l.getConfig().GetStringMapString(l.getConfigKey("additional_http_headers"))
if len(headers) == 0 {
return nil
}

copiedHeaders := make(map[string]string, len(headers))
for k, v := range headers {
copiedHeaders[k] = v
}
return copiedHeaders
}

func (l *LogsConfigKeys) logsNoSSL() bool {
return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl"))
}
Expand Down
20 changes: 20 additions & 0 deletions comp/logs/agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func compareEndpoint(t *testing.T, expected Endpoint, actual Endpoint) {
assert.Equal(t, expected.TrackType, actual.TrackType, "TrackType is not Equal")
assert.Equal(t, expected.Protocol, actual.Protocol, "Protocol is not Equal")
assert.Equal(t, expected.Origin, actual.Origin, "Origin is not Equal")
assert.Equal(t, expected.OriginVersion, actual.OriginVersion, "OriginVersion is not Equal")
assert.Equal(t, expected.ExtraHTTPHeaders, actual.ExtraHTTPHeaders, "ExtraHTTPHeaders is not Equal")
}

func (suite *ConfigTestSuite) compareEndpoints(expected *Endpoints, actual *Endpoints) {
Expand Down Expand Up @@ -540,6 +542,24 @@ func (suite *ConfigTestSuite) TestLogsEVPOriginVersionOverride() {
suite.Equal("cluster-a", endpoints.Main.OriginVersion)
}

func (suite *ConfigTestSuite) TestLogsAdditionalHTTPHeaders() {
suite.config.SetWithoutSource("api_key", "123")
suite.config.SetWithoutSource("logs_config.additional_http_headers", map[string]string{
"x-custom-routing": "cluster-a",
"x-experiment": "stateful",
})

endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

expectedHeaders := map[string]string{
"x-custom-routing": "cluster-a",
"x-experiment": "stateful",
}
suite.Nil(err)
suite.Equal(expectedHeaders, endpoints.Main.ExtraHTTPHeaders)
suite.Equal(expectedHeaders, endpoints.Endpoints[0].ExtraHTTPHeaders)
}

func (suite *ConfigTestSuite) TestMultipleTCPEndpointsInConf() {
suite.config.SetWithoutSource("api_key", "123")
suite.config.SetWithoutSource("logs_config.logs_dd_url", "agent-http-intake.logs.datadoghq.com:443")
Expand Down
17 changes: 17 additions & 0 deletions comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func loadTCPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, registerCallba
newE.Protocol = e.Protocol
newE.Origin = e.Origin
newE.OriginVersion = e.OriginVersion
newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(e.ExtraHTTPHeaders)
newE.UseGRPC = e.UseGRPC

if e.UseSSL != nil {
Expand Down Expand Up @@ -259,6 +260,10 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy
newE.Protocol = e.Protocol
newE.Origin = e.Origin
newE.OriginVersion = e.OriginVersion
newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(e.ExtraHTTPHeaders)
if newE.ExtraHTTPHeaders == nil {
newE.ExtraHTTPHeaders = cloneExtraHTTPHeaders(main.ExtraHTTPHeaders)
}

if e.UseSSL != nil {
newE.useSSL = *e.UseSSL
Expand Down Expand Up @@ -286,6 +291,18 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy
return newEndpoints
}

func cloneExtraHTTPHeaders(headers map[string]string) map[string]string {
if len(headers) == 0 {
return nil
}

clonedHeaders := make(map[string]string, len(headers))
for k, v := range headers {
clonedHeaders[k] = v
}
return clonedHeaders
}

// GetAPIKey returns the latest API Key for the Endpoint, including when the configuration gets updated at runtime
func (e *Endpoint) GetAPIKey() string {
return e.apiKey.Load()
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,13 @@ api_key:
#
# dd_evp_origin_version: ""

# # @param additional_http_headers - object - optional - default: {}
# # @env DD_LOGS_CONFIG_ADDITIONAL_HTTP_HEADERS - JSON object of string to string - optional - default: {}
# # Additional headers to send with logs HTTP and gRPC requests.
# # Required Agent headers are set after these values and cannot be overridden.
#
# additional_http_headers: {}

# # @param grpc - custom object - optional
# # Configuration for the logs gRPC transport.
#
Expand Down
Binary file modified pkg/config/schema/compressed/core_schema.yaml.zstd
Binary file not shown.
13 changes: 13 additions & 0 deletions pkg/config/schema/core_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,19 @@ properties:
tags:
- golang_type:int
- template_section:LogsAgent
additional_http_headers:
node_type: setting
type: object
default: {}
visibility: public
description: |-
Additional headers to send with logs HTTP and gRPC requests.
Required Agent headers are set after these values and cannot be overridden.
example: '{}'
additionalProperties:
type: string
tags:
- template_section:LogsAgent
force_use_tcp:
node_type: setting
type: boolean
Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,7 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
config.BindEnvAndSetDefault(prefix+"use_rust_tokenizer", false)
config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime)
config.BindEnvAndSetDefault(prefix+"dd_evp_origin_version", "")
config.BindEnvAndSetDefault(prefix+"additional_http_headers", map[string]string{})
config.BindEnvAndSetDefault("logs_config.grpc.dual_send_uuids_enabled", true)
config.BindEnvAndSetDefault("logs_config.grpc.max_inflight_payloads", DefaultMaxInflightPayloads)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
Expand Down
7 changes: 4 additions & 3 deletions pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
return err
}

for k, v := range d.endpoint.ExtraHTTPHeaders {
req.Header.Set(k, v)
}

req.Header.Set("DD-API-KEY", d.endpoint.GetAPIKey())
req.Header.Set("Content-Type", d.contentType)
req.Header.Set("User-Agent", "datadog-agent/"+version.AgentVersion)
Expand All @@ -382,9 +386,6 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
req.Header.Set("dd-message-timestamp", strconv.FormatInt(getMessageTimestamp(payload.MessageMetas), 10))
then := time.Now()
req.Header.Set("dd-current-timestamp", strconv.FormatInt(then.UnixMilli(), 10))
for k, v := range d.endpoint.ExtraHTTPHeaders {
req.Header.Set(k, v)
}

req = req.WithContext(ctx)
resp, err := d.client.Do(req)
Expand Down
25 changes: 25 additions & 0 deletions pkg/logs/client/http/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,31 @@ func TestDestinationSendsOriginVersionOverride(t *testing.T) {
assert.Equal(t, "cluster-a", server.request.Header.Get("dd-evp-origin-version"))
}

func TestDestinationSendsAdditionalHeadersWithoutOverridingRequiredHeaders(t *testing.T) {
cfg := configmock.New(t)
server := NewTestServer(200, cfg)
defer server.httpServer.Close()

server.Destination.protocol = "test-proto"
server.Destination.origin = "test-origin"
server.Destination.endpoint.ExtraHTTPHeaders = map[string]string{
"x-custom-routing": "cluster-a",
"dd-api-key": "bad-key",
"dd-protocol": "bad-proto",
"dd-evp-origin": "bad-origin",
"dd-evp-origin-version": "bad-version",
}

err := server.Destination.unconditionalSend(&message.Payload{Encoded: []byte("payload")})

assert.Nil(t, err)
assert.Equal(t, "cluster-a", server.request.Header.Get("x-custom-routing"))
assert.Equal(t, server.Destination.endpoint.GetAPIKey(), server.request.Header.Get("dd-api-key"))
assert.Equal(t, "test-proto", server.request.Header.Get("dd-protocol"))
assert.Equal(t, "test-origin", server.request.Header.Get("dd-evp-origin"))
assert.Equal(t, originVersionOrDefault(""), server.request.Header.Get("dd-evp-origin-version"))
}

func TestDestinationDoesntSendEmptyV2Protocol(t *testing.T) {
cfg := configmock.New(t)
server := NewTestServer(200, cfg)
Expand Down
11 changes: 5 additions & 6 deletions pkg/logs/sender/grpc/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ type headerCredentials struct {

// GetRequestMetadata adds required headers to each RPC call
func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
headers := map[string]string{
"dd-api-key": h.endpoint.GetAPIKey(),
headers := make(map[string]string, 4+len(h.endpoint.ExtraHTTPHeaders))
for k, v := range h.endpoint.ExtraHTTPHeaders {
headers[k] = v
}

headers["dd-api-key"] = h.endpoint.GetAPIKey()

// Add protocol header if specified
if h.endpoint.Protocol != "" {
headers["dd-protocol"] = string(h.endpoint.Protocol)
Expand All @@ -67,10 +70,6 @@ func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) (
headers["dd-content-encoding"] = "identity"
}

for k, v := range h.endpoint.ExtraHTTPHeaders {
headers[k] = v
}

return headers, nil
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/logs/sender/grpc/stream_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,32 @@ func TestHeaderCredentialsSendsOriginVersionOverride(t *testing.T) {
require.Equal(t, "cluster-a", headers["dd-evp-origin-version"])
}

func TestHeaderCredentialsSendsAdditionalHeadersWithoutOverridingRequiredHeaders(t *testing.T) {
endpoint := config.NewEndpoint("test-api-key", "", "test-host", 443, config.EmptyPathPrefix, true)
endpoint.Protocol = "test-proto"
endpoint.Origin = "test-origin"
endpoint.UseCompression = true
endpoint.CompressionKind = "zstd"
endpoint.ExtraHTTPHeaders = map[string]string{
"x-custom-routing": "cluster-a",
"dd-api-key": "bad-key",
"dd-protocol": "bad-proto",
"dd-evp-origin": "bad-origin",
"dd-evp-origin-version": "bad-version",
"dd-content-encoding": "bad-encoding",
}

headers, err := (&headerCredentials{endpoint: endpoint}).GetRequestMetadata(context.Background())

require.NoError(t, err)
require.Equal(t, "cluster-a", headers["x-custom-routing"])
require.Equal(t, "test-api-key", headers["dd-api-key"])
require.Equal(t, "test-proto", headers["dd-protocol"])
require.Equal(t, "test-origin", headers["dd-evp-origin"])
require.Equal(t, originVersionOrDefault(""), headers["dd-evp-origin-version"])
require.Equal(t, "zstd", headers["dd-content-encoding"])
}

func (m *mockLogsStream) Send(batch *statefulpb.StatefulBatch) error {
m.mu.Lock()
if m.sendErr != nil {
Expand Down
Loading