diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 369a2dab79b5..03663bb64b3c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -391,6 +391,7 @@ /comp/anomalydetection/logssource @DataDog/q-branch /comp/anomalydetection/observer @DataDog/q-branch /comp/anomalydetection/recorder @DataDog/q-branch +/comp/anomalydetection/reporter @DataDog/q-branch /comp/autoscaling/datadogclient @DataDog/container-integrations /comp/connectivitychecker @DataDog/fleet /comp/dataobs/queryactions @DataDog/data-observability diff --git a/BUILD.bazel b/BUILD.bazel index d5480622e19f..462c85890253 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -200,6 +200,9 @@ exports_files(glob( # gazelle:exclude comp/anomalydetection/recorder/fx-noop # gazelle:exclude comp/anomalydetection/recorder/impl # gazelle:exclude comp/anomalydetection/recorder/impl-noop +# gazelle:exclude comp/anomalydetection/reporter/def +# gazelle:exclude comp/anomalydetection/reporter/fx +# gazelle:exclude comp/anomalydetection/reporter/impl # gazelle:exclude comp/otelcol/collector # gazelle:exclude comp/otelcol/collector-contrib/fx # gazelle:exclude comp/otelcol/collector-contrib/impl diff --git a/cmd/agent/subcommands/run/command_observer.go b/cmd/agent/subcommands/run/command_observer.go index d2cbd58006a3..c9c821f18ce7 100644 --- a/cmd/agent/subcommands/run/command_observer.go +++ b/cmd/agent/subcommands/run/command_observer.go @@ -18,6 +18,7 @@ import ( logssourcefx "github.com/DataDog/datadog-agent/comp/anomalydetection/logssource/fx" observerfx "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/fx" recorderfx "github.com/DataDog/datadog-agent/comp/anomalydetection/recorder/fx-noop" + reporterfx "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/fx" ) func getObserverOptions() fx.Option { @@ -25,5 +26,6 @@ func getObserverOptions() fx.Option { observerfx.Module(), logssourcefx.Module(), recorderfx.Module(), + reporterfx.Module(), ) } diff --git a/comp/README.md 
b/comp/README.md index bbd0f9ef7b38..2c410ff02983 100644 --- a/comp/README.md +++ b/comp/README.md @@ -788,6 +788,13 @@ Package observer provides a component for observing data flowing through the age Package recorder provides a middleware component for recording and replaying observer data. +### [comp/anomalydetection/reporter](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/anomalydetection/reporter) + +*Datadog Team*: q-branch + +Package reporter provides a component that formats and dispatches anomaly +detection events to the Datadog backend or stdout. + ### [comp/autoscaling/datadogclient](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/autoscaling/datadogclient) *Datadog Team*: container-integrations diff --git a/comp/anomalydetection/observer/def/constants.go b/comp/anomalydetection/observer/def/constants.go new file mode 100644 index 000000000000..abcb49dc328d --- /dev/null +++ b/comp/anomalydetection/observer/def/constants.go @@ -0,0 +1,26 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package observer + +// TelemetryNamespace is the storage namespace used for observer-internal debug +// metrics (e.g. testbench UI charts). Detectors must not treat it as workload data. +const TelemetryNamespace = "telemetry" + +// LogPatternExtractorNamespace is the canonical storage namespace for metrics +// emitted by the log pattern extractor. Used as SeriesDescriptor.Namespace and +// as the component name in the catalog. +const LogPatternExtractorNamespace = "log_pattern_extractor" + +// LogMetricsExtractorNamespace is the canonical storage namespace for metrics +// emitted by the log metrics extractor. Used as SeriesDescriptor.Namespace and +// as the component name in the catalog. 
+const LogMetricsExtractorNamespace = "log_metrics_extractor" + +// SplitTagKeyOrder is the canonical ordered list of tag dimensions used to split +// log clusters and to render split-tag summaries (e.g. in anomaly event messages). +// When adding dimensions, update TagGroupByKey and extractTagGroupByKey in +// comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go. +var SplitTagKeyOrder = []string{"source", "service", "env", "host"} diff --git a/comp/anomalydetection/observer/def/types.go b/comp/anomalydetection/observer/def/types.go index f2f5d932eb4c..d37ffcd04cbd 100644 --- a/comp/anomalydetection/observer/def/types.go +++ b/comp/anomalydetection/observer/def/types.go @@ -382,17 +382,6 @@ type AnomalyDebugInfo struct { CUSUMValues []float64 // S[t] values (may be truncated to last N points) } -// ReportOutput is the output model passed to reporters after each advance cycle. -// It carries enough data for reporters to act without reaching back into engine internals. -type ReportOutput struct { - // AdvancedToSec is the data time the engine advanced to. - AdvancedToSec int64 - // NewAnomalies are anomalies detected in this advance cycle. - NewAnomalies []Anomaly - // ActiveCorrelations are the current correlation patterns across all correlators. - ActiveCorrelations []ActiveCorrelation -} - // Series is a time series with simple timestamp/value points. // This is the simplified view passed to SeriesDetector. type Series struct { @@ -466,14 +455,6 @@ type Correlator interface { Reset() } -// Reporter receives reports and displays or delivers them. -type Reporter interface { - // Name returns the reporter name for debugging. - Name() string - // Report delivers a report to its destination (stdout, file, webserver, etc). - Report(report ReportOutput) -} - // ActiveCorrelation represents a detected correlation pattern. type ActiveCorrelation struct { Pattern string // pattern name, e.g. 
"kernel_bottleneck" @@ -492,10 +473,6 @@ type RawAnomalyState interface { RawAnomalies() []Anomaly } -// TelemetryNamespace is the storage namespace used for observer-internal debug -// metrics (e.g. testbench UI charts). Detectors must not treat it as workload data. -const TelemetryNamespace = "telemetry" - // SeriesFilter specifies criteria for selecting series. type SeriesFilter struct { Namespace string // exact match (empty = any) diff --git a/comp/anomalydetection/observer/impl/consumer_memory.go b/comp/anomalydetection/observer/impl/consumer_memory.go index 623609662c9e..2c56855090dc 100644 --- a/comp/anomalydetection/observer/impl/consumer_memory.go +++ b/comp/anomalydetection/observer/impl/consumer_memory.go @@ -6,8 +6,6 @@ package observerimpl import ( - "fmt" - observer "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def" ) @@ -44,96 +42,3 @@ func (p *PassthroughCorrelator) Reset() { func (p *PassthroughCorrelator) GetPending() []observer.Anomaly { return p.anomalies } - -// StdoutReporter prints reports to stdout. -// It tracks correlation state changes and only prints when correlations appear or disappear. -// All data comes through Report(ReportOutput) — no backdoor access to engine internals. -type StdoutReporter struct { - seenCorrelations map[string]string // pattern -> title for correlations we've reported - seenRawAnomalies map[string]bool // source|detector -> whether we've reported this raw anomaly - // lastCorrelations is cached from the most recent Report call for PrintFinalState. - lastCorrelations []observer.ActiveCorrelation -} - -// Name returns the reporter name. -func (r *StdoutReporter) Name() string { - return "stdout_reporter" -} - -// Report receives a ReportOutput with anomalies and correlations from the engine -// and prints changes. 
It prints new anomalies and tracks correlation state changes, -// printing "[observer] NEW: {title}" when a correlation first appears and -// "[observer] CLEARED: {title}" when a correlation disappears. -func (r *StdoutReporter) Report(report observer.ReportOutput) { - // Report new anomalies (with detector identification) - r.reportNewAnomalies(report.NewAnomalies) - // Check for correlation changes - r.reportCorrelationChanges(report.ActiveCorrelations) - // Cache for PrintFinalState - r.lastCorrelations = report.ActiveCorrelations -} - -// reportNewAnomalies prints new anomalies from this advance cycle. -func (r *StdoutReporter) reportNewAnomalies(anomalies []observer.Anomaly) { - if r.seenRawAnomalies == nil { - r.seenRawAnomalies = make(map[string]bool) - } - - for _, anomaly := range anomalies { - key := anomaly.Source.String() + "|" + anomaly.DetectorName - if !r.seenRawAnomalies[key] { - fmt.Printf("[observer] [%s] ANOMALY: %s\n", anomaly.DetectorName, anomaly.Source.String()) - fmt.Printf(" %s\n", anomaly.Description) - r.seenRawAnomalies[key] = true - } - } -} - -// reportCorrelationChanges checks for new and cleared correlations. 
-func (r *StdoutReporter) reportCorrelationChanges(activeCorrelations []observer.ActiveCorrelation) { - if r.seenCorrelations == nil { - r.seenCorrelations = make(map[string]string) - } - - // Build set of currently active pattern names - currentlyActive := make(map[string]string) // pattern -> title - for _, ac := range activeCorrelations { - currentlyActive[ac.Pattern] = ac.Title - } - - // Check for new correlations (in current but not in seen) - for _, ac := range activeCorrelations { - if _, seen := r.seenCorrelations[ac.Pattern]; !seen { - fmt.Printf("[observer] NEW: %s\n", ac.Title) - for _, anomaly := range ac.Anomalies { - fmt.Printf(" - %s\n", anomaly.Description) - } - r.seenCorrelations[ac.Pattern] = ac.Title - } - } - - // Check for cleared correlations (in seen but not in current) - for pattern, title := range r.seenCorrelations { - if _, ok := currentlyActive[pattern]; !ok { - fmt.Printf("[observer] CLEARED: %s\n", title) - delete(r.seenCorrelations, pattern) - } - } -} - -// PrintFinalState prints the current state of all correlations. -// Call this at the end of a demo to see final cluster contents. -// Uses the last correlations received via Report. 
-func (r *StdoutReporter) PrintFinalState() { - if len(r.lastCorrelations) == 0 { - fmt.Println("[observer] Final state: no active correlations") - return - } - fmt.Println("[observer] Correlation Summary:") - for _, ac := range r.lastCorrelations { - fmt.Printf(" Cluster: %d anomalies\n", len(ac.Anomalies)) - for _, anomaly := range ac.Anomalies { - fmt.Printf(" - %s\n", anomaly.Description) - } - } -} diff --git a/comp/anomalydetection/observer/impl/events.go b/comp/anomalydetection/observer/impl/events.go index 57f08a605447..e634a70c0dfc 100644 --- a/comp/anomalydetection/observer/impl/events.go +++ b/comp/anomalydetection/observer/impl/events.go @@ -7,6 +7,7 @@ package observerimpl import ( observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def" + reporterdef "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/def" ) // engineEventKind identifies the type of engine event. @@ -63,14 +64,14 @@ type eventSink interface { // the event and active correlations from the stateView, then calls Report // on all registered reporters. 
type reporterEventSink struct { - reporters []observerdef.Reporter + reporters []reporterdef.Reporter state *stateView // for querying current correlations on advance } func (s *reporterEventSink) onEngineEvent(evt engineEvent) { if evt.kind == eventAdvanceCompleted { ac := evt.advanceCompleted - output := observerdef.ReportOutput{ + output := reporterdef.ReportOutput{ AdvancedToSec: ac.advancedToSec, NewAnomalies: ac.anomalies, } diff --git a/comp/anomalydetection/observer/impl/events_test.go b/comp/anomalydetection/observer/impl/events_test.go index 5b7869ceb843..26bd2233b7fd 100644 --- a/comp/anomalydetection/observer/impl/events_test.go +++ b/comp/anomalydetection/observer/impl/events_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def" + reporterdef "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/def" ) // collectingSink collects all events for test assertions. @@ -609,7 +610,7 @@ func TestReporterEventSink(t *testing.T) { reporter := &countingReporter{count: &reported} sink := &reporterEventSink{ - reporters: []observerdef.Reporter{reporter}, + reporters: []reporterdef.Reporter{reporter}, } // advanceCompleted should trigger Report. 
@@ -637,7 +638,7 @@ type countingReporter struct { } func (r *countingReporter) Name() string { return "counting" } -func (r *countingReporter) Report(_ observerdef.ReportOutput) { *r.count++ } +func (r *countingReporter) Report(_ reporterdef.ReportOutput) { *r.count++ } func TestFindingM1_DedupKeyTooCoarse(t *testing.T) { anomalies := []observerdef.Anomaly{ diff --git a/comp/anomalydetection/observer/impl/log_metrics_extractor.go b/comp/anomalydetection/observer/impl/log_metrics_extractor.go index 271bd5eb8b89..e87df18bbc51 100644 --- a/comp/anomalydetection/observer/impl/log_metrics_extractor.go +++ b/comp/anomalydetection/observer/impl/log_metrics_extractor.go @@ -38,9 +38,6 @@ func DefaultLogMetricsExtractorConfig() LogMetricsExtractorConfig { } } -// LogMetricsExtractorName is the canonical name for the log metrics extractor. -const LogMetricsExtractorName = "log_metrics_extractor" - // LogMetricsExtractor converts logs into timeseries metric outputs: // - JSON logs: numeric fields -> Avg aggregation // - Unstructured logs: pattern frequency -> Sum aggregation @@ -64,7 +61,7 @@ func NewLogMetricsExtractor(config LogMetricsExtractorConfig) *LogMetricsExtract return &LogMetricsExtractor{config: config} } -func (a *LogMetricsExtractor) Name() string { return LogMetricsExtractorName } +func (a *LogMetricsExtractor) Name() string { return observer.LogMetricsExtractorNamespace } // Reset clears cached per-series context so replay/reanalysis starts from the // currently observed data instead of reusing stale examples. 
diff --git a/comp/anomalydetection/observer/impl/log_pattern_extractor.go b/comp/anomalydetection/observer/impl/log_pattern_extractor.go index dc431953230c..ed04a9cefe9b 100644 --- a/comp/anomalydetection/observer/impl/log_pattern_extractor.go +++ b/comp/anomalydetection/observer/impl/log_pattern_extractor.go @@ -13,11 +13,6 @@ import ( "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/impl/patterns" ) -// LogPatternExtractorName is the canonical name for the log pattern extractor. -// It is used as the storage namespace for emitted metrics, as the component -// name in the catalog, and in notify formatting for log-derived anomalies. -const LogPatternExtractorName = "log_pattern_extractor" - // TODO(agent-q): Add a test to ensure this is >= the time we evict metrics // defaultClusterTimeToLive is the time to live for a cluster. // If a cluster hasn't been seen since this time, it will be removed. @@ -213,7 +208,7 @@ func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor // Name returns the extractor name. func (e *LogPatternExtractor) Name() string { - return "log_pattern_extractor" + return observerdef.LogPatternExtractorNamespace } // Reset clears clustering and cached per-series context so reanalysis starts diff --git a/comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go b/comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go index f979d5c00f44..04426133e64f 100644 --- a/comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go +++ b/comp/anomalydetection/observer/impl/log_tagged_pattern_clusterer.go @@ -16,12 +16,8 @@ import ( "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/impl/patterns" ) -// splitTagKeyOrder is the canonical ordered list of tag dimensions used to split -// log clusters. The order governs how split-tag summaries are rendered in event -// messages. Add new fields here AND in TagGroupByKey / extractTagGroupByKey. 
-var splitTagKeyOrder = []string{"source", "service", "env", "host"} - // TagGroupByKey holds the tags that are responsible for grouping logs into different clusters. +// Canonical key order for display is observer.SplitTagKeyOrder (def/constants.go). // Absent tags (e.g. a log with no "env" tag) are represented by an empty string. type TagGroupByKey struct { // Warning: Don't forget to update functions parsing tags when adding new fields diff --git a/comp/anomalydetection/observer/impl/observer.go b/comp/anomalydetection/observer/impl/observer.go index 1a9fa390b2b6..483f5245ae44 100644 --- a/comp/anomalydetection/observer/impl/observer.go +++ b/comp/anomalydetection/observer/impl/observer.go @@ -21,6 +21,7 @@ import ( observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def" "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/impl/hfrunner" recorderdef "github.com/DataDog/datadog-agent/comp/anomalydetection/recorder/def" + reporterdef "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/def" config "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" remoteagentregistry "github.com/DataDog/datadog-agent/comp/core/remoteagentregistry/def" @@ -37,6 +38,8 @@ import ( // Requires declares the input types to the observer component constructor. type Requires struct { + compdef.In + Lifecycle compdef.Lifecycle Config config.Component Log log.Component @@ -57,6 +60,9 @@ type Requires struct { WMeta option.Option[workloadmetadef.Component] FilterStore option.Option[workloadfilterdef.Component] Tagger option.Option[taggerdef.Component] + + // Reporters are provided by the reporter component via the anomalydetection_reporters Fx group. + Reporters []reporterdef.Reporter `group:"anomalydetection_reporters"` } // Provides defines the output of the observer component. 
@@ -181,14 +187,19 @@ func NewComponent(deps Requires) Provides { scheduler: &currentBehaviorPolicy{}, }) - // Wire reporters via event subscription. - // The reporterEventSink queries stateView for active correlations on each advance, - // so reporters receive all needed data through ReportOutput without backdoor access. - reporter := &StdoutReporter{} - eng.Subscribe(&reporterEventSink{ - reporters: []observerdef.Reporter{reporter}, - state: eng.StateView(), - }) + // Wire reporters provided by the reporter component via the Fx group. + // Each reporter gets its own subscription so it receives advance events independently. + // Reporters that implement StorageConsumer receive storage for rate annotations. + for _, r := range deps.Reporters { + r := r + if sc, ok := r.(reporterdef.StorageConsumer); ok { + sc.SetStorage(eng.Storage()) + } + eng.Subscribe(&reporterEventSink{ + reporters: []reporterdef.Reporter{r}, + state: eng.StateView(), + }) + } telemetryComp := deps.Telemetry if telemetryComp == nil { @@ -257,19 +268,6 @@ func NewComponent(deps Requires) Provides { } } - // Optionally add the event reporter when sending is enabled via config. - if cfg.GetBool("observer.event_reporter.sending_enabled") { - if sender, err := newEventSender(deps.Config, deps.Log, eng.Storage()); err != nil { - deps.Log.Warnf("[observer] event_reporter disabled: %v", err) - } else { - eventReporter := &EventReporter{sender: sender, logger: deps.Log} - eng.Subscribe(&reporterEventSink{ - reporters: []observerdef.Reporter{eventReporter}, - state: eng.StateView(), - }) - } - } - go obs.run() // Start high-frequency system check runner if enabled. 
diff --git a/comp/anomalydetection/observer/impl/output.go b/comp/anomalydetection/observer/impl/output.go index e7fc5dd33771..6e790dbcbd9b 100644 --- a/comp/anomalydetection/observer/impl/output.go +++ b/comp/anomalydetection/observer/impl/output.go @@ -10,6 +10,8 @@ import ( "fmt" "os" "sort" + + reporterimpl "github.com/DataDog/datadog-agent/comp/anomalydetection/reporter/impl" ) // ObserverOutput is the top-level JSON structure produced by headless mode. @@ -119,7 +121,7 @@ func (tb *TestBench) WriteObserverOutput(path string, verbose bool) error { if verbose { oc.Title = corr.Title - oc.Message = buildChangeMessage(corr, tb.engine.Storage()) + oc.Message = reporterimpl.BuildChangeMessage(corr, tb.engine.Storage()) oc.Tags = []string{"source:agent-q-branch-observer", "pattern:" + corr.Pattern} oc.MemberSeries = make([]string, len(corr.Members)) for j, m := range corr.Members { diff --git a/comp/anomalydetection/observer/impl/reporter_html.go b/comp/anomalydetection/observer/impl/reporter_html.go deleted file mode 100644 index c7c21683e907..000000000000 --- a/comp/anomalydetection/observer/impl/reporter_html.go +++ /dev/null @@ -1,1547 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -package observerimpl - -import ( - "context" - "encoding/json" - "log" - "math" - "net/http" - "sort" - "strconv" - "strings" - "sync" - "time" - - observer "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def" -) - -// parseInt64 parses a string to int64. -func parseInt64(s string) (int64, error) { - return strconv.ParseInt(s, 10, 64) -} - -// sanitizeFloat replaces Inf, NaN, and extremely large values with 0 for JSON compatibility. -// Extremely large values (> 1e15) can cause Chart.js to crash. 
-func sanitizeFloat(v float64) float64 { - if math.IsInf(v, 0) || math.IsNaN(v) { - return 0 - } - // Cap extremely large values to prevent Chart.js crashes - if v > 1e15 || v < -1e15 { - return 0 - } - return v -} - -const maxReportBuffer = 100 - -// timestampedReport wraps a ReportOutput with a wall-clock timestamp. -type timestampedReport struct { - AdvancedToSec int64 `json:"advanced_to_sec"` - NewAnomalyCount int `json:"new_anomaly_count"` - CorrelationCount int `json:"correlation_count"` - Timestamp time.Time `json:"timestamp"` - ActiveCorrelations []observer.ActiveCorrelation `json:"active_correlations,omitempty"` -} - -// HTMLReporter is an HTTP server that displays reports and metrics on a local webpage. -type HTMLReporter struct { - mu sync.RWMutex - reports []timestampedReport - storage *timeSeriesStorage - correlationState observer.Correlator - rawAnomalyState observer.RawAnomalyState - timeClusterCorrelator *TimeClusterCorrelator - server *http.Server -} - -// NewHTMLReporter creates a new HTMLReporter. -func NewHTMLReporter() *HTMLReporter { - return &HTMLReporter{ - reports: make([]timestampedReport, 0, maxReportBuffer), - } -} - -// Name returns the reporter name. -func (r *HTMLReporter) Name() string { - return "html_reporter" -} - -// Report adds a report to the buffer. -func (r *HTMLReporter) Report(report observer.ReportOutput) { - r.mu.Lock() - defer r.mu.Unlock() - - tr := timestampedReport{ - AdvancedToSec: report.AdvancedToSec, - NewAnomalyCount: len(report.NewAnomalies), - CorrelationCount: len(report.ActiveCorrelations), - Timestamp: time.Now(), - ActiveCorrelations: report.ActiveCorrelations, - } - - // Prepend to keep most recent first - r.reports = append([]timestampedReport{tr}, r.reports...) - - // Cap at maxReportBuffer (evict oldest) - if len(r.reports) > maxReportBuffer { - r.reports = r.reports[:maxReportBuffer] - } -} - -// SetStorage sets the metric storage for querying series data. 
-func (r *HTMLReporter) SetStorage(storage *timeSeriesStorage) { - r.mu.Lock() - defer r.mu.Unlock() - r.storage = storage -} - -// SetCorrelationState sets the correlation state source for querying active correlations. -func (r *HTMLReporter) SetCorrelationState(state observer.Correlator) { - r.mu.Lock() - defer r.mu.Unlock() - r.correlationState = state -} - -// SetRawAnomalyState sets the raw anomaly state source for querying individual anomalies. -func (r *HTMLReporter) SetRawAnomalyState(state observer.RawAnomalyState) { - r.mu.Lock() - defer r.mu.Unlock() - r.rawAnomalyState = state -} - -// SetTimeClusterCorrelator sets the time cluster correlator for visualization. -func (r *HTMLReporter) SetTimeClusterCorrelator(tc *TimeClusterCorrelator) { - r.mu.Lock() - defer r.mu.Unlock() - r.timeClusterCorrelator = tc -} - -// Start starts the HTTP server on the given address. -func (r *HTMLReporter) Start(addr string) error { - mux := http.NewServeMux() - mux.HandleFunc("/", r.handleDashboard) - mux.HandleFunc("/timecluster", r.handleTimeClusterPage) - mux.HandleFunc("/api/reports", r.handleAPIReports) - mux.HandleFunc("/api/series", r.handleAPISeries) - mux.HandleFunc("/api/series/list", r.handleAPISeriesList) - mux.HandleFunc("/api/series/batch", r.handleAPISeriesBatch) - mux.HandleFunc("/api/correlations", r.handleAPICorrelations) - mux.HandleFunc("/api/raw-anomalies", r.handleAPIRawAnomalies) - mux.HandleFunc("/api/timecluster/clusters", r.handleAPITimeClusterClusters) - mux.HandleFunc("/api/timecluster/stats", r.handleAPITimeClusterStats) - - r.server = &http.Server{ - Addr: addr, - Handler: mux, - } - - go func() { - if err := r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Printf("[observer] HTMLReporter server error: %v", err) - } - }() - - return nil -} - -// Stop stops the HTTP server. 
-func (r *HTMLReporter) Stop() error { - if r.server == nil { - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - return r.server.Shutdown(ctx) -} - -// handleDashboard serves the HTML dashboard. -func (r *HTMLReporter) handleDashboard(w http.ResponseWriter, req *http.Request) { - if req.URL.Path != "/" { - http.NotFound(w, req) - return - } - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.WriteHeader(http.StatusOK) - - html := ` - -
- -