Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 30 additions & 52 deletions pkg/logs/sender/grpc/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,13 @@ const enableDeltaEncoding = true
// Used by inflight tracker to maintain snapshot state for stream rotation
type StatefulExtra struct {
StateChanges []*statefulpb.Datum
// WireDatums are the datums in the encoded payload. Inflight uses them to
// find state references and rebuild replay batches with lazy snapshot state.
// WireDatums are the canonical, pre-delta datums represented by the encoded
// payload. Inflight uses them to find state references and rebuild replay
// batches with lazy snapshot state before final wire delta encoding.
WireDatums []*statefulpb.Datum
PreCompressionBytes int
}

// isStateDatum returns true if the datum represents a state change
// (pattern/dict define/delete operations)
func isStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDefine, *statefulpb.Datum_PatternDelete,
*statefulpb.Datum_DictEntryDefine, *statefulpb.Datum_DictEntryDelete,
*statefulpb.Datum_JsonSchemaDefine, *statefulpb.Datum_JsonSchemaDelete:
return true
default:
return false
}
}

// batchStrategy contains batching logic for gRPC sender without serializer
// It collects Datum objects from StatefulMessages and creates Payload with serialized DatumSequence
// Note: Serverless logs are not supported in this PoC implementation
Expand Down Expand Up @@ -179,25 +167,33 @@ func (s *batchStrategy) addMessage(m *message.StatefulMessage) bool {
return false
}

// Only mutate delta-encoded fields after the message is accepted into this
// batch. If AddMessageWithSize fails, processMessage flushes and retries the
// same datum in a new batch, where it must still contain absolute values.
if enableDeltaEncoding {
if patternDefine := m.Datum.GetPatternDefine(); patternDefine != nil {
s.lastPatternID = patternDefine.PatternId
}
}
if logDatum := m.Datum.GetLogs(); logDatum != nil {
s.applyDeltaEncoding(logDatum)
}
if flatLogDatum := m.Datum.GetFlatLog(); flatLogDatum != nil {
s.applyFlatLogDeltaEncoding(flatLogDatum)
}

s.grpcDatums = append(s.grpcDatums, m.Datum)
return true
}

func deltaEncodeDatumsForWire(datums []*statefulpb.Datum) []*statefulpb.Datum {
if !enableDeltaEncoding {
return datums
}

encoded := make([]*statefulpb.Datum, 0, len(datums))
state := batchStrategy{}
for _, datum := range datums {
cloned := proto.Clone(datum).(*statefulpb.Datum)
if patternDefine := cloned.GetPatternDefine(); patternDefine != nil {
state.lastPatternID = patternDefine.PatternId
}
if logDatum := cloned.GetLogs(); logDatum != nil {
state.applyDeltaEncoding(logDatum)
}
if flatLogDatum := cloned.GetFlatLog(); flatLogDatum != nil {
state.applyFlatLogDeltaEncoding(flatLogDatum)
}
encoded = append(encoded, cloned)
}
return encoded
}

func (s *batchStrategy) applyTimestampDeltaEncoding(timestamp *int64) {
currentTimestamp := *timestamp
if s.lastTimestamp == 0 {
Expand Down Expand Up @@ -334,15 +330,6 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) {
s.sendMessagesWithDatums(messagesMetadata, grpcDatums, outputChan)
}

func isWireStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete, *statefulpb.Datum_JsonSchemaDelete:
return false
default:
return true
}
}

func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.MessageMetadata, grpcDatums []*statefulpb.Datum, outputChan chan *message.Payload) {
defer s.utilization.Stop()

Expand All @@ -351,18 +338,7 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
unencodedSize += msgMeta.RawDataLen
}

// Extract all state changes from this batch for snapshot management, and build
// the filtered set of datums that will actually be sent over the wire.
var stateChanges []*statefulpb.Datum
wireDatums := make([]*statefulpb.Datum, 0, len(grpcDatums))
for _, datum := range grpcDatums {
if isStateDatum(datum) {
stateChanges = append(stateChanges, datum)
}
if isWireStateDatum(datum) {
wireDatums = append(wireDatums, datum)
}
}
stateChanges, wireDatums := splitStateAndWireDatums(grpcDatums)

// Track per-datum-type counts and sizes
for _, datum := range grpcDatums {
Expand Down Expand Up @@ -393,9 +369,11 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
metrics.TlmDatumBytes.Add(float64(proto.Size(datum)), datumType)
}

encodedDatums := deltaEncodeDatumsForWire(wireDatums)

// Create DatumSequence and marshal to bytes
datumSeq := &statefulpb.DatumSequence{
Data: wireDatums,
Data: encodedDatums,
}

var err error
Expand Down
3 changes: 3 additions & 0 deletions pkg/logs/sender/grpc/batch_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ func TestBatchStrategyDeltaEncodesPatternIDFromPatternDefine(t *testing.T) {
require.NotNil(t, datumSeq.Data[0].GetPatternDefine())
require.NotNil(t, datumSeq.Data[1].GetLogs().GetStructured())
assert.EqualValues(t, 0, datumSeq.Data[1].GetLogs().GetStructured().PatternId)
extra, ok := payload.StatefulExtra.(*StatefulExtra)
require.True(t, ok)
assert.EqualValues(t, 12, extra.WireDatums[1].GetLogs().GetStructured().PatternId)

strategy.Stop()
}
Expand Down
Loading
Loading