Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions pkg/logs/patterns/tags/tag_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
dynamicStringDictionaryThreshold = 2
maxPendingDynamicStrings = 1_000_000
minDynamicStringLength = 2
maxDynamicStringLength = 128
)

// TagManager manages a dictionary of unique tag strings (keys and values) to dictionary IDs.
// It provides thread-safe operations for retrieving/creating IDs and building Tag proto messages
// that reference those IDs.
type TagManager struct {
stringToEntry map[string]*tagEntry
idToEntry map[uint64]*tagEntry
pendingDynamic map[string]uint16
nextID atomic.Uint64
cachedMemoryBytes atomic.Int64
mu sync.RWMutex
Expand All @@ -31,8 +39,9 @@ type TagManager struct {
// NewTagManager creates a new TagManager instance
func NewTagManager() *TagManager {
tm := &TagManager{
stringToEntry: make(map[string]*tagEntry),
idToEntry: make(map[uint64]*tagEntry),
stringToEntry: make(map[string]*tagEntry),
idToEntry: make(map[uint64]*tagEntry),
pendingDynamic: make(map[string]uint16),
}
return tm
}
Expand Down Expand Up @@ -72,10 +81,58 @@ func (tm *TagManager) AddString(s string) (dictID uint64, isNew bool) {
}
tm.stringToEntry[s] = entry
tm.idToEntry[id] = entry
delete(tm.pendingDynamic, s)
tm.cachedMemoryBytes.Add(entry.EstimatedBytes())
return id, true
}

// ObserveDynamicString records a dynamic string value and returns a dictionary ID only
// after the string has repeated enough times to justify defining it. This lets dynamic
// values such as log levels reuse the dictionary while keeping one-off high-cardinality
// tokens inline.
func (tm *TagManager) ObserveDynamicString(s string) (dictID uint64, isNew bool, shouldEncode bool) {
now := time.Now()

tm.mu.Lock()
defer tm.mu.Unlock()

if entry, exists := tm.stringToEntry[s]; exists {
entry.usageCount++
entry.lastAccessAt = now
return entry.id, false, true
}

if !isDynamicStringDictionaryCandidate(s) {
return 0, false, false
}

count, tracked := tm.pendingDynamic[s]
if !tracked && len(tm.pendingDynamic) >= maxPendingDynamicStrings {
return 0, false, false
}
if count < dynamicStringDictionaryThreshold {
count++
}
if count < dynamicStringDictionaryThreshold {
tm.pendingDynamic[s] = count
return 0, false, false
}

id := tm.nextID.Add(1)
entry := &tagEntry{
id: id,
str: s,
usageCount: int64(count),
createdAt: now,
lastAccessAt: now,
}
tm.stringToEntry[s] = entry
tm.idToEntry[id] = entry
delete(tm.pendingDynamic, s)
tm.cachedMemoryBytes.Add(entry.EstimatedBytes())
return id, true, true
}

// EncodeTagStrings converts a slice of "key:value" tag strings into Tag proto messages
// backed by dictionary indices. It returns the encoded tags plus the dictionary entries
// that must be flushed upstream (ID -> string) for any newly-seen key/value strings.
Expand Down Expand Up @@ -211,3 +268,70 @@ func dictIndexValue(id uint64) *statefulpb.DynamicValue {
},
}
}

func isDynamicStringDictionaryCandidate(s string) bool {
if len(s) < minDynamicStringLength || len(s) > maxDynamicStringLength {
return false
}
if looksLikeUUID(s) || looksLikeTimestamp(s) {
return false
}
return true
}

func looksLikeUUID(s string) bool {
if len(s) == 36 {
for i := 0; i < len(s); i++ {
switch i {
case 8, 13, 18, 23:
if s[i] != '-' {
return false
}
default:
if !isHex(s[i]) {
return false
}
}
}
return true
}
if len(s) == 32 {
for i := 0; i < len(s); i++ {
if !isHex(s[i]) {
return false
}
}
return true
}
return false
}

func looksLikeTimestamp(s string) bool {
if len(s) < len("2006-01-02") {
return false
}
if !isDigit(s[0]) || !isDigit(s[1]) || !isDigit(s[2]) || !isDigit(s[3]) ||
s[4] != '-' ||
!isDigit(s[5]) || !isDigit(s[6]) ||
s[7] != '-' ||
!isDigit(s[8]) || !isDigit(s[9]) {
return false
}
if len(s) == len("2006-01-02") {
return true
}
switch s[10] {
case 'T', ' ', '_':
return true
default:
return false
}
}

func isHex(c byte) bool {
return isDigit(c) || ('a' <= c && c <= 'f') || ('A' <= c && c <= 'F')
}

func isDigit(c byte) bool {
return '0' <= c && c <= '9'
}
58 changes: 58 additions & 0 deletions pkg/logs/patterns/tags/tag_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,64 @@ func TestTagManager_GetStringID(t *testing.T) {
assert.Equal(t, uint64(0), id)
}

func TestTagManager_ObserveDynamicStringAddsAfterRepeatedUse(t *testing.T) {
tm := NewTagManager()

dictID, isNew, shouldEncode := tm.ObserveDynamicString("INFO")
assert.Zero(t, dictID)
assert.False(t, isNew)
assert.False(t, shouldEncode)
assert.Equal(t, 0, tm.Count())

dictID, isNew, shouldEncode = tm.ObserveDynamicString("INFO")
assert.NotZero(t, dictID)
assert.True(t, isNew)
assert.True(t, shouldEncode)
assert.Equal(t, 1, tm.Count())

dictIDAgain, isNew, shouldEncode := tm.ObserveDynamicString("INFO")
assert.Equal(t, dictID, dictIDAgain)
assert.False(t, isNew)
assert.True(t, shouldEncode)
assert.Equal(t, 1, tm.Count())
}

func TestTagManager_ObserveDynamicStringUsesExistingEntry(t *testing.T) {
tm := NewTagManager()
existingID, added := tm.AddString("INFO")
require.True(t, added)

dictID, isNew, shouldEncode := tm.ObserveDynamicString("INFO")
assert.Equal(t, existingID, dictID)
assert.False(t, isNew)
assert.True(t, shouldEncode)
assert.Equal(t, 1, tm.Count())
}

func TestTagManager_ObserveDynamicStringSkipsHighCardinalityShapes(t *testing.T) {
tm := NewTagManager()

values := []string{
"a",
"550e8400-e29b-41d4-a716-446655440000",
"550e8400e29b41d4a716446655440000",
"2026-04-28",
"2026-04-28T12:34:56Z",
}

for _, value := range values {
t.Run(value, func(t *testing.T) {
for i := 0; i < 3; i++ {
dictID, isNew, shouldEncode := tm.ObserveDynamicString(value)
assert.Zero(t, dictID)
assert.False(t, isNew)
assert.False(t, shouldEncode)
}
})
}
assert.Equal(t, 0, tm.Count())
}

func TestTagManager_Concurrency(t *testing.T) {
tm := NewTagManager()

Expand Down
Loading
Loading