diff --git a/pkg/logs/patterns/tags/tag_manager.go b/pkg/logs/patterns/tags/tag_manager.go index fa086bcd78da..396d5c4ebb3a 100644 --- a/pkg/logs/patterns/tags/tag_manager.go +++ b/pkg/logs/patterns/tags/tag_manager.go @@ -17,12 +17,20 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) +const ( + dynamicStringDictionaryThreshold = 2 + maxPendingDynamicStrings = 1_000_000 + minDynamicStringLength = 2 + maxDynamicStringLength = 128 +) + // TagManager manages a dictionary of unique tag strings (keys and values) to dictionary IDs. // It provides thread-safe operations for retrieving/creating IDs and building Tag proto messages // that reference those IDs. type TagManager struct { stringToEntry map[string]*tagEntry idToEntry map[uint64]*tagEntry + pendingDynamic map[string]uint16 nextID atomic.Uint64 cachedMemoryBytes atomic.Int64 mu sync.RWMutex @@ -31,8 +39,9 @@ type TagManager struct { // NewTagManager creates a new TagManager instance func NewTagManager() *TagManager { tm := &TagManager{ - stringToEntry: make(map[string]*tagEntry), - idToEntry: make(map[uint64]*tagEntry), + stringToEntry: make(map[string]*tagEntry), + idToEntry: make(map[uint64]*tagEntry), + pendingDynamic: make(map[string]uint16), } return tm } @@ -72,10 +81,58 @@ func (tm *TagManager) AddString(s string) (dictID uint64, isNew bool) { } tm.stringToEntry[s] = entry tm.idToEntry[id] = entry + delete(tm.pendingDynamic, s) tm.cachedMemoryBytes.Add(entry.EstimatedBytes()) return id, true } +// ObserveDynamicString records a dynamic string value and returns a dictionary ID only +// after the string has repeated enough times to justify defining it. This lets dynamic +// values such as log levels reuse the dictionary while keeping one-off high-cardinality +// tokens inline. +func (tm *TagManager) ObserveDynamicString(s string) (dictID uint64, isNew bool, shouldEncode bool) { + now := time.Now() + + tm.mu.Lock() + defer tm.mu.Unlock() + + if entry, exists := tm.stringToEntry[s]; exists { + entry.usageCount++ + entry.lastAccessAt = now + return entry.id, false, true + } + + if !isDynamicStringDictionaryCandidate(s) { + return 0, false, false + } + + count, tracked := tm.pendingDynamic[s] + if !tracked && len(tm.pendingDynamic) >= maxPendingDynamicStrings { + return 0, false, false + } + if count < dynamicStringDictionaryThreshold { + count++ + } + if count < dynamicStringDictionaryThreshold { + tm.pendingDynamic[s] = count + return 0, false, false + } + + id := tm.nextID.Add(1) + entry := &tagEntry{ + id: id, + str: s, + usageCount: int64(count), + createdAt: now, + lastAccessAt: now, + } + tm.stringToEntry[s] = entry + tm.idToEntry[id] = entry + delete(tm.pendingDynamic, s) + tm.cachedMemoryBytes.Add(entry.EstimatedBytes()) + return id, true, true +} + // EncodeTagStrings converts a slice of "key:value" tag strings into Tag proto messages // backed by dictionary indices. It returns the encoded tags plus the dictionary entries // that must be flushed upstream (ID -> string) for any newly-seen key/value strings. @@ -211,3 +268,70 @@ func dictIndexValue(id uint64) *statefulpb.DynamicValue { }, } } + +func isDynamicStringDictionaryCandidate(s string) bool { + if len(s) < minDynamicStringLength || len(s) > maxDynamicStringLength { + return false + } + if looksLikeUUID(s) || looksLikeTimestamp(s) { + return false + } + return true +} + +func looksLikeUUID(s string) bool { + if len(s) == 36 { + for i := 0; i < len(s); i++ { + switch i { + case 8, 13, 18, 23: + if s[i] != '-' { + return false + } + default: + if !isHex(s[i]) { + return false + } + } + } + return true + } + if len(s) == 32 { + for i := 0; i < len(s); i++ { + if !isHex(s[i]) { + return false + } + } + return true + } + return false +} + +func looksLikeTimestamp(s string) bool { + if len(s) < len("2006-01-02") { + return false + } + if !isDigit(s[0]) || !isDigit(s[1]) || !isDigit(s[2]) || !isDigit(s[3]) || + s[4] != '-' || + !isDigit(s[5]) || !isDigit(s[6]) || + s[7] != '-' || + !isDigit(s[8]) || !isDigit(s[9]) { + return false + } + if len(s) == len("2006-01-02") { + return true + } + switch s[10] { + case 'T', ' ', '_': + return true + default: + return false + } +} + +func isHex(c byte) bool { + return isDigit(c) || ('a' <= c && c <= 'f') || ('A' <= c && c <= 'F') +} + +func isDigit(c byte) bool { + return '0' <= c && c <= '9' +} diff --git a/pkg/logs/patterns/tags/tag_manager_test.go b/pkg/logs/patterns/tags/tag_manager_test.go index 7762fcf148d5..b2c8d061d4e9 100644 --- a/pkg/logs/patterns/tags/tag_manager_test.go +++ b/pkg/logs/patterns/tags/tag_manager_test.go @@ -132,6 +132,64 @@ func TestTagManager_GetStringID(t *testing.T) { assert.Equal(t, uint64(0), id) } +func TestTagManager_ObserveDynamicStringAddsAfterRepeatedUse(t *testing.T) { + tm := NewTagManager() + + dictID, isNew, shouldEncode := tm.ObserveDynamicString("INFO") + assert.Zero(t, dictID) + assert.False(t, isNew) + assert.False(t, shouldEncode) + assert.Equal(t, 0, tm.Count()) + + dictID, isNew, shouldEncode = tm.ObserveDynamicString("INFO") + assert.NotZero(t, dictID) + assert.True(t, isNew) + assert.True(t, shouldEncode) + assert.Equal(t, 1, tm.Count()) + + dictIDAgain, isNew, shouldEncode := tm.ObserveDynamicString("INFO") + assert.Equal(t, dictID, dictIDAgain) + assert.False(t, isNew) + assert.True(t, shouldEncode) + assert.Equal(t, 1, tm.Count()) +} + +func TestTagManager_ObserveDynamicStringUsesExistingEntry(t *testing.T) { + tm := NewTagManager() + existingID, added := tm.AddString("INFO") + require.True(t, added) + + dictID, isNew, shouldEncode := tm.ObserveDynamicString("INFO") + assert.Equal(t, existingID, dictID) + assert.False(t, isNew) + assert.True(t, shouldEncode) + assert.Equal(t, 1, tm.Count()) +} + +func TestTagManager_ObserveDynamicStringSkipsHighCardinalityShapes(t *testing.T) { + tm := NewTagManager() + + values := []string{ + "a", + "550e8400-e29b-41d4-a716-446655440000", + "550e8400e29b41d4a716446655440000", + "2026-04-28", + "2026-04-28T12:34:56Z", + } + + for _, value := range values { + t.Run(value, func(t *testing.T) { + for i := 0; i < 3; i++ { + dictID, isNew, shouldEncode := tm.ObserveDynamicString(value) + assert.Zero(t, dictID) + assert.False(t, isNew) + assert.False(t, shouldEncode) + } + }) + } + assert.Equal(t, 0, tm.Count()) +} + func TestTagManager_Concurrency(t *testing.T) { tm := NewTagManager() diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 710606e84a1f..abdaca243e47 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -393,8 +393,8 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList // (dvBacking + intBacking + dictBacking + stringBacking + dynamicValues). // Each wildcard position uses exactly one field of dvTypeBackings; the unused // fields consume memory but avoid per-position heap allocs. - // High-cardinality values (UUIDs, IPs, request IDs) that are not already in the - // dict are sent inline as string_value — no dict entry created, stopping unbounded + // Repeated low-cardinality values can enter the dictionary once they cross the + // admission threshold. One-off or filtered values stay inline to avoid unbounded // TagManager growth. n := len(wildcardValues) dvBacking := make([]statefulpb.DynamicValue, n) @@ -404,7 +404,10 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList dynamicValues[i] = &dvBacking[i] } for i, val := range wildcardValues { - mt.fillWildcardDynamicValue(&dvBacking[i], &typeBacking[i].intOneof, &typeBacking[i].dictOneof, &typeBacking[i].stringOneof, val) + dictID, dictValue, isNew := mt.fillWildcardDynamicValue(&dvBacking[i], &typeBacking[i].intOneof, &typeBacking[i].dictOneof, &typeBacking[i].stringOneof, val) + if isNew { + mt.sendDictEntryDefine(outputChan, msg, dictID, dictValue) + } } // Encode message key as dict entry @@ -434,7 +437,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList jsonContextValuesDV[i] = &jsonContextDVBacking[i] } for i, val := range jsonContextValues { - mt.fillDynamicValue( + dictID, dictValue, isNew := mt.fillDynamicValue( &jsonContextDVBacking[i], &jsonContextTypeBacking[i].intOneof, &jsonContextTypeBacking[i].floatOneof, @@ -444,6 +447,9 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList &jsonContextTypeBacking[i].stringOneof, val, ) + if isNew { + mt.sendDictEntryDefine(outputChan, msg, dictID, dictValue) + } } } @@ -803,78 +809,79 @@ func (mt *MessageTranslator) fillDynamicValue( oneofRawJSON *statefulpb.DynamicValue_RawJsonValue, oneofStr *statefulpb.DynamicValue_StringValue, value interface{}, -) { +) (dictID uint64, dictValue string, isNew bool) { dv.RenderAsString = false switch typed := value.(type) { case nil: dv.Value = nil - return + return 0, "", false case string: if intVal, ok := parseLosslessIntString(typed); ok { oneofInt.IntValue = intVal dv.Value = oneofInt dv.RenderAsString = true - return + return 0, "", false } if floatVal, ok := parseLosslessFloatString(typed); ok { oneofFloat.FloatValue = floatVal dv.Value = oneofFloat dv.RenderAsString = true - return + return 0, "", false } if boolVal, ok := parseLosslessBoolString(typed); ok { oneofBool.BoolValue = boolVal dv.Value = oneofBool dv.RenderAsString = true - return + return 0, "", false } - if dictID, ok := mt.tagManager.GetStringID(typed); ok { + typed = toValidUTF8(typed) + if dictID, isNew, shouldEncode := mt.tagManager.ObserveDynamicString(typed); shouldEncode { oneofDict.DictIndex = dictID dv.Value = oneofDict - return + return dictID, typed, isNew } oneofStr.StringValue = typed dv.Value = oneofStr - return + return 0, "", false case json.Number: raw := typed.String() if intVal, ok := parseLosslessIntString(raw); ok { oneofInt.IntValue = intVal dv.Value = oneofInt - return + return 0, "", false } if floatVal, ok := parseLosslessFloatString(raw); ok { oneofFloat.FloatValue = floatVal dv.Value = oneofFloat - return + return 0, "", false } oneofRawJSON.RawJsonValue = []byte(raw) dv.Value = oneofRawJSON - return + return 0, "", false case float64: if !math.IsInf(typed, 0) && !math.IsNaN(typed) && math.Trunc(typed) == typed && typed >= math.MinInt64 && typed <= math.MaxInt64 { oneofInt.IntValue = int64(typed) dv.Value = oneofInt - return + return 0, "", false } oneofFloat.FloatValue = typed dv.Value = oneofFloat - return + return 0, "", false case bool: oneofBool.BoolValue = typed dv.Value = oneofBool - return + return 0, "", false default: rawJSON, err := json.Marshal(typed) if err != nil { log.Warnf("Failed to marshal nested JSON context value: %v", err) oneofStr.StringValue = "" dv.Value = oneofStr - return + return 0, "", false } oneofRawJSON.RawJsonValue = rawJSON dv.Value = oneofRawJSON - return + return 0, "", false } } @@ -884,7 +891,7 @@ func (mt *MessageTranslator) fillWildcardDynamicValue( oneofDict *statefulpb.DynamicValue_DictIndex, oneofStr *statefulpb.DynamicValue_StringValue, value string, -) { +) (dictID uint64, dictValue string, isNew bool) { dv.RenderAsString = false // Only canonical base-10 integers are safe to encode numerically without // changing the original token's lexeme. @@ -892,20 +899,19 @@ func (mt *MessageTranslator) fillWildcardDynamicValue( oneofInt.IntValue = intVal dv.Value = oneofInt dv.RenderAsString = true - return + return 0, "", false } - // Already in dict (e.g., from a previous tag encoding): reuse existing ID - if dictID, ok := mt.tagManager.GetStringID(value); ok { + value = toValidUTF8(value) + if dictID, isNew, shouldEncode := mt.tagManager.ObserveDynamicString(value); shouldEncode { oneofDict.DictIndex = dictID dv.Value = oneofDict - return + return dictID, value, isNew } - // New value: send inline as string_value — no dict entry created. - // High-cardinality values (UUIDs, IPs, request IDs) never repeat, - // so dict encoding provides zero compression benefit for them. - // Proto3 requires valid UTF-8; replace invalid sequences to avoid corrupt datums. - oneofStr.StringValue = toValidUTF8(value) + // New or filtered value: send inline as string_value — no dict entry created. + // Proto3 requires valid UTF-8; invalid sequences were already replaced above. + oneofStr.StringValue = value dv.Value = oneofStr + return 0, "", false } // buildStructuredLog creates a Datum containing a StructuredLog diff --git a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go index 55e5853200f6..7809358c8b83 100644 --- a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go +++ b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go @@ -137,6 +137,95 @@ func TestFillDynamicValue_PreservesNonCanonicalNumericStrings(t *testing.T) { assert.True(t, dv.RenderAsString) } +func TestFillDynamicValue_AddsRepeatedStringsToDictionary(t *testing.T) { + mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer()) + + var dv statefulpb.DynamicValue + var intOneof statefulpb.DynamicValue_IntValue + var floatOneof statefulpb.DynamicValue_FloatValue + var boolOneof statefulpb.DynamicValue_BoolValue + var dictOneof statefulpb.DynamicValue_DictIndex + var rawJSONOneof statefulpb.DynamicValue_RawJsonValue + var strOneof statefulpb.DynamicValue_StringValue + + dictID, dictValue, isNew := mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "INFO") + stringValue, ok := dv.Value.(*statefulpb.DynamicValue_StringValue) + require.True(t, ok) + assert.Equal(t, "INFO", stringValue.StringValue) + assert.Zero(t, dictID) + assert.Empty(t, dictValue) + assert.False(t, isNew) + assert.Equal(t, 0, mt.tagManager.Count()) + + dictID, dictValue, isNew = mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "INFO") + dictIndex, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex) + require.True(t, ok) + assert.Equal(t, dictID, dictIndex.DictIndex) + assert.Equal(t, "INFO", dictValue) + assert.True(t, isNew) + assert.Equal(t, 1, mt.tagManager.Count()) + + dictIDAgain, dictValue, isNew := mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "INFO") + dictIndex, ok = dv.Value.(*statefulpb.DynamicValue_DictIndex) + require.True(t, ok) + assert.Equal(t, dictID, dictIDAgain) + assert.Equal(t, dictID, dictIndex.DictIndex) + assert.Equal(t, "INFO", dictValue) + assert.False(t, isNew) +} + +func TestFillWildcardDynamicValue_AddsRepeatedStringsToDictionary(t *testing.T) { + mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer()) + + var dv statefulpb.DynamicValue + var intOneof statefulpb.DynamicValue_IntValue + var dictOneof statefulpb.DynamicValue_DictIndex + var strOneof statefulpb.DynamicValue_StringValue + + dictID, dictValue, isNew := mt.fillWildcardDynamicValue(&dv, &intOneof, &dictOneof, &strOneof, "INFO") + stringValue, ok := dv.Value.(*statefulpb.DynamicValue_StringValue) + require.True(t, ok) + assert.Equal(t, "INFO", stringValue.StringValue) + assert.Zero(t, dictID) + assert.Empty(t, dictValue) + assert.False(t, isNew) + + dictID, dictValue, isNew = mt.fillWildcardDynamicValue(&dv, &intOneof, &dictOneof, &strOneof, "INFO") + dictIndex, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex) + require.True(t, ok) + assert.Equal(t, dictID, dictIndex.DictIndex) + assert.Equal(t, "INFO", dictValue) + assert.True(t, isNew) +} + +func TestFillWildcardDynamicValue_DoesNotDictionaryEncodeUUIDsOrTimestamps(t *testing.T) { + mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer()) + + var dv statefulpb.DynamicValue + var intOneof statefulpb.DynamicValue_IntValue + var dictOneof statefulpb.DynamicValue_DictIndex + var strOneof statefulpb.DynamicValue_StringValue + + values := []string{ + "550e8400-e29b-41d4-a716-446655440000", + "2026-04-28T12:34:56Z", + } + for _, value := range values { + t.Run(value, func(t *testing.T) { + for i := 0; i < 3; i++ { + dictID, dictValue, isNew := mt.fillWildcardDynamicValue(&dv, &intOneof, &dictOneof, &strOneof, value) + stringValue, ok := dv.Value.(*statefulpb.DynamicValue_StringValue) + require.True(t, ok) + assert.Equal(t, value, stringValue.StringValue) + assert.Zero(t, dictID) + assert.Empty(t, dictValue) + assert.False(t, isNew) + } + }) + } + assert.Equal(t, 0, mt.tagManager.Count()) +} + func TestFillDynamicValue_PreservesTypedJSONValues(t *testing.T) { mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer())