Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pkg/logs/sender/grpc/dict_admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package grpc

// Default tuning parameters for dictAdmission.
const (
	// defaultDictMaxShortLen is the largest byte length that still takes
	// the heuristic path: a string this long or shorter is promoted
	// immediately. Typical members are log levels (INFO, ERROR), HTTP
	// methods (GET, POST) and short status words (none, success).
	defaultDictMaxShortLen = 8

	// defaultDictAdmissionThreshold is how many times a longer string has
	// to be seen before it is promoted into the dictionary.
	defaultDictAdmissionThreshold = uint16(3)

	// defaultDictMaxTracked is the upper bound on the number of candidate
	// strings whose counts are kept for count-based promotion; it exists
	// to bound memory usage.
	defaultDictMaxTracked = 4096
)

// dictAdmission is the gate that decides whether a JSON-context string
// value gets promoted into the stream dictionary. It combines two
// strategies:
//
//   - Heuristic: any string of at most maxShortLen bytes is admitted right
//     away. Short strings are almost always categorical (log levels, HTTP
//     methods, enum labels) and pay for their dictionary define on the
//     second occurrence.
//
//   - Frequency: longer strings are counted, and a string is promoted once
//     it has been seen threshold times. This picks up route names, source
//     identifiers and URL paths that repeat often but are too long for the
//     heuristic.
type dictAdmission struct {
	// threshold is the number of sightings a long string needs before it
	// is promoted.
	threshold uint16
	// maxShortLen is the byte length at or below which a string is
	// admitted without being counted.
	maxShortLen int
	// maxTracked caps how many distinct candidate strings counts may hold.
	maxTracked int
	// counts holds the sightings recorded so far for long strings that
	// have not been promoted yet.
	counts map[string]uint16
}

// newDictAdmission returns a dictAdmission set up with the package default
// threshold, short-string length and tracking cap, and with an empty,
// non-nil candidate table.
func newDictAdmission() *dictAdmission {
	da := new(dictAdmission)
	da.threshold = defaultDictAdmissionThreshold
	da.maxShortLen = defaultDictMaxShortLen
	da.maxTracked = defaultDictMaxTracked
	da.counts = map[string]uint16{}
	return da
}

// shouldAdmit reports whether s should be added to the dictionary.
//
// Strings of at most maxShortLen bytes are admitted unconditionally and are
// never counted. Longer strings are counted once per call; the call that
// brings the count up to threshold returns true and removes the string from
// the candidate table. Every earlier call returns false.
//
// At most maxTracked distinct candidates are tracked: a string seen for the
// first time while the table is full is not recorded. A string that already
// has an entry always gets its count updated, so a full table cannot stop
// existing candidates from reaching threshold.
func (da *dictAdmission) shouldAdmit(s string) bool {
	// Heuristic: short strings are almost always categorical.
	if len(s) <= da.maxShortLen {
		return true
	}

	// Frequency: promote after threshold occurrences.
	prev, tracked := da.counts[s]
	n := prev + 1
	if n >= da.threshold {
		delete(da.counts, s) // no longer need to track
		return true
	}

	// Overwriting an existing entry does not grow the map, so the capacity
	// limit only applies to strings that are not tracked yet.
	if tracked || len(da.counts) < da.maxTracked {
		da.counts[s] = n
	}
	return false
}

// reset forgets every accumulated candidate count. It is meant to be called
// on stream rotation so that stale candidates don't carry over into the next
// stream.
func (da *dictAdmission) reset() {
	// Largest map size that is still emptied in place rather than replaced.
	const reuseLimit = 256

	if len(da.counts) > reuseLimit {
		// Big table: start over with a fresh map (presumably so the
		// oversized storage can be collected).
		da.counts = make(map[string]uint16)
		return
	}

	// Small table: empty it in place and keep the map, which reduces GC
	// pressure.
	for key := range da.counts {
		delete(da.counts, key)
	}
}
56 changes: 40 additions & 16 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type MessageTranslator struct {
jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog

pipelineName string
dictAdmission *dictAdmission
lastStaleSweep time.Time

// tagCache caches the last computed tag set to avoid recomputation across messages
Expand Down Expand Up @@ -137,6 +138,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa
tokenizer: tokenizer,
jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
pipelineName: pipelineName,
dictAdmission: newDictAdmission(),
lastStaleSweep: time.Now(),
}
tlmPipelineStateSize.Set(0, pipelineName)
Expand Down Expand Up @@ -434,7 +436,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
jsonContextValuesDV[i] = &jsonContextDVBacking[i]
}
for i, val := range jsonContextValues {
mt.fillDynamicValue(
newDefine := mt.fillDynamicValue(
&jsonContextDVBacking[i],
&jsonContextTypeBacking[i].intOneof,
&jsonContextTypeBacking[i].floatOneof,
Expand All @@ -444,6 +446,9 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
&jsonContextTypeBacking[i].stringOneof,
val,
)
if newDefine != nil {
mt.sendDictEntryDefine(outputChan, msg, newDefine.id, newDefine.value)
}
}
}

Expand Down Expand Up @@ -791,9 +796,18 @@ func (mt *MessageTranslator) encodeDynamicValue(value string) (*statefulpb.Dynam
}, dictID, isNew
}

// dictDefine records a new dictionary entry that the caller must send
// as a DICT_ENTRY_DEFINE datum before the referencing log.
// fillDynamicValue returns one when a string is added to the dictionary
// for the first time.
type dictDefine struct {
// id is the dictionary ID assigned to the newly added string.
id uint64
// value is the string stored under that ID.
value string
}

// fillDynamicValue fills a pre-allocated DynamicValue in-place for a typed JSON context value.
// Primitive JSON types preserve their native type; nested objects/arrays arrive as JSON strings.
// String values may use numeric or bool encodings with render_as_string when they can round-trip exactly.
// When a string is promoted to the dictionary for the first time, a non-nil *dictDefine is returned
// so the caller can emit the corresponding DICT_ENTRY_DEFINE.
func (mt *MessageTranslator) fillDynamicValue(
dv *statefulpb.DynamicValue,
oneofInt *statefulpb.DynamicValue_IntValue,
Expand All @@ -803,78 +817,88 @@ func (mt *MessageTranslator) fillDynamicValue(
oneofRawJSON *statefulpb.DynamicValue_RawJsonValue,
oneofStr *statefulpb.DynamicValue_StringValue,
value interface{},
) {
) *dictDefine {
dv.RenderAsString = false
switch typed := value.(type) {
case nil:
dv.Value = nil
return
return nil
case string:
if intVal, ok := parseLosslessIntString(typed); ok {
oneofInt.IntValue = intVal
dv.Value = oneofInt
dv.RenderAsString = true
return
return nil
}
if floatVal, ok := parseLosslessFloatString(typed); ok {
oneofFloat.FloatValue = floatVal
dv.Value = oneofFloat
dv.RenderAsString = true
return
return nil
}
if boolVal, ok := parseLosslessBoolString(typed); ok {
oneofBool.BoolValue = boolVal
dv.Value = oneofBool
dv.RenderAsString = true
return
return nil
}
if dictID, ok := mt.tagManager.GetStringID(typed); ok {
oneofDict.DictIndex = dictID
dv.Value = oneofDict
return
return nil
}
if mt.dictAdmission.shouldAdmit(typed) {
dictID, isNew := mt.tagManager.AddString(typed)
oneofDict.DictIndex = dictID
dv.Value = oneofDict
if isNew {
tlmPipelineDictPromotions.Inc(mt.pipelineName)
return &dictDefine{id: dictID, value: typed}
}
return nil
}
oneofStr.StringValue = typed
dv.Value = oneofStr
return
return nil
case json.Number:
raw := typed.String()
if intVal, ok := parseLosslessIntString(raw); ok {
oneofInt.IntValue = intVal
dv.Value = oneofInt
return
return nil
}
if floatVal, ok := parseLosslessFloatString(raw); ok {
oneofFloat.FloatValue = floatVal
dv.Value = oneofFloat
return
return nil
}
oneofRawJSON.RawJsonValue = []byte(raw)
dv.Value = oneofRawJSON
return
return nil
case float64:
if !math.IsInf(typed, 0) && !math.IsNaN(typed) && math.Trunc(typed) == typed && typed >= math.MinInt64 && typed <= math.MaxInt64 {
oneofInt.IntValue = int64(typed)
dv.Value = oneofInt
return
return nil
}
oneofFloat.FloatValue = typed
dv.Value = oneofFloat
return
return nil
case bool:
oneofBool.BoolValue = typed
dv.Value = oneofBool
return
return nil
default:
rawJSON, err := json.Marshal(typed)
if err != nil {
log.Warnf("Failed to marshal nested JSON context value: %v", err)
oneofStr.StringValue = ""
dv.Value = oneofStr
return
return nil
}
oneofRawJSON.RawJsonValue = rawJSON
dv.Value = oneofRawJSON
return
return nil
}
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/logs/sender/grpc/mock_state_dynamic_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,20 @@ func TestFillDynamicValue_PreservesNonCanonicalNumericStrings(t *testing.T) {
var rawJSONOneof statefulpb.DynamicValue_RawJsonValue
var strOneof statefulpb.DynamicValue_StringValue

mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123")
stringValue, ok := dv.Value.(*statefulpb.DynamicValue_StringValue)
// "00123" is ≤ 8 bytes so the admission heuristic promotes it to dictionary immediately.
newDefine := mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123")
require.NotNil(t, newDefine, "short non-canonical string should be promoted to dictionary")
assert.Equal(t, "00123", newDefine.value)
dictValue, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex)
require.True(t, ok)
assert.Equal(t, "00123", stringValue.StringValue)
assert.Equal(t, newDefine.id, dictValue.DictIndex)
assert.False(t, dv.RenderAsString)

dictID, _ := mt.tagManager.AddString("00123")
mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123")
dictValue, ok := dv.Value.(*statefulpb.DynamicValue_DictIndex)
// Second call should find it already in the dictionary (no new define).
newDefine = mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "00123")
assert.Nil(t, newDefine)
dictValue, ok = dv.Value.(*statefulpb.DynamicValue_DictIndex)
require.True(t, ok)
assert.Equal(t, dictID, dictValue.DictIndex)
assert.False(t, dv.RenderAsString)

mt.fillDynamicValue(&dv, &intOneof, &floatOneof, &boolOneof, &dictOneof, &rawJSONOneof, &strOneof, "446")
Expand Down
2 changes: 2 additions & 0 deletions pkg/logs/sender/grpc/state_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
tlmPipelinePatternLogsProcessedBytes = telemetry.NewCounter("logs_sender_grpc", "pattern_logs_bytes", []string{"pipeline"}, "Bytes of patterned logs sent")
tlmPipelineRawLogsProcessed = telemetry.NewCounter("logs_sender_grpc", "raw_logs", []string{"pipeline"}, "# raw logs sent")
tlmPipelineRawLogsProcessedBytes = telemetry.NewCounter("logs_sender_grpc", "raw_logs_bytes", []string{"pipeline"}, "Bytes of raw logs sent")

tlmPipelineDictPromotions = telemetry.NewCounter("logs_sender_grpc", "dict_promotions", []string{"pipeline"}, "JSON context string values promoted to dictionary")
)

// Per-worker metrics
Expand Down
Loading