From ee342aeab06d961873b3682bf8cb4c9b1bd04ab2 Mon Sep 17 00:00:00 2001 From: Jake Saferstein Date: Tue, 5 May 2026 15:30:02 -0700 Subject: [PATCH] json presence --- pkg/logs/sender/grpc/mock_state.go | 103 +++++++++++++++++- .../grpc/mock_state_dynamic_value_test.go | 45 ++++++++ .../datadog/stateful/stateful_encoding.proto | 7 +- .../pbgo/statefulpb/stateful_encoding.pb.go | 21 +++- .../stateful_encoding_vtproto.pb.go | 77 ++++++++++--- 5 files changed, 229 insertions(+), 24 deletions(-) diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index e614dc16fe8d..c439309cab3a 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -103,6 +103,7 @@ type compactJSONContextValues struct { dicts []uint64 rawValues [][]byte stringValues []string + presence []byte } type dictEntryDefinition struct { @@ -110,6 +111,13 @@ type dictEntryDefinition struct { value string } +type jsonSchemaState struct { + id uint64 + messageKeyID uint64 + keys []string + keyIDs []uint64 +} + type tagCacheEntry struct { origin *message.Origin hostname string @@ -130,6 +138,7 @@ type MessageTranslator struct { tokenizer token.Tokenizer jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog jsonSchemaToID map[string]uint64 + jsonSchemasByMessage map[string][]*jsonSchemaState nextJSONSchemaID uint64 pipelineName string @@ -183,6 +192,7 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer, opts . tokenizer: tokenizer, jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"), jsonSchemaToID: make(map[string]uint64), + jsonSchemasByMessage: make(map[string][]*jsonSchemaState), nextJSONSchemaID: flatLogEmptyDictIndex, pipelineName: pipelineName, lastStaleSweep: time.Now(), @@ -481,10 +491,14 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList var jsonContextValuesDV []*statefulpb.DynamicValue var compactJSONContext compactJSONContextValues if len(jsonContextKeys) > 0 { - messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys) + var schemaKeys []string + var jsonContextPresence []byte + messageKeyDV, jsonContextSchemaID, schemaKeys, jsonContextPresence = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys) + jsonContextValues = sparseJSONContextValues(schemaKeys, jsonContextKeys, jsonContextValues) var dictDefs []dictEntryDefinition compactJSONContext, dictDefs = mt.compactJSONContextValues(jsonContextValues) + compactJSONContext.presence = jsonContextPresence for _, dictDef := range dictDefs { mt.sendDictEntryDefine(outputChan, msg, dictDef.id, dictDef.value) } @@ -619,7 +633,7 @@ func (mt *MessageTranslator) buildStatusField(msg *message.Message) (uint64, str return dictID, status, isNew } -func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, keys []string) (*statefulpb.DynamicValue, uint64) { +func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, keys []string) (*statefulpb.DynamicValue, uint64, []string, []byte) { if messageKey == "" { messageKey = "message" } @@ -628,6 +642,14 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa mt.sendDictEntryDefine(outputChan, msg, messageKeyID, messageKey) } + if schema := mt.findJSONSchemaSuperset(messageKey, keys); schema != nil { + return &statefulpb.DynamicValue{ + Value: &statefulpb.DynamicValue_DictIndex{ + DictIndex: messageKeyID, + }, + }, schema.id, schema.keys, buildJSONContextPresence(schema.keys, keys) + } + keyIDs := make([]uint64, 0, len(keys)) schemaKeyBuilder := strings.Builder{} schemaKeyBuilder.WriteString(messageKey) @@ -650,6 +672,12 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa mt.nextJSONSchemaID++ schemaID = mt.nextJSONSchemaID mt.jsonSchemaToID[schemaKey] = schemaID + mt.jsonSchemasByMessage[messageKey] = append(mt.jsonSchemasByMessage[messageKey], &jsonSchemaState{ + id: schemaID, + messageKeyID: messageKeyID, + keys: append([]string(nil), keys...), + keyIDs: append([]uint64(nil), keyIDs...), + }) mt.sendJsonSchemaDefine(outputChan, msg, schemaID, keyIDs, messageKeyID) } @@ -657,7 +685,75 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa Value: &statefulpb.DynamicValue_DictIndex{ DictIndex: messageKeyID, }, - }, schemaID + }, schemaID, keys, nil +} + +func (mt *MessageTranslator) findJSONSchemaSuperset(messageKey string, keys []string) *jsonSchemaState { + var best *jsonSchemaState + for _, schema := range mt.jsonSchemasByMessage[messageKey] { + if !jsonSchemaContainsAllKeys(schema.keys, keys) { + continue + } + if best == nil || len(schema.keys) < len(best.keys) { + best = schema + } + } + return best +} + +func jsonSchemaContainsAllKeys(schemaKeys []string, keys []string) bool { + if len(keys) > len(schemaKeys) { + return false + } + keyIndex := 0 + for _, schemaKey := range schemaKeys { + if keyIndex >= len(keys) { + return true + } + if schemaKey == keys[keyIndex] { + keyIndex++ + } + } + return keyIndex == len(keys) +} + +func buildJSONContextPresence(schemaKeys []string, keys []string) []byte { + if len(schemaKeys) == len(keys) { + return nil + } + presence := make([]byte, (len(schemaKeys)+7)/8) + keyIndex := 0 + for schemaIndex, schemaKey := range schemaKeys { + if keyIndex >= len(keys) { + break + } + if schemaKey != keys[keyIndex] { + continue + } + presence[schemaIndex/8] |= 1 << uint(schemaIndex%8) + keyIndex++ + } + return presence +} + +func sparseJSONContextValues(schemaKeys []string, keys []string, values []interface{}) []interface{} { + if len(schemaKeys) == len(keys) { + return values + } + valuesByKey := make(map[string]interface{}, len(keys)) + for i, key := range keys { + if i >= len(values) { + break + } + valuesByKey[key] = values[i] + } + sparseValues := make([]interface{}, 0, len(keys)) + for _, schemaKey := range schemaKeys { + if value, ok := valuesByKey[schemaKey]; ok { + sparseValues = append(sparseValues, value) + } + } + return sparseValues } // getMessageTimestamp returns the timestamp for the message, preferring the HTTP @@ -1157,6 +1253,7 @@ func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*stat JsonContextDictValues: compactJSONContext.dicts, JsonContextRawValues: compactJSONContext.rawValues, JsonContextStringValues: compactJSONContext.stringValues, + JsonContextPresence: compactJSONContext.presence, } if uuid != "" { log.Uuid = &uuid diff --git a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go index 7809358c8b83..7a225f5f0cde 100644 --- a/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go +++ b/pkg/logs/sender/grpc/mock_state_dynamic_value_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/DataDog/datadog-agent/pkg/logs/message" rtokenizer "github.com/DataDog/datadog-agent/pkg/logs/patterns/tokenizer/rust" "github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb" ) @@ -314,6 +315,50 @@ func TestFillDynamicValue_JSONNumberUsesFloatWhenLossless(t *testing.T) { assert.False(t, dv.RenderAsString) } +func TestJSONSchemaPresenceHelpers(t *testing.T) { + schemaKeys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"} + keys := []string{"b", "d", "i"} + + assert.True(t, jsonSchemaContainsAllKeys(schemaKeys, keys)) + assert.False(t, jsonSchemaContainsAllKeys(schemaKeys, []string{"b", "x"})) + assert.Equal(t, []byte{0b00001010, 0b00000001}, buildJSONContextPresence(schemaKeys, keys)) + + values := sparseJSONContextValues(schemaKeys, keys, []interface{}{"bee", "dee", "eye"}) + require.Len(t, values, 3) + assert.Equal(t, []interface{}{"bee", "dee", "eye"}, values) +} + +func TestSendJsonSchemaDefineIfNeeded_ReusesSupersetWithPresence(t *testing.T) { + mt := NewMessageTranslator("test-pipeline", rtokenizer.NewRustTokenizer()) + outputChan := make(chan *message.StatefulMessage, 10) + msg := message.NewMessage([]byte("request done"), nil, "", 0) + + _, schemaID, schemaKeys, presence := mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, "message", []string{"a", "b", "c"}) + require.NotZero(t, schemaID) + assert.Equal(t, []string{"a", "b", "c"}, schemaKeys) + assert.Nil(t, presence) + + _, reusedSchemaID, reusedSchemaKeys, reusedPresence := mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, "message", []string{"a", "c"}) + assert.Equal(t, schemaID, reusedSchemaID) + assert.Equal(t, []string{"a", "b", "c"}, reusedSchemaKeys) + assert.Equal(t, []byte{0b00000101}, reusedPresence) +} + +func TestBuildStructuredLogIncludesJSONContextPresence(t *testing.T) { + compact := compactJSONContextValues{ + kinds: []byte{jsonValueKindInt}, + ints: []int64{42}, + presence: []byte{0b00000100}, + } + + datum := buildStructuredLog(123, 12, nil, nil, "", nil, 0, nil, 5, nil, compact) + flatLog := datum.GetFlatLog() + require.NotNil(t, flatLog) + assert.Equal(t, []byte{0b00000100}, flatLog.JsonContextPresence) + assert.Equal(t, []byte{jsonValueKindInt}, flatLog.JsonContextValueKinds) + assert.Equal(t, []int64{42}, flatLog.JsonContextIntValues) +} + func ptrInt64(v int64) *int64 { return &v } func ptrFloat64(v float64) *float64 { return &v } func ptrBool(v bool) *bool { return &v } diff --git a/pkg/proto/datadog/stateful/stateful_encoding.proto b/pkg/proto/datadog/stateful/stateful_encoding.proto index 21d327ffe198..a183a66035a9 100644 --- a/pkg/proto/datadog/stateful/stateful_encoding.proto +++ b/pkg/proto/datadog/stateful/stateful_encoding.proto @@ -73,15 +73,18 @@ message FlatLog { uint64 json_schema_id = 8; // Deprecated fallback values for the json schema. repeated DynamicValue json_context_values = 9; - // Compact values for the json schema. json_context_value_kinds has one byte per schema key. + // Compact values for the json schema. json_context_value_kinds has one byte per present schema key. // Each kind consumes the next value from the corresponding packed value field. // Kind values are defined by the JsonValueKind constants in the encoder/decoder. bytes json_context_value_kinds = 10; - repeated int64 json_context_int_values = 11; + repeated sint64 json_context_int_values = 11; repeated double json_context_float_values = 12; repeated uint64 json_context_dict_values = 13; repeated bytes json_context_raw_values = 14; repeated string json_context_string_values = 15; + // Optional bitmap for sparse schemas. If unset, every schema key is present. + // Bit i indicates whether schema key i has a value in the compact value streams. + bytes json_context_presence = 16; // Optional UUID used to track logs when dual-sent via HTTP and gRPC. optional string uuid = 100; diff --git a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go index a54a13e3cbe1..c7612d8629bb 100644 --- a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go +++ b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go @@ -398,15 +398,18 @@ type FlatLog struct { JsonSchemaId uint64 `protobuf:"varint,8,opt,name=json_schema_id,json=jsonSchemaId,proto3" json:"json_schema_id,omitempty"` // Deprecated fallback values for the json schema. JsonContextValues []*DynamicValue `protobuf:"bytes,9,rep,name=json_context_values,json=jsonContextValues,proto3" json:"json_context_values,omitempty"` - // Compact values for the json schema. json_context_value_kinds has one byte per schema key. + // Compact values for the json schema. json_context_value_kinds has one byte per present schema key. // Each kind consumes the next value from the corresponding packed value field. // Kind values are defined by the JsonValueKind constants in the encoder/decoder. JsonContextValueKinds []byte `protobuf:"bytes,10,opt,name=json_context_value_kinds,json=jsonContextValueKinds,proto3" json:"json_context_value_kinds,omitempty"` - JsonContextIntValues []int64 `protobuf:"varint,11,rep,packed,name=json_context_int_values,json=jsonContextIntValues,proto3" json:"json_context_int_values,omitempty"` + JsonContextIntValues []int64 `protobuf:"zigzag64,11,rep,packed,name=json_context_int_values,json=jsonContextIntValues,proto3" json:"json_context_int_values,omitempty"` JsonContextFloatValues []float64 `protobuf:"fixed64,12,rep,packed,name=json_context_float_values,json=jsonContextFloatValues,proto3" json:"json_context_float_values,omitempty"` JsonContextDictValues []uint64 `protobuf:"varint,13,rep,packed,name=json_context_dict_values,json=jsonContextDictValues,proto3" json:"json_context_dict_values,omitempty"` JsonContextRawValues [][]byte `protobuf:"bytes,14,rep,name=json_context_raw_values,json=jsonContextRawValues,proto3" json:"json_context_raw_values,omitempty"` JsonContextStringValues []string `protobuf:"bytes,15,rep,name=json_context_string_values,json=jsonContextStringValues,proto3" json:"json_context_string_values,omitempty"` + // Optional bitmap for sparse schemas. If unset, every schema key is present. + // Bit i indicates whether schema key i has a value in the compact value streams. + JsonContextPresence []byte `protobuf:"bytes,16,opt,name=json_context_presence,json=jsonContextPresence,proto3" json:"json_context_presence,omitempty"` // Optional UUID used to track logs when dual-sent via HTTP and gRPC. Uuid *string `protobuf:"bytes,100,opt,name=uuid,proto3,oneof" json:"uuid,omitempty"` unknownFields protoimpl.UnknownFields @@ -548,6 +551,13 @@ func (x *FlatLog) GetJsonContextStringValues() []string { return nil } +func (x *FlatLog) GetJsonContextPresence() []byte { + if x != nil { + return x.JsonContextPresence + } + return nil +} + func (x *FlatLog) GetUuid() string { if x != nil && x.Uuid != nil { return *x.Uuid @@ -1506,7 +1516,7 @@ const file_datadog_stateful_stateful_encoding_proto_rawDesc = "" + "\x06tagset\x18\x01 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x06tagset\"{\n" + "\x03Tag\x127\n" + "\x03key\x18\x01 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x03key\x12;\n" + - "\x05value\x18\x02 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x05value\"\xea\x05\n" + + "\x05value\x18\x02 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x05value\"\x9e\x06\n" + "\aFlatLog\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x12R\ttimestamp\x12\x16\n" + "\x06status\x18\x02 \x01(\x04R\x06status\x12\x18\n" + @@ -1520,11 +1530,12 @@ const file_datadog_stateful_stateful_encoding_proto_rawDesc = "" + "\x13json_context_values\x18\t \x03(\v2%.datadog.intake.stateful.DynamicValueR\x11jsonContextValues\x127\n" + "\x18json_context_value_kinds\x18\n" + " \x01(\fR\x15jsonContextValueKinds\x125\n" + - "\x17json_context_int_values\x18\v \x03(\x03R\x14jsonContextIntValues\x129\n" + + "\x17json_context_int_values\x18\v \x03(\x12R\x14jsonContextIntValues\x129\n" + "\x19json_context_float_values\x18\f \x03(\x01R\x16jsonContextFloatValues\x127\n" + "\x18json_context_dict_values\x18\r \x03(\x04R\x15jsonContextDictValues\x125\n" + "\x17json_context_raw_values\x18\x0e \x03(\fR\x14jsonContextRawValues\x12;\n" + - "\x1ajson_context_string_values\x18\x0f \x03(\tR\x17jsonContextStringValues\x12\x17\n" + + "\x1ajson_context_string_values\x18\x0f \x03(\tR\x17jsonContextStringValues\x122\n" + + "\x15json_context_presence\x18\x10 \x01(\fR\x13jsonContextPresence\x12\x17\n" + "\x04uuid\x18d \x01(\tH\x00R\x04uuid\x88\x01\x01B\a\n" + "\x05_uuid\"\xe3\x02\n" + "\x03Log\x12\x1c\n" + diff --git a/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go b/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go index 320a2935a24c..28b6abab485f 100644 --- a/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go +++ b/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go @@ -346,6 +346,15 @@ func (m *FlatLog) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i-- dAtA[i] = 0xa2 } + if len(m.JsonContextPresence) > 0 { + i -= len(m.JsonContextPresence) + copy(dAtA[i:], m.JsonContextPresence) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.JsonContextPresence))) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x82 + } if len(m.JsonContextStringValues) > 0 { for iNdEx := len(m.JsonContextStringValues) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.JsonContextStringValues[iNdEx]) @@ -397,18 +406,18 @@ func (m *FlatLog) MarshalToSizedBufferVT(dAtA []byte) (int, error) { if len(m.JsonContextIntValues) > 0 { var pksize5 int for _, num := range m.JsonContextIntValues { - pksize5 += protohelpers.SizeOfVarint(uint64(num)) + pksize5 += protohelpers.SizeOfZigzag(uint64(num)) } i -= pksize5 j4 := i - for _, num1 := range m.JsonContextIntValues { - num := uint64(num1) - for num >= 1<<7 { - dAtA[j4] = uint8(uint64(num)&0x7f | 0x80) - num >>= 7 + for _, num := range m.JsonContextIntValues { + x6 := (uint64(num) << 1) ^ uint64((num >> 63)) + for x6 >= 1<<7 { + dAtA[j4] = uint8(uint64(x6)&0x7f | 0x80) j4++ + x6 >>= 7 } - dAtA[j4] = uint8(num) + dAtA[j4] = uint8(x6) j4++ } i = protohelpers.EncodeVarint(dAtA, i, uint64(pksize5)) @@ -1530,7 +1539,7 @@ func (m *FlatLog) SizeVT() (n int) { if len(m.JsonContextIntValues) > 0 { l = 0 for _, e := range m.JsonContextIntValues { - l += protohelpers.SizeOfVarint(uint64(e)) + l += protohelpers.SizeOfZigzag(uint64(e)) } n += 1 + protohelpers.SizeOfVarint(uint64(l)) + l } @@ -1556,6 +1565,10 @@ func (m *FlatLog) SizeVT() (n int) { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } } + l = len(m.JsonContextPresence) + if l > 0 { + n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) + } if m.Uuid != nil { l = len(*m.Uuid) n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) @@ -2914,7 +2927,7 @@ func (m *FlatLog) UnmarshalVT(dAtA []byte) error { iNdEx = postIndex case 11: if wireType == 0 { - var v int64 + var v uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return protohelpers.ErrIntOverflow @@ -2924,12 +2937,13 @@ func (m *FlatLog) UnmarshalVT(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= int64(b&0x7F) << shift + v |= uint64(b&0x7F) << shift if b < 0x80 { break } } - m.JsonContextIntValues = append(m.JsonContextIntValues, v) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.JsonContextIntValues = append(m.JsonContextIntValues, int64(v)) } else if wireType == 2 { var packedLen int for shift := uint(0); ; shift += 7 { @@ -2968,7 +2982,7 @@ func (m *FlatLog) UnmarshalVT(dAtA []byte) error { m.JsonContextIntValues = make([]int64, 0, elementCount) } for iNdEx < postIndex { - var v int64 + var v uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return protohelpers.ErrIntOverflow @@ -2978,12 +2992,13 @@ func (m *FlatLog) UnmarshalVT(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= int64(b&0x7F) << shift + v |= uint64(b&0x7F) << shift if b < 0x80 { break } } - m.JsonContextIntValues = append(m.JsonContextIntValues, v) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.JsonContextIntValues = append(m.JsonContextIntValues, int64(v)) } } else { return fmt.Errorf("proto: wrong wireType = %d for field JsonContextIntValues", wireType) @@ -3182,6 +3197,40 @@ func (m *FlatLog) UnmarshalVT(dAtA []byte) error { } m.JsonContextStringValues = append(m.JsonContextStringValues, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex + case 16: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JsonContextPresence", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.JsonContextPresence = append(m.JsonContextPresence[:0], dAtA[iNdEx:postIndex]...) + if m.JsonContextPresence == nil { + m.JsonContextPresence = []byte{} + } + iNdEx = postIndex case 100: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Uuid", wireType)