diff --git a/pkg/logs/patterns/clustering/pattern.go b/pkg/logs/patterns/clustering/pattern.go index 895bb4f191a4..c48dec025c81 100644 --- a/pkg/logs/patterns/clustering/pattern.go +++ b/pkg/logs/patterns/clustering/pattern.go @@ -121,9 +121,7 @@ func (p *Pattern) GetPatternString() string { if tok.Wildcard == token.IsWildcard { continue } - if sanitizeForTemplateInto(&builder, tok.Value) == 0 { - continue - } + sanitizeForTemplateInto(&builder, tok.Value) } return builder.String() @@ -158,11 +156,11 @@ func (p *Pattern) GetWildcardCharPositions() []int { charPositions = append(charPositions, currentPos) // Wildcard tokens are NOT in the template, so don't advance currentPos } else { - cleanedLen := sanitizeForTemplateLen(tok.Value) - if cleanedLen > 0 { - // Add the length of the cleaned token value - currentPos += cleanedLen - } + // Use rune count (not byte count) so positions match Java String indices. + // Java String.length() returns UTF-16 code units; for BMP characters + // (U+0000–U+FFFF, which covers all common log content including →, ≥, etc.) + // this equals Unicode codepoint count. + currentPos += sanitizeForTemplateRuneLen(tok.Value) } } @@ -199,18 +197,38 @@ func sanitizeForTemplate(s string) string { return builder.String() } -// sanitizeForTemplateLen returns the length of the sanitized string without allocating. +// sanitizeForTemplateLen returns the byte length of the sanitized string without allocating. +// Used for memory estimation (EstimatedBytes). For wire-protocol positions use sanitizeForTemplateRuneLen. func sanitizeForTemplateLen(s string) int { return sanitizeForTemplateInto(nil, s) } +// sanitizeForTemplateRuneLen returns the Unicode codepoint count of the sanitized string. +// Used in GetWildcardCharPositions so positions match Java String.length() (UTF-16 code units). +// For BMP characters (U+0000–U+FFFF, the vast majority of log content) codepoint count +// equals UTF-16 code unit count. Supplementary-plane characters (emoji etc.) are uncommon +// in log templates and would still be off by the surrogate count — an acceptable tradeoff. +func sanitizeForTemplateRuneLen(s string) int { + count := 0 + for _, r := range s { + if (r >= ' ' && r != 0x7F) || r == '\t' || r == '\n' || r == '\r' { + if r != utf8.RuneError && r < 0xFFFD { + count++ + } + } + } + return count +} + // sanitizeForTemplateInto appends the sanitized string into builder when non-nil. // Uses an ASCII fast path: bytes < 0x80 are checked directly without rune decoding. +// Preserved: printable ASCII (0x20–0x7E), horizontal tab (0x09), newline (0x0A), carriage return (0x0D). +// Stripped: other control characters (0x00–0x08, 0x0B–0x0C, 0x0E–0x1F), DEL (0x7F). func sanitizeForTemplateInto(builder *strings.Builder, s string) int { for i := 0; i < len(s); i++ { b := s[i] if b < utf8.RuneSelf { - if b >= ' ' && b != 0x7F { + if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' { continue } // ASCII control character found — flush clean prefix then filter the rest @@ -222,7 +240,7 @@ func sanitizeForTemplateInto(builder *strings.Builder, s string) int { for i < len(s) { b = s[i] if b < utf8.RuneSelf { - if b >= ' ' && b != 0x7F { + if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' { if builder != nil { builder.WriteByte(b) } @@ -253,7 +271,7 @@ func sanitizeForTemplateInto(builder *strings.Builder, s string) int { for i < len(s) { b = s[i] if b < utf8.RuneSelf { - if b >= ' ' && b != 0x7F { + if (b >= ' ' && b != 0x7F) || b == '\t' || b == '\n' || b == '\r' { if builder != nil { builder.WriteByte(b) } diff --git a/pkg/logs/patterns/clustering/pattern_test.go b/pkg/logs/patterns/clustering/pattern_test.go index 97d723c1db0d..d43498aefe0d 100644 --- a/pkg/logs/patterns/clustering/pattern_test.go +++ b/pkg/logs/patterns/clustering/pattern_test.go @@ -430,3 +430,195 @@ func TestPattern_IntegrationScenario(t *testing.T) { values := pattern.GetWildcardValues(log2) assert.Equal(t, []string{"Network", "timeout", "reached"}, values) } + +// --- sanitizeForTemplate tab tests --- + +func TestSanitizeForTemplate_TabPreserved(t *testing.T) { + // Tabs appear in journald messages and must be preserved + assert.Equal(t, "Worker\ttask\t123", sanitizeForTemplate("Worker\ttask\t123")) +} + +func TestSanitizeForTemplate_TabOnlyToken(t *testing.T) { + assert.Equal(t, "\t", sanitizeForTemplate("\t")) +} + +func TestSanitizeForTemplate_MixedTabAndControl(t *testing.T) { + // Tab preserved, null byte stripped + assert.Equal(t, "key\tvalue", sanitizeForTemplate("key\t\x00value")) +} + +func TestSanitizeForTemplate_NewlinePreserved(t *testing.T) { + // \n in message fields (e.g. xDS ADS pretty-printed request bodies) must survive. + // Server confirmed it can handle \n in template strings. + input := "ADS request sent: {\n \"versionInfo\": \"9\"\n}" + assert.Equal(t, input, sanitizeForTemplate(input)) +} + +func TestSanitizeForTemplate_CarriageReturnPreserved(t *testing.T) { + // \r seen in curl output: "\r100 396 100 396" + assert.Equal(t, "\r100 396", sanitizeForTemplate("\r100 396")) +} + +func TestSanitizeForTemplate_NullStillStripped(t *testing.T) { + // NUL (0x00) must still be stripped — panics C.CString in Rust tokenizer + assert.Equal(t, "nonulhere", sanitizeForTemplate("no\x00nul\x00here")) +} + +// --- GetPatternString trailing whitespace tests --- + +func TestGetPatternString_TrailingWhitespacePreserved(t *testing.T) { + // Template: "config: " [wildcard] — trailing space before wildcard must survive + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "config:", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "abc123", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{2} + assert.Equal(t, "config: ", p.GetPatternString()) +} + +func TestGetWildcardCharPositions_UnicodeArrow(t *testing.T) { + // "→" is 3 UTF-8 bytes but 1 rune/Java char. Positions after it must use rune count. + // Template: "state: " [wild1] " → " [wild2] + // "state: " = 7 runes → wild1 at 7 + // " → " = 3 runes (space + arrow + space) → wild2 at 10 + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "state:", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "open", token.IsWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " → ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "closed", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{2, 4} + positions := p.GetWildcardCharPositions() + assert.Equal(t, []int{7, 10}, positions) +} + +func TestGetPatternString_MultipleSpacesPreserved(t *testing.T) { + // "err=< rpc error" — double-space whitespace token must survive intact + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "err=<", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "rpc", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{2} + assert.Equal(t, "err=< ", p.GetPatternString()) +} + +// --- sanitizeForTemplateRuneLen direct tests --- + +func TestSanitizeForTemplateRuneLen_ASCII(t *testing.T) { + // Pure ASCII: rune count == byte count + assert.Equal(t, 11, sanitizeForTemplateRuneLen("hello world")) + assert.Equal(t, 0, sanitizeForTemplateRuneLen("")) +} + +func TestSanitizeForTemplateRuneLen_MultiByteBMP(t *testing.T) { + // BMP characters: each counts as 1 (matches Java String.length()) + // → is U+2192, 3 UTF-8 bytes but 1 Java char + assert.Equal(t, 1, sanitizeForTemplateRuneLen("→")) + // "state: → " = 9 runes (7 ASCII + arrow + space) + assert.Equal(t, 9, sanitizeForTemplateRuneLen("state: → ")) +} + +func TestSanitizeForTemplateRuneLen_TabNewlinePreserved(t *testing.T) { + // \t, \n, \r each count as 1 + assert.Equal(t, 3, sanitizeForTemplateRuneLen("\t\n\r")) +} + +func TestSanitizeForTemplateRuneLen_NullStripped(t *testing.T) { + // NUL is stripped → only the 3 non-NUL chars count + assert.Equal(t, 3, sanitizeForTemplateRuneLen("a\x00b\x00c")) +} + +// --- GetWildcardCharPositions additional tests --- + +func TestGetWildcardCharPositions_PureASCII_Unchanged(t *testing.T) { + // For ASCII-only templates byte count == rune count, so positions must be identical. + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "err=<", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "rpc", token.IsWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "failed", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{2, 4} + positions := p.GetWildcardCharPositions() + // "err=< " = 6 chars → wild1 at 6; " " = 1 char → wild2 at 7 + assert.Equal(t, []int{6, 7}, positions) +} + +// --- GetPatternString newline/CR tests --- + +func TestGetPatternString_NewlinePreserved(t *testing.T) { + // \n inside a token value must survive in the template string + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "hint_type: DELETE\nlimit: 5000\n", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "query", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{1} + assert.Equal(t, "hint_type: DELETE\nlimit: 5000\n", p.GetPatternString()) +} + +// --- Production mismatch regression tests (from staging flink-intakeshadow-metrics) --- + +func TestSanitizeForTemplate_StagingMismatch_CtrProgress(t *testing.T) { + // Real mismatch observed in staging: + // HTTP: "Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)" + // gRPC: "Importingelapsed: 0.4 stotal: 0.0 B(0.0 B/s)" ← tabs stripped + // The message field of a journald/ctr log uses \t as field separator. + input := "Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)" + assert.Equal(t, input, sanitizeForTemplate(input), + "tabs in ctr progress output must be preserved in template") +} + +func TestSanitizeForTemplate_StagingMismatch_EcrPull(t *testing.T) { + // Real mismatch observed in staging: + // HTTP: "486234852809.dkr.ecr.us east 1.amazonaws\tsaved" + // gRPC: "486234852809.dkr.ecr.us east 1.amazonawssaved" ← tab stripped + input := "486234852809.dkr.ecr.us east 1.amazonaws\tsaved" + assert.Equal(t, input, sanitizeForTemplate(input), + "tab separator in ECR pull log must be preserved") +} + +func TestSanitizeForTemplate_StagingMismatch_DnsRecord(t *testing.T) { + // Real mismatch observed in staging: + // HTTP: "vault.us1.staging.dog.\t29\tIN\tA\t10.128.150.56" + // gRPC: "vault.us1.staging.dog.29INA10.128.150.56" ← all tabs stripped + input := "vault.us1.staging.dog.\t29\tIN\tA\t10.128.150.56" + assert.Equal(t, input, sanitizeForTemplate(input), + "tab-separated DNS record fields must be preserved") +} + +func TestSanitizeForTemplate_StagingMismatch_NewlineInMessage(t *testing.T) { + // Real mismatch (2026-04-22 staging): + // HTTP: "Listing hints...: hint_type: DELETE\nlimit: 5000\nmasked_only: true\n" + // gRPC: "Listing hints...: hint_type: DELETElimit: 5000masked_only: true" + // \n between proto fields was stripped — they all run together. + input := "Listing hints for cell temporal-8a67: hint_type: DELETE\nlimit: 5000\nmasked_only: true\n" + assert.Equal(t, input, sanitizeForTemplate(input), + "\\n separating proto fields in message must be preserved") +} + +func TestGetPatternString_StagingMismatch_TrailingSpace(t *testing.T) { + // Real mismatch (2026-04-22 staging): + // HTTP message: "Checking error " (trailing space) + // gRPC message: "Checking error" (trailing space dropped) + // The trailing whitespace token before the wildcard must survive in the template. + tl := token.NewTokenList() + tl.Add(token.NewToken(token.TokenWord, "Checking", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWord, "error", token.NotWildcard)) + tl.Add(token.NewToken(token.TokenWhitespace, " ", token.NotWildcard)) // trailing space + tl.Add(token.NewToken(token.TokenWord, "detail", token.IsWildcard)) + p := newPattern(tl, 1) + p.Template = tl + p.Positions = []int{4} + assert.Equal(t, "Checking error ", p.GetPatternString(), + "trailing space whitespace token must appear in template") +} diff --git a/pkg/logs/patterns/preprocessor/json.go b/pkg/logs/patterns/preprocessor/json.go index 9facb7cb5ced..d6aac5bfa4e9 100644 --- a/pkg/logs/patterns/preprocessor/json.go +++ b/pkg/logs/patterns/preprocessor/json.go @@ -40,8 +40,9 @@ type ExtractionResult struct { // Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service". JSONContextSchema string // JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order. - // Nested objects/arrays are serialized as JSON strings. - JSONContextValues []string + // Primitive values preserve their decoded JSON type. Nested objects/arrays are kept as decoded + // map/slice values so the transport layer can encode them as raw JSON. + JSONContextValues []interface{} } // Common top-level message field names (Layer 0) @@ -108,44 +109,39 @@ func PreprocessJSON(content []byte) ExtractionResult { } // extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map. -// Primitive values (string, number, bool, null) are converted to strings. -// Nested objects and arrays are serialized as JSON strings. -func extractSchemaAndValues(data map[string]interface{}) (string, []string) { +// Primitive values preserve their decoded JSON type. Nested objects and arrays are kept as decoded +// map/slice values so the transport layer can encode them as raw JSON. +func extractSchemaAndValues(data map[string]interface{}) (string, []interface{}) { keys := make([]string, 0, len(data)) for k := range data { keys = append(keys, k) } sort.Strings(keys) - values := make([]string, len(keys)) + values := make([]interface{}, len(keys)) for i, k := range keys { - values[i] = valueToString(data[k]) + values[i] = normalizeJSONValue(data[k]) } return strings.Join(keys, ","), values } -// valueToString converts a JSON value to its string representation. -// Primitives are converted directly; objects and arrays are serialized as JSON. -func valueToString(v interface{}) string { +// normalizeJSONValue preserves primitive JSON types and keeps nested objects/arrays intact +// so the transport layer can encode them correctly (numbers as numbers, not strings). +func normalizeJSONValue(v interface{}) interface{} { switch val := v.(type) { case string: return val case json.Number: - return val.String() + return val + case float64: + return val case bool: - if val { - return "true" - } - return "false" + return val case nil: - return "" + return nil default: - b, err := jsonAPI.Marshal(val) - if err != nil { - return "" - } - return string(b) + return val } } diff --git a/pkg/logs/patterns/preprocessor/json_test.go b/pkg/logs/patterns/preprocessor/json_test.go new file mode 100644 index 000000000000..0b27579cdfc9 --- /dev/null +++ b/pkg/logs/patterns/preprocessor/json_test.go @@ -0,0 +1,144 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package preprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPreprocessJSON_NotJSON(t *testing.T) { + result := PreprocessJSON([]byte("plain text log")) + assert.False(t, result.IsJSON) + assert.Empty(t, result.Message) +} + +func TestPreprocessJSON_NoMessageField(t *testing.T) { + result := PreprocessJSON([]byte(`{"level":"info","service":"api"}`)) + assert.False(t, result.IsJSON) +} + +func TestPreprocessJSON_MessageOnly(t *testing.T) { + result := PreprocessJSON([]byte(`{"message":"hello"}`)) + assert.True(t, result.IsJSON) + assert.Equal(t, "hello", result.Message) + assert.Equal(t, "message", result.MessageKey) + assert.Empty(t, result.JSONContextSchema) + assert.Empty(t, result.JSONContextValues) +} + +func TestPreprocessJSON_MessageWithContext(t *testing.T) { + result := PreprocessJSON([]byte(`{"message":"Processing","level":"info","pid":1234}`)) + require.True(t, result.IsJSON) + assert.Equal(t, "Processing", result.Message) + assert.Equal(t, "level,pid", result.JSONContextSchema) + require.Len(t, result.JSONContextValues, 2) + assert.Equal(t, "info", result.JSONContextValues[0]) + // pid must be json.Number (not float64 or string) to preserve integer precision. + // Failing here means UseNumber is not configured, causing float64 coercion. + pidNum, ok := result.JSONContextValues[1].(interface{ String() string }) + require.True(t, ok, "pid must be json.Number, not float64 or string") + assert.Equal(t, "1234", pidNum.String()) +} + +func TestPreprocessJSON_LargeIntPreserved(t *testing.T) { + // Integers exceeding float64 precision (>2^53) must not become scientific notation. + // UseNumber preserves the exact string representation. + input := `{"message":"evt","eventHash":163631252358535540000}` + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + require.Len(t, result.JSONContextValues, 1) + // Value should be a json.Number preserving original digits + numStr, ok := result.JSONContextValues[0].(interface{ String() string }) + require.True(t, ok, "large int must be json.Number, not float64") + assert.Equal(t, "163631252358535540000", numStr.String()) +} + +func TestPreprocessJSON_MsgKey(t *testing.T) { + result := PreprocessJSON([]byte(`{"msg":"User login","timestamp":1234567890}`)) + require.True(t, result.IsJSON) + assert.Equal(t, "User login", result.Message) + assert.Equal(t, "msg", result.MessageKey) +} + +func TestPreprocessJSON_NestedPath(t *testing.T) { + result := PreprocessJSON([]byte(`{"data":{"message":"Nested","id":123}}`)) + require.True(t, result.IsJSON) + assert.Equal(t, "Nested", result.Message) + assert.Equal(t, "data.message", result.MessageKey) +} + +func TestPreprocessJSON_NullValue(t *testing.T) { + result := PreprocessJSON([]byte(`{"message":"test","extra":null}`)) + require.True(t, result.IsJSON) + assert.Equal(t, "extra", result.JSONContextSchema) + assert.Equal(t, []interface{}{nil}, result.JSONContextValues) +} + +func TestPreprocessJSON_BoolValue(t *testing.T) { + result := PreprocessJSON([]byte(`{"message":"test","active":true}`)) + require.True(t, result.IsJSON) + assert.Equal(t, "active", result.JSONContextSchema) + assert.Equal(t, []interface{}{true}, result.JSONContextValues) +} + +// --- Production mismatch regression tests --- + +func TestPreprocessJSON_StagingMismatch_MessageWithTabs(t *testing.T) { + // journald log where message field contains \t separators + input := `{"message":"Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)","journald":{"_COMM":"ctr"}}` + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + assert.Equal(t, "Importing\telapsed: 0.4 s\ttotal: 0.0 B\t(0.0 B/s)", result.Message) +} + +func TestPreprocessJSON_NestedObjectPreserved(t *testing.T) { + // Nested objects are kept as decoded map values ([]interface{} type) + input := `{"message":"test","metadata":{"region":"us-east-1","zone":"a"}}` + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + assert.Equal(t, "metadata", result.JSONContextSchema) + require.Len(t, result.JSONContextValues, 1) + nested, ok := result.JSONContextValues[0].(map[string]interface{}) + require.True(t, ok, "nested object must be map[string]interface{}") + assert.Equal(t, "us-east-1", nested["region"]) +} + +func TestPreprocessJSON_IsJSONObject(t *testing.T) { + assert.True(t, IsJSONObject([]byte(`{"key":"val"}`))) + assert.True(t, IsJSONObject([]byte(` {"key":"val"}`))) // leading whitespace + assert.False(t, IsJSONObject([]byte(`["array"]`))) + assert.False(t, IsJSONObject([]byte(`plain text`))) + assert.False(t, IsJSONObject([]byte(``))) +} + +func TestPreprocessJSON_StagingMismatch_NewlineInMessage(t *testing.T) { + // \n in a JSON string value must be returned verbatim in Message. + // PreprocessJSON does not strip whitespace from extracted values — stripping + // is the responsibility of sanitizeForTemplateInto downstream. + // This test guards against any accidental trimming in the extraction layer. + input := `{"message":"hint_type: DELETE\nlimit: 5000\n","logger":"cell"}` + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + // JSON \n in a string → real newline char after parsing + assert.Equal(t, "hint_type: DELETE\nlimit: 5000\n", result.Message, + "newlines in JSON string values must survive extraction unchanged") + // The fix for stripping in the final reconstructed log is in sanitizeForTemplateInto, + // not here — this test confirms the extraction layer does not interfere. +} + +func TestPreprocessJSON_StagingMismatch_TrailingSpaceInMessage(t *testing.T) { + // Real mismatch (2026-04-23 staging): trailing space in msg field was stripped. + // HTTP: "Checking error " (with trailing space) + // gRPC: "Checking error" (no trailing space) + input := `{"msg":"Checking error ","level":"INFO"}` + result := PreprocessJSON([]byte(input)) + require.True(t, result.IsJSON) + assert.Equal(t, "Checking error ", result.Message, + "trailing space in message field must be returned verbatim") +} diff --git a/pkg/logs/patterns/tags/tag_manager.go b/pkg/logs/patterns/tags/tag_manager.go index b6e694311ef9..08a7927b9aed 100644 --- a/pkg/logs/patterns/tags/tag_manager.go +++ b/pkg/logs/patterns/tags/tag_manager.go @@ -180,6 +180,14 @@ func (tm *TagManager) Count() int { return len(tm.stringToEntry) } +// HasDictID reports whether id is still present in the dictionary (not evicted). +func (tm *TagManager) HasDictID(id uint64) bool { + tm.mu.RLock() + defer tm.mu.RUnlock() + _, ok := tm.idToEntry[id] + return ok +} + // dictIndexValue converts a dictionary ID to a DynamicValue proto message func dictIndexValue(id uint64) *statefulpb.DynamicValue { return &statefulpb.DynamicValue{ diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 70fe0f38ac8a..4c80358237ed 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "time" + "unicode/utf8" "google.golang.org/protobuf/proto" @@ -49,6 +50,17 @@ func getTranslatorContent(msg *message.Message) []byte { return msg.GetContent() } +// toValidUTF8 returns s unchanged if it is already valid UTF-8 (zero allocation). +// Otherwise replaces each maximal run of invalid bytes with U+FFFD. +// Required before writing to proto3 string fields, which must be valid UTF-8 — +// invalid bytes would corrupt or drop the datum entirely. +func toValidUTF8(s string) string { + if utf8.ValidString(s) { + return s + } + return strings.ToValidUTF8(s, "\uFFFD") +} + const ( // defaultTokenizeBatchSize is the maximum number of messages to accumulate // before calling TokenizeBatch. Larger batches amortize CGo overhead more. @@ -478,13 +490,14 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS currentStatus := msg.MessageMetadata.GetStatus() currentTagsString := msg.MessageMetadata.TagsToString() - // Cache hit: all inputs identical → return cached result (zero allocations) + // Cache hit: all inputs identical and cached dict index still live (not evicted). if mt.tagCache.tagSet != nil && mt.tagCache.origin == currentOrigin && mt.tagCache.hostname == currentHostname && mt.tagCache.source == currentSource && mt.tagCache.status == currentStatus && - mt.tagCache.tagsString == currentTagsString { + mt.tagCache.tagsString == currentTagsString && + mt.tagManager.HasDictID(mt.tagCache.dictID) { return mt.tagCache.tagSet, mt.tagCache.tagStr, mt.tagCache.dictID, false } @@ -538,6 +551,17 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS return tagSet, allTagsString, dictID, isNew } +// invalidateTagCache clears the in-memory tag cache when it references dictID. +// Used when dictionary entries are removed out-of-band (e.g. TTL eviction). +func (mt *MessageTranslator) invalidateTagCache(dictID uint64) { + if mt.tagCache.dictID != dictID { + return + } + mt.tagCache.tagSet = nil + mt.tagCache.dictID = 0 + mt.tagCache.tagStr = "" +} + func (mt *MessageTranslator) buildServiceField(msg *message.Message) (*statefulpb.DynamicValue, uint64, bool) { if msg.Origin == nil { return nil, 0, false @@ -635,7 +659,8 @@ func (mt *MessageTranslator) sendDictEntryDelete(outputChan chan *message.Statef // sendRawLog creates and sends a raw log datum (currently unused) func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage, msg *message.Message, contentStr string, ts time.Time, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue) { - logDatum := buildRawLog(contentStr, ts, tagSet, msg.MessageMetadata.DualSendUUID, service) + // Proto3 string fields require valid UTF-8; replace invalid sequences to avoid corrupt datums. + logDatum := buildRawLog(toValidUTF8(contentStr), ts, tagSet, msg.MessageMetadata.DualSendUUID, service) tlmPipelineRawLogsProcessed.Inc(mt.pipelineName) tlmPipelineRawLogsProcessedBytes.Add(float64(proto.Size(logDatum)), mt.pipelineName) @@ -891,7 +916,8 @@ func (mt *MessageTranslator) fillWildcardDynamicValue( // New value: send inline as string_value — no dict entry created. // High-cardinality values (UUIDs, IPs, request IDs) never repeat, // so dict encoding provides zero compression benefit for them. - oneofStr.StringValue = value + // Proto3 requires valid UTF-8; replace invalid sequences to avoid corrupt datums. + oneofStr.StringValue = toValidUTF8(value) dv.Value = oneofStr } diff --git a/pkg/logs/sender/grpc/mock_state_cache_test.go b/pkg/logs/sender/grpc/mock_state_cache_test.go index 660cd0985377..6b60a7cd898f 100644 --- a/pkg/logs/sender/grpc/mock_state_cache_test.go +++ b/pkg/logs/sender/grpc/mock_state_cache_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" "time" + "unicode/utf8" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -143,3 +144,100 @@ func TestBuildTagSet_OriginTagChangesInvalidateCache(t *testing.T) { assert.NotContains(t, tagStr2, "team:red") assert.NotEqual(t, tagStr1, tagStr2, "different origin tags should change the final joined tag string") } + +func TestBuildTagSet_RebuildsAfterCachedDictEntryEviction(t *testing.T) { + tok := rtokenizer.NewRustTokenizer() + mt := NewMessageTranslator("test-pipeline", tok) + + origin := makeTestOrigin("svc-a", "src-a", []string{"env:test"}) + msg1 := makeMsg("log line 1", "host-1", "info", origin) + + tagSet1, tagStr1, dictID1, isNew1 := mt.buildTagSet(msg1) + + require.NotNil(t, tagSet1) + require.True(t, isNew1) + + mt.invalidateTagCache(dictID1) + mt.tagManager.EvictStaleEntries(0) + + msg2 := makeMsg("log line 2", "host-1", "info", origin) + tagSet2, tagStr2, dictID2, isNew2 := mt.buildTagSet(msg2) + + require.NotNil(t, tagSet2) + assert.True(t, isNew2, "evicted cached tagset must be redefined") + assert.Equal(t, tagStr1, tagStr2) + assert.NotEqual(t, dictID1, dictID2, "recreated tagset must use a fresh dict id") + assert.NotEqual(t, tagSet1, tagSet2, "recreated tagset must not reuse stale cached pointer") +} + +func TestBuildTagSet_CacheHitSelfHealsAfterSilentDictEviction(t *testing.T) { + tok := rtokenizer.NewRustTokenizer() + mt := NewMessageTranslator("test-pipeline", tok) + + origin := makeTestOrigin("svc-a", "src-a", []string{"env:test"}) + msg1 := makeMsg("log line 1", "host-1", "info", origin) + + tagSet1, tagStr1, dictID1, isNew1 := mt.buildTagSet(msg1) + + require.NotNil(t, tagSet1) + require.True(t, isNew1) + + mt.tagManager.EvictStaleEntries(0) + + msg2 := makeMsg("log line 2", "host-1", "info", origin) + tagSet2, tagStr2, dictID2, isNew2 := mt.buildTagSet(msg2) + + require.NotNil(t, tagSet2) + assert.True(t, isNew2, "cache hit path must revalidate dict liveness and rebuild") + assert.Equal(t, tagStr1, tagStr2) + assert.NotEqual(t, dictID1, dictID2, "rebuilt tagset must get a new dict id after eviction") + assert.NotEqual(t, tagSet1, tagSet2, "rebuilt tagset must not reuse stale cached pointer") +} + +// --- toValidUTF8 tests --- + +func TestToValidUTF8_ValidString(t *testing.T) { + tests := []struct { + name string + input string + }{ + {"empty", ""}, + {"ascii", "hello world"}, + {"multibyte", "caf\xc3\xa9"}, // café + {"emoji", "\xf0\x9f\x98\x80 smile"}, // U+1F600 + {"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000) + {"mixed scripts", "\xe4\xb8\xad\xe6\x96\x87 Chinese"}, // 中文 + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.True(t, utf8.ValidString(tt.input), "test precondition: input must be valid UTF-8") + result := toValidUTF8(tt.input) + assert.Equal(t, tt.input, result) + }) + } +} + +func TestToValidUTF8_InvalidBytes(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"lone continuation byte", "hello\x80world", "hello\uFFFDworld"}, + {"truncated sequence", "hello\xc3", "hello\uFFFD"}, + {"invalid lead byte 0xFE", "a\xFEb", "a\uFFFDb"}, + // strings.ToValidUTF8 replaces each maximal *run* of invalid bytes with one + // replacement character, not one per byte. \x80\x81\x82 are three consecutive + // lone continuation bytes — treated as one run → one U+FFFD. + {"multiple invalid bytes", "\x80\x81\x82", "\uFFFD"}, + {"mixed valid and invalid", "ok\xc3\xa9\x80ok", "ok\xc3\xa9\uFFFDok"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.False(t, utf8.ValidString(tt.input), "test precondition: input must contain invalid UTF-8") + result := toValidUTF8(tt.input) + assert.Equal(t, tt.expected, result) + assert.True(t, utf8.ValidString(result), "result must be valid UTF-8") + }) + } +}