Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/logs/patterns/clustering/pattern_eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,29 @@ func (cm *ClusterManager) EvictToMemoryTarget(targetBytesToFree int64, decayFact
}
return patterns
}

// EvictStalePatterns drops every pattern whose LastAccessAt is older than
// now minus ttl. The evicted patterns are returned so that callers can emit
// the matching PatternDelete messages.
//
// The scan and the removal happen in two separate phases: candidates are
// gathered under the read lock, the lock is released, and each candidate is
// then handed to removePattern.
//
// NOTE(review): because the read lock is released before removal, a pattern
// that is matched in between would presumably still be evicted — confirm
// whether that window matters for callers.
func (cm *ClusterManager) EvictStalePatterns(ttl time.Duration) []*Pattern {
	deadline := time.Now().Add(-ttl)

	// Phase 1: snapshot the expired patterns while holding only the read lock.
	var expired []*Pattern
	cm.mu.RLock()
	for _, bucket := range cm.hashBuckets {
		for _, c := range bucket {
			for _, p := range c.Patterns {
				if !p.LastAccessAt.Before(deadline) {
					continue
				}
				expired = append(expired, p)
			}
		}
	}
	cm.mu.RUnlock()

	// Phase 2: remove them one at a time. The read lock must already be
	// released here because removePattern takes its own write lock.
	for i := range expired {
		cm.removePattern(expired[i])
	}
	return expired
}
20 changes: 20 additions & 0 deletions pkg/logs/patterns/tags/tag_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ func (tm *TagManager) Get(tag string) (uint64, bool) {
return tm.GetStringID(tag)
}

// EvictStaleEntries purges every dictionary entry whose lastAccessAt is older
// than now minus ttl. It returns the IDs of the purged entries so that callers
// can emit the matching DictEntryDelete messages.
//
// The whole sweep runs under the manager's write lock. Each purged entry is
// removed from both lookup maps and its estimated size is subtracted from the
// cached memory counter.
func (tm *TagManager) EvictStaleEntries(ttl time.Duration) []uint64 {
	deadline := time.Now().Add(-ttl)

	tm.mu.Lock()
	defer tm.mu.Unlock()

	var removed []uint64
	// Deleting the current key while ranging over a map is permitted in Go.
	for key, e := range tm.stringToEntry {
		if !e.lastAccessAt.Before(deadline) {
			continue
		}
		id := e.id
		removed = append(removed, id)
		tm.cachedMemoryBytes.Add(-e.EstimatedBytes())
		delete(tm.idToEntry, id)
		delete(tm.stringToEntry, key)
	}
	return removed
}

// Count returns the number of strings in the dictionary
func (tm *TagManager) Count() int {
tm.mu.RLock()
Expand Down
19 changes: 18 additions & 1 deletion pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
)

const (
	// nanoToMillis is the nanosecond/millisecond conversion factor
	// (1ms = 1,000,000ns).
	nanoToMillis = 1000000
	// staleTTL is the TTL handed to the stale-pattern and stale-tag eviction
	// sweeps: entries not accessed within this window are evicted.
	staleTTL = 14 * time.Minute
	// staleSweepInterval is the minimum time that must elapse between two
	// stale-entry sweeps.
	staleSweepInterval = 30 * time.Second
)

// batchEntry is a per-message sidecar used during batch tokenization.
// It keeps msg, preprocessed content, and JSON context fields aligned so that
Expand Down Expand Up @@ -72,7 +74,8 @@ type MessageTranslator struct {
tokenizer token.Tokenizer
jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog

pipelineName string
pipelineName string
lastStaleSweep time.Time

// tagCache caches the last computed tag set to avoid recomputation across messages
// with identical metadata (common in single-source pipelines).
Expand Down Expand Up @@ -106,6 +109,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa
tokenizer: tokenizer,
jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
pipelineName: pipelineName,
lastStaleSweep: time.Now(),
}
tlmPipelineStateSize.Set(0, pipelineName)
return mt
Expand Down Expand Up @@ -326,6 +330,19 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
}
}

// Periodic TTL sweep: remove entries not accessed within staleTTL (currently 14 minutes).
// This prevents stale entries from accumulating in state and inflating snapshot replays.
if time.Since(mt.lastStaleSweep) >= staleSweepInterval {
mt.lastStaleSweep = time.Now()

for _, evictedPattern := range mt.clusterManager.EvictStalePatterns(staleTTL) {
mt.sendPatternDelete(evictedPattern.PatternID, msg, outputChan)
}
for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) {
mt.sendDictEntryDelete(outputChan, msg, evictedID)
}
}

// Send PatternDefine for new or updated patterns
if patternDatum != nil {
mt.sendPatternDefine(patternDatum, msg, outputChan, &patternDefineSent, &patternDefineParamCount)
Expand Down
Loading