diff --git a/pkg/logs/sender/grpc/dict_admission.go b/pkg/logs/sender/grpc/dict_admission.go new file mode 100644 index 000000000000..4c183ea822c9 --- /dev/null +++ b/pkg/logs/sender/grpc/dict_admission.go @@ -0,0 +1,83 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package grpc + +const ( + // defaultDictAdmissionThreshold is the number of times a longer string + // must be seen before it is promoted into the dictionary. + defaultDictAdmissionThreshold uint16 = 3 + + // defaultDictMaxShortLen is the byte-length at or below which a string + // is promoted immediately (heuristic path). Covers log levels (INFO, + // ERROR), HTTP methods (GET, POST), short status words (none, success). + defaultDictMaxShortLen = 8 + + // defaultDictMaxTracked caps the number of candidate strings tracked + // for count-based promotion to bound memory usage. + defaultDictMaxTracked = 4096 +) + +// dictAdmission decides whether a JSON-context string value should be +// promoted into the stream dictionary. Two complementary strategies: +// +// 1. Heuristic – strings ≤ maxShortLen bytes are admitted immediately. +// Short strings are almost always categorical (log levels, HTTP +// methods, enum labels) and pay for their dictionary define on the +// second occurrence. +// +// 2. Frequency – longer strings are counted; once a string has been +// seen threshold times it is promoted. This catches things like +// route names, source identifiers, and URL paths that repeat often +// but are too long for the heuristic. +type dictAdmission struct { + counts map[string]uint16 + threshold uint16 + maxShortLen int + maxTracked int +} + +func newDictAdmission() *dictAdmission { + return &dictAdmission{ + counts: make(map[string]uint16), + threshold: defaultDictAdmissionThreshold, + maxShortLen: defaultDictMaxShortLen, + maxTracked: defaultDictMaxTracked, + } +} + +// shouldAdmit returns true when s should be added to the dictionary. +func (da *dictAdmission) shouldAdmit(s string) bool { + // Heuristic: short strings are almost always categorical. + if len(s) <= da.maxShortLen { + return true + } + + // Frequency: promote after threshold occurrences. + n := da.counts[s] + 1 + if n >= da.threshold { + delete(da.counts, s) // no longer need to track + return true + } + + // Only start tracking if we have capacity. + if len(da.counts) < da.maxTracked { + da.counts[s] = n + } + return false +} + +// reset clears accumulated counts. Called on stream rotation so that +// stale candidates don't persist across streams. +func (da *dictAdmission) reset() { + // Re-use the map when it's small to reduce GC pressure. + if len(da.counts) <= 256 { + for k := range da.counts { + delete(da.counts, k) + } + } else { + da.counts = make(map[string]uint16) + } +} diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 0bf78415ae7b..d4185540234c 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -103,6 +103,7 @@ type MessageTranslator struct { jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog pipelineName string + dictAdmission *dictAdmission lastStaleSweep time.Time // tagCache caches the last computed tag set to avoid recomputation across messages @@ -137,6 +138,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa tokenizer: tokenizer, jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"), pipelineName: pipelineName, + dictAdmission: newDictAdmission(), lastStaleSweep: time.Now(), } tlmPipelineStateSize.Set(0, pipelineName) @@ -434,7 +436,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList jsonContextValuesDV[i] = &jsonContextDVBacking[i] } for i, val := range jsonContextValues { - mt.fillDynamicValue( + newDefine := mt.fillDynamicValue( &jsonContextDVBacking[i], &jsonContextTypeBacking[i].intOneof, &jsonContextTypeBacking[i].floatOneof, @@ -444,6 +446,9 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList &jsonContextTypeBacking[i].stringOneof, val, ) + if newDefine != nil { + mt.sendDictEntryDefine(outputChan, msg, newDefine.id, newDefine.value) + } } } @@ -791,9 +796,18 @@ func (mt *MessageTranslator) encodeDynamicValue(value string) (*statefulpb.Dynam }, dictID, isNew } +// dictDefine records a new dictionary entry that the caller must send +// as a DICT_ENTRY_DEFINE datum before the referencing log. +type dictDefine struct { + id uint64 + value string +} + // fillDynamicValue fills a pre-allocated DynamicValue in-place for a typed JSON context value. // Primitive JSON types preserve their native type; nested objects/arrays arrive as JSON strings. // String values may use numeric or bool encodings with render_as_string when they can round-trip exactly. +// When a string is promoted to the dictionary for the first time, a non-nil *dictDefine is returned +// so the caller can emit the corresponding DICT_ENTRY_DEFINE. func (mt *MessageTranslator) fillDynamicValue( dv *statefulpb.DynamicValue, oneofInt *statefulpb.DynamicValue_IntValue, @@ -803,78 +817,88 @@ func (mt *MessageTranslator) fillDynamicValue( oneofRawJSON *statefulpb.DynamicValue_RawJsonValue, oneofStr *statefulpb.DynamicValue_StringValue, value interface{}, -) { +) *dictDefine { dv.RenderAsString = false switch typed := value.(type) { case nil: dv.Value = nil - return + return nil case string: if intVal, ok := parseLosslessIntString(typed); ok { oneofInt.IntValue = intVal dv.Value = oneofInt dv.RenderAsString = true - return + return nil } if floatVal, ok := parseLosslessFloatString(typed); ok { oneofFloat.FloatValue = floatVal dv.Value = oneofFloat dv.RenderAsString = true - return + return nil } if boolVal, ok := parseLosslessBoolString(typed); ok { oneofBool.BoolValue = boolVal dv.Value = oneofBool dv.RenderAsString = true - return + return nil } if dictID, ok := mt.tagManager.GetStringID(typed); ok { oneofDict.DictIndex = dictID dv.Value = oneofDict - return + return nil + } + if mt.dictAdmission.shouldAdmit(typed) { + dictID, isNew := mt.tagManager.AddString(typed) + oneofDict.DictIndex = dictID + dv.Value = oneofDict + if isNew { + tlmPipelineDictPromotions.Inc(mt.pipelineName) + return &dictDefine{id: dictID, value: typed} + } + return nil } oneofStr.StringValue = typed dv.Value = oneofStr - return + return nil case json.Number: raw := typed.String() if intVal, ok := parseLosslessIntString(raw); ok { oneofInt.IntValue = intVal dv.Value = oneofInt - return + return nil } if floatVal, ok := parseLosslessFloatString(raw); ok { oneofFloat.FloatValue = floatVal dv.Value = oneofFloat - return + return nil } oneofRawJSON.RawJsonValue = []byte(raw) dv.Value = oneofRawJSON - return + return nil case float64: if !math.IsInf(typed, 0) && !math.IsNaN(typed) && math.Trunc(typed) == typed && typed >= math.MinInt64 && typed <= math.MaxInt64 { oneofInt.IntValue = int64(typed) dv.Value = oneofInt - return + return nil } oneofFloat.FloatValue = typed dv.Value = oneofFloat - return + return nil case bool: oneofBool.BoolValue = typed dv.Value = oneofBool - return + return nil default: rawJSON, err := json.Marshal(typed) if err != nil { log.Warnf("Failed to marshal nested JSON context value: %v", err) oneofStr.StringValue = "" dv.Value = oneofStr - return + return nil } oneofRawJSON.RawJsonValue = rawJSON dv.Value = oneofRawJSON - return + return nil } } diff --git a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go index 55e5853200f6..78f60c2dea72 100644 --- a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go +++ b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go @@ -117,17 +117,20 @@ func TestFillDynamicValue_PreservesNonCanonicalNumericStrings(t *testing.T) { var rawJSONOneof statefulpb.DynamicValue_RawJsonValue var strOneof statefulpb.DynamicValue_StringValue - mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123") - stringValue, ok := dv.Value.(*statefulpb.DynamicValue_StringValue) + // "00123" is ≤ 8 bytes so the admission heuristic promotes it to dictionary immediately. + newDefine := mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123") + require.NotNil(t, newDefine, "short non-canonical string should be promoted to dictionary") + assert.Equal(t, "00123", newDefine.value) + dictValue, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex) require.True(t, ok) - assert.Equal(t, "00123", stringValue.StringValue) + assert.Equal(t, newDefine.id, dictValue.DictIndex) assert.False(t, dv.RenderAsString) - dictID, _ := mt.tagManager.AddString("00123") - mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123") - dictValue, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex) + // Second call should find it already in the dictionary (no new define). + newDefine = mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123") + assert.Nil(t, newDefine) + dictValue, ok = dv.Value.(*statefulpb.DynamicValue_DictIndex) require.True(t, ok) - assert.Equal(t, dictID, dictValue.DictIndex) assert.False(t, dv.RenderAsString) mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "446") diff --git a/pkg/logs/sender/grpc/state_telemetry.go b/pkg/logs/sender/grpc/state_telemetry.go index b24be5ca6a2d..6ba6e4472ac3 100644 --- a/pkg/logs/sender/grpc/state_telemetry.go +++ b/pkg/logs/sender/grpc/state_telemetry.go @@ -29,6 +29,8 @@ var ( tlmPipelinePatternLogsProcessedBytes = telemetry.NewCounter("logs_sender_grpc", "pattern_logs_bytes", []string{"pipeline"}, "Bytes of patterned logs sent") tlmPipelineRawLogsProcessed = telemetry.NewCounter("logs_sender_grpc", "raw_logs", []string{"pipeline"}, "# raw logs sent") tlmPipelineRawLogsProcessedBytes = telemetry.NewCounter("logs_sender_grpc", "raw_logs_bytes", []string{"pipeline"}, "Bytes of raw logs sent") + + tlmPipelineDictPromotions = telemetry.NewCounter("logs_sender_grpc", "dict_promotions", []string{"pipeline"}, "JSON context string values promoted to dictionary") ) // Per-worker metrics