Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions pkg/logs/patterns/clustering/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ func (p *Pattern) GetPatternString() string {
if tok.Wildcard == token.IsWildcard {
continue
}
if sanitizeForTemplateInto(&builder, tok.Value) == 0 {
continue
}
sanitizeForTemplateInto(&builder, tok.Value)
}

return builder.String()
Expand Down Expand Up @@ -158,11 +156,11 @@ func (p *Pattern) GetWildcardCharPositions() []int {
charPositions = append(charPositions, currentPos)
// Wildcard tokens are NOT in the template, so don't advance currentPos
} else {
cleanedLen := sanitizeForTemplateLen(tok.Value)
if cleanedLen > 0 {
// Add the length of the cleaned token value
currentPos += cleanedLen
}
// Use rune count (not byte count) so positions match Java String indices.
// Java String.length() returns UTF-16 code units; for BMP characters
// (U+0000–U+FFFF, which covers all common log content including →, ≥, etc.)
// this equals Unicode codepoint count.
currentPos += sanitizeForTemplateRuneLen(tok.Value)
}
}

Expand Down Expand Up @@ -199,18 +197,38 @@ func sanitizeForTemplate(s string) string {
return builder.String()
}

// sanitizeForTemplateLen returns the length of the sanitized string without allocating.
// sanitizeForTemplateLen returns the byte length of the sanitized string without allocating.
// Used for memory estimation (EstimatedBytes). For wire-protocol positions use sanitizeForTemplateRuneLen.
func sanitizeForTemplateLen(s string) int {
return sanitizeForTemplateInto(nil, s)
}

// sanitizeForTemplateRuneLen returns the Unicode codepoint count of the sanitized string.
// Used in GetWildcardCharPositions so positions match Java String.length() (UTF-16 code units).
// For BMP characters (U+0000–U+FFFF, the vast majority of log content) codepoint count
// equals UTF-16 code unit count. Supplementary-plane characters (emoji etc.) are uncommon
// in log templates and would still be off by the surrogate count — an acceptable tradeoff.
func sanitizeForTemplateRuneLen(s string) int {
count := 0
for _, r := range s {
if (r >= ' ' && r != 0x7F) || r == '\t' || r == '\n' || r == '\r' {
if r != utf8.RuneError && r < 0xFFFD {
count++
}
}
}
return count
}

// sanitizeForTemplateInto appends the sanitized string into builder when non-nil.
// Uses an ASCII fast path: bytes < 0x80 are checked directly without rune decoding.
// Preserved: printable ASCII (0x20–0x7E), horizontal tab (0x09), newline (0x0A), carriage return (0x0D).
// Stripped: other control characters (0x00–0x08, 0x0B–0x0C, 0x0E–0x1F), DEL (0x7F).
func sanitizeForTemplateInto(builder *strings.Builder, s string) int {
for i := 0; i < len(s); i++ {
b := s[i]
if b < utf8.RuneSelf {
if b >= ' ' && b != 0x7F {
if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' {
continue
}
// ASCII control character found — flush clean prefix then filter the rest
Expand All @@ -222,7 +240,7 @@ func sanitizeForTemplateInto(builder *strings.Builder, s string) int {
for i < len(s) {
b = s[i]
if b < utf8.RuneSelf {
if b >= ' ' && b != 0x7F {
if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' {
if builder != nil {
builder.WriteByte(b)
}
Expand Down Expand Up @@ -253,7 +271,7 @@ func sanitizeForTemplateInto(builder *strings.Builder, s string) int {
for i < len(s) {
b = s[i]
if b < utf8.RuneSelf {
if b >= ' ' && b != 0x7F {
if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' {
if builder != nil {
builder.WriteByte(b)
}
Expand Down
192 changes: 192 additions & 0 deletions pkg/logs/patterns/clustering/pattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,195 @@ func TestPattern_IntegrationScenario(t *testing.T) {
values := pattern.GetWildcardValues(log2)
assert.Equal(t, []string{"Network", "timeout", "reached"}, values)
}

// --- sanitizeForTemplate tab tests ---

func TestSanitizeForTemplate_TabPreserved(t *testing.T) {
// Tabs appear in journald messages and must be preserved
assert.Equal(t, "Worker\ttask\t123", sanitizeForTemplate("Worker\ttask\t123"))
}

func TestSanitizeForTemplate_TabOnlyToken(t *testing.T) {
assert.Equal(t, "\t", sanitizeForTemplate("\t"))
}

func TestSanitizeForTemplate_MixedTabAndControl(t *testing.T) {
// Tab preserved, null byte stripped
assert.Equal(t, "key\tvalue", sanitizeForTemplate("key\t\x00value"))
}

func TestSanitizeForTemplate_NewlinePreserved(t *testing.T) {
// \n in message fields (e.g. xDS ADS pretty-printed request bodies) must survive.
// Server confirmed it can handle \n in template strings.
input := "ADS request sent: {\n \"versionInfo\": \"9\"\n}"
assert.Equal(t, input, sanitizeForTemplate(input))
}

func TestSanitizeForTemplate_CarriageReturnPreserved(t *testing.T) {
// \r seen in curl output: "\r100 396 100 396"
assert.Equal(t, "\r100 396", sanitizeForTemplate("\r100 396"))
}

func TestSanitizeForTemplate_NullStillStripped(t *testing.T) {
// NUL (0x00) must still be stripped — panics C.CString in Rust tokenizer
assert.Equal(t, "nonulhere", sanitizeForTemplate("no\x00nul\x00here"))
}

// --- GetPatternString trailing whitespace tests ---

func TestGetPatternString_TrailingWhitespacePreserved(t *testing.T) {
// Template: "config: " [wildcard] — trailing space before wildcard must survive
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "config:", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "abc123", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{2}
assert.Equal(t, "config: ", p.GetPatternString())
}

func TestGetWildcardCharPositions_UnicodeArrow(t *testing.T) {
// "→" is 3 UTF-8 bytes but 1 rune/Java char. Positions after it must use rune count.
// Template: "state: " [wild1] " → " [wild2]
// "state: " = 7 runes → wild1 at 7
// " → " = 3 runes (space + arrow + space) → wild2 at 10
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "state:", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "open", token.IsWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " → ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "closed", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{2, 4}
positions := p.GetWildcardCharPositions()
assert.Equal(t, []int{7, 10}, positions)
}

func TestGetPatternString_MultipleSpacesPreserved(t *testing.T) {
// "err=< rpc error" — double-space whitespace token must survive intact
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "err=<", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "rpc", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{2}
assert.Equal(t, "err=< ", p.GetPatternString())
}

// --- sanitizeForTemplateRuneLen direct tests ---

func TestSanitizeForTemplateRuneLen_ASCII(t *testing.T) {
// Pure ASCII: rune count == byte count
assert.Equal(t, 11, sanitizeForTemplateRuneLen("hello world"))
assert.Equal(t, 0, sanitizeForTemplateRuneLen(""))
}

func TestSanitizeForTemplateRuneLen_MultiByteBMP(t *testing.T) {
// BMP characters: each counts as 1 (matches Java String.length())
// → is U+2192, 3 UTF-8 bytes but 1 Java char
assert.Equal(t, 1, sanitizeForTemplateRuneLen("→"))
// "state: → " = 9 runes (7 ASCII + arrow + space)
assert.Equal(t, 9, sanitizeForTemplateRuneLen("state: → "))
}

func TestSanitizeForTemplateRuneLen_TabNewlinePreserved(t *testing.T) {
// \t, \n, \r each count as 1
assert.Equal(t, 3, sanitizeForTemplateRuneLen("\t\n\r"))
}

func TestSanitizeForTemplateRuneLen_NullStripped(t *testing.T) {
// NUL is stripped → only the 3 non-NUL chars count
assert.Equal(t, 3, sanitizeForTemplateRuneLen("a\x00b\x00c"))
}

// --- GetWildcardCharPositions additional tests ---

func TestGetWildcardCharPositions_PureASCII_Unchanged(t *testing.T) {
// For ASCII-only templates byte count == rune count, so positions must be identical.
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "err=<", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "rpc", token.IsWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "failed", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{2, 4}
positions := p.GetWildcardCharPositions()
// "err=< " = 6 chars → wild1 at 6; " " = 1 char → wild2 at 7
assert.Equal(t, []int{6, 7}, positions)
}

// --- GetPatternString newline/CR tests ---

func TestGetPatternString_NewlinePreserved(t *testing.T) {
// \n inside a token value must survive in the template string
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "hint_type: DELETE\nlimit: 5000\n", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "query", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{1}
assert.Equal(t, "hint_type: DELETE\nlimit: 5000\n", p.GetPatternString())
}

// --- Production mismatch regression tests (from staging flink-intakeshadow-metrics) ---

func TestSanitizeForTemplate_StagingMismatch_CtrProgress(t *testing.T) {
// Real mismatch observed in staging:
// HTTP: "Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)"
// gRPC: "Importingelapsed: 0.4 stotal: 0.0 B(0.0 B/s)" ← tabs stripped
// The message field of a journald/ctr log uses \t as field separator.
input := "Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)"
assert.Equal(t, input, sanitizeForTemplate(input),
"tabs in ctr progress output must be preserved in template")
}

func TestSanitizeForTemplate_StagingMismatch_EcrPull(t *testing.T) {
// Real mismatch observed in staging:
// HTTP: "486234852809.dkr.ecr.us east 1.amazonaws\tsaved"
// gRPC: "486234852809.dkr.ecr.us east 1.amazonawssaved" ← tab stripped
input := "486234852809.dkr.ecr.us east 1.amazonaws\tsaved"
assert.Equal(t, input, sanitizeForTemplate(input),
"tab separator in ECR pull log must be preserved")
}

func TestSanitizeForTemplate_StagingMismatch_DnsRecord(t *testing.T) {
// Real mismatch observed in staging:
// HTTP: "vault.us1.staging.dog.\t29\tIN\tA\t10.128.150.56"
// gRPC: "vault.us1.staging.dog.29INA10.128.150.56" ← all tabs stripped
input := "vault.us1.staging.dog.\t29\tIN\tA\t10.128.150.56"
assert.Equal(t, input, sanitizeForTemplate(input),
"tab-separated DNS record fields must be preserved")
}

func TestSanitizeForTemplate_StagingMismatch_NewlineInMessage(t *testing.T) {
// Real mismatch (2026-04-22 staging):
// HTTP: "Listing hints...: hint_type: DELETE\nlimit: 5000\nmasked_only: true\n"
// gRPC: "Listing hints...: hint_type: DELETElimit: 5000masked_only: true"
// \n between proto fields was stripped — they all run together.
input := "Listing hints for cell temporal-8a67: hint_type: DELETE\nlimit: 5000\nmasked_only: true\n"
assert.Equal(t, input, sanitizeForTemplate(input),
"\\n separating proto fields in message must be preserved")
}

func TestGetPatternString_StagingMismatch_TrailingSpace(t *testing.T) {
// Real mismatch (2026-04-22 staging):
// HTTP message: "Checking error " (trailing space)
// gRPC message: "Checking error" (trailing space dropped)
// The trailing whitespace token before the wildcard must survive in the template.
tl := token.NewTokenList()
tl.Add(token.NewToken(token.TokenWord, "Checking", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWord, "error", token.NotWildcard))
tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) // trailing space
tl.Add(token.NewToken(token.TokenWord, "detail", token.IsWildcard))
p := newPattern(tl, 1)
p.Template = tl
p.Positions = []int{4}
assert.Equal(t, "Checking error ", p.GetPatternString(),
"trailing space whitespace token must appear in template")
}
38 changes: 17 additions & 21 deletions pkg/logs/patterns/preprocessor/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type ExtractionResult struct {
// Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service".
JSONContextSchema string
// JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order.
// Nested objects/arrays are serialized as JSON strings.
JSONContextValues []string
// Primitive values preserve their decoded JSON type. Nested objects/arrays are kept as decoded
// map/slice values so the transport layer can encode them as raw JSON.
JSONContextValues []interface{}
}

// Common top-level message field names (Layer 0)
Expand Down Expand Up @@ -108,44 +109,39 @@ func PreprocessJSON(content []byte) ExtractionResult {
}

// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map.
// Primitive values (string, number, bool, null) are converted to strings.
// Nested objects and arrays are serialized as JSON strings.
func extractSchemaAndValues(data map[string]interface{}) (string, []string) {
// Primitive values preserve their decoded JSON type. Nested objects and arrays are kept as decoded
// map/slice values so the transport layer can encode them as raw JSON.
func extractSchemaAndValues(data map[string]interface{}) (string, []interface{}) {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)

values := make([]string, len(keys))
values := make([]interface{}, len(keys))
for i, k := range keys {
values[i] = valueToString(data[k])
values[i] = normalizeJSONValue(data[k])
}

return strings.Join(keys, ","), values
}

// valueToString converts a JSON value to its string representation.
// Primitives are converted directly; objects and arrays are serialized as JSON.
func valueToString(v interface{}) string {
// normalizeJSONValue preserves primitive JSON types and keeps nested objects/arrays intact
// so the transport layer can encode them correctly (numbers as numbers, not strings).
func normalizeJSONValue(v interface{}) interface{} {
switch val := v.(type) {
case string:
return val
case json.Number:
return val.String()
return val
case float64:
return val
case bool:
if val {
return "true"
}
return "false"
return val
case nil:
return ""
return nil
default:
b, err := jsonAPI.Marshal(val)
if err != nil {
return ""
}
return string(b)
return val
}
}

Expand Down
Loading
Loading