Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions comp/observer/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,16 @@ type LogObserver interface {
// The storage keeps full summaries (min/max/sum/count) so aggregation
// is specified at read time, not write time.
type MetricOutput struct {
Name string
Value float64
Tags []string
Name string
Value float64
Tags []string
// Context is the pre-built enrichment context for this metric. The engine
// stores it keyed by SeriesRef so anomaly enrichment is O(1) without any
// string lookups. Nil means no enrichment context for this metric.
Context *MetricContext
// ContextKey is an opaque string used by the engine to map this metric to
// its storage series for eviction. Only set by extractors that implement
// LRU eviction (e.g. LogPatternExtractor). Leave empty if not needed.
ContextKey string
}

Expand Down Expand Up @@ -576,17 +583,6 @@ func AggregateString(agg Aggregate) string {
}
}

// ContextProvider resolves metric keys back to richer context about their
// origin. Components that synthesize metrics from richer data (e.g. log
// extractors that turn log patterns into count metrics) can implement this
// interface so that downstream consumers (detectors, reporters) can produce
// more descriptive anomaly reports.
type ContextProvider interface {
// GetContextByKey returns contextual information for a previously emitted
// context key, if available.
GetContextByKey(key string) (MetricContext, bool)
}

// MetricContext describes the origin of a synthesized metric.
type MetricContext struct {
// Pattern is the normalized pattern that generated this metric (e.g. a log signature).
Expand Down
14 changes: 0 additions & 14 deletions comp/observer/impl/context_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,6 @@ func validateUniqueExtractorNames(extractors []observer.LogMetricsExtractor) {
}
}

// collectContextProviders discovers ContextProvider implementations among
// instantiated extractors via type assertion. Returns a map keyed by the
// extractor's component name (which is used as the storage namespace for
// its metrics), enabling O(1) lookup during anomaly enrichment.
func collectContextProviders(extractors []observer.LogMetricsExtractor) map[string]observer.ContextProvider {
providers := make(map[string]observer.ContextProvider)
for _, ext := range extractors {
if cp, ok := ext.(observer.ContextProvider); ok {
providers[ext.Name()] = cp
}
}
return providers
}

// truncate shortens s to maxLen, appending "..." if truncated.
func truncate(s string, maxLen int) string {
runes := []rune(s)
Expand Down
111 changes: 56 additions & 55 deletions comp/observer/impl/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type anomalyDedupKey struct {
title string
}

type seriesContextRef struct {
// nsContextKey is a map key that pairs an extractor namespace with the opaque
// context key it produced, used by contextKeyToStorageKey for O(1) eviction.
type nsContextKey struct {
namespace string
contextKey string
}
Expand All @@ -48,8 +50,12 @@ type engine struct {
extractors []observerdef.LogMetricsExtractor
detectors []observerdef.Detector
correlators []observerdef.Correlator
contextProviders map[string]observerdef.ContextProvider // namespace → provider
contextRefs map[string]seriesContextRef
// contextByRef caches the MetricContext for each series keyed by SeriesRef.
// Updated on every ingest; used for O(1) anomaly enrichment.
contextByRef map[observerdef.SeriesRef]*observerdef.MetricContext
// contextKeyToStorageKey maps (namespace, contextKey) → storage key for O(1)
// eviction lookup. Only populated for extractors that use LRU eviction.
contextKeyToStorageKey map[nsContextKey]string

// scheduler decides when the engine should advance analysis.
scheduler schedulerPolicy
Expand Down Expand Up @@ -137,8 +143,6 @@ type engineConfig struct {
extractors []observerdef.LogMetricsExtractor
detectors []observerdef.Detector
correlators []observerdef.Correlator
contextProviders map[string]observerdef.ContextProvider // namespace → provider

// scheduler is the scheduling policy. If nil, defaults to currentBehaviorPolicy.
scheduler schedulerPolicy

Expand All @@ -164,8 +168,8 @@ func newEngine(cfg engineConfig) *engine {
extractors: cfg.extractors,
detectors: cfg.detectors,
correlators: cfg.correlators,
contextProviders: cfg.contextProviders,
contextRefs: make(map[string]seriesContextRef),
contextByRef: make(map[observerdef.SeriesRef]*observerdef.MetricContext),
contextKeyToStorageKey: make(map[nsContextKey]string),
scheduler: sched,

rawAnomalyWindow: cfg.rawAnomalyWindow,
Expand Down Expand Up @@ -307,7 +311,7 @@ func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observ
for _, extractor := range e.extractors {
processingStartTime := time.Now()
out := extractor.ProcessLog(view)
e.removeContextRefsForEvictedKeys(extractor.Name(), out.EvictedContextKeys)
e.removeContextsForEvictedKeys(extractor.Name(), out.EvictedContextKeys)
nanos := float64(time.Since(processingStartTime).Nanoseconds())
if e.onProcessingTime != nil {
e.onProcessingTime(e.detectorTag(extractor.Name()), nanos)
Expand All @@ -327,16 +331,14 @@ func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observ
tags = append(newTags, sourceTag)
}
res := e.storage.Add(extractor.Name(), m.Name, m.Value, l.timestampMs/1000, tags)
if m.ContextKey != "" && res.StorageKey != "" {
// Reuse the storage key computed inside storage.Add instead of
// recomputing seriesKey here. seriesKey is hot enough that this
// duplicate accounted for ~14.5 MiB heap-live in the
// quality_gate_container_logs SMP profile (now renamed to
// observer_logs_anomaly_stress; the 'quality_gate_*' prefix is
// reserved for SMP quality-gate cases).
e.contextRefs[res.StorageKey] = seriesContextRef{
namespace: extractor.Name(),
contextKey: m.ContextKey,
if res.Ref >= 0 && res.IsNew {
if m.Context != nil {
e.contextByRef[res.Ref] = m.Context
}
if m.ContextKey != "" {
// Record the storage key for this context so we can remove
// the series when the extractor LRU-evicts its context.
e.contextKeyToStorageKey[nsContextKey{extractor.Name(), m.ContextKey}] = res.StorageKey
}
}
}
Expand Down Expand Up @@ -374,33 +376,22 @@ func sliceContains(items []string, want string) bool {
return false
}

// removeContextRefsForEvictedKeys drops engine contextRefs whose extractor
// namespace and context key match an eviction from extractor GC, and frees
// the corresponding storage series. Without the storage cleanup, evicted
// patterns leak their tags + columnar arrays indefinitely (the contextRefs
// map is just metadata; the heavy data lives in storage.series).
func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys []string) {
// No garbage collection done
// removeContextsForEvictedKeys frees storage series whose extractor context
// keys have been evicted by LRU or GC. Uses contextKeyToStorageKey for O(1)
// lookup instead of scanning the full context map.
func (e *engine) removeContextsForEvictedKeys(namespace string, evictedKeys []string) {
if len(evictedKeys) == 0 {
return
}
want := make(map[string]struct{}, len(evictedKeys))
for _, k := range evictedKeys {
if k != "" {
want[k] = struct{}{}
}
}
if len(want) == 0 {
return
}
var storageKeys []string
for seriesID, ref := range e.contextRefs {
if ref.namespace != namespace {
for _, key := range evictedKeys {
if key == "" {
continue
}
if _, ok := want[ref.contextKey]; ok {
delete(e.contextRefs, seriesID)
storageKeys = append(storageKeys, seriesID)
nck := nsContextKey{namespace, key}
if sk, ok := e.contextKeyToStorageKey[nck]; ok {
delete(e.contextKeyToStorageKey, nck)
storageKeys = append(storageKeys, sk)
}
}
if len(storageKeys) > 0 {
Expand Down Expand Up @@ -429,9 +420,12 @@ func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys [
// taking their own locks. Adding a new caller of this function from a
// different goroutine would break that invariant for every detector.
func (e *engine) fanOutSeriesRemoval(refs []observerdef.SeriesRef) {
if len(refs) == 0 || len(e.detectors) == 0 {
if len(refs) == 0 {
return
}
for _, ref := range refs {
delete(e.contextByRef, ref)
}
for _, d := range e.detectors {
if remover, ok := d.(observerdef.SeriesRemover); ok {
remover.RemoveSeries(refs)
Expand Down Expand Up @@ -643,21 +637,27 @@ func (e *engine) runDetectorsAndCorrelatorsSnapshot(upTo int64, detectors []obse
// Lookup builds the storage key from Source fields (namespace, name, tags)
// and maps that to a provider namespace and context key.
func (e *engine) enrichAnomaly(a *observerdef.Anomaly) {
if a.Source.Name == "" {
return
}
fullKey := seriesKey(a.Source.Namespace, a.Source.Name, a.Source.Tags)

ref, ok := e.contextRefs[fullKey]
if !ok {
return
var ref observerdef.SeriesRef = -1
if a.SourceRef != nil {
ref = a.SourceRef.Ref
} else if a.Source.Name != "" {
// Fallback for anomalies emitted without a SourceRef (e.g. bare
// detectors in tests). Look up the ref from storage via the Source
// descriptor. This path does a string key lookup which is less
// efficient, but only reached by anomalies that don't go through
// seriesDetectorAdapter.
key := seriesKey(a.Source.Namespace, a.Source.Name, a.Source.Tags)
e.storage.mu.RLock()
if stats, ok := e.storage.series[key]; ok {
ref = stats.ref
}
e.storage.mu.RUnlock()
}
provider, ok := e.contextProviders[ref.namespace]
if !ok {
if ref < 0 {
return
}
ctx, ok := provider.GetContextByKey(ref.contextKey)
if !ok {
ctx, ok := e.contextByRef[ref]
if !ok || ctx == nil {
return
}
a.Context = &observerdef.MetricContext{
Expand Down Expand Up @@ -894,8 +894,8 @@ func (e *engine) SetExtractors(extractors []observerdef.LogMetricsExtractor) {

validateUniqueExtractorNames(extractors)
e.extractors = extractors
e.contextProviders = collectContextProviders(extractors)
e.contextRefs = make(map[string]seriesContextRef)
e.contextByRef = make(map[observerdef.SeriesRef]*observerdef.MetricContext)
e.contextKeyToStorageKey = make(map[nsContextKey]string)
e.rebuildDetectorTags()
}

Expand Down Expand Up @@ -924,7 +924,8 @@ func (e *engine) Reset() {
}
}

e.contextRefs = make(map[string]seriesContextRef)
e.contextByRef = make(map[observerdef.SeriesRef]*observerdef.MetricContext)
e.contextKeyToStorageKey = make(map[nsContextKey]string)
}

// resetRawAnomalies clears the raw anomaly tracking state.
Expand Down
Loading
Loading