diff --git a/pkg/logs/patterns/processor/json.go b/pkg/logs/patterns/processor/json.go index 9699f9620229..7cc89f975260 100644 --- a/pkg/logs/patterns/processor/json.go +++ b/pkg/logs/patterns/processor/json.go @@ -26,16 +26,35 @@ type ExtractionResult struct { Message string // MessageKey is the JSON key the message was extracted from (e.g. "msg", "message") MessageKey string - // JSONContextSchema is a comma-separated sorted list of top-level keys for the JSON context. - // When populated, JSONContextValues contains the corresponding values in the same order. - // Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service". + // JSONContextSchema is a comma-separated sorted list of escaped JSON paths for the JSON context. JSONContextSchema string - // JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order. - // Primitive values preserve their JSON type. Nested objects/arrays are preserved as decoded - // map/slice values so the transport layer can encode them as raw JSON. + // JSONContextKeys contains the escaped JSON paths corresponding to JSONContextValues, in order. + JSONContextKeys []string + // JSONContextValues contains the values corresponding to JSONContextKeys, in order. + // Primitive values preserve their JSON type. Boxed nested objects/arrays are preserved as + // decoded map/slice values so the transport layer can encode them as raw JSON. JSONContextValues []interface{} } +const ( + jsonContextMaxFlattenDepth = 2 + jsonContextMaxObjectKeys = 16 +) + +var boxedSubtreeNames = map[string]struct{}{ + "attributes": {}, + "attrs": {}, + "headers": {}, + "labels": {}, + "metadata": {}, + "tags": {}, +} + +type jsonContextItem struct { + key string + value interface{} +} + // Common top-level message field names (Layer 0) // These cover the vast majority of structured logs from popular logging libraries var topLevelMessageKeys = []string{ @@ -93,34 +112,119 @@ func PreprocessJSON(content []byte) ExtractionResult { } } - // Schema-based encoding: extract sorted keys and leaf values. - schema, values := extractSchemaAndValues(data) + // Schema-based encoding: extract sorted JSON paths and values. + keys, values := extractSchemaAndValues(data) return ExtractionResult{ IsJSON: true, Message: message, MessageKey: extractedPath, - JSONContextSchema: schema, + JSONContextSchema: schemaString(keys), + JSONContextKeys: keys, JSONContextValues: values, } } -// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map. -// Primitive values preserve their decoded JSON type. Nested objects and arrays are kept as decoded -// map/slice values so later encoding can preserve them as raw JSON. -func extractSchemaAndValues(data map[string]interface{}) (string, []interface{}) { +// extractSchemaAndValues extracts sorted JSON paths and their corresponding values from a JSON map. +// Stable shallow primitives are flattened into leaf paths. Deep, wide, map-like, or array values +// are boxed as subtree values to avoid one-off schemas for arbitrary nested JSON. +func extractSchemaAndValues(data map[string]interface{}) ([]string, []interface{}) { keys := make([]string, 0, len(data)) for k := range data { keys = append(keys, k) } sort.Strings(keys) - values := make([]interface{}, len(keys)) - for i, k := range keys { - values[i] = normalizeJSONValue(data[k]) + items := make([]jsonContextItem, 0, len(keys)) + for _, k := range keys { + appendJSONContextItems(&items, escapeJSONPathSegment(k), data[k], 1) + } + + contextKeys := make([]string, len(items)) + values := make([]interface{}, len(items)) + for i, item := range items { + contextKeys[i] = item.key + values[i] = item.value + } + + return contextKeys, values +} + +func appendJSONContextItems(items *[]jsonContextItem, path string, value interface{}, depth int) { + switch typed := value.(type) { + case map[string]interface{}: + if shouldBoxJSONObject(path, len(typed), depth) { + appendJSONContextSubtree(items, path, typed) + return + } + keys := make([]string, 0, len(typed)) + for k := range typed { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + appendJSONContextItems(items, path+"."+escapeJSONPathSegment(key), typed[key], depth+1) + } + case []interface{}: + appendJSONContextSubtree(items, path, typed) + default: + *items = append(*items, jsonContextItem{ + key: path, + value: normalizeJSONValue(typed), + }) } +} + +func appendJSONContextSubtree(items *[]jsonContextItem, path string, value interface{}) { + *items = append(*items, jsonContextItem{ + key: path, + value: value, + }) +} + +func shouldBoxJSONObject(path string, keyCount int, depth int) bool { + if keyCount == 0 || keyCount > jsonContextMaxObjectKeys || depth >= jsonContextMaxFlattenDepth { + return true + } + _, ok := boxedSubtreeNames[lastPathSegment(path)] + return ok +} + +func lastPathSegment(path string) string { + escaped := false + for index := len(path) - 1; index >= 0; index-- { + switch path[index] { + case '\\': + escaped = !escaped + case '.': + if !escaped { + return path[index+1:] + } + escaped = false + default: + escaped = false + } + } + return path +} + +func escapeJSONPathSegment(segment string) string { + if !strings.ContainsAny(segment, `.\`) { + return segment + } + var builder strings.Builder + builder.Grow(len(segment) + 1) + for _, char := range segment { + if char == '.' || char == '\\' { + builder.WriteByte('\\') + } + builder.WriteRune(char) + } + return builder.String() +} - return strings.Join(keys, ","), values +func schemaString(keys []string) string { + return strings.Join(keys, ",") } // normalizeJSONValue preserves primitive JSON types and keeps nested objects/arrays intact. diff --git a/pkg/logs/patterns/processor/json_test.go b/pkg/logs/patterns/processor/json_test.go index 4107009a09b1..7bec41b165ce 100644 --- a/pkg/logs/patterns/processor/json_test.go +++ b/pkg/logs/patterns/processor/json_test.go @@ -151,19 +151,19 @@ func TestPreprocessJSON_GenericNestedPaths(t *testing.T) { name: "data.message", input: `{"data":{"message":"Nested message","id":123}}`, expectedMsg: "Nested message", - expectedSchema: "data", + expectedSchema: "data.id", }, { name: "event.message", input: `{"event":{"message":"Event occurred","type":"alert"}}`, expectedMsg: "Event occurred", - expectedSchema: "event", + expectedSchema: "event.type", }, { name: "payload.message", input: `{"payload":{"message":"Payload data","size":1024}}`, expectedMsg: "Payload data", - expectedSchema: "payload", + expectedSchema: "payload.size", }, } @@ -231,10 +231,40 @@ func TestPreprocessJSON_NestedObjectsAsValues(t *testing.T) { result := PreprocessJSON([]byte(input)) require.True(t, result.IsJSON) - assert.Equal(t, "array,nested", result.JSONContextSchema) - require.Len(t, result.JSONContextValues, 2) + assert.Equal(t, "array,nested.apple,nested.zebra", result.JSONContextSchema) + require.Len(t, result.JSONContextValues, 3) assert.Equal(t, []interface{}{json.Number("1"), json.Number("2"), json.Number("3")}, result.JSONContextValues[0]) - assert.Equal(t, map[string]interface{}{"apple": "a", "zebra": "z"}, result.JSONContextValues[1]) + assert.Equal(t, "a", result.JSONContextValues[1]) + assert.Equal(t, "z", result.JSONContextValues[2]) + assert.Equal(t, []string{"array", "nested.apple", "nested.zebra"}, result.JSONContextKeys) +} + +func TestPreprocessJSON_BoxesDeepAndMapLikeSubtrees(t *testing.T) { + input := `{ + "message": "test", + "http": { + "method": "GET", + "status_code": 200, + "request": { + "headers": { + "x-request-id": "abc" + } + } + }, + "labels": { + "customer_123": "enabled" + } + }` + + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + + assert.Equal(t, "http.method,http.request,http.status_code,labels", result.JSONContextSchema) + assert.Equal(t, []string{"http.method", "http.request", "http.status_code", "labels"}, result.JSONContextKeys) + assert.Equal(t, "GET", result.JSONContextValues[0]) + assert.Equal(t, map[string]interface{}{"headers": map[string]interface{}{"x-request-id": "abc"}}, result.JSONContextValues[1]) + assert.Equal(t, json.Number("200"), result.JSONContextValues[2]) + assert.Equal(t, map[string]interface{}{"customer_123": "enabled"}, result.JSONContextValues[3]) } func TestPreprocessJSON_EmptyContextAfterExtraction(t *testing.T) { @@ -295,6 +325,33 @@ func TestPreprocessJSON_PreservesLargeIntegerAsJSONNumber(t *testing.T) { assert.Equal(t, "2323969980879066318", number.String()) } +func TestPreprocessJSON_EscapesDottedPathSegments(t *testing.T) { + tests := []struct { + name string + input string + expectedSchema string + }{ + { + name: "top-level dotted key", + input: `{"message":"test","dd.span_id":123}`, + expectedSchema: `dd\.span_id`, + }, + { + name: "nested key", + input: `{"message":"test","dd":{"span_id":123}}`, + expectedSchema: `dd.span_id`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := PreprocessJSON([]byte(tt.input)) + require.True(t, result.IsJSON) + assert.Equal(t, tt.expectedSchema, result.JSONContextSchema) + }) + } +} + func TestGetValueByPath(t *testing.T) { data := map[string]interface{}{ "top": "value", @@ -375,8 +432,8 @@ func TestExtractSchemaAndValues(t *testing.T) { "m": "middle", } - schema, values := extractSchemaAndValues(data) - assert.Equal(t, "a,m,z", schema) + keys, values := extractSchemaAndValues(data) + assert.Equal(t, []string{"a", "m", "z"}, keys) assert.Equal(t, []interface{}{"first", "middle", "last"}, values) } @@ -389,11 +446,11 @@ func TestExtractSchemaAndValues_MixedTypes(t *testing.T) { "empty": nil, } - schema, values := extractSchemaAndValues(data) - assert.Equal(t, "count,empty,flag,name,nested", schema) + keys, values := extractSchemaAndValues(data) + assert.Equal(t, []string{"count", "empty", "flag", "name", "nested.key"}, keys) assert.Equal( t, - []interface{}{json.Number("42"), nil, true, "test", map[string]interface{}{"key": "val"}}, + []interface{}{json.Number("42"), nil, true, "test", "val"}, values, ) } diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 0923532bd850..0fbde53ba392 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -37,7 +37,7 @@ type batchEntry struct { msg *message.Message content string // preprocessed content (JSON extracted message, or raw string) messageKey string // JSON key the message was extracted from (e.g. "msg", "message") - jsonContextSchema string // comma-separated sorted keys (e.g. "level,service,timestamp") + jsonContextKeys []string jsonContextValues []interface{} isRawJSON bool // true when patterns.json_as_raw=true — skip tokenization, send as RawLog } @@ -202,7 +202,7 @@ func (mt *MessageTranslator) Start(inputChan chan *message.Message, bufferSize i } else if results := processor.PreprocessJSON(content); results.Message != "" { entry.content = results.Message entry.messageKey = results.MessageKey - entry.jsonContextSchema = results.JSONContextSchema + entry.jsonContextKeys = results.JSONContextKeys entry.jsonContextValues = results.JSONContextValues } else { entry.content = string(content) @@ -302,7 +302,7 @@ func (mt *MessageTranslator) processBatch(batch []batchEntry, outputChan chan *m log.Warnf("Failed to tokenize log message: %v", tokenResults[i].Err) continue } - mt.processPreTokenized(entry.msg, tokenResults[i].TokenList, entry.messageKey, entry.jsonContextSchema, entry.jsonContextValues, outputChan) + mt.processPreTokenized(entry.msg, tokenResults[i].TokenList, entry.messageKey, entry.jsonContextKeys, entry.jsonContextValues, outputChan) } } @@ -331,12 +331,13 @@ func (mt *MessageTranslator) processMessage(msg *message.Message, outputChan cha return } contentStr := string(content) - var messageKey, jsonContextSchema string + var messageKey string + var jsonContextKeys []string var jsonContextValues []interface{} if results := processor.PreprocessJSON(content); results.Message != "" { contentStr = results.Message messageKey = results.MessageKey - jsonContextSchema = results.JSONContextSchema + jsonContextKeys = results.JSONContextKeys jsonContextValues = results.JSONContextValues } tokenList, err := mt.tokenizer.Tokenize(contentStr) @@ -344,12 +345,12 @@ func (mt *MessageTranslator) processMessage(msg *message.Message, outputChan cha log.Warnf("Failed to tokenize log message: %v", err) return } - mt.processPreTokenized(msg, tokenList, messageKey, jsonContextSchema, jsonContextValues, outputChan) + mt.processPreTokenized(msg, tokenList, messageKey, jsonContextKeys, jsonContextValues, outputChan) } // processPreTokenized handles post-tokenization: clustering, eviction, datum construction, and sending. // Called by both processBatch (batch pipeline) and processMessage (single-message path). -func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList *token.TokenList, messageKey string, jsonContextSchema string, jsonContextValues []interface{}, outputChan chan *message.StatefulMessage) { +func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList *token.TokenList, messageKey string, jsonContextKeys []string, jsonContextValues []interface{}, outputChan chan *message.StatefulMessage) { var patternDefineSent bool var patternDefineParamCount uint32 @@ -449,8 +450,8 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList var messageKeyDV *statefulpb.DynamicValue var jsonContextSchemaID uint64 var jsonContextValuesDV []*statefulpb.DynamicValue - if jsonContextSchema != "" { - messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextSchema) + if len(jsonContextKeys) > 0 { + messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys) jsonContextDVBacking := make([]statefulpb.DynamicValue, len(jsonContextValues)) jsonContextTypeBacking := make([]dvTypeBackings, len(jsonContextValues)) @@ -600,7 +601,7 @@ func (mt *MessageTranslator) buildStatusField(msg *message.Message) (uint64, str return dictID, status, isNew } -func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, jsonContextSchema string) (*statefulpb.DynamicValue, uint64) { +func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, keys []string) (*statefulpb.DynamicValue, uint64) { if messageKey == "" { messageKey = "message" } @@ -609,8 +610,9 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa mt.sendDictEntryDefine(outputChan, msg, messageKeyID, messageKey) } - keys := strings.Split(jsonContextSchema, ",") keyIDs := make([]uint64, 0, len(keys)) + schemaKeyBuilder := strings.Builder{} + schemaKeyBuilder.WriteString(messageKey) for _, key := range keys { if key == "" { continue @@ -620,9 +622,11 @@ func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *messa mt.sendDictEntryDefine(outputChan, msg, keyID, key) } keyIDs = append(keyIDs, keyID) + schemaKeyBuilder.WriteByte('\x00') + schemaKeyBuilder.WriteString(key) } - schemaKey := messageKey + "\x00" + jsonContextSchema + schemaKey := schemaKeyBuilder.String() schemaID, ok := mt.jsonSchemaToID[schemaKey] if !ok { mt.nextJSONSchemaID++ diff --git a/pkg/proto/datadog/stateful/stateful_encoding.proto b/pkg/proto/datadog/stateful/stateful_encoding.proto index af0bae6c7069..145c473a5af4 100644 --- a/pkg/proto/datadog/stateful/stateful_encoding.proto +++ b/pkg/proto/datadog/stateful/stateful_encoding.proto @@ -104,6 +104,8 @@ message Log { message JsonSchemaDefine { uint64 schema_id = 1; + // Dictionary IDs for escaped JSON paths. Dots separate nested fields; literal dots + // and backslashes inside field names are backslash-escaped. repeated uint64 keys = 2; uint64 message_key_id = 3; } diff --git a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go index a56e113a16d8..8af139717591 100644 --- a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go +++ b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go @@ -635,10 +635,12 @@ func (*Log_Structured) isLog_Content() {} func (*Log_Raw) isLog_Content() {} type JsonSchemaDefine struct { - state protoimpl.MessageState `protogen:"open.v1"` - SchemaId uint64 `protobuf:"varint,1,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"` - Keys []uint64 `protobuf:"varint,2,rep,packed,name=keys,proto3" json:"keys,omitempty"` - MessageKeyId uint64 `protobuf:"varint,3,opt,name=message_key_id,json=messageKeyId,proto3" json:"message_key_id,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + SchemaId uint64 `protobuf:"varint,1,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"` + // Dictionary IDs for escaped JSON paths. Dots separate nested fields; literal dots + // and backslashes inside field names are backslash-escaped. + Keys []uint64 `protobuf:"varint,2,rep,packed,name=keys,proto3" json:"keys,omitempty"` + MessageKeyId uint64 `protobuf:"varint,3,opt,name=message_key_id,json=messageKeyId,proto3" json:"message_key_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache }