diff --git a/pkg/logs/sender/grpc/batch_strategy.go b/pkg/logs/sender/grpc/batch_strategy.go index 2a99fc14bf36..27e812bc2cc0 100644 --- a/pkg/logs/sender/grpc/batch_strategy.go +++ b/pkg/logs/sender/grpc/batch_strategy.go @@ -172,32 +172,28 @@ func (s *batchStrategy) addMessage(m *message.StatefulMessage) bool { return false } - // Update delta state when PatternDefine passes through (only when delta encoding is active) + if !s.buffer.AddMessageWithSize(m.Metadata, m.Metadata.RawDataLen) { + // Buffer full (not an error) + return false + } + + // Only mutate delta-encoded fields after the message is accepted into this + // batch. If AddMessageWithSize fails, processMessage flushes and retries the + // same datum in a new batch, where it must still contain absolute values. if enableDeltaEncoding { if patternDefine := m.Datum.GetPatternDefine(); patternDefine != nil { s.lastPatternID = patternDefine.PatternId } } - - // Apply delta encoding to Log datums before adding to batch if logDatum := m.Datum.GetLogs(); logDatum != nil { s.applyDeltaEncoding(logDatum) } - // Try to add to buffer - if s.buffer.AddMessageWithSize(m.Metadata, m.Metadata.RawDataLen) { - s.grpcDatums = append(s.grpcDatums, m.Datum) - return true - } - - // Buffer full (not an error) - return false + s.grpcDatums = append(s.grpcDatums, m.Datum) + return true } // applyDeltaEncoding applies delta encoding to a Log datum within the current batch. -// Currently gated behind enableDeltaEncoding=false: the server does not implement delta -// reconstruction for any field (patternId, tags, timestamp), so this is a no-op until -// server-side support is added and protocol-version-negotiated. func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) { if !enableDeltaEncoding { return @@ -219,7 +215,7 @@ func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) { // Pattern ID delta encoding (for structured logs only) if structured := logDatum.GetStructured(); structured != nil { if structured.PatternId == s.lastPatternID { - structured.PatternId = 0 // proto3 omits zero values + structured.PatternId = 0 } else { s.lastPatternID = structured.PatternId } diff --git a/pkg/logs/sender/grpc/batch_strategy_test.go b/pkg/logs/sender/grpc/batch_strategy_test.go index c6f51299f812..196e5b01178b 100644 --- a/pkg/logs/sender/grpc/batch_strategy_test.go +++ b/pkg/logs/sender/grpc/batch_strategy_test.go @@ -55,6 +55,53 @@ func createTestStatefulMessageWithService(content string, serviceDictID uint64) return msg } +func createTestPatternDefineStatefulMessage(patternID uint64, template string) *message.StatefulMessage { + msg := message.NewMessage(nil, nil, "", 0) + return &message.StatefulMessage{ + Metadata: &msg.MessageMetadata, + Datum: &statefulpb.Datum{ + Data: &statefulpb.Datum_PatternDefine{ + PatternDefine: &statefulpb.PatternDefine{ + PatternId: patternID, + Template: template, + ParamCount: 1, + }, + }, + }, + } +} + +func createTestStructuredStatefulMessage(patternID uint64, dynamicValueCount int) *message.StatefulMessage { + msg := message.NewMessage([]byte("structured"), nil, "", 0) + msg.MessageMetadata.RawDataLen = len("structured") + + dynamicValues := make([]*statefulpb.DynamicValue, dynamicValueCount) + for i := range dynamicValues { + dynamicValues[i] = &statefulpb.DynamicValue{ + Value: &statefulpb.DynamicValue_StringValue{ + StringValue: "value", + }, + } + } + + return &message.StatefulMessage{ + Metadata: &msg.MessageMetadata, + Datum: &statefulpb.Datum{ + Data: &statefulpb.Datum_Logs{ + Logs: &statefulpb.Log{ + Timestamp: 12345, + Content: &statefulpb.Log_Structured{ + Structured: &statefulpb.StructuredLog{ + PatternId: patternID, + DynamicValues: dynamicValues, + }, + }, + }, + }, + }, + } +} + func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) { input := make(chan *message.StatefulMessage) output := make(chan *message.Payload) @@ -508,6 +555,125 @@ func TestBatchStrategyDeltaEncodesRepeatedService(t *testing.T) { strategy.Stop() } +func TestBatchStrategyDeltaEncodesPatternIDFromPatternDefine(t *testing.T) { + input := make(chan *message.StatefulMessage) + output := make(chan *message.Payload, 10) + flushChan := make(chan struct{}) + + strategy := NewBatchStrategy( + input, + output, + flushChan, + time.Hour, + 100, + 10000, + "test", + compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1), + metrics.NewNoopPipelineMonitor(""), + "test") + strategy.Start() + + input <- createTestPatternDefineStatefulMessage(12, "pattern") + input <- createTestStructuredStatefulMessage(12, 1) + flushChan <- struct{}{} + + payload := <-output + var datumSeq statefulpb.DatumSequence + err := proto.Unmarshal(payload.Encoded, &datumSeq) + require.NoError(t, err) + require.Len(t, datumSeq.Data, 2) + + require.NotNil(t, datumSeq.Data[0].GetPatternDefine()) + require.NotNil(t, datumSeq.Data[1].GetLogs().GetStructured()) + assert.EqualValues(t, 0, datumSeq.Data[1].GetLogs().GetStructured().PatternId) + + strategy.Stop() +} + +func TestBatchStrategyDeltaEncodesRepeatedPatternID(t *testing.T) { + input := make(chan *message.StatefulMessage) + output := make(chan *message.Payload, 10) + flushChan := make(chan struct{}) + + strategy := NewBatchStrategy( + input, + output, + flushChan, + time.Hour, + 100, + 10000, + "test", + compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1), + metrics.NewNoopPipelineMonitor(""), + "test") + strategy.Start() + + input <- createTestStructuredStatefulMessage(12, 1) + input <- createTestStructuredStatefulMessage(12, 1) + flushChan <- struct{}{} + + payload := <-output + var datumSeq statefulpb.DatumSequence + err := proto.Unmarshal(payload.Encoded, &datumSeq) + require.NoError(t, err) + require.Len(t, datumSeq.Data, 2) + + assert.EqualValues(t, 12, datumSeq.Data[0].GetLogs().GetStructured().PatternId) + assert.EqualValues(t, 0, datumSeq.Data[1].GetLogs().GetStructured().PatternId) + + strategy.Stop() +} + +func TestBatchStrategyDoesNotMutateDeltaStateWhenAddFails(t *testing.T) { + input := make(chan *message.StatefulMessage) + output := make(chan *message.Payload, 10) + flushChan := make(chan struct{}) + + strategy := NewBatchStrategy( + input, + output, + flushChan, + time.Hour, + 100, + 10, + "test", + compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1), + metrics.NewNoopPipelineMonitor(""), + "test") + strategy.Start() + + input <- createTestPatternDefineStatefulMessage(12, "pattern") + + firstLog := createTestStructuredStatefulMessage(12, 1) + firstLog.Metadata.RawDataLen = 5 + input <- firstLog + + secondLog := createTestStructuredStatefulMessage(12, 1) + secondLog.Metadata.RawDataLen = 6 + input <- secondLog + + firstPayload := <-output + var firstDatumSeq statefulpb.DatumSequence + err := proto.Unmarshal(firstPayload.Encoded, &firstDatumSeq) + require.NoError(t, err) + require.Len(t, firstDatumSeq.Data, 2) + require.NotNil(t, firstDatumSeq.Data[0].GetPatternDefine()) + require.NotNil(t, firstDatumSeq.Data[1].GetLogs().GetStructured()) + assert.EqualValues(t, 0, firstDatumSeq.Data[1].GetLogs().GetStructured().PatternId) + + flushChan <- struct{}{} + + secondPayload := <-output + var secondDatumSeq statefulpb.DatumSequence + err = proto.Unmarshal(secondPayload.Encoded, &secondDatumSeq) + require.NoError(t, err) + require.Len(t, secondDatumSeq.Data, 1) + require.NotNil(t, secondDatumSeq.Data[0].GetLogs().GetStructured()) + assert.EqualValues(t, 12, secondDatumSeq.Data[0].GetLogs().GetStructured().PatternId) + + strategy.Stop() +} + func TestBatchStrategyRecordsPreCompressionBytesInStatefulExtra(t *testing.T) { input := make(chan *message.StatefulMessage) output := make(chan *message.Payload, 10) // Buffered to prevent deadlock diff --git a/pkg/logs/sender/grpc/inflight.go b/pkg/logs/sender/grpc/inflight.go index c568fdc724bd..f467ab767914 100644 --- a/pkg/logs/sender/grpc/inflight.go +++ b/pkg/logs/sender/grpc/inflight.go @@ -169,8 +169,12 @@ func (t *inflightTracker) nextToSendEncoded(compressor compression.Compressor) ( return payload.Encoded, nil } - datums := make([]*statefulpb.Datum, 0, len(prefix)+len(extra.WireDatums)) + sync := replayDeltaEncodingSync(extra.WireDatums) + datums := make([]*statefulpb.Datum, 0, len(prefix)+len(extra.WireDatums)+1) datums = append(datums, prefix...) + if sync != nil { + datums = append(datums, sync) + } datums = append(datums, extra.WireDatums...) serialized, err := proto.Marshal(&statefulpb.DatumSequence{Data: datums}) @@ -180,6 +184,79 @@ func (t *inflightTracker) nextToSendEncoded(compressor compression.Compressor) ( return compressor.Compress(serialized) } +func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum { + var sync statefulpb.DeltaEncodingSync + var hasSync bool + var currentPatternID uint64 + var currentTags *statefulpb.TagSet + var syncedPattern bool + var syncedTags bool + + for _, datum := range datums { + if datum == nil { + continue + } + switch d := datum.Data.(type) { + case *statefulpb.Datum_PatternDefine: + if d.PatternDefine == nil { + continue + } + currentPatternID = d.PatternDefine.PatternId + case *statefulpb.Datum_DeltaEncodingSync: + if d.DeltaEncodingSync == nil { + continue + } + if d.DeltaEncodingSync.PatternId != 0 { + currentPatternID = d.DeltaEncodingSync.PatternId + } + if d.DeltaEncodingSync.Tags != nil { + currentTags = d.DeltaEncodingSync.Tags + } + case *statefulpb.Datum_Logs: + logDatum := d.Logs + if logDatum == nil { + continue + } + if !syncedPattern { + if structured := logDatum.GetStructured(); structured != nil { + if structured.PatternId == 0 { + if currentPatternID != 0 { + sync.PatternId = currentPatternID + hasSync = true + syncedPattern = true + } + } else { + currentPatternID = structured.PatternId + } + } + } + if !syncedTags { + if logDatum.Tags == nil || logDatum.Tags.Tagset == nil { + if currentTags != nil { + sync.Tags = currentTags + hasSync = true + syncedTags = true + } + } else { + currentTags = logDatum.Tags + } + } + } + if syncedPattern && syncedTags { + break + } + } + + if !hasSync { + return nil + } + return &statefulpb.Datum{ + Data: &statefulpb.Datum_DeltaEncodingSync{ + DeltaEncodingSync: &sync, + }, + } +} + // sentCount returns the number of sent payloads awaiting ack func (t *inflightTracker) sentCount() int { return (t.sentTail - t.head + len(t.items)) % len(t.items) diff --git a/pkg/logs/sender/grpc/inflight_test.go b/pkg/logs/sender/grpc/inflight_test.go index 166d18a600f8..c85cd9f5e1a5 100644 --- a/pkg/logs/sender/grpc/inflight_test.go +++ b/pkg/logs/sender/grpc/inflight_test.go @@ -544,6 +544,39 @@ func TestInflightTrackerNextToSendPrependsLazySnapshotState(t *testing.T) { assert.True(t, tracker.streamSent.hasDictEntry(20)) } +func TestInflightTrackerNextToSendPrependsDeltaEncodingSyncBeforeReplayedDatums(t *testing.T) { + tracker := newInflightTracker("test", 5) + tracker.snapshot.apply(&StatefulExtra{ + StateChanges: []*statefulpb.Datum{ + createInflightPatternDefine(2, "pattern2"), + createInflightDictEntryDefine(20, "value20"), + }, + }) + tracker.streamSent = newStateReferences() + + require.True(t, tracker.append(createInflightPayloadWithWireDatums( + createInflightLogDatum(2, 20), + createInflightLogDatum(0, 20), + ))) + + encoded, err := tracker.nextToSendEncoded(noopimpl.New()) + require.NoError(t, err) + + datumSeq := decodeInflightDatumSequence(t, encoded) + require.Len(t, datumSeq.Data, 5) + assert.Equal(t, map[uint64]string{2: "pattern2"}, collectInflightPatterns(datumSeq.Data[:2])) + assert.Equal(t, map[uint64]string{20: "value20"}, collectInflightDictEntries(datumSeq.Data[:2])) + + sync := datumSeq.Data[2].GetDeltaEncodingSync() + require.NotNil(t, sync) + assert.EqualValues(t, 2, sync.PatternId) + + require.NotNil(t, datumSeq.Data[3].GetLogs()) + assert.EqualValues(t, 2, datumSeq.Data[3].GetLogs().GetStructured().PatternId) + require.NotNil(t, datumSeq.Data[4].GetLogs()) + assert.EqualValues(t, 0, datumSeq.Data[4].GetLogs().GetStructured().PatternId) +} + func TestInflightTrackerNextToSendDoesNotPrependStateDefinedInSamePayload(t *testing.T) { tracker := newInflightTracker("test", 5) tracker.snapshot.apply(&StatefulExtra{