Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/logs/patterns/tags/tag_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ func (tm *TagManager) Get(tag string) (uint64, bool) {
return tm.GetStringID(tag)
}

// TouchDictID records an access to the dictionary entry identified by id.
// When the entry exists, its usage count is incremented, its last-access
// timestamp is set to the current time, and true is returned. When no entry
// is registered under id (for example because it was already evicted), nothing
// is modified and false is returned.
func (tm *TagManager) TouchDictID(id uint64) bool {
	// Hold the manager lock for the whole lookup-and-update.
	tm.mu.Lock()
	defer tm.mu.Unlock()

	if e, ok := tm.idToEntry[id]; ok {
		e.usageCount++
		e.lastAccessAt = time.Now()
		return true
	}
	return false
}

// EvictStaleEntries removes all entries that haven't been accessed within the given TTL.
// Returns the IDs of evicted entries so callers can send DictEntryDelete messages.
func (tm *TagManager) EvictStaleEntries(ttl time.Duration) []uint64 {
Expand Down
18 changes: 16 additions & 2 deletions pkg/logs/sender/grpc/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) {
s.sendMessagesWithDatums(messagesMetadata, grpcDatums, outputChan)
}

// isWireStateDatum reports whether datum carries a payload other than a
// PatternDelete or a DictEntryDelete. It returns false for those two datum
// kinds and true for every other kind, including a datum whose Data is unset.
func isWireStateDatum(datum *statefulpb.Datum) bool {
	if _, isPatternDelete := datum.Data.(*statefulpb.Datum_PatternDelete); isPatternDelete {
		return false
	}
	_, isDictEntryDelete := datum.Data.(*statefulpb.Datum_DictEntryDelete)
	return !isDictEntryDelete
}

func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.MessageMetadata, grpcDatums []*statefulpb.Datum, outputChan chan *message.Payload) {
defer s.utilization.Stop()

Expand All @@ -301,12 +310,17 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
unencodedSize += msgMeta.RawDataLen
}

// Extract all state changes from this batch for snapshot management
// Extract all state changes from this batch for snapshot management, and build
// the filtered set of datums that will actually be sent over the wire.
var stateChanges []*statefulpb.Datum
wireDatums := make([]*statefulpb.Datum, 0, len(grpcDatums))
for _, datum := range grpcDatums {
if isStateDatum(datum) {
stateChanges = append(stateChanges, datum)
}
if isWireStateDatum(datum) {
wireDatums = append(wireDatums, datum)
}
}

// Track per-datum-type counts and sizes
Expand Down Expand Up @@ -334,7 +348,7 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa

// Create DatumSequence and marshal to bytes
datumSeq := &statefulpb.DatumSequence{
Data: grpcDatums,
Data: wireDatums,
}

var err error
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const nanoToMillis = 1000000
const staleTTL = 14 * time.Minute
const staleTTL = 5 * time.Minute
const staleSweepInterval = 30 * time.Second

// batchEntry is a per-message sidecar used during batch tokenization.
Expand Down Expand Up @@ -497,7 +497,7 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS
mt.tagCache.source == currentSource &&
mt.tagCache.status == currentStatus &&
mt.tagCache.tagsString == currentTagsString &&
mt.tagManager.HasDictID(mt.tagCache.dictID) {
mt.tagManager.TouchDictID(mt.tagCache.dictID) {
return mt.tagCache.tagSet, mt.tagCache.tagStr, mt.tagCache.dictID, false
}

Expand Down
31 changes: 30 additions & 1 deletion pkg/logs/sender/grpc/mock_state_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ func TestBuildTagSet_CacheHitSelfHealsAfterSilentDictEviction(t *testing.T) {
assert.NotEqual(t, tagSet1, tagSet2, "rebuilt tagset must not reuse stale cached pointer")
}

func TestBuildTagSet_CacheHitRefreshesDictAccess(t *testing.T) {
tok := rtokenizer.NewRustTokenizer()
mt := NewMessageTranslator("test-pipeline", tok)

origin := makeTestOrigin("svc-a", "src-a", []string{"env:test"})
msg1 := makeMsg("log line 1", "host-1", "info", origin)

tagSet1, _, dictID1, isNew1 := mt.buildTagSet(msg1)

require.NotNil(t, tagSet1)
require.True(t, isNew1)

ttl := 20 * time.Millisecond
time.Sleep(ttl + 10*time.Millisecond)

msg2 := makeMsg("log line 2", "host-1", "info", origin)
tagSet2, _, dictID2, isNew2 := mt.buildTagSet(msg2)

require.NotNil(t, tagSet2)
assert.False(t, isNew2, "second call should use the tagset cache")
assert.Equal(t, dictID1, dictID2)
assert.Equal(t, tagSet1, tagSet2)

evictedIDs := mt.tagManager.EvictStaleEntries(ttl)

assert.NotContains(t, evictedIDs, dictID1, "cache hit should refresh the dict entry access time")
assert.True(t, mt.tagManager.HasDictID(dictID1), "refreshed cached tagset should remain live")
}

// --- toValidUTF8 tests ---

func TestToValidUTF8_ValidString(t *testing.T) {
Expand All @@ -205,7 +234,7 @@ func TestToValidUTF8_ValidString(t *testing.T) {
{"ascii", "hello world"},
{"multibyte", "caf\xc3\xa9"}, // café
{"emoji", "\xf0\x9f\x98\x80 smile"}, // U+1F600
{"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000)
{"nul is valid utf8", "hello\x00world"}, // NUL is valid UTF-8 (U+0000)
{"mixed scripts", "\xe4\xb8\xad\xe6\x96\x87 Chinese"}, // 中文
}
for _, tt := range tests {
Expand Down
Loading