Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@
/comp/anomalydetection/logssource @DataDog/q-branch
/comp/anomalydetection/observer @DataDog/q-branch
/comp/anomalydetection/recorder @DataDog/q-branch
/comp/anomalydetection/reporter @DataDog/q-branch
/comp/autoscaling/datadogclient @DataDog/container-integrations
/comp/connectivitychecker @DataDog/fleet
/comp/dataobs/queryactions @DataDog/data-observability
Expand Down
3 changes: 3 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ exports_files(glob(
# gazelle:exclude comp/anomalydetection/recorder/fx-noop
# gazelle:exclude comp/anomalydetection/recorder/impl
# gazelle:exclude comp/anomalydetection/recorder/impl-noop
# gazelle:exclude comp/anomalydetection/reporter/def
# gazelle:exclude comp/anomalydetection/reporter/fx
# gazelle:exclude comp/anomalydetection/reporter/impl
# gazelle:exclude comp/otelcol/collector
# gazelle:exclude comp/otelcol/collector-contrib/fx
# gazelle:exclude comp/otelcol/collector-contrib/impl
Expand Down
2 changes: 2 additions & 0 deletions cmd/agent/subcommands/run/command_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
logssourcefx "github.com/DataDog/datadog-agent/comp/anomalydetection/logssource/fx"
observerfx "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/fx"
recorderfx "github.com/DataDog/datadog-agent/comp/anomalydetection/recorder/fx-noop"
reporterfx "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/fx"
)

func getObserverOptions() fx.Option {
return fx.Options(
observerfx.Module(),
logssourcefx.Module(),
recorderfx.Module(),
reporterfx.Module(),
)
}
7 changes: 7 additions & 0 deletions comp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,13 @@ Package observer provides a component for observing data flowing through the age

Package recorder provides a middleware component for recording and replaying observer data.

### [comp/anomalydetection/reporter](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/anomalydetection/reporter)

*Datadog Team*: q-branch

Package reporter provides a component that formats and dispatches anomaly
detection events to the Datadog backend or stdout.

### [comp/autoscaling/datadogclient](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/autoscaling/datadogclient)

*Datadog Team*: container-integrations
Expand Down
26 changes: 26 additions & 0 deletions comp/anomalydetection/observer/def/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package observer

// Canonical storage namespaces shared by the observer engine, its extractors,
// and the component catalog.
const (
	// TelemetryNamespace is the storage namespace used for observer-internal
	// debug metrics (e.g. testbench UI charts). Detectors must not treat it
	// as workload data.
	TelemetryNamespace = "telemetry"

	// LogPatternExtractorNamespace is the canonical storage namespace for
	// metrics emitted by the log pattern extractor. Used as
	// SeriesDescriptor.Namespace and as the component name in the catalog.
	LogPatternExtractorNamespace = "log_pattern_extractor"

	// LogMetricsExtractorNamespace is the canonical storage namespace for
	// metrics emitted by the log metrics extractor. Used as
	// SeriesDescriptor.Namespace and as the component name in the catalog.
	LogMetricsExtractorNamespace = "log_metrics_extractor"
)

// SplitTagKeyOrder is the canonical ordered list of tag dimensions used to
// split log clusters and to render split-tag summaries (e.g. in anomaly event
// messages). When adding dimensions, update TagGroupByKey and
// extractTagGroupByKey in
// comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go.
var SplitTagKeyOrder = []string{"source", "service", "env", "host"}
23 changes: 0 additions & 23 deletions comp/anomalydetection/observer/def/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,17 +382,6 @@ type AnomalyDebugInfo struct {
CUSUMValues []float64 // S[t] values (may be truncated to last N points)
}

// ReportOutput is the output model passed to reporters after each advance cycle.
// It carries enough data for reporters to act without reaching back into engine internals.
type ReportOutput struct {
// AdvancedToSec is the data time the engine advanced to.
AdvancedToSec int64
// NewAnomalies are anomalies detected in this advance cycle.
NewAnomalies []Anomaly
// ActiveCorrelations are the current correlation patterns across all correlators.
ActiveCorrelations []ActiveCorrelation
}

// Series is a time series with simple timestamp/value points.
// This is the simplified view passed to SeriesDetector.
type Series struct {
Expand Down Expand Up @@ -466,14 +455,6 @@ type Correlator interface {
Reset()
}

// Reporter receives reports and displays or delivers them.
type Reporter interface {
// Name returns the reporter name for debugging.
Name() string
// Report delivers a report to its destination (stdout, file, webserver, etc).
Report(report ReportOutput)
}

// ActiveCorrelation represents a detected correlation pattern.
type ActiveCorrelation struct {
Pattern string // pattern name, e.g. "kernel_bottleneck"
Expand All @@ -492,10 +473,6 @@ type RawAnomalyState interface {
RawAnomalies() []Anomaly
}

// TelemetryNamespace is the storage namespace used for observer-internal debug
// metrics (e.g. testbench UI charts). Detectors must not treat it as workload data.
const TelemetryNamespace = "telemetry"

// SeriesFilter specifies criteria for selecting series.
type SeriesFilter struct {
Namespace string // exact match (empty = any)
Expand Down
95 changes: 0 additions & 95 deletions comp/anomalydetection/observer/impl/consumer_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package observerimpl

import (
"fmt"

observer "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def"
)

Expand Down Expand Up @@ -44,96 +42,3 @@ func (p *PassthroughCorrelator) Reset() {
// GetPending returns the anomalies this correlator has accumulated so far.
// The returned slice is the internal backing slice, not a copy.
func (p *PassthroughCorrelator) GetPending() []observer.Anomaly {
	return p.anomalies
}

// StdoutReporter prints reports to stdout.
// It tracks correlation state changes and only prints when correlations appear or disappear.
// All data comes through Report(ReportOutput) — no backdoor access to engine internals.
//
// The zero value is ready to use: both dedup maps are lazily initialized on
// first use by the reporting methods.
type StdoutReporter struct {
	seenCorrelations map[string]string // pattern -> title for correlations we've reported
	seenRawAnomalies map[string]bool   // "source|detector" key -> this raw anomaly was already printed
	// lastCorrelations is cached from the most recent Report call for PrintFinalState.
	lastCorrelations []observer.ActiveCorrelation
}

// Name identifies this reporter in debug output.
func (r *StdoutReporter) Name() string { return "stdout_reporter" }

// Report consumes one advance cycle's ReportOutput from the engine. It prints
// each anomaly the first time it is seen, emits "[observer] NEW: {title}" when
// a correlation first appears and "[observer] CLEARED: {title}" when one
// disappears, and caches the correlation set for PrintFinalState.
func (r *StdoutReporter) Report(report observer.ReportOutput) {
	// New anomalies first (with detector identification), then correlation
	// state transitions — same order every cycle.
	r.reportNewAnomalies(report.NewAnomalies)
	r.reportCorrelationChanges(report.ActiveCorrelations)
	// Remember the latest snapshot so PrintFinalState can summarize it.
	r.lastCorrelations = report.ActiveCorrelations
}

// reportNewAnomalies prints each anomaly the first time its (source, detector)
// pair is encountered; repeats are suppressed via the seenRawAnomalies map.
func (r *StdoutReporter) reportNewAnomalies(anomalies []observer.Anomaly) {
	if r.seenRawAnomalies == nil {
		r.seenRawAnomalies = make(map[string]bool)
	}

	for _, a := range anomalies {
		dedupKey := a.Source.String() + "|" + a.DetectorName
		if r.seenRawAnomalies[dedupKey] {
			continue
		}
		fmt.Printf("[observer] [%s] ANOMALY: %s\n", a.DetectorName, a.Source.String())
		fmt.Printf(" %s\n", a.Description)
		r.seenRawAnomalies[dedupKey] = true
	}
}

// reportCorrelationChanges diffs the currently active correlations against the
// set already reported: first-seen patterns are announced with "NEW" and
// remembered; previously reported patterns that are no longer active are
// announced with "CLEARED" and forgotten.
func (r *StdoutReporter) reportCorrelationChanges(activeCorrelations []observer.ActiveCorrelation) {
	if r.seenCorrelations == nil {
		r.seenCorrelations = make(map[string]string)
	}

	// Index the currently active patterns for the cleared-check below.
	active := make(map[string]string, len(activeCorrelations))
	for _, ac := range activeCorrelations {
		active[ac.Pattern] = ac.Title
	}

	// Announce correlations we have not reported before.
	for _, ac := range activeCorrelations {
		if _, reported := r.seenCorrelations[ac.Pattern]; reported {
			continue
		}
		fmt.Printf("[observer] NEW: %s\n", ac.Title)
		for _, a := range ac.Anomalies {
			fmt.Printf(" - %s\n", a.Description)
		}
		r.seenCorrelations[ac.Pattern] = ac.Title
	}

	// Announce and drop correlations that have since disappeared.
	// (Deleting entries while ranging over a map is safe in Go.)
	for pattern, title := range r.seenCorrelations {
		if _, stillActive := active[pattern]; stillActive {
			continue
		}
		fmt.Printf("[observer] CLEARED: %s\n", title)
		delete(r.seenCorrelations, pattern)
	}
}

// PrintFinalState prints a summary of the correlations received in the most
// recent Report call. Call it at the end of a demo to see final cluster
// contents; it prints a placeholder line when no correlations are active.
func (r *StdoutReporter) PrintFinalState() {
	if len(r.lastCorrelations) == 0 {
		fmt.Println("[observer] Final state: no active correlations")
		return
	}

	fmt.Println("[observer] Correlation Summary:")
	for _, ac := range r.lastCorrelations {
		fmt.Printf(" Cluster: %d anomalies\n", len(ac.Anomalies))
		for _, a := range ac.Anomalies {
			fmt.Printf(" - %s\n", a.Description)
		}
	}
}
5 changes: 3 additions & 2 deletions comp/anomalydetection/observer/impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package observerimpl

import (
observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def"
reporterdef "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/def"
)

// engineEventKind identifies the type of engine event.
Expand Down Expand Up @@ -63,14 +64,14 @@ type eventSink interface {
// the event and active correlations from the stateView, then calls Report
// on all registered reporters.
type reporterEventSink struct {
reporters []observerdef.Reporter
reporters []reporterdef.Reporter
state *stateView // for querying current correlations on advance
}

func (s *reporterEventSink) onEngineEvent(evt engineEvent) {
if evt.kind == eventAdvanceCompleted {
ac := evt.advanceCompleted
output := observerdef.ReportOutput{
output := reporterdef.ReportOutput{
AdvancedToSec: ac.advancedToSec,
NewAnomalies: ac.anomalies,
}
Expand Down
5 changes: 3 additions & 2 deletions comp/anomalydetection/observer/impl/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"

observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def"
reporterdef "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/def"
)

// collectingSink collects all events for test assertions.
Expand Down Expand Up @@ -609,7 +610,7 @@ func TestReporterEventSink(t *testing.T) {
reporter := &countingReporter{count: &reported}

sink := &reporterEventSink{
reporters: []observerdef.Reporter{reporter},
reporters: []reporterdef.Reporter{reporter},
}

// advanceCompleted should trigger Report.
Expand Down Expand Up @@ -637,7 +638,7 @@ type countingReporter struct {
}

func (r *countingReporter) Name() string { return "counting" }
func (r *countingReporter) Report(_ observerdef.ReportOutput) { *r.count++ }
func (r *countingReporter) Report(_ reporterdef.ReportOutput) { *r.count++ }

func TestFindingM1_DedupKeyTooCoarse(t *testing.T) {
anomalies := []observerdef.Anomaly{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func DefaultLogMetricsExtractorConfig() LogMetricsExtractorConfig {
}
}

// LogMetricsExtractorName is the canonical name for the log metrics extractor.
const LogMetricsExtractorName = "log_metrics_extractor"

// LogMetricsExtractor converts logs into timeseries metric outputs:
// - JSON logs: numeric fields -> Avg aggregation
// - Unstructured logs: pattern frequency -> Sum aggregation
Expand All @@ -64,7 +61,7 @@ func NewLogMetricsExtractor(config LogMetricsExtractorConfig) *LogMetricsExtract
return &LogMetricsExtractor{config: config}
}

func (a *LogMetricsExtractor) Name() string { return LogMetricsExtractorName }
func (a *LogMetricsExtractor) Name() string { return observer.LogMetricsExtractorNamespace }

// Reset clears cached per-series context so replay/reanalysis starts from the
// currently observed data instead of reusing stale examples.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"github.com/DataDog/datadog-agent/comp/anomalydetection/observer/impl/patterns"
)

// LogPatternExtractorName is the canonical name for the log pattern extractor.
// It is used as the storage namespace for emitted metrics, as the component
// name in the catalog, and in notify formatting for log-derived anomalies.
const LogPatternExtractorName = "log_pattern_extractor"

// TODO(agent-q): Add a test to ensure this is >= the time we evict metrics
// defaultClusterTimeToLive is the time to live for a cluster.
// If a cluster hasn't been seen since this time, it will be removed.
Expand Down Expand Up @@ -213,7 +208,7 @@ func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor

// Name returns the extractor name.
func (e *LogPatternExtractor) Name() string {
return "log_pattern_extractor"
return observerdef.LogPatternExtractorNamespace
}

// Reset clears clustering and cached per-series context so reanalysis starts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ import (
"github.com/DataDog/datadog-agent/comp/anomalydetection/observer/impl/patterns"
)

// splitTagKeyOrder is the canonical ordered list of tag dimensions used to split
// log clusters. The order governs how split-tag summaries are rendered in event
// messages. Add new fields here AND in TagGroupByKey / extractTagGroupByKey.
var splitTagKeyOrder = []string{"source", "service", "env", "host"}

// TagGroupByKey holds the tags that are responsible for grouping logs into different clusters.
// Canonical key order for display is observer.SplitTagKeyOrder (def/constants.go).
// Absent tags (e.g. a log with no "env" tag) are represented by an empty string.
type TagGroupByKey struct {
// Warning: Don't forget to update functions parsing tags when adding new fields
Expand Down
Loading
Loading