Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,15 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
config.BindEnvAndSetDefault("logs_config.patterns.saturation_threshold", 50)
config.BindEnvAndSetDefault("logs_config.patterns.max_patterns_per_cluster", 0)
config.BindEnvAndSetDefault("logs_config.patterns.pattern_scan_budget", 0)
// Logs whose raw token content exceeds this size (bytes) are sent as RawLog datums
// instead of being pattern-encoded. Prevents huge one-off logs (e.g. AWS instance
// metadata dumps) from bloating snapshot state with useless large templates.
// 0 = unlimited. Default: 1024 (1 KB).
config.BindEnvAndSetDefault("logs_config.patterns.max_template_bytes", 1024)
// When true, JSON logs bypass stateful encoding entirely and are sent as RawLog datums.
// Eliminates PreprocessJSON + tokenization + clustering cost for JSON-heavy workloads.
// Trade-off: no pattern compression for JSON logs; transport-level compression still applies.
config.BindEnvAndSetDefault("logs_config.patterns.json_as_raw", false)

config.BindEnvAndSetDefault("logs_config.tags.max_tag_count", 700)
config.BindEnvAndSetDefault("logs_config.tags.max_memory_bytes", 4*1024*1024)
Expand Down
24 changes: 23 additions & 1 deletion pkg/logs/patterns/clustering/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
PatternNew
// PatternUpdated means an existing pattern's structure changed (more wildcards added)
PatternUpdated
// PatternTooLarge means the log exceeds the max template size and should be sent as a RawLog.
// No pattern is created; the caller must not dereference the returned (nil) pattern.
PatternTooLarge
)

// ClusterManager manages the clustering of TokenLists using hash-based bucketing.
Expand Down Expand Up @@ -61,6 +64,10 @@ type ClusterManager struct {
maxPatternsPerCluster int
// scanBudget limits CanMerge iterations per message in the full-scan loop. 0 = unlimited.
scanBudget int
// maxTemplateSizeBytes rejects logs whose raw token content exceeds this byte threshold,
// sending them as RawLog instead. 0 = unlimited. Prevents single huge logs (e.g. AWS
// instance metadata dumps) from bloating snapshot state with useless ~1MB templates.
maxTemplateSizeBytes int
}

// NewClusterManager creates a new ClusterManager.
Expand All @@ -78,7 +85,8 @@ func NewClusterManager() *ClusterManager {
// saturatedThreshold: consecutive identical merges before pattern is marked saturated (0 = disabled).
// maxPatternsPerCluster: per-cluster pattern cap; 0 = unlimited.
// scanBudget: CanMerge iterations per message in the full-scan loop; 0 = unlimited.
func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinality int, saturatedThreshold int, maxPatternsPerCluster int, scanBudget int) *ClusterManager {
// maxTemplateSizeBytes: reject logs whose raw content exceeds this size; 0 = unlimited.
func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinality int, saturatedThreshold int, maxPatternsPerCluster int, scanBudget int, maxTemplateSizeBytes int) *ClusterManager {
return &ClusterManager{
hashBuckets: make(map[uint64][]*Cluster),
nextID: 1,
Expand All @@ -87,6 +95,7 @@ func NewClusterManagerWithConfig(firstWordProtection bool, firstWordMaxCardinali
saturatedThreshold: saturatedThreshold,
maxPatternsPerCluster: maxPatternsPerCluster,
scanBudget: scanBudget,
maxTemplateSizeBytes: maxTemplateSizeBytes,
}
}

Expand Down Expand Up @@ -116,6 +125,19 @@ func (cm *ClusterManager) Add(tokenList *token.TokenList) (*Pattern, PatternChan
return nil, PatternNoChange, 0, 0
}

// Reject logs whose raw content exceeds the template size limit. A 1KB+ template is
// almost never reused (e.g. AWS metadata dumps), so storing it wastes snapshot bytes.
// The caller should send these as RawLog datums instead.
if cm.maxTemplateSizeBytes > 0 {
rawLen := 0
for i := range tokenList.Tokens {
rawLen += len(tokenList.Tokens[i].Value)
}
if rawLen > cm.maxTemplateSizeBytes {
return nil, PatternTooLarge, cm.patternCount, cm.estimatedBytes
}
}

// Lock the cluster manager to prevent concurrent access to the hash buckets. Current implementation is single-threaded on local pipeline, but we will eventually build a shared cluster manager across multiple pipelines.
// todo: implement a shared cluster manager across multiple pipelines
cm.mu.Lock()
Expand Down
10 changes: 5 additions & 5 deletions pkg/logs/patterns/clustering/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func TestCluster_Size_MultiPattern(t *testing.T) {

func TestCluster_Saturation_BecomeSaturatedAfterThreshold(t *testing.T) {
threshold := 5
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)

// Seed: first log creates the pattern
seed := token.NewTokenListWithTokens([]token.Token{
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestCluster_Saturation_BecomeSaturatedAfterThreshold(t *testing.T) {

func TestCluster_Saturation_ResetOnStructuralChange(t *testing.T) {
threshold := 3
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)

// Seed + second log to create wildcards at position 2
seed := token.NewTokenListWithTokens([]token.Token{
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestCluster_Saturation_DesaturateOnUnexpectedMiss(t *testing.T) {
}

func TestCluster_Saturation_DisabledWhenThresholdZero(t *testing.T) {
cm := NewClusterManagerWithConfig(true, 0, 0, 0, 0) // threshold=0 disables saturation
cm := NewClusterManagerWithConfig(true, 0, 0, 0, 0, 0) // threshold=0 disables saturation

seed := token.NewTokenListWithTokens([]token.Token{
{Value: "Error", Type: token.TokenWord, Wildcard: token.NotWildcard},
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestCluster_Saturation_DisabledWhenThresholdZero(t *testing.T) {

func TestCluster_Saturation_SkipsPositionsRebuildOnIdenticalMerge(t *testing.T) {
threshold := 2
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)

// Seed + second log to create wildcard at position 2
seed := token.NewTokenListWithTokens([]token.Token{
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestCluster_Saturation_SkipsPositionsRebuildOnIdenticalMerge(t *testing.T)

func TestCluster_Saturation_SaturatedPatternStillMergesCorrectly(t *testing.T) {
threshold := 3
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0)
cm := NewClusterManagerWithConfig(true, 0, threshold, 0, 0, 0)

// Build and saturate a pattern
seed := token.NewTokenListWithTokens([]token.Token{
Expand Down
239 changes: 239 additions & 0 deletions pkg/logs/patterns/preprocessor/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package preprocessor provides JSON-aware preprocessing for stateful log encoding.
// It extracts message fields from JSON logs and serializes remaining fields into ordered json_context.
package preprocessor

import (
"bytes"
"encoding/json"
"sort"
"strings"

jsoniter "github.com/json-iterator/go"
)

// jsonAPI is a drop-in replacement for encoding/json backed by jsoniter for ~3-5x faster
// Marshal. Flag rationale:
//   - UseNumber preserves 64-bit integer precision through the unmarshal/marshal
//     round-trip — without it, integers larger than 2^53 (e.g. trace IDs, span IDs)
//     silently lose precision via float64.
//   - SortMapKeys makes re-serialized nested objects deterministic, so identical
//     nested values always produce identical context strings.
//   - EscapeHTML and ValidateJsonRawMessage mirror encoding/json's default behavior.
var jsonAPI = jsoniter.Config{
	EscapeHTML:             true,
	SortMapKeys:            true,
	ValidateJsonRawMessage: true,
	UseNumber:              true,
}.Froze()

// ExtractionResult contains the result of JSON preprocessing. The zero value
// (IsJSON=false) is returned for any input that is not a JSON object with a
// non-empty message field.
type ExtractionResult struct {
	// IsJSON indicates whether the input was a parseable JSON object with an
	// extractable message field.
	IsJSON bool
	// Message is the extracted message field (empty if not found or not JSON).
	Message string
	// MessageKey is the JSON key or dotted path the message was extracted from
	// (e.g. "msg", "message", "data.message").
	MessageKey string
	// JSONContextSchema is a comma-separated sorted list of top-level keys for the JSON context.
	// When populated, JSONContextValues contains the corresponding values in the same order.
	// Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service".
	// Empty when the message field was the object's only field.
	JSONContextSchema string
	// JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order.
	// Nested objects/arrays are serialized as JSON strings.
	JSONContextValues []string
}

// Common top-level message field names (Layer 0).
// These cover the vast majority of structured logs from popular logging libraries.
// Slice order is match priority: extractMessage returns the first key present
// with a non-empty string value.
var topLevelMessageKeys = []string{
	"message",
	"msg",
	"log",
	"text",
}

// Common nested paths (Layer 1 fallback), tried only after every top-level key misses.
// Some applications wrap their log message in a data/event/payload envelope.
// Paths use dot notation and are resolved by getValueByPath; order is match priority.
var nestedMessagePaths = []string{
	"data.message",    // Generic data wrapper
	"event.message",   // Event-based logs
	"payload.message", // Payload wrapper
}

// PreprocessJSON attempts to extract a message field from a JSON log and serialize
// the remaining top-level fields into an ordered schema/values pair.
// Inputs that are not JSON objects, fail to parse, or carry no recognizable
// message field all yield the zero ExtractionResult (IsJSON=false).
func PreprocessJSON(content []byte) ExtractionResult {
	// Cheap structural probe before paying for a full parse.
	if !IsJSONObject(content) {
		return ExtractionResult{}
	}

	var fields map[string]interface{}
	if err := jsonAPI.Unmarshal(content, &fields); err != nil {
		return ExtractionResult{}
	}

	// Layered message lookup: top-level keys first, then common nested envelopes.
	msg, msgPath := extractMessage(fields)
	if msg == "" {
		return ExtractionResult{}
	}

	// Drop the message field so it is not duplicated in the context.
	removeFieldByPath(fields, msgPath)

	result := ExtractionResult{
		IsJSON:     true,
		Message:    msg,
		MessageKey: msgPath,
	}

	// Schema-based encoding of whatever fields remain: sorted keys plus leaf
	// values; nested objects/arrays are serialized as JSON strings.
	if len(fields) > 0 {
		result.JSONContextSchema, result.JSONContextValues = extractSchemaAndValues(fields)
	}
	return result
}

// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map.
// Primitive values (string, number, bool, null) are converted to strings; nested
// objects and arrays are serialized as JSON strings. Returns the comma-joined
// sorted key list and the values in the same order.
func extractSchemaAndValues(data map[string]interface{}) (string, []string) {
	keys := make([]string, 0, len(data))
	for key := range data {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	values := make([]string, 0, len(keys))
	for _, key := range keys {
		values = append(values, valueToString(data[key]))
	}

	return strings.Join(keys, ","), values
}

// valueToString converts a JSON value to its string representation.
// Primitives are rendered directly (json.Number keeps its original text, so
// large integers stay exact); nulls become ""; objects and arrays are
// re-serialized as JSON, with "" on the (unlikely) marshal failure.
func valueToString(v interface{}) string {
	switch typed := v.(type) {
	case nil:
		return ""
	case string:
		return typed
	case json.Number:
		return typed.String()
	case bool:
		if typed {
			return "true"
		}
		return "false"
	default:
		serialized, err := jsonAPI.Marshal(typed)
		if err != nil {
			return ""
		}
		return string(serialized)
	}
}

// extractMessage attempts to extract a message field using the layered strategy:
// well-known top-level keys first, then common nested envelope paths.
// Returns the message text and the key/path it came from, or ("", "") on miss.
func extractMessage(data map[string]interface{}) (string, string) {
	// Layer 0: top-level common keys, in priority order.
	for _, key := range topLevelMessageKeys {
		if str, ok := data[key].(string); ok && str != "" {
			return str, key
		}
	}

	// Layer 1: common nested paths (e.g., data.message, event.message).
	for _, path := range nestedMessagePaths {
		if msg := getValueByPath(data, path); msg != "" {
			return msg, path
		}
	}

	return "", ""
}

// getValueByPath retrieves a string value from nested JSON using dot notation,
// e.g. "data.message" -> data["data"]["message"]. Returns "" when any
// intermediate segment is missing or not an object, or when the leaf is
// missing or not a string.
func getValueByPath(data map[string]interface{}, path string) string {
	segments := strings.Split(path, ".")
	if len(segments) == 0 {
		return ""
	}

	// Walk every segment except the last; each must resolve to a nested object.
	node := data
	for _, segment := range segments[:len(segments)-1] {
		child, ok := node[segment].(map[string]interface{})
		if !ok {
			return ""
		}
		node = child
	}

	// The leaf must be a string; anything else yields "".
	if str, ok := node[segments[len(segments)-1]].(string); ok {
		return str
	}
	return ""
}

// removeFieldByPath removes a field from nested JSON using dot notation.
// A silent no-op when the path is empty or any intermediate segment is
// missing or not an object.
func removeFieldByPath(data map[string]interface{}, path string) {
	if path == "" {
		return
	}

	// Walk to the parent object of the final segment; a single-segment path
	// leaves the loop untouched and deletes straight from the top level.
	segments := strings.Split(path, ".")
	parent := data
	for _, segment := range segments[:len(segments)-1] {
		child, ok := parent[segment].(map[string]interface{})
		if !ok {
			// Intermediate node missing or not an object: nothing to remove.
			return
		}
		parent = child
	}

	delete(parent, segments[len(segments)-1])
}

// IsJSONObject checks if content looks like a JSON object (first non-whitespace
// byte is '{'), handling leading whitespace. Exported for use by callers that
// need a cheap JSON detection without a full parse.
func IsJSONObject(content []byte) bool {
	stripped := bytes.TrimLeft(content, " \t\n\r")
	if len(stripped) == 0 {
		return false
	}
	return stripped[0] == '{'
}
Loading
Loading