Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
538c052
[observer] Batch 1: string content + gate accumulated telemetry
Eokye May 5, 2026
60ec191
[observer] Batch 2: direct gauge.Set for per-component processing time
Eokye May 5, 2026
1a99bc9
[observer] Batch 3: inline fnv64a hashers + kill fmt.Sprintf
Eokye May 5, 2026
e48e3fc
[observer] Batch 4: merge seriesIDs map into seriesStats.ref
Eokye May 6, 2026
2937d41
[observer] storage: hash-keyed series map + SeriesRef-keyed contextRefs
Eokye May 8, 2026
1b952f0
[observer] intern tag combinations to share []string across series
Eokye May 8, 2026
ffff93c
[observer] Batch 5: eliminate contextKey indirection — store MetricCo…
Eokye May 11, 2026
95e7ed2
[observer] Batch 7: bound storage — point retention + series cap
Eokye May 11, 2026
b66b7bc
[observer] Batch 8: cache SeriesRef slice instead of SeriesMeta in de…
Eokye May 11, 2026
68d4f3a
[observer] Batch 9: replace count+gen skip with pure gen-based skip i…
Eokye May 12, 2026
fad5542
[observer] Merge q-branch-observer + fix GetContent interface
Eokye May 12, 2026
e341d67
[observer] Fix compile errors in anomalydetection after merge
Eokye May 12, 2026
5fab87f
[observer] remove stale contextRefs references from comments
Eokye May 12, 2026
ca9fb7b
[observer] add config vars for storage max_series, eviction_floor_rat…
Eokye May 14, 2026
a9d5f38
[observer] revert unnecessary comment on IngestionTimestamp
Eokye May 14, 2026
3ce627f
[observer] restore IngestionTimestamp comment and LogView comment in …
Eokye May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion comp/anomalydetection/logssource/impl/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *observerPipeline) start() {
go func() {
defer close(p.drainDone)
for msg := range p.outputChan {
p.observerHandle.ObserveLog(msg)
p.observerHandle.ObserveLog(&messageLogView{msg: msg})
}
}()
p.proc.Start()
Expand Down Expand Up @@ -97,3 +97,17 @@ func (p *observerPipeline) Stop() {}
func (p *observerPipeline) Flush(ctx context.Context) {
p.proc.Flush(ctx)
}

// messageLogView adapts *message.Message (whose GetContent returns []byte) to
// the observer's LogView interface (whose GetContent returns string). The
// string conversion copies the bytes once at the pipeline boundary; downstream
// extractors then share the resulting immutable string with zero copies.
type messageLogView struct {
msg *message.Message
}

func (v *messageLogView) GetContent() string { return string(v.msg.GetContent()) }
func (v *messageLogView) GetStatus() string { return v.msg.GetStatus() }
func (v *messageLogView) GetTags() []string { return v.msg.GetTags() }
func (v *messageLogView) GetHostname() string { return v.msg.GetHostname() }
func (v *messageLogView) GetTimestampUnixMilli() int64 { return v.msg.GetTimestampUnixMilli() }
2 changes: 1 addition & 1 deletion comp/anomalydetection/observer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ type MyExtractor struct{}
func (m *MyExtractor) Name() string { return "my_extractor" }

func (m *MyExtractor) ProcessLog(log observer.LogView) observer.LogMetricsExtractorOutput {
content := string(log.GetContent())
content := log.GetContent()
// Extract what you need synchronously — don't store the view
return observer.LogMetricsExtractorOutput{
Metrics: []observer.MetricOutput{{
Expand Down
27 changes: 8 additions & 19 deletions comp/anomalydetection/observer/def/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type MetricView interface {
// This interface exists to prevent data races. Implementations must not store
// the LogView itself. Copy any needed values synchronously.
type LogView interface {
GetContent() []byte
GetContent() string
GetStatus() string
GetTags() []string
GetHostname() string
Expand Down Expand Up @@ -236,19 +236,19 @@ type LogObserver interface {
// The storage keeps full summaries (min/max/sum/count) so aggregation
// is specified at read time, not write time.
type MetricOutput struct {
Name string
Value float64
Tags []string
ContextKey string
Name string
Value float64
Tags []string
Context *MetricContext // optional; stored on the series for anomaly enrichment
}

// LogMetricsExtractorOutput is what we obtain when we process a log with a log metrics extractor.
type LogMetricsExtractorOutput struct {
Metrics []MetricOutput
Telemetry []ObserverTelemetry
// EvictedContextKeys lists context keys that are no longer valid (e.g. after
// extractor garbage collection). The engine removes matching contextRefs entries.
EvictedContextKeys []string
// EvictedMetricNames lists metric names whose series should be removed from
// storage (e.g. after extractor LRU eviction or garbage collection).
EvictedMetricNames []string
}

// SeriesDescriptor is the fully resolved identity of a time series.
Expand Down Expand Up @@ -554,17 +554,6 @@ func AggregateString(agg Aggregate) string {
}
}

// ContextProvider resolves metric keys back to richer context about their
// origin. Components that synthesize metrics from richer data (e.g. log
// extractors that turn log patterns into count metrics) can implement this
// interface so that downstream consumers (detectors, reporters) can produce
// more descriptive anomaly reports.
type ContextProvider interface {
// GetContextByKey returns contextual information for a previously emitted
// context key, if available.
GetContextByKey(key string) (MetricContext, bool)
}

// MetricContext describes the origin of a synthesized metric.
type MetricContext struct {
// Pattern is the normalized pattern that generated this metric (e.g. a log signature).
Expand Down
23 changes: 2 additions & 21 deletions comp/anomalydetection/observer/impl/agent_internal_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package observerimpl

import (
"hash/fnv"
"testing"
"time"

Expand Down Expand Up @@ -46,11 +45,9 @@ func TestAgentInternalLogsFlowIntoObserver(t *testing.T) {
pkglog.Info(msg)

// Agent logs are forwarded as structured JSON: {"msg":"..."}.
payload := []byte(`{"msg":"agent internal hello"}`)
payload := `{"msg":"agent internal hello"}`
sig := logSignature(payload, 4096)
h := fnv.New64a()
_, _ = h.Write([]byte(sig))
metricName := "log.pattern." + toHex64(h.Sum64()) + ".count"
metricName := patternCountMetricName(sig)
tags := []string{"component:core", "level:info", "observer_source:agent-internal-logs", "source:datadog-agent"}

// Poll briefly since observer processes asynchronously.
Expand All @@ -61,19 +58,3 @@ func TestAgentInternalLogsFlowIntoObserver(t *testing.T) {
require.Greater(collect, len(s.Points), 0)
}, time.Second*5, time.Millisecond*10)
}

func toHex64(v uint64) string {
const hextable = "0123456789abcdef"
var out [16]byte
for i := 15; i >= 0; i-- {
out[i] = hextable[v&0xF]
v >>= 4
}
// Mirror fmt.Sprintf("%x", ...) (no leading zeros trimmed? actually %x trims; we keep full width here but it won't match)
// Trim leading zeros for parity with production metric naming (fmt %x).
i := 0
for i < 15 && out[i] == '0' {
i++
}
return string(out[i:])
}
46 changes: 23 additions & 23 deletions comp/anomalydetection/observer/impl/bench_log_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,50 @@ import (

// diverseLogContent returns distinct line shapes (JSON, kv, syslog, plain) for series s
// so neighboring series exercise different tokenizer / pattern signatures.
func diverseLogContent(s int) []byte {
func diverseLogContent(s int) string {
switch s % 20 {
case 0:
return []byte(fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s))
return fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s)
case 1:
return []byte(fmt.Sprintf(`{"@timestamp":"2024-01-01T00:00:00Z","message":"evt-%d","severity":"WARN","svc":"api"}`, s))
return fmt.Sprintf(`{"@timestamp":"2024-01-01T00:00:00Z","message":"evt-%d","severity":"WARN","svc":"api"}`, s)
case 2:
return []byte(fmt.Sprintf(`{"trace_id":"%08x","span_id":"%08x","msg":"child span","ok":true}`, s, s+1))
return fmt.Sprintf(`{"trace_id":"%08x","span_id":"%08x","msg":"child span","ok":true}`, s, s+1)
case 3:
return []byte(fmt.Sprintf(`{"nested":{"user":%d,"shard":3},"event":"login","ip":"10.0.0.%d"}`, s, s%255))
return fmt.Sprintf(`{"nested":{"user":%d,"shard":3},"event":"login","ip":"10.0.0.%d"}`, s, s%255)
case 4:
return []byte(fmt.Sprintf(`level=INFO ts=1704067200 series=%d msg="request done" duration_ms=12`, s))
return fmt.Sprintf(`level=INFO ts=1704067200 series=%d msg="request done" duration_ms=12`, s)
case 5:
return []byte(fmt.Sprintf(`level=ERROR logger=com.example req=%d stack=java.lang.Exception`, s))
return fmt.Sprintf(`level=ERROR logger=com.example req=%d stack=java.lang.Exception`, s)
case 6:
return []byte(fmt.Sprintf(`[2024-01-15 14:30:00] INFO worker-%d task=flush completed=true`, s))
return fmt.Sprintf(`[2024-01-15 14:30:00] INFO worker-%d task=flush completed=true`, s)
case 7:
return []byte(fmt.Sprintf(`<134>1 2024-01-15T14:30:00Z host app-%d - - - msg="syslog style"`, s))
return fmt.Sprintf(`<134>1 2024-01-15T14:30:00Z host app-%d - - - msg="syslog style"`, s)
case 8:
return []byte(fmt.Sprintf(`10.1.2.3 - - [15/Jan/2024:14:30:00 +0000] "GET /api/v%d/items HTTP/1.1" 200 4321`, s%50))
return fmt.Sprintf(`10.1.2.3 - - [15/Jan/2024:14:30:00 +0000] "GET /api/v%d/items HTTP/1.1" 200 4321`, s%50)
case 9:
return []byte(fmt.Sprintf(`time="2024-01-15T14:30:00Z" level=debug msg="slow query" series=%d ms=450`, s))
return fmt.Sprintf(`time="2024-01-15T14:30:00Z" level=debug msg="slow query" series=%d ms=450`, s)
case 10:
return []byte(fmt.Sprintf(`{"arr":[%d,2,3],"obj":{"k":"v"},"flag":false}`, s))
return fmt.Sprintf(`{"arr":[%d,2,3],"obj":{"k":"v"},"flag":false}`, s)
case 11:
return []byte(fmt.Sprintf(`ERROR: connection reset by peer series=%d errno=104`, s))
return fmt.Sprintf(`ERROR: connection reset by peer series=%d errno=104`, s)
case 12:
return []byte(fmt.Sprintf(`{"msg":"unicode 测试 %d café","meta":{"region":"eu-west-1"}}`, s))
return fmt.Sprintf(`{"msg":"unicode 测试 %d café","meta":{"region":"eu-west-1"}}`, s)
case 13:
return []byte(fmt.Sprintf(`kafka: topic=logs partition=%d offset=999 key=null`, s))
return fmt.Sprintf(`kafka: topic=logs partition=%d offset=999 key=null`, s)
case 14:
return []byte(fmt.Sprintf(`{"a":1,"b":%d,"c":{"d":[1,2]}}`, s))
return fmt.Sprintf(`{"a":1,"b":%d,"c":{"d":[1,2]}}`, s)
case 15:
return []byte(fmt.Sprintf(`[pid=12345] series=%d action=gc pause_ms=3`, s))
return fmt.Sprintf(`[pid=12345] series=%d action=gc pause_ms=3`, s)
case 16:
return []byte(fmt.Sprintf(`{"http":{"method":"POST","path":"/hook/%d","status":201}}`, s))
return fmt.Sprintf(`{"http":{"method":"POST","path":"/hook/%d","status":201}}`, s)
case 17:
return []byte(fmt.Sprintf(`plain text line series=%d no json here`, s))
return fmt.Sprintf(`plain text line series=%d no json here`, s)
case 18:
return []byte(fmt.Sprintf(`{"double":%d.%d,"scientific":1.23e-4}`, s/10, s%10))
return fmt.Sprintf(`{"double":%d.%d,"scientific":1.23e-4}`, s/10, s%10)
case 19:
return []byte(fmt.Sprintf(`merge key=value series=%d another=42`, s))
return fmt.Sprintf(`merge key=value series=%d another=42`, s)
default:
return []byte(fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s))
return fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s)
}
}

Expand All @@ -68,7 +68,7 @@ func BenchmarkLogExtraction_SeriesCount(b *testing.B) {
logs := make([]*logObs, numSeries)
for s := 0; s < numSeries; s++ {
logs[s] = &logObs{
content: []byte(fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s)),
content: fmt.Sprintf(`{"msg":"log from series %d","level":"info"}`, s),
status: "error",
tags: []string{fmt.Sprintf("series:%d", s)},
timestampMs: 0,
Expand Down
18 changes: 2 additions & 16 deletions comp/anomalydetection/observer/impl/context_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ package observerimpl
import (
"fmt"

observer "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def"
observerdef "github.com/DataDog/datadog-agent/comp/anomalydetection/observer/def"
)

// validateUniqueExtractorNames rejects duplicate runtime extractor names since
// they are used as namespaces in storage and context lookup.
func validateUniqueExtractorNames(extractors []observer.LogMetricsExtractor) {
func validateUniqueExtractorNames(extractors []observerdef.LogMetricsExtractor) {
seen := make(map[string]struct{}, len(extractors))
for _, ext := range extractors {
name := ext.Name()
Expand All @@ -24,20 +24,6 @@ func validateUniqueExtractorNames(extractors []observer.LogMetricsExtractor) {
}
}

// collectContextProviders discovers ContextProvider implementations among
// instantiated extractors via type assertion. Returns a map keyed by the
// extractor's component name (which is used as the storage namespace for
// its metrics), enabling O(1) lookup during anomaly enrichment.
func collectContextProviders(extractors []observer.LogMetricsExtractor) map[string]observer.ContextProvider {
providers := make(map[string]observer.ContextProvider)
for _, ext := range extractors {
if cp, ok := ext.(observer.ContextProvider); ok {
providers[ext.Name()] = cp
}
}
return providers
}

// truncate shortens s to maxLen, appending "..." if truncated.
func truncate(s string, maxLen int) string {
runes := []rune(s)
Expand Down
6 changes: 3 additions & 3 deletions comp/anomalydetection/observer/impl/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type DebugView interface {
// AddTelemetry writes a data point into the engine's telemetry namespace.
// Used by the testbench to store per-detector timing stats for UI display.
AddTelemetry(name string, value float64, timestamp int64, tags []string)
// ReplayStoredData resets analysis state (preserving extractor context and
// contextRefs) then replays all data currently in storage through the
// scheduler in chronological order. Call after Flush().
// ReplayStoredData resets analysis state (preserving extractor context)
// then replays all data currently in storage through the scheduler in
// chronological order. Call after Flush().
ReplayStoredData()
// StorageReader returns a read-only view of the engine's time-series storage.
// Used by the testbench to compute windowed log rates in change messages.
Expand Down
Loading
Loading