Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions comp/logs-library/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func getStrategy(
encoder = compressor.NewCompressor(endpoints.Main.CompressionKind, endpoints.Main.CompressionLevel)
}
if endpoints.UseGRPC {
// If there are HTTP additional endpoints, use DualStrategy in gRPC-primary mode:
// gRPC is the real primary sender, HTTP additional endpoints are secondaries.
// No shadow headers are injected in this mode.
if httpAdditionals := getHTTPAdditionalEndpoints(endpoints); len(httpAdditionals) > 0 && !serverlessMeta.IsEnabled() {
return grpcsender.NewGRPCPrimaryDualStrategy(
inputChan,
outputChan,
flushChan,
encoder,
cfg,
endpoints,
httpAdditionals,
compressor,
pipelineMonitor,
instanceID,
)
}

tokenizer := rtokenizer.NewRustTokenizer()
translator := grpcsender.NewMessageTranslator(instanceID, tokenizer)
// TODO: Consider sharing cluster manager across pipelines for better pattern clustering:
Expand All @@ -122,6 +140,7 @@ func getStrategy(

return grpcsender.NewBatchStrategy(statefulInputChan, outputChan, flushChan, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor, instanceID)
}
// If 2nd endpoint is gRPC, use DualStrategy in shadow mode
if grpcEndpoint, ok := firstGRPCAdditionalEndpoint(endpoints); ok && !serverlessMeta.IsEnabled() {
grpcComp := buildEndpointCompressor(compressor, grpcEndpoint)
return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID)
Expand Down Expand Up @@ -153,6 +172,17 @@ func firstGRPCAdditionalEndpoint(endpoints *config.Endpoints) (config.Endpoint,
return config.Endpoint{}, false
}

// getHTTPAdditionalEndpoints returns all additional endpoints that use HTTP (not gRPC).
func getHTTPAdditionalEndpoints(endpoints *config.Endpoints) []config.Endpoint {
var result []config.Endpoint
for _, ep := range endpoints.Endpoints[1:] {
if !ep.UseGRPC {
result = append(result, ep)
}
}
return result
}

func buildEndpointCompressor(comp logscompression.Component, endpoint config.Endpoint) compressioncommon.Compressor {
if endpoint.UseCompression {
return comp.NewCompressor(endpoint.CompressionKind, endpoint.CompressionLevel)
Expand Down
5 changes: 3 additions & 2 deletions comp/logs/agent/agentimpl/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func (a *logAgent) start(context.Context) error {

a.startPipeline()

// If we're currently sending over TCP, attempt restart over HTTP
if !endpoints.UseHTTP {
// If we're currently sending over TCP (not HTTP and not gRPC), attempt restart over HTTP.
// Skip for gRPC: gRPC endpoints also have UseHTTP=false but should not be overridden.
if !endpoints.UseHTTP && !endpoints.UseGRPC {
a.smartHTTPRestart()
}
return nil
Expand Down
229 changes: 196 additions & 33 deletions pkg/logs/sender/grpc/dual_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ package grpc

import (
"github.com/DataDog/datadog-agent/comp/logs-library/metrics"
"github.com/DataDog/datadog-agent/comp/logs-library/processor"
"github.com/DataDog/datadog-agent/comp/logs-library/sender"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/client"
httpClient "github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/message"
rtokenizer "github.com/DataDog/datadog-agent/pkg/logs/patterns/tokenizer/rust"
compressioncommon "github.com/DataDog/datadog-agent/pkg/util/compression"
Expand All @@ -20,20 +23,38 @@ import (

const grpcSecondaryBufferSize = 10000

// DualStrategy fans messages to both the primary HTTP strategy and a secondary gRPC stateful sender.
// DualStrategy fans messages to two transports simultaneously.
//
// HTTP-primary mode (NewDualStrategy): HTTP uses the pipeline's outputChan; gRPC is a
// standalone secondary with shadow headers injected.
//
// gRPC-primary mode (NewGRPCPrimaryDualStrategy): gRPC uses the pipeline's outputChan;
// HTTP additional endpoints are standalone secondaries. No shadow headers are injected.
type DualStrategy struct {
inputChan chan *message.Message
httpInputChan chan *message.Message
grpcChan chan *message.Message
primaryChan chan *message.Message // feeds the primary strategy (outputChan side)
secondaryChan chan *message.Message // feeds the standalone secondary
primary sender.Strategy
endpoint config.Endpoint
comp compressioncommon.Compressor
cfg pkgconfigmodel.Reader
endpoints *config.Endpoints
instanceID string
done chan struct{}
grpcIsPrimary bool

// HTTP-primary mode: standalone gRPC secondary
grpcEndpoint config.Endpoint
grpcComp compressioncommon.Compressor

// gRPC-primary mode: standalone HTTP secondaries
httpEndpoints []config.Endpoint
compression logscompression.Component

cfg pkgconfigmodel.Reader
endpoints *config.Endpoints
pipelineMonitor metrics.PipelineMonitor
instanceID string
done chan struct{}
}

// NewDualStrategy creates a DualStrategy in HTTP-primary mode.
// HTTP sends via the pipeline's outputChan; gRPC is a standalone secondary.
// Shadow headers (dd-shadow-only) are injected on the gRPC endpoint.
func NewDualStrategy(
inputChan chan *message.Message,
outputChan chan *message.Payload,
Expand All @@ -52,9 +73,9 @@ func NewDualStrategy(
}
grpcEndpoint.ExtraHTTPHeaders["dd-shadow-only"] = "true"

httpInputChan := make(chan *message.Message, cap(inputChan))
primary := sender.NewBatchStrategy(
httpInputChan,
httpPrimaryChan := make(chan *message.Message, cap(inputChan))
httpPrimaryBatchStrategy := sender.NewBatchStrategy(
httpPrimaryChan,
outputChan,
flushChan,
serverlessMeta,
Expand All @@ -68,38 +89,99 @@ func NewDualStrategy(
)

return &DualStrategy{
inputChan: inputChan,
httpInputChan: httpInputChan,
grpcChan: make(chan *message.Message, grpcSecondaryBufferSize),
primary: primary,
endpoint: grpcEndpoint,
comp: comp,
cfg: cfg,
endpoints: endpoints,
instanceID: instanceID,
done: make(chan struct{}),
inputChan: inputChan,
primaryChan: httpPrimaryChan,
secondaryChan: make(chan *message.Message, grpcSecondaryBufferSize),
primary: httpPrimaryBatchStrategy,
grpcIsPrimary: false,
grpcEndpoint: grpcEndpoint,
grpcComp: comp,
cfg: cfg,
endpoints: endpoints,
pipelineMonitor: pipelineMonitor,
instanceID: instanceID,
done: make(chan struct{}),
}
}

// NewGRPCPrimaryDualStrategy creates a DualStrategy in gRPC-primary mode.
// gRPC sends via the pipeline's outputChan; HTTP additional endpoints are standalone secondaries.
// No shadow headers are injected on either path.
func NewGRPCPrimaryDualStrategy(
inputChan chan *message.Message,
outputChan chan *message.Payload,
flushChan chan struct{},
grpcComp compressioncommon.Compressor,
cfg pkgconfigmodel.Reader,
endpoints *config.Endpoints,
httpEndpoints []config.Endpoint,
compression logscompression.Component,
pipelineMonitor metrics.PipelineMonitor,
instanceID string,
) sender.Strategy {
primaryChan := make(chan *message.Message, grpcSecondaryBufferSize)

// Build the gRPC primary strategy: primaryChan → translator → statefulChan → BatchStrategy → outputChan
tokenizer := rtokenizer.NewRustTokenizer()
translator := NewMessageTranslator(instanceID+"-grpc", tokenizer)
statefulChan := translator.Start(primaryChan, cfg.GetInt("logs_config.message_channel_size"))
primary := NewBatchStrategy(
statefulChan,
outputChan,
flushChan,
endpoints.BatchWait,
endpoints.BatchMaxSize,
endpoints.BatchMaxContentSize,
"logs",
grpcComp,
pipelineMonitor,
instanceID,
)

return &DualStrategy{
inputChan: inputChan,
primaryChan: primaryChan,
secondaryChan: make(chan *message.Message, grpcSecondaryBufferSize),
primary: primary,
grpcIsPrimary: true,
httpEndpoints: httpEndpoints,
compression: compression,
cfg: cfg,
endpoints: endpoints,
pipelineMonitor: pipelineMonitor,
instanceID: instanceID,
done: make(chan struct{}),
}
}

func (d *DualStrategy) Start() {
d.primary.Start()

conn, grpcClient, err := newGRPCClient(d.endpoint)
if d.grpcIsPrimary {
d.startHTTPSecondary()
} else {
d.startGRPCSecondary()
}
}

// startGRPCSecondary handles the HTTP-primary mode: standalone gRPC as secondary.
func (d *DualStrategy) startGRPCSecondary() {
conn, grpcClient, err := newGRPCClient(d.grpcEndpoint)
if err != nil {
log.Errorf("Failed to create gRPC connection for dual-send in pipeline %s: %v; continuing with HTTP only", d.instanceID, err)
go func() {
defer close(d.done)
for msg := range d.inputChan {
d.httpInputChan <- msg
d.primaryChan <- msg
}
close(d.httpInputChan)
close(d.primaryChan)
}()
return
}

tokenizer := rtokenizer.NewRustTokenizer()
translator := NewMessageTranslator(d.instanceID+"-dual", tokenizer)
statefulChan := translator.Start(d.grpcChan, d.cfg.GetInt("logs_config.message_channel_size"))
statefulChan := translator.Start(d.secondaryChan, d.cfg.GetInt("logs_config.message_channel_size"))
payloadChan := make(chan *message.Payload, inputChanBufferSize)
grpcFlushChan := make(chan struct{}, 1)

Expand All @@ -112,7 +194,7 @@ func (d *DualStrategy) Start() {
d.endpoints.BatchMaxSize,
d.endpoints.BatchMaxContentSize,
"grpc-dual",
d.comp,
d.grpcComp,
pipelineMonitor,
d.instanceID,
)
Expand All @@ -133,9 +215,9 @@ func (d *DualStrategy) Start() {
conn,
grpcClient,
&sender.NoopSink{},
d.endpoint,
d.grpcEndpoint,
config.StreamLifetime(d.cfg),
d.comp,
d.grpcComp,
maxInflight,
)
worker.start()
Expand All @@ -152,12 +234,93 @@ func (d *DualStrategy) Start() {
}()

for msg := range d.inputChan {
d.httpInputChan <- msg
d.grpcChan <- msg
d.primaryChan <- msg // HTTP primary
d.secondaryChan <- msg // gRPC secondary
}

close(d.primaryChan)
close(d.secondaryChan)
}()
}

// startHTTPSecondary handles the gRPC-primary mode: standalone HTTP destinations as secondary.
// Messages are cloned and JSON-encoded before being fed into the existing sender.NewBatchStrategy,
// which handles batching and payload creation. No shadow headers are injected.
func (d *DualStrategy) startHTTPSecondary() {
destCtx := client.NewDestinationsContext()
destCtx.Start()

// One input channel per HTTP destination for fan-out of payloads.
destPayloadChans := make([]chan *message.Payload, len(d.httpEndpoints))
httpStopChans := make([]<-chan struct{}, len(d.httpEndpoints))
for i, ep := range d.httpEndpoints {
ch := make(chan *message.Payload, 100)
destPayloadChans[i] = ch
destMeta := client.NewDestinationMetadata("logs", d.instanceID, "additional-http", ep.Host, "")
dest := httpClient.NewDestination(ep, httpClient.JSONContentType, destCtx, true, destMeta, d.cfg, 1, 4, d.pipelineMonitor, d.instanceID, nil)
httpStopChans[i] = dest.Start(ch, nil, nil)
}

// HTTP batch strategy: clonedChan → JSON payloads → httpOutputChan
clonedChan := make(chan *message.Message, grpcSecondaryBufferSize)
httpOutputChan := make(chan *message.Payload, 100)
httpFlushChan := make(chan struct{}, 1)
httpComp := d.compression.NewCompressor(compressioncommon.NoneKind, 0)
httpBatch := sender.NewBatchStrategy(
clonedChan,
httpOutputChan,
httpFlushChan,
sender.NewServerlessMeta(false),
d.endpoints.BatchWait,
d.endpoints.BatchMaxSize,
d.endpoints.BatchMaxContentSize,
"logs-http-additional",
httpComp,
d.pipelineMonitor,
d.instanceID,
)
httpBatch.Start()

// Fan httpOutputChan payloads to all HTTP destinations.
go func() {
for payload := range httpOutputChan {
for _, ch := range destPayloadChans {
ch <- payload
}
}
for _, ch := range destPayloadChans {
close(ch)
}
}()

go func() {
defer close(d.done)
defer func() {
httpBatch.Stop()
for _, stopChan := range httpStopChans {
if stopChan != nil {
<-stopChan
}
}
destCtx.Stop()
}()

for msg := range d.inputChan {
d.primaryChan <- msg // gRPC primary (unmodified)

// Clone the message and JSON-encode for HTTP.
// Message embeds MessageContent by value so a copy is safe;
// JSONEncoder replaces the Content slice rather than modifying in place.
cloned := *msg
if err := processor.JSONEncoder.Encode(&cloned, msg.MessageMetadata.Hostname); err != nil {
log.Errorf("dual_strategy: failed to JSON-encode message for HTTP secondary: %v", err)
continue
}
clonedChan <- &cloned
}

close(d.httpInputChan)
close(d.grpcChan)
close(d.primaryChan)
close(clonedChan)
}()
}

Expand Down
Loading