Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/logs/sender/grpc/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,9 @@ func (t *inflightTracker) addMissingFlatLogReferences(missing stateReferences, k
}
t.addMissingJsonSchemaReference(missing, known, log.JsonSchemaId)
t.addMissingDynamicValueReferences(missing, known, log.JsonContextValues)
for _, dictID := range log.JsonContextDictValues {
t.addMissingDictEntryReference(missing, known, dictID)
}
}

func (t *inflightTracker) addMissingDeltaEncodingSyncReferences(missing stateReferences, known stateReferences, sync *statefulpb.DeltaEncodingSync) {
Expand Down Expand Up @@ -784,6 +787,9 @@ func addFlatLogReferences(refs stateReferences, log *statefulpb.FlatLog) {
}
addFlatLogJsonSchemaReference(refs, log.JsonSchemaId)
addDynamicValueReferences(refs, log.JsonContextValues)
for _, dictID := range log.JsonContextDictValues {
refs.addDictEntry(dictID)
}
}

func addDeltaEncodingSyncReferences(refs stateReferences, sync *statefulpb.DeltaEncodingSync) {
Expand Down
177 changes: 149 additions & 28 deletions pkg/logs/sender/grpc/mock_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ const (
defaultTokenizeBatchSize = 20
)

const (
jsonValueKindNull byte = iota
jsonValueKindInt
jsonValueKindFloat
jsonValueKindBoolFalse
jsonValueKindBoolTrue
jsonValueKindString
jsonValueKindDict
jsonValueKindRaw
jsonValueKindIntAsString
jsonValueKindFloatAsString
jsonValueKindBoolFalseAsString
jsonValueKindBoolTrueAsString
)

// dvTypeBackings holds the three oneof wrapper types for a single DynamicValue in one
// contiguous allocation. Each wildcard position uses exactly one of the three fields;
// grouping them avoids three separate heap allocations per wildcard position.
Expand All @@ -82,6 +97,20 @@ type dvTypeBackings struct {
stringOneof statefulpb.DynamicValue_StringValue
}

type compactJSONContextValues struct {
kinds []byte
ints []int64
floats []float64
dicts []uint64
rawValues [][]byte
stringValues []string
}

type dictEntryDefinition struct {
id uint64
value string
}

type tagCacheEntry struct {
origin *message.Origin
hostname string
Expand Down Expand Up @@ -471,30 +500,19 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList
var messageKeyDV *statefulpb.DynamicValue
var jsonContextSchemaID uint64
var jsonContextValuesDV []*statefulpb.DynamicValue
var compactJSONContext compactJSONContextValues
if len(jsonContextKeys) > 0 {
messageKeyDV, jsonContextSchemaID = mt.sendJsonSchemaDefineIfNeeded(outputChan, msg, messageKey, jsonContextKeys)

jsonContextDVBacking := make([]statefulpb.DynamicValue, len(jsonContextValues))
jsonContextTypeBacking := make([]dvTypeBackings, len(jsonContextValues))
jsonContextValuesDV = make([]*statefulpb.DynamicValue, len(jsonContextValues))
for i := range jsonContextDVBacking {
jsonContextValuesDV[i] = &jsonContextDVBacking[i]
}
for i, val := range jsonContextValues {
dictID, dictValue, isNew := mt.fillDynamicValue(
&jsonContextDVBacking[i],
&jsonContextTypeBacking[i].intOneof,
&jsonContextTypeBacking[i].floatOneof,
&jsonContextTypeBacking[i].boolOneof,
&jsonContextTypeBacking[i].dictOneof,
&jsonContextTypeBacking[i].rawJSONOneof,
&jsonContextTypeBacking[i].stringOneof,
val,
)
if isNew {
mt.sendDictEntryDefine(outputChan, msg, dictID, dictValue)
}
var dictDefs []dictEntryDefinition
compactJSONContext, dictDefs = mt.compactJSONContextValues(jsonContextValues)
for _, dictDef := range dictDefs {
mt.sendDictEntryDefine(outputChan, msg, dictDef.id, dictDef.value)
}

// Keep the legacy field empty for FlatLog. Consumers that do not understand the compact
// streams should ignore the json schema when json_context_values is absent.
jsonContextValuesDV = nil
}

service, serviceDictID, serviceIsNew := mt.buildServiceField(msg)
Expand All @@ -515,7 +533,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList

// Send StructuredLog with all fields
tsMillis := ts.UnixNano() / nanoToMillis
mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, service, statusDictID, messageKeyDV, jsonContextSchemaID, jsonContextValuesDV)
mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, service, statusDictID, messageKeyDV, jsonContextSchemaID, jsonContextValuesDV, compactJSONContext)
}

// buildTagSet constructs the complete tag list for a message and encodes it as a TagSet.
Expand Down Expand Up @@ -881,8 +899,8 @@ func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage
}

// sendStructuredLog creates and sends a StructuredLog datum
func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) {
logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, service, statusDictID, messageKey, jsonContextSchemaID, jsonContextValues)
func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue, compactJSONContext compactJSONContextValues) {
logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, service, statusDictID, messageKey, jsonContextSchemaID, jsonContextValues, compactJSONContext)

tlmPipelinePatternLogsProcessed.Inc(mt.pipelineName)
tlmPipelinePatternLogsProcessedBytes.Add(float64(proto.Size(logDatum)), mt.pipelineName)
Expand Down Expand Up @@ -1123,6 +1141,103 @@ func (mt *MessageTranslator) fillDynamicValue(
}
}

func (mt *MessageTranslator) compactJSONContextValues(values []interface{}) (compactJSONContextValues, []dictEntryDefinition) {
compact := compactJSONContextValues{kinds: make([]byte, 0, len(values))}
dictDefs := make([]dictEntryDefinition, 0)
for _, value := range values {
dictID, dictValue, isNew := mt.appendCompactJSONContextValue(&compact, value)
if isNew {
dictDefs = append(dictDefs, dictEntryDefinition{id: dictID, value: dictValue})
}
}
return compact, dictDefs
}

func (mt *MessageTranslator) appendCompactJSONContextValue(compact *compactJSONContextValues, value interface{}) (dictID uint64, dictValue string, isNew bool) {
switch typed := value.(type) {
case nil:
compact.kinds = append(compact.kinds, jsonValueKindNull)
return 0, "", false
case string:
return mt.appendCompactJSONString(compact, typed)
case json.Number:
return appendCompactJSONNumber(compact, typed.String())
case float64:
if !math.IsInf(typed, 0) && !math.IsNaN(typed) && math.Trunc(typed) == typed && typed >= math.MinInt64 && typed <= math.MaxInt64 {
compact.kinds = append(compact.kinds, jsonValueKindInt)
compact.ints = append(compact.ints, int64(typed))
return 0, "", false
}
compact.kinds = append(compact.kinds, jsonValueKindFloat)
compact.floats = append(compact.floats, typed)
return 0, "", false
case bool:
if typed {
compact.kinds = append(compact.kinds, jsonValueKindBoolTrue)
} else {
compact.kinds = append(compact.kinds, jsonValueKindBoolFalse)
}
return 0, "", false
default:
rawJSON, err := json.Marshal(typed)
if err != nil {
log.Warnf("Failed to marshal nested JSON context value: %v", err)
compact.kinds = append(compact.kinds, jsonValueKindString)
compact.stringValues = append(compact.stringValues, "")
return 0, "", false
}
compact.kinds = append(compact.kinds, jsonValueKindRaw)
compact.rawValues = append(compact.rawValues, rawJSON)
return 0, "", false
}
}

func (mt *MessageTranslator) appendCompactJSONString(compact *compactJSONContextValues, value string) (dictID uint64, dictValue string, isNew bool) {
if intVal, ok := parseLosslessIntString(value); ok {
compact.kinds = append(compact.kinds, jsonValueKindIntAsString)
compact.ints = append(compact.ints, intVal)
return 0, "", false
}
if floatVal, ok := parseLosslessFloatString(value); ok {
compact.kinds = append(compact.kinds, jsonValueKindFloatAsString)
compact.floats = append(compact.floats, floatVal)
return 0, "", false
}
if boolVal, ok := parseLosslessBoolString(value); ok {
if boolVal {
compact.kinds = append(compact.kinds, jsonValueKindBoolTrueAsString)
} else {
compact.kinds = append(compact.kinds, jsonValueKindBoolFalseAsString)
}
return 0, "", false
}
value = toValidUTF8(value)
if dictID, isNew, shouldEncode := mt.tagManager.ObserveDynamicString(value); shouldEncode {
compact.kinds = append(compact.kinds, jsonValueKindDict)
compact.dicts = append(compact.dicts, dictID)
return dictID, value, isNew
}
compact.kinds = append(compact.kinds, jsonValueKindString)
compact.stringValues = append(compact.stringValues, value)
return 0, "", false
}

func appendCompactJSONNumber(compact *compactJSONContextValues, raw string) (dictID uint64, dictValue string, isNew bool) {
if intVal, ok := parseLosslessIntString(raw); ok {
compact.kinds = append(compact.kinds, jsonValueKindInt)
compact.ints = append(compact.ints, intVal)
return 0, "", false
}
if floatVal, ok := parseLosslessFloatString(raw); ok {
compact.kinds = append(compact.kinds, jsonValueKindFloat)
compact.floats = append(compact.floats, floatVal)
return 0, "", false
}
compact.kinds = append(compact.kinds, jsonValueKindRaw)
compact.rawValues = append(compact.rawValues, []byte(raw))
return 0, "", false
}

func (mt *MessageTranslator) fillWildcardDynamicValue(
dv *statefulpb.DynamicValue,
oneofInt *statefulpb.DynamicValue_IntValue,
Expand Down Expand Up @@ -1167,18 +1282,24 @@ func flatLogDynamicValueDictIndex(value *statefulpb.DynamicValue) uint64 {
}

// buildStructuredLog creates a Datum containing a FlatLog with pattern references.
func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) *statefulpb.Datum {
func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, service *statefulpb.DynamicValue, statusDictID uint64, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue, compactJSONContext compactJSONContextValues) *statefulpb.Datum {
_ = messageKey
log := &statefulpb.FlatLog{
Timestamp: timestamp,
Status: flatLogDictIndex(statusDictID),
Service: flatLogDictIndex(flatLogDynamicValueDictIndex(service)),
Tags: flatLogDictIndex(flatLogTagSetDictIndex(tagSet)),

PatternId: patternID,
DynamicValues: dynamicValues,
JsonSchemaId: flatLogDictIndex(jsonContextSchemaID),
JsonContextValues: jsonContextValues,
PatternId: patternID,
DynamicValues: dynamicValues,
JsonSchemaId: flatLogDictIndex(jsonContextSchemaID),
JsonContextValues: jsonContextValues,
JsonContextValueKinds: compactJSONContext.kinds,
JsonContextIntValues: compactJSONContext.ints,
JsonContextFloatValues: compactJSONContext.floats,
JsonContextDictValues: compactJSONContext.dicts,
JsonContextRawValues: compactJSONContext.rawValues,
JsonContextStringValues: compactJSONContext.stringValues,
}
if uuid != "" {
log.Uuid = &uuid
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/sender/grpc/mock_state_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestBuildStructuredLogUsesFlatLog(t *testing.T) {
Value: &statefulpb.DynamicValue_StringValue{StringValue: "value"},
}}

datum := buildStructuredLog(123, 12, values, tagSet, "uuid", service, 2, nil, 5, nil)
datum := buildStructuredLog(123, 12, values, tagSet, "uuid", service, 2, nil, 5, nil, compactJSONContextValues{})

require.Nil(t, datum.GetLogs())
flatLog := datum.GetFlatLog()
Expand Down
11 changes: 10 additions & 1 deletion pkg/proto/datadog/stateful/stateful_encoding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,17 @@ message FlatLog {
// Delta encoded json_schema_id: when 0 use last json_schema_id. Otherwise use json_schema_id in this dict entry.
// Use 1 to indicate no json_schema_id.
uint64 json_schema_id = 8;
// Values for the json schema.
// Deprecated fallback values for the json schema.
repeated DynamicValue json_context_values = 9;
// Compact values for the json schema. json_context_value_kinds has one byte per schema key.
// Each kind consumes the next value from the corresponding packed value field.
// Kind values are defined by the JsonValueKind constants in the encoder/decoder.
bytes json_context_value_kinds = 10;
repeated int64 json_context_int_values = 11;
repeated double json_context_float_values = 12;
repeated uint64 json_context_dict_values = 13;
repeated bytes json_context_raw_values = 14;
repeated string json_context_string_values = 15;

// Optional UUID used to track logs when dual-sent via HTTP and gRPC.
optional string uuid = 100;
Expand Down
64 changes: 61 additions & 3 deletions pkg/proto/pbgo/statefulpb/stateful_encoding.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading