diff --git a/comp/otelcol/logsagentpipeline/go.mod b/comp/otelcol/logsagentpipeline/go.mod index 2656b79c031c..4f766d4394f1 100644 --- a/comp/otelcol/logsagentpipeline/go.mod +++ b/comp/otelcol/logsagentpipeline/go.mod @@ -45,7 +45,6 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface v0.64.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.64.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.74.1 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/trace/log v0.77.0-devel // indirect @@ -78,6 +77,7 @@ require ( github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hectane/go-acl v0.0.0-20230225031251-cdfc9e3acf94 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect diff --git a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod index 61066ac0b599..a0ba2744dc13 100644 --- a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod +++ b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod @@ -61,7 +61,6 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/sender/grpc v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.64.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.74.1 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/trace/log v0.77.0-devel // indirect @@ -94,6 +93,7 @@ require ( github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hectane/go-acl v0.0.0-20230225031251-cdfc9e3acf94 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect diff --git a/pkg/compliance/reporter.go b/pkg/compliance/reporter.go index 487aa08eb734..31a3da1cc820 100644 --- a/pkg/compliance/reporter.go +++ b/pkg/compliance/reporter.go @@ -51,6 +51,7 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c compression, cfg.GetBool("logs_config.disable_distributed_senders"), false, // serverless + nil, // secretsComp ) pipelineProvider.Start() diff --git a/pkg/logs/client/go.mod b/pkg/logs/client/go.mod index 1a80ab0caf49..d265296304f9 100644 --- a/pkg/logs/client/go.mod +++ b/pkg/logs/client/go.mod @@ -43,7 +43,6 @@ require ( github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/util/executable v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/filesystem v0.61.0 // indirect @@ -88,6 +87,9 @@ require ( github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opentelemetry.io/otel/metric v1.42.0 // indirect + go.opentelemetry.io/otel/sdk v1.41.0 // indirect + go.opentelemetry.io/otel/trace v1.42.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.19.0 // indirect go.uber.org/fx v1.24.0 // indirect diff --git a/pkg/logs/diagnostic/go.mod b/pkg/logs/diagnostic/go.mod index 985403f0137b..63b14ca45976 100644 --- a/pkg/logs/diagnostic/go.mod +++ b/pkg/logs/diagnostic/go.mod @@ -30,7 +30,6 @@ require ( github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/util/executable v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/filesystem v0.61.0 // indirect @@ -70,6 +69,9 @@ require ( github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opentelemetry.io/otel/metric v1.42.0 // indirect + go.opentelemetry.io/otel/sdk v1.41.0 // indirect + go.opentelemetry.io/otel/trace v1.42.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.19.0 // indirect go.uber.org/fx v1.24.0 // indirect diff --git a/pkg/logs/message/go.mod b/pkg/logs/message/go.mod index fa34674fbdfb..bb520a6dab50 100644 --- a/pkg/logs/message/go.mod +++ b/pkg/logs/message/go.mod @@ -7,7 +7,6 @@ require ( github.com/DataDog/datadog-agent/comp/logs/agent/config v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/sources v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 - github.com/DataDog/datadog-agent/pkg/proto v0.0.0-00010101000000-000000000000 github.com/DataDog/datadog-agent/pkg/util/log v0.73.0-rc.5 github.com/stretchr/testify v1.11.1 ) @@ -63,9 +62,8 @@ require ( github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/otel/metric v1.42.0 // indirect - go.opentelemetry.io/otel/sdk v1.41.0 // indirect - go.opentelemetry.io/otel/trace v1.42.0 // indirect + go.opentelemetry.io/otel v1.42.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect @@ -74,6 +72,7 @@ require ( golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect golang.org/x/time v0.14.0 // indirect + gonum.org/v1/gonum v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/pkg/logs/patterns/processor/json.go b/pkg/logs/patterns/processor/json.go index d96488b23966..f1fee8f20f0e 100644 --- a/pkg/logs/patterns/processor/json.go +++ b/pkg/logs/patterns/processor/json.go @@ -10,6 +10,8 @@ package processor import ( "bytes" "encoding/json" + "fmt" + "sort" "strings" ) @@ -19,8 +21,15 @@ type ExtractionResult struct { IsJSON bool // Message is the extracted message field (empty if not found or not JSON) Message string - // JSONContext is the ordered, serialized remaining JSON fields (nil if not JSON or extraction failed) - JSONContext []byte + // MessageKey is the JSON key the message was extracted from (e.g. "msg", "message") + MessageKey string + // JSONContextSchema is a comma-separated sorted list of top-level keys for the JSON context. + // When populated, JSONContextValues contains the corresponding values in the same order. + // Example: for {"level":"info","pid":1234,"service":"api"}, schema is "level,pid,service". + JSONContextSchema string + // JSONContextValues contains the leaf values corresponding to JSONContextSchema keys, in order. + // Nested objects/arrays are serialized as JSON strings. + JSONContextValues []string } // Common top-level message field names (Layer 0) @@ -64,26 +73,68 @@ func PreprocessJSON(content []byte) ExtractionResult { // Remove the extracted message field from data for jsoncontext construction removeFieldByPath(data, extractedPath) - // If no fields remain after removing the message, keep json_context nil (avoid sending "{}"). + // If no fields remain after removing the message, no context to send. if len(data) == 0 { return ExtractionResult{ - IsJSON: true, - Message: message, - JSONContext: nil, + IsJSON: true, + Message: message, + MessageKey: extractedPath, } } - // Serialize remaining fields as JSON context. - // encoding/json marshals maps with deterministic key ordering for better compression. - jsonContext, err := json.Marshal(data) - if err != nil { - return fail - } + // Schema-based encoding: extract sorted keys and leaf values. + // Nested objects/arrays are serialized as JSON strings in the values list. + schema, values := extractSchemaAndValues(data) return ExtractionResult{ - IsJSON: true, - Message: message, - JSONContext: jsonContext, + IsJSON: true, + Message: message, + MessageKey: extractedPath, + JSONContextSchema: schema, + JSONContextValues: values, + } +} + +// extractSchemaAndValues extracts sorted keys and their corresponding values from a JSON map. +// Primitive values (string, number, bool, null) are converted to strings. +// Nested objects and arrays are serialized as JSON strings. +func extractSchemaAndValues(data map[string]interface{}) (string, []string) { + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + + values := make([]string, len(keys)) + for i, k := range keys { + values[i] = valueToString(data[k]) + } + + return strings.Join(keys, ","), values +} + +// valueToString converts a JSON value to its string representation. +// Primitives are converted directly; objects and arrays are serialized as JSON. +func valueToString(v interface{}) string { + switch val := v.(type) { + case string: + return val + case float64: + return fmt.Sprintf("%g", val) + case bool: + if val { + return "true" + } + return "false" + case nil: + return "" + default: + // Nested object or array — serialize as JSON + b, err := json.Marshal(val) + if err != nil { + return "" + } + return string(b) } } diff --git a/pkg/logs/patterns/processor/json_test.go b/pkg/logs/patterns/processor/json_test.go index 1c17e2b46da7..6d35cb84dc8f 100644 --- a/pkg/logs/patterns/processor/json_test.go +++ b/pkg/logs/patterns/processor/json_test.go @@ -6,7 +6,6 @@ package processor import ( - "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -41,41 +40,52 @@ func TestPreprocessJSON_NotJSON(t *testing.T) { result := PreprocessJSON(tt.content) assert.False(t, result.IsJSON) assert.Empty(t, result.Message) - assert.Nil(t, result.JSONContext) + assert.Empty(t, result.JSONContextSchema) + assert.Nil(t, result.JSONContextValues) }) } } func TestPreprocessJSON_TopLevelMessage(t *testing.T) { tests := []struct { - name string - input string - expectedMsg string - expectedCtx map[string]interface{} + name string + input string + expectedMsg string + expectedKey string + expectedSchema string + expectedValues []string }{ { - name: "message field", - input: `{"message":"Processing order","level":"info","service":"api"}`, - expectedMsg: "Processing order", - expectedCtx: map[string]interface{}{"level": "info", "service": "api"}, + name: "message field", + input: `{"message":"Processing order","level":"info","service":"api"}`, + expectedMsg: "Processing order", + expectedKey: "message", + expectedSchema: "level,service", + expectedValues: []string{"info", "api"}, }, { - name: "msg field", - input: `{"msg":"User login","timestamp":1234567890}`, - expectedMsg: "User login", - expectedCtx: map[string]interface{}{"timestamp": float64(1234567890)}, + name: "msg field", + input: `{"msg":"User login","timestamp":1234567890}`, + expectedMsg: "User login", + expectedKey: "msg", + expectedSchema: "timestamp", + expectedValues: []string{"1.23456789e+09"}, }, { - name: "log field", - input: `{"log":"Container started","container_id":"abc123"}`, - expectedMsg: "Container started", - expectedCtx: map[string]interface{}{"container_id": "abc123"}, + name: "log field", + input: `{"log":"Container started","container_id":"abc123"}`, + expectedMsg: "Container started", + expectedKey: "log", + expectedSchema: "container_id", + expectedValues: []string{"abc123"}, }, { - name: "text field", - input: `{"text":"System event","severity":"warning"}`, - expectedMsg: "System event", - expectedCtx: map[string]interface{}{"severity": "warning"}, + name: "text field", + input: `{"text":"System event","severity":"warning"}`, + expectedMsg: "System event", + expectedKey: "text", + expectedSchema: "severity", + expectedValues: []string{"warning"}, }, } @@ -84,37 +94,37 @@ func TestPreprocessJSON_TopLevelMessage(t *testing.T) { result := PreprocessJSON([]byte(tt.input)) assert.True(t, result.IsJSON) assert.Equal(t, tt.expectedMsg, result.Message) - assert.NotNil(t, result.JSONContext) - - // Verify json_context contains expected fields - var ctx map[string]interface{} - err := json.Unmarshal(result.JSONContext, &ctx) - require.NoError(t, err) - assert.Equal(t, tt.expectedCtx, ctx) + assert.Equal(t, tt.expectedKey, result.MessageKey) + assert.Equal(t, tt.expectedSchema, result.JSONContextSchema) + assert.Equal(t, tt.expectedValues, result.JSONContextValues) }) } } func TestPreprocessJSON_TopLevelKeys(t *testing.T) { tests := []struct { - name string - input string - expectedMsg string + name string + input string + expectedMsg string + expectedSchema string }{ { - name: "Kubernetes/Docker log field", - input: `{"log":"Pod started\n","stream":"stdout","time":"2024-01-01"}`, - expectedMsg: "Pod started\n", + name: "Kubernetes/Docker log field", + input: `{"log":"Pod started\n","stream":"stdout","time":"2024-01-01"}`, + expectedMsg: "Pod started\n", + expectedSchema: "stream,time", }, { - name: "Standard message field", - input: `{"message":"Container log","level":"info"}`, - expectedMsg: "Container log", + name: "Standard message field", + input: `{"message":"Container log","level":"info"}`, + expectedMsg: "Container log", + expectedSchema: "level", }, { - name: "msg field (common in Go logs)", - input: `{"msg":"Service started","service":"api"}`, - expectedMsg: "Service started", + name: "msg field (common in Go logs)", + input: `{"msg":"Service started","service":"api"}`, + expectedMsg: "Service started", + expectedSchema: "service", }, } @@ -123,31 +133,36 @@ func TestPreprocessJSON_TopLevelKeys(t *testing.T) { result := PreprocessJSON([]byte(tt.input)) assert.True(t, result.IsJSON) assert.Equal(t, tt.expectedMsg, result.Message) - assert.NotNil(t, result.JSONContext) + assert.Equal(t, tt.expectedSchema, result.JSONContextSchema) + assert.NotEmpty(t, result.JSONContextValues) }) } } func TestPreprocessJSON_GenericNestedPaths(t *testing.T) { tests := []struct { - name string - input string - expectedMsg string + name string + input string + expectedMsg string + expectedSchema string }{ { - name: "data.message", - input: `{"data":{"message":"Nested message","id":123}}`, - expectedMsg: "Nested message", + name: "data.message", + input: `{"data":{"message":"Nested message","id":123}}`, + expectedMsg: "Nested message", + expectedSchema: "data", }, { - name: "event.message", - input: `{"event":{"message":"Event occurred","type":"alert"}}`, - expectedMsg: "Event occurred", + name: "event.message", + input: `{"event":{"message":"Event occurred","type":"alert"}}`, + expectedMsg: "Event occurred", + expectedSchema: "event", }, { - name: "payload.message", - input: `{"payload":{"message":"Payload data","size":1024}}`, - expectedMsg: "Payload data", + name: "payload.message", + input: `{"payload":{"message":"Payload data","size":1024}}`, + expectedMsg: "Payload data", + expectedSchema: "payload", }, } @@ -156,7 +171,8 @@ func TestPreprocessJSON_GenericNestedPaths(t *testing.T) { result := PreprocessJSON([]byte(tt.input)) assert.True(t, result.IsJSON) assert.Equal(t, tt.expectedMsg, result.Message) - assert.NotNil(t, result.JSONContext) + assert.Equal(t, tt.expectedSchema, result.JSONContextSchema) + assert.NotEmpty(t, result.JSONContextValues) }) } } @@ -183,33 +199,25 @@ func TestPreprocessJSON_NoMessageFound(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := PreprocessJSON([]byte(tt.input)) - // Fail-fast: if no message found, treat as plain text assert.False(t, result.IsJSON) assert.Empty(t, result.Message) - assert.Nil(t, result.JSONContext) + assert.Empty(t, result.JSONContextSchema) + assert.Nil(t, result.JSONContextValues) }) } } func TestPreprocessJSON_OrderedJSONContext(t *testing.T) { - // Test that json_context has deterministic ordering input := `{"message":"test","zebra":"z","apple":"a","banana":"b"}` result := PreprocessJSON([]byte(input)) require.True(t, result.IsJSON) - require.NotNil(t, result.JSONContext) - - // Parse and verify keys are in sorted order - var ctx map[string]interface{} - err := json.Unmarshal(result.JSONContext, &ctx) - require.NoError(t, err) - // Re-marshal to check ordering - expected := `{"apple":"a","banana":"b","zebra":"z"}` - assert.JSONEq(t, expected, string(result.JSONContext)) + assert.Equal(t, "apple,banana,zebra", result.JSONContextSchema) + assert.Equal(t, []string{"a", "b", "z"}, result.JSONContextValues) } -func TestPreprocessJSON_NestedObjectsOrdered(t *testing.T) { +func TestPreprocessJSON_NestedObjectsAsValues(t *testing.T) { input := `{ "message":"test", "nested": { @@ -221,30 +229,25 @@ func TestPreprocessJSON_NestedObjectsOrdered(t *testing.T) { result := PreprocessJSON([]byte(input)) require.True(t, result.IsJSON) - require.NotNil(t, result.JSONContext) - - var ctx map[string]interface{} - err := json.Unmarshal(result.JSONContext, &ctx) - require.NoError(t, err) - // Verify nested map is also ordered - nested := ctx["nested"].(map[string]interface{}) - assert.Equal(t, "a", nested["apple"]) - assert.Equal(t, "z", nested["zebra"]) + assert.Equal(t, "array,nested", result.JSONContextSchema) + require.Len(t, result.JSONContextValues, 2) + assert.Equal(t, "[1,2,3]", result.JSONContextValues[0]) + assert.Equal(t, `{"apple":"a","zebra":"z"}`, result.JSONContextValues[1]) } func TestPreprocessJSON_EmptyContextAfterExtraction(t *testing.T) { - // If only message field exists, json_context should be nil input := `{"message":"only message here"}` result := PreprocessJSON([]byte(input)) assert.True(t, result.IsJSON) assert.Equal(t, "only message here", result.Message) - assert.Nil(t, result.JSONContext) // No remaining fields + assert.Equal(t, "message", result.MessageKey) + assert.Empty(t, result.JSONContextSchema) + assert.Nil(t, result.JSONContextValues) } func TestPreprocessJSON_ComplexRealWorld(t *testing.T) { - // Real-world example from the CSV analysis input := `{ "level":"info", "msg":"Processing payment", @@ -258,18 +261,16 @@ func TestPreprocessJSON_ComplexRealWorld(t *testing.T) { result := PreprocessJSON([]byte(input)) require.True(t, result.IsJSON) assert.Equal(t, "Processing payment", result.Message) - require.NotNil(t, result.JSONContext) - - var ctx map[string]interface{} - err := json.Unmarshal(result.JSONContext, &ctx) - require.NoError(t, err) - - // Verify all non-message fields are preserved - assert.Equal(t, "info", ctx["level"]) - assert.Equal(t, "payment-api", ctx["service"]) - assert.Equal(t, float64(99.99), ctx["amount"]) - assert.Equal(t, "USD", ctx["currency"]) - assert.NotContains(t, ctx, "msg") // Message field removed + assert.Equal(t, "msg", result.MessageKey) + + assert.Equal(t, "amount,currency,level,service,timestamp,user_id", result.JSONContextSchema) + require.Len(t, result.JSONContextValues, 6) + assert.Equal(t, "99.99", result.JSONContextValues[0]) // amount + assert.Equal(t, "USD", result.JSONContextValues[1]) // currency + assert.Equal(t, "info", result.JSONContextValues[2]) // level + assert.Equal(t, "payment-api", result.JSONContextValues[3]) // service + assert.Equal(t, "2024-01-01T12:00:00Z", result.JSONContextValues[4]) // timestamp + assert.Equal(t, "user123", result.JSONContextValues[5]) // user_id } func TestGetValueByPath(t *testing.T) { @@ -345,29 +346,28 @@ func TestRemoveFieldByPath(t *testing.T) { } } -func TestMarshalJSONDeterministicOrdering(t *testing.T) { +func TestExtractSchemaAndValues(t *testing.T) { data := map[string]interface{}{ "z": "last", "a": "first", "m": "middle", } - result, err := json.Marshal(data) - require.NoError(t, err) - - // Keys should be in sorted order - expected := `{"a":"first","m":"middle","z":"last"}` - assert.JSONEq(t, expected, string(result)) + schema, values := extractSchemaAndValues(data) + assert.Equal(t, "a,m,z", schema) + assert.Equal(t, []string{"first", "middle", "last"}, values) } -func TestMarshalJSON_EmptyContextIsNil(t *testing.T) { - data := map[string]interface{}{} - // We intentionally avoid marshalling empty maps for json_context to save bytes. - if len(data) == 0 { - var result []byte - assert.Nil(t, result) - return +func TestExtractSchemaAndValues_MixedTypes(t *testing.T) { + data := map[string]interface{}{ + "count": float64(42), + "flag": true, + "name": "test", + "nested": map[string]interface{}{"key": "val"}, + "empty": nil, } - t.Fatal("unexpected non-empty map") + schema, values := extractSchemaAndValues(data) + assert.Equal(t, "count,empty,flag,name,nested", schema) + assert.Equal(t, []string{"42", "", "true", "test", `{"key":"val"}`}, values) } diff --git a/pkg/logs/patterns/tags/BUILD.bazel b/pkg/logs/patterns/tags/BUILD.bazel index a41dafed84a4..4f34a3693436 100644 --- a/pkg/logs/patterns/tags/BUILD.bazel +++ b/pkg/logs/patterns/tags/BUILD.bazel @@ -12,8 +12,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/logs/patterns/eviction", - "//pkg/proto/pbgo/statefulpb", "//pkg/util/log", + "@com_github_datadog_agent_payload_v5//statefulpb", "@com_github_datadog_datadog_agent_pkg_config_setup//:setup", ], ) diff --git a/pkg/logs/patterns/tags/go.mod b/pkg/logs/patterns/tags/go.mod index 3194e051798b..82d627c38cc6 100644 --- a/pkg/logs/patterns/tags/go.mod +++ b/pkg/logs/patterns/tags/go.mod @@ -3,9 +3,9 @@ module github.com/DataDog/datadog-agent/pkg/logs/patterns/tags go 1.25.0 require ( + github.com/DataDog/agent-payload/v5 v5.0.191 github.com/DataDog/datadog-agent/pkg/config/setup v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/patterns/eviction v0.0.0-00010101000000-000000000000 - github.com/DataDog/datadog-agent/pkg/proto v0.74.1 github.com/DataDog/datadog-agent/pkg/util/log v0.73.0-rc.5 github.com/stretchr/testify v1.11.1 ) @@ -57,9 +57,8 @@ require ( github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/otel/metric v1.42.0 // indirect - go.opentelemetry.io/otel/sdk v1.41.0 // indirect - go.opentelemetry.io/otel/trace v1.42.0 // indirect + go.opentelemetry.io/otel v1.42.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect @@ -68,6 +67,7 @@ require ( golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect golang.org/x/time v0.14.0 // indirect + gonum.org/v1/gonum v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/pkg/logs/patterns/tags/go.sum b/pkg/logs/patterns/tags/go.sum index 3f64354b8563..8f401c1aa89f 100644 --- a/pkg/logs/patterns/tags/go.sum +++ b/pkg/logs/patterns/tags/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/agent-payload/v5 v5.0.191 h1:c26Eqt8/BrmYrVvmCppknxcoEQbfdZINOlij18VmkwA= +github.com/DataDog/agent-payload/v5 v5.0.191/go.mod h1:GyA1vuCeRywrNNCbYeR0cOYKE8QkOVNAFDqsH8VNF5M= github.com/DataDog/viper v1.15.1 h1:kcdFE+qPndlWkhU4iEf/WpWQMCyVYHTv5HqvVf+SYJs= github.com/DataDog/viper v1.15.1/go.mod h1:rDLDREOPd+gpEbA8y4Y/5wTvyLqvUiCmDXX0jRZy8mw= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= diff --git a/pkg/logs/pipeline/go.mod b/pkg/logs/pipeline/go.mod index a64c53cee6a5..8b6fffe6e038 100644 --- a/pkg/logs/pipeline/go.mod +++ b/pkg/logs/pipeline/go.mod @@ -55,7 +55,6 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/patterns/token v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.74.1 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/trace/log v0.77.0-devel // indirect @@ -88,6 +87,7 @@ require ( github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hectane/go-acl v0.0.0-20230225031251-cdfc9e3acf94 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect diff --git a/pkg/logs/processor/go.mod b/pkg/logs/processor/go.mod index 24cc85f0086a..d9e84d1ea910 100644 --- a/pkg/logs/processor/go.mod +++ b/pkg/logs/processor/go.mod @@ -12,6 +12,7 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/metrics v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/sources v0.61.0 github.com/DataDog/datadog-agent/pkg/util/log v0.73.0-rc.5 + github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 ) @@ -36,7 +37,6 @@ require ( github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/util/executable v0.61.0 // indirect diff --git a/pkg/logs/sender/go.mod b/pkg/logs/sender/go.mod index 15315eb38aba..7c7049235d22 100644 --- a/pkg/logs/sender/go.mod +++ b/pkg/logs/sender/go.mod @@ -46,7 +46,6 @@ require ( github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/types v0.71.0-rc.1 // indirect - github.com/DataDog/datadog-agent/pkg/proto v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/template v0.65.1 // indirect github.com/DataDog/datadog-agent/pkg/util/backoff v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.64.0-devel // indirect diff --git a/pkg/logs/sender/grpc/go.mod b/pkg/logs/sender/grpc/go.mod index 2f7f7da8660a..783ef1e35e34 100644 --- a/pkg/logs/sender/grpc/go.mod +++ b/pkg/logs/sender/grpc/go.mod @@ -3,6 +3,7 @@ module github.com/DataDog/datadog-agent/pkg/logs/sender/grpc go 1.25.0 require ( + github.com/DataDog/agent-payload/v5 v5.0.191 github.com/DataDog/datadog-agent/comp/logs/agent/config v0.61.0 github.com/DataDog/datadog-agent/comp/serializer/logscompression v0.64.0-devel github.com/DataDog/datadog-agent/pkg/config/model v0.77.2 @@ -18,7 +19,6 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/processor v0.0.0-00010101000000-000000000000 github.com/DataDog/datadog-agent/pkg/logs/sender v0.0.0-00010101000000-000000000000 github.com/DataDog/datadog-agent/pkg/logs/sources v0.61.0 - github.com/DataDog/datadog-agent/pkg/proto v0.74.1 github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 github.com/DataDog/datadog-agent/pkg/util/backoff v0.61.0 github.com/DataDog/datadog-agent/pkg/util/compression v0.56.0-rc.3 @@ -32,7 +32,6 @@ require ( ) require ( - github.com/DataDog/agent-payload/v5 v5.0.191 // indirect github.com/DataDog/datadog-agent/comp/core/config v0.64.0-devel // indirect github.com/DataDog/datadog-agent/comp/core/delegatedauth v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/comp/core/flare/builder v0.61.0 // indirect @@ -86,6 +85,7 @@ require ( github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hectane/go-acl v0.0.0-20230225031251-cdfc9e3acf94 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect diff --git a/pkg/logs/sender/grpc/mock_state.go b/pkg/logs/sender/grpc/mock_state.go index 74ee420ac6fb..bc8efb173d71 100644 --- a/pkg/logs/sender/grpc/mock_state.go +++ b/pkg/logs/sender/grpc/mock_state.go @@ -25,13 +25,15 @@ import ( const nanoToMillis = 1000000 // batchEntry is a per-message sidecar used during batch tokenization. -// It keeps msg, preprocessed content, and jsonContext aligned so that +// It keeps msg, preprocessed content, and JSON context fields aligned so that // tokenization results can be correctly associated with each message // even when some messages are skipped (empty content). type batchEntry struct { - msg *message.Message - content string // preprocessed content (JSON extracted message, or raw string) - jsonContext []byte // from PreprocessJSON; nil if not JSON + msg *message.Message + content string // preprocessed content (JSON extracted message, or raw string) + messageKey string // JSON key the message was extracted from (e.g. "msg", "message") + jsonContextSchema string // comma-separated sorted keys (e.g. "level,service,timestamp") + jsonContextValues []string // leaf values in schema key order } func getTranslatorContent(msg *message.Message) []byte { @@ -135,7 +137,9 @@ func (mt *MessageTranslator) Start(inputChan chan *message.Message, bufferSize i entry := batchEntry{msg: msg} if results := processor.PreprocessJSON(content); results.Message != "" { entry.content = results.Message - entry.jsonContext = results.JSONContext + entry.messageKey = results.MessageKey + entry.jsonContextSchema = results.JSONContextSchema + entry.jsonContextValues = results.JSONContextValues } else { entry.content = string(content) } @@ -206,7 +210,7 @@ func (mt *MessageTranslator) processBatch(batch []batchEntry, outputChan chan *m log.Warnf("Failed to tokenize log message: %v", tokenResults[i].Err) continue } - mt.processPreTokenized(entry.msg, tokenResults[i].TokenList, entry.jsonContext, outputChan) + mt.processPreTokenized(entry.msg, tokenResults[i].TokenList, entry.messageKey, entry.jsonContextSchema, entry.jsonContextValues, outputChan) } } @@ -217,23 +221,26 @@ func (mt *MessageTranslator) processMessage(msg *message.Message, outputChan cha if len(content) == 0 { return } - var jsonContext []byte contentStr := string(content) + var messageKey, jsonContextSchema string + var jsonContextValues []string if results := processor.PreprocessJSON(content); results.Message != "" { contentStr = results.Message - jsonContext = results.JSONContext + messageKey = results.MessageKey + jsonContextSchema = results.JSONContextSchema + jsonContextValues = results.JSONContextValues } tokenList, err := mt.tokenizer.Tokenize(contentStr) if err != nil { log.Warnf("Failed to tokenize log message: %v", err) return } - mt.processPreTokenized(msg, tokenList, jsonContext, outputChan) + mt.processPreTokenized(msg, tokenList, messageKey, jsonContextSchema, jsonContextValues, outputChan) } // processPreTokenized handles post-tokenization: clustering, eviction, datum construction, and sending. // Called by both processBatch (batch pipeline) and processMessage (single-message path). -func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList *token.TokenList, jsonContext []byte, outputChan chan *message.StatefulMessage) { +func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList *token.TokenList, messageKey string, jsonContextSchema string, jsonContextValues []string, outputChan chan *message.StatefulMessage) { var patternDefineSent bool var patternDefineParamCount uint32 @@ -293,6 +300,43 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList mt.fillDynamicValue(&dvBacking[i], &typeBacking[i].intOneof, &typeBacking[i].dictOneof, &typeBacking[i].stringOneof, val) } + // Encode message key as dict entry + var messageKeyDV *statefulpb.DynamicValue + if messageKey != "" { + encoded, mkDictID, mkIsNew := mt.encodeDynamicValue(messageKey) + messageKeyDV = encoded + if mkIsNew { + mt.sendDictEntryDefine(outputChan, msg, mkDictID, messageKey) + } + } + + // Encode JSON context schema as dict entry and values as DynamicValues + var jsonContextSchemaID uint64 + var jsonContextValuesDV []*statefulpb.DynamicValue + if jsonContextSchema != "" { + var schemaIsNew bool + jsonContextSchemaID, schemaIsNew = mt.tagManager.AddString(jsonContextSchema) + if schemaIsNew { + mt.sendDictEntryDefine(outputChan, msg, jsonContextSchemaID, jsonContextSchema) + } + + jsonContextDVBacking := make([]statefulpb.DynamicValue, len(jsonContextValues)) + jsonContextTypeBacking := make([]dvTypeBackings, len(jsonContextValues)) + jsonContextValuesDV = make([]*statefulpb.DynamicValue, len(jsonContextValues)) + for i := range jsonContextDVBacking { + jsonContextValuesDV[i] = &jsonContextDVBacking[i] + } + for i, val := range jsonContextValues { + mt.fillDynamicValue( + &jsonContextDVBacking[i], + &jsonContextTypeBacking[i].intOneof, + &jsonContextTypeBacking[i].dictOneof, + &jsonContextTypeBacking[i].stringOneof, + val, + ) + } + } + // Build complete tag list and encode as TagSet tagSet, allTagsString, dictID, isNew := mt.buildTagSet(msg) if isNew { @@ -301,7 +345,7 @@ func (mt *MessageTranslator) processPreTokenized(msg *message.Message, tokenList // Send StructuredLog with all fields tsMillis := ts.UnixNano() / nanoToMillis - mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, jsonContext) + mt.sendStructuredLog(outputChan, msg, tsMillis, patternID, dynamicValues, tagSet, messageKeyDV, jsonContextSchemaID, jsonContextValuesDV) } // buildTagSet constructs the complete tag list for a message and encodes it as a TagSet. @@ -460,8 +504,8 @@ func (mt *MessageTranslator) sendRawLog(outputChan chan *message.StatefulMessage } // sendStructuredLog creates and sends a StructuredLog datum -func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, jsonContext []byte) { - logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, jsonContext) +func (mt *MessageTranslator) sendStructuredLog(outputChan chan *message.StatefulMessage, msg *message.Message, timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) { + logDatum := buildStructuredLog(timestamp, patternID, dynamicValues, tagSet, msg.MessageMetadata.DualSendUUID, messageKey, jsonContextSchemaID, jsonContextValues) tlmPipelinePatternLogsProcessed.Inc(mt.pipelineName) tlmPipelinePatternLogsProcessedBytes.Add(float64(proto.Size(logDatum)), mt.pipelineName) @@ -569,14 +613,16 @@ func (mt *MessageTranslator) fillDynamicValue( } // buildStructuredLog creates a Datum containing a StructuredLog -func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, jsonContext []byte) *statefulpb.Datum { +func buildStructuredLog(timestamp int64, patternID uint64, dynamicValues []*statefulpb.DynamicValue, tagSet *statefulpb.TagSet, uuid string, messageKey *statefulpb.DynamicValue, jsonContextSchemaID uint64, jsonContextValues []*statefulpb.DynamicValue) *statefulpb.Datum { log := &statefulpb.Log{ Timestamp: timestamp, Content: &statefulpb.Log_Structured{ Structured: &statefulpb.StructuredLog{ - PatternId: patternID, - DynamicValues: dynamicValues, - JsonContext: jsonContext, + PatternId: patternID, + DynamicValues: dynamicValues, + JsonMessageKey: messageKey, + JsonContextSchemaId: jsonContextSchemaID, + JsonContextValues: jsonContextValues, }, }, Tags: tagSet, diff --git a/pkg/security/reporter/reporter.go b/pkg/security/reporter/reporter.go index f7cc36f23bdb..b62505d5ac2d 100644 --- a/pkg/security/reporter/reporter.go +++ b/pkg/security/reporter/reporter.go @@ -60,6 +60,7 @@ func newReporter(hostname string, stopper startstop.Stopper, sourceName, sourceT compression, cfg.GetBool("logs_config.disable_distributed_senders"), false, // serverless + nil, // secretsComp ) pipelineProvider.Start() stopper.Add(pipelineProvider)