diff --git a/pkg/logs/pipeline/pipeline.go b/pkg/logs/pipeline/pipeline.go index a3f25313231d..5dd563af3067 100644 --- a/pkg/logs/pipeline/pipeline.go +++ b/pkg/logs/pipeline/pipeline.go @@ -31,7 +31,7 @@ type Pipeline struct { processor *processor.Processor strategy sender.Strategy pipelineMonitor metrics.PipelineMonitor - prepareForNewStream func() + prepareForNewStream func() grpcsender.EvictedState } // NewPipeline returns a new Pipeline @@ -107,7 +107,7 @@ func getStrategy( compressor logscompression.Component, instanceID string, cfg pkgconfigmodel.Reader, -) (sender.Strategy, func()) { +) (sender.Strategy, func() grpcsender.EvictedState) { if endpoints.UseGRPC || endpoints.UseHTTP || serverlessMeta.IsEnabled() { var encoder compressioncommon.Compressor encoder = compressor.NewCompressor(compressioncommon.NoneKind, 0) @@ -125,7 +125,7 @@ func getStrategy( } if grpcEndpoint, ok := firstGRPCAdditionalEndpoint(endpoints); ok && !serverlessMeta.IsEnabled() { grpcComp := buildEndpointCompressor(compressor, grpcEndpoint) - return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID), nil + return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID), translator.PrepareForNewStream } return sender.NewBatchStrategy( inputChan, diff --git a/pkg/logs/pipeline/provider.go b/pkg/logs/pipeline/provider.go index a31963e21836..74a563606c3a 100644 --- a/pkg/logs/pipeline/provider.go +++ b/pkg/logs/pipeline/provider.go @@ -76,7 +76,7 @@ type provider struct { routerChannels []chan *message.Message currentRouterIndex *atomic.Uint32 forwarderWaitGroup sync.WaitGroup - grpcPrepareHooks []func() + grpcPrepareHooks []func() grpcsender.EvictedState } // NewProvider returns a new Provider. 
@@ -116,12 +116,16 @@ func NewProvider( } if endpoints.UseGRPC { - senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression, func() { + senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression, func() grpcsender.EvictedState { + var merged grpcsender.EvictedState for _, hook := range p.grpcPrepareHooks { if hook != nil { - hook() + evicted := hook() + merged.DictIDs = append(merged.DictIDs, evicted.DictIDs...) + merged.PatternIDs = append(merged.PatternIDs, evicted.PatternIDs...) } } + return merged }) } else if endpoints.UseHTTP { if _, ok := firstGRPCAdditionalEndpoint(endpoints); ok { diff --git a/pkg/logs/sender/grpc/.bits/settings.json b/pkg/logs/sender/grpc/.bits/settings.json new file mode 100644 index 000000000000..ef013e4d3df6 --- /dev/null +++ b/pkg/logs/sender/grpc/.bits/settings.json @@ -0,0 +1,7 @@ +{ + "permissions": { + "allow": [ + "Bash(go:*)" + ] + } +} \ No newline at end of file diff --git a/pkg/logs/sender/grpc/dict_admission.go b/pkg/logs/sender/grpc/dict_admission.go new file mode 100644 index 000000000000..4c183ea822c9 --- /dev/null +++ b/pkg/logs/sender/grpc/dict_admission.go @@ -0,0 +1,83 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package grpc + +const ( + // defaultDictAdmissionThreshold is the number of times a longer string + // must be seen before it is promoted into the dictionary. + defaultDictAdmissionThreshold uint16 = 3 + + // defaultDictMaxShortLen is the byte-length at or below which a string + // is promoted immediately (heuristic path). Covers log levels (INFO, + // ERROR), HTTP methods (GET, POST), short status words (none, success). 
+	defaultDictMaxShortLen = 8
+
+	// defaultDictMaxTracked caps the number of candidate strings tracked
+	// for count-based promotion to bound memory usage.
+	defaultDictMaxTracked = 4096
+)
+
+// dictAdmission decides whether a JSON-context string value should be
+// promoted into the stream dictionary. Two complementary strategies:
+//
+//  1. Heuristic – strings ≤ maxShortLen bytes are admitted immediately. Short
+//     strings are almost always categorical (log levels, HTTP methods, enum
+//     labels) and pay for their dictionary define on the second occurrence.
+//
+//  2. Frequency – longer strings are counted; once a string has been seen
+//     threshold times it is promoted. This catches route names, source
+//     identifiers, and URL paths that repeat often but exceed maxShortLen.
+type dictAdmission struct {
+	counts      map[string]uint16 // sightings per candidate that has not been admitted yet
+	threshold   uint16            // sightings required before a long string is admitted
+	maxShortLen int               // byte length at or below which a string is admitted immediately
+	maxTracked  int               // cap on the number of candidates held in counts
+}
+
+// newDictAdmission returns a dictAdmission with an empty count table and the package defaults.
+func newDictAdmission() *dictAdmission {
+	return &dictAdmission{
+		counts:      make(map[string]uint16),
+		threshold:   defaultDictAdmissionThreshold,
+		maxShortLen: defaultDictMaxShortLen,
+		maxTracked:  defaultDictMaxTracked,
+	}
+}
+
+// shouldAdmit reports whether s should be added to the dictionary. Strings
+// longer than maxShortLen are admitted once they have been counted threshold times.
+func (da *dictAdmission) shouldAdmit(s string) bool {
+	// Heuristic: short strings are almost always categorical.
+	if len(s) <= da.maxShortLen {
+		return true
+	}
+
+	// Frequency: promote after threshold occurrences.
+	seen, tracked := da.counts[s]
+	if seen+1 >= da.threshold {
+		delete(da.counts, s) // no longer need to track
+		return true
+	}
+
+	// The cap only blocks new candidates; tracked ones must keep counting.
+	if tracked || len(da.counts) < da.maxTracked {
+		da.counts[s] = seen + 1
+	}
+	return false
+}
+
+// reset clears accumulated counts. Called on stream rotation so that
+// stale candidates don't persist across streams.
+func (da *dictAdmission) reset() {
+	// Re-use the map when it's small to reduce GC pressure.
+	if len(da.counts) <= 256 {
+		for k := range da.counts { // empty the map in place so the existing map value is kept
+			delete(da.counts, k)
+		}
+	} else { // more than 256 tracked entries: replace the map instead of emptying it
+		da.counts = make(map[string]uint16)
+	}
+}
diff --git a/pkg/logs/sender/grpc/inflight.go b/pkg/logs/sender/grpc/inflight.go
index 535c2153250f..2578bccbb9fe 100644
--- a/pkg/logs/sender/grpc/inflight.go
+++ b/pkg/logs/sender/grpc/inflight.go
@@ -173,6 +173,18 @@ func (t *inflightTracker) getSnapshot() []byte {
 	return t.snapshot.serialize()
 }
 
+// pruneSnapshot removes evicted dict and pattern entries from the snapshot, so
+// the snapshot sent after PrepareForNewStream omits state the translator evicted.
+// NOTE(review): assumes the evicted IDs belong to this tracker's own translator — confirm.
+func (t *inflightTracker) pruneSnapshot(evicted EvictedState) {
+	for _, id := range evicted.DictIDs {
+		delete(t.snapshot.dictMap, id) // deleting an ID that is not in the map is a no-op
+	}
+	for _, id := range evicted.PatternIDs {
+		delete(t.snapshot.patternMap, id)
+	}
+}
+
 // snapshotState maintains the accumulated state changes for stream bootstrapping
 // It represents the state "before" the first payload in the inflight queue
 type snapshotState struct {
diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go
index cf524dd6f7b1..9fa661c0a929 100644
--- a/pkg/logs/sender/grpc/mock_state.go
+++ b/pkg/logs/sender/grpc/mock_state.go
@@ -102,7 +102,8 @@ type MessageTranslator struct {
 	tokenizer token.Tokenizer
 
 	jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog
-	pipelineName  string
+	pipelineName  string
+	dictAdmission *dictAdmission
 
 	// tagCache caches the last computed tag set to avoid recomputation across messages
 	// with identical metadata (common in single-source pipelines).
@@ -136,6 +137,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa
 		tokenizer:     tokenizer,
 		jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
 		pipelineName:  pipelineName,
+		dictAdmission: newDictAdmission(),
 	}
 	tlmPipelineStateSize.Set(0, pipelineName)
 	return mt
@@ -596,12 +598,30 @@ func (mt *MessageTranslator) sendPatternDelete(patternID uint64, msg *message.Me
 	}
 }
 
+// EvictedState holds the IDs evicted by PrepareForNewStream so the stream
+// worker can prune them from its snapshot before sending it on a new stream.
+type EvictedState struct {
+	DictIDs    []uint64 // IDs returned by tagManager.EvictStaleEntries
+	PatternIDs []uint64 // PatternID of each pattern returned by clusterManager.EvictStalePatterns
+}
+
 // PrepareForNewStream performs local-only stale eviction before a new stream snapshot is built.
-func (mt *MessageTranslator) PrepareForNewStream() {
-	for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) {
-		mt.invalidateTagCache(evictedID)
+// It also resets the dictionary admission counts, and returns the evicted dict and pattern IDs so the caller can prune the inflight snapshot.
+func (mt *MessageTranslator) PrepareForNewStream() EvictedState {
+	evictedDictIDs := mt.tagManager.EvictStaleEntries(staleTTL)
+	for _, id := range evictedDictIDs {
+		mt.invalidateTagCache(id)
+	}
+	evictedPatterns := mt.clusterManager.EvictStalePatterns(staleTTL)
+	evictedPatternIDs := make([]uint64, len(evictedPatterns)) // only the pattern IDs are reported to the caller
+	for i, p := range evictedPatterns {
+		evictedPatternIDs[i] = p.PatternID
+	}
+	mt.dictAdmission.reset() // admission counts do not carry over to the new stream
+	return EvictedState{
+		DictIDs:    evictedDictIDs,
+		PatternIDs: evictedPatternIDs,
 	}
-	mt.clusterManager.EvictStalePatterns(staleTTL)
 }
 
 // sendDictEntryDefine creates and sends a DictEntryDefine datum
diff --git a/pkg/logs/sender/grpc/sender.go b/pkg/logs/sender/grpc/sender.go
index 01cd342a2a86..212583a67396 100644
--- a/pkg/logs/sender/grpc/sender.go
+++ b/pkg/logs/sender/grpc/sender.go
@@ -88,7 +88,7 @@ type Sender struct {
 	destinationsContext *client.DestinationsContext
 	cfg                 pkgconfigmodel.Reader
 	numberOfWorkers     int
-	prepareForNewStream func()
+	prepareForNewStream func() EvictedState
 
 	// Pipeline integration
 	pipelineMonitor metrics.PipelineMonitor
@@ -115,7 +115,7 @@ func NewSender(
 	endpoints *config.Endpoints,
 	destinationsCtx *client.DestinationsContext,
 	compressor logscompression.Component,
-	prepareForNewStream func(),
+	prepareForNewStream func() EvictedState,
 ) *Sender {
 
 	// For now, use the first reliable endpoint
diff --git a/pkg/logs/sender/grpc/stream_worker.go b/pkg/logs/sender/grpc/stream_worker.go
index 5cc8457b9cbb..5833561794ea 100644
--- a/pkg/logs/sender/grpc/stream_worker.go
+++ b/pkg/logs/sender/grpc/stream_worker.go
@@ -108,7 +108,7 @@ type streamWorker struct {
 	// Configuration
 	workerID            string
 	destinationsContext *client.DestinationsContext
-	prepareForNewStream func()
+	prepareForNewStream func() EvictedState // optional; called on stream rotation, result is passed to pruneSnapshot
 
 	// Pipeline integration
 	inputChan chan *message.Payload
@@ -161,7 +161,7 @@ func newStreamWorker(
 	streamLifetime time.Duration,
 	compressor compression.Compressor,
 	maxInflight int,
-	prepareForNewStream func(),
+
prepareForNewStream func() EvictedState, ) *streamWorker { return newStreamWorkerWithClock(workerID, inputChan, destinationsCtx, conn, client, sink, endpoint, streamLifetime, compressor, clock.New(), nil, maxInflight, prepareForNewStream) @@ -181,7 +181,7 @@ func newStreamWorkerWithClock( clock clock.Clock, inflightTracker *inflightTracker, maxInflight int, - prepareForNewStream func(), + prepareForNewStream func() EvictedState, ) *streamWorker { backoffPolicy := backoff.NewExpBackoffPolicy( endpoint.BackoffFactor, @@ -547,7 +547,8 @@ func (s *streamWorker) finishStreamRotation(streamInfo *streamInfo) { s.inflight.resetOnRotation() if s.prepareForNewStream != nil { - s.prepareForNewStream() + evicted := s.prepareForNewStream() + s.inflight.pruneSnapshot(evicted) } log.Infof("Worker %s: Stream rotation complete, now active", s.workerID)