diff --git a/comp/logs/agent/config/config.go b/comp/logs/agent/config/config.go index 8b9700e23889..891dc18b2eb3 100644 --- a/comp/logs/agent/config/config.go +++ b/comp/logs/agent/config/config.go @@ -355,6 +355,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig main.TrackType = intakeTrackType main.Protocol = intakeProtocol main.Origin = intakeOrigin + main.OriginVersion = logsConfig.ddEVPOriginVersion() } else { main.Version = EPIntakeVersion1 } @@ -424,6 +425,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig e.TrackType = intakeTrackType e.Protocol = intakeProtocol e.Origin = intakeOrigin + e.OriginVersion = main.OriginVersion e.onConfigUpdateFromReaderMainEndpoint(coreConfig) additionals = append(additionals, e) diff --git a/comp/logs/agent/config/config_keys.go b/comp/logs/agent/config/config_keys.go index 19827e269ba9..18927166c642 100644 --- a/comp/logs/agent/config/config_keys.go +++ b/comp/logs/agent/config/config_keys.go @@ -105,6 +105,10 @@ func (l *LogsConfigKeys) isGRPCUse() bool { return l.getConfig().GetBool(l.getConfigKey("use_grpc")) } +func (l *LogsConfigKeys) ddEVPOriginVersion() string { + return l.getConfig().GetString(l.getConfigKey("dd_evp_origin_version")) +} + func (l *LogsConfigKeys) logsNoSSL() bool { return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl")) } diff --git a/comp/logs/agent/config/config_test.go b/comp/logs/agent/config/config_test.go index cbd67629cd3a..bdc763106285 100644 --- a/comp/logs/agent/config/config_test.go +++ b/comp/logs/agent/config/config_test.go @@ -530,6 +530,16 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig2() { suite.compareEndpoints(expectedEndpoints, endpoints) } +func (suite *ConfigTestSuite) TestLogsEVPOriginVersionOverride() { + suite.config.SetWithoutSource("api_key", "123") + suite.config.SetWithoutSource("logs_config.dd_evp_origin_version", "cluster-a") + + endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source") + + suite.Nil(err) + suite.Equal("cluster-a", endpoints.Main.OriginVersion) +} + func (suite *ConfigTestSuite) TestMultipleTCPEndpointsInConf() { suite.config.SetWithoutSource("api_key", "123") suite.config.SetWithoutSource("logs_config.logs_dd_url", "agent-http-intake.logs.datadoghq.com:443") diff --git a/comp/logs/agent/config/endpoints.go b/comp/logs/agent/config/endpoints.go index d063f63d2b12..d9350a8c49a1 100644 --- a/comp/logs/agent/config/endpoints.go +++ b/comp/logs/agent/config/endpoints.go @@ -88,6 +88,8 @@ type Endpoint struct { TrackType IntakeTrackType Protocol IntakeProtocol Origin IntakeOrigin + // OriginVersion overrides the dd-evp-origin-version header when set. + OriginVersion string ExtraHTTPHeaders map[string]string } @@ -208,6 +210,7 @@ func loadTCPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, registerCallba newE.TrackType = e.TrackType newE.Protocol = e.Protocol newE.Origin = e.Origin + newE.OriginVersion = e.OriginVersion newE.UseGRPC = e.UseGRPC if e.UseSSL != nil { @@ -255,6 +258,7 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy newE.TrackType = e.TrackType newE.Protocol = e.Protocol newE.Origin = e.Origin + newE.OriginVersion = e.OriginVersion if e.UseSSL != nil { newE.useSSL = *e.UseSSL @@ -269,6 +273,9 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy newE.TrackType = intakeTrackType newE.Protocol = intakeProtocol newE.Origin = intakeOrigin + if newE.OriginVersion == "" { + newE.OriginVersion = main.OriginVersion + } } newEndpoints = append(newEndpoints, newE) diff --git a/pkg/config/config_template.yaml b/pkg/config/config_template.yaml index 79b8fda469a7..ce349451ca52 100644 --- a/pkg/config/config_template.yaml +++ b/pkg/config/config_template.yaml @@ -1219,6 +1219,13 @@ api_key: # # http_timeout: 10 +# # @param dd_evp_origin_version - string - optional +# # @env DD_LOGS_CONFIG_DD_EVP_ORIGIN_VERSION - string - optional +# # Override the dd-evp-origin-version header sent with logs HTTP and gRPC requests. +# # When empty, the Agent version is used. +# +# dd_evp_origin_version: "" + # # @param grpc - custom object - optional # # Configuration for the logs gRPC transport. # diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 5f8e4a083549..e5d552297e94 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1514,6 +1514,7 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) { config.BindEnvAndSetDefault(prefix+"use_grpc", false) config.BindEnvAndSetDefault(prefix+"use_rust_tokenizer", false) config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime) + config.BindEnvAndSetDefault(prefix+"dd_evp_origin_version", "") config.BindEnvAndSetDefault("logs_config.grpc.dual_send_uuids_enabled", true) config.BindEnvAndSetDefault("logs_config.grpc.max_inflight_payloads", DefaultMaxInflightPayloads) config.BindEnvAndSetDefault("logs_config.message_channel_size", 100) diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index 9290d102bca1..0837489e500f 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -81,6 +81,7 @@ type Destination struct { destinationsContext *client.DestinationsContext protocol config.IntakeProtocol origin config.IntakeOrigin + originVersion string isMRF bool // Concurrency @@ -180,6 +181,7 @@ func newDestination(endpoint config.Endpoint, backoff: policy, protocol: endpoint.Protocol, origin: endpoint.Origin, + originVersion: endpoint.OriginVersion, lastRetryError: nil, retryLock: sync.Mutex{}, shouldRetry: shouldRetry, @@ -203,6 +205,13 @@ func errorToTag(err error) string { return "non-retryable" } +func originVersionOrDefault(originVersion string) string { + if originVersion != "" { + return originVersion + } + return version.AgentVersion +} + // IsMRF indicates that this destination is a Multi-Region Failover destination. func (d *Destination) IsMRF() bool { return d.isMRF @@ -368,7 +377,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { } if d.origin != "" { req.Header.Set("DD-EVP-ORIGIN", string(d.origin)) - req.Header.Set("DD-EVP-ORIGIN-VERSION", version.AgentVersion) + req.Header.Set("DD-EVP-ORIGIN-VERSION", originVersionOrDefault(d.originVersion)) } req.Header.Set("dd-message-timestamp", strconv.FormatInt(getMessageTimestamp(payload.MessageMetas), 10)) then := time.Now() diff --git a/pkg/logs/client/http/destination_test.go b/pkg/logs/client/http/destination_test.go index c070fde727ac..d5cd42bd43e6 100644 --- a/pkg/logs/client/http/destination_test.go +++ b/pkg/logs/client/http/destination_test.go @@ -377,6 +377,19 @@ func TestDestinationSendsV2Protocol(t *testing.T) { assert.Equal(t, server.request.Header.Get("dd-protocol"), "test-proto") } +func TestDestinationSendsOriginVersionOverride(t *testing.T) { + cfg := configmock.New(t) + server := NewTestServer(200, cfg) + defer server.httpServer.Close() + + server.Destination.origin = "test-origin" + server.Destination.originVersion = "cluster-a" + err := server.Destination.unconditionalSend(&message.Payload{Encoded: []byte("payload")}) + assert.Nil(t, err) + assert.Equal(t, "test-origin", server.request.Header.Get("dd-evp-origin")) + assert.Equal(t, "cluster-a", server.request.Header.Get("dd-evp-origin-version")) +} + func TestDestinationDoesntSendEmptyV2Protocol(t *testing.T) { cfg := configmock.New(t) server := NewTestServer(200, cfg) diff --git a/pkg/logs/sender/grpc/sender.go b/pkg/logs/sender/grpc/sender.go index c2d4a4e33253..2e95090862ea 100644 --- a/pkg/logs/sender/grpc/sender.go +++ b/pkg/logs/sender/grpc/sender.go @@ -58,7 +58,7 @@ func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) ( // Add origin headers if specified if h.endpoint.Origin != "" { headers["dd-evp-origin"] = string(h.endpoint.Origin) - headers["dd-evp-origin-version"] = version.AgentVersion + headers["dd-evp-origin-version"] = originVersionOrDefault(h.endpoint.OriginVersion) } if h.endpoint.UseCompression { @@ -79,6 +79,13 @@ func (h *headerCredentials) RequireTransportSecurity() bool { return false // We handle TLS separately via WithTransportCredentials } +func originVersionOrDefault(originVersion string) string { + if originVersion != "" { + return originVersion + } + return version.AgentVersion +} + // Sender implements PipelineComponent interface for gRPC log transmission. // It manages multiple streamWorker instances (one per pipeline) using round-robin distribution. // It is similar to Sender/Worker architecture diff --git a/pkg/logs/sender/grpc/stream_worker_test.go b/pkg/logs/sender/grpc/stream_worker_test.go index 6719be1c33d1..026a61f30ada 100644 --- a/pkg/logs/sender/grpc/stream_worker_test.go +++ b/pkg/logs/sender/grpc/stream_worker_test.go @@ -83,6 +83,18 @@ func newMockLogsStream(ctx context.Context) *mockLogsStream { } } +func TestHeaderCredentialsSendsOriginVersionOverride(t *testing.T) { + endpoint := config.NewEndpoint("test-api-key", "", "test-host", 443, config.EmptyPathPrefix, true) + endpoint.Origin = "test-origin" + endpoint.OriginVersion = "cluster-a" + + headers, err := (&headerCredentials{endpoint: endpoint}).GetRequestMetadata(context.Background()) + + require.NoError(t, err) + require.Equal(t, "test-origin", headers["dd-evp-origin"]) + require.Equal(t, "cluster-a", headers["dd-evp-origin-version"]) +} + func (m *mockLogsStream) Send(batch *statefulpb.StatefulBatch) error { m.mu.Lock() if m.sendErr != nil {