diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index 43f9517adb05..af967c941913 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -1487,6 +1487,15 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
 	config.BindEnvAndSetDefault("logs_config.patterns.saturation_threshold", 50)
 	config.BindEnvAndSetDefault("logs_config.patterns.max_patterns_per_cluster", 0)
 	config.BindEnvAndSetDefault("logs_config.patterns.pattern_scan_budget", 0)
+	// Logs whose raw token content exceeds this size (bytes) are sent as RawLog datums
+	// instead of being pattern-encoded. Prevents huge one-off logs (e.g. AWS instance
+	// metadata dumps) from bloating snapshot state with useless large templates.
+	// 0 = unlimited. Default: 1024 (1 KB).
+	config.BindEnvAndSetDefault("logs_config.patterns.max_template_bytes", 1024)
+	// When true, JSON logs bypass stateful encoding entirely and are sent as RawLog datums.
+	// Eliminates PreprocessJSON + tokenization + clustering cost for JSON-heavy workloads.
+	// Trade-off: no pattern compression for JSON logs; transport-level compression still applies.
+	config.BindEnvAndSetDefault("logs_config.patterns.json_as_raw", false)
 	config.BindEnvAndSetDefault("logs_config.tags.max_tag_count", 700)
 	config.BindEnvAndSetDefault("logs_config.tags.max_memory_bytes", 4*1024*1024)
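
Taken together, the two settings gate the stateful-encoding path before any clustering work happens. A minimal, self-contained sketch of the combined decision — the `tok` type and `shouldSendRaw` helper are illustrative stand-ins, not agent code:

```go
package main

import "fmt"

// tok is a simplified stand-in for the agent's token.Token type.
type tok struct{ Value string }

// shouldSendRaw mirrors the gate this PR adds (illustrative only):
// JSON logs go raw when json_as_raw is enabled; any log goes raw when its raw
// token content exceeds maxTemplateBytes (0 disables the size check).
func shouldSendRaw(content []byte, tokens []tok, jsonAsRaw bool, maxTemplateBytes int) bool {
	if jsonAsRaw && len(content) > 0 && content[0] == '{' {
		return true
	}
	if maxTemplateBytes > 0 {
		rawLen := 0
		for i := range tokens {
			rawLen += len(tokens[i].Value)
		}
		return rawLen > maxTemplateBytes
	}
	return false
}

func main() {
	huge := make([]tok, 2048)
	for i := range huge {
		huge[i] = tok{Value: "x"}
	}
	fmt.Println(shouldSendRaw([]byte("plain text"), huge, false, 1024)) // true: 2048 B > 1024 B
	fmt.Println(shouldSendRaw([]byte(`{"msg":"hi"}`), nil, true, 1024)) // true: JSON fast path
}
```
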
diff --git a/pkg/logs/patterns/clustering/cluster_manager.go b/pkg/logs/patterns/clustering/cluster_manager.go
index c8d47ac4f781..775ece6220d4 100644
--- a/pkg/logs/patterns/clustering/cluster_manager.go
+++ b/pkg/logs/patterns/clustering/cluster_manager.go
@@ -26,6 +26,9 @@ const (
 	PatternNew
 	// PatternUpdated means an existing pattern's structure changed (more wildcards added)
 	PatternUpdated
+	// PatternTooLarge means the log exceeds the max template size and should be sent as a RawLog.
+	// No pattern is created; the caller must not dereference the returned (nil) pattern.
+	PatternTooLarge
 )
 
 // ClusterManager manages the clustering of TokenLists using hash-based bucketing.
@@ -61,6 +64,10 @@ type ClusterManager struct {
 	maxPatternsPerCluster int
 	// scanBudget limits CanMerge iterations per message in the full-scan loop. 0 = unlimited.
 	scanBudget int
+	// maxTemplateSizeBytes rejects logs whose raw token content exceeds this byte threshold,
+	// sending them as RawLog instead. 0 = unlimited. Prevents single huge logs (e.g. AWS
+	// instance metadata dumps) from bloating snapshot state with useless ~1MB templates.
+	maxTemplateSizeBytes int
 }
 
 // NewClusterManager creates a new ClusterManager.
@@ -78,7 +85,8 @@ func NewClusterManager() *ClusterManager {
 // saturatedThreshold: consecutive identical merges before pattern is marked saturated (0 = disabled).
 // maxPatternsPerCluster: per-cluster pattern cap; 0 = unlimited.
 // scanBudget: CanMerge iterations per message in the full-scan loop; 0 = unlimited.
-func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinality int, saturatedThreshold int, maxPatternsPerCluster int, scanBudget int) *ClusterManager {
+// maxTemplateSizeBytes: reject logs whose raw content exceeds this size; 0 = unlimited.
+func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinality int, saturatedThreshold int, maxPatternsPerCluster int, scanBudget int, maxTemplateSizeBytes int) *ClusterManager {
 	return &ClusterManager{
 		hashBuckets: make(map[uint64][]*Cluster),
 		nextID:      1,
@@ -87,6 +95,7 @@ func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinali
 		saturatedThreshold:    saturatedThreshold,
 		maxPatternsPerCluster: maxPatternsPerCluster,
 		scanBudget:            scanBudget,
+		maxTemplateSizeBytes:  maxTemplateSizeBytes,
 	}
 }
 
@@ -116,6 +125,19 @@ func (cm *ClusterManager) Add(tokenList *token.TokenList) (*Pattern, PatternChan
 		return nil, PatternNoChange, 0, 0
 	}
 
+	// Reject logs whose raw content exceeds the template size limit. A 1KB+ template is
+	// almost never reused (e.g. AWS metadata dumps), so storing it wastes snapshot bytes.
+	// The caller should send these as RawLog datums instead.
+	if cm.maxTemplateSizeBytes > 0 {
+		rawLen := 0
+		for i := range tokenList.Tokens {
+			rawLen += len(tokenList.Tokens[i].Value)
+		}
+		if rawLen > cm.maxTemplateSizeBytes {
+			return nil, PatternTooLarge, cm.patternCount, cm.estimatedBytes
+		}
+	}
+
 	// Lock the cluster manager to prevent concurrent access to the hash buckets. Current implementation is single-threaded on local pipeline, but we will eventually build a shared cluster manager across multiple pipelines.
 	// todo: implement a shared cluster manager across multiple pipelines
 	cm.mu.Lock()
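
For callers, `PatternTooLarge` adds a third outcome of `Add` in which the returned pattern is nil. A sketch of the caller contract — import paths are inferred from the file layout in this diff and should be treated as assumptions:

```go
package sketch

import (
	"github.com/DataDog/datadog-agent/pkg/logs/patterns/clustering"
	"github.com/DataDog/datadog-agent/pkg/logs/patterns/token"
)

// addOrRaw wraps ClusterManager.Add and tells the caller when to fall back to a RawLog.
func addOrRaw(cm *clustering.ClusterManager, tl *token.TokenList) (*clustering.Pattern, bool) {
	pattern, change, _, _ := cm.Add(tl)
	if change == clustering.PatternTooLarge {
		// pattern is nil on this branch — do not dereference it; the log should be
		// emitted as a RawLog datum, as processPreTokenized does later in this PR.
		return nil, true
	}
	return pattern, false
}
```
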
diff --git a/pkg/logs/patterns/clustering/cluster_test.go b/pkg/logs/patterns/clustering/cluster_test.go
index 99f511b6fc5b..8b4ca748b516 100644
--- a/pkg/logs/patterns/clustering/cluster_test.go
+++ b/pkg/logs/patterns/clustering/cluster_test.go
@@ -349,7 +349,7 @@ func TestCluster_Size_MultiPattern(t *testing.T) {
 
 func TestCluster_Saturation_BecomeSaturatedAfterThreshold(t *testing.T) {
 	threshold := 5
-	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
+	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)
 
 	// Seed: first log creates the pattern
 	seed := token.NewTokenListWithTokens([]token.Token{
@@ -399,7 +399,7 @@ func TestCluster_Saturation_ResetOnStructuralChange(t *testing.T) {
 	threshold := 3
-	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
+	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)
 
 	// Seed + second log to create wildcards at position 2
 	seed := token.NewTokenListWithTokens([]token.Token{
@@ -502,7 +502,7 @@ func TestCluster_Saturation_DesaturateOnUnexpectedMiss(t *testing.T) {
 }
 
 func TestCluster_Saturation_DisabledWhenThresholdZero(t *testing.T) {
-	cm := NewClusterManagerWithConfig(true, 0, 0, 0, 0) // threshold=0 disables saturation
+	cm := NewClusterManagerWithConfig(true, 0, 0, 0, 0, 0) // threshold=0 disables saturation
 
 	seed := token.NewTokenListWithTokens([]token.Token{
 		{Value: "Error", Type: token.TokenWord, Wildcard: token.NotWildcard},
@@ -537,7 +537,7 @@ func TestCluster_Saturation_SkipsPositionsRebuildOnIdenticalMerge(t *testing.T) {
 	threshold := 2
-	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
+	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)
 
 	// Seed + second log to create wildcard at position 2
 	seed := token.NewTokenListWithTokens([]token.Token{
@@ -578,7 +578,7 @@ func TestCluster_Saturation_SaturatedPatternStillMergesCorrectly(t *testing.T) {
 	threshold := 3
-	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
+	cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)
 
 	// Build and saturate a pattern
 	seed := token.NewTokenListWithTokens([]token.Token{
diff --git a/pkg/logs/patterns/preprocessor/json.go b/pkg/logs/patterns/preprocessor/json.go
new file mode 100644
index 000000000000..9facb7cb5ced
--- /dev/null
+++ b/pkg/logs/patterns/preprocessor/json.go
@@ -0,0 +1,239 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+// Package preprocessor provides JSON-aware preprocessing for stateful log encoding.
+// It extracts message fields from JSON logs and serializes remaining fields into ordered json_context.
+package preprocessor
+
+import (
+	"bytes"
+	"encoding/json"
+	"sort"
+	"strings"
+
+	jsoniter "github.com/json-iterator/go"
+)
+
+// jsonAPI is a drop-in replacement for encoding/json using jsoniter for ~3-5x faster
+// Marshal. UseNumber preserves 64-bit integer precision through the unmarshal/marshal
+// round-trip — without it, integers larger than 2^53 (e.g. trace IDs, span IDs)
+// silently lose precision via float64.
+var jsonAPI = jsoniter.Config{
+	EscapeHTML:             true,
+	SortMapKeys:            true,
+	ValidateJsonRawMessage: true,
+	UseNumber:              true,
+}.Froze()
+
+// ExtractionResult contains the result of JSON preprocessing
+type ExtractionResult struct {
+	// IsJSON indicates whether the input was valid JSON
+	IsJSON bool
+	// Message is the extracted message field (empty if not found or not JSON)
+	Message string
+	// MessageKey is the JSON key the message was extracted from (e.g. "msg", "message")
+	MessageKey string
+	// JSONContextSchema is a comma-separated sorted list of top-level keys for the JSON context.
+	// When populated, JSONContextValues contains the corresponding values in the same order.
+	// Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service".
+	JSONContextSchema string
+	// JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order.
+	// Nested objects/arrays are serialized as JSON strings.
+	JSONContextValues []string
+}
+
+// Common top-level message field names (Layer 0).
+// These cover the vast majority of structured logs from popular logging libraries.
+var topLevelMessageKeys = []string{
+	"message",
+	"msg",
+	"log",
+	"text",
+}
+
+// Common nested paths (Layer 1 fallback).
+// Some applications wrap their log message in a data/event/payload envelope.
+var nestedMessagePaths = []string{
+	"data.message",    // Generic data wrapper
+	"event.message",   // Event-based logs
+	"payload.message", // Payload wrapper
+}
+
+// PreprocessJSON attempts to extract a message field from JSON logs and serialize remaining fields.
+func PreprocessJSON(content []byte) ExtractionResult {
+	fail := ExtractionResult{IsJSON: false}
+
+	// Check if it's a JSON object (handles leading whitespace)
+	if !IsJSONObject(content) {
+		return fail
+	}
+
+	// Parse JSON
+	var data map[string]interface{}
+	if err := jsonAPI.Unmarshal(content, &data); err != nil {
+		return fail
+	}
+
+	// Try to extract message using layered strategy
+	message, extractedPath := extractMessage(data)
+	if message == "" {
+		return fail
+	}
+
+	// Remove the extracted message field from data for json_context construction
+	removeFieldByPath(data, extractedPath)
+
+	// If no fields remain after removing the message, no context to send.
+	if len(data) == 0 {
+		return ExtractionResult{
+			IsJSON:     true,
+			Message:    message,
+			MessageKey: extractedPath,
+		}
+	}
+
+	// Schema-based encoding: extract sorted keys and leaf values.
+	// Nested objects/arrays are serialized as JSON strings in the values list.
+	schema, values := extractSchemaAndValues(data)
+
+	return ExtractionResult{
+		IsJSON:            true,
+		Message:           message,
+		MessageKey:        extractedPath,
+		JSONContextSchema: schema,
+		JSONContextValues: values,
+	}
+}
+
+// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map.
+// Primitive values (string, number, bool, null) are converted to strings.
+// Nested objects and arrays are serialized as JSON strings.
+func extractSchemaAndValues(data map[string]interface{}) (string, []string) {
+	keys := make([]string, 0, len(data))
+	for k := range data {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	values := make([]string, len(keys))
+	for i, k := range keys {
+		values[i] = valueToString(data[k])
+	}
+
+	return strings.Join(keys, ","), values
+}
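
To make the schema encoding concrete: the extracted message key is removed, the remaining top-level keys are sorted into the schema, and their stringified values follow in the same order. A hypothetical test (not part of this PR) illustrating the mapping documented on `ExtractionResult`:

```go
func TestPreprocessJSON_SchemaSketch(t *testing.T) {
	res := PreprocessJSON([]byte(`{"msg":"User login","level":"info","pid":1234,"service":"api"}`))
	assert.True(t, res.IsJSON)
	assert.Equal(t, "User login", res.Message)
	assert.Equal(t, "msg", res.MessageKey)
	// "msg" is removed; remaining keys are sorted into the schema.
	assert.Equal(t, "level,pid,service", res.JSONContextSchema)
	// Values follow schema order; UseNumber keeps 1234 as "1234".
	assert.Equal(t, []string{"info", "1234", "api"}, res.JSONContextValues)
}
```
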
+// valueToString converts a JSON value to its string representation.
+// Primitives are converted directly; objects and arrays are serialized as JSON.
+func valueToString(v interface{}) string {
+	switch val := v.(type) {
+	case string:
+		return val
+	case json.Number:
+		return val.String()
+	case bool:
+		if val {
+			return "true"
+		}
+		return "false"
+	case nil:
+		return ""
+	default:
+		b, err := jsonAPI.Marshal(val)
+		if err != nil {
+			return ""
+		}
+		return string(b)
+	}
+}
+
+// extractMessage attempts to extract a message field using the layered strategy
+func extractMessage(data map[string]interface{}) (string, string) {
+	// Layer 0: Top-level common keys
+	for _, key := range topLevelMessageKeys {
+		if val, ok := data[key]; ok {
+			if str, ok := val.(string); ok && str != "" {
+				return str, key
+			}
+		}
+	}
+
+	// Layer 1: Common nested paths (e.g., data.message, event.message)
+	for _, path := range nestedMessagePaths {
+		if val := getValueByPath(data, path); val != "" {
+			return val, path
+		}
+	}
+
+	return "", ""
+}
+
+// getValueByPath retrieves a string value from nested JSON using dot notation,
+// e.g., "data.message" -> data["data"]["message"]
+func getValueByPath(data map[string]interface{}, path string) string {
+	parts := strings.Split(path, ".")
+	if len(parts) == 0 {
+		return ""
+	}
+
+	current := data
+	for _, part := range parts[:len(parts)-1] {
+		val, ok := current[part]
+		if !ok {
+			return ""
+		}
+
+		nextMap, ok := val.(map[string]interface{})
+		if !ok {
+			return ""
+		}
+		current = nextMap
+	}
+
+	leaf, ok := current[parts[len(parts)-1]]
+	if !ok {
+		return ""
+	}
+	str, _ := leaf.(string)
+	return str
+}
+
+// removeFieldByPath removes a field from nested JSON using dot notation
+func removeFieldByPath(data map[string]interface{}, path string) {
+	if path == "" {
+		return
+	}
+
+	parts := strings.Split(path, ".")
+	if len(parts) == 1 {
+		// Top-level key
+		delete(data, parts[0])
+		return
+	}
+
+	// Navigate to parent
+	current := data
+	for i := 0; i < len(parts)-1; i++ {
+		val, ok := current[parts[i]]
+		if !ok {
+			return
+		}
+		if nextMap, ok := val.(map[string]interface{}); ok {
+			current = nextMap
+		} else {
+			return
+		}
+	}
+
+	// Delete the final key
+	delete(current, parts[len(parts)-1])
+}
+
+// IsJSONObject checks if content is a JSON object, handling leading whitespace.
+// Exported for use by callers that need cheap JSON detection without a full parse.
+func IsJSONObject(content []byte) bool {
+	trimmed := bytes.TrimLeft(content, " \t\n\r")
+	return len(trimmed) > 0 && trimmed[0] == '{'
+}
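
The `UseNumber` choice is not cosmetic. A self-contained demonstration of the off-by-one corruption that the default float64 decoding causes for a 64-bit span ID:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	raw := []byte(`{"span_id":18446744073709551615}`) // 2^64 - 1

	// Default decoding: numbers become float64; the span ID is silently rounded.
	var lossy map[string]interface{}
	_ = json.Unmarshal(raw, &lossy)
	fmt.Printf("%.0f\n", lossy["span_id"]) // 18446744073709551616 — off by one

	// UseNumber decoding: the digits survive verbatim as a json.Number.
	var exact map[string]interface{}
	d := json.NewDecoder(bytes.NewReader(raw))
	d.UseNumber()
	_ = d.Decode(&exact)
	fmt.Println(exact["span_id"].(json.Number)) // 18446744073709551615
}
```
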
diff --git a/pkg/logs/patterns/processor/json.go b/pkg/logs/patterns/processor/json.go
index f1fee8f20f0e..4b9ba93e68cb 100644
--- a/pkg/logs/patterns/processor/json.go
+++ b/pkg/logs/patterns/processor/json.go
@@ -10,7 +10,6 @@ package processor
 import (
 	"bytes"
 	"encoding/json"
-	"fmt"
 	"sort"
 	"strings"
 )
@@ -49,6 +48,15 @@ var nestedMessagePaths = []string{
 	"payload.message", // Payload wrapper
 }
 
+// unmarshalJSON decodes JSON into a map using UseNumber to preserve 64-bit integer precision.
+// Without UseNumber, integers larger than 2^53 (e.g. trace IDs, span IDs) silently
+// round-trip through float64 and lose precision.
+func unmarshalJSON(content []byte, v interface{}) error {
+	d := json.NewDecoder(bytes.NewReader(content))
+	d.UseNumber()
+	return d.Decode(v)
+}
+
 // PreprocessJSON attempts to extract a message field from JSON logs and serialize remaining fields.
 func PreprocessJSON(content []byte) ExtractionResult {
 	fail := ExtractionResult{IsJSON: false}
@@ -60,7 +68,7 @@ func PreprocessJSON(content []byte) ExtractionResult {
 	// Parse JSON
 	var data map[string]interface{}
-	if err := json.Unmarshal(content, &data); err != nil {
+	if err := unmarshalJSON(content, &data); err != nil {
 		return fail
 	}
@@ -119,8 +127,8 @@ func valueToString(v interface{}) string {
 	switch val := v.(type) {
 	case string:
 		return val
-	case float64:
-		return fmt.Sprintf("%g", val)
+	case json.Number:
+		return val.String()
 	case bool:
 		if val {
diff --git a/pkg/logs/patterns/processor/json_test.go b/pkg/logs/patterns/processor/json_test.go
index 6d35cb84dc8f..c1f6d1938d92 100644
--- a/pkg/logs/patterns/processor/json_test.go
+++ b/pkg/logs/patterns/processor/json_test.go
@@ -6,6 +6,7 @@ package processor
 import (
+	"encoding/json"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -69,7 +70,7 @@ func TestPreprocessJSON_TopLevelMessage(t *testing.T) {
 			expectedMsg:    "User login",
 			expectedKey:    "msg",
 			expectedSchema: "timestamp",
-			expectedValues: []string{"1.23456789e+09"},
+			expectedValues: []string{"1234567890"},
 		},
 		{
 			name: "log field",
@@ -360,7 +361,7 @@ func TestExtractSchemaAndValues(t *testing.T) {
 
 func TestExtractSchemaAndValues_MixedTypes(t *testing.T) {
 	data := map[string]interface{}{
-		"count":  float64(42),
+		"count":  json.Number("42"),
 		"flag":   true,
 		"name":   "test",
 		"nested": map[string]interface{}{"key": "val"},
@@ -371,3 +372,40 @@ func TestExtractSchemaAndValues_MixedTypes(t *testing.T) {
 	assert.Equal(t, "count,empty,flag,name,nested", schema)
 	assert.Equal(t, []string{"42", "", "true", "test", `{"key":"val"}`}, values)
 }
+
+func TestPreprocessJSON_LargeIntegerPrecision(t *testing.T) {
+	tests := []struct {
+		name           string
+		input          string
+		expectedValues []string
+	}{
+		{
+			name:           "64-bit trace ID preserved exactly",
+			input:          `{"msg":"request","trace_id":9999999999999999}`,
+			expectedValues: []string{"9999999999999999"},
+		},
+		{
+			name:           "64-bit span ID preserved exactly",
+			input:          `{"msg":"request","span_id":18446744073709551615}`,
+			expectedValues: []string{"18446744073709551615"},
+		},
+		{
+			name:           "normal float preserved",
+			input:          `{"msg":"payment","amount":159.6}`,
+			expectedValues: []string{"159.6"},
+		},
+		{
+			name:           "integer zero",
+			input:          `{"msg":"test","count":0}`,
+			expectedValues: []string{"0"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := PreprocessJSON([]byte(tt.input))
+			require.True(t, result.IsJSON)
+			assert.Equal(t, tt.expectedValues, result.JSONContextValues)
+		})
+	}
+}
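
The updated expectation in `TestPreprocessJSON_TopLevelMessage` follows directly from the decoder change: the old float64 path rendered integral timestamps through `%g`, which switches to scientific notation. A quick standalone comparison:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Old path: unmarshal to float64, render with %g — scientific notation.
	fmt.Printf("%g\n", float64(1234567890)) // 1.23456789e+09

	// New path: keep the token as a json.Number and print its digits verbatim.
	fmt.Println(json.Number("1234567890").String()) // 1234567890
}
```
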
"level,service,timestamp") jsonContextValues []string // leaf values in schema key order + isRawJSON bool // true when patterns.json_as_raw=true — skip tokenization, send as RawLog } func getTranslatorContent(msg *message.Message) []byte { @@ -68,7 +69,8 @@ type MessageTranslator struct { patternEvictionManager *clustering.EvictionManager tagManager *tags.TagManager tagEvictionManager *tags.TagEvictionManager - tokenizer token.Tokenizer + tokenizer token.Tokenizer + jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog pipelineName string @@ -96,11 +98,13 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer) *Messa pkgconfigsetup.Datadog().GetInt("logs_config.patterns.saturation_threshold"), pkgconfigsetup.Datadog().GetInt("logs_config.patterns.max_patterns_per_cluster"), pkgconfigsetup.Datadog().GetInt("logs_config.patterns.pattern_scan_budget"), + pkgconfigsetup.Datadog().GetInt("logs_config.patterns.max_template_bytes"), ), patternEvictionManager: clustering.NewEvictionManager(), tagManager: tags.NewTagManager(), tagEvictionManager: tags.NewTagEvictionManager(), tokenizer: tokenizer, + jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"), pipelineName: pipelineName, } tlmPipelineStateSize.Set(0, pipelineName) @@ -135,7 +139,12 @@ func (mt *MessageTranslator) Start(inputChan chan *message.Message, bufferSize i return // skip empty messages — no sidecar entry, no alignment break } entry := batchEntry{msg: msg} - if results := processor.PreprocessJSON(content); results.Message != "" { + if mt.jsonLogsAsRaw && len(content) > 0 && content[0] == '{' { + // json_as_raw: bypass stateful encoding for JSON logs entirely. + // Send as RawLog — no tokenization, no clustering, no snapshot state. + entry.content = string(content) + entry.isRawJSON = true + } else if results := processor.PreprocessJSON(content); results.Message != "" { entry.content = results.Message entry.messageKey = results.MessageKey entry.jsonContextSchema = results.JSONContextSchema @@ -184,14 +193,34 @@ func (mt *MessageTranslator) Start(inputChan chan *message.Message, bufferSize i // processBatch tokenizes a batch of pre-screened entries in one TokenizeBatch call, // then processes each sequentially through clustering and datum building. // All entries in the batch have non-empty content (empty messages are skipped before enqueueing). +// Entries with isRawJSON=true bypass tokenization and are sent as RawLog datums directly. func (mt *MessageTranslator) processBatch(batch []batchEntry, outputChan chan *message.StatefulMessage) { if len(batch) == 0 { return } - // Extract content strings for batch tokenization (aligned 1:1 with batch entries). - contents := make([]string, len(batch)) - for i, e := range batch { + // Partition: send raw JSON entries immediately, collect the rest for batch tokenization. + tokenBatch := batch[:0:0] + for _, entry := range batch { + if entry.isRawJSON { + ts := getMessageTimestamp(entry.msg) + tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(entry.msg) + if isNew { + mt.sendDictEntryDefine(outputChan, entry.msg, dictID, allTagsStr) + } + mt.sendRawLog(outputChan, entry.msg, entry.content, ts, tagSet) + } else { + tokenBatch = append(tokenBatch, entry) + } + } + + if len(tokenBatch) == 0 { + return + } + + // Extract content strings for batch tokenization (aligned 1:1 with tokenBatch entries). 
@@ -221,6 +250,15 @@ func (mt *MessageTranslator) processMessage(msg *message.Message, outputChan cha
 	if len(content) == 0 {
 		return
 	}
+	if mt.jsonLogsAsRaw && len(content) > 0 && content[0] == '{' {
+		ts := getMessageTimestamp(msg)
+		tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(msg)
+		if isNew {
+			mt.sendDictEntryDefine(outputChan, msg, dictID, allTagsStr)
+		}
+		mt.sendRawLog(outputChan, msg, string(content), ts, tagSet)
+		return
+	}
 	contentStr := string(content)
 	var messageKey, jsonContextSchema string
 	var jsonContextValues []string
@@ -249,6 +287,16 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
 	// Process tokenized log through cluster manager to get/create pattern
 	pattern, changeType, patternCount, estimatedBytes := mt.clusterManager.Add(tokenList)
 
+	// Log exceeds max_template_bytes — send as RawLog, don't store any pattern state.
+	if changeType == clustering.PatternTooLarge {
+		tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(msg)
+		if isNew {
+			mt.sendDictEntryDefine(outputChan, msg, dictID, allTagsStr)
+		}
+		mt.sendRawLog(outputChan, msg, string(getTranslatorContent(msg)), ts, tagSet)
+		return
+	}
+
 	// CRITICAL: Extract all pattern data BEFORE eviction to prevent agent panic/data corruption.
 	patternID := pattern.PatternID
 	wildcardValues := pattern.GetWildcardValues(tokenList)