Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/logs/patterns/tags/tag_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func NewTagManager() *TagManager {
idToEntry: make(map[uint64]*tagEntry),
pendingDynamic: make(map[string]uint16),
}
// Stateful FlatLog reserves dict index 1 as the empty/omit sentinel.
tm.nextID.Store(1)
return tm
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/logs/patterns/tags/tag_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ func TestTagManager_EncodeTagStrings_MixedNewAndExisting(t *testing.T) {

_, secondEntries := tm.EncodeTagStrings([]string{"env:production", "service:api"})
assert.Len(t, secondEntries, 2)
assert.Equal(t, "service", secondEntries[3])
assert.Equal(t, "api", secondEntries[4])
assert.Contains(t, mapValues(secondEntries), "service")
assert.Contains(t, mapValues(secondEntries), "api")
assert.Equal(t, 4, tm.Count())
}

func mapValues[K comparable, V any](values map[K]V) []V {
out := make([]V, 0, len(values))
for _, value := range values {
out = append(out, value)
}
return out
}

func TestTagManager_EncodeTagStrings_InvalidFormats(t *testing.T) {
tm := NewTagManager()

Expand Down
84 changes: 66 additions & 18 deletions pkg/logs/sender/grpc/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type StatefulExtra struct {
func isStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDefine, *statefulpb.Datum_PatternDelete,
*statefulpb.Datum_DictEntryDefine, *statefulpb.Datum_DictEntryDelete:
*statefulpb.Datum_DictEntryDefine, *statefulpb.Datum_DictEntryDelete,
*statefulpb.Datum_JsonSchemaDefine, *statefulpb.Datum_JsonSchemaDelete:
return true
default:
return false
Expand Down Expand Up @@ -73,10 +74,12 @@ type batchStrategy struct {
marshalBuf []byte

// Delta encoding state - tracks previous values within current batch
lastTimestamp int64 // milliseconds since epoch
lastPatternID uint64 // pattern identifier
lastTagsDictIndex uint64 // dictionary index of tag string
lastServiceDictID uint64 // dictionary index of the service field
lastTimestamp int64 // milliseconds since epoch
lastPatternID uint64 // pattern identifier
lastTagsDictIndex uint64 // dictionary index of tag string
lastServiceDictID uint64 // dictionary index of the service field
lastStatusDictID uint64 // dictionary index of the status field
lastJSONSchemaDict uint64 // dictionary index of the JSON schema field

// Telemetry
pipelineMonitor metrics.PipelineMonitor
Expand Down Expand Up @@ -188,29 +191,33 @@ func (s *batchStrategy) addMessage(m *message.StatefulMessage) bool {
if logDatum := m.Datum.GetLogs(); logDatum != nil {
s.applyDeltaEncoding(logDatum)
}
if flatLogDatum := m.Datum.GetFlatLog(); flatLogDatum != nil {
s.applyFlatLogDeltaEncoding(flatLogDatum)
}

s.grpcDatums = append(s.grpcDatums, m.Datum)
return true
}

func (s *batchStrategy) applyTimestampDeltaEncoding(timestamp *int64) {
currentTimestamp := *timestamp
if s.lastTimestamp == 0 {
s.lastTimestamp = currentTimestamp
return
}

delta := currentTimestamp - s.lastTimestamp
s.lastTimestamp = currentTimestamp
*timestamp = delta // Note that when delta is 0, proto3 omits the timestamp field.
}

// applyDeltaEncoding applies delta encoding to a Log datum within the current batch.
func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) {
if !enableDeltaEncoding {
return
}
// Timestamp delta encoding
currentTimestamp := logDatum.Timestamp

// First message in batch: send absolute timestamp
if s.lastTimestamp == 0 {
s.lastTimestamp = currentTimestamp
// Keep absolute value in logDatum.Timestamp
} else {
// Normal case: compute and send delta
delta := currentTimestamp - s.lastTimestamp
s.lastTimestamp = currentTimestamp
logDatum.Timestamp = delta // Note that when delta is 0, proto3 omits the timestamp field
}
s.applyTimestampDeltaEncoding(&logDatum.Timestamp)

// Pattern ID delta encoding (for structured logs only)
if structured := logDatum.GetStructured(); structured != nil {
Expand Down Expand Up @@ -246,6 +253,39 @@ func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) {
}
}

// applyFlatLogDeltaEncoding applies delta encoding to a FlatLog datum within the current batch.
func (s *batchStrategy) applyFlatLogDeltaEncoding(logDatum *statefulpb.FlatLog) {
if !enableDeltaEncoding {
return
}

s.applyTimestampDeltaEncoding(&logDatum.Timestamp)

logDatum.Status = s.deltaFlatLogDictIndex(logDatum.Status, &s.lastStatusDictID)
logDatum.Service = s.deltaFlatLogDictIndex(logDatum.Service, &s.lastServiceDictID)
logDatum.Tags = s.deltaFlatLogDictIndex(logDatum.Tags, &s.lastTagsDictIndex)
logDatum.JsonSchemaId = s.deltaFlatLogDictIndex(logDatum.JsonSchemaId, &s.lastJSONSchemaDict)

if logDatum.RawLog == "" {
if logDatum.PatternId == s.lastPatternID {
logDatum.PatternId = 0
} else {
s.lastPatternID = logDatum.PatternId
}
}
}

func (s *batchStrategy) deltaFlatLogDictIndex(current uint64, last *uint64) uint64 {
if current == 0 {
current = flatLogEmptyDictIndex
}
if current == *last {
return 0
}
*last = current
return current
}

// Mostly copy/pasted from batch.go
func (s *batchStrategy) processMessage(m *message.StatefulMessage, outputChan chan *message.Payload) {
// Track latency stats from metadata
Expand Down Expand Up @@ -289,13 +329,15 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) {
s.lastPatternID = 0
s.lastTagsDictIndex = 0
s.lastServiceDictID = 0
s.lastStatusDictID = 0
s.lastJSONSchemaDict = 0

s.sendMessagesWithDatums(messagesMetadata, grpcDatums, outputChan)
}

func isWireStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete:
case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete, *statefulpb.Datum_JsonSchemaDelete:
return false
default:
return true
Expand Down Expand Up @@ -333,12 +375,18 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
datumType = "pattern_delete"
case *statefulpb.Datum_Logs:
datumType = "logs"
case *statefulpb.Datum_FlatLog:
datumType = "logs"
case *statefulpb.Datum_DictEntryDefine:
datumType = "dict_entry_define"
case *statefulpb.Datum_DictEntryDelete:
datumType = "dict_entry_delete"
case *statefulpb.Datum_DeltaEncodingSync:
datumType = "delta_encoding_sync"
case *statefulpb.Datum_JsonSchemaDefine:
datumType = "json_schema_define"
case *statefulpb.Datum_JsonSchemaDelete:
datumType = "json_schema_delete"
default:
datumType = "unknown"
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/logs/sender/grpc/flat_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package grpc

const flatLogEmptyDictIndex uint64 = 1

func flatLogDictIndex(id uint64) uint64 {
if id == 0 {
return flatLogEmptyDictIndex
}
return id
}

func isFlatLogEmptyDictIndex(id uint64) bool {
return id == 0 || id == flatLogEmptyDictIndex
}
Loading
Loading