Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
main.TrackType = intakeTrackType
main.Protocol = intakeProtocol
main.Origin = intakeOrigin
main.OriginVersion = logsConfig.ddEVPOriginVersion()
} else {
main.Version = EPIntakeVersion1
}
Expand Down Expand Up @@ -424,6 +425,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
e.TrackType = intakeTrackType
e.Protocol = intakeProtocol
e.Origin = intakeOrigin
e.OriginVersion = main.OriginVersion
e.onConfigUpdateFromReaderMainEndpoint(coreConfig)

additionals = append(additionals, e)
Expand Down
4 changes: 4 additions & 0 deletions comp/logs/agent/config/config_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (l *LogsConfigKeys) isGRPCUse() bool {
return l.getConfig().GetBool(l.getConfigKey("use_grpc"))
}

func (l *LogsConfigKeys) ddEVPOriginVersion() string {
return l.getConfig().GetString(l.getConfigKey("dd_evp_origin_version"))
}

func (l *LogsConfigKeys) logsNoSSL() bool {
return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl"))
}
Expand Down
10 changes: 10 additions & 0 deletions comp/logs/agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,16 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig2() {
suite.compareEndpoints(expectedEndpoints, endpoints)
}

func (suite *ConfigTestSuite) TestLogsEVPOriginVersionOverride() {
suite.config.SetWithoutSource("api_key", "123")
suite.config.SetWithoutSource("logs_config.dd_evp_origin_version", "cluster-a")

endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
suite.Equal("cluster-a", endpoints.Main.OriginVersion)
}

func (suite *ConfigTestSuite) TestMultipleTCPEndpointsInConf() {
suite.config.SetWithoutSource("api_key", "123")
suite.config.SetWithoutSource("logs_config.logs_dd_url", "agent-http-intake.logs.datadoghq.com:443")
Expand Down
7 changes: 7 additions & 0 deletions comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Endpoint struct {
TrackType IntakeTrackType
Protocol IntakeProtocol
Origin IntakeOrigin
// OriginVersion overrides the dd-evp-origin-version header when set.
OriginVersion string

ExtraHTTPHeaders map[string]string
}
Expand Down Expand Up @@ -208,6 +210,7 @@ func loadTCPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, registerCallba
newE.TrackType = e.TrackType
newE.Protocol = e.Protocol
newE.Origin = e.Origin
newE.OriginVersion = e.OriginVersion
newE.UseGRPC = e.UseGRPC

if e.UseSSL != nil {
Expand Down Expand Up @@ -255,6 +258,7 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy
newE.TrackType = e.TrackType
newE.Protocol = e.Protocol
newE.Origin = e.Origin
newE.OriginVersion = e.OriginVersion

if e.UseSSL != nil {
newE.useSSL = *e.UseSSL
Expand All @@ -269,6 +273,9 @@ func loadHTTPAdditionalEndpoints(main Endpoint, l *LogsConfigKeys, intakeTrackTy
newE.TrackType = intakeTrackType
newE.Protocol = intakeProtocol
newE.Origin = intakeOrigin
if newE.OriginVersion == "" {
newE.OriginVersion = main.OriginVersion
}
}

newEndpoints = append(newEndpoints, newE)
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,13 @@ api_key:
#
# http_timeout: 10

# # @param dd_evp_origin_version - string - optional
# # @env DD_LOGS_CONFIG_DD_EVP_ORIGIN_VERSION - string - optional
# # Override the dd-evp-origin-version header sent with logs HTTP and gRPC requests.
# # When empty, the Agent version is used.
#
# dd_evp_origin_version: ""

# # @param grpc - custom object - optional
# # Configuration for the logs gRPC transport.
#
Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
config.BindEnvAndSetDefault(prefix+"use_grpc", false)
config.BindEnvAndSetDefault(prefix+"use_rust_tokenizer", false)
config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime)
config.BindEnvAndSetDefault(prefix+"dd_evp_origin_version", "")
config.BindEnvAndSetDefault("logs_config.grpc.dual_send_uuids_enabled", true)
config.BindEnvAndSetDefault("logs_config.grpc.max_inflight_payloads", DefaultMaxInflightPayloads)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
Expand Down
11 changes: 10 additions & 1 deletion pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Destination struct {
destinationsContext *client.DestinationsContext
protocol config.IntakeProtocol
origin config.IntakeOrigin
originVersion string
isMRF bool

// Concurrency
Expand Down Expand Up @@ -180,6 +181,7 @@ func newDestination(endpoint config.Endpoint,
backoff: policy,
protocol: endpoint.Protocol,
origin: endpoint.Origin,
originVersion: endpoint.OriginVersion,
lastRetryError: nil,
retryLock: sync.Mutex{},
shouldRetry: shouldRetry,
Expand All @@ -203,6 +205,13 @@ func errorToTag(err error) string {
return "non-retryable"
}

func originVersionOrDefault(originVersion string) string {
if originVersion != "" {
return originVersion
}
return version.AgentVersion
}

// IsMRF indicates that this destination is a Multi-Region Failover destination.
func (d *Destination) IsMRF() bool {
return d.isMRF
Expand Down Expand Up @@ -368,7 +377,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
}
if d.origin != "" {
req.Header.Set("DD-EVP-ORIGIN", string(d.origin))
req.Header.Set("DD-EVP-ORIGIN-VERSION", version.AgentVersion)
req.Header.Set("DD-EVP-ORIGIN-VERSION", originVersionOrDefault(d.originVersion))
}
req.Header.Set("dd-message-timestamp", strconv.FormatInt(getMessageTimestamp(payload.MessageMetas), 10))
then := time.Now()
Expand Down
13 changes: 13 additions & 0 deletions pkg/logs/client/http/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,19 @@ func TestDestinationSendsV2Protocol(t *testing.T) {
assert.Equal(t, server.request.Header.Get("dd-protocol"), "test-proto")
}

func TestDestinationSendsOriginVersionOverride(t *testing.T) {
cfg := configmock.New(t)
server := NewTestServer(200, cfg)
defer server.httpServer.Close()

server.Destination.origin = "test-origin"
server.Destination.originVersion = "cluster-a"
err := server.Destination.unconditionalSend(&message.Payload{Encoded: []byte("payload")})
assert.Nil(t, err)
assert.Equal(t, "test-origin", server.request.Header.Get("dd-evp-origin"))
assert.Equal(t, "cluster-a", server.request.Header.Get("dd-evp-origin-version"))
}

func TestDestinationDoesntSendEmptyV2Protocol(t *testing.T) {
cfg := configmock.New(t)
server := NewTestServer(200, cfg)
Expand Down
9 changes: 8 additions & 1 deletion pkg/logs/sender/grpc/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (h *headerCredentials) GetRequestMetadata(_ context.Context, _ ...string) (
// Add origin headers if specified
if h.endpoint.Origin != "" {
headers["dd-evp-origin"] = string(h.endpoint.Origin)
headers["dd-evp-origin-version"] = version.AgentVersion
headers["dd-evp-origin-version"] = originVersionOrDefault(h.endpoint.OriginVersion)
}

if h.endpoint.UseCompression {
Expand All @@ -79,6 +79,13 @@ func (h *headerCredentials) RequireTransportSecurity() bool {
return false // We handle TLS separately via WithTransportCredentials
}

func originVersionOrDefault(originVersion string) string {
if originVersion != "" {
return originVersion
}
return version.AgentVersion
}

// Sender implements PipelineComponent interface for gRPC log transmission.
// It manages multiple streamWorker instances (one per pipeline) using round-robin distribution.
// It is similar to Sender/Worker architecture
Expand Down
12 changes: 12 additions & 0 deletions pkg/logs/sender/grpc/stream_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ func newMockLogsStream(ctx context.Context) *mockLogsStream {
}
}

func TestHeaderCredentialsSendsOriginVersionOverride(t *testing.T) {
endpoint := config.NewEndpoint("test-api-key", "", "test-host", 443, config.EmptyPathPrefix, true)
endpoint.Origin = "test-origin"
endpoint.OriginVersion = "cluster-a"

headers, err := (&headerCredentials{endpoint: endpoint}).GetRequestMetadata(context.Background())

require.NoError(t, err)
require.Equal(t, "test-origin", headers["dd-evp-origin"])
require.Equal(t, "cluster-a", headers["dd-evp-origin-version"])
}

func (m *mockLogsStream) Send(batch *statefulpb.StatefulBatch) error {
m.mu.Lock()
if m.sendErr != nil {
Expand Down
Loading