diff --git a/pkg/logs/patterns/tags/tag_manager.go b/pkg/logs/patterns/tags/tag_manager.go index 396d5c4ebb3a..1953340de0a7 100644 --- a/pkg/logs/patterns/tags/tag_manager.go +++ b/pkg/logs/patterns/tags/tag_manager.go @@ -43,6 +43,8 @@ func NewTagManager() *TagManager { idToEntry: make(map[uint64]*tagEntry), pendingDynamic: make(map[string]uint16), } + // Stateful FlatLog reserves dict index 1 as the empty/omit sentinel. + tm.nextID.Store(1) return tm } diff --git a/pkg/logs/patterns/tags/tag_manager_test.go b/pkg/logs/patterns/tags/tag_manager_test.go index b2c8d061d4e9..48807c747856 100644 --- a/pkg/logs/patterns/tags/tag_manager_test.go +++ b/pkg/logs/patterns/tags/tag_manager_test.go @@ -64,11 +64,19 @@ func TestTagManager_EncodeTagStrings_MixedNewAndExisting(t *testing.T) { _, secondEntries := tm.EncodeTagStrings([]string{"env:production", "service:api"}) assert.Len(t, secondEntries, 2) - assert.Equal(t, "service", secondEntries[3]) - assert.Equal(t, "api", secondEntries[4]) + assert.Contains(t, mapValues(secondEntries), "service") + assert.Contains(t, mapValues(secondEntries), "api") assert.Equal(t, 4, tm.Count()) } +func mapValues[K comparable, V any](values map[K]V) []V { + out := make([]V, 0, len(values)) + for _, value := range values { + out = append(out, value) + } + return out +} + func TestTagManager_EncodeTagStrings_InvalidFormats(t *testing.T) { tm := NewTagManager() diff --git a/pkg/logs/sender/grpc/batch_strategy.go b/pkg/logs/sender/grpc/batch_strategy.go index 27e812bc2cc0..de447ff7ecf1 100644 --- a/pkg/logs/sender/grpc/batch_strategy.go +++ b/pkg/logs/sender/grpc/batch_strategy.go @@ -45,7 +45,8 @@ type StatefulExtra struct { func isStateDatum(datum *statefulpb.Datum) bool { switch datum.Data.(type) { case *statefulpb.Datum_PatternDefine, *statefulpb.Datum_PatternDelete, - *statefulpb.Datum_DictEntryDefine, *statefulpb.Datum_DictEntryDelete: + *statefulpb.Datum_DictEntryDefine, *statefulpb.Datum_DictEntryDelete, + 
*statefulpb.Datum_JsonSchemaDefine, *statefulpb.Datum_JsonSchemaDelete: return true default: return false @@ -73,10 +74,12 @@ type batchStrategy struct { marshalBuf []byte // Delta encoding state - tracks previous values within current batch - lastTimestamp int64 // milliseconds since epoch - lastPatternID uint64 // pattern identifier - lastTagsDictIndex uint64 // dictionary index of tag string - lastServiceDictID uint64 // dictionary index of the service field + lastTimestamp int64 // milliseconds since epoch + lastPatternID uint64 // pattern identifier + lastTagsDictIndex uint64 // dictionary index of tag string + lastServiceDictID uint64 // dictionary index of the service field + lastStatusDictID uint64 // dictionary index of the status field + lastJSONSchemaDict uint64 // dictionary index of the JSON schema field // Telemetry pipelineMonitor metrics.PipelineMonitor @@ -188,29 +191,33 @@ func (s *batchStrategy) addMessage(m *message.StatefulMessage) bool { if logDatum := m.Datum.GetLogs(); logDatum != nil { s.applyDeltaEncoding(logDatum) } + if flatLogDatum := m.Datum.GetFlatLog(); flatLogDatum != nil { + s.applyFlatLogDeltaEncoding(flatLogDatum) + } s.grpcDatums = append(s.grpcDatums, m.Datum) return true } +func (s *batchStrategy) applyTimestampDeltaEncoding(timestamp *int64) { + currentTimestamp := *timestamp + if s.lastTimestamp == 0 { + s.lastTimestamp = currentTimestamp + return + } + + delta := currentTimestamp - s.lastTimestamp + s.lastTimestamp = currentTimestamp + *timestamp = delta // Note that when delta is 0, proto3 omits the timestamp field. +} + // applyDeltaEncoding applies delta encoding to a Log datum within the current batch. 
func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) { if !enableDeltaEncoding { return } // Timestamp delta encoding - currentTimestamp := logDatum.Timestamp - - // First message in batch: send absolute timestamp - if s.lastTimestamp == 0 { - s.lastTimestamp = currentTimestamp - // Keep absolute value in logDatum.Timestamp - } else { - // Normal case: compute and send delta - delta := currentTimestamp - s.lastTimestamp - s.lastTimestamp = currentTimestamp - logDatum.Timestamp = delta // Note that when delta is 0, proto3 omits the timestamp field - } + s.applyTimestampDeltaEncoding(&logDatum.Timestamp) // Pattern ID delta encoding (for structured logs only) if structured := logDatum.GetStructured(); structured != nil { @@ -246,6 +253,39 @@ func (s *batchStrategy) applyDeltaEncoding(logDatum *statefulpb.Log) { } } +// applyFlatLogDeltaEncoding applies delta encoding to a FlatLog datum within the current batch. +func (s *batchStrategy) applyFlatLogDeltaEncoding(logDatum *statefulpb.FlatLog) { + if !enableDeltaEncoding { + return + } + + s.applyTimestampDeltaEncoding(&logDatum.Timestamp) + + logDatum.Status = s.deltaFlatLogDictIndex(logDatum.Status, &s.lastStatusDictID) + logDatum.Service = s.deltaFlatLogDictIndex(logDatum.Service, &s.lastServiceDictID) + logDatum.Tags = s.deltaFlatLogDictIndex(logDatum.Tags, &s.lastTagsDictIndex) + logDatum.JsonSchemaId = s.deltaFlatLogDictIndex(logDatum.JsonSchemaId, &s.lastJSONSchemaDict) + + if logDatum.RawLog == "" { + if logDatum.PatternId == s.lastPatternID { + logDatum.PatternId = 0 + } else { + s.lastPatternID = logDatum.PatternId + } + } +} + +func (s *batchStrategy) deltaFlatLogDictIndex(current uint64, last *uint64) uint64 { + if current == 0 { + current = flatLogEmptyDictIndex + } + if current == *last { + return 0 + } + *last = current + return current +} + // Mostly copy/pasted from batch.go func (s *batchStrategy) processMessage(m *message.StatefulMessage, outputChan chan *message.Payload) { // Track 
latency stats from metadata @@ -289,13 +329,15 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) { s.lastPatternID = 0 s.lastTagsDictIndex = 0 s.lastServiceDictID = 0 + s.lastStatusDictID = 0 + s.lastJSONSchemaDict = 0 s.sendMessagesWithDatums(messagesMetadata, grpcDatums, outputChan) } func isWireStateDatum(datum *statefulpb.Datum) bool { switch datum.Data.(type) { - case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete: + case *statefulpb.Datum_PatternDelete, *statefulpb.Datum_DictEntryDelete, *statefulpb.Datum_JsonSchemaDelete: return false default: return true @@ -333,12 +375,18 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa datumType = "pattern_delete" case *statefulpb.Datum_Logs: datumType = "logs" + case *statefulpb.Datum_FlatLog: + datumType = "logs" case *statefulpb.Datum_DictEntryDefine: datumType = "dict_entry_define" case *statefulpb.Datum_DictEntryDelete: datumType = "dict_entry_delete" case *statefulpb.Datum_DeltaEncodingSync: datumType = "delta_encoding_sync" + case *statefulpb.Datum_JsonSchemaDefine: + datumType = "json_schema_define" + case *statefulpb.Datum_JsonSchemaDelete: + datumType = "json_schema_delete" default: datumType = "unknown" } diff --git a/pkg/logs/sender/grpc/flat_log.go b/pkg/logs/sender/grpc/flat_log.go new file mode 100644 index 000000000000..8ffda7ce177d --- /dev/null +++ b/pkg/logs/sender/grpc/flat_log.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package grpc + +const flatLogEmptyDictIndex uint64 = 1 + +func flatLogDictIndex(id uint64) uint64 { + if id == 0 { + return flatLogEmptyDictIndex + } + return id +} + +func isFlatLogEmptyDictIndex(id uint64) bool { + return id == 0 || id == flatLogEmptyDictIndex +} diff --git a/pkg/logs/sender/grpc/inflight.go b/pkg/logs/sender/grpc/inflight.go index f467ab767914..d44413dd970b 100644 --- a/pkg/logs/sender/grpc/inflight.go +++ b/pkg/logs/sender/grpc/inflight.go @@ -189,8 +189,16 @@ func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum { var hasSync bool var currentPatternID uint64 var currentTags *statefulpb.TagSet + var currentFlatLogStatus uint64 + var currentFlatLogService uint64 + var currentFlatLogTags uint64 + var currentFlatLogJSONSchema uint64 var syncedPattern bool var syncedTags bool + var syncedFlatLogStatus bool + var syncedFlatLogService bool + var syncedFlatLogTags bool + var syncedFlatLogJSONSchema bool for _, datum := range datums { if datum == nil { @@ -211,6 +219,19 @@ func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum { } if d.DeltaEncodingSync.Tags != nil { currentTags = d.DeltaEncodingSync.Tags + currentFlatLogTags = flatLogTagSetDictIndex(d.DeltaEncodingSync.Tags) + } + if d.DeltaEncodingSync.Status != 0 { + currentFlatLogStatus = d.DeltaEncodingSync.Status + } + if d.DeltaEncodingSync.Service != 0 { + currentFlatLogService = d.DeltaEncodingSync.Service + } + if d.DeltaEncodingSync.FlatLogTags != 0 { + currentFlatLogTags = d.DeltaEncodingSync.FlatLogTags + } + if d.DeltaEncodingSync.JsonSchemaId != 0 { + currentFlatLogJSONSchema = d.DeltaEncodingSync.JsonSchemaId } case *statefulpb.Datum_Logs: logDatum := d.Logs @@ -241,9 +262,36 @@ func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum { currentTags = logDatum.Tags } } - } - if syncedPattern && syncedTags { - break + case *statefulpb.Datum_FlatLog: + logDatum := d.FlatLog + if logDatum == nil { + continue + } + if 
!syncedPattern && logDatum.RawLog == "" { + if logDatum.PatternId == 0 { + if currentPatternID != 0 { + sync.PatternId = currentPatternID + hasSync = true + syncedPattern = true + } + } else { + currentPatternID = logDatum.PatternId + } + } + if !syncedFlatLogTags { + if logDatum.Tags == 0 { + if currentFlatLogTags != 0 { + sync.FlatLogTags = currentFlatLogTags + hasSync = true + syncedFlatLogTags = true + } + } else { + currentFlatLogTags = logDatum.Tags + } + } + currentFlatLogStatus = setDeltaEncodingSyncField(logDatum.Status, currentFlatLogStatus, &sync.Status, &syncedFlatLogStatus, &hasSync) + currentFlatLogService = setDeltaEncodingSyncField(logDatum.Service, currentFlatLogService, &sync.Service, &syncedFlatLogService, &hasSync) + currentFlatLogJSONSchema = setDeltaEncodingSyncField(logDatum.JsonSchemaId, currentFlatLogJSONSchema, &sync.JsonSchemaId, &syncedFlatLogJSONSchema, &hasSync) } } @@ -257,6 +305,21 @@ func replayDeltaEncodingSync(datums []*statefulpb.Datum) *statefulpb.Datum { } } +func setDeltaEncodingSyncField(value uint64, current uint64, syncField *uint64, synced *bool, hasSync *bool) uint64 { + if *synced { + return current + } + if value == 0 { + if current != 0 { + *syncField = current + *hasSync = true + *synced = true + } + return current + } + return value +} + // sentCount returns the number of sent payloads awaiting ack func (t *inflightTracker) sentCount() int { return (t.sentTail - t.head + len(t.items)) % len(t.items) @@ -302,19 +365,22 @@ func (t *inflightTracker) resetStreamSent() { // snapshotState maintains the accumulated state changes for stream bootstrapping // It represents the state "before" the first payload in the inflight queue type snapshotState struct { - dictMap map[uint64]*statefulpb.DictEntryDefine - patternMap map[uint64]*statefulpb.PatternDefine + dictMap map[uint64]*statefulpb.DictEntryDefine + patternMap map[uint64]*statefulpb.PatternDefine + jsonSchemaMap map[uint64]*statefulpb.JsonSchemaDefine } type 
stateReferences struct { - dictEntryIDs map[uint64]struct{} - patternIDs map[uint64]struct{} + dictEntryIDs map[uint64]struct{} + patternIDs map[uint64]struct{} + jsonSchemaIDs map[uint64]struct{} } func newStateReferences() stateReferences { return stateReferences{ - dictEntryIDs: make(map[uint64]struct{}), - patternIDs: make(map[uint64]struct{}), + dictEntryIDs: make(map[uint64]struct{}), + patternIDs: make(map[uint64]struct{}), + jsonSchemaIDs: make(map[uint64]struct{}), } } @@ -326,6 +392,9 @@ func (r stateReferences) clone() stateReferences { for id := range r.patternIDs { clone.patternIDs[id] = struct{}{} } + for id := range r.jsonSchemaIDs { + clone.jsonSchemaIDs[id] = struct{}{} + } return clone } @@ -339,6 +408,11 @@ func (r stateReferences) hasPattern(id uint64) bool { return ok } +func (r stateReferences) hasJsonSchema(id uint64) bool { + _, ok := r.jsonSchemaIDs[id] + return ok +} + func (r stateReferences) addDictEntry(id uint64) { if id == 0 { return @@ -353,6 +427,13 @@ func (r stateReferences) addPattern(id uint64) { r.patternIDs[id] = struct{}{} } +func (r stateReferences) addJsonSchema(id uint64) { + if id == 0 || id == flatLogEmptyDictIndex { + return + } + r.jsonSchemaIDs[id] = struct{}{} +} + func (r stateReferences) deleteDictEntry(id uint64) { delete(r.dictEntryIDs, id) } @@ -361,11 +442,16 @@ func (r stateReferences) deletePattern(id uint64) { delete(r.patternIDs, id) } +func (r stateReferences) deleteJsonSchema(id uint64) { + delete(r.jsonSchemaIDs, id) +} + // newSnapshotState creates a new empty snapshot state func newSnapshotState() *snapshotState { return &snapshotState{ - dictMap: make(map[uint64]*statefulpb.DictEntryDefine), - patternMap: make(map[uint64]*statefulpb.PatternDefine), + dictMap: make(map[uint64]*statefulpb.DictEntryDefine), + patternMap: make(map[uint64]*statefulpb.PatternDefine), + jsonSchemaMap: make(map[uint64]*statefulpb.JsonSchemaDefine), } } @@ -385,6 +471,10 @@ func (s *snapshotState) apply(extra *StatefulExtra) { 
s.dictMap[d.DictEntryDefine.Id] = d.DictEntryDefine case *statefulpb.Datum_DictEntryDelete: delete(s.dictMap, d.DictEntryDelete.Id) + case *statefulpb.Datum_JsonSchemaDefine: + s.jsonSchemaMap[d.JsonSchemaDefine.SchemaId] = d.JsonSchemaDefine + case *statefulpb.Datum_JsonSchemaDelete: + delete(s.jsonSchemaMap, d.JsonSchemaDelete.SchemaId) } } } @@ -394,7 +484,7 @@ func (s *snapshotState) apply(extra *StatefulExtra) { // Used to send snapshot on new stream creation func (s *snapshotState) serialize(refs *stateReferences) ([]byte, stateReferences) { sent := newStateReferences() - datums := make([]*statefulpb.Datum, 0, len(s.patternMap)+len(s.dictMap)) + datums := make([]*statefulpb.Datum, 0, len(s.patternMap)+len(s.dictMap)+len(s.jsonSchemaMap)) for id, pattern := range s.patternMap { if refs != nil && !refs.hasPattern(id) { @@ -414,6 +504,15 @@ func (s *snapshotState) serialize(refs *stateReferences) ([]byte, stateReference }) sent.addDictEntry(id) } + for id, schema := range s.jsonSchemaMap { + if refs != nil && !refs.hasJsonSchema(id) { + continue + } + datums = append(datums, &statefulpb.Datum{ + Data: &statefulpb.Datum_JsonSchemaDefine{JsonSchemaDefine: schema}, + }) + sent.addJsonSchema(id) + } if len(datums) == 0 { return nil, sent @@ -455,14 +554,21 @@ func (t *inflightTracker) missingSnapshotDefines(datums []*statefulpb.Datum) []* known.addDictEntry(d.DictEntryDefine.Id) case *statefulpb.Datum_DictEntryDelete: known.deleteDictEntry(d.DictEntryDelete.Id) + case *statefulpb.Datum_JsonSchemaDefine: + known.addJsonSchema(d.JsonSchemaDefine.SchemaId) + addJsonSchemaReferences(known, d.JsonSchemaDefine) + case *statefulpb.Datum_JsonSchemaDelete: + known.deleteJsonSchema(d.JsonSchemaDelete.SchemaId) case *statefulpb.Datum_Logs: t.addMissingLogReferences(missing, known, d.Logs) + case *statefulpb.Datum_FlatLog: + t.addMissingFlatLogReferences(missing, known, d.FlatLog) case *statefulpb.Datum_DeltaEncodingSync: t.addMissingDeltaEncodingSyncReferences(missing, known, 
d.DeltaEncodingSync) } } - prefix := make([]*statefulpb.Datum, 0, len(missing.patternIDs)+len(missing.dictEntryIDs)) + prefix := make([]*statefulpb.Datum, 0, len(missing.patternIDs)+len(missing.dictEntryIDs)+len(missing.jsonSchemaIDs)) for id := range missing.patternIDs { pattern := t.snapshot.patternMap[id] if pattern == nil { @@ -481,6 +587,15 @@ func (t *inflightTracker) missingSnapshotDefines(datums []*statefulpb.Datum) []* Data: &statefulpb.Datum_DictEntryDefine{DictEntryDefine: entry}, }) } + for id := range missing.jsonSchemaIDs { + schema := t.snapshot.jsonSchemaMap[id] + if schema == nil { + continue + } + prefix = append(prefix, &statefulpb.Datum{ + Data: &statefulpb.Datum_JsonSchemaDefine{JsonSchemaDefine: schema}, + }) + } return prefix } @@ -502,6 +617,21 @@ func (t *inflightTracker) addMissingLogReferences(missing stateReferences, known } } +func (t *inflightTracker) addMissingFlatLogReferences(missing stateReferences, known stateReferences, log *statefulpb.FlatLog) { + if log == nil { + return + } + t.addMissingFlatLogDictEntryReference(missing, known, log.Status) + t.addMissingFlatLogDictEntryReference(missing, known, log.Service) + t.addMissingFlatLogDictEntryReference(missing, known, log.Tags) + if log.RawLog == "" { + t.addMissingPatternReference(missing, known, log.PatternId) + t.addMissingDynamicValueReferences(missing, known, log.DynamicValues) + } + t.addMissingJsonSchemaReference(missing, known, log.JsonSchemaId) + t.addMissingDynamicValueReferences(missing, known, log.JsonContextValues) +} + func (t *inflightTracker) addMissingDeltaEncodingSyncReferences(missing stateReferences, known stateReferences, sync *statefulpb.DeltaEncodingSync) { if sync == nil { return @@ -510,6 +640,10 @@ func (t *inflightTracker) addMissingDeltaEncodingSyncReferences(missing stateRef if sync.Tags != nil { t.addMissingDynamicValueReference(missing, known, sync.Tags.Tagset) } + t.addMissingFlatLogDictEntryReference(missing, known, sync.Status) + 
t.addMissingFlatLogDictEntryReference(missing, known, sync.Service) + t.addMissingFlatLogDictEntryReference(missing, known, sync.FlatLogTags) + t.addMissingJsonSchemaReference(missing, known, sync.JsonSchemaId) } func (t *inflightTracker) addMissingDynamicValueReferences(missing stateReferences, known stateReferences, values []*statefulpb.DynamicValue) { @@ -525,6 +659,13 @@ func (t *inflightTracker) addMissingDynamicValueReference(missing stateReference t.addMissingDictEntryReference(missing, known, value.GetDictIndex()) } +func (t *inflightTracker) addMissingFlatLogDictEntryReference(missing stateReferences, known stateReferences, id uint64) { + if isFlatLogEmptyDictIndex(id) { + return + } + t.addMissingDictEntryReference(missing, known, id) +} + func (t *inflightTracker) addMissingDictEntryReference(missing stateReferences, known stateReferences, id uint64) { if id == 0 || known.hasDictEntry(id) || t.snapshot.dictMap[id] == nil { return @@ -533,6 +674,22 @@ func (t *inflightTracker) addMissingDictEntryReference(missing stateReferences, known.addDictEntry(id) } +func (t *inflightTracker) addMissingJsonSchemaReference(missing stateReferences, known stateReferences, id uint64) { + if id == 0 || id == flatLogEmptyDictIndex || known.hasJsonSchema(id) { + return + } + schema := t.snapshot.jsonSchemaMap[id] + if schema == nil { + return + } + missing.addJsonSchema(id) + known.addJsonSchema(id) + t.addMissingDictEntryReference(missing, known, schema.MessageKeyId) + for _, keyID := range schema.Keys { + t.addMissingDictEntryReference(missing, known, keyID) + } +} + func (t *inflightTracker) addMissingPatternReference(missing stateReferences, known stateReferences, id uint64) { if id == 0 || known.hasPattern(id) || t.snapshot.patternMap[id] == nil { return @@ -552,6 +709,9 @@ func (t *inflightTracker) markStateSent(payload *message.Payload) { t.streamSent.addPattern(d.PatternDefine.PatternId) case *statefulpb.Datum_DictEntryDefine: 
t.streamSent.addDictEntry(d.DictEntryDefine.Id) + case *statefulpb.Datum_JsonSchemaDefine: + t.streamSent.addJsonSchema(d.JsonSchemaDefine.SchemaId) + addJsonSchemaReferences(t.streamSent, d.JsonSchemaDefine) } } t.streamSent.applyStateChanges(extra.WireDatums) @@ -568,6 +728,11 @@ func (r stateReferences) applyStateChanges(datums []*statefulpb.Datum) { r.addDictEntry(d.DictEntryDefine.Id) case *statefulpb.Datum_DictEntryDelete: r.deleteDictEntry(d.DictEntryDelete.Id) + case *statefulpb.Datum_JsonSchemaDefine: + r.addJsonSchema(d.JsonSchemaDefine.SchemaId) + addJsonSchemaReferences(r, d.JsonSchemaDefine) + case *statefulpb.Datum_JsonSchemaDelete: + r.deleteJsonSchema(d.JsonSchemaDelete.SchemaId) } } } @@ -577,8 +742,13 @@ func addDatumReferences(refs stateReferences, datums []*statefulpb.Datum) { switch d := datum.Data.(type) { case *statefulpb.Datum_Logs: addLogReferences(refs, d.Logs) + case *statefulpb.Datum_FlatLog: + addFlatLogReferences(refs, d.FlatLog) case *statefulpb.Datum_DeltaEncodingSync: addDeltaEncodingSyncReferences(refs, d.DeltaEncodingSync) + case *statefulpb.Datum_JsonSchemaDefine: + refs.addJsonSchema(d.JsonSchemaDefine.SchemaId) + addJsonSchemaReferences(refs, d.JsonSchemaDefine) } } } @@ -601,6 +771,21 @@ func addLogReferences(refs stateReferences, log *statefulpb.Log) { } } +func addFlatLogReferences(refs stateReferences, log *statefulpb.FlatLog) { + if log == nil { + return + } + addFlatLogDictEntryReference(refs, log.Status) + addFlatLogDictEntryReference(refs, log.Service) + addFlatLogDictEntryReference(refs, log.Tags) + if log.RawLog == "" { + refs.addPattern(log.PatternId) + addDynamicValueReferences(refs, log.DynamicValues) + } + addFlatLogJsonSchemaReference(refs, log.JsonSchemaId) + addDynamicValueReferences(refs, log.JsonContextValues) +} + func addDeltaEncodingSyncReferences(refs stateReferences, sync *statefulpb.DeltaEncodingSync) { if sync == nil { return @@ -609,6 +794,20 @@ func addDeltaEncodingSyncReferences(refs 
stateReferences, sync *statefulpb.Delta if sync.Tags != nil { addDynamicValueReference(refs, sync.Tags.Tagset) } + addFlatLogDictEntryReference(refs, sync.Status) + addFlatLogDictEntryReference(refs, sync.Service) + addFlatLogDictEntryReference(refs, sync.FlatLogTags) + addFlatLogJsonSchemaReference(refs, sync.JsonSchemaId) +} + +func addJsonSchemaReferences(refs stateReferences, schema *statefulpb.JsonSchemaDefine) { + if schema == nil { + return + } + addFlatLogDictEntryReference(refs, schema.MessageKeyId) + for _, keyID := range schema.Keys { + addFlatLogDictEntryReference(refs, keyID) + } } func addDynamicValueReferences(refs stateReferences, values []*statefulpb.DynamicValue) { @@ -623,3 +822,17 @@ func addDynamicValueReference(refs stateReferences, value *statefulpb.DynamicVal } refs.addDictEntry(value.GetDictIndex()) } + +func addFlatLogDictEntryReference(refs stateReferences, id uint64) { + if isFlatLogEmptyDictIndex(id) { + return + } + refs.addDictEntry(id) +} + +func addFlatLogJsonSchemaReference(refs stateReferences, id uint64) { + if id == 0 || id == flatLogEmptyDictIndex { + return + } + refs.addJsonSchema(id) +} diff --git a/pkg/logs/sender/grpc/inflight_test.go b/pkg/logs/sender/grpc/inflight_test.go index c85cd9f5e1a5..567afd49ac90 100644 --- a/pkg/logs/sender/grpc/inflight_test.go +++ b/pkg/logs/sender/grpc/inflight_test.go @@ -577,6 +577,37 @@ func TestInflightTrackerNextToSendPrependsDeltaEncodingSyncBeforeReplayedDatums( assert.EqualValues(t, 0, datumSeq.Data[4].GetLogs().GetStructured().PatternId) } +func TestInflightTrackerNextToSendDeltaEncodingSyncIncludesFlatLogFields(t *testing.T) { + tracker := newInflightTracker("test", 5) + tracker.snapshot.apply(&StatefulExtra{ + StateChanges: []*statefulpb.Datum{ + createInflightPatternDefine(2, "pattern2"), + createInflightDictEntryDefine(20, "value20"), + createInflightDictEntryDefine(21, "status"), + createInflightDictEntryDefine(22, "service"), + createInflightDictEntryDefine(23, "tags"), + 
createInflightDictEntryDefine(24, "json-schema"), + }, + }) + tracker.streamSent = newStateReferences() + + require.True(t, tracker.append(createInflightPayloadWithWireDatums( + createInflightFlatLogDatum(2, 20, 21, 22, 23, 24), + createInflightFlatLogDatum(0, 20, 0, 0, 0, 0), + ))) + + encoded, err := tracker.nextToSendEncoded(noopimpl.New()) + require.NoError(t, err) + + datumSeq := decodeInflightDatumSequence(t, encoded) + sync := findInflightDeltaEncodingSync(t, datumSeq.Data) + assert.EqualValues(t, 2, sync.PatternId) + assert.EqualValues(t, 21, sync.Status) + assert.EqualValues(t, 22, sync.Service) + assert.EqualValues(t, 23, sync.FlatLogTags) + assert.EqualValues(t, 24, sync.JsonSchemaId) +} + func TestInflightTrackerNextToSendDoesNotPrependStateDefinedInSamePayload(t *testing.T) { tracker := newInflightTracker("test", 5) tracker.snapshot.apply(&StatefulExtra{ @@ -646,6 +677,27 @@ func createInflightLogDatum(patternID uint64, dictID uint64) *statefulpb.Datum { } } +func createInflightFlatLogDatum(patternID uint64, dictID uint64, status uint64, service uint64, tags uint64, jsonSchemaID uint64) *statefulpb.Datum { + return &statefulpb.Datum{ + Data: &statefulpb.Datum_FlatLog{ + FlatLog: &statefulpb.FlatLog{ + PatternId: patternID, + Status: status, + Service: service, + Tags: tags, + JsonSchemaId: jsonSchemaID, + DynamicValues: []*statefulpb.DynamicValue{ + { + Value: &statefulpb.DynamicValue_DictIndex{ + DictIndex: dictID, + }, + }, + }, + }, + }, + } +} + func createInflightPayloadWithWireDatums(datums ...*statefulpb.Datum) *message.Payload { serialized, _ := proto.Marshal(&statefulpb.DatumSequence{Data: datums}) return &message.Payload{ @@ -663,6 +715,17 @@ func decodeInflightDatumSequence(t *testing.T, data []byte) statefulpb.DatumSequ return datumSeq } +func findInflightDeltaEncodingSync(t *testing.T, datums []*statefulpb.Datum) *statefulpb.DeltaEncodingSync { + t.Helper() + for _, datum := range datums { + if sync := datum.GetDeltaEncodingSync(); sync != 
nil { + return sync + } + } + require.Fail(t, "DeltaEncodingSync not found") + return nil +} + func collectInflightPatterns(datums []*statefulpb.Datum) map[uint64]string { patterns := make(map[uint64]string) for _, datum := range datums { diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 78030cad6831..0923532bd850 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -85,7 +85,6 @@ type tagCacheEntry struct { origin *message.Origin hostname string source string - status string processingTags string // joined ProcessingTags; part of cache key tagSet *statefulpb.TagSet dictID uint64 @@ -101,6 +100,8 @@ type MessageTranslator struct { tagEvictionManager *tags.TagEvictionManager tokenizer token.Tokenizer jsonLogsAsRaw bool // when true, JSON logs bypass stateful encoding and are sent as RawLog + jsonSchemaToID map[string]uint64 + nextJSONSchemaID uint64 pipelineName string lastStaleSweep time.Time @@ -113,7 +114,6 @@ type MessageTranslator struct { hostname string service string source string - status string processingTags string // joined ProcessingTags; part of cache key tagSet *statefulpb.TagSet dictID uint64 @@ -153,6 +153,8 @@ func NewMessageTranslator(pipelineName string, tokenizer token.Tokenizer, opts . 
tagEvictionManager: tags.NewTagEvictionManager(), tokenizer: tokenizer, jsonLogsAsRaw: pkgconfigsetup.Datadog().GetBool("logs_config.patterns.json_as_raw"), + jsonSchemaToID: make(map[string]uint64), + nextJSONSchemaID: flatLogEmptyDictIndex, pipelineName: pipelineName, lastStaleSweep: time.Now(), staleTTL: staleTTL, @@ -261,11 +263,15 @@ func (mt *MessageTranslator) processBatch(batch []batchEntry, outputChan chan *m if serviceIsNew { mt.sendDictEntryDefine(outputChan, entry.msg, serviceDictID, entry.msg.Origin.Service()) } + statusDictID, status, statusIsNew := mt.buildStatusField(entry.msg) + if statusIsNew { + mt.sendDictEntryDefine(outputChan, entry.msg, statusDictID, status) + } tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(entry.msg) if isNew { mt.sendDictEntryDefine(outputChan, entry.msg, dictID, allTagsStr) } - mt.sendRawLog(outputChan, entry.msg, entry.content, ts, tagSet, service) + mt.sendRawLog(outputChan, entry.msg, entry.content, ts, tagSet, service, statusDictID) } else { tokenBatch = append(tokenBatch, entry) } @@ -313,11 +319,15 @@ func (mt *MessageTranslator) processMessage(msg *message.Message, outputChan cha if serviceIsNew { mt.sendDictEntryDefine(outputChan, msg, serviceDictID, msg.Origin.Service()) } + statusDictID, status, statusIsNew := mt.buildStatusField(msg) + if statusIsNew { + mt.sendDictEntryDefine(outputChan, msg, statusDictID, status) + } tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(msg) if isNew { mt.sendDictEntryDefine(outputChan, msg, dictID, allTagsStr) } - mt.sendRawLog(outputChan, msg, string(content), ts, tagSet, service) + mt.sendRawLog(outputChan, msg, string(content), ts, tagSet, service, statusDictID) return } contentStr := string(content) @@ -354,11 +364,15 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList if serviceIsNew { mt.sendDictEntryDefine(outputChan, msg, serviceDictID, msg.Origin.Service()) } + statusDictID, status, statusIsNew := mt.buildStatusField(msg) + if 
statusIsNew { + mt.sendDictEntryDefine(outputChan, msg, statusDictID, status) + } tagSet, allTagsStr, dictID, isNew := mt.buildTagSet(msg) if isNew { mt.sendDictEntryDefine(outputChan, msg, dictID, allTagsStr) } - mt.sendRawLog(outputChan, msg, string(getTranslatorContent(msg)), ts, tagSet, service) + mt.sendRawLog(outputChan, msg, string(getTranslatorContent(msg)), ts, tagSet, service, statusDictID) return } @@ -431,25 +445,12 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList } } - // Encode message key as dict entry + // Encode JSON context schema as state and values as DynamicValues. var messageKeyDV *statefulpb.DynamicValue - if messageKey != "" { - encoded, mkDictID, mkIsNew := mt.encodeDynamicValue(messageKey) - messageKeyDV = encoded - if mkIsNew { - mt.sendDictEntryDefine(outputChan, msg, mkDictID, messageKey) - } - } - - // Encode JSON context schema as dict entry and values as DynamicValues var jsonContextSchemaID uint64 var jsonContextValuesDV []*statefulpb.DynamicValue if jsonContextSchema != "" { - var schemaIsNew bool - jsonContextSchemaID, schemaIsNew = mt.tagManager.AddString(jsonContextSchema) - if schemaIsNew { - mt.sendDictEntryDefine(outputChan, msg, jsonContextSchemaID, jsonContextSchema) - } + messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextSchema) jsonContextDVBacking := make([]statefulpb.DynamicValue, len(jsonContextValues)) jsonContextTypeBacking := make([]dvTypeBackings, len(jsonContextValues)) @@ -479,6 +480,11 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList mt.sendDictEntryDefine(outputChan, msg, serviceDictID, msg.Origin.Service()) } + statusDictID, status, statusIsNew := mt.buildStatusField(msg) + if statusIsNew { + mt.sendDictEntryDefine(outputChan, msg, statusDictID, status) + } + // Build complete tag list and encode as TagSet tagSet, allTagsString, dictID, isNew := mt.buildTagSet(msg) if isNew { @@ 
-487,21 +493,20 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList // Send StructuredLog with all fields tsMillis := ts.UnixNano() / nanoToMillis - mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, service, messageKeyDV, jsonContextSchemaID, jsonContextValuesDV) + mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, service, statusDictID, messageKeyDV, jsonContextSchemaID, jsonContextValuesDV) } // buildTagSet constructs the complete tag list for a message and encodes it as a TagSet. -// This includes log-level fields (hostname, ddsource, status) as tags, +// This includes log-level fields (hostname, ddsource) as tags, // plus all other tags from the message metadata (container tags, source config tags, processing tags). // All tags are joined as a single string, encoded as a single dictionary entry in the TagSet. -// A single-entry cache keyed on (origin ptr, hostname, source, status) avoids all +// A single-entry cache keyed on (origin ptr, hostname, source, processing tags) avoids all // allocations in the common case where these inputs are constant across messages (single-source pipeline). func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagSet, string, uint64, bool) { // Read current inputs currentOrigin := msg.Origin currentHostname := msg.MessageMetadata.Hostname currentSource := msg.Origin.Source() - currentStatus := msg.MessageMetadata.GetStatus() currentProcessingTags := strings.Join(msg.MessageMetadata.ProcessingTags, ",") // Cache hit: all inputs identical and cached dict index still live (not evicted). 
@@ -509,7 +514,6 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS mt.tagCache.origin == currentOrigin && mt.tagCache.hostname == currentHostname && mt.tagCache.source == currentSource && - mt.tagCache.status == currentStatus && mt.tagCache.processingTags == currentProcessingTags && mt.tagManager.TouchDictID(mt.tagCache.dictID) { return mt.tagCache.tagSet, mt.tagCache.tagStr, mt.tagCache.dictID, false @@ -533,10 +537,6 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS tagStrings = append(tagStrings, "ddsource:"+currentSource) } - if currentStatus != "" { - tagStrings = append(tagStrings, "status:"+currentStatus) - } - allTagsString := strings.Join(tagStrings, ",") if allTagsString == "" { return nil, "", 0, false @@ -556,7 +556,6 @@ func (mt *MessageTranslator) buildTagSet(msg *message.Message) (*statefulpb.TagS mt.tagCache.origin = currentOrigin mt.tagCache.hostname = currentHostname mt.tagCache.source = currentSource - mt.tagCache.status = currentStatus mt.tagCache.processingTags = currentProcessingTags mt.tagCache.tagSet = tagSet mt.tagCache.dictID = dictID @@ -592,6 +591,53 @@ func (mt *MessageTranslator) buildServiceField(msg *message.Message) (*statefulp }, dictID, isNew } +func (mt *MessageTranslator) buildStatusField(msg *message.Message) (uint64, string, bool) { + status := msg.MessageMetadata.GetStatus() + if status == "" { + return 0, "", false + } + dictID, isNew := mt.tagManager.AddString(status) + return dictID, status, isNew +} + +func (mt *MessageTranslator) sendJsonSchemaDefineIfNeeded(outputChan chan *message.StatefulMessage, msg *message.Message, messageKey string, jsonContextSchema string) (*statefulpb.DynamicValue, uint64) { + if messageKey == "" { + messageKey = "message" + } + messageKeyID, messageKeyIsNew := mt.tagManager.AddString(messageKey) + if messageKeyIsNew { + mt.sendDictEntryDefine(outputChan, msg, messageKeyID, messageKey) + } + + keys := 
strings.Split(jsonContextSchema, ",") + keyIDs := make([]uint64, 0, len(keys)) + for _, key := range keys { + if key == "" { + continue + } + keyID, keyIsNew := mt.tagManager.AddString(key) + if keyIsNew { + mt.sendDictEntryDefine(outputChan, msg, keyID, key) + } + keyIDs = append(keyIDs, keyID) + } + + schemaKey := messageKey + "\x00" + jsonContextSchema + schemaID, ok := mt.jsonSchemaToID[schemaKey] + if !ok { + mt.nextJSONSchemaID++ + schemaID = mt.nextJSONSchemaID + mt.jsonSchemaToID[schemaKey] = schemaID + mt.sendJsonSchemaDefine(outputChan, msg, schemaID, keyIDs, messageKeyID) + } + + return &statefulpb.DynamicValue{ + Value: &statefulpb.DynamicValue_DictIndex{ + DictIndex: messageKeyID, + }, + }, schemaID +} + // getMessageTimestamp returns the timestamp for the message, preferring the HTTP // encoder timestamp when available so the dual-send paths stay aligned. func getMessageTimestamp(msg *message.Message) time.Time { @@ -671,10 +717,22 @@ func (mt *MessageTranslator) sendDictEntryDelete(outputChan chan *message.Statef } } +func (mt *MessageTranslator) sendJsonSchemaDefine(outputChan chan *message.StatefulMessage, msg *message.Message, schemaID uint64, keyIDs []uint64, messageKeyID uint64) { + schemaDatum := buildJsonSchemaDefine(schemaID, keyIDs, messageKeyID) + + bytesAdded := float64(proto.Size(schemaDatum)) + tlmPipelineStateSize.Add(bytesAdded, mt.pipelineName) + + outputChan <- &message.StatefulMessage{ + Datum: schemaDatum, + Metadata: &msg.MessageMetadata, + } +} + // sendRawLog creates and sends a raw log datum (currently unused) -func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage, msg *message.Message, contentStr string, ts time.Time, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue) { +func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage, msg *message.Message, contentStr string, ts time.Time, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue, statusDictID uint64) { // 
Proto3 string fields require valid UTF-8; replace invalid sequences to avoid corrupt datums. - logDatum := buildRawLog(toValidUTF8(contentStr), ts, tagSet, msg.MessageMetadata.DualSendUUID, service) + logDatum := buildRawLog(toValidUTF8(contentStr), ts, tagSet, msg.MessageMetadata.DualSendUUID, service, statusDictID) tlmPipelineRawLogsProcessed.Inc(mt.pipelineName) tlmPipelineRawLogsProcessedBytes.Add(float64(proto.Size(logDatum)), mt.pipelineName) @@ -686,8 +744,8 @@ func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage } // sendStructuredLog creates and sends a StructuredLog datum -func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) { - logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, service, messageKey, jsonContextSchemaID, jsonContextValues) +func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) { + logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, service, statusDictID, messageKey, jsonContextSchemaID, jsonContextValues) tlmPipelinePatternLogsProcessed.Inc(mt.pipelineName) tlmPipelinePatternLogsProcessedBytes.Add(float64(proto.Size(logDatum)), mt.pipelineName) @@ -741,6 +799,18 @@ func buildDictEntryDefine(id uint64, value string) *statefulpb.Datum { } } +func buildJsonSchemaDefine(schemaID 
uint64, keyIDs []uint64, messageKeyID uint64) *statefulpb.Datum { + return &statefulpb.Datum{ + Data: &statefulpb.Datum_JsonSchemaDefine{ + JsonSchemaDefine: &statefulpb.JsonSchemaDefine{ + SchemaId: schemaID, + Keys: keyIDs, + MessageKeyId: messageKeyID, + }, + }, + } +} + // parseLosslessIntString returns an int64 only when the original string is already // the canonical base-10 representation of that integer. Numeric-looking strings // like "00123" are kept as strings so they can round-trip without losing lexeme @@ -935,48 +1005,61 @@ func (mt *MessageTranslator) fillWildcardDynamicValue( return 0, "", false } -// buildStructuredLog creates a Datum containing a StructuredLog -func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) *statefulpb.Datum { - log := &statefulpb.Log{ +func flatLogTagSetDictIndex(tagSet *statefulpb.TagSet) uint64 { + if tagSet == nil || tagSet.Tagset == nil { + return 0 + } + return tagSet.Tagset.GetDictIndex() +} + +func flatLogDynamicValueDictIndex(value *statefulpb.DynamicValue) uint64 { + if value == nil { + return 0 + } + return value.GetDictIndex() +} + +// buildStructuredLog creates a Datum containing a FlatLog with pattern references. 
+func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) *statefulpb.Datum { + _ = messageKey + log := &statefulpb.FlatLog{ Timestamp: timestamp, - Content: &statefulpb.Log_Structured{ - Structured: &statefulpb.StructuredLog{ - PatternId: patternID, - DynamicValues: dynamicValues, - JsonMessageKey: messageKey, - JsonContextSchemaId: jsonContextSchemaID, - JsonContextValues: jsonContextValues, - }, - }, - Tags: tagSet, - Service: service, + Status: flatLogDictIndex(statusDictID), + Service: flatLogDictIndex(flatLogDynamicValueDictIndex(service)), + Tags: flatLogDictIndex(flatLogTagSetDictIndex(tagSet)), + + PatternId: patternID, + DynamicValues: dynamicValues, + JsonSchemaId: flatLogDictIndex(jsonContextSchemaID), + JsonContextValues: jsonContextValues, } if uuid != "" { log.Uuid = &uuid } return &statefulpb.Datum{ - Data: &statefulpb.Datum_Logs{ - Logs: log, + Data: &statefulpb.Datum_FlatLog{ + FlatLog: log, }, } } -// buildRawLog creates a Datum containing a raw log (no pattern) -func buildRawLog(content string, ts time.Time, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue) *statefulpb.Datum { - log := &statefulpb.Log{ +// buildRawLog creates a Datum containing a FlatLog with raw content (no pattern). 
+func buildRawLog(content string, ts time.Time, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue, statusDictID uint64) *statefulpb.Datum { + log := &statefulpb.FlatLog{ Timestamp: ts.UnixNano() / nanoToMillis, - Content: &statefulpb.Log_Raw{ - Raw: content, - }, - Tags: tagSet, - Service: service, + Status: flatLogDictIndex(statusDictID), + Service: flatLogDictIndex(flatLogDynamicValueDictIndex(service)), + Tags: flatLogDictIndex(flatLogTagSetDictIndex(tagSet)), + RawLog: content, + + JsonSchemaId: flatLogEmptyDictIndex, } if uuid != "" { log.Uuid = &uuid } return &statefulpb.Datum{ - Data: &statefulpb.Datum_Logs{ - Logs: log, + Data: &statefulpb.Datum_FlatLog{ + FlatLog: log, }, } } diff --git a/pkg/logs/sender/grpc/mock_state_cache_test.go b/pkg/logs/sender/grpc/mock_state_cache_test.go index b804d6e2570b..47bf146f899e 100644 --- a/pkg/logs/sender/grpc/mock_state_cache_test.go +++ b/pkg/logs/sender/grpc/mock_state_cache_test.go @@ -6,7 +6,6 @@ package grpc import ( - "strings" "testing" "time" "unicode/utf8" @@ -18,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/message" rtokenizer "github.com/DataDog/datadog-agent/pkg/logs/patterns/tokenizer/rust" "github.com/DataDog/datadog-agent/pkg/logs/sources" + "github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb" ) // makeTestOrigin builds an Origin backed by a LogsConfig with explicit service/source. @@ -102,20 +102,58 @@ func TestBuildTagSet_CacheCorrectness(t *testing.T) { assert.NotContains(t, tagStr5, "ddsource:src-a", "old source must not appear after source change") assert.NotEqual(t, tagStr4, tagStr5, "source change must produce a different allTagsString") - // ── Case 6: status change causes cache miss ─────────────────────────────── - // Use the same origin3 (same origin ptr, service, source) but change status + // ── Case 6: status change does not affect joined tags ───────────────────── + // Status is encoded in FlatLog.status, not in the joined tag string. 
msg6 := makeMsg("log line 6", "host-2", "error", origin3) tagSet6, tagStr6, _, _ := mt.buildTagSet(msg6) require.NotNil(t, tagSet6) - assert.Contains(t, tagStr6, "status:error", "new status must appear in tag string") - // "info" is the default status (returned by GetStatus when Status=="") so it appears - // in tagStr5. After changing to "error" the strings must differ. - assert.True(t, - strings.Contains(tagStr5, "status:info") || !strings.Contains(tagStr5, "status:error"), - "prior tag string must not already contain status:error") - assert.NotEqual(t, tagStr5, tagStr6, "status change must produce a different allTagsString") - assert.Equal(t, "error", mt.tagCache.status, "cache must reflect updated status") + assert.NotContains(t, tagStr6, "status:error", "status must not be encoded in the joined tag string") + assert.Equal(t, tagStr5, tagStr6, "status change must not change the joined tag string") +} + +func TestBuildStructuredLogUsesFlatLog(t *testing.T) { + tagSet := &statefulpb.TagSet{ + Tagset: &statefulpb.DynamicValue{ + Value: &statefulpb.DynamicValue_DictIndex{DictIndex: 4}, + }, + } + service := &statefulpb.DynamicValue{ + Value: &statefulpb.DynamicValue_DictIndex{DictIndex: 3}, + } + values := []*statefulpb.DynamicValue{{ + Value: &statefulpb.DynamicValue_StringValue{StringValue: "value"}, + }} + + datum := buildStructuredLog(123, 12, values, tagSet, "uuid", service, 2, nil, 5, nil) + + require.Nil(t, datum.GetLogs()) + flatLog := datum.GetFlatLog() + require.NotNil(t, flatLog) + assert.EqualValues(t, 123, flatLog.Timestamp) + assert.EqualValues(t, 2, flatLog.Status) + assert.EqualValues(t, 3, flatLog.Service) + assert.EqualValues(t, 4, flatLog.Tags) + assert.EqualValues(t, 12, flatLog.PatternId) + assert.EqualValues(t, 5, flatLog.JsonSchemaId) + assert.Equal(t, values, flatLog.DynamicValues) + assert.Equal(t, "uuid", flatLog.GetUuid()) +} + +func TestBuildRawLogUsesFlatLog(t *testing.T) { + ts := time.UnixMilli(123) + + datum := buildRawLog("raw", ts, 
nil, "", nil, 0) + + require.Nil(t, datum.GetLogs()) + flatLog := datum.GetFlatLog() + require.NotNil(t, flatLog) + assert.EqualValues(t, 123, flatLog.Timestamp) + assert.Equal(t, "raw", flatLog.RawLog) + assert.EqualValues(t, flatLogEmptyDictIndex, flatLog.Status) + assert.EqualValues(t, flatLogEmptyDictIndex, flatLog.Service) + assert.EqualValues(t, flatLogEmptyDictIndex, flatLog.Tags) + assert.EqualValues(t, flatLogEmptyDictIndex, flatLog.JsonSchemaId) } func TestBuildTagSet_ProcessingTagsInvalidateCache(t *testing.T) { diff --git a/pkg/proto/datadog/stateful/stateful_encoding.proto b/pkg/proto/datadog/stateful/stateful_encoding.proto index 76a2598db8eb..af0bae6c7069 100644 --- a/pkg/proto/datadog/stateful/stateful_encoding.proto +++ b/pkg/proto/datadog/stateful/stateful_encoding.proto @@ -47,6 +47,37 @@ message Tag { DynamicValue value = 2; } +message FlatLog { + // Delta encoded timestamp: add this to previous timestamp (or DeltaEncodingSync.timestamp) to get the absolute timestamp. + sint64 timestamp = 1; + // Delta encoded status: when 0 use last status. + // Use 1 to indicate no status. + uint64 status = 2; + // Delta encoded service: when 0 use last service. Otherwise use service in this dict entry. + // Use 1 to indicate no status. + uint64 service = 3; + // Delta encoded tags: when 0 use last tags. Otherwise use tags in this dict entry. + uint64 tags = 4; + + + // Delta encoded pattern_id: when 0 use last pattern_id. Otherwise use pattern_id in this dict entry. + uint64 pattern_id = 5; + // Values for the pattern. + repeated DynamicValue dynamic_values = 6; + // Optional - used instead of pattern_id and dynamic_values for e.g. larger logs. + string raw_log = 7; + + + // Delta encoded json_schema_id: when 0 use last json_schema_id. Otherwise use json_schema_id in this dict entry. + // Use 1 to indicate no json_schema_id. + uint64 json_schema_id = 8; + // Values for the json schema. 
+ repeated DynamicValue json_context_values = 9; + + // Optional UUID used to track logs when dual-sent via HTTP and gRPC. + optional string uuid = 100; +} + message Log { sint64 timestamp = 1; oneof content { @@ -70,6 +101,17 @@ message Log { DynamicValue service = 7; } + +message JsonSchemaDefine { + uint64 schema_id = 1; + repeated uint64 keys = 2; + uint64 message_key_id = 3; +} + +message JsonSchemaDelete { + uint64 schema_id = 1; +} + message StructuredLog { uint64 pattern_id = 1; repeated DynamicValue dynamic_values = 2; @@ -115,15 +157,16 @@ message DynamicValue { bool render_as_string = 5; } -// We could choose to delta encode at batch level or at stream level. -// If at stream level, then we need to send the delta encoding related state -// to resync the Intake on stream restart -// Currently we are doing it at batch level. This message below is not used, -// we declare it here for future use. +// Restores delta encoding state before replayed datums, for example when the +// agent prepends lazy snapshot definitions ahead of already-encoded payload data. 
 message DeltaEncodingSync { uint64 timestamp = 1; uint64 pattern_id = 2; TagSet tags = 3; + uint64 status = 4; + uint64 service = 5; + uint64 flat_log_tags = 6; + uint64 json_schema_id = 7; } // --------------------------------------------------------------------------- @@ -138,6 +181,9 @@ message Datum { DictEntryDelete dict_entry_delete = 4; DeltaEncodingSync delta_encoding_sync = 5; Log logs = 6; + FlatLog flat_log = 7; + JsonSchemaDefine json_schema_define = 8; + JsonSchemaDelete json_schema_delete = 9; } } diff --git a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go index 101743e74717..a56e113a16d8 100644 --- a/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go +++ b/pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go @@ -65,7 +65,7 @@ func (x BatchStatus_Status) Number() protoreflect.EnumNumber { // Deprecated: Use BatchStatus_Status.Descriptor instead. func (BatchStatus_Status) EnumDescriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{13, 0} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{16, 0} } type DictEntryDefine struct { @@ -375,6 +375,135 @@ func (x *Tag) GetValue() *DynamicValue { return nil } +type FlatLog struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Delta encoded timestamp: add this to previous timestamp (or DeltaEncodingSync.timestamp) to get the absolute timestamp. + Timestamp int64 `protobuf:"zigzag64,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Delta encoded status: when 0 use last status. + // Use 1 to indicate no status. + Status uint64 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"` + // Delta encoded service: when 0 use last service. Otherwise use service in this dict entry. + // Use 1 to indicate no service. + Service uint64 `protobuf:"varint,3,opt,name=service,proto3" json:"service,omitempty"` + // Delta encoded tags: when 0 use last tags. 
Otherwise use tags in this dict entry. + Tags uint64 `protobuf:"varint,4,opt,name=tags,proto3" json:"tags,omitempty"` + // Delta encoded pattern_id: when 0 use last pattern_id. Otherwise use pattern_id in this dict entry. + PatternId uint64 `protobuf:"varint,5,opt,name=pattern_id,json=patternId,proto3" json:"pattern_id,omitempty"` + // Values for the pattern. + DynamicValues []*DynamicValue `protobuf:"bytes,6,rep,name=dynamic_values,json=dynamicValues,proto3" json:"dynamic_values,omitempty"` + // Optional - used instead of pattern_id and dynamic_values for e.g. larger logs. + RawLog string `protobuf:"bytes,7,opt,name=raw_log,json=rawLog,proto3" json:"raw_log,omitempty"` + // Delta encoded json_schema_id: when 0 use last json_schema_id. Otherwise use json_schema_id in this dict entry. + // Use 1 to indicate no json_schema_id. + JsonSchemaId uint64 `protobuf:"varint,8,opt,name=json_schema_id,json=jsonSchemaId,proto3" json:"json_schema_id,omitempty"` + // Values for the json schema. + JsonContextValues []*DynamicValue `protobuf:"bytes,9,rep,name=json_context_values,json=jsonContextValues,proto3" json:"json_context_values,omitempty"` + // Optional UUID used to track logs when dual-sent via HTTP and gRPC. 
+ Uuid *string `protobuf:"bytes,100,opt,name=uuid,proto3,oneof" json:"uuid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FlatLog) Reset() { + *x = FlatLog{} + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FlatLog) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlatLog) ProtoMessage() {} + +func (x *FlatLog) ProtoReflect() protoreflect.Message { + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlatLog.ProtoReflect.Descriptor instead. +func (*FlatLog) Descriptor() ([]byte, []int) { + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{6} +} + +func (x *FlatLog) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *FlatLog) GetStatus() uint64 { + if x != nil { + return x.Status + } + return 0 +} + +func (x *FlatLog) GetService() uint64 { + if x != nil { + return x.Service + } + return 0 +} + +func (x *FlatLog) GetTags() uint64 { + if x != nil { + return x.Tags + } + return 0 +} + +func (x *FlatLog) GetPatternId() uint64 { + if x != nil { + return x.PatternId + } + return 0 +} + +func (x *FlatLog) GetDynamicValues() []*DynamicValue { + if x != nil { + return x.DynamicValues + } + return nil +} + +func (x *FlatLog) GetRawLog() string { + if x != nil { + return x.RawLog + } + return "" +} + +func (x *FlatLog) GetJsonSchemaId() uint64 { + if x != nil { + return x.JsonSchemaId + } + return 0 +} + +func (x *FlatLog) GetJsonContextValues() []*DynamicValue { + if x != nil { + return x.JsonContextValues + } + return nil +} + +func (x *FlatLog) GetUuid() string { + if x != nil && x.Uuid != nil { + return *x.Uuid 
+ } + return "" +} + type Log struct { state protoimpl.MessageState `protogen:"open.v1"` Timestamp int64 `protobuf:"zigzag64,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` @@ -401,7 +530,7 @@ type Log struct { func (x *Log) Reset() { *x = Log{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[6] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -413,7 +542,7 @@ func (x *Log) String() string { func (*Log) ProtoMessage() {} func (x *Log) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[6] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -426,7 +555,7 @@ func (x *Log) ProtoReflect() protoreflect.Message { // Deprecated: Use Log.ProtoReflect.Descriptor instead. func (*Log) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{6} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{7} } func (x *Log) GetTimestamp() int64 { @@ -505,6 +634,110 @@ func (*Log_Structured) isLog_Content() {} func (*Log_Raw) isLog_Content() {} +type JsonSchemaDefine struct { + state protoimpl.MessageState `protogen:"open.v1"` + SchemaId uint64 `protobuf:"varint,1,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"` + Keys []uint64 `protobuf:"varint,2,rep,packed,name=keys,proto3" json:"keys,omitempty"` + MessageKeyId uint64 `protobuf:"varint,3,opt,name=message_key_id,json=messageKeyId,proto3" json:"message_key_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JsonSchemaDefine) Reset() { + *x = JsonSchemaDefine{} + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + 
ms.StoreMessageInfo(mi) +} + +func (x *JsonSchemaDefine) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JsonSchemaDefine) ProtoMessage() {} + +func (x *JsonSchemaDefine) ProtoReflect() protoreflect.Message { + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JsonSchemaDefine.ProtoReflect.Descriptor instead. +func (*JsonSchemaDefine) Descriptor() ([]byte, []int) { + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{8} +} + +func (x *JsonSchemaDefine) GetSchemaId() uint64 { + if x != nil { + return x.SchemaId + } + return 0 +} + +func (x *JsonSchemaDefine) GetKeys() []uint64 { + if x != nil { + return x.Keys + } + return nil +} + +func (x *JsonSchemaDefine) GetMessageKeyId() uint64 { + if x != nil { + return x.MessageKeyId + } + return 0 +} + +type JsonSchemaDelete struct { + state protoimpl.MessageState `protogen:"open.v1"` + SchemaId uint64 `protobuf:"varint,1,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JsonSchemaDelete) Reset() { + *x = JsonSchemaDelete{} + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *JsonSchemaDelete) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JsonSchemaDelete) ProtoMessage() {} + +func (x *JsonSchemaDelete) ProtoReflect() protoreflect.Message { + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use 
JsonSchemaDelete.ProtoReflect.Descriptor instead. +func (*JsonSchemaDelete) Descriptor() ([]byte, []int) { + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{9} +} + +func (x *JsonSchemaDelete) GetSchemaId() uint64 { + if x != nil { + return x.SchemaId + } + return 0 +} + type StructuredLog struct { state protoimpl.MessageState `protogen:"open.v1"` PatternId uint64 `protobuf:"varint,1,opt,name=pattern_id,json=patternId,proto3" json:"pattern_id,omitempty"` @@ -527,7 +760,7 @@ type StructuredLog struct { func (x *StructuredLog) Reset() { *x = StructuredLog{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[7] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -539,7 +772,7 @@ func (x *StructuredLog) String() string { func (*StructuredLog) ProtoMessage() {} func (x *StructuredLog) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[7] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -552,7 +785,7 @@ func (x *StructuredLog) ProtoReflect() protoreflect.Message { // Deprecated: Use StructuredLog.ProtoReflect.Descriptor instead. 
func (*StructuredLog) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{7} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{10} } func (x *StructuredLog) GetPatternId() uint64 { @@ -618,7 +851,7 @@ type DynamicValue struct { func (x *DynamicValue) Reset() { *x = DynamicValue{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[8] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -630,7 +863,7 @@ func (x *DynamicValue) String() string { func (*DynamicValue) ProtoMessage() {} func (x *DynamicValue) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[8] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -643,7 +876,7 @@ func (x *DynamicValue) ProtoReflect() protoreflect.Message { // Deprecated: Use DynamicValue.ProtoReflect.Descriptor instead. func (*DynamicValue) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{8} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{11} } func (x *DynamicValue) GetValue() isDynamicValue_Value { @@ -754,23 +987,24 @@ func (*DynamicValue_DictIndex) isDynamicValue_Value() {} func (*DynamicValue_RawJsonValue) isDynamicValue_Value() {} -// We could choose to delta encode at batch level or at stream level. -// If at stream level, then we need to send the delta encoding related state -// to resync the Intake on stream restart -// Currently we are doing it at batch level. This message below is not used, -// we declare it here for future use. 
+// Restores delta encoding state before replayed datums, for example when the +// agent prepends lazy snapshot definitions ahead of already-encoded payload data. type DeltaEncodingSync struct { state protoimpl.MessageState `protogen:"open.v1"` Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` PatternId uint64 `protobuf:"varint,2,opt,name=pattern_id,json=patternId,proto3" json:"pattern_id,omitempty"` Tags *TagSet `protobuf:"bytes,3,opt,name=tags,proto3" json:"tags,omitempty"` + Status uint64 `protobuf:"varint,4,opt,name=status,proto3" json:"status,omitempty"` + Service uint64 `protobuf:"varint,5,opt,name=service,proto3" json:"service,omitempty"` + FlatLogTags uint64 `protobuf:"varint,6,opt,name=flat_log_tags,json=flatLogTags,proto3" json:"flat_log_tags,omitempty"` + JsonSchemaId uint64 `protobuf:"varint,7,opt,name=json_schema_id,json=jsonSchemaId,proto3" json:"json_schema_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *DeltaEncodingSync) Reset() { *x = DeltaEncodingSync{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[9] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -782,7 +1016,7 @@ func (x *DeltaEncodingSync) String() string { func (*DeltaEncodingSync) ProtoMessage() {} func (x *DeltaEncodingSync) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[9] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -795,7 +1029,7 @@ func (x *DeltaEncodingSync) ProtoReflect() protoreflect.Message { // Deprecated: Use DeltaEncodingSync.ProtoReflect.Descriptor instead. 
func (*DeltaEncodingSync) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{9} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{12} } func (x *DeltaEncodingSync) GetTimestamp() uint64 { @@ -819,6 +1053,34 @@ func (x *DeltaEncodingSync) GetTags() *TagSet { return nil } +func (x *DeltaEncodingSync) GetStatus() uint64 { + if x != nil { + return x.Status + } + return 0 +} + +func (x *DeltaEncodingSync) GetService() uint64 { + if x != nil { + return x.Service + } + return 0 +} + +func (x *DeltaEncodingSync) GetFlatLogTags() uint64 { + if x != nil { + return x.FlatLogTags + } + return 0 +} + +func (x *DeltaEncodingSync) GetJsonSchemaId() uint64 { + if x != nil { + return x.JsonSchemaId + } + return 0 +} + type Datum struct { state protoimpl.MessageState `protogen:"open.v1"` // Types that are valid to be assigned to Data: @@ -829,6 +1091,9 @@ type Datum struct { // *Datum_DictEntryDelete // *Datum_DeltaEncodingSync // *Datum_Logs + // *Datum_FlatLog + // *Datum_JsonSchemaDefine + // *Datum_JsonSchemaDelete Data isDatum_Data `protobuf_oneof:"data"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -836,7 +1101,7 @@ type Datum struct { func (x *Datum) Reset() { *x = Datum{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[10] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -848,7 +1113,7 @@ func (x *Datum) String() string { func (*Datum) ProtoMessage() {} func (x *Datum) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[10] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -861,7 +1126,7 @@ func (x *Datum) ProtoReflect() protoreflect.Message { // Deprecated: Use 
Datum.ProtoReflect.Descriptor instead. func (*Datum) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{10} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{13} } func (x *Datum) GetData() isDatum_Data { @@ -925,6 +1190,33 @@ func (x *Datum) GetLogs() *Log { return nil } +func (x *Datum) GetFlatLog() *FlatLog { + if x != nil { + if x, ok := x.Data.(*Datum_FlatLog); ok { + return x.FlatLog + } + } + return nil +} + +func (x *Datum) GetJsonSchemaDefine() *JsonSchemaDefine { + if x != nil { + if x, ok := x.Data.(*Datum_JsonSchemaDefine); ok { + return x.JsonSchemaDefine + } + } + return nil +} + +func (x *Datum) GetJsonSchemaDelete() *JsonSchemaDelete { + if x != nil { + if x, ok := x.Data.(*Datum_JsonSchemaDelete); ok { + return x.JsonSchemaDelete + } + } + return nil +} + type isDatum_Data interface { isDatum_Data() } @@ -953,6 +1245,18 @@ type Datum_Logs struct { Logs *Log `protobuf:"bytes,6,opt,name=logs,proto3,oneof"` } +type Datum_FlatLog struct { + FlatLog *FlatLog `protobuf:"bytes,7,opt,name=flat_log,json=flatLog,proto3,oneof"` +} + +type Datum_JsonSchemaDefine struct { + JsonSchemaDefine *JsonSchemaDefine `protobuf:"bytes,8,opt,name=json_schema_define,json=jsonSchemaDefine,proto3,oneof"` +} + +type Datum_JsonSchemaDelete struct { + JsonSchemaDelete *JsonSchemaDelete `protobuf:"bytes,9,opt,name=json_schema_delete,json=jsonSchemaDelete,proto3,oneof"` +} + func (*Datum_PatternDefine) isDatum_Data() {} func (*Datum_PatternDelete) isDatum_Data() {} @@ -965,6 +1269,12 @@ func (*Datum_DeltaEncodingSync) isDatum_Data() {} func (*Datum_Logs) isDatum_Data() {} +func (*Datum_FlatLog) isDatum_Data() {} + +func (*Datum_JsonSchemaDefine) isDatum_Data() {} + +func (*Datum_JsonSchemaDelete) isDatum_Data() {} + // DatumSequence wraps a sequence of Datum messages // Used for serialization in application-level compression type DatumSequence struct { @@ -976,7 +1286,7 @@ type DatumSequence struct 
{ func (x *DatumSequence) Reset() { *x = DatumSequence{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[11] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -988,7 +1298,7 @@ func (x *DatumSequence) String() string { func (*DatumSequence) ProtoMessage() {} func (x *DatumSequence) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[11] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1001,7 +1311,7 @@ func (x *DatumSequence) ProtoReflect() protoreflect.Message { // Deprecated: Use DatumSequence.ProtoReflect.Descriptor instead. func (*DatumSequence) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{11} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{14} } func (x *DatumSequence) GetData() []*Datum { @@ -1025,7 +1335,7 @@ type StatefulBatch struct { func (x *StatefulBatch) Reset() { *x = StatefulBatch{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[12] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1037,7 +1347,7 @@ func (x *StatefulBatch) String() string { func (*StatefulBatch) ProtoMessage() {} func (x *StatefulBatch) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[12] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1050,7 +1360,7 @@ func (x *StatefulBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use StatefulBatch.ProtoReflect.Descriptor instead. 
func (*StatefulBatch) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{12} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{15} } func (x *StatefulBatch) GetBatchId() uint32 { @@ -1077,7 +1387,7 @@ type BatchStatus struct { func (x *BatchStatus) Reset() { *x = BatchStatus{} - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[13] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1089,7 +1399,7 @@ func (x *BatchStatus) String() string { func (*BatchStatus) ProtoMessage() {} func (x *BatchStatus) ProtoReflect() protoreflect.Message { - mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[13] + mi := &file_datadog_stateful_stateful_encoding_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1102,7 +1412,7 @@ func (x *BatchStatus) ProtoReflect() protoreflect.Message { // Deprecated: Use BatchStatus.ProtoReflect.Descriptor instead. 
func (*BatchStatus) Descriptor() ([]byte, []int) { - return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{13} + return file_datadog_stateful_stateful_encoding_proto_rawDescGZIP(), []int{16} } func (x *BatchStatus) GetBatchId() uint32 { @@ -1143,7 +1453,20 @@ const file_datadog_stateful_stateful_encoding_proto_rawDesc = "" + "\x06tagset\x18\x01 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x06tagset\"{\n" + "\x03Tag\x127\n" + "\x03key\x18\x01 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x03key\x12;\n" + - "\x05value\x18\x02 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x05value\"\xe3\x02\n" + + "\x05value\x18\x02 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x05value\"\x92\x03\n" + + "\aFlatLog\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x12R\ttimestamp\x12\x16\n" + + "\x06status\x18\x02 \x01(\x04R\x06status\x12\x18\n" + + "\aservice\x18\x03 \x01(\x04R\aservice\x12\x12\n" + + "\x04tags\x18\x04 \x01(\x04R\x04tags\x12\x1d\n" + + "\n" + + "pattern_id\x18\x05 \x01(\x04R\tpatternId\x12L\n" + + "\x0edynamic_values\x18\x06 \x03(\v2%.datadog.intake.stateful.DynamicValueR\rdynamicValues\x12\x17\n" + + "\araw_log\x18\a \x01(\tR\x06rawLog\x12$\n" + + "\x0ejson_schema_id\x18\b \x01(\x04R\fjsonSchemaId\x12U\n" + + "\x13json_context_values\x18\t \x03(\v2%.datadog.intake.stateful.DynamicValueR\x11jsonContextValues\x12\x17\n" + + "\x04uuid\x18d \x01(\tH\x00R\x04uuid\x88\x01\x01B\a\n" + + "\x05_uuid\"\xe3\x02\n" + "\x03Log\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x12R\ttimestamp\x12H\n" + "\n" + @@ -1155,7 +1478,13 @@ const file_datadog_stateful_stateful_encoding_proto_rawDesc = "" + "\x06status\x18\x06 \x01(\v2%.datadog.intake.stateful.DynamicValueR\x06status\x12?\n" + "\aservice\x18\a \x01(\v2%.datadog.intake.stateful.DynamicValueR\aserviceB\t\n" + "\acontentB\a\n" + - "\x05_uuid\"\xfc\x02\n" + + "\x05_uuid\"i\n" + + "\x10JsonSchemaDefine\x12\x1b\n" + + "\tschema_id\x18\x01 \x01(\x04R\bschemaId\x12\x12\n" + + "\x04keys\x18\x02 \x03(\x04R\x04keys\x12$\n" 
+ + "\x0emessage_key_id\x18\x03 \x01(\x04R\fmessageKeyId\"/\n" + + "\x10JsonSchemaDelete\x12\x1b\n" + + "\tschema_id\x18\x01 \x01(\x04R\bschemaId\"\xfc\x02\n" + "\rStructuredLog\x12\x1d\n" + "\n" + "pattern_id\x18\x01 \x01(\x04R\tpatternId\x12L\n" + @@ -1175,19 +1504,26 @@ const file_datadog_stateful_stateful_encoding_proto_rawDesc = "" + "dict_index\x18\x04 \x01(\x04H\x00R\tdictIndex\x12&\n" + "\x0eraw_json_value\x18\a \x01(\fH\x00R\frawJsonValue\x12(\n" + "\x10render_as_string\x18\x05 \x01(\bR\x0erenderAsStringB\a\n" + - "\x05value\"\x85\x01\n" + + "\x05value\"\x81\x02\n" + "\x11DeltaEncodingSync\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x04R\ttimestamp\x12\x1d\n" + "\n" + "pattern_id\x18\x02 \x01(\x04R\tpatternId\x123\n" + - "\x04tags\x18\x03 \x01(\v2\x1f.datadog.intake.stateful.TagSetR\x04tags\"\xf3\x03\n" + + "\x04tags\x18\x03 \x01(\v2\x1f.datadog.intake.stateful.TagSetR\x04tags\x12\x16\n" + + "\x06status\x18\x04 \x01(\x04R\x06status\x12\x18\n" + + "\aservice\x18\x05 \x01(\x04R\aservice\x12\"\n" + + "\rflat_log_tags\x18\x06 \x01(\x04R\vflatLogTags\x12$\n" + + "\x0ejson_schema_id\x18\a \x01(\x04R\fjsonSchemaId\"\xe8\x05\n" + "\x05Datum\x12O\n" + "\x0epattern_define\x18\x01 \x01(\v2&.datadog.intake.stateful.PatternDefineH\x00R\rpatternDefine\x12O\n" + "\x0epattern_delete\x18\x02 \x01(\v2&.datadog.intake.stateful.PatternDeleteH\x00R\rpatternDelete\x12V\n" + "\x11dict_entry_define\x18\x03 \x01(\v2(.datadog.intake.stateful.DictEntryDefineH\x00R\x0fdictEntryDefine\x12V\n" + "\x11dict_entry_delete\x18\x04 \x01(\v2(.datadog.intake.stateful.DictEntryDeleteH\x00R\x0fdictEntryDelete\x12\\\n" + "\x13delta_encoding_sync\x18\x05 \x01(\v2*.datadog.intake.stateful.DeltaEncodingSyncH\x00R\x11deltaEncodingSync\x122\n" + - "\x04logs\x18\x06 \x01(\v2\x1c.datadog.intake.stateful.LogH\x00R\x04logsB\x06\n" + + "\x04logs\x18\x06 \x01(\v2\x1c.datadog.intake.stateful.LogH\x00R\x04logs\x12=\n" + + "\bflat_log\x18\a \x01(\v2 .datadog.intake.stateful.FlatLogH\x00R\aflatLog\x12Y\n" + + 
"\x12json_schema_define\x18\b \x01(\v2).datadog.intake.stateful.JsonSchemaDefineH\x00R\x10jsonSchemaDefine\x12Y\n" + + "\x12json_schema_delete\x18\t \x01(\v2).datadog.intake.stateful.JsonSchemaDeleteH\x00R\x10jsonSchemaDeleteB\x06\n" + "\x04data\"C\n" + "\rDatumSequence\x122\n" + "\x04data\x18\x01 \x03(\v2\x1e.datadog.intake.stateful.DatumR\x04data\">\n" + @@ -1217,7 +1553,7 @@ func file_datadog_stateful_stateful_encoding_proto_rawDescGZIP() []byte { } var file_datadog_stateful_stateful_encoding_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_datadog_stateful_stateful_encoding_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_datadog_stateful_stateful_encoding_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_datadog_stateful_stateful_encoding_proto_goTypes = []any{ (BatchStatus_Status)(0), // 0: datadog.intake.stateful.BatchStatus.Status (*DictEntryDefine)(nil), // 1: datadog.intake.stateful.DictEntryDefine @@ -1226,42 +1562,50 @@ var file_datadog_stateful_stateful_encoding_proto_goTypes = []any{ (*PatternDelete)(nil), // 4: datadog.intake.stateful.PatternDelete (*TagSet)(nil), // 5: datadog.intake.stateful.TagSet (*Tag)(nil), // 6: datadog.intake.stateful.Tag - (*Log)(nil), // 7: datadog.intake.stateful.Log - (*StructuredLog)(nil), // 8: datadog.intake.stateful.StructuredLog - (*DynamicValue)(nil), // 9: datadog.intake.stateful.DynamicValue - (*DeltaEncodingSync)(nil), // 10: datadog.intake.stateful.DeltaEncodingSync - (*Datum)(nil), // 11: datadog.intake.stateful.Datum - (*DatumSequence)(nil), // 12: datadog.intake.stateful.DatumSequence - (*StatefulBatch)(nil), // 13: datadog.intake.stateful.StatefulBatch - (*BatchStatus)(nil), // 14: datadog.intake.stateful.BatchStatus + (*FlatLog)(nil), // 7: datadog.intake.stateful.FlatLog + (*Log)(nil), // 8: datadog.intake.stateful.Log + (*JsonSchemaDefine)(nil), // 9: datadog.intake.stateful.JsonSchemaDefine + (*JsonSchemaDelete)(nil), // 10: datadog.intake.stateful.JsonSchemaDelete + 
(*StructuredLog)(nil), // 11: datadog.intake.stateful.StructuredLog + (*DynamicValue)(nil), // 12: datadog.intake.stateful.DynamicValue + (*DeltaEncodingSync)(nil), // 13: datadog.intake.stateful.DeltaEncodingSync + (*Datum)(nil), // 14: datadog.intake.stateful.Datum + (*DatumSequence)(nil), // 15: datadog.intake.stateful.DatumSequence + (*StatefulBatch)(nil), // 16: datadog.intake.stateful.StatefulBatch + (*BatchStatus)(nil), // 17: datadog.intake.stateful.BatchStatus } var file_datadog_stateful_stateful_encoding_proto_depIdxs = []int32{ - 9, // 0: datadog.intake.stateful.TagSet.tagset:type_name -> datadog.intake.stateful.DynamicValue - 9, // 1: datadog.intake.stateful.Tag.key:type_name -> datadog.intake.stateful.DynamicValue - 9, // 2: datadog.intake.stateful.Tag.value:type_name -> datadog.intake.stateful.DynamicValue - 8, // 3: datadog.intake.stateful.Log.structured:type_name -> datadog.intake.stateful.StructuredLog - 5, // 4: datadog.intake.stateful.Log.tags:type_name -> datadog.intake.stateful.TagSet - 9, // 5: datadog.intake.stateful.Log.status:type_name -> datadog.intake.stateful.DynamicValue - 9, // 6: datadog.intake.stateful.Log.service:type_name -> datadog.intake.stateful.DynamicValue - 9, // 7: datadog.intake.stateful.StructuredLog.dynamic_values:type_name -> datadog.intake.stateful.DynamicValue - 9, // 8: datadog.intake.stateful.StructuredLog.json_message_key:type_name -> datadog.intake.stateful.DynamicValue - 9, // 9: datadog.intake.stateful.StructuredLog.json_context_values:type_name -> datadog.intake.stateful.DynamicValue - 5, // 10: datadog.intake.stateful.DeltaEncodingSync.tags:type_name -> datadog.intake.stateful.TagSet - 3, // 11: datadog.intake.stateful.Datum.pattern_define:type_name -> datadog.intake.stateful.PatternDefine - 4, // 12: datadog.intake.stateful.Datum.pattern_delete:type_name -> datadog.intake.stateful.PatternDelete - 1, // 13: datadog.intake.stateful.Datum.dict_entry_define:type_name -> datadog.intake.stateful.DictEntryDefine - 2, 
// 14: datadog.intake.stateful.Datum.dict_entry_delete:type_name -> datadog.intake.stateful.DictEntryDelete - 10, // 15: datadog.intake.stateful.Datum.delta_encoding_sync:type_name -> datadog.intake.stateful.DeltaEncodingSync - 7, // 16: datadog.intake.stateful.Datum.logs:type_name -> datadog.intake.stateful.Log - 11, // 17: datadog.intake.stateful.DatumSequence.data:type_name -> datadog.intake.stateful.Datum - 0, // 18: datadog.intake.stateful.BatchStatus.status:type_name -> datadog.intake.stateful.BatchStatus.Status - 13, // 19: datadog.intake.stateful.StatefulLogsService.LogsStream:input_type -> datadog.intake.stateful.StatefulBatch - 14, // 20: datadog.intake.stateful.StatefulLogsService.LogsStream:output_type -> datadog.intake.stateful.BatchStatus - 20, // [20:21] is the sub-list for method output_type - 19, // [19:20] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 12, // 0: datadog.intake.stateful.TagSet.tagset:type_name -> datadog.intake.stateful.DynamicValue + 12, // 1: datadog.intake.stateful.Tag.key:type_name -> datadog.intake.stateful.DynamicValue + 12, // 2: datadog.intake.stateful.Tag.value:type_name -> datadog.intake.stateful.DynamicValue + 12, // 3: datadog.intake.stateful.FlatLog.dynamic_values:type_name -> datadog.intake.stateful.DynamicValue + 12, // 4: datadog.intake.stateful.FlatLog.json_context_values:type_name -> datadog.intake.stateful.DynamicValue + 11, // 5: datadog.intake.stateful.Log.structured:type_name -> datadog.intake.stateful.StructuredLog + 5, // 6: datadog.intake.stateful.Log.tags:type_name -> datadog.intake.stateful.TagSet + 12, // 7: datadog.intake.stateful.Log.status:type_name -> datadog.intake.stateful.DynamicValue + 12, // 8: datadog.intake.stateful.Log.service:type_name -> datadog.intake.stateful.DynamicValue + 12, // 9: 
datadog.intake.stateful.StructuredLog.dynamic_values:type_name -> datadog.intake.stateful.DynamicValue + 12, // 10: datadog.intake.stateful.StructuredLog.json_message_key:type_name -> datadog.intake.stateful.DynamicValue + 12, // 11: datadog.intake.stateful.StructuredLog.json_context_values:type_name -> datadog.intake.stateful.DynamicValue + 5, // 12: datadog.intake.stateful.DeltaEncodingSync.tags:type_name -> datadog.intake.stateful.TagSet + 3, // 13: datadog.intake.stateful.Datum.pattern_define:type_name -> datadog.intake.stateful.PatternDefine + 4, // 14: datadog.intake.stateful.Datum.pattern_delete:type_name -> datadog.intake.stateful.PatternDelete + 1, // 15: datadog.intake.stateful.Datum.dict_entry_define:type_name -> datadog.intake.stateful.DictEntryDefine + 2, // 16: datadog.intake.stateful.Datum.dict_entry_delete:type_name -> datadog.intake.stateful.DictEntryDelete + 13, // 17: datadog.intake.stateful.Datum.delta_encoding_sync:type_name -> datadog.intake.stateful.DeltaEncodingSync + 8, // 18: datadog.intake.stateful.Datum.logs:type_name -> datadog.intake.stateful.Log + 7, // 19: datadog.intake.stateful.Datum.flat_log:type_name -> datadog.intake.stateful.FlatLog + 9, // 20: datadog.intake.stateful.Datum.json_schema_define:type_name -> datadog.intake.stateful.JsonSchemaDefine + 10, // 21: datadog.intake.stateful.Datum.json_schema_delete:type_name -> datadog.intake.stateful.JsonSchemaDelete + 14, // 22: datadog.intake.stateful.DatumSequence.data:type_name -> datadog.intake.stateful.Datum + 0, // 23: datadog.intake.stateful.BatchStatus.status:type_name -> datadog.intake.stateful.BatchStatus.Status + 16, // 24: datadog.intake.stateful.StatefulLogsService.LogsStream:input_type -> datadog.intake.stateful.StatefulBatch + 17, // 25: datadog.intake.stateful.StatefulLogsService.LogsStream:output_type -> datadog.intake.stateful.BatchStatus + 25, // [25:26] is the sub-list for method output_type + 24, // [24:25] is the sub-list for method input_type + 24, // [24:24] is 
the sub-list for extension type_name + 24, // [24:24] is the sub-list for extension extendee + 0, // [0:24] is the sub-list for field type_name } func init() { file_datadog_stateful_stateful_encoding_proto_init() } @@ -1269,11 +1613,12 @@ func file_datadog_stateful_stateful_encoding_proto_init() { if File_datadog_stateful_stateful_encoding_proto != nil { return } - file_datadog_stateful_stateful_encoding_proto_msgTypes[6].OneofWrappers = []any{ + file_datadog_stateful_stateful_encoding_proto_msgTypes[6].OneofWrappers = []any{} + file_datadog_stateful_stateful_encoding_proto_msgTypes[7].OneofWrappers = []any{ (*Log_Structured)(nil), (*Log_Raw)(nil), } - file_datadog_stateful_stateful_encoding_proto_msgTypes[8].OneofWrappers = []any{ + file_datadog_stateful_stateful_encoding_proto_msgTypes[11].OneofWrappers = []any{ (*DynamicValue_IntValue)(nil), (*DynamicValue_FloatValue)(nil), (*DynamicValue_BoolValue)(nil), @@ -1281,13 +1626,16 @@ func file_datadog_stateful_stateful_encoding_proto_init() { (*DynamicValue_DictIndex)(nil), (*DynamicValue_RawJsonValue)(nil), } - file_datadog_stateful_stateful_encoding_proto_msgTypes[10].OneofWrappers = []any{ + file_datadog_stateful_stateful_encoding_proto_msgTypes[13].OneofWrappers = []any{ (*Datum_PatternDefine)(nil), (*Datum_PatternDelete)(nil), (*Datum_DictEntryDefine)(nil), (*Datum_DictEntryDelete)(nil), (*Datum_DeltaEncodingSync)(nil), (*Datum_Logs)(nil), + (*Datum_FlatLog)(nil), + (*Datum_JsonSchemaDefine)(nil), + (*Datum_JsonSchemaDelete)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1295,7 +1643,7 @@ func file_datadog_stateful_stateful_encoding_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_datadog_stateful_stateful_encoding_proto_rawDesc), len(file_datadog_stateful_stateful_encoding_proto_rawDesc)), NumEnums: 1, - NumMessages: 14, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git 
a/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go b/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go index 2df8c107342b..a3bbb1cd1df6 100644 --- a/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go +++ b/pkg/proto/pbgo/statefulpb/stateful_encoding_vtproto.pb.go @@ -307,6 +307,109 @@ func (m *Tag) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *FlatLog) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FlatLog) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *FlatLog) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Uuid != nil { + i -= len(*m.Uuid) + copy(dAtA[i:], *m.Uuid) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(*m.Uuid))) + i-- + dAtA[i] = 0x6 + i-- + dAtA[i] = 0xa2 + } + if len(m.JsonContextValues) > 0 { + for iNdEx := len(m.JsonContextValues) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.JsonContextValues[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x4a + } + } + if m.JsonSchemaId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.JsonSchemaId)) + i-- + dAtA[i] = 0x40 + } + if len(m.RawLog) > 0 { + i -= len(m.RawLog) + copy(dAtA[i:], m.RawLog) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.RawLog))) + i-- + dAtA[i] = 0x3a + } + if len(m.DynamicValues) > 0 { + for iNdEx := len(m.DynamicValues) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.DynamicValues[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != 
nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x32 + } + } + if m.PatternId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.PatternId)) + i-- + dAtA[i] = 0x28 + } + if m.Tags != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Tags)) + i-- + dAtA[i] = 0x20 + } + if m.Service != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Service)) + i-- + dAtA[i] = 0x18 + } + if m.Status != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Status)) + i-- + dAtA[i] = 0x10 + } + if m.Timestamp != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64((uint64(m.Timestamp)<<1)^uint64((m.Timestamp>>63)))) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *Log) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -428,6 +531,107 @@ func (m *Log_Raw) MarshalToSizedBufferVT(dAtA []byte) (int, error) { dAtA[i] = 0x1a return len(dAtA) - i, nil } +func (m *JsonSchemaDefine) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JsonSchemaDefine) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *JsonSchemaDefine) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.MessageKeyId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.MessageKeyId)) + i-- + dAtA[i] = 0x18 + } + if len(m.Keys) > 0 { + var pksize2 int + for _, num := range m.Keys { + pksize2 += protohelpers.SizeOfVarint(uint64(num)) + } + i -= pksize2 + j1 := i + for _, num := range m.Keys { + for num >= 1<<7 { + dAtA[j1] = uint8(uint64(num)&0x7f | 0x80) + num 
>>= 7 + j1++ + } + dAtA[j1] = uint8(num) + j1++ + } + i = protohelpers.EncodeVarint(dAtA, i, uint64(pksize2)) + i-- + dAtA[i] = 0x12 + } + if m.SchemaId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.SchemaId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *JsonSchemaDelete) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JsonSchemaDelete) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *JsonSchemaDelete) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.SchemaId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.SchemaId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *StructuredLog) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -676,6 +880,26 @@ func (m *DeltaEncodingSync) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.JsonSchemaId != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.JsonSchemaId)) + i-- + dAtA[i] = 0x38 + } + if m.FlatLogTags != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.FlatLogTags)) + i-- + dAtA[i] = 0x30 + } + if m.Service != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Service)) + i-- + dAtA[i] = 0x28 + } + if m.Status != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Status)) + i-- + dAtA[i] = 0x20 + } if m.Tags != nil { size, err := m.Tags.MarshalToSizedBufferVT(dAtA[:i]) if err != nil { @@ -879,6 +1103,75 @@ func (m *Datum_Logs) MarshalToSizedBufferVT(dAtA []byte) (int, error) { } return len(dAtA) - 
i, nil } +func (m *Datum_FlatLog) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *Datum_FlatLog) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + i := len(dAtA) + if m.FlatLog != nil { + size, err := m.FlatLog.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x3a + } else { + i = protohelpers.EncodeVarint(dAtA, i, 0) + i-- + dAtA[i] = 0x3a + } + return len(dAtA) - i, nil +} +func (m *Datum_JsonSchemaDefine) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *Datum_JsonSchemaDefine) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + i := len(dAtA) + if m.JsonSchemaDefine != nil { + size, err := m.JsonSchemaDefine.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x42 + } else { + i = protohelpers.EncodeVarint(dAtA, i, 0) + i-- + dAtA[i] = 0x42 + } + return len(dAtA) - i, nil +} +func (m *Datum_JsonSchemaDelete) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *Datum_JsonSchemaDelete) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + i := len(dAtA) + if m.JsonSchemaDelete != nil { + size, err := m.JsonSchemaDelete.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x4a + } else { + i = protohelpers.EncodeVarint(dAtA, i, 0) + i-- + dAtA[i] = 0x4a + } + return len(dAtA) - i, nil +} func (m *DatumSequence) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1114,7 +1407,7 @@ func (m *Tag) SizeVT() (n int) { return n } -func (m *Log) SizeVT() (n int) { +func (m *FlatLog) SizeVT() (n int) { if m == nil { return 0 } 
@@ -1123,26 +1416,74 @@ func (m *Log) SizeVT() (n int) { if m.Timestamp != 0 { n += 1 + protohelpers.SizeOfZigzag(uint64(m.Timestamp)) } - if vtmsg, ok := m.Content.(interface{ SizeVT() int }); ok { - n += vtmsg.SizeVT() - } - if m.Tags != nil { - l = m.Tags.SizeVT() - n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + if m.Status != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Status)) } - if m.Uuid != nil { - l = len(*m.Uuid) - n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + if m.Service != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Service)) } - if m.Status != nil { - l = m.Status.SizeVT() - n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + if m.Tags != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Tags)) } - if m.Service != nil { - l = m.Service.SizeVT() - n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + if m.PatternId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.PatternId)) } - n += len(m.unknownFields) + if len(m.DynamicValues) > 0 { + for _, e := range m.DynamicValues { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + l = len(m.RawLog) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.JsonSchemaId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.JsonSchemaId)) + } + if len(m.JsonContextValues) > 0 { + for _, e := range m.JsonContextValues { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + if m.Uuid != nil { + l = len(*m.Uuid) + n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) + return n +} + +func (m *Log) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Timestamp != 0 { + n += 1 + protohelpers.SizeOfZigzag(uint64(m.Timestamp)) + } + if vtmsg, ok := m.Content.(interface{ SizeVT() int }); ok { + n += vtmsg.SizeVT() + } + if m.Tags != nil { + l = m.Tags.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.Uuid != nil { + l = len(*m.Uuid) + n += 1 + l + 
protohelpers.SizeOfVarint(uint64(l)) + } + if m.Status != nil { + l = m.Status.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.Service != nil { + l = m.Service.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) return n } @@ -1170,6 +1511,42 @@ func (m *Log_Raw) SizeVT() (n int) { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) return n } +func (m *JsonSchemaDefine) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SchemaId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.SchemaId)) + } + if len(m.Keys) > 0 { + l = 0 + for _, e := range m.Keys { + l += protohelpers.SizeOfVarint(uint64(e)) + } + n += 1 + protohelpers.SizeOfVarint(uint64(l)) + l + } + if m.MessageKeyId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.MessageKeyId)) + } + n += len(m.unknownFields) + return n +} + +func (m *JsonSchemaDelete) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SchemaId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.SchemaId)) + } + n += len(m.unknownFields) + return n +} + func (m *StructuredLog) SizeVT() (n int) { if m == nil { return 0 @@ -1294,6 +1671,18 @@ func (m *DeltaEncodingSync) SizeVT() (n int) { l = m.Tags.SizeVT() n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Status != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Status)) + } + if m.Service != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Service)) + } + if m.FlatLogTags != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.FlatLogTags)) + } + if m.JsonSchemaId != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.JsonSchemaId)) + } n += len(m.unknownFields) return n } @@ -1395,6 +1784,48 @@ func (m *Datum_Logs) SizeVT() (n int) { } return n } +func (m *Datum_FlatLog) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.FlatLog != nil { + l = m.FlatLog.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } else { + n += 2 + 
} + return n +} +func (m *Datum_JsonSchemaDefine) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.JsonSchemaDefine != nil { + l = m.JsonSchemaDefine.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } else { + n += 2 + } + return n +} +func (m *Datum_JsonSchemaDelete) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.JsonSchemaDelete != nil { + l = m.JsonSchemaDelete.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } else { + n += 2 + } + return n +} func (m *DatumSequence) SizeVT() (n int) { if m == nil { return 0 @@ -2093,7 +2524,7 @@ func (m *Tag) UnmarshalVT(dAtA []byte) error { } return nil } -func (m *Log) UnmarshalVT(dAtA []byte) error { +func (m *FlatLog) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return protohelpers.ErrIntOverflow } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ wire |= uint64(b&0x7F) << shift if b < 0x80 { break } } fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: Log: wiretype end group for non-group") + return fmt.Errorf("proto: FlatLog: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: Log: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: FlatLog: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -2144,8 +2575,84 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) m.Timestamp = int64(v) case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + m.Status = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Status |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType) + } + m.Service = 0 + for shift := uint(0); ; 
shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Service |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) + } + m.Tags = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Tags |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PatternId", wireType) + } + m.PatternId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PatternId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 6: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Structured", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field DynamicValues", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -2172,21 +2679,14 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if oneof, ok := m.Content.(*Log_Structured); ok { - if err := oneof.Structured.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - } else { - v := &StructuredLog{} - if err := v.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - m.Content = &Log_Structured{Structured: v} + m.DynamicValues = append(m.DynamicValues, &DynamicValue{}) + if err := m.DynamicValues[len(m.DynamicValues)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err } iNdEx = postIndex - case 3: + case 7: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Raw", wireType) + return fmt.Errorf("proto: wrong 
wireType = %d for field RawLog", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -2214,11 +2714,30 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Content = &Log_Raw{Raw: string(dAtA[iNdEx:postIndex])} + m.RawLog = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 4: + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JsonSchemaId", wireType) + } + m.JsonSchemaId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JsonSchemaId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 9: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field JsonContextValues", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -2245,14 +2764,12 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Tags == nil { - m.Tags = &TagSet{} - } - if err := m.Tags.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + m.JsonContextValues = append(m.JsonContextValues, &DynamicValue{}) + if err := m.JsonContextValues[len(m.JsonContextValues)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 5: + case 100: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Uuid", wireType) } @@ -2285,11 +2802,443 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { s := string(dAtA[iNdEx:postIndex]) m.Uuid = &s iNdEx = postIndex - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if 
(iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Log) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Log: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Log: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.Timestamp = int64(v) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Structured", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if oneof, ok := m.Content.(*Log_Structured); ok { + if err := 
oneof.Structured.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + v := &StructuredLog{} + if err := v.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Content = &Log_Structured{Structured: v} + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Raw", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Content = &Log_Raw{Raw: string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Tags == nil { + m.Tags = &TagSet{} + } + if err := m.Tags.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Uuid", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b 
:= dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(dAtA[iNdEx:postIndex]) + m.Uuid = &s + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Status == nil { + m.Status = &DynamicValue{} + } + if err := m.Status.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Service == nil { + m.Service = &DynamicValue{} + } + if err := m.Service.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if 
err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JsonSchemaDefine) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JsonSchemaDefine: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JsonSchemaDefine: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SchemaId", wireType) + } + m.SchemaId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SchemaId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Keys = append(m.Keys, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << 
shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.Keys) == 0 { + m.Keys = make([]uint64, 0, elementCount) + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Keys = append(m.Keys, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field Keys", wireType) } - var msglen int + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MessageKeyId", wireType) + } + m.MessageKeyId = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return protohelpers.ErrIntOverflow @@ -2299,33 +3248,67 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= int(b&0x7F) << shift + m.MessageKeyId |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return protohelpers.ErrInvalidLength + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err } - postIndex := iNdEx + msglen - if postIndex < 0 { + if (skippy < 0) || (iNdEx+skippy) < 0 { return protohelpers.ErrInvalidLength } - if postIndex > l { + if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } - if m.Status == nil { - m.Status = &DynamicValue{} + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JsonSchemaDelete) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow } - if err := m.Status.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err + if iNdEx >= l { + return io.ErrUnexpectedEOF } - iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType) + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break } - var msglen int + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JsonSchemaDelete: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JsonSchemaDelete: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SchemaId", wireType) + } + m.SchemaId = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return protohelpers.ErrIntOverflow @@ -2335,28 +3318,11 @@ func (m *Log) UnmarshalVT(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= int(b&0x7F) << shift + m.SchemaId |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return protohelpers.ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return protohelpers.ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Service == nil { - m.Service = &DynamicValue{} - } - if err := m.Service.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2917,6 +3883,82 @@ func (m *DeltaEncodingSync) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 
4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + m.Status = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Status |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType) + } + m.Service = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Service |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FlatLogTags", wireType) + } + m.FlatLogTags = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FlatLogTags |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JsonSchemaId", wireType) + } + m.JsonSchemaId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JsonSchemaId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -3214,6 +4256,129 @@ func (m *Datum) UnmarshalVT(dAtA []byte) error { m.Data = &Datum_Logs{Logs: v} } iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FlatLog", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return 
io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if oneof, ok := m.Data.(*Datum_FlatLog); ok { + if err := oneof.FlatLog.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + v := &FlatLog{} + if err := v.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Data = &Datum_FlatLog{FlatLog: v} + } + iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JsonSchemaDefine", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if oneof, ok := m.Data.(*Datum_JsonSchemaDefine); ok { + if err := oneof.JsonSchemaDefine.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + v := &JsonSchemaDefine{} + if err := v.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Data = &Datum_JsonSchemaDefine{JsonSchemaDefine: v} + } + iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JsonSchemaDelete", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return 
protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if oneof, ok := m.Data.(*Datum_JsonSchemaDelete); ok { + if err := oneof.JsonSchemaDelete.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + v := &JsonSchemaDelete{} + if err := v.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Data = &Datum_JsonSchemaDelete{JsonSchemaDelete: v} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:])