Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions pkg/logs/sender/grpc/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,32 +172,28 @@ func (s *batchStrategy) addMessage(m *message.StatefulMessage) bool {
return false
}

// Update delta state when PatternDefine passes through (only when delta encoding is active)
if !s.buffer.AddMessageWithSize(m.Metadata, m.Metadata.RawDataLen) {
// Buffer full (not an error)
return false
}

// Only mutate delta-encoded fields after the message is accepted into this
// batch. If AddMessageWithSize fails, processMessage flushes and retries the
// same datum in a new batch, where it must still contain absolute values.
if enableDeltaEncoding {
if patternDefine := m.Datum.GetPatternDefine(); patternDefine != nil {
s.lastPatternID = patternDefine.PatternId
}
}

// Apply delta encoding to Log datums before adding to batch
if logDatum := m.Datum.GetLogs(); logDatum != nil {
s.applyDeltaEncoding(logDatum)
}

// Try to add to buffer
if s.buffer.AddMessageWithSize(m.Metadata, m.Metadata.RawDataLen) {
s.grpcDatums = append(s.grpcDatums, m.Datum)
return true
}

// Buffer full (not an error)
return false
s.grpcDatums = append(s.grpcDatums, m.Datum)
return true
}

// applyDeltaEncoding applies delta encoding to a Log datum within the current batch.
// Currently gated behind enableDeltaEncoding=false: the server does not implement delta
// reconstruction for any field (patternId, tags, timestamp), so this is a no-op until
// server-side support is added and protocol-version-negotiated.
func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) {
if !enableDeltaEncoding {
return
Expand All @@ -219,7 +215,7 @@ func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) {
// Pattern ID delta encoding (for structured logs only)
if structured := logDatum.GetStructured(); structured != nil {
if structured.PatternId == s.lastPatternID {
structured.PatternId = 0 // proto3 omits zero values
structured.PatternId = 0
} else {
s.lastPatternID = structured.PatternId
}
Expand Down
166 changes: 166 additions & 0 deletions pkg/logs/sender/grpc/batch_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,53 @@ func createTestStatefulMessageWithService(content string, serviceDictID uint64)
return msg
}

func createTestPatternDefineStatefulMessage(patternID uint64, template string) *message.StatefulMessage {
msg := message.NewMessage(nil, nil, "", 0)
return &message.StatefulMessage{
Metadata: &msg.MessageMetadata,
Datum: &statefulpb.Datum{
Data: &statefulpb.Datum_PatternDefine{
PatternDefine: &statefulpb.PatternDefine{
PatternId: patternID,
Template: template,
ParamCount: 1,
},
},
},
}
}

func createTestStructuredStatefulMessage(patternID uint64, dynamicValueCount int) *message.StatefulMessage {
msg := message.NewMessage([]byte("structured"), nil, "", 0)
msg.MessageMetadata.RawDataLen = len("structured")

dynamicValues := make([]*statefulpb.DynamicValue, dynamicValueCount)
for i := range dynamicValues {
dynamicValues[i] = &statefulpb.DynamicValue{
Value: &statefulpb.DynamicValue_StringValue{
StringValue: "value",
},
}
}

return &message.StatefulMessage{
Metadata: &msg.MessageMetadata,
Datum: &statefulpb.Datum{
Data: &statefulpb.Datum_Logs{
Logs: &statefulpb.Log{
Timestamp: 12345,
Content: &statefulpb.Log_Structured{
Structured: &statefulpb.StructuredLog{
PatternId: patternID,
DynamicValues: dynamicValues,
},
},
},
},
},
}
}

func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) {
input := make(chan *message.StatefulMessage)
output := make(chan *message.Payload)
Expand Down Expand Up @@ -508,6 +555,125 @@ func TestBatchStrategyDeltaEncodesRepeatedService(t *testing.T) {
strategy.Stop()
}

func TestBatchStrategyDeltaEncodesPatternIDFromPatternDefine(t *testing.T) {
input := make(chan *message.StatefulMessage)
output := make(chan *message.Payload, 10)
flushChan := make(chan struct{})

strategy := NewBatchStrategy(
input,
output,
flushChan,
time.Hour,
100,
10000,
"test",
compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1),
metrics.NewNoopPipelineMonitor(""),
"test")
strategy.Start()

input <- createTestPatternDefineStatefulMessage(12, "pattern")
input <- createTestStructuredStatefulMessage(12, 1)
flushChan <- struct{}{}

payload := <-output
var datumSeq statefulpb.DatumSequence
err := proto.Unmarshal(payload.Encoded, &datumSeq)
require.NoError(t, err)
require.Len(t, datumSeq.Data, 2)

require.NotNil(t, datumSeq.Data[0].GetPatternDefine())
require.NotNil(t, datumSeq.Data[1].GetLogs().GetStructured())
assert.EqualValues(t, 0, datumSeq.Data[1].GetLogs().GetStructured().PatternId)

strategy.Stop()
}

func TestBatchStrategyDeltaEncodesRepeatedPatternID(t *testing.T) {
input := make(chan *message.StatefulMessage)
output := make(chan *message.Payload, 10)
flushChan := make(chan struct{})

strategy := NewBatchStrategy(
input,
output,
flushChan,
time.Hour,
100,
10000,
"test",
compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1),
metrics.NewNoopPipelineMonitor(""),
"test")
strategy.Start()

input <- createTestStructuredStatefulMessage(12, 1)
input <- createTestStructuredStatefulMessage(12, 1)
flushChan <- struct{}{}

payload := <-output
var datumSeq statefulpb.DatumSequence
err := proto.Unmarshal(payload.Encoded, &datumSeq)
require.NoError(t, err)
require.Len(t, datumSeq.Data, 2)

assert.EqualValues(t, 12, datumSeq.Data[0].GetLogs().GetStructured().PatternId)
assert.EqualValues(t, 0, datumSeq.Data[1].GetLogs().GetStructured().PatternId)

strategy.Stop()
}

func TestBatchStrategyDoesNotMutateDeltaStateWhenAddFails(t *testing.T) {
input := make(chan *message.StatefulMessage)
output := make(chan *message.Payload, 10)
flushChan := make(chan struct{})

strategy := NewBatchStrategy(
input,
output,
flushChan,
time.Hour,
100,
10,
"test",
compressionfx.NewMockCompressor().NewCompressor(compression.NoneKind, 1),
metrics.NewNoopPipelineMonitor(""),
"test")
strategy.Start()

input <- createTestPatternDefineStatefulMessage(12, "pattern")

firstLog := createTestStructuredStatefulMessage(12, 1)
firstLog.Metadata.RawDataLen = 5
input <- firstLog

secondLog := createTestStructuredStatefulMessage(12, 1)
secondLog.Metadata.RawDataLen = 6
input <- secondLog

firstPayload := <-output
var firstDatumSeq statefulpb.DatumSequence
err := proto.Unmarshal(firstPayload.Encoded, &firstDatumSeq)
require.NoError(t, err)
require.Len(t, firstDatumSeq.Data, 2)
require.NotNil(t, firstDatumSeq.Data[0].GetPatternDefine())
require.NotNil(t, firstDatumSeq.Data[1].GetLogs().GetStructured())
assert.EqualValues(t, 0, firstDatumSeq.Data[1].GetLogs().GetStructured().PatternId)

flushChan <- struct{}{}

secondPayload := <-output
var secondDatumSeq statefulpb.DatumSequence
err = proto.Unmarshal(secondPayload.Encoded, &secondDatumSeq)
require.NoError(t, err)
require.Len(t, secondDatumSeq.Data, 1)
require.NotNil(t, secondDatumSeq.Data[0].GetLogs().GetStructured())
assert.EqualValues(t, 12, secondDatumSeq.Data[0].GetLogs().GetStructured().PatternId)

strategy.Stop()
}

func TestBatchStrategyRecordsPreCompressionBytesInStatefulExtra(t *testing.T) {
input := make(chan *message.StatefulMessage)
output := make(chan *message.Payload, 10) // Buffered to prevent deadlock
Expand Down
79 changes: 78 additions & 1 deletion pkg/logs/sender/grpc/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,12 @@ func (t *inflightTracker) nextToSendEncoded(compressor compression.Compressor) (
return payload.Encoded, nil
}

datums := make([]*statefulpb.Datum, 0, len(prefix)+len(extra.WireDatums))
sync := replayDeltaEncodingSync(extra.WireDatums)
datums := make([]*statefulpb.Datum, 0, len(prefix)+len(extra.WireDatums)+1)
datums = append(datums, prefix...)
if sync != nil {
datums = append(datums, sync)
}
datums = append(datums, extra.WireDatums...)

serialized, err := proto.Marshal(&statefulpb.DatumSequence{Data: datums})
Expand All @@ -180,6 +184,79 @@ func (t *inflightTracker) nextToSendEncoded(compressor compression.Compressor) (
return compressor.Compress(serialized)
}

func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum {
var sync statefulpb.DeltaEncodingSync
var hasSync bool
var currentPatternID uint64
var currentTags *statefulpb.TagSet
var syncedPattern bool
var syncedTags bool

for _, datum := range datums {
if datum == nil {
continue
}
switch d := datum.Data.(type) {
case *statefulpb.Datum_PatternDefine:
if d.PatternDefine == nil {
continue
}
currentPatternID = d.PatternDefine.PatternId
case *statefulpb.Datum_DeltaEncodingSync:
if d.DeltaEncodingSync == nil {
continue
}
if d.DeltaEncodingSync.PatternId != 0 {
currentPatternID = d.DeltaEncodingSync.PatternId
}
if d.DeltaEncodingSync.Tags != nil {
currentTags = d.DeltaEncodingSync.Tags
}
case *statefulpb.Datum_Logs:
logDatum := d.Logs
if logDatum == nil {
continue
}
if !syncedPattern {
if structured := logDatum.GetStructured(); structured != nil {
if structured.PatternId == 0 {
if currentPatternID != 0 {
sync.PatternId = currentPatternID
hasSync = true
syncedPattern = true
}
} else {
currentPatternID = structured.PatternId
}
}
}
if !syncedTags {
if logDatum.Tags == nil || logDatum.Tags.Tagset == nil {
if currentTags != nil {
sync.Tags = currentTags
hasSync = true
syncedTags = true
}
} else {
currentTags = logDatum.Tags
}
}
}
if syncedPattern && syncedTags {
break
}
}

if !hasSync {
return nil
}
return &statefulpb.Datum{
Data: &statefulpb.Datum_DeltaEncodingSync{
DeltaEncodingSync: &sync,
},
}
}

// sentCount returns the number of sent payloads awaiting ack
func (t *inflightTracker) sentCount() int {
return (t.sentTail - t.head + len(t.items)) % len(t.items)
Expand Down
33 changes: 33 additions & 0 deletions pkg/logs/sender/grpc/inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,39 @@ func TestInflightTrackerNextToSendPrependsLazySnapshotState(t *testing.T) {
assert.True(t, tracker.streamSent.hasDictEntry(20))
}

func TestInflightTrackerNextToSendPrependsDeltaEncodingSyncBeforeReplayedDatums(t *testing.T) {
tracker := newInflightTracker("test", 5)
tracker.snapshot.apply(&StatefulExtra{
StateChanges: []*statefulpb.Datum{
createInflightPatternDefine(2, "pattern2"),
createInflightDictEntryDefine(20, "value20"),
},
})
tracker.streamSent = newStateReferences()

require.True(t, tracker.append(createInflightPayloadWithWireDatums(
createInflightLogDatum(2, 20),
createInflightLogDatum(0, 20),
)))

encoded, err := tracker.nextToSendEncoded(noopimpl.New())
require.NoError(t, err)

datumSeq := decodeInflightDatumSequence(t, encoded)
require.Len(t, datumSeq.Data, 5)
assert.Equal(t, map[uint64]string{2: "pattern2"}, collectInflightPatterns(datumSeq.Data[:2]))
assert.Equal(t, map[uint64]string{20: "value20"}, collectInflightDictEntries(datumSeq.Data[:2]))

sync := datumSeq.Data[2].GetDeltaEncodingSync()
require.NotNil(t, sync)
assert.EqualValues(t, 2, sync.PatternId)

require.NotNil(t, datumSeq.Data[3].GetLogs())
assert.EqualValues(t, 2, datumSeq.Data[3].GetLogs().GetStructured().PatternId)
require.NotNil(t, datumSeq.Data[4].GetLogs())
assert.EqualValues(t, 0, datumSeq.Data[4].GetLogs().GetStructured().PatternId)
}

func TestInflightTrackerNextToSendDoesNotPrependStateDefinedInSamePayload(t *testing.T) {
tracker := newInflightTracker("test", 5)
tracker.snapshot.apply(&StatefulExtra{
Expand Down
Loading