diff --git a/comp/logs-library/pipeline/pipeline.go b/comp/logs-library/pipeline/pipeline.go index efe42a0fca1b..1694365e1177 100644 --- a/comp/logs-library/pipeline/pipeline.go +++ b/comp/logs-library/pipeline/pipeline.go @@ -114,6 +114,24 @@ func getStrategy( encoder = compressor.NewCompressor(endpoints.Main.CompressionKind, endpoints.Main.CompressionLevel) } if endpoints.UseGRPC { + // If there are HTTP additional endpoints, use DualStrategy in gRPC-primary mode: + // gRPC is the real primary sender, HTTP additional endpoints are secondaries. + // No shadow headers are injected in this mode. + if httpAdditionals := getHTTPAdditionalEndpoints(endpoints); len(httpAdditionals) > 0 && !serverlessMeta.IsEnabled() { + return grpcsender.NewGRPCPrimaryDualStrategy( + inputChan, + outputChan, + flushChan, + encoder, + cfg, + endpoints, + httpAdditionals, + compressor, + pipelineMonitor, + instanceID, + ) + } + tokenizer := rtokenizer.NewRustTokenizer() translator := grpcsender.NewMessageTranslator(instanceID, tokenizer) // TODO: Consider sharing cluster manager across pipelines for better pattern clustering: @@ -122,6 +140,7 @@ func getStrategy( return grpcsender.NewBatchStrategy(statefulInputChan, outputChan, flushChan, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor, instanceID) } + // If 2nd endpoint is gRPC, use DualStrategy in shadow mode if grpcEndpoint, ok := firstGRPCAdditionalEndpoint(endpoints); ok && !serverlessMeta.IsEnabled() { grpcComp := buildEndpointCompressor(compressor, grpcEndpoint) return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID) @@ -153,6 +172,17 @@ func firstGRPCAdditionalEndpoint(endpoints *config.Endpoints) (config.Endpoint, return config.Endpoint{}, false } +// getHTTPAdditionalEndpoints returns all additional endpoints that use HTTP (not gRPC). +func getHTTPAdditionalEndpoints(endpoints *config.Endpoints) []config.Endpoint { + var result []config.Endpoint + for _, ep := range endpoints.Endpoints[1:] { + if !ep.UseGRPC { + result = append(result, ep) + } + } + return result +} + func buildEndpointCompressor(comp logscompression.Component, endpoint config.Endpoint) compressioncommon.Compressor { if endpoint.UseCompression { return comp.NewCompressor(endpoint.CompressionKind, endpoint.CompressionLevel) diff --git a/comp/logs/agent/agentimpl/agent.go b/comp/logs/agent/agentimpl/agent.go index f56c093493d6..0c15b4f1425d 100644 --- a/comp/logs/agent/agentimpl/agent.go +++ b/comp/logs/agent/agentimpl/agent.go @@ -218,8 +218,9 @@ func (a *logAgent) start(context.Context) error { a.startPipeline() - // If we're currently sending over TCP, attempt restart over HTTP - if !endpoints.UseHTTP { + // If we're currently sending over TCP (not HTTP and not gRPC), attempt restart over HTTP. + // Skip for gRPC: gRPC endpoints also have UseHTTP=false but should not be overridden. + if !endpoints.UseHTTP && !endpoints.UseGRPC { a.smartHTTPRestart() } return nil diff --git a/pkg/logs/sender/grpc/dual_strategy.go b/pkg/logs/sender/grpc/dual_strategy.go index 597cf2b8dd39..f12a6ef9b058 100644 --- a/pkg/logs/sender/grpc/dual_strategy.go +++ b/pkg/logs/sender/grpc/dual_strategy.go @@ -7,11 +7,14 @@ package grpc import ( "github.com/DataDog/datadog-agent/comp/logs-library/metrics" + "github.com/DataDog/datadog-agent/comp/logs-library/processor" "github.com/DataDog/datadog-agent/comp/logs-library/sender" "github.com/DataDog/datadog-agent/comp/logs/agent/config" + logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/client" + httpClient "github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/message" rtokenizer "github.com/DataDog/datadog-agent/pkg/logs/patterns/tokenizer/rust" compressioncommon "github.com/DataDog/datadog-agent/pkg/util/compression" @@ -20,20 +23,38 @@ import ( const grpcSecondaryBufferSize = 10000 -// DualStrategy fans messages to both the primary HTTP strategy and a secondary gRPC stateful sender. +// DualStrategy fans messages to two transports simultaneously. +// +// HTTP-primary mode (NewDualStrategy): HTTP uses the pipeline's outputChan; gRPC is a +// standalone secondary with shadow headers injected. +// +// gRPC-primary mode (NewGRPCPrimaryDualStrategy): gRPC uses the pipeline's outputChan; +// HTTP additional endpoints are standalone secondaries. No shadow headers are injected. type DualStrategy struct { inputChan chan *message.Message - httpInputChan chan *message.Message - grpcChan chan *message.Message + primaryChan chan *message.Message // feeds the primary strategy (outputChan side) + secondaryChan chan *message.Message // feeds the standalone secondary primary sender.Strategy - endpoint config.Endpoint - comp compressioncommon.Compressor - cfg pkgconfigmodel.Reader - endpoints *config.Endpoints - instanceID string - done chan struct{} + grpcIsPrimary bool + + // HTTP-primary mode: standalone gRPC secondary + grpcEndpoint config.Endpoint + grpcComp compressioncommon.Compressor + + // gRPC-primary mode: standalone HTTP secondaries + httpEndpoints []config.Endpoint + compression logscompression.Component + + cfg pkgconfigmodel.Reader + endpoints *config.Endpoints + pipelineMonitor metrics.PipelineMonitor + instanceID string + done chan struct{} } +// NewDualStrategy creates a DualStrategy in HTTP-primary mode. +// HTTP sends via the pipeline's outputChan; gRPC is a standalone secondary. +// Shadow headers (dd-shadow-only) are injected on the gRPC endpoint. func NewDualStrategy( inputChan chan *message.Message, outputChan chan *message.Payload, @@ -52,9 +73,9 @@ func NewDualStrategy( } grpcEndpoint.ExtraHTTPHeaders["dd-shadow-only"] = "true" - httpInputChan := make(chan *message.Message, cap(inputChan)) - primary := sender.NewBatchStrategy( - httpInputChan, + httpPrimaryChan := make(chan *message.Message, cap(inputChan)) + httpPrimaryBatchStrategy := sender.NewBatchStrategy( + httpPrimaryChan, outputChan, flushChan, serverlessMeta, @@ -68,38 +89,99 @@ func NewDualStrategy( ) return &DualStrategy{ - inputChan: inputChan, - httpInputChan: httpInputChan, - grpcChan: make(chan *message.Message, grpcSecondaryBufferSize), - primary: primary, - endpoint: grpcEndpoint, - comp: comp, - cfg: cfg, - endpoints: endpoints, - instanceID: instanceID, - done: make(chan struct{}), + inputChan: inputChan, + primaryChan: httpPrimaryChan, + secondaryChan: make(chan *message.Message, grpcSecondaryBufferSize), + primary: httpPrimaryBatchStrategy, + grpcIsPrimary: false, + grpcEndpoint: grpcEndpoint, + grpcComp: comp, + cfg: cfg, + endpoints: endpoints, + pipelineMonitor: pipelineMonitor, + instanceID: instanceID, + done: make(chan struct{}), + } +} + +// NewGRPCPrimaryDualStrategy creates a DualStrategy in gRPC-primary mode. +// gRPC sends via the pipeline's outputChan; HTTP additional endpoints are standalone secondaries. +// No shadow headers are injected on either path. +func NewGRPCPrimaryDualStrategy( + inputChan chan *message.Message, + outputChan chan *message.Payload, + flushChan chan struct{}, + grpcComp compressioncommon.Compressor, + cfg pkgconfigmodel.Reader, + endpoints *config.Endpoints, + httpEndpoints []config.Endpoint, + compression logscompression.Component, + pipelineMonitor metrics.PipelineMonitor, + instanceID string, +) sender.Strategy { + primaryChan := make(chan *message.Message, grpcSecondaryBufferSize) + + // Build the gRPC primary strategy: primaryChan → translator → statefulChan → BatchStrategy → outputChan + tokenizer := rtokenizer.NewRustTokenizer() + translator := NewMessageTranslator(instanceID+"-grpc", tokenizer) + statefulChan := translator.Start(primaryChan, cfg.GetInt("logs_config.message_channel_size")) + primary := NewBatchStrategy( + statefulChan, + outputChan, + flushChan, + endpoints.BatchWait, + endpoints.BatchMaxSize, + endpoints.BatchMaxContentSize, + "logs", + grpcComp, + pipelineMonitor, + instanceID, + ) + + return &DualStrategy{ + inputChan: inputChan, + primaryChan: primaryChan, + secondaryChan: make(chan *message.Message, grpcSecondaryBufferSize), + primary: primary, + grpcIsPrimary: true, + httpEndpoints: httpEndpoints, + compression: compression, + cfg: cfg, + endpoints: endpoints, + pipelineMonitor: pipelineMonitor, + instanceID: instanceID, + done: make(chan struct{}), } } func (d *DualStrategy) Start() { d.primary.Start() - conn, grpcClient, err := newGRPCClient(d.endpoint) + if d.grpcIsPrimary { + d.startHTTPSecondary() + } else { + d.startGRPCSecondary() + } +} + +// startGRPCSecondary handles the HTTP-primary mode: standalone gRPC as secondary. +func (d *DualStrategy) startGRPCSecondary() { + conn, grpcClient, err := newGRPCClient(d.grpcEndpoint) if err != nil { log.Errorf("Failed to create gRPC connection for dual-send in pipeline %s: %v; continuing with HTTP only", d.instanceID, err) go func() { defer close(d.done) for msg := range d.inputChan { - d.httpInputChan <- msg + d.primaryChan <- msg } - close(d.httpInputChan) + close(d.primaryChan) }() return } tokenizer := rtokenizer.NewRustTokenizer() translator := NewMessageTranslator(d.instanceID+"-dual", tokenizer) - statefulChan := translator.Start(d.grpcChan, d.cfg.GetInt("logs_config.message_channel_size")) + statefulChan := translator.Start(d.secondaryChan, d.cfg.GetInt("logs_config.message_channel_size")) payloadChan := make(chan *message.Payload, inputChanBufferSize) grpcFlushChan := make(chan struct{}, 1) @@ -112,7 +194,7 @@ func (d *DualStrategy) Start() { d.endpoints.BatchMaxSize, d.endpoints.BatchMaxContentSize, "grpc-dual", - d.comp, + d.grpcComp, pipelineMonitor, d.instanceID, ) @@ -133,9 +215,9 @@ func (d *DualStrategy) Start() { conn, grpcClient, &sender.NoopSink{}, - d.endpoint, + d.grpcEndpoint, config.StreamLifetime(d.cfg), - d.comp, + d.grpcComp, maxInflight, ) worker.start() @@ -152,12 +234,93 @@ func (d *DualStrategy) Start() { }() for msg := range d.inputChan { - d.httpInputChan <- msg - d.grpcChan <- msg + d.primaryChan <- msg // HTTP primary + d.secondaryChan <- msg // gRPC secondary + } + + close(d.primaryChan) + close(d.secondaryChan) + }() +} + +// startHTTPSecondary handles the gRPC-primary mode: standalone HTTP destinations as secondary. +// Messages are cloned and JSON-encoded before being fed into the existing sender.NewBatchStrategy, +// which handles batching and payload creation. No shadow headers are injected. +func (d *DualStrategy) startHTTPSecondary() { + destCtx := client.NewDestinationsContext() + destCtx.Start() + + // One input channel per HTTP destination for fan-out of payloads. + destPayloadChans := make([]chan *message.Payload, len(d.httpEndpoints)) + httpStopChans := make([]<-chan struct{}, len(d.httpEndpoints)) + for i, ep := range d.httpEndpoints { + ch := make(chan *message.Payload, 100) + destPayloadChans[i] = ch + destMeta := client.NewDestinationMetadata("logs", d.instanceID, "additional-http", ep.Host, "") + dest := httpClient.NewDestination(ep, httpClient.JSONContentType, destCtx, true, destMeta, d.cfg, 1, 4, d.pipelineMonitor, d.instanceID, nil) + httpStopChans[i] = dest.Start(ch, nil, nil) + } + + // HTTP batch strategy: clonedChan → JSON payloads → httpOutputChan + clonedChan := make(chan *message.Message, grpcSecondaryBufferSize) + httpOutputChan := make(chan *message.Payload, 100) + httpFlushChan := make(chan struct{}, 1) + httpComp := d.compression.NewCompressor(compressioncommon.NoneKind, 0) + httpBatch := sender.NewBatchStrategy( + clonedChan, + httpOutputChan, + httpFlushChan, + sender.NewServerlessMeta(false), + d.endpoints.BatchWait, + d.endpoints.BatchMaxSize, + d.endpoints.BatchMaxContentSize, + "logs-http-additional", + httpComp, + d.pipelineMonitor, + d.instanceID, + ) + httpBatch.Start() + + // Fan httpOutputChan payloads to all HTTP destinations. + go func() { + for payload := range httpOutputChan { + for _, ch := range destPayloadChans { + ch <- payload + } + } + for _, ch := range destPayloadChans { + close(ch) + } + }() + + go func() { + defer close(d.done) + defer func() { + httpBatch.Stop() + for _, stopChan := range httpStopChans { + if stopChan != nil { + <-stopChan + } + } + destCtx.Stop() + }() + + for msg := range d.inputChan { + d.primaryChan <- msg // gRPC primary (unmodified) + + // Clone the message and JSON-encode for HTTP. + // Message embeds MessageContent by value so a copy is safe; + // JSONEncoder replaces the Content slice rather than modifying in place. + cloned := *msg + if err := processor.JSONEncoder.Encode(&cloned, msg.MessageMetadata.Hostname); err != nil { + log.Errorf("dual_strategy: failed to JSON-encode message for HTTP secondary: %v", err) + continue + } + clonedChan <- &cloned } - close(d.httpInputChan) - close(d.grpcChan) + close(d.primaryChan) + close(clonedChan) }() }