diff --git a/pkg/logs/sender/grpc/batch_strategy.go b/pkg/logs/sender/grpc/batch_strategy.go index 5ef9b22a2168..80798d6b2893 100644 --- a/pkg/logs/sender/grpc/batch_strategy.go +++ b/pkg/logs/sender/grpc/batch_strategy.go @@ -29,6 +29,8 @@ var ( // When false: patternId, tags, and timestamp are sent as absolute values on every Log datum. const enableDeltaEncoding = true +const datumBytesTelemetrySampleRate = 16 + // StatefulExtra holds state changes (non-Log datums) from a batch // Used by inflight tracker to maintain snapshot state for stream rotation type StatefulExtra struct { @@ -59,6 +61,8 @@ type batchStrategy struct { // marshalBuf is reused across flushes for proto.Marshal output to reduce per-flush allocations. marshalBuf []byte + // datumBytesTelemetryCount keeps sampling unbiased across batch boundaries. + datumBytesTelemetryCount uint64 // Delta encoding state - tracks previous values within current batch lastTimestamp int64 // milliseconds since epoch @@ -179,21 +183,92 @@ func deltaEncodeDatumsForWire(datums []*statefulpb.Datum) []*statefulpb.Datum { encoded := make([]*statefulpb.Datum, 0, len(datums)) state := batchStrategy{} for _, datum := range datums { - cloned := proto.Clone(datum).(*statefulpb.Datum) - if patternDefine := cloned.GetPatternDefine(); patternDefine != nil { - state.lastPatternID = patternDefine.PatternId - } - if logDatum := cloned.GetLogs(); logDatum != nil { - state.applyDeltaEncoding(logDatum) - } - if flatLogDatum := cloned.GetFlatLog(); flatLogDatum != nil { - state.applyFlatLogDeltaEncoding(flatLogDatum) + switch data := datum.GetData().(type) { + case *statefulpb.Datum_PatternDefine: + if data.PatternDefine != nil { + state.lastPatternID = data.PatternDefine.PatternId + } + encoded = append(encoded, datum) + case *statefulpb.Datum_Logs: + if data.Logs == nil { + encoded = append(encoded, datum) + continue + } + cloned := cloneLogForDeltaEncoding(data.Logs) + state.applyDeltaEncoding(cloned) + encoded = append(encoded, &statefulpb.Datum{ + Data: &statefulpb.Datum_Logs{Logs: cloned}, + }) + case *statefulpb.Datum_FlatLog: + if data.FlatLog == nil { + encoded = append(encoded, datum) + continue + } + cloned := cloneFlatLogForDeltaEncoding(data.FlatLog) + state.applyFlatLogDeltaEncoding(cloned) + encoded = append(encoded, &statefulpb.Datum{ + Data: &statefulpb.Datum_FlatLog{FlatLog: cloned}, + }) + default: + encoded = append(encoded, datum) } - encoded = append(encoded, cloned) } return encoded } +func cloneLogForDeltaEncoding(logDatum *statefulpb.Log) *statefulpb.Log { + cloned := &statefulpb.Log{ + Timestamp: logDatum.Timestamp, + Tags: logDatum.Tags, + Uuid: logDatum.Uuid, + Status: logDatum.Status, + Service: logDatum.Service, + } + switch content := logDatum.Content.(type) { + case *statefulpb.Log_Structured: + clonedStructured := cloneStructuredLogForDeltaEncoding(content.Structured) + cloned.Content = &statefulpb.Log_Structured{Structured: clonedStructured} + case *statefulpb.Log_Raw: + cloned.Content = &statefulpb.Log_Raw{Raw: content.Raw} + } + return cloned +} + +func cloneStructuredLogForDeltaEncoding(logDatum *statefulpb.StructuredLog) *statefulpb.StructuredLog { + if logDatum == nil { + return nil + } + return &statefulpb.StructuredLog{ + PatternId: logDatum.PatternId, + DynamicValues: logDatum.DynamicValues, + JsonMessageKey: logDatum.JsonMessageKey, + JsonContextSchemaId: logDatum.JsonContextSchemaId, + JsonContextValues: logDatum.JsonContextValues, + JsonContext: logDatum.JsonContext, + } +} + +func cloneFlatLogForDeltaEncoding(logDatum *statefulpb.FlatLog) *statefulpb.FlatLog { + return &statefulpb.FlatLog{ + Timestamp: logDatum.Timestamp, + Status: logDatum.Status, + Service: logDatum.Service, + Tags: logDatum.Tags, + PatternId: logDatum.PatternId, + DynamicValues: logDatum.DynamicValues, + RawLog: logDatum.RawLog, + JsonSchemaId: logDatum.JsonSchemaId, + JsonContextValues: logDatum.JsonContextValues, + JsonContextValueKinds: logDatum.JsonContextValueKinds, + JsonContextIntValues: logDatum.JsonContextIntValues, + JsonContextFloatValues: logDatum.JsonContextFloatValues, + JsonContextDictValues: logDatum.JsonContextDictValues, + JsonContextRawValues: logDatum.JsonContextRawValues, + JsonContextStringValues: logDatum.JsonContextStringValues, + Uuid: logDatum.Uuid, + } +} + func (s *batchStrategy) applyTimestampDeltaEncoding(timestamp *int64) { currentTimestamp := *timestamp if s.lastTimestamp == 0 { @@ -340,33 +415,13 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa stateChanges, wireDatums := splitStateAndWireDatums(grpcDatums) - // Track per-datum-type counts and sizes for _, datum := range grpcDatums { - var datumType string - switch datum.Data.(type) { - case *statefulpb.Datum_PatternDefine: - datumType = "pattern_define" - case *statefulpb.Datum_PatternDelete: - datumType = "pattern_delete" - case *statefulpb.Datum_Logs: - datumType = "logs" - case *statefulpb.Datum_FlatLog: - datumType = "logs" - case *statefulpb.Datum_DictEntryDefine: - datumType = "dict_entry_define" - case *statefulpb.Datum_DictEntryDelete: - datumType = "dict_entry_delete" - case *statefulpb.Datum_DeltaEncodingSync: - datumType = "delta_encoding_sync" - case *statefulpb.Datum_JsonSchemaDefine: - datumType = "json_schema_define" - case *statefulpb.Datum_JsonSchemaDelete: - datumType = "json_schema_delete" - default: - datumType = "unknown" - } + datumType := datumTelemetryType(datum) metrics.TlmDatumCount.Add(1, datumType) - metrics.TlmDatumBytes.Add(float64(proto.Size(datum)), datumType) + if s.datumBytesTelemetryCount%datumBytesTelemetrySampleRate == 0 { + metrics.TlmDatumBytes.Add(float64(proto.Size(datum)*datumBytesTelemetrySampleRate), datumType) + } + s.datumBytesTelemetryCount++ } encodedDatums := deltaEncodeDatumsForWire(wireDatums) @@ -416,3 +471,26 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa s.pipelineMonitor.ReportComponentEgress(p, metrics.StrategyTlmName, s.instanceID) s.pipelineMonitor.ReportComponentIngress(p, metrics.SenderTlmName, metrics.SenderTlmInstanceID) } + +func datumTelemetryType(datum *statefulpb.Datum) string { + switch datum.Data.(type) { + case *statefulpb.Datum_PatternDefine: + return "pattern_define" + case *statefulpb.Datum_PatternDelete: + return "pattern_delete" + case *statefulpb.Datum_Logs, *statefulpb.Datum_FlatLog: + return "logs" + case *statefulpb.Datum_DictEntryDefine: + return "dict_entry_define" + case *statefulpb.Datum_DictEntryDelete: + return "dict_entry_delete" + case *statefulpb.Datum_DeltaEncodingSync: + return "delta_encoding_sync" + case *statefulpb.Datum_JsonSchemaDefine: + return "json_schema_define" + case *statefulpb.Datum_JsonSchemaDelete: + return "json_schema_delete" + default: + return "unknown" + } +} diff --git a/pkg/logs/sender/grpc/inflight.go b/pkg/logs/sender/grpc/inflight.go index eb551b0dcfcf..5a0dc82cc7ea 100644 --- a/pkg/logs/sender/grpc/inflight.go +++ b/pkg/logs/sender/grpc/inflight.go @@ -86,7 +86,12 @@ func (t *inflightTracker) pop() *message.Payload { // Apply state changes from this payload to snapshot if payload.StatefulExtra != nil { if extra, ok := payload.StatefulExtra.(*StatefulExtra); ok { - if refs, ok := t.inflightReferences(); ok { + if t.ackNeedsProtectedRefs(extra) { + refs, ok := t.inflightReferences() + if !ok { + t.planner.applyAcked(extra, nil) + return payload + } t.planner.applyAcked(extra, &refs) } else { t.planner.applyAcked(extra, nil) @@ -218,6 +223,10 @@ func (t *inflightTracker) resetStreamSent() { t.planner.resetStream() } +func (t *inflightTracker) ackNeedsProtectedRefs(extra *StatefulExtra) bool { + return t.planner.hasDeferredDeletes() || stateChangesContainDeletes(extra.StateChanges) +} + func (t *inflightTracker) inflightReferences() (stateReferences, bool) { refs := newStateReferences() for count, index := 0, t.head; count < t.totalCount(); count++ { diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 7c48cdbf2013..1b6539595a86 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -547,7 +547,8 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS currentOrigin := msg.Origin currentHostname := msg.MessageMetadata.Hostname currentSource := msg.Origin.Source() - currentTagsString := msg.MessageMetadata.TagsToString() + baseTags := msg.MessageMetadata.Tags() + currentTagsString := strings.Join(baseTags, ",") currentProcessingTags := strings.Join(msg.MessageMetadata.ProcessingTags, ",") // Cache hit: all inputs identical and cached dict index still live (not evicted). @@ -563,8 +564,6 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS // Cache miss: build tag string normally. - // Start with metadata tags (container tags, source config tags, processing tags) - baseTags := msg.MessageMetadata.Tags() tagStrings := make([]string, len(baseTags), len(baseTags)+4) copy(tagStrings, baseTags) @@ -1037,6 +1036,9 @@ func buildJsonSchemaDelete(schemaID uint64) *statefulpb.Datum { // like "00123" are kept as strings so they can round-trip without losing lexeme // fidelity when reconstructed downstream. func parseLosslessIntString(value string) (int64, bool) { + if !couldBeInteger(value) { + return 0, false + } intVal, err := strconv.ParseInt(value, 10, 64) if err != nil { return 0, false @@ -1050,6 +1052,9 @@ func parseLosslessIntString(value string) (int64, bool) { // parseLosslessFloatString returns a float64 only when the original string is already // the canonical representation produced by strconv.FormatFloat(..., 'g', -1, 64). func parseLosslessFloatString(value string) (float64, bool) { + if !couldBeFloat(value) { + return 0, false + } floatVal, err := strconv.ParseFloat(value, 64) if err != nil { return 0, false @@ -1060,6 +1065,47 @@ func parseLosslessFloatString(value string) (float64, bool) { return floatVal, true } +func couldBeInteger(value string) bool { + if value == "" { + return false + } + start := 0 + if value[0] == '-' { + start = 1 + if len(value) == 1 { + return false + } + } + for index := start; index < len(value); index++ { + if value[index] < '0' || value[index] > '9' { + return false + } + } + return true +} + +func couldBeFloat(value string) bool { + if value == "" { + return false + } + hasFloatMarker := false + for index := 0; index < len(value); index++ { + switch value[index] { + case '.', 'e', 'E': + hasFloatMarker = true + case '+', '-': + if index != 0 && value[index-1] != 'e' && value[index-1] != 'E' { + return false + } + default: + if value[index] < '0' || value[index] > '9' { + return false + } + } + } + return hasFloatMarker +} + func parseLosslessBoolString(value string) (bool, bool) { switch value { case "true": diff --git a/pkg/logs/sender/grpc/state_planner.go b/pkg/logs/sender/grpc/state_planner.go index 05ddc14dca4c..eba2140c4708 100644 --- a/pkg/logs/sender/grpc/state_planner.go +++ b/pkg/logs/sender/grpc/state_planner.go @@ -37,6 +37,10 @@ func (p *statePlanner) applyAcked(extra *StatefulExtra, protectedRefs *stateRefe p.snapshot.applyWithProtectedRefs(extra, &expanded) } +func (p *statePlanner) hasDeferredDeletes() bool { + return p.snapshot.hasDeferredDeletes() +} + func (p *statePlanner) snapshotBytes(refs *stateReferences) ([]byte, stateReferences) { var snapshotRefs *stateReferences if refs != nil { @@ -345,6 +349,12 @@ func (s *snapshotState) pruneDeferredDeletes(protectedRefs *stateReferences) { } } +func (s *snapshotState) hasDeferredDeletes() bool { + return len(s.deferredDeletes.patternIDs) > 0 || + len(s.deferredDeletes.dictEntryIDs) > 0 || + len(s.deferredDeletes.jsonSchemaIDs) > 0 +} + func (s *snapshotState) serialize(refs *stateReferences) ([]byte, stateReferences) { sent := newStateReferences() datums := make([]*statefulpb.Datum, 0, len(s.patternMap)+len(s.dictMap)+len(s.jsonSchemaMap)) @@ -493,6 +503,18 @@ func splitStateAndWireDatums(datums []*statefulpb.Datum) (stateChanges []*statef return stateChanges, wireDatums } +func stateChangesContainDeletes(datums []*statefulpb.Datum) bool { + for _, datum := range datums { + switch datum.Data.(type) { + case *statefulpb.Datum_PatternDelete, + *statefulpb.Datum_DictEntryDelete, + *statefulpb.Datum_JsonSchemaDelete: + return true + } + } + return false +} + func isStateDatum(datum *statefulpb.Datum) bool { switch datum.Data.(type) { case *statefulpb.Datum_PatternDefine, *statefulpb.Datum_PatternDelete,