diff --git a/pkg/logs/patterns/tags/tag_manager.go b/pkg/logs/patterns/tags/tag_manager.go index 08a7927b9aed..fa086bcd78da 100644 --- a/pkg/logs/patterns/tags/tag_manager.go +++ b/pkg/logs/patterns/tags/tag_manager.go @@ -153,6 +153,21 @@ func (tm *TagManager) Get(tag string) (uint64, bool) { return tm.GetStringID(tag) } +// TouchDictID updates the access metadata for an existing dictionary ID. +// It returns false if the ID was already evicted. +func (tm *TagManager) TouchDictID(id uint64) bool { + tm.mu.Lock() + defer tm.mu.Unlock() + + entry, exists := tm.idToEntry[id] + if !exists { + return false + } + entry.usageCount++ + entry.lastAccessAt = time.Now() + return true +} + // EvictStaleEntries removes all entries that haven't been accessed within the given TTL. // Returns the IDs of evicted entries so callers can send DictEntryDelete messages. func (tm *TagManager) EvictStaleEntries(ttl time.Duration) []uint64 { diff --git a/pkg/logs/sender/grpc/batch_strategy.go b/pkg/logs/sender/grpc/batch_strategy.go index 2cee6bfe1ebd..f39520e92eef 100644 --- a/pkg/logs/sender/grpc/batch_strategy.go +++ b/pkg/logs/sender/grpc/batch_strategy.go @@ -293,6 +293,15 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) { s.sendMessagesWithDatums(messagesMetadata, grpcDatums, outputChan) } +func isWireStateDatum(datum *statefulpb.Datum) bool { + switch datum.Data.(type) { + case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete: + return false + default: + return true + } +} + func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.MessageMetadata, grpcDatums []*statefulpb.Datum, outputChan chan *message.Payload) { defer s.utilization.Stop() @@ -301,12 +310,17 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa unencodedSize += msgMeta.RawDataLen } - // Extract all state changes from this batch for snapshot management + // Extract all state changes from this batch for snapshot management, and build + // the filtered set of datums that will actually be sent over the wire. var stateChanges []*statefulpb.Datum + wireDatums := make([]*statefulpb.Datum, 0, len(grpcDatums)) for _, datum := range grpcDatums { if isStateDatum(datum) { stateChanges = append(stateChanges, datum) } + if isWireStateDatum(datum) { + wireDatums = append(wireDatums, datum) + } } // Track per-datum-type counts and sizes @@ -334,7 +348,7 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa // Create DatumSequence and marshal to bytes datumSeq := &statefulpb.DatumSequence{ - Data: grpcDatums, + Data: wireDatums, } var err error diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 4c80358237ed..cbea344ebcfc 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -27,7 +27,7 @@ import ( ) const nanoToMillis = 1000000 -const staleTTL = 14 * time.Minute +const staleTTL = 5 * time.Minute const staleSweepInterval = 30 * time.Second // batchEntry is a per-message sidecar used during batch tokenization. @@ -497,7 +497,7 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS mt.tagCache.source == currentSource && mt.tagCache.status == currentStatus && mt.tagCache.tagsString == currentTagsString && - mt.tagManager.HasDictID(mt.tagCache.dictID) { + mt.tagManager.TouchDictID(mt.tagCache.dictID) { return mt.tagCache.tagSet, mt.tagCache.tagStr, mt.tagCache.dictID, false } diff --git a/pkg/logs/sender/grpc/mock_state_cache_test.go b/pkg/logs/sender/grpc/mock_state_cache_test.go index 6b60a7cd898f..3e9ccf087b89 100644 --- a/pkg/logs/sender/grpc/mock_state_cache_test.go +++ b/pkg/logs/sender/grpc/mock_state_cache_test.go @@ -194,6 +194,35 @@ func TestBuildTagSet_CacheHitSelfHealsAfterSilentDictEviction(t *testing.T) { assert.NotEqual(t, tagSet1, tagSet2, "rebuilt tagset must not reuse stale cached pointer") } +func TestBuildTagSet_CacheHitRefreshesDictAccess(t *testing.T) { + tok := rtokenizer.NewRustTokenizer() + mt := NewMessageTranslator("test-pipeline", tok) + + origin := makeTestOrigin("svc-a", "src-a", []string{"env:test"}) + msg1 := makeMsg("log line 1", "host-1", "info", origin) + + tagSet1, _, dictID1, isNew1 := mt.buildTagSet(msg1) + + require.NotNil(t, tagSet1) + require.True(t, isNew1) + + ttl := 20 * time.Millisecond + time.Sleep(ttl + 10*time.Millisecond) + + msg2 := makeMsg("log line 2", "host-1", "info", origin) + tagSet2, _, dictID2, isNew2 := mt.buildTagSet(msg2) + + require.NotNil(t, tagSet2) + assert.False(t, isNew2, "second call should use the tagset cache") + assert.Equal(t, dictID1, dictID2) + assert.Equal(t, tagSet1, tagSet2) + + evictedIDs := mt.tagManager.EvictStaleEntries(ttl) + + assert.NotContains(t, evictedIDs, dictID1, "cache hit should refresh the dict entry access time") + assert.True(t, mt.tagManager.HasDictID(dictID1), "refreshed cached tagset should remain live") +} + // --- toValidUTF8 tests --- func TestToValidUTF8_ValidString(t *testing.T) { @@ -205,7 +234,7 @@ func TestToValidUTF8_ValidString(t *testing.T) { {"ascii", "hello world"}, {"multibyte", "caf\xc3\xa9"}, // café {"emoji", "\xf0\x9f\x98\x80 smile"}, // U+1F600 - {"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000) + {"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000) {"mixed scripts", "\xe4\xb8\xad\xe6\x96\x87 Chinese"}, // 中文 } for _, tt := range tests {