diff --git a/internal/controller/consoleplugin/consoleplugin_objects.go b/internal/controller/consoleplugin/consoleplugin_objects.go
index e262f1302..4af20c9b1 100644
--- a/internal/controller/consoleplugin/consoleplugin_objects.go
+++ b/internal/controller/consoleplugin/consoleplugin_objects.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"slices"
 	"strconv"
+	"strings"
 	"time"
 
 	lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
@@ -455,7 +456,7 @@ func (b *builder) getPromConfig(ctx context.Context) cfg.PrometheusConfig {
 	return config
 }
 
-func (b *builder) setFrontendConfig(fconf *cfg.FrontendConfig) error {
+func (b *builder) setFrontendConfig(fconf *cfg.FrontendConfig, metrics []cfg.MetricInfo) (string, error) {
 	if b.desired.Agent.EBPF.IsPktDropEnabled() {
 		fconf.Features = append(fconf.Features, "pktDrop")
 	}
@@ -506,7 +507,62 @@ func (b *builder) setFrontendConfig(fconf *cfg.FrontendConfig) error {
 	// Add health rules metadata for frontend
 	fconf.RecordingAnnotations = b.getHealthRecordingAnnotations()
 
-	return nil
+	// Filter-out disabled scopes
+	var scopes []cfg.ScopeConfig
+	var warnings []string
+	for i := range fconf.Scopes {
+		scope := &fconf.Scopes[i]
+		if len(scope.Feature) == 0 || slices.Contains(fconf.Features, scope.Feature) {
+			if b.desired.UseLoki() {
+				scopes = append(scopes, *scope)
+			} else if valid, warning := isScopeValidForMetrics(scope, metrics); valid {
+				scopes = append(scopes, *scope)
+			} else if scope.ID != "resource" { // don't trigger warning for "resource" scope, it's a known fact it won't be available
+				warnings = append(warnings, warning)
+			}
+		}
+	}
+	fconf.Scopes = scopes
+
+	return strings.Join(warnings, "; "), nil
 }
+
+func isScopeValidForMetrics(scope *cfg.ScopeConfig, metrics []cfg.MetricInfo) (bool, string) {
+	var bestMatch *cfg.MetricInfo
+	var bestMatchMissingLabels []string
+	var candidates []string
+	for i := range metrics {
+		if metrics[i].ValueField == "Bytes" || metrics[i].ValueField == "Packets" {
+			missing := missingLabels(scope, &metrics[i])
+			if len(missing) == 0 {
+				if !metrics[i].Enabled {
+					candidates = append(candidates, metrics[i].Name)
+				} else {
+					return true, ""
+				}
+			} else if bestMatch == nil {
+				bestMatch = &metrics[i]
+				bestMatchMissingLabels = missing
+			}
+		}
+	}
+	if len(candidates) > 0 {
+		return false, fmt.Sprintf("Scope %s invalid for metrics (candidates: %s)", scope.ID, strings.Join(candidates, ", "))
+	}
+	if bestMatch == nil {
+		return false, fmt.Sprintf("Scope %s invalid for metrics (no best match)", scope.ID)
+	}
+	return false, fmt.Sprintf("Scope %s invalid for metrics (best match: %s; missing labels: %s)", scope.ID, bestMatch.Name, strings.Join(bestMatchMissingLabels, ", "))
+}
+
+func missingLabels(scope *cfg.ScopeConfig, metric *cfg.MetricInfo) []string {
+	var missing []string
+	for _, label := range scope.Labels {
+		if !slices.Contains(metric.Labels, label) {
+			missing = append(missing, label)
+		}
+	}
+	return missing
+}
 
 func (b *builder) getHealthRecordingAnnotations() map[string]map[string]string {
@@ -573,10 +629,14 @@ func (b *builder) configMap(ctx context.Context, lokiStack *lokiv1.LokiStack) (*
 	if err != nil {
 		return nil, "", err
 	}
-	err = b.setFrontendConfig(&config.Frontend)
+	warning, err := b.setFrontendConfig(&config.Frontend, config.Prometheus.Metrics)
 	if err != nil {
 		return nil, "", err
 	}
+	// TODO: with NETOBSERV-2375 => add DEGRADED to console status instead of this log
+	if warning != "" {
+		log.FromContext(ctx).Info("Frontend config DEGRADED", "message", warning)
+	}
 
 	var configStr string
 	bs, err := yaml.Marshal(config)
diff --git a/internal/controller/consoleplugin/consoleplugin_test.go b/internal/controller/consoleplugin/consoleplugin_test.go
index 2ea02e526..4ab180516 100644
--- a/internal/controller/consoleplugin/consoleplugin_test.go
+++ b/internal/controller/consoleplugin/consoleplugin_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	ascv2 "k8s.io/api/autoscaling/v2"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -15,7 +16,7 @@ import (
 	lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2"
-	config "github.com/netobserv/netobserv-operator/internal/controller/consoleplugin/config"
+	"github.com/netobserv/netobserv-operator/internal/controller/consoleplugin/config"
 	"github.com/netobserv/netobserv-operator/internal/controller/constants"
 	"github.com/netobserv/netobserv-operator/internal/controller/reconcilers"
 	"github.com/netobserv/netobserv-operator/internal/pkg/cluster"
@@ -715,3 +716,82 @@ func TestLokiStackNotFoundBehavior(t *testing.T) {
 	// This ensures the console plugin can still function with a status URL
 	// even if the LokiStack resource is temporarily unavailable
 }
+
+func TestScopeFilteringWithLoki(t *testing.T) {
+	scopeIDs := func(scopes []config.ScopeConfig) []string {
+		var ids []string
+		for i := range scopes {
+			ids = append(ids, scopes[i].ID)
+		}
+		return ids
+	}
+	plugin := getPluginConfig()
+	lokiSpec := flowslatest.FlowCollectorLoki{
+		Enable: ptr.To(true),
+	}
+	loki := helper.NewLokiConfig(&lokiSpec, "any")
+	spec := flowslatest.FlowCollectorSpec{
+		Agent: flowslatest.FlowCollectorAgent{
+			EBPF: flowslatest.FlowCollectorEBPF{
+				Features: []flowslatest.AgentFeature{flowslatest.UDNMapping},
+			},
+		},
+		ConsolePlugin: plugin,
+		Loki:          lokiSpec,
+	}
+	builder := getBuilder(&spec, &loki)
+	frontend, err := config.GetStaticFrontendConfig()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"cluster", "network", "zone", "host", "namespace", "owner", "resource"}, scopeIDs(frontend.Scopes))
+
+	prom := builder.getPromConfig(context.Background())
+	warning, err := builder.setFrontendConfig(&frontend, prom.Metrics)
+	require.NoError(t, err)
+	assert.Equal(t, []string{"network", "host", "namespace", "owner", "resource"}, scopeIDs(frontend.Scopes))
+	assert.Equal(t, "", warning)
+}
+
+func TestScopeFilteringNoLoki(t *testing.T) {
+	scopeIDs := func(scopes []config.ScopeConfig) []string {
+		var ids []string
+		for i := range scopes {
+			ids = append(ids, scopes[i].ID)
+		}
+		return ids
+	}
+	plugin := getPluginConfig()
+	lokiSpec := flowslatest.FlowCollectorLoki{
+		Enable: ptr.To(false),
+	}
+	loki := helper.NewLokiConfig(&lokiSpec, "any")
+	spec := flowslatest.FlowCollectorSpec{
+		Agent: flowslatest.FlowCollectorAgent{
+			EBPF: flowslatest.FlowCollectorEBPF{
+				Features: []flowslatest.AgentFeature{flowslatest.UDNMapping},
+			},
+		},
+		Processor: flowslatest.FlowCollectorFLP{
+			AddZone: ptr.To(true),
+			Metrics: flowslatest.FLPMetrics{
+				IncludeList: &[]flowslatest.FLPMetric{
+					"node_egress_bytes_total",
+					"namespace_egress_bytes_total",
+				},
+			},
+		},
+		ConsolePlugin: plugin,
+		Loki:          lokiSpec,
+	}
+	builder := getBuilder(&spec, &loki)
+	frontend, err := config.GetStaticFrontendConfig()
+	require.NoError(t, err)
+	assert.Equal(t, []string{"cluster", "network", "zone", "host", "namespace", "owner", "resource"}, scopeIDs(frontend.Scopes))
+
+	prom := builder.getPromConfig(context.Background())
+	warning, err := builder.setFrontendConfig(&frontend, prom.Metrics)
+	require.NoError(t, err)
+	assert.Equal(t, []string{"zone", "host", "namespace"}, scopeIDs(frontend.Scopes))
+	assert.Contains(t, warning, "Scope network invalid for metrics")
+	assert.Contains(t, warning, "Scope owner invalid for metrics")
+	assert.Contains(t, warning, "candidates: netobserv_workload_egress_bytes_total, ")
+}