7 changes: 7 additions & 0 deletions pkg/logs/pipeline/.bits/settings.json
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(go:*)"
]
}
}
35 changes: 18 additions & 17 deletions pkg/logs/pipeline/pipeline.go
@@ -26,11 +26,12 @@ import (

// Pipeline processes and sends messages to the backend
type Pipeline struct {
InputChan chan *message.Message
flushChan chan struct{}
processor *processor.Processor
strategy sender.Strategy
pipelineMonitor metrics.PipelineMonitor
InputChan chan *message.Message
flushChan chan struct{}
processor *processor.Processor
strategy sender.Strategy
pipelineMonitor metrics.PipelineMonitor
prepareForNewStream func()
}

// NewPipeline returns a new Pipeline
@@ -47,7 +48,6 @@ func NewPipeline(
) *Pipeline {
strategyInput := make(chan *message.Message, cfg.GetInt("logs_config.message_channel_size"))
flushChan := make(chan struct{})

var encoder processor.Encoder
if serverlessMeta.IsEnabled() {
encoder = processor.JSONServerlessInitEncoder
@@ -62,19 +62,20 @@
} else {
encoder = processor.RawEncoder
}
strategy := getStrategy(strategyInput, senderImpl.In(), flushChan, endpoints, serverlessMeta, senderImpl.PipelineMonitor(), compression, instanceID, cfg)
strategy, prepareForNewStream := getStrategy(strategyInput, senderImpl.In(), flushChan, endpoints, serverlessMeta, senderImpl.PipelineMonitor(), compression, instanceID, cfg)

inputChan := make(chan *message.Message, cfg.GetInt("logs_config.message_channel_size"))

processor := processor.New(cfg, inputChan, strategyInput, processingRules,
encoder, diagnosticMessageReceiver, hostname, senderImpl.PipelineMonitor(), instanceID)

return &Pipeline{
InputChan: inputChan,
flushChan: flushChan,
processor: processor,
strategy: strategy,
pipelineMonitor: senderImpl.PipelineMonitor(),
InputChan: inputChan,
flushChan: flushChan,
processor: processor,
strategy: strategy,
pipelineMonitor: senderImpl.PipelineMonitor(),
prepareForNewStream: prepareForNewStream,
}
}

@@ -106,7 +107,7 @@ func getStrategy(
compressor logscompression.Component,
instanceID string,
cfg pkgconfigmodel.Reader,
) sender.Strategy {
) (sender.Strategy, func()) {
if endpoints.UseGRPC || endpoints.UseHTTP || serverlessMeta.IsEnabled() {
var encoder compressioncommon.Compressor
encoder = compressor.NewCompressor(compressioncommon.NoneKind, 0)
@@ -120,11 +121,11 @@
// translator := grpcsender.NewMessageTranslator(getSharedClusterManager(), tokenizer)
statefulInputChan := translator.Start(inputChan, cfg.GetInt("logs_config.message_channel_size"))

return grpcsender.NewBatchStrategy(statefulInputChan, outputChan, flushChan, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor, instanceID)
return grpcsender.NewBatchStrategy(statefulInputChan, outputChan, flushChan, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor, instanceID), translator.PrepareForNewStream
}
if grpcEndpoint, ok := firstGRPCAdditionalEndpoint(endpoints); ok && !serverlessMeta.IsEnabled() {
grpcComp := buildEndpointCompressor(compressor, grpcEndpoint)
return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID)
return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID), nil
}
return sender.NewBatchStrategy(
inputChan,
@@ -137,11 +138,11 @@
"logs",
encoder,
pipelineMonitor,
instanceID)
instanceID), nil
}

log.Infof("Pipeline: Using StreamStrategy (default)")
return sender.NewStreamStrategy(inputChan, outputChan, compressor.NewCompressor(compressioncommon.NoneKind, 0))
return sender.NewStreamStrategy(inputChan, outputChan, compressor.NewCompressor(compressioncommon.NoneKind, 0)), nil
}

func firstGRPCAdditionalEndpoint(endpoints *config.Endpoints) (config.Endpoint, bool) {
40 changes: 28 additions & 12 deletions pkg/logs/pipeline/provider.go
@@ -76,6 +76,7 @@ type provider struct {
routerChannels []chan *message.Message
currentRouterIndex *atomic.Uint32
forwarderWaitGroup sync.WaitGroup
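// grpcPrepareHooks collects each pipeline's prepareForNewStream hook; the gRPC
// sender invokes them to evict stale local state before a new stream snapshot is built.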
grpcPrepareHooks []func()
}

// NewProvider returns a new Provider.
@@ -99,8 +100,29 @@ func NewProvider(
var senderImpl sender.PipelineComponent
serverlessMeta := sender.NewServerlessMeta(serverless)

p := &provider{
numberOfPipelines: numberOfPipelines,
diagnosticMessageReceiver: diagnosticMessageReceiver,
processingRules: processingRules,
endpoints: endpoints,
pipelines: []*Pipeline{},
currentPipelineIndex: atomic.NewUint32(0),
serverlessMeta: serverlessMeta,
hostname: hostname,
cfg: cfg,
compression: compression,
failoverEnabled: cfg.GetBool("logs_config.pipeline_failover.enabled"),
currentRouterIndex: atomic.NewUint32(0),
}

if endpoints.UseGRPC {
senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression)
senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression, func() {
for _, hook := range p.grpcPrepareHooks {
if hook != nil {
hook()
}
}
})
} else if endpoints.UseHTTP {
if _, ok := firstGRPCAdditionalEndpoint(endpoints); ok {
if endpoints.Main.ExtraHTTPHeaders == nil {
@@ -117,17 +139,8 @@
senderImpl = tcpSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, status, serverlessMeta, legacyMode)
}

return newProvider(
numberOfPipelines,
diagnosticMessageReceiver,
processingRules,
endpoints,
hostname,
cfg,
compression,
serverlessMeta,
senderImpl,
)
p.sender = senderImpl
return p
}

// NewMockProvider creates a new provider that will not provide any pipelines.
@@ -274,6 +287,9 @@ func (p *provider) Start() {
strconv.Itoa(i),
)
pipeline.Start()
if pipeline.prepareForNewStream != nil {
p.grpcPrepareHooks = append(p.grpcPrepareHooks, pipeline.prepareForNewStream)
}
p.pipelines = append(p.pipelines, pipeline)
}

18 changes: 16 additions & 2 deletions pkg/logs/sender/grpc/batch_strategy.go
@@ -49,6 +49,15 @@ func isStateDatum(datum *statefulpb.Datum) bool {
}
}

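// isWireStateDatum reports whether a datum should be serialized onto the wire.
// Delete datums (PatternDelete, DictEntryDelete) only update local snapshot state
// and are filtered out of the outgoing DatumSequence.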
func isWireStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete:
return false
default:
return true
}
}

// batchStrategy contains batching logic for gRPC sender without serializer
// It collects Datum objects from StatefulMessages and creates Payload with serialized DatumSequence
// Note: Serverless logs are not supported in this PoC implementation
@@ -302,12 +311,17 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
unencodedSize += msgMeta.RawDataLen
}

// Extract all state changes from this batch for snapshot management
// Extract all state changes from this batch for snapshot management, and build
// the filtered set of datums that will actually be sent over the wire.
var stateChanges []*statefulpb.Datum
wireDatums := make([]*statefulpb.Datum, 0, len(grpcDatums))
for _, datum := range grpcDatums {
if isStateDatum(datum) {
stateChanges = append(stateChanges, datum)
}
if isWireStateDatum(datum) {
wireDatums = append(wireDatums, datum)
}
}

// Track per-datum-type counts and sizes
Expand Down Expand Up @@ -335,7 +349,7 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa

// Create DatumSequence and marshal to bytes
datumSeq := &statefulpb.DatumSequence{
Data: grpcDatums,
Data: wireDatums,
}

var err error
10 changes: 10 additions & 0 deletions pkg/logs/sender/grpc/batch_strategy_test.go
@@ -753,6 +753,16 @@ func TestBatchStrategyStatefulExtra(t *testing.T) {
assert.Equal(t, uint64(3), extra2.StateChanges[3].GetDictEntryDefine().Id)
assert.Equal(t, "value3", extra2.StateChanges[3].GetDictEntryDefine().Value)

// Verify delete datums are tracked locally but not serialized on the wire.
var batch2DatumSeq statefulpb.DatumSequence
err := proto.Unmarshal(payload2.Encoded, &batch2DatumSeq)
require.NoError(t, err)
require.Len(t, batch2DatumSeq.Data, 4, "Batch 2 wire payload should omit delete datums")
assert.NotNil(t, batch2DatumSeq.Data[0].GetLogs())
assert.Equal(t, uint64(3), batch2DatumSeq.Data[1].GetPatternDefine().PatternId)
assert.Equal(t, uint64(3), batch2DatumSeq.Data[2].GetDictEntryDefine().Id)
assert.NotNil(t, batch2DatumSeq.Data[3].GetLogs())

// Batch 3 (3 entries): add p4, add d4, log
input <- createPatternDefineMsg(4, "pattern4")
input <- createDictEntryDefineMsg(4, "value4")
1 change: 1 addition & 0 deletions pkg/logs/sender/grpc/dual_strategy.go
@@ -137,6 +137,7 @@ func (d *DualStrategy) Start() {
config.StreamLifetime(d.cfg),
d.comp,
maxInflight,
translator.PrepareForNewStream,
)
worker.start()

28 changes: 11 additions & 17 deletions pkg/logs/sender/grpc/mock_state.go
@@ -26,7 +26,7 @@ import (
)

const nanoToMillis = 1000000
const staleTTL = 14 * time.Minute
const staleTTL = 5 * time.Minute
const staleSweepInterval = 30 * time.Second

// batchEntry is a per-message sidecar used during batch tokenization.
@@ -102,8 +102,7 @@ type MessageTranslator struct {
tokenizer token.Tokenizer
jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog

pipelineName string
lastStaleSweep time.Time
pipelineName string

// tagCache caches the last computed tag set to avoid recomputation across messages
// with identical metadata (common in single-source pipelines).
@@ -137,7 +136,6 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa
tokenizer: tokenizer,
jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
pipelineName: pipelineName,
lastStaleSweep: time.Now(),
}
tlmPipelineStateSize.Set(0, pipelineName)
return mt
@@ -366,19 +364,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
tagCountOverLimit, tagBytesOverLimit := mt.tagEvictionManager.ShouldEvict(tagCount, tagMemoryBytes)
if tagCountOverLimit || tagBytesOverLimit {
for _, evictedID := range mt.tagEvictionManager.Evict(mt.tagManager, tagCount, tagMemoryBytes, tagCountOverLimit, tagBytesOverLimit) {
mt.sendDictEntryDelete(outputChan, msg, evictedID)
}
}

// Periodic TTL sweep: remove entries not accessed in the last 5 minutes.
// This prevents stale entries from accumulating in state and inflating snapshot replays.
if time.Since(mt.lastStaleSweep) >= staleSweepInterval {
mt.lastStaleSweep = time.Now()

for _, evictedPattern := range mt.clusterManager.EvictStalePatterns(staleTTL) {
mt.sendPatternDelete(evictedPattern.PatternID, msg, outputChan)
}
for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) {
mt.invalidateTagCache(evictedID)
mt.sendDictEntryDelete(outputChan, msg, evictedID)
}
}
@@ -610,6 +596,14 @@ func (mt *MessageTranslator) sendPatternDelete(patternID uint64, msg *message.Me
}
}

// PrepareForNewStream performs local-only stale eviction before a new stream snapshot is built.
func (mt *MessageTranslator) PrepareForNewStream() {
for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) {
mt.invalidateTagCache(evictedID)
}
mt.clusterManager.EvictStalePatterns(staleTTL)
}

// sendDictEntryDefine creates and sends a DictEntryDefine datum
func (mt *MessageTranslator) sendDictEntryDefine(outputChan chan *message.StatefulMessage, msg *message.Message, id uint64, value string) {
dictDatum := buildDictEntryDefine(id, value)
26 changes: 25 additions & 1 deletion pkg/logs/sender/grpc/mock_state_cache_test.go
@@ -195,6 +195,30 @@ func TestBuildTagSet_CacheHitSelfHealsAfterSilentDictEviction(t *testing.T) {
assert.NotEqual(t, tagSet1, tagSet2, "rebuilt tagset must not reuse stale cached pointer")
}

func TestPrepareForNewStreamInvalidatesEvictedTagCache(t *testing.T) {
tok := rtokenizer.NewRustTokenizer()
mt := NewMessageTranslator("test-pipeline", tok)

origin := makeTestOrigin("svc-a", "src-a", []string{"env:test"})
msg1 := makeMsg("log line 1", "host-1", "info", origin)

tagSet1, tagStr1, dictID1, isNew1 := mt.buildTagSet(msg1)
require.NotNil(t, tagSet1)
require.True(t, isNew1)

mt.tagManager.EvictStaleEntries(0)
mt.PrepareForNewStream()

msg2 := makeMsg("log line 2", "host-1", "info", origin)
tagSet2, tagStr2, dictID2, isNew2 := mt.buildTagSet(msg2)

require.NotNil(t, tagSet2)
assert.True(t, isNew2, "prepareForNewStream must rebuild evicted cached tagset")
assert.Equal(t, tagStr1, tagStr2)
assert.NotEqual(t, dictID1, dictID2)
assert.NotEqual(t, tagSet1, tagSet2)
}

// --- toValidUTF8 tests ---

func TestToValidUTF8_ValidString(t *testing.T) {
@@ -206,7 +230,7 @@ func TestToValidUTF8_ValidString(t *testing.T) {
{"ascii", "hello world"},
{"multibyte", "caf\xc3\xa9"}, // café
{"emoji", "\xf0\x9f\x98\x80 smile"}, // U+1F600
{"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000)
{"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000)
{"mixed scripts", "\xe4\xb8\xad\xe6\x96\x87 Chinese"}, // 中文
}
for _, tt := range tests {
4 changes: 4 additions & 0 deletions pkg/logs/sender/grpc/sender.go
@@ -88,6 +88,7 @@ type Sender struct {
destinationsContext *client.DestinationsContext
cfg pkgconfigmodel.Reader
numberOfWorkers int
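// prepareForNewStream is passed to each stream worker so translators can evict
// stale local state before a new stream snapshot is built.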
prepareForNewStream func()

// Pipeline integration
pipelineMonitor metrics.PipelineMonitor
@@ -114,6 +115,7 @@ func NewSender(
endpoints *config.Endpoints,
destinationsCtx *client.DestinationsContext,
compressor logscompression.Component,
prepareForNewStream func(),
) *Sender {

// For now, use the first reliable endpoint
@@ -149,6 +151,7 @@
destinationsContext: destinationsCtx,
cfg: cfg,
numberOfWorkers: numberOfWorkers,
prepareForNewStream: prepareForNewStream,
pipelineMonitor: metrics.NewTelemetryPipelineMonitor(),
workers: make([]*streamWorker, 0, numberOfWorkers),
queues: make([]chan *message.Payload, numberOfWorkers),
@@ -183,6 +186,7 @@
streamLifetime,
comp,
maxInflight,
sender.prepareForNewStream,
)

sender.workers = append(sender.workers, worker)