Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,21 @@ type compactJSONContextValues struct {
dicts []uint64
rawValues [][]byte
stringValues []string
presence []byte
}

type dictEntryDefinition struct {
id uint64
value string
}

type jsonSchemaState struct {
id uint64
messageKeyID uint64
keys []string
keyIDs []uint64
}

type tagCacheEntry struct {
origin *message.Origin
hostname string
Expand All @@ -130,6 +138,7 @@ type MessageTranslator struct {
tokenizer token.Tokenizer
jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog
jsonSchemaToID map[string]uint64
jsonSchemasByMessage map[string][]*jsonSchemaState
nextJSONSchemaID uint64

pipelineName string
Expand Down Expand Up @@ -183,6 +192,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer, opts .
tokenizer: tokenizer,
jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"),
jsonSchemaToID: make(map[string]uint64),
jsonSchemasByMessage: make(map[string][]*jsonSchemaState),
nextJSONSchemaID: flatLogEmptyDictIndex,
pipelineName: pipelineName,
lastStaleSweep: time.Now(),
Expand Down Expand Up @@ -481,10 +491,14 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
var jsonContextValuesDV []*statefulpb.DynamicValue
var compactJSONContext compactJSONContextValues
if len(jsonContextKeys) > 0 {
messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys)
var schemaKeys []string
var jsonContextPresence []byte
messageKeyDV, jsonContextSchemaID, schemaKeys, jsonContextPresence = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys)
jsonContextValues = sparseJSONContextValues(schemaKeys, jsonContextKeys, jsonContextValues)

var dictDefs []dictEntryDefinition
compactJSONContext, dictDefs = mt.compactJSONContextValues(jsonContextValues)
compactJSONContext.presence = jsonContextPresence
for _, dictDef := range dictDefs {
mt.sendDictEntryDefine(outputChan, msg, dictDef.id, dictDef.value)
}
Expand Down Expand Up @@ -619,7 +633,7 @@ func (mt *MessageTranslator) buildStatusField(msg *message.Message) (uint64, str
return dictID, status, isNew
}

func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, keys []string) (*statefulpb.DynamicValue, uint64) {
func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, keys []string) (*statefulpb.DynamicValue, uint64, []string, []byte) {
if messageKey == "" {
messageKey = "message"
}
Expand All @@ -628,6 +642,14 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa
mt.sendDictEntryDefine(outputChan, msg, messageKeyID, messageKey)
}

if schema := mt.findJSONSchemaSuperset(messageKey, keys); schema != nil {
return &statefulpb.DynamicValue{
Value: &statefulpb.DynamicValue_DictIndex{
DictIndex: messageKeyID,
},
}, schema.id, schema.keys, buildJSONContextPresence(schema.keys, keys)
}

keyIDs := make([]uint64, 0, len(keys))
schemaKeyBuilder := strings.Builder{}
schemaKeyBuilder.WriteString(messageKey)
Expand All @@ -650,14 +672,88 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa
mt.nextJSONSchemaID++
schemaID = mt.nextJSONSchemaID
mt.jsonSchemaToID[schemaKey] = schemaID
mt.jsonSchemasByMessage[messageKey] = append(mt.jsonSchemasByMessage[messageKey], &jsonSchemaState{
id: schemaID,
messageKeyID: messageKeyID,
keys: append([]string(nil), keys...),
keyIDs: append([]uint64(nil), keyIDs...),
})
mt.sendJsonSchemaDefine(outputChan, msg, schemaID, keyIDs, messageKeyID)
}

return &statefulpb.DynamicValue{
Value: &statefulpb.DynamicValue_DictIndex{
DictIndex: messageKeyID,
},
}, schemaID
}, schemaID, keys, nil
}

func (mt *MessageTranslator) findJSONSchemaSuperset(messageKey string, keys []string) *jsonSchemaState {
var best *jsonSchemaState
for _, schema := range mt.jsonSchemasByMessage[messageKey] {
if !jsonSchemaContainsAllKeys(schema.keys, keys) {
continue
}
if best == nil || len(schema.keys) < len(best.keys) {
best = schema
}
}
return best
}

func jsonSchemaContainsAllKeys(schemaKeys []string, keys []string) bool {
if len(keys) > len(schemaKeys) {
return false
}
keyIndex := 0
for _, schemaKey := range schemaKeys {
if keyIndex >= len(keys) {
return true
}
if schemaKey == keys[keyIndex] {
keyIndex++
}
}
return keyIndex == len(keys)
}

func buildJSONContextPresence(schemaKeys []string, keys []string) []byte {
if len(schemaKeys) == len(keys) {
return nil
}
presence := make([]byte, (len(schemaKeys)+7)/8)
keyIndex := 0
for schemaIndex, schemaKey := range schemaKeys {
if keyIndex >= len(keys) {
break
}
if schemaKey != keys[keyIndex] {
continue
}
presence[schemaIndex/8] |= 1 << uint(schemaIndex%8)
keyIndex++
}
return presence
}

func sparseJSONContextValues(schemaKeys []string, keys []string, values []interface{}) []interface{} {
if len(schemaKeys) == len(keys) {
return values
}
valuesByKey := make(map[string]interface{}, len(keys))
for i, key := range keys {
if i >= len(values) {
break
}
valuesByKey[key] = values[i]
}
sparseValues := make([]interface{}, 0, len(keys))
for _, schemaKey := range schemaKeys {
if value, ok := valuesByKey[schemaKey]; ok {
sparseValues = append(sparseValues, value)
}
}
return sparseValues
}

// getMessageTimestamp returns the timestamp for the message, preferring the HTTP
Expand Down Expand Up @@ -1157,6 +1253,7 @@ func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*stat
JsonContextDictValues: compactJSONContext.dicts,
JsonContextRawValues: compactJSONContext.rawValues,
JsonContextStringValues: compactJSONContext.stringValues,
JsonContextPresence: compactJSONContext.presence,
}
if uuid != "" {
log.Uuid = &uuid
Expand Down
45 changes: 45 additions & 0 deletions pkg/logs/sender/grpc/mock_state_dynamic_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/logs/message"
rtokenizer "github.com/DataDog/datadog-agent/pkg/logs/patterns/tokenizer/rust"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb"
)
Expand Down Expand Up @@ -314,6 +315,50 @@ func TestFillDynamicValue_JSONNumberUsesFloatWhenLossless(t *testing.T) {
assert.False(t, dv.RenderAsString)
}

func TestJSONSchemaPresenceHelpers(t *testing.T) {
schemaKeys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}
keys := []string{"b", "d", "i"}

assert.True(t, jsonSchemaContainsAllKeys(schemaKeys, keys))
assert.False(t, jsonSchemaContainsAllKeys(schemaKeys, []string{"b", "x"}))
assert.Equal(t, []byte{0b00001010, 0b00000001}, buildJSONContextPresence(schemaKeys, keys))

values := sparseJSONContextValues(schemaKeys, keys, []interface{}{"bee", "dee", "eye"})
require.Len(t, values, 3)
assert.Equal(t, []interface{}{"bee", "dee", "eye"}, values)
}

func TestSendJsonSchemaDefineIfNeeded_ReusesSupersetWithPresence(t *testing.T) {
mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer())
outputChan := make(chan *message.StatefulMessage, 10)
msg := message.NewMessage([]byte("request done"), nil, "", 0)

_, schemaID, schemaKeys, presence := mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, "message", []string{"a", "b", "c"})
require.NotZero(t, schemaID)
assert.Equal(t, []string{"a", "b", "c"}, schemaKeys)
assert.Nil(t, presence)

_, reusedSchemaID, reusedSchemaKeys, reusedPresence := mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, "message", []string{"a", "c"})
assert.Equal(t, schemaID, reusedSchemaID)
assert.Equal(t, []string{"a", "b", "c"}, reusedSchemaKeys)
assert.Equal(t, []byte{0b00000101}, reusedPresence)
}

func TestBuildStructuredLogIncludesJSONContextPresence(t *testing.T) {
compact := compactJSONContextValues{
kinds: []byte{jsonValueKindInt},
ints: []int64{42},
presence: []byte{0b00000100},
}

datum := buildStructuredLog(123, 12, nil, nil, "", nil, 0, nil, 5, nil, compact)
flatLog := datum.GetFlatLog()
require.NotNil(t, flatLog)
assert.Equal(t, []byte{0b00000100}, flatLog.JsonContextPresence)
assert.Equal(t, []byte{jsonValueKindInt}, flatLog.JsonContextValueKinds)
assert.Equal(t, []int64{42}, flatLog.JsonContextIntValues)
}

func ptrInt64(v int64) *int64 { return &v }
func ptrFloat64(v float64) *float64 { return &v }
func ptrBool(v bool) *bool { return &v }
7 changes: 5 additions & 2 deletions pkg/proto/datadog/stateful/stateful_encoding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,18 @@ message FlatLog {
uint64 json_schema_id = 8;
// Deprecated fallback values for the json schema.
repeated DynamicValue json_context_values = 9;
// Compact values for the json schema. json_context_value_kinds has one byte per schema key.
// Compact values for the json schema. json_context_value_kinds has one byte per present schema key.
// Each kind consumes the next value from the corresponding packed value field.
// Kind values are defined by the JsonValueKind constants in the encoder/decoder.
bytes json_context_value_kinds = 10;
repeated int64 json_context_int_values = 11;
repeated sint64 json_context_int_values = 11;
repeated double json_context_float_values = 12;
repeated uint64 json_context_dict_values = 13;
repeated bytes json_context_raw_values = 14;
repeated string json_context_string_values = 15;
// Optional bitmap for sparse schemas. If unset, every schema key is present.
// Bit i indicates whether schema key i has a value in the compact value streams.
bytes json_context_presence = 16;

// Optional UUID used to track logs when dual-sent via HTTP and gRPC.
optional string uuid = 100;
Expand Down
21 changes: 16 additions & 5 deletions pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading