Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/logs/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Pipeline struct {
processor *processor.Processor
strategy sender.Strategy
pipelineMonitor metrics.PipelineMonitor
prepareForNewStream func()
prepareForNewStream func() grpcsender.EvictedState
}

// NewPipeline returns a new Pipeline
Expand Down Expand Up @@ -107,7 +107,7 @@ func getStrategy(
compressor logscompression.Component,
instanceID string,
cfg pkgconfigmodel.Reader,
) (sender.Strategy, func()) {
) (sender.Strategy, func() grpcsender.EvictedState) {
if endpoints.UseGRPC || endpoints.UseHTTP || serverlessMeta.IsEnabled() {
var encoder compressioncommon.Compressor
encoder = compressor.NewCompressor(compressioncommon.NoneKind, 0)
Expand All @@ -125,7 +125,7 @@ func getStrategy(
}
if grpcEndpoint, ok := firstGRPCAdditionalEndpoint(endpoints); ok && !serverlessMeta.IsEnabled() {
grpcComp := buildEndpointCompressor(compressor, grpcEndpoint)
return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID), nil
return grpcsender.NewDualStrategy(inputChan, outputChan, flushChan, grpcEndpoint, grpcComp, cfg, endpoints, serverlessMeta, encoder, pipelineMonitor, instanceID), translator.PrepareForNewStream
}
return sender.NewBatchStrategy(
inputChan,
Expand Down
10 changes: 7 additions & 3 deletions pkg/logs/pipeline/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type provider struct {
routerChannels []chan *message.Message
currentRouterIndex *atomic.Uint32
forwarderWaitGroup sync.WaitGroup
grpcPrepareHooks []func()
grpcPrepareHooks []func() grpcsender.EvictedState
}

// NewProvider returns a new Provider.
Expand Down Expand Up @@ -116,12 +116,16 @@ func NewProvider(
}

if endpoints.UseGRPC {
senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression, func() {
senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression, func() grpcsender.EvictedState {
var merged grpcsender.EvictedState
for _, hook := range p.grpcPrepareHooks {
if hook != nil {
hook()
evicted := hook()
merged.DictIDs = append(merged.DictIDs, evicted.DictIDs...)
merged.PatternIDs = append(merged.PatternIDs, evicted.PatternIDs...)
}
}
return merged
})
} else if endpoints.UseHTTP {
if _, ok := firstGRPCAdditionalEndpoint(endpoints); ok {
Expand Down
7 changes: 7 additions & 0 deletions pkg/logs/sender/grpc/.bits/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(go:*)"
]
}
}
83 changes: 83 additions & 0 deletions pkg/logs/sender/grpc/dict_admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package grpc

const (
	// defaultDictAdmissionThreshold is the number of times a longer string
	// must be seen before it is promoted into the dictionary.
	defaultDictAdmissionThreshold uint16 = 3

	// defaultDictMaxShortLen is the byte-length at or below which a string
	// is promoted immediately (heuristic path). Covers log levels (INFO,
	// ERROR), HTTP methods (GET, POST), short status words (none, success).
	defaultDictMaxShortLen = 8

	// defaultDictMaxTracked caps the number of candidate strings tracked
	// for count-based promotion to bound memory usage.
	defaultDictMaxTracked = 4096
)

// dictAdmission decides whether a JSON-context string value should be
// promoted into the stream dictionary. Two complementary strategies:
//
//  1. Heuristic – strings ≤ maxShortLen bytes are admitted immediately.
//     Short strings are almost always categorical (log levels, HTTP
//     methods, enum labels) and pay for their dictionary define on the
//     second occurrence.
//
//  2. Frequency – longer strings are counted; once a string has been
//     seen threshold times it is promoted. This catches things like
//     route names, source identifiers, and URL paths that repeat often
//     but are too long for the heuristic.
type dictAdmission struct {
	counts      map[string]uint16 // occurrence counts for long-string candidates
	threshold   uint16            // promote a candidate once its count reaches this value
	maxShortLen int               // strings at or below this byte length are admitted immediately
	maxTracked  int               // upper bound on the number of tracked candidates
}

// newDictAdmission returns a dictAdmission configured with the default
// threshold, short-string cutoff, and tracking capacity.
func newDictAdmission() *dictAdmission {
	return &dictAdmission{
		counts:      make(map[string]uint16),
		threshold:   defaultDictAdmissionThreshold,
		maxShortLen: defaultDictMaxShortLen,
		maxTracked:  defaultDictMaxTracked,
	}
}

// shouldAdmit reports whether s should be added to the dictionary.
func (da *dictAdmission) shouldAdmit(s string) bool {
	// Heuristic: short strings are almost always categorical.
	if len(s) <= da.maxShortLen {
		return true
	}

	// Frequency: promote after threshold occurrences.
	n := da.counts[s] + 1
	if n >= da.threshold {
		delete(da.counts, s) // no longer need to track
		return true
	}

	// Record the new count. A candidate that is already tracked (n > 1)
	// must keep counting even when the tracker is at capacity — updating
	// an existing key does not grow the map. Only *starting* to track a
	// new candidate is subject to the capacity bound; otherwise tracked
	// entries would freeze below threshold once the map filled up.
	if n > 1 || len(da.counts) < da.maxTracked {
		da.counts[s] = n
	}
	return false
}

// reset clears accumulated counts. Called on stream rotation so that
// stale candidates don't persist across streams.
func (da *dictAdmission) reset() {
	// Re-use the map when it's small to reduce GC pressure; for large
	// maps a fresh allocation releases the oversized bucket array.
	if len(da.counts) <= 256 {
		clear(da.counts)
	} else {
		da.counts = make(map[string]uint16)
	}
}
12 changes: 12 additions & 0 deletions pkg/logs/sender/grpc/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ func (t *inflightTracker) getSnapshot() []byte {
return t.snapshot.serialize()
}

// pruneSnapshot drops evicted dict and pattern entries from the snapshot.
// Invoked after PrepareForNewStream so that the snapshot replayed on the
// next stream no longer carries state the translator has evicted.
func (t *inflightTracker) pruneSnapshot(evicted EvictedState) {
	for _, dictID := range evicted.DictIDs {
		delete(t.snapshot.dictMap, dictID)
	}
	for _, patternID := range evicted.PatternIDs {
		delete(t.snapshot.patternMap, patternID)
	}
}

// snapshotState maintains the accumulated state changes for stream bootstrapping
// It represents the state "before" the first payload in the inflight queue
type snapshotState struct {
Expand Down
30 changes: 25 additions & 5 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type MessageTranslator struct {
tokenizer token.Tokenizer
jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog

pipelineName string
pipelineName string
dictAdmission *dictAdmission

// tagCache caches the last computed tag set to avoid recomputation across messages
// with identical metadata (common in single-source pipelines).
Expand Down Expand Up @@ -136,6 +137,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa
tokenizer: tokenizer,
jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
pipelineName: pipelineName,
dictAdmission: newDictAdmission(),
}
tlmPipelineStateSize.Set(0, pipelineName)
return mt
Expand Down Expand Up @@ -596,12 +598,30 @@ func (mt *MessageTranslator) sendPatternDelete(patternID uint64, msg *message.Me
}
}

// EvictedState holds IDs evicted during PrepareForNewStream so the stream
// worker can prune its snapshot before sending it on a new stream.
type EvictedState struct {
	DictIDs    []uint64 // dictionary entry IDs evicted by the tag manager's stale-entry sweep
	PatternIDs []uint64 // pattern IDs evicted by the cluster manager's stale-pattern sweep
}

// PrepareForNewStream performs local-only stale eviction before a new stream snapshot is built.
func (mt *MessageTranslator) PrepareForNewStream() {
for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) {
mt.invalidateTagCache(evictedID)
// Returns the evicted dict and pattern IDs so the caller can prune the inflight snapshot.
func (mt *MessageTranslator) PrepareForNewStream() EvictedState {
evictedDictIDs := mt.tagManager.EvictStaleEntries(staleTTL)
for _, id := range evictedDictIDs {
mt.invalidateTagCache(id)
}
evictedPatterns := mt.clusterManager.EvictStalePatterns(staleTTL)
evictedPatternIDs := make([]uint64, len(evictedPatterns))
for i, p := range evictedPatterns {
evictedPatternIDs[i] = p.PatternID
}
mt.dictAdmission.reset()
return EvictedState{
DictIDs: evictedDictIDs,
PatternIDs: evictedPatternIDs,
}
mt.clusterManager.EvictStalePatterns(staleTTL)
}

// sendDictEntryDefine creates and sends a DictEntryDefine datum
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/sender/grpc/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Sender struct {
destinationsContext *client.DestinationsContext
cfg pkgconfigmodel.Reader
numberOfWorkers int
prepareForNewStream func()
prepareForNewStream func() EvictedState

// Pipeline integration
pipelineMonitor metrics.PipelineMonitor
Expand All @@ -115,7 +115,7 @@ func NewSender(
endpoints *config.Endpoints,
destinationsCtx *client.DestinationsContext,
compressor logscompression.Component,
prepareForNewStream func(),
prepareForNewStream func() EvictedState,
) *Sender {

// For now, use the first reliable endpoint
Expand Down
9 changes: 5 additions & 4 deletions pkg/logs/sender/grpc/stream_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type streamWorker struct {
// Configuration
workerID string
destinationsContext *client.DestinationsContext
prepareForNewStream func()
prepareForNewStream func() EvictedState

// Pipeline integration
inputChan chan *message.Payload
Expand Down Expand Up @@ -161,7 +161,7 @@ func newStreamWorker(
streamLifetime time.Duration,
compressor compression.Compressor,
maxInflight int,
prepareForNewStream func(),
prepareForNewStream func() EvictedState,
) *streamWorker {
return newStreamWorkerWithClock(workerID, inputChan, destinationsCtx, conn, client, sink,
endpoint, streamLifetime, compressor, clock.New(), nil, maxInflight, prepareForNewStream)
Expand All @@ -181,7 +181,7 @@ func newStreamWorkerWithClock(
clock clock.Clock,
inflightTracker *inflightTracker,
maxInflight int,
prepareForNewStream func(),
prepareForNewStream func() EvictedState,
) *streamWorker {
backoffPolicy := backoff.NewExpBackoffPolicy(
endpoint.BackoffFactor,
Expand Down Expand Up @@ -547,7 +547,8 @@ func (s *streamWorker) finishStreamRotation(streamInfo *streamInfo) {
s.inflight.resetOnRotation()

if s.prepareForNewStream != nil {
s.prepareForNewStream()
evicted := s.prepareForNewStream()
s.inflight.pruneSnapshot(evicted)
}

log.Infof("Worker %s: Stream rotation complete, now active", s.workerID)
Expand Down
Loading