diff --git a/pkg/logs/patterns/clustering/pattern_eviction.go b/pkg/logs/patterns/clustering/pattern_eviction.go index fc85113d17b0..49c9bde1ca15 100644 --- a/pkg/logs/patterns/clustering/pattern_eviction.go +++ b/pkg/logs/patterns/clustering/pattern_eviction.go @@ -150,3 +150,29 @@ func (cm *ClusterManager) EvictToMemoryTarget(targetBytesToFree int64, decayFact } return patterns } + +// EvictStalePatterns removes all patterns that haven't been matched within the given TTL. +// Returns the evicted patterns so callers can send PatternDelete messages. +func (cm *ClusterManager) EvictStalePatterns(ttl time.Duration) []*Pattern { + cutoff := time.Now().Add(-ttl) + + // Collect stale patterns under read lock + cm.mu.RLock() + var stale []*Pattern + for _, clusters := range cm.hashBuckets { + for _, cluster := range clusters { + for _, pattern := range cluster.Patterns { + if pattern.LastAccessAt.Before(cutoff) { + stale = append(stale, pattern) + } + } + } + } + cm.mu.RUnlock() + + // Remove each stale pattern (removePattern takes its own write lock) + for _, pattern := range stale { + cm.removePattern(pattern) + } + return stale +} diff --git a/pkg/logs/patterns/tags/tag_manager.go b/pkg/logs/patterns/tags/tag_manager.go index df3e852faea4..66d4b3d9b831 100644 --- a/pkg/logs/patterns/tags/tag_manager.go +++ b/pkg/logs/patterns/tags/tag_manager.go @@ -153,6 +153,26 @@ func (tm *TagManager) Get(tag string) (uint64, bool) { return tm.GetStringID(tag) } +// EvictStaleEntries removes all entries that haven't been accessed within the given TTL. +// Returns the IDs of evicted entries so callers can send DictEntryDelete messages. +func (tm *TagManager) EvictStaleEntries(ttl time.Duration) []uint64 { + cutoff := time.Now().Add(-ttl) + + tm.mu.Lock() + defer tm.mu.Unlock() + + var evictedIDs []uint64 + for str, entry := range tm.stringToEntry { + if entry.lastAccessAt.Before(cutoff) { + evictedIDs = append(evictedIDs, entry.id) + tm.cachedMemoryBytes.Add(-entry.EstimatedBytes()) + delete(tm.stringToEntry, str) + delete(tm.idToEntry, entry.id) + } + } + return evictedIDs +} + // Count returns the number of strings in the dictionary func (tm *TagManager) Count() int { tm.mu.RLock() diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 2cbd349c03f4..bc320a64af93 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -23,6 +23,8 @@ import ( ) const nanoToMillis = 1000000 +const staleTTL = 14 * time.Minute +const staleSweepInterval = 30 * time.Second // batchEntry is a per-message sidecar used during batch tokenization. // It keeps msg, preprocessed content, and JSON context fields aligned so that @@ -72,7 +74,8 @@ type MessageTranslator struct { tokenizer token.Tokenizer jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog - pipelineName string + pipelineName string + lastStaleSweep time.Time // tagCache caches the last computed tag set to avoid recomputation across messages // with identical metadata (common in single-source pipelines). @@ -106,6 +109,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa tokenizer: tokenizer, jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"), pipelineName: pipelineName, + lastStaleSweep: time.Now(), } tlmPipelineStateSize.Set(0, pipelineName) return mt @@ -326,6 +330,19 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList } } + // Periodic TTL sweep: remove entries not accessed in the last 5 minutes. + // This prevents stale entries from accumulating in state and inflating snapshot replays. + if time.Since(mt.lastStaleSweep) >= staleSweepInterval { + mt.lastStaleSweep = time.Now() + + for _, evictedPattern := range mt.clusterManager.EvictStalePatterns(staleTTL) { + mt.sendPatternDelete(evictedPattern.PatternID, msg, outputChan) + } + for _, evictedID := range mt.tagManager.EvictStaleEntries(staleTTL) { + mt.sendDictEntryDelete(outputChan, msg, evictedID) + } + } + // Send PatternDefine for new or updated patterns if patternDatum != nil { mt.sendPatternDefine(patternDatum, msg, outputChan, &patternDefineSent, &patternDefineParamCount)