Skip to content
Draft

cpu mem #50975

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 113 additions & 35 deletions pkg/logs/sender/grpc/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
// When false: patternId, tags, and timestamp are sent as absolute values on every Log datum.
const enableDeltaEncoding = true

const datumBytesTelemetrySampleRate = 16

// StatefulExtra holds state changes (non-Log datums) from a batch
// Used by inflight tracker to maintain snapshot state for stream rotation
type StatefulExtra struct {
Expand Down Expand Up @@ -59,6 +61,8 @@ type batchStrategy struct {

// marshalBuf is reused across flushes for proto.Marshal output to reduce per-flush allocations.
marshalBuf []byte
// datumBytesTelemetryCount keeps sampling unbiased across batch boundaries.
datumBytesTelemetryCount uint64

// Delta encoding state - tracks previous values within current batch
lastTimestamp int64 // milliseconds since epoch
Expand Down Expand Up @@ -179,21 +183,92 @@ func deltaEncodeDatumsForWire(datums []*statefulpb.Datum) []*statefulpb.Datum {
encoded := make([]*statefulpb.Datum, 0, len(datums))
state := batchStrategy{}
for _, datum := range datums {
cloned := proto.Clone(datum).(*statefulpb.Datum)
if patternDefine := cloned.GetPatternDefine(); patternDefine != nil {
state.lastPatternID = patternDefine.PatternId
}
if logDatum := cloned.GetLogs(); logDatum != nil {
state.applyDeltaEncoding(logDatum)
}
if flatLogDatum := cloned.GetFlatLog(); flatLogDatum != nil {
state.applyFlatLogDeltaEncoding(flatLogDatum)
switch data := datum.GetData().(type) {
case *statefulpb.Datum_PatternDefine:
if data.PatternDefine != nil {
state.lastPatternID = data.PatternDefine.PatternId
}
encoded = append(encoded, datum)
case *statefulpb.Datum_Logs:
if data.Logs == nil {
encoded = append(encoded, datum)
continue
}
cloned := cloneLogForDeltaEncoding(data.Logs)
state.applyDeltaEncoding(cloned)
encoded = append(encoded, &statefulpb.Datum{
Data: &statefulpb.Datum_Logs{Logs: cloned},
})
case *statefulpb.Datum_FlatLog:
if data.FlatLog == nil {
encoded = append(encoded, datum)
continue
}
cloned := cloneFlatLogForDeltaEncoding(data.FlatLog)
state.applyFlatLogDeltaEncoding(cloned)
encoded = append(encoded, &statefulpb.Datum{
Data: &statefulpb.Datum_FlatLog{FlatLog: cloned},
})
default:
encoded = append(encoded, datum)
}
encoded = append(encoded, cloned)
}
return encoded
}

func cloneLogForDeltaEncoding(logDatum *statefulpb.Log) *statefulpb.Log {
cloned := &statefulpb.Log{
Timestamp: logDatum.Timestamp,
Tags: logDatum.Tags,
Uuid: logDatum.Uuid,
Status: logDatum.Status,
Service: logDatum.Service,
}
switch content := logDatum.Content.(type) {
case *statefulpb.Log_Structured:
clonedStructured := cloneStructuredLogForDeltaEncoding(content.Structured)
cloned.Content = &statefulpb.Log_Structured{Structured: clonedStructured}
case *statefulpb.Log_Raw:
cloned.Content = &statefulpb.Log_Raw{Raw: content.Raw}
}
return cloned
}

func cloneStructuredLogForDeltaEncoding(logDatum *statefulpb.StructuredLog) *statefulpb.StructuredLog {
if logDatum == nil {
return nil
}
return &statefulpb.StructuredLog{
PatternId: logDatum.PatternId,
DynamicValues: logDatum.DynamicValues,
JsonMessageKey: logDatum.JsonMessageKey,
JsonContextSchemaId: logDatum.JsonContextSchemaId,
JsonContextValues: logDatum.JsonContextValues,
JsonContext: logDatum.JsonContext,
}
}

func cloneFlatLogForDeltaEncoding(logDatum *statefulpb.FlatLog) *statefulpb.FlatLog {
return &statefulpb.FlatLog{
Timestamp: logDatum.Timestamp,
Status: logDatum.Status,
Service: logDatum.Service,
Tags: logDatum.Tags,
PatternId: logDatum.PatternId,
DynamicValues: logDatum.DynamicValues,
RawLog: logDatum.RawLog,
JsonSchemaId: logDatum.JsonSchemaId,
JsonContextValues: logDatum.JsonContextValues,
JsonContextValueKinds: logDatum.JsonContextValueKinds,
JsonContextIntValues: logDatum.JsonContextIntValues,
JsonContextFloatValues: logDatum.JsonContextFloatValues,
JsonContextDictValues: logDatum.JsonContextDictValues,
JsonContextRawValues: logDatum.JsonContextRawValues,
JsonContextStringValues: logDatum.JsonContextStringValues,
Uuid: logDatum.Uuid,
}
}

func (s *batchStrategy) applyTimestampDeltaEncoding(timestamp *int64) {
currentTimestamp := *timestamp
if s.lastTimestamp == 0 {
Expand Down Expand Up @@ -340,33 +415,13 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa

stateChanges, wireDatums := splitStateAndWireDatums(grpcDatums)

// Track per-datum-type counts and sizes
for _, datum := range grpcDatums {
var datumType string
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDefine:
datumType = "pattern_define"
case *statefulpb.Datum_PatternDelete:
datumType = "pattern_delete"
case *statefulpb.Datum_Logs:
datumType = "logs"
case *statefulpb.Datum_FlatLog:
datumType = "logs"
case *statefulpb.Datum_DictEntryDefine:
datumType = "dict_entry_define"
case *statefulpb.Datum_DictEntryDelete:
datumType = "dict_entry_delete"
case *statefulpb.Datum_DeltaEncodingSync:
datumType = "delta_encoding_sync"
case *statefulpb.Datum_JsonSchemaDefine:
datumType = "json_schema_define"
case *statefulpb.Datum_JsonSchemaDelete:
datumType = "json_schema_delete"
default:
datumType = "unknown"
}
datumType := datumTelemetryType(datum)
metrics.TlmDatumCount.Add(1, datumType)
metrics.TlmDatumBytes.Add(float64(proto.Size(datum)), datumType)
if s.datumBytesTelemetryCount%datumBytesTelemetrySampleRate == 0 {
metrics.TlmDatumBytes.Add(float64(proto.Size(datum)*datumBytesTelemetrySampleRate), datumType)
}
s.datumBytesTelemetryCount++
}

encodedDatums := deltaEncodeDatumsForWire(wireDatums)
Expand Down Expand Up @@ -416,3 +471,26 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa
s.pipelineMonitor.ReportComponentEgress(p, metrics.StrategyTlmName, s.instanceID)
s.pipelineMonitor.ReportComponentIngress(p, metrics.SenderTlmName, metrics.SenderTlmInstanceID)
}

func datumTelemetryType(datum *statefulpb.Datum) string {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDefine:
return "pattern_define"
case *statefulpb.Datum_PatternDelete:
return "pattern_delete"
case *statefulpb.Datum_Logs, *statefulpb.Datum_FlatLog:
return "logs"
case *statefulpb.Datum_DictEntryDefine:
return "dict_entry_define"
case *statefulpb.Datum_DictEntryDelete:
return "dict_entry_delete"
case *statefulpb.Datum_DeltaEncodingSync:
return "delta_encoding_sync"
case *statefulpb.Datum_JsonSchemaDefine:
return "json_schema_define"
case *statefulpb.Datum_JsonSchemaDelete:
return "json_schema_delete"
default:
return "unknown"
}
}
11 changes: 10 additions & 1 deletion pkg/logs/sender/grpc/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ func (t *inflightTracker) pop() *message.Payload {
// Apply state changes from this payload to snapshot
if payload.StatefulExtra != nil {
if extra, ok := payload.StatefulExtra.(*StatefulExtra); ok {
if refs, ok := t.inflightReferences(); ok {
if t.ackNeedsProtectedRefs(extra) {
refs, ok := t.inflightReferences()
if !ok {
t.planner.applyAcked(extra, nil)
return payload
}
t.planner.applyAcked(extra, &refs)
} else {
t.planner.applyAcked(extra, nil)
Expand Down Expand Up @@ -218,6 +223,10 @@ func (t *inflightTracker) resetStreamSent() {
t.planner.resetStream()
}

func (t *inflightTracker) ackNeedsProtectedRefs(extra *StatefulExtra) bool {
return t.planner.hasDeferredDeletes() || stateChangesContainDeletes(extra.StateChanges)
}

func (t *inflightTracker) inflightReferences() (stateReferences, bool) {
refs := newStateReferences()
for count, index := 0, t.head; count < t.totalCount(); count++ {
Expand Down
52 changes: 49 additions & 3 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,8 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS
currentOrigin := msg.Origin
currentHostname := msg.MessageMetadata.Hostname
currentSource := msg.Origin.Source()
currentTagsString := msg.MessageMetadata.TagsToString()
baseTags := msg.MessageMetadata.Tags()
currentTagsString := strings.Join(baseTags, ",")
currentProcessingTags := strings.Join(msg.MessageMetadata.ProcessingTags, ",")

// Cache hit: all inputs identical and cached dict index still live (not evicted).
Expand All @@ -563,8 +564,6 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS

// Cache miss: build tag string normally.

// Start with metadata tags (container tags, source config tags, processing tags)
baseTags := msg.MessageMetadata.Tags()
tagStrings := make([]string, len(baseTags), len(baseTags)+4)
copy(tagStrings, baseTags)

Expand Down Expand Up @@ -1037,6 +1036,9 @@ func buildJsonSchemaDelete(schemaID uint64) *statefulpb.Datum {
// like "00123" are kept as strings so they can round-trip without losing lexeme
// fidelity when reconstructed downstream.
func parseLosslessIntString(value string) (int64, bool) {
if !couldBeInteger(value) {
return 0, false
}
intVal, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return 0, false
Expand All @@ -1050,6 +1052,9 @@ func parseLosslessIntString(value string) (int64, bool) {
// parseLosslessFloatString returns a float64 only when the original string is already
// the canonical representation produced by strconv.FormatFloat(..., 'g', -1, 64).
func parseLosslessFloatString(value string) (float64, bool) {
if !couldBeFloat(value) {
return 0, false
}
floatVal, err := strconv.ParseFloat(value, 64)
if err != nil {
return 0, false
Expand All @@ -1060,6 +1065,47 @@ func parseLosslessFloatString(value string) (float64, bool) {
return floatVal, true
}

func couldBeInteger(value string) bool {
if value == "" {
return false
}
start := 0
if value[0] == '-' {
start = 1
if len(value) == 1 {
return false
}
}
for index := start; index < len(value); index++ {
if value[index] < '0' || value[index] > '9' {
return false
}
}
return true
}

func couldBeFloat(value string) bool {
if value == "" {
return false
}
hasFloatMarker := false
for index := 0; index < len(value); index++ {
switch value[index] {
case '.', 'e', 'E':
hasFloatMarker = true
case '+', '-':
if index != 0 && value[index-1] != 'e' && value[index-1] != 'E' {
return false
}
default:
if value[index] < '0' || value[index] > '9' {
return false
}
}
}
return hasFloatMarker
}

func parseLosslessBoolString(value string) (bool, bool) {
switch value {
case "true":
Expand Down
22 changes: 22 additions & 0 deletions pkg/logs/sender/grpc/state_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (p *statePlanner) applyAcked(extra *StatefulExtra, protectedRefs *stateRefe
p.snapshot.applyWithProtectedRefs(extra, &expanded)
}

func (p *statePlanner) hasDeferredDeletes() bool {
return p.snapshot.hasDeferredDeletes()
}

func (p *statePlanner) snapshotBytes(refs *stateReferences) ([]byte, stateReferences) {
var snapshotRefs *stateReferences
if refs != nil {
Expand Down Expand Up @@ -345,6 +349,12 @@ func (s *snapshotState) pruneDeferredDeletes(protectedRefs *stateReferences) {
}
}

func (s *snapshotState) hasDeferredDeletes() bool {
return len(s.deferredDeletes.patternIDs) > 0 ||
len(s.deferredDeletes.dictEntryIDs) > 0 ||
len(s.deferredDeletes.jsonSchemaIDs) > 0
}

func (s *snapshotState) serialize(refs *stateReferences) ([]byte, stateReferences) {
sent := newStateReferences()
datums := make([]*statefulpb.Datum, 0, len(s.patternMap)+len(s.dictMap)+len(s.jsonSchemaMap))
Expand Down Expand Up @@ -493,6 +503,18 @@ func splitStateAndWireDatums(datums []*statefulpb.Datum) (stateChanges []*statef
return stateChanges, wireDatums
}

func stateChangesContainDeletes(datums []*statefulpb.Datum) bool {
for _, datum := range datums {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDelete,
*statefulpb.Datum_DictEntryDelete,
*statefulpb.Datum_JsonSchemaDelete:
return true
}
}
return false
}

func isStateDatum(datum *statefulpb.Datum) bool {
switch datum.Data.(type) {
case *statefulpb.Datum_PatternDefine, *statefulpb.Datum_PatternDelete,
Expand Down
Loading