Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 121 additions & 17 deletions pkg/logs/patterns/processor/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,35 @@ type ExtractionResult struct {
Message string
// MessageKey is the JSON key the message was extracted from (e.g. "msg", "message")
MessageKey string
// JSONContextSchema is a comma-separated sorted list of top-level keys for the JSON context.
// When populated, JSONContextValues contains the corresponding values in the same order.
// Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service".
// JSONContextSchema is a comma-separated sorted list of escaped JSON paths for the JSON context.
JSONContextSchema string
// JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order.
// Primitive values preserve their JSON type. Nested objects/arrays are preserved as decoded
// map/slice values so the transport layer can encode them as raw JSON.
// JSONContextKeys contains the escaped JSON paths corresponding to JSONContextValues, in order.
JSONContextKeys []string
// JSONContextValues contains the values corresponding to JSONContextKeys, in order.
// Primitive values preserve their JSON type. Boxed nested objects/arrays are preserved as
// decoded map/slice values so the transport layer can encode them as raw JSON.
JSONContextValues []interface{}
}

// Limits that decide when a nested JSON object is boxed as a single subtree value instead of
// being flattened into one path per key (they are checked in shouldBoxJSONObject).
const (
// jsonContextMaxFlattenDepth is the depth at or beyond which an object is boxed.
// Top-level values are visited at depth 1, and each nesting level adds one.
jsonContextMaxFlattenDepth = 2
// jsonContextMaxObjectKeys is the largest number of keys an object may have and still be
// flattened; objects with more keys are boxed.
jsonContextMaxObjectKeys = 16
)

// boxedSubtreeNames is the set of object names that are always boxed as a whole subtree,
// regardless of depth or key count. The lookup uses the last segment of the object's path.
// NOTE(review): presumably these names hold free-form, per-record keys that would otherwise
// produce one-off schemas — confirm with the feature owner.
var boxedSubtreeNames = map[string]struct{}{
"attributes": {},
"attrs": {},
"headers": {},
"labels": {},
"metadata": {},
"tags": {},
}

// jsonContextItem pairs one JSON context path with the value found at that path.
type jsonContextItem struct {
// key is the dot-joined path, with each segment escaped by escapeJSONPathSegment.
key string
// value is either a normalized leaf value or a boxed object/array kept as decoded.
value interface{}
}

// Common top-level message field names (Layer 0)
// These cover the vast majority of structured logs from popular logging libraries
var topLevelMessageKeys = []string{
Expand Down Expand Up @@ -93,34 +112,119 @@ func PreprocessJSON(content []byte) ExtractionResult {
}
}

// Schema-based encoding: extract sorted keys and leaf values.
schema, values := extractSchemaAndValues(data)
// Schema-based encoding: extract sorted JSON paths and values.
keys, values := extractSchemaAndValues(data)

return ExtractionResult{
IsJSON: true,
Message: message,
MessageKey: extractedPath,
JSONContextSchema: schema,
JSONContextSchema: schemaString(keys),
JSONContextKeys: keys,
JSONContextValues: values,
}
}

// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map.
// Primitive values preserve their decoded JSON type. Nested objects and arrays are kept as decoded
// map/slice values so later encoding can preserve them as raw JSON.
func extractSchemaAndValues(data map[string]interface{}) (string, []interface{}) {
// extractSchemaAndValues extracts sorted JSON paths and their corresponding values from a JSON map.
// Stable shallow primitives are flattened into leaf paths. Deep, wide, map-like, or array values
// are boxed as subtree values to avoid one-off schemas for arbitrary nested JSON.
func extractSchemaAndValues(data map[string]interface{}) ([]string, []interface{}) {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)

values := make([]interface{}, len(keys))
for i, k := range keys {
values[i] = normalizeJSONValue(data[k])
items := make([]jsonContextItem, 0, len(keys))
for _, k := range keys {
appendJSONContextItems(&items, escapeJSONPathSegment(k), data[k], 1)
}

contextKeys := make([]string, len(items))
values := make([]interface{}, len(items))
for i, item := range items {
contextKeys[i] = item.key
values[i] = item.value
}

return contextKeys, values
}

// appendJSONContextItems walks value and appends its context entries to items under path.
//
// Arrays are always stored whole. Objects are stored whole when shouldBoxJSONObject says so;
// otherwise each key is visited in sorted order with its escaped name appended to path and
// depth increased by one. Any other value becomes a single entry after normalizeJSONValue.
func appendJSONContextItems(items *[]jsonContextItem, path string, value interface{}, depth int) {
	if array, isArray := value.([]interface{}); isArray {
		appendJSONContextSubtree(items, path, array)
		return
	}

	object, isObject := value.(map[string]interface{})
	if !isObject {
		leaf := jsonContextItem{key: path, value: normalizeJSONValue(value)}
		*items = append(*items, leaf)
		return
	}

	if shouldBoxJSONObject(path, len(object), depth) {
		appendJSONContextSubtree(items, path, object)
		return
	}

	// Map iteration order is random; sort the names so the emitted paths are deterministic.
	names := make([]string, 0, len(object))
	for name := range object {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		childPath := path + "." + escapeJSONPathSegment(name)
		appendJSONContextItems(items, childPath, object[name], depth+1)
	}
}

// appendJSONContextSubtree appends value to items as one entry keyed by path. The value is
// stored as given: it is neither descended into nor normalized.
func appendJSONContextSubtree(items *[]jsonContextItem, path string, value interface{}) {
	boxed := jsonContextItem{key: path, value: value}
	*items = append(*items, boxed)
}

// shouldBoxJSONObject reports whether the object at path should be kept as one subtree value
// rather than flattened. It returns true when the object is empty, has more than
// jsonContextMaxObjectKeys keys, sits at depth jsonContextMaxFlattenDepth or deeper, or when
// the last segment of path is listed in boxedSubtreeNames.
func shouldBoxJSONObject(path string, keyCount int, depth int) bool {
	switch {
	case keyCount == 0:
		return true
	case keyCount > jsonContextMaxObjectKeys:
		return true
	case depth >= jsonContextMaxFlattenDepth:
		return true
	}

	_, boxedByName := boxedSubtreeNames[lastPathSegment(path)]
	return boxedByName
}

// lastPathSegment returns the final segment of a dot-separated path whose segments were
// escaped with a backslash before every '.' and '\'. The segment is returned in its escaped
// form. A path with no unescaped dot is returned unchanged.
//
// A dot is a separator only when it is preceded by an even number (including zero) of
// consecutive backslashes: each pair of backslashes is an escaped backslash, and a single
// leftover backslash escapes the dot. The escaping backslash sits to the LEFT of the dot, so
// the count has to look leftwards from each candidate dot rather than carry state from the
// characters already visited on the right.
func lastPathSegment(path string) string {
	for index := len(path) - 1; index >= 0; index-- {
		if path[index] != '.' {
			continue
		}
		backslashes := 0
		for prev := index - 1; prev >= 0 && path[prev] == '\\'; prev-- {
			backslashes++
		}
		if backslashes%2 == 0 {
			return path[index+1:]
		}
	}
	return path
}

// escapeJSONPathSegment returns segment with a backslash inserted before every '.' and '\'
// so the segment can be joined into a dot-separated path without ambiguity. Segments that
// contain neither character are returned unchanged.
//
// The segment is copied byte by byte. Both escaped characters are ASCII, so this is safe for
// multi-byte UTF-8, and it leaves all other bytes exactly as given. Iterating by rune would
// rewrite invalid UTF-8 bytes to U+FFFD, but only for segments that need escaping, which
// would be inconsistent with the unchanged early-return path.
func escapeJSONPathSegment(segment string) string {
	if !strings.ContainsAny(segment, `.\`) {
		return segment
	}
	var builder strings.Builder
	// At least one escape character will be added; Grow is only a capacity hint.
	builder.Grow(len(segment) + 1)
	for i := 0; i < len(segment); i++ {
		c := segment[i]
		if c == '.' || c == '\\' {
			builder.WriteByte('\\')
		}
		builder.WriteByte(c)
	}
	return builder.String()
}

return strings.Join(keys, ","), values
// schemaString renders the context keys as a single schema string by joining them, in the
// order given, with commas.
func schemaString(keys []string) string {
	const separator = ","
	schema := strings.Join(keys, separator)
	return schema
}

// normalizeJSONValue preserves primitive JSON types and keeps nested objects/arrays intact.
Expand Down
79 changes: 68 additions & 11 deletions pkg/logs/patterns/processor/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ func TestPreprocessJSON_GenericNestedPaths(t *testing.T) {
name: "data.message",
input: `{"data":{"message":"Nested message","id":123}}`,
expectedMsg: "Nested message",
expectedSchema: "data",
expectedSchema: "data.id",
},
{
name: "event.message",
input: `{"event":{"message":"Event occurred","type":"alert"}}`,
expectedMsg: "Event occurred",
expectedSchema: "event",
expectedSchema: "event.type",
},
{
name: "payload.message",
input: `{"payload":{"message":"Payload data","size":1024}}`,
expectedMsg: "Payload data",
expectedSchema: "payload",
expectedSchema: "payload.size",
},
}

Expand Down Expand Up @@ -231,10 +231,40 @@ func TestPreprocessJSON_NestedObjectsAsValues(t *testing.T) {
result := PreprocessJSON([]byte(input))
require.True(t, result.IsJSON)

assert.Equal(t, "array,nested", result.JSONContextSchema)
require.Len(t, result.JSONContextValues, 2)
assert.Equal(t, "array,nested.apple,nested.zebra", result.JSONContextSchema)
require.Len(t, result.JSONContextValues, 3)
assert.Equal(t, []interface{}{json.Number("1"), json.Number("2"), json.Number("3")}, result.JSONContextValues[0])
assert.Equal(t, map[string]interface{}{"apple": "a", "zebra": "z"}, result.JSONContextValues[1])
assert.Equal(t, "a", result.JSONContextValues[1])
assert.Equal(t, "z", result.JSONContextValues[2])
assert.Equal(t, []string{"array", "nested.apple", "nested.zebra"}, result.JSONContextKeys)
}

// TestPreprocessJSON_BoxesDeepAndMapLikeSubtrees checks that shallow primitives under "http"
// are flattened into leaf paths, while the deeper "http.request" object and the top-level
// "labels" object are each kept as a single boxed value.
func TestPreprocessJSON_BoxesDeepAndMapLikeSubtrees(t *testing.T) {
	input := `{
"message": "test",
"http": {
"method": "GET",
"status_code": 200,
"request": {
"headers": {
"x-request-id": "abc"
}
}
},
"labels": {
"customer_123": "enabled"
}
}`

	result := PreprocessJSON([]byte(input))
	require.True(t, result.IsJSON)

	assert.Equal(t, "http.method,http.request,http.status_code,labels", result.JSONContextSchema)
	assert.Equal(t, []string{"http.method", "http.request", "http.status_code", "labels"}, result.JSONContextKeys)
	// Stop here if the value count is off, so the indexing below fails cleanly instead of panicking.
	require.Len(t, result.JSONContextValues, 4)
	assert.Equal(t, "GET", result.JSONContextValues[0])
	assert.Equal(t, map[string]interface{}{"headers": map[string]interface{}{"x-request-id": "abc"}}, result.JSONContextValues[1])
	assert.Equal(t, json.Number("200"), result.JSONContextValues[2])
	assert.Equal(t, map[string]interface{}{"customer_123": "enabled"}, result.JSONContextValues[3])
}

func TestPreprocessJSON_EmptyContextAfterExtraction(t *testing.T) {
Expand Down Expand Up @@ -295,6 +325,33 @@ func TestPreprocessJSON_PreservesLargeIntegerAsJSONNumber(t *testing.T) {
assert.Equal(t, "2323969980879066318", number.String())
}

// TestPreprocessJSON_EscapesDottedPathSegments checks that a dot inside a key name is escaped
// in the schema, so it stays distinguishable from the dot that separates nested path segments.
func TestPreprocessJSON_EscapesDottedPathSegments(t *testing.T) {
	type testCase struct {
		name           string
		input          string
		expectedSchema string
	}

	cases := []testCase{
		{
			name:           "top-level dotted key",
			input:          `{"message":"test","dd.span_id":123}`,
			expectedSchema: `dd\.span_id`,
		},
		{
			name:           "nested key",
			input:          `{"message":"test","dd":{"span_id":123}}`,
			expectedSchema: `dd.span_id`,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := PreprocessJSON([]byte(tc.input))
			require.True(t, got.IsJSON)
			assert.Equal(t, tc.expectedSchema, got.JSONContextSchema)
		})
	}
}

func TestGetValueByPath(t *testing.T) {
data := map[string]interface{}{
"top": "value",
Expand Down Expand Up @@ -375,8 +432,8 @@ func TestExtractSchemaAndValues(t *testing.T) {
"m": "middle",
}

schema, values := extractSchemaAndValues(data)
assert.Equal(t, "a,m,z", schema)
keys, values := extractSchemaAndValues(data)
assert.Equal(t, []string{"a", "m", "z"}, keys)
assert.Equal(t, []interface{}{"first", "middle", "last"}, values)
}

Expand All @@ -389,11 +446,11 @@ func TestExtractSchemaAndValues_MixedTypes(t *testing.T) {
"empty": nil,
}

schema, values := extractSchemaAndValues(data)
assert.Equal(t, "count,empty,flag,name,nested", schema)
keys, values := extractSchemaAndValues(data)
assert.Equal(t, []string{"count", "empty", "flag", "name", "nested.key"}, keys)
assert.Equal(
t,
[]interface{}{json.Number("42"), nil, true, "test", map[string]interface{}{"key": "val"}},
[]interface{}{json.Number("42"), nil, true, "test", "val"},
values,
)
}
Expand Down
Loading
Loading