From 0a607a30f2d750c9aa5730c41e6251e0c1214119 Mon Sep 17 00:00:00 2001 From: Cedric Lamoriniere Date: Thu, 30 Apr 2026 20:04:01 +0000 Subject: [PATCH 1/3] [autoscaling] Add HPA migration controller for DPA workload autoscaler Adds a new HPA migration controller that detects existing HPAs for a DPA target, imports their configuration (metrics, replica bounds, custom queries) into the DPA spec as objectives/constraints, neutralises the HPA by setting its replica count to a very high value, and adds an HPAMigrationFinalizer to the DPA. On DPA deletion the finalizer handler restores the HPA to its original state before completing the deletion. Two correctness fixes are included: - Preserve HPA-imported objectives and constraints across profile template updates: when a ClusterProfile template hash changes, UpdateFromProfile() replaces the full DPA spec with the profile-derived one (which has no objectives). RestoreHPAImportedSpec() is now called before writing the updated spec to Kubernetes so the one-shot-imported fields survive. - Resolve %%tag_kube_cluster_name%% and %%env_VAR%% placeholders in DatadogMetric queries before storing them in the DPA spec. The Datadog backend cannot resolve cluster-agent-side template variables, so ResolveMetricQuery() (exported from externalmetrics/model) is called at import time inside resolveExternalMetricConfig(). 
Assisted-by: Claude:claude-sonnet-4-6 --- .../controllers/webhook/controller_base.go | 4 + .../mutate/autoscaling/hpa_webhook.go | 167 +++++ .../mutate/autoscaling/hpa_webhook_test.go | 245 +++++++ pkg/clusteragent/autoscaling/const.go | 19 + .../autoscaling/controller_fake.go | 2 + .../autoscaling/controller_testutils.go | 15 +- pkg/clusteragent/autoscaling/errors.go | 6 + .../datadogmetric_controller.go | 2 +- .../model/datadogmetricinternal.go | 10 +- .../externalmetrics/model/utils.go | 11 +- .../externalmetrics/model/utils_test.go | 4 +- .../autoscaling/workload/controller.go | 90 +++ .../workload/controller_hpa_migration.go | 675 ++++++++++++++++++ .../workload/controller_hpa_migration_test.go | 628 ++++++++++++++++ .../autoscaling/workload/controller_test.go | 2 + .../autoscaling/workload/model/const.go | 32 +- .../workload/model/pod_autoscaler.go | 51 +- .../workload/model/pod_autoscaler_test.go | 54 ++ 18 files changed, 1993 insertions(+), 24 deletions(-) create mode 100644 pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go create mode 100644 pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go create mode 100644 pkg/clusteragent/autoscaling/const.go create mode 100644 pkg/clusteragent/autoscaling/workload/controller_hpa_migration.go create mode 100644 pkg/clusteragent/autoscaling/workload/controller_hpa_migration_test.go diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_base.go b/pkg/clusteragent/admission/controllers/webhook/controller_base.go index 06f7434e9b28..bbe4680eb3ba 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_base.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_base.go @@ -150,6 +150,10 @@ func (c *controllerBase) generateWebhooks(datadogConfig config.Component, wmeta autoscalingWebhook := autoscaling.NewWebhook(pp, datadogConfig) webhooks = append(webhooks, autoscalingWebhook) + // Setup HPA migration webhook (protects HPAs neutralised by a 
DatadogPodAutoscaler). + hpaWebhook := autoscaling.NewHPAWebhook(datadogConfig) + webhooks = append(webhooks, hpaWebhook) + // Setup spot scheduling webhook. spotWebhook := admspot.NewWebhook(datadogConfig, sh) webhooks = append(webhooks, spotWebhook) diff --git a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go new file mode 100644 index 000000000000..d02bdbd327bf --- /dev/null +++ b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go @@ -0,0 +1,167 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package autoscaling + +import ( + "encoding/json" + "fmt" + + admiv1 "k8s.io/api/admission/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/DataDog/datadog-agent/cmd/cluster-agent/admission" + "github.com/DataDog/datadog-agent/comp/core/config" + "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + hpaWebhookName = "hpa-autoscaling" + hpaWebhookEndpoint = "/hpa-autoscaling" +) + +// HPAWebhook intercepts UPDATE operations on HorizontalPodAutoscaler resources that are +// managed by a DatadogPodAutoscaler. It reverts any spec change to keep the HPA in the +// disabled state (scaleUp/scaleDown selectPolicy: Disabled) and warns the user that the +// DPA is now in control of horizontal scaling. 
+type HPAWebhook struct { + name string + isEnabled bool + endpoint string + resources map[string][]string + operations []admissionregistrationv1.OperationType +} + +// NewHPAWebhook returns a new HPAWebhook. +func NewHPAWebhook(datadogConfig config.Component) *HPAWebhook { + return &HPAWebhook{ + name: hpaWebhookName, + isEnabled: datadogConfig.GetBool("autoscaling.workload.enabled"), + endpoint: hpaWebhookEndpoint, + resources: map[string][]string{"autoscaling": {"horizontalpodautoscalers"}}, + operations: []admissionregistrationv1.OperationType{ + admissionregistrationv1.Update, + }, + } +} + +// Name returns the name of the webhook. +func (w *HPAWebhook) Name() string { return w.name } + +// WebhookType returns the type of the webhook. +func (w *HPAWebhook) WebhookType() common.WebhookType { return common.MutatingWebhook } + +// IsEnabled returns whether the webhook is enabled. +func (w *HPAWebhook) IsEnabled() bool { return w.isEnabled } + +// Endpoint returns the endpoint path of the webhook. +func (w *HPAWebhook) Endpoint() string { return w.endpoint } + +// Resources returns the Kubernetes resources this webhook applies to. +func (w *HPAWebhook) Resources() map[string][]string { return w.resources } + +// Timeout returns the webhook timeout (0 = server default). +func (w *HPAWebhook) Timeout() int32 { return 0 } + +// Operations returns the operations this webhook is invoked for. +func (w *HPAWebhook) Operations() []admissionregistrationv1.OperationType { return w.operations } + +// LabelSelectors returns nil selectors — the webhook filters via MatchConditions instead. +func (w *HPAWebhook) LabelSelectors(_ bool) (namespaceSelector *metav1.LabelSelector, objectSelector *metav1.LabelSelector) { + return nil, nil +} + +// MatchConditions returns a CEL condition that restricts the webhook to HPA objects that +// carry the DPA-management annotation, avoiding unnecessary invocations. 
+func (w *HPAWebhook) MatchConditions() []admissionregistrationv1.MatchCondition { + return []admissionregistrationv1.MatchCondition{ + { + Name: "managed-by-dpa", + Expression: fmt.Sprintf( + `has(object.metadata.annotations) && "%s" in object.metadata.annotations`, + model.HPAManagedByDPAAnnotation, + ), + }, + } +} + +// WebhookFunc returns the admission handler. +func (w *HPAWebhook) WebhookFunc() admission.WebhookFunc { + return func(request *admission.Request) *admiv1.AdmissionResponse { + return w.revertHPASpec(request) + } +} + +// revertHPASpec ensures that any update to an HPA managed by a DPA is reverted back +// to the old (disabled) spec. It also surfaces a warning to the user. +func (w *HPAWebhook) revertHPASpec(request *admission.Request) *admiv1.AdmissionResponse { + // Decode incoming (proposed) HPA. + var incomingHPA autoscalingv2.HorizontalPodAutoscaler + if err := json.Unmarshal(request.Object, &incomingHPA); err != nil { + log.Warnf("HPA webhook: failed to decode incoming HPA: %v", err) + return admissionAllowed() + } + + // Only act when the DPA-management annotation is present. + dpaRef := incomingHPA.Annotations[model.HPAManagedByDPAAnnotation] + if dpaRef == "" { + return admissionAllowed() + } + + // Decode the old (current, disabled) HPA to get the spec we want to preserve. + var oldHPA autoscalingv2.HorizontalPodAutoscaler + if err := json.Unmarshal(request.OldObject, &oldHPA); err != nil { + log.Warnf("HPA webhook: failed to decode old HPA for %s/%s: %v", request.Namespace, request.Name, err) + return admissionAllowed() + } + + // Build a JSON merge-patch that replaces the spec with the old (disabled) spec. 
+ oldSpecJSON, err := json.Marshal(oldHPA.Spec) + if err != nil { + log.Warnf("HPA webhook: failed to serialise old HPA spec for %s/%s: %v", request.Namespace, request.Name, err) + return admissionAllowed() + } + patch := fmt.Sprintf(`{"spec":%s}`, string(oldSpecJSON)) + + warning := fmt.Sprintf( + "HPA %s/%s is managed by DatadogPodAutoscaler %s and cannot be modified directly. "+ + "Your change has been reverted. If you no longer need the HPA, you can safely delete it.", + request.Namespace, request.Name, dpaRef, + ) + + return &admiv1.AdmissionResponse{ + Allowed: true, + Warnings: []string{warning}, + PatchType: patchTypePtr(admiv1.PatchTypeJSONPatch), + Patch: buildJSONPatch("replace", "/spec", json.RawMessage(oldSpecJSON)), + Result: &metav1.Status{ + // Include the patch inline as a merge patch too for clarity in logs. + Message: patch, + }, + } +} + +func admissionAllowed() *admiv1.AdmissionResponse { + return &admiv1.AdmissionResponse{Allowed: true} +} + +func patchTypePtr(pt admiv1.PatchType) *admiv1.PatchType { return &pt } + +// buildJSONPatch serialises a single JSON Patch operation as a byte slice. +func buildJSONPatch(op, path string, value json.RawMessage) []byte { + type jsonPatchOp struct { + Op string `json:"op"` + Path string `json:"path"` + Value json.RawMessage `json:"value"` + } + patch, _ := json.Marshal([]jsonPatchOp{{Op: op, Path: path, Value: value}}) + return patch +} diff --git a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go new file mode 100644 index 000000000000..f66d01566908 --- /dev/null +++ b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go @@ -0,0 +1,245 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +//go:build kubeapiserver + +package autoscaling + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/DataDog/datadog-agent/cmd/cluster-agent/admission" + admcommon "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common" + mutatecommon "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/common" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model" +) + +func newTestHPA(name, namespace string, maxReplicas int32, annotations map[string]string) autoscalingv2.HorizontalPodAutoscaler { + minReplicas := int32(1) + return autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: annotations, + }, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "my-deploy", + }, + MinReplicas: &minReplicas, + MaxReplicas: maxReplicas, + }, + } +} + +func mustMarshal(t *testing.T, v any) []byte { + t.Helper() + b, err := json.Marshal(v) + require.NoError(t, err) + return b +} + +func TestNewHPAWebhook(t *testing.T) { + cfg := mutatecommon.FakeConfigWithValues(t, map[string]any{ + "autoscaling.workload.enabled": true, + }) + w := NewHPAWebhook(cfg) + + assert.Equal(t, hpaWebhookName, w.Name()) + assert.Equal(t, hpaWebhookEndpoint, w.Endpoint()) + assert.True(t, w.IsEnabled()) + assert.Equal(t, admcommon.WebhookType(admcommon.MutatingWebhook), w.WebhookType()) + assert.Equal(t, []admissionregistrationv1.OperationType{admissionregistrationv1.Update}, w.Operations()) + assert.Equal(t, map[string][]string{"autoscaling": {"horizontalpodautoscalers"}}, w.Resources()) +} + +func TestHPAWebhook_MatchConditions(t 
*testing.T) { + cfg := mutatecommon.FakeConfigWithValues(t, map[string]any{}) + w := NewHPAWebhook(cfg) + conditions := w.MatchConditions() + require.Len(t, conditions, 1) + assert.Equal(t, "managed-by-dpa", conditions[0].Name) + assert.Contains(t, conditions[0].Expression, model.HPAManagedByDPAAnnotation) +} + +func TestHPAWebhook_revertHPASpec_managed(t *testing.T) { + // Old (disabled) HPA with maxReplicas=1000 (the sentinel value set by the migration). + oldHPA := newTestHPA("my-hpa", "default", 1000, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", + }) + // Incoming HPA where someone tried to change maxReplicas back to 5. + incomingHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", + }) + + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + require.NotNil(t, resp.Warnings) + assert.Len(t, resp.Warnings, 1) + assert.Equal(t, + "HPA default/my-hpa is managed by DatadogPodAutoscaler default/my-dpa and cannot be modified directly. "+ + "Your change has been reverted. If you no longer need the HPA, you can safely delete it.", + resp.Warnings[0], + ) + + // Verify the patch reverts the spec to oldHPA.Spec. + require.NotNil(t, resp.Patch) + var ops []map[string]json.RawMessage + require.NoError(t, json.Unmarshal(resp.Patch, &ops)) + require.Len(t, ops, 1) + assert.Equal(t, `"replace"`, string(ops[0]["op"])) + assert.Equal(t, `"/spec"`, string(ops[0]["path"])) + + // The patch value must match the old spec (maxReplicas=1000). 
+ var patchedSpec autoscalingv2.HorizontalPodAutoscalerSpec + require.NoError(t, json.Unmarshal(ops[0]["value"], &patchedSpec)) + assert.Equal(t, int32(1000), patchedSpec.MaxReplicas) +} + +func TestHPAWebhook_revertHPASpec_not_managed(t *testing.T) { + // HPA without the DPA annotation → webhook should be a no-op. + hpa := newTestHPA("my-hpa", "default", 5, nil) + + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: mustMarshal(t, hpa), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + assert.Empty(t, resp.Patch) + assert.Empty(t, resp.Warnings) +} + +func TestHPAWebhook_revertHPASpec_invalid_object(t *testing.T) { + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: []byte("not-json"), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + assert.Empty(t, resp.Patch) +} + +func TestHPAWebhook_revertHPASpec_invalid_old_object(t *testing.T) { + incomingHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", + }) + + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: mustMarshal(t, incomingHPA), + OldObject: []byte("not-json"), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + assert.Empty(t, resp.Patch) +} + +func TestHPAWebhook_revertHPASpec_preserves_metrics(t *testing.T) { + cpuTarget := int32(50) + oldHPA := newTestHPA("my-hpa", "default", 1000, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", + }) + oldHPA.Spec.Metrics = []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: "cpu", + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.UtilizationMetricType, + AverageUtilization: &cpuTarget, + }, + }, + }, + } + // 
Incoming HPA where the metrics were removed. + incomingHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", + }) + + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + + var ops []map[string]json.RawMessage + require.NoError(t, json.Unmarshal(resp.Patch, &ops)) + require.Len(t, ops, 1) + + var patchedSpec autoscalingv2.HorizontalPodAutoscalerSpec + require.NoError(t, json.Unmarshal(ops[0]["value"], &patchedSpec)) + require.Len(t, patchedSpec.Metrics, 1) + require.NotNil(t, patchedSpec.Metrics[0].Resource) + assert.Equal(t, int32(50), *patchedSpec.Metrics[0].Resource.Target.AverageUtilization) +} + +func TestHPAWebhook_WebhookFunc(t *testing.T) { + oldHPA := newTestHPA("my-hpa", "ns", 1000, map[string]string{ + model.HPAManagedByDPAAnnotation: "ns/my-dpa", + }) + incomingHPA := newTestHPA("my-hpa", "ns", 3, map[string]string{ + model.HPAManagedByDPAAnnotation: "ns/my-dpa", + }) + + cfg := mutatecommon.FakeConfigWithValues(t, map[string]any{ + "autoscaling.workload.enabled": true, + }) + w := NewHPAWebhook(cfg) + + fn := w.WebhookFunc() + resp := fn(&admission.Request{ + Name: "my-hpa", + Namespace: "ns", + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), + }) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + assert.NotEmpty(t, resp.Patch) + +} diff --git a/pkg/clusteragent/autoscaling/const.go b/pkg/clusteragent/autoscaling/const.go new file mode 100644 index 000000000000..8d232fb74c2a --- /dev/null +++ b/pkg/clusteragent/autoscaling/const.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package autoscaling + +import "k8s.io/apimachinery/pkg/runtime/schema" + +// DatadogMetricGVR is the GroupVersionResource for datadoghq.com/v1alpha1 DatadogMetrics. +// Shared between the externalmetrics controller and the workload HPA-migration controller +// so both always reference the same resource definition. +var DatadogMetricGVR = schema.GroupVersionResource{ + Group: "datadoghq.com", + Version: "v1alpha1", + Resource: "datadogmetrics", +} diff --git a/pkg/clusteragent/autoscaling/controller_fake.go b/pkg/clusteragent/autoscaling/controller_fake.go index da5cd5d26eeb..5ea628ba13af 100644 --- a/pkg/clusteragent/autoscaling/controller_fake.go +++ b/pkg/clusteragent/autoscaling/controller_fake.go @@ -22,6 +22,7 @@ import ( k8stesting "k8s.io/client-go/testing" datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2" + datadoghqv1alpha1 "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" ) var ( @@ -32,6 +33,7 @@ var ( func init() { _ = datadoghq.AddToScheme(scheme) + _ = datadoghqv1alpha1.AddToScheme(scheme) } // CreateControllerFunc is a function that creates a new controller. diff --git a/pkg/clusteragent/autoscaling/controller_testutils.go b/pkg/clusteragent/autoscaling/controller_testutils.go index 04edd4c6cf75..7e39aa654a19 100644 --- a/pkg/clusteragent/autoscaling/controller_testutils.go +++ b/pkg/clusteragent/autoscaling/controller_testutils.go @@ -68,14 +68,17 @@ func CheckAction(t *testing.T, expected, actual core.Action) { // FilterInformerActions filters list and watch actions for testing resources. // Since list and watch don't change resource state we can filter it to lower -// nose level in our tests. -func FilterInformerActions(actions []core.Action, resourceName string) []core.Action { +// noise level in our tests. 
All cluster-scoped list/watch actions are filtered +// because they originate exclusively from informer cache sync and are never +// meaningful assertions in unit tests. +func FilterInformerActions(actions []core.Action, _ string) []core.Action { ret := []core.Action{} for _, action := range actions { - if len(action.GetNamespace()) == 0 && - (action.Matches("list", resourceName) || - action.Matches("watch", resourceName)) { - continue + if len(action.GetNamespace()) == 0 { + verb := action.GetVerb() + if verb == "list" || verb == "watch" { + continue + } } ret = append(ret, action) } diff --git a/pkg/clusteragent/autoscaling/errors.go b/pkg/clusteragent/autoscaling/errors.go index 71ebe9d9ad59..b82f3077479a 100644 --- a/pkg/clusteragent/autoscaling/errors.go +++ b/pkg/clusteragent/autoscaling/errors.go @@ -50,6 +50,12 @@ const ( ConditionReasonLimitedByScalingBehavior ConditionReasonType = "LimitedByScalingBehavior" // ConditionReasonFailedToEvict indicates a failure when evicting a pod. ConditionReasonFailedToEvict ConditionReasonType = "FailedToEvict" + // ConditionReasonAmbiguousHPA indicates multiple HPAs target the same Deployment, making migration ambiguous. + ConditionReasonAmbiguousHPA ConditionReasonType = "AmbiguousHPA" + // ConditionReasonUnsupportedHPAMetric indicates the HPA uses a metric type not supported for DPA migration. + ConditionReasonUnsupportedHPAMetric ConditionReasonType = "UnsupportedHPAMetric" + // ConditionReasonDatadogMetricNotFound indicates a DatadogMetric CRD referenced by an HPA external metric was not found. 
+ ConditionReasonDatadogMetricNotFound ConditionReasonType = "DatadogMetricNotFound" ) // ConditionReason is an interface that errors can implement to provide diff --git a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go index d3023c5833fd..a469fbcfb1d4 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go @@ -45,7 +45,7 @@ const ( ) var ( - gvrDDM = datadoghq.GroupVersion.WithResource("datadogmetrics") + gvrDDM = autoscaling.DatadogMetricGVR metaDDM = metav1.TypeMeta{ Kind: "DatadogMetric", APIVersion: "datadoghq.com/v1alpha1", diff --git a/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go b/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go index 005e0eed1664..b787633746a9 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go @@ -310,7 +310,7 @@ func (d *DatadogMetricInternal) newCondition(status bool, updateTime metav1.Time // resolveQuery tries to resolve the query and set the DatadogMetricInternal fields accordingly func (d *DatadogMetricInternal) resolveQuery(query string) { - resolvedQuery, err := resolveQuery(query) + resolvedQuery, err := ResolveMetricQuery(query) if err != nil { log.Errorf("Unable to resolve DatadogMetric query %q: %v", d.query, err) d.Valid = false @@ -319,12 +319,8 @@ func (d *DatadogMetricInternal) resolveQuery(query string) { d.resolvedQuery = nil return } - if resolvedQuery != "" { - log.Debugf("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery) - d.resolvedQuery = &resolvedQuery - return - } - d.resolvedQuery = &d.query + log.Debugf("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery) + 
d.resolvedQuery = &resolvedQuery } // SetQueries is only used for testing in other packages diff --git a/pkg/clusteragent/autoscaling/externalmetrics/model/utils.go b/pkg/clusteragent/autoscaling/externalmetrics/model/utils.go index afe7eefbeb16..35bb6a396a25 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/model/utils.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/model/utils.go @@ -44,13 +44,14 @@ var templatedTags = map[string]tagGetter{ }, } -// resolveQuery replaces the template variables in the query -// The supported template variable types are %%tag_<tag_name>%% and %%env_<ENV_VAR>%% -// The only supported <tag_name> in %%tag_<tag_name>%% is kube_cluster_name -func resolveQuery(q string) (string, error) { +// ResolveMetricQuery replaces %%tag_<tag_name>%% and %%env_<ENV_VAR>%% template +// variables in a Datadog metrics query string. Returns q unchanged when it contains +// no template variables. +// The only supported <tag_name> is kube_cluster_name. +func ResolveMetricQuery(q string) (string, error) { vars := tmplvar.ParseString(q) if len(vars) == 0 { - return "", nil + return q, nil } result := q diff --git a/pkg/clusteragent/autoscaling/externalmetrics/model/utils_test.go b/pkg/clusteragent/autoscaling/externalmetrics/model/utils_test.go index e6ddd985997a..8c1f1bb46744 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/model/utils_test.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/model/utils_test.go @@ -31,7 +31,7 @@ func Test_resolveQuery(t *testing.T) { q: "avg:nginx.net.request_per_s{kube_container_name:nginx}.rollup(60)", templatedTags: templatedTagsStub, loadFunc: func(*testing.T) {}, - want: "", + want: "avg:nginx.net.request_per_s{kube_container_name:nginx}.rollup(60)", wantErr: false, }, { @@ -132,7 +132,7 @@ func Test_resolveQuery(t *testing.T) { t.Run(tt.name, func(t *testing.T) { templatedTags = tt.templatedTags tt.loadFunc(t) - got, err := resolveQuery(tt.q) + got, err := ResolveMetricQuery(tt.q) assert.Equal(t, tt.wantErr, err != nil) assert.Equal(t, tt.want, got) }) diff
--git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go index 7a7eb83c541f..5f52b7109b3f 100644 --- a/pkg/clusteragent/autoscaling/workload/controller.go +++ b/pkg/clusteragent/autoscaling/workload/controller.go @@ -11,6 +11,7 @@ import ( "context" "errors" "fmt" + "slices" "time" datadoghqcommon "github.com/DataDog/datadog-operator/api/datadoghq/common" @@ -24,6 +25,7 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" k8sclient "k8s.io/client-go/kubernetes" scaleclient "k8s.io/client-go/scale" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" @@ -84,6 +86,19 @@ type Controller struct { isFallbackEnabled bool + kubeClient k8sclient.Interface + + // hpaIndexer is backed by the HPA informer and indexed by "<namespace>/<targetName>". + // Used by initiateHPAMigration to find existing HPAs without live API calls. + hpaIndexer cache.Indexer + hpaInformerSynced cache.InformerSynced + + // datadogMetricIndexer is backed by the shared DatadogMetric informer (same factory used by + // the externalmetrics controller). Used by extractHPAConfig to resolve DatadogMetric CRDs + // referenced by HPA external metrics without live API calls. + datadogMetricIndexer cache.Indexer + datadogMetricInformerSynced cache.InformerSynced + metricsStore *metricsstore.MetricsStore[*model.PodAutoscalerInternal] } @@ -110,6 +125,7 @@ func NewController( eventRecorder: eventRecorder, localSender: localSender, isFallbackEnabled: false, // keep fallback disabled by default + kubeClient: client, } autoscalingWorkqueue := workqueue.NewTypedRateLimitingQueueWithConfig( @@ -131,6 +147,45 @@ func NewController( c.podWatcher = podWatcher c.scaler = newScaler(restMapper, scaleClient) + // Set up the HPA informer with a reverse index by scaleTargetRef name so that + // findHPAForTarget can do O(1) cache lookups instead of live API calls.
+ hpaInformer := dynamicInformer.ForResource(hpaGVR) + if err := hpaInformer.Informer().AddIndexers(cache.Indexers{ + hpaTargetIndexName: hpaByTargetRefIndex, + }); err != nil { + return nil, fmt.Errorf("failed to add HPA target indexer: %w", err) + } + c.hpaIndexer = hpaInformer.Informer().GetIndexer() + hpaHandler, err := hpaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) + if err != nil { + return nil, fmt.Errorf("failed to add HPA informer event handler: %w", err) + } + c.hpaInformerSynced = hpaHandler.HasSynced + + // Set up the DatadogMetric informer only when the CRD is installed in the cluster. + // Registering an informer for a non-existent resource causes the reflector to spam + // error logs on every retry. The REST mapper is used as a cheap CRD presence check. + // When the externalmetrics controller is running it has already registered this informer + // with the same factory, so ForResource is idempotent in that case. + if restMapper != nil { + _, mappingErr := restMapper.RESTMapping(schema.GroupKind{Group: autoscaling.DatadogMetricGVR.Group, Kind: "DatadogMetric"}, autoscaling.DatadogMetricGVR.Version) + if mappingErr == nil { + ddmInformer := dynamicInformer.ForResource(autoscaling.DatadogMetricGVR) + c.datadogMetricIndexer = ddmInformer.Informer().GetIndexer() + ddmHandler, err := ddmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) + if err != nil { + return nil, fmt.Errorf("failed to add DatadogMetric informer event handler: %w", err) + } + c.datadogMetricInformerSynced = ddmHandler.HasSynced + } else { + log.Infof("DatadogMetric CRD not found in cluster (%v); DatadogMetric-backed HPA migration will be unavailable", mappingErr) + } + } + if c.datadogMetricIndexer == nil { + c.datadogMetricIndexer = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + c.datadogMetricInformerSynced = func() bool { return true } + } + // Initialize metrics store c.metricsStore = 
metricsstore.NewMetricsStore(metrics.GeneratePodAutoscalerMetrics, localSender, c.IsLeader, globalTagsFunc) c.store.RegisterObserver( @@ -220,6 +275,14 @@ func (c *Controller) processPodAutoscaler(ctx context.Context, key, ns, name str func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string, podAutoscaler *datadoghq.DatadogPodAutoscaler) (autoscaling.ProcessResult, error) { podAutoscalerInternal, podAutoscalerInternalFound, storeUnlock := c.store.LockRead(key, true) + // Handle HPA migration finalizer cleanup before any ownership logic. + // When a DPA with the HPAMigrationFinalizer is being deleted, we restore the HPA first. + if podAutoscaler != nil && !podAutoscaler.DeletionTimestamp.IsZero() { + if slices.Contains(podAutoscaler.Finalizers, model.HPAMigrationFinalizer) { + return c.handleHPAMigrationCleanup(ctx, key, ns, name, podAutoscaler, storeUnlock) + } + } + // Object is missing from our store if !podAutoscalerInternalFound { if podAutoscaler != nil { @@ -371,6 +434,13 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string // (set by the syncer) with the hash we last applied to Kubernetes. desiredHash := podAutoscalerInternal.DesiredProfileTemplateHash() if desiredHash != "" && desiredHash != podAutoscalerInternal.AppliedProfileHash() { + // If HPA config was already imported into this DPA, restore objectives and + // constraints from Kubernetes before syncing the profile spec. The profile + // template intentionally omits both fields; they are imported one-shot from + // the HPA and must survive profile template updates. 
+ if podAutoscaler.Annotations[model.HPAConfigImportedAnnotation] != "" { + podAutoscalerInternal.RestoreHPAImportedSpec(&podAutoscaler.Spec) + } updatedGeneration, err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler) if err != nil { storeUnlock() @@ -436,6 +506,26 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string return handleNonRetryableError(notFoundErr) } + // Initiate HPA migration when the preview flag is set and the migration has not yet run + // (indicated by the absence of HPAMigrationFinalizer on the DPA). + if podAutoscaler != nil && podAutoscalerInternal.IsHPAMigrationEnabled() && + !slices.Contains(podAutoscaler.Finalizers, model.HPAMigrationFinalizer) { + requeue, migrationErr := c.initiateHPAMigration(ctx, ns, name, podAutoscaler) + if migrationErr != nil { + return handleNonRetryableError(migrationErr) + } + if requeue { + // A Kubernetes object was patched; stop here and wait for the informer to re-queue. + c.store.UnlockSet(key, podAutoscalerInternal, c.ID) + return autoscaling.NoRequeue, nil + } + } + + // Reflect whether the HPA migration is currently active (finalizer present) in the status. + if podAutoscaler != nil { + podAutoscalerInternal.SetHPAMigrationActive(slices.Contains(podAutoscaler.Finalizers, model.HPAMigrationFinalizer)) + } + // Now that everything is synced, we can perform the actual processing result, scalingErr := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal, targetGVK, target, scale, gr, getScaleErr) diff --git a/pkg/clusteragent/autoscaling/workload/controller_hpa_migration.go b/pkg/clusteragent/autoscaling/workload/controller_hpa_migration.go new file mode 100644 index 000000000000..eeb72a3d34e4 --- /dev/null +++ b/pkg/clusteragent/autoscaling/workload/controller_hpa_migration.go @@ -0,0 +1,675 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package workload + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "strings" + "time" + + datadoghqcommon "github.com/DataDog/datadog-operator/api/datadoghq/common" + datadoghqv1alpha1 "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" + datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2" + autoscalingv2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + k8sclient "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling" + externalmetricsmodel "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model" + "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/pointer" +) + +const ( + // hpaTargetIndexName is the name of the informer index that maps + // "/" → HPA objects. + hpaTargetIndexName = "hpa-scale-target-ref" +) + +var ( + // hpaGVR is the GroupVersionResource for autoscaling/v2 HorizontalPodAutoscalers. + hpaGVR = schema.GroupVersionResource{ + Group: "autoscaling", + Version: "v2", + Resource: "horizontalpodautoscalers", + } + + // cpuUsageQueryKeywords lists the Datadog metric names that represent CPU usage. + // When a DatadogMetric query contains any of these, the DPA is configured with + // a PodResource CPU objective (AbsoluteValue) instead of a generic CustomQuery. 
+ cpuUsageQueryKeywords = []string{"container.cpu.usage", "kubernetes.cpu.usage"} + + defaultCustomQueryWindow = 5 * time.Minute +) + +// hpaByTargetRefIndex is the cache.IndexFunc used by the HPA informer to build a reverse +// map from "/" to the matching HPA objects. +func hpaByTargetRefIndex(obj interface{}) ([]string, error) { + unstrHPA, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("hpaByTargetRefIndex: expected *unstructured.Unstructured, got %T", obj) + } + + targetName, found, err := unstructured.NestedString(unstrHPA.Object, "spec", "scaleTargetRef", "name") + if err != nil || !found || targetName == "" { + return nil, nil + } + + return []string{unstrHPA.GetNamespace() + "/" + targetName}, nil +} + +// HPAConfig holds the relevant configuration extracted from an HPA for +// auto-populating a DatadogPodAutoscaler created from a ClusterProfile (UC2). +type HPAConfig struct { + MinReplicas *int32 + MaxReplicas int32 + // PodCPUUtilization is the pod-level CPU utilization target (0-100), if set. + PodCPUUtilization *int32 + // ContainerCPUTargets holds per-container CPU utilization targets, if any. + ContainerCPUTargets []ContainerCPUTarget + // ExternalMetrics holds configs resolved from HPA external metrics that reference + // a DatadogMetric CRD (datadogmetric@: format). + ExternalMetrics []ExternalMetricConfig +} + +// ContainerCPUTarget is a per-container CPU utilization objective extracted from an HPA. +type ContainerCPUTarget struct { + ContainerName string + CPUUtilization int32 +} + +// ExternalMetricConfig holds a resolved DatadogMetric configuration for one HPA external metric. +type ExternalMetricConfig struct { + // Query is DatadogMetric.Spec.Query — the raw Datadog metrics query string. + Query string + // TargetValue is the per-pod average target extracted from the HPA metric target. + // Nil when no explicit target is configured on the HPA. 
+ TargetValue *resource.Quantity + // Window is the query evaluation window, derived from DatadogMetric.Spec.MaxAge/TimeWindow. + Window time.Duration + // IsCPUUsage is true when the query contains container.cpu.usage or kubernetes.cpu.usage. + // In that case the DPA objective uses PodResource CPU (AbsoluteValue) rather than CustomQuery. + IsCPUUsage bool +} + +// findHPAForTarget looks up the single HPA in the informer cache whose scaleTargetRef name +// matches targetName. Returns nil, nil when no HPA is found. Returns an error when multiple +// HPAs target the same resource (ambiguous — UC5). +func findHPAForTarget(indexer cache.Indexer, namespace, targetName string) (*autoscalingv2.HorizontalPodAutoscaler, error) { + objs, err := indexer.ByIndex(hpaTargetIndexName, namespace+"/"+targetName) + if err != nil { + return nil, fmt.Errorf("failed to look up HPA by target in cache: %w", err) + } + + var matches []*autoscalingv2.HorizontalPodAutoscaler + for _, obj := range objs { + rtObj, ok := obj.(runtime.Object) + if !ok { + continue + } + hpa := &autoscalingv2.HorizontalPodAutoscaler{} + if err := autoscaling.FromUnstructured(rtObj, hpa); err != nil { + return nil, fmt.Errorf("failed to convert cached HPA object: %w", err) + } + matches = append(matches, hpa) + } + + switch len(matches) { + case 0: + return nil, nil + case 1: + return matches[0], nil + default: + names := make([]string, 0, len(matches)) + for _, h := range matches { + names = append(names, h.Name) + } + return nil, autoscaling.NewConditionErrorf(autoscaling.ConditionReasonAmbiguousHPA, + "found %d HPAs targeting %q: %v — only one HPA per target is supported for migration", + len(matches), targetName, names) + } +} + +// hpaHasDatadogMetricRefs returns true when at least one of the HPA's metrics is an External +// metric referencing a DatadogMetric CRD (datadogmetric@: format). 
+// Used to gate the DatadogMetric informer sync check: if the HPA has no such refs we skip the +// check entirely, so migration works even when the DatadogMetric CRD is not installed. +func hpaHasDatadogMetricRefs(hpa *autoscalingv2.HorizontalPodAutoscaler) bool { + for _, m := range hpa.Spec.Metrics { + if m.Type == autoscalingv2.ExternalMetricSourceType && + m.External != nil && + strings.HasPrefix(strings.ToLower(m.External.Metric.Name), "datadogmetric@") { + return true + } + } + return false +} + +// isCPUUsageQuery returns true when the Datadog metrics query references CPU usage metrics +// (container.cpu.usage or kubernetes.cpu.usage). +func isCPUUsageQuery(query string) bool { + lower := strings.ToLower(query) + for _, kw := range cpuUsageQueryKeywords { + if strings.Contains(lower, kw) { + return true + } + } + return false +} + +// resolveDatadogMetricFromCache looks up a DatadogMetric by namespace/name in the informer cache +// and returns its spec. The indexer must be backed by the shared DynamicSharedInformerFactory +// for the datadogmetrics GVR. +// Returns ConditionReasonDatadogMetricNotFound when the object is absent from the cache. 
+func resolveDatadogMetricFromCache(indexer cache.Indexer, namespace, name string) (*datadoghqv1alpha1.DatadogMetricSpec, error) { + key := namespace + "/" + name + obj, exists, err := indexer.GetByKey(key) + if err != nil { + return nil, fmt.Errorf("failed to look up DatadogMetric %s in cache: %w", key, err) + } + if !exists { + return nil, autoscaling.NewConditionErrorf(autoscaling.ConditionReasonDatadogMetricNotFound, + "DatadogMetric %s/%s not found in informer cache (not yet synced or does not exist)", namespace, name) + } + + rtObj, ok := obj.(runtime.Object) + if !ok { + return nil, fmt.Errorf("DatadogMetric %s: unexpected object type %T in cache", key, obj) + } + var metric datadoghqv1alpha1.DatadogMetric + if err := autoscaling.FromUnstructured(rtObj, &metric); err != nil { + return nil, fmt.Errorf("failed to convert DatadogMetric %s from cache: %w", key, err) + } + return &metric.Spec, nil +} + +// validateHPAMetrics checks that every HPA metric is supported for DPA migration: +// - Resource CPU with Utilization target (UC1/UC2) +// - ContainerResource CPU with Utilization target (UC1/UC2) +// - External metric referencing a DatadogMetric CRD via "datadogmetric@:" (UC8) +// +// Any other metric type or configuration is rejected (UC6). 
+func validateHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) error { + for _, m := range hpa.Spec.Metrics { + switch m.Type { + case autoscalingv2.ResourceMetricSourceType: + if m.Resource == nil || m.Resource.Name != corev1.ResourceCPU { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses a non-CPU resource metric — only CPU utilization is supported for migration", + hpa.Name) + } + if m.Resource.Target.Type != autoscalingv2.UtilizationMetricType { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses CPU metric with target type %q — only Utilization is supported for migration", + hpa.Name, m.Resource.Target.Type) + } + case autoscalingv2.ContainerResourceMetricSourceType: + if m.ContainerResource == nil || m.ContainerResource.Name != corev1.ResourceCPU { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses a non-CPU container resource metric — only CPU utilization is supported for migration", + hpa.Name) + } + if m.ContainerResource.Target.Type != autoscalingv2.UtilizationMetricType { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses CPU container metric with target type %q — only Utilization is supported for migration", + hpa.Name, m.ContainerResource.Target.Type) + } + case autoscalingv2.ExternalMetricSourceType: + if m.External == nil { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q has an external metric source with no metric definition", hpa.Name) + } + if !strings.HasPrefix(strings.ToLower(m.External.Metric.Name), "datadogmetric@") { + return autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses external metric %q — only DatadogMetric references (datadogmetric@:) are supported for migration", + hpa.Name, m.External.Metric.Name) + } + default: + return 
autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA %q uses metric type %q — only CPU resource utilization and DatadogMetric external metrics are supported for migration", + hpa.Name, m.Type) + } + } + return nil +} + +// disableHPA stores the original HPA spec as a JSON annotation and neutralises the HPA +// by setting both scaleUp and scaleDown selectPolicy to Disabled. +// The operation is idempotent: if HPAManagedByDPAAnnotation is already set the call is a no-op. +func disableHPA(ctx context.Context, client k8sclient.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, dpaNamespace, dpaName string) error { + if hpa.Annotations[model.HPAManagedByDPAAnnotation] != "" { + return nil + } + + originalSpec, err := json.Marshal(hpa.Spec) + if err != nil { + return fmt.Errorf("failed to serialise original HPA spec for %s/%s: %w", hpa.Namespace, hpa.Name, err) + } + + disabled := autoscalingv2.DisabledPolicySelect + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + model.HPAOriginalSpecAnnotation: string(originalSpec), + model.HPAManagedByDPAAnnotation: dpaNamespace + "/" + dpaName, + }, + }, + "spec": map[string]interface{}{ + "behavior": map[string]interface{}{ + "scaleUp": map[string]interface{}{ + "selectPolicy": string(disabled), + }, + "scaleDown": map[string]interface{}{ + "selectPolicy": string(disabled), + }, + }, + }, + } + + patchBytes, err := json.Marshal(patch) + if err != nil { + return fmt.Errorf("failed to build HPA disable patch for %s/%s: %w", hpa.Namespace, hpa.Name, err) + } + + _, err = client.AutoscalingV2().HorizontalPodAutoscalers(hpa.Namespace).Patch( + ctx, hpa.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, + ) + if err != nil { + return fmt.Errorf("failed to disable HPA %s/%s: %w", hpa.Namespace, hpa.Name, err) + } + + log.Infof("HPA migration: disabled HPA %s/%s (managed by DPA %s/%s)", hpa.Namespace, hpa.Name, dpaNamespace, 
dpaName) + return nil +} + +// restoreHPA reads the original spec from the HPAOriginalSpecAnnotation on cachedHPA and +// restores the live HPA to that spec, removing all migration-related annotations. +// It performs a live Get before Update to use the most recent ResourceVersion and avoid +// conflicts. If the annotation is absent or the HPA no longer exists the call is a no-op. +func restoreHPA(ctx context.Context, client k8sclient.Interface, cachedHPA *autoscalingv2.HorizontalPodAutoscaler) error { + raw := cachedHPA.Annotations[model.HPAOriginalSpecAnnotation] + if raw == "" { + log.Infof("HPA migration: no original spec annotation on HPA %s/%s, nothing to restore", cachedHPA.Namespace, cachedHPA.Name) + return nil + } + + // Live Get to obtain the current ResourceVersion and avoid update conflicts. + current, err := client.AutoscalingV2().HorizontalPodAutoscalers(cachedHPA.Namespace).Get(ctx, cachedHPA.Name, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + log.Infof("HPA migration: HPA %s/%s no longer exists, skipping restore", cachedHPA.Namespace, cachedHPA.Name) + return nil + } + if err != nil { + return fmt.Errorf("failed to get HPA %s/%s before restore: %w", cachedHPA.Namespace, cachedHPA.Name, err) + } + + var originalSpec autoscalingv2.HorizontalPodAutoscalerSpec + if err := json.Unmarshal([]byte(raw), &originalSpec); err != nil { + return fmt.Errorf("failed to decode original HPA spec from annotation on %s/%s: %w", cachedHPA.Namespace, cachedHPA.Name, err) + } + + updated := current.DeepCopy() + updated.Spec = originalSpec + delete(updated.Annotations, model.HPAOriginalSpecAnnotation) + delete(updated.Annotations, model.HPAManagedByDPAAnnotation) + + _, err = client.AutoscalingV2().HorizontalPodAutoscalers(cachedHPA.Namespace).Update(ctx, updated, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to restore HPA %s/%s: %w", cachedHPA.Namespace, cachedHPA.Name, err) + } + + log.Infof("HPA migration: restored HPA %s/%s to 
original spec", cachedHPA.Namespace, cachedHPA.Name) + return nil +} + +// extractHPAConfig reads the HPA configuration and returns an HPAConfig suitable for +// populating a DPA spec (UC2 auto-population). External DatadogMetric references are +// resolved from the informer cache via datadogMetricIndexer (UC8). +func extractHPAConfig(datadogMetricIndexer cache.Indexer, hpa *autoscalingv2.HorizontalPodAutoscaler) (HPAConfig, error) { + cfg := HPAConfig{ + MinReplicas: hpa.Spec.MinReplicas, + MaxReplicas: hpa.Spec.MaxReplicas, + } + + for _, m := range hpa.Spec.Metrics { + switch m.Type { + case autoscalingv2.ResourceMetricSourceType: + if m.Resource != nil && m.Resource.Name == corev1.ResourceCPU && m.Resource.Target.AverageUtilization != nil { + cfg.PodCPUUtilization = pointer.Ptr(*m.Resource.Target.AverageUtilization) + } + case autoscalingv2.ContainerResourceMetricSourceType: + if m.ContainerResource != nil && m.ContainerResource.Name == corev1.ResourceCPU && m.ContainerResource.Target.AverageUtilization != nil { + cfg.ContainerCPUTargets = append(cfg.ContainerCPUTargets, ContainerCPUTarget{ + ContainerName: m.ContainerResource.Container, + CPUUtilization: *m.ContainerResource.Target.AverageUtilization, + }) + } + case autoscalingv2.ExternalMetricSourceType: + if m.External == nil { + continue + } + extCfg, err := resolveExternalMetricConfig(datadogMetricIndexer, hpa.Namespace, m) + if err != nil { + return cfg, err + } + cfg.ExternalMetrics = append(cfg.ExternalMetrics, extCfg) + } + } + + return cfg, nil +} + +// resolveExternalMetricConfig resolves one HPA external metric spec into an ExternalMetricConfig +// by looking up the referenced DatadogMetric in the informer cache. +func resolveExternalMetricConfig(datadogMetricIndexer cache.Indexer, hpaNamespace string, m autoscalingv2.MetricSpec) (ExternalMetricConfig, error) { + // Parse "datadogmetric@:". The metric name is already validated by + // validateHPAMetrics to have the correct prefix and format. 
+ ref := strings.TrimPrefix(strings.ToLower(m.External.Metric.Name), "datadogmetric@") + parts := strings.SplitN(ref, ":", 2) + ddNS, ddName := parts[0], parts[1] + if ddNS == "" { + ddNS = hpaNamespace + } + + spec, err := resolveDatadogMetricFromCache(datadogMetricIndexer, ddNS, ddName) + if err != nil { + return ExternalMetricConfig{}, err + } + + // Resolve %%tag_kube_cluster_name%% and %%env_VAR%% placeholders in the query + // before storing it in the DPA spec. The Datadog backend cannot resolve these + // cluster-agent-side template variables. + resolvedQuery, err := externalmetricsmodel.ResolveMetricQuery(spec.Query) + if err != nil { + return ExternalMetricConfig{}, autoscaling.NewConditionErrorf(autoscaling.ConditionReasonDatadogMetricNotFound, + "DatadogMetric %s/%s: failed to resolve query template variables: %v", ddNS, ddName, err) + } + + // Extract the per-pod average target value; fall back to the absolute total value. + var targetValue *resource.Quantity + if m.External.Target.AverageValue != nil { + v := m.External.Target.AverageValue.DeepCopy() + targetValue = &v + } else if m.External.Target.Value != nil { + v := m.External.Target.Value.DeepCopy() + targetValue = &v + } + if targetValue == nil { + return ExternalMetricConfig{}, autoscaling.NewConditionErrorf(autoscaling.ConditionReasonUnsupportedHPAMetric, + "HPA external metric %q has no target value (neither targetAverageValue nor targetValue is set)", + m.External.Metric.Name) + } + + window := defaultCustomQueryWindow + if spec.MaxAge.Duration > 0 { + window = spec.MaxAge.Duration + } else if spec.TimeWindow.Duration > 0 { + window = spec.TimeWindow.Duration + } + + return ExternalMetricConfig{ + Query: resolvedQuery, + TargetValue: targetValue, + Window: window, + IsCPUUsage: isCPUUsageQuery(resolvedQuery), + }, nil +} + +// applyHPAConfigToDPASpec merges the HPA-derived configuration into the DPA spec. +// Only fields not already set on the spec are overwritten. 
+func applyHPAConfigToDPASpec(spec *datadoghq.DatadogPodAutoscalerSpec, cfg HPAConfig) { + if spec.Constraints == nil { + spec.Constraints = &datadoghqcommon.DatadogPodAutoscalerConstraints{} + } + if spec.Constraints.MinReplicas == nil && cfg.MinReplicas != nil { + spec.Constraints.MinReplicas = pointer.Ptr(*cfg.MinReplicas) + } + if spec.Constraints.MaxReplicas == nil && cfg.MaxReplicas > 0 { + spec.Constraints.MaxReplicas = pointer.Ptr(cfg.MaxReplicas) + } + + if len(spec.Objectives) > 0 { + return + } + + var objectives []datadoghqcommon.DatadogPodAutoscalerObjective + + // CPU utilization from native Resource/ContainerResource HPA metrics. + // Pod-level CPU takes precedence over per-container targets when both are present. + if cfg.PodCPUUtilization != nil { + objectives = append(objectives, datadoghqcommon.DatadogPodAutoscalerObjective{ + Type: datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, + PodResource: &datadoghqcommon.DatadogPodAutoscalerPodResourceObjective{ + Name: corev1.ResourceCPU, + Value: datadoghqcommon.DatadogPodAutoscalerObjectiveValue{ + Type: datadoghqcommon.DatadogPodAutoscalerUtilizationObjectiveValueType, + Utilization: pointer.Ptr(*cfg.PodCPUUtilization), + }, + }, + }) + } else { + for _, ct := range cfg.ContainerCPUTargets { + ct := ct + objectives = append(objectives, datadoghqcommon.DatadogPodAutoscalerObjective{ + Type: datadoghqcommon.DatadogPodAutoscalerContainerResourceObjectiveType, + ContainerResource: &datadoghqcommon.DatadogPodAutoscalerContainerResourceObjective{ + Name: corev1.ResourceCPU, + Container: ct.ContainerName, + Value: datadoghqcommon.DatadogPodAutoscalerObjectiveValue{ + Type: datadoghqcommon.DatadogPodAutoscalerUtilizationObjectiveValueType, + Utilization: pointer.Ptr(ct.CPUUtilization), + }, + }, + }) + } + } + + // Objectives from DatadogMetric external metrics (UC8). 
+ // CPU usage queries → PodResource CPU AbsoluteValue (DPA uses container.cpu.usage / kubernetes.cpu.usage + // internally for its CPU recommendations). + // Other queries → CustomQuery with the raw Datadog metrics query. + for _, em := range cfg.ExternalMetrics { + em := em + if em.IsCPUUsage { + objectives = append(objectives, datadoghqcommon.DatadogPodAutoscalerObjective{ + Type: datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, + PodResource: &datadoghqcommon.DatadogPodAutoscalerPodResourceObjective{ + Name: corev1.ResourceCPU, + Value: datadoghqcommon.DatadogPodAutoscalerObjectiveValue{ + Type: datadoghqcommon.DatadogPodAutoscalerAbsoluteValueObjectiveValueType, + AbsoluteValue: em.TargetValue, + }, + }, + }) + } else { + objectives = append(objectives, datadoghqcommon.DatadogPodAutoscalerObjective{ + Type: datadoghqcommon.DatadogPodAutoscalerCustomQueryObjectiveType, + CustomQuery: &datadoghqcommon.DatadogPodAutoscalerCustomQueryObjective{ + Request: datadoghqcommon.DatadogPodAutoscalerTimeseriesFormulaRequest{ + Queries: []datadoghqcommon.DatadogPodAutoscalerTimeseriesQuery{ + { + Name: "q", + Source: datadoghqcommon.DatadogPodAutoscalerMetricsDataSourceMetrics, + Metrics: &datadoghqcommon.DatadogPodAutoscalerMetricsTimeseriesQuery{ + Query: em.Query, + }, + }, + }, + }, + Value: datadoghqcommon.DatadogPodAutoscalerObjectiveValue{ + Type: datadoghqcommon.DatadogPodAutoscalerAbsoluteValueObjectiveValueType, + AbsoluteValue: em.TargetValue, + }, + Window: metav1.Duration{Duration: em.Window}, + }, + }) + } + } + + spec.Objectives = objectives +} + +// addDPAFinalizer adds HPAMigrationFinalizer to the DPA via a JSON merge patch. 
+func (c *Controller) addDPAFinalizer(ctx context.Context, ns, name string, existing []string) error { + if slices.Contains(existing, model.HPAMigrationFinalizer) { + return nil + } + return c.patchDPAFinalizers(ctx, ns, name, append(slices.Clone(existing), model.HPAMigrationFinalizer)) +} + +// removeDPAFinalizer removes HPAMigrationFinalizer from the DPA via a JSON merge patch. +func (c *Controller) removeDPAFinalizer(ctx context.Context, ns, name string, existing []string) error { + newFinalizers := slices.DeleteFunc(slices.Clone(existing), func(f string) bool { + return f == model.HPAMigrationFinalizer + }) + if len(newFinalizers) == len(existing) { + return nil + } + return c.patchDPAFinalizers(ctx, ns, name, newFinalizers) +} + +func (c *Controller) patchDPAFinalizers(ctx context.Context, ns, name string, finalizers []string) error { + finalizersJSON, err := json.Marshal(finalizers) + if err != nil { + return fmt.Errorf("failed to serialize finalizers: %w", err) + } + _, err = c.Client.Resource(podAutoscalerGVR).Namespace(ns).Patch( + ctx, name, types.MergePatchType, + []byte(fmt.Sprintf(`{"metadata":{"finalizers":%s}}`, string(finalizersJSON))), + metav1.PatchOptions{}, + ) + return err +} + +// handleHPAMigrationCleanup is called when a DPA with HPAMigrationFinalizer is being deleted. +// It finds the associated HPA in the informer cache, restores it to its original spec, and +// removes the finalizer so Kubernetes can complete the DPA deletion. +// The store must already be locked when called; this function always unlocks it before returning. 
func (c *Controller) handleHPAMigrationCleanup(
	ctx context.Context,
	key, ns, name string,
	podAutoscaler *datadoghq.DatadogPodAutoscaler,
	storeUnlock func(),
) (autoscaling.ProcessResult, error) {
	log.Infof("HPA migration: running finalizer cleanup for DPA %s/%s", ns, name)

	// The HPA is located through the target-name index, not through the
	// HPAManagedByDPAAnnotation written by disableHPA.
	// NOTE(review): findHPAForTarget returns an error when several HPAs share the
	// target, so in that situation every cleanup attempt ends here with Requeue and
	// the finalizer stays on the DPA — confirm this is the intended behaviour.
	hpa, err := findHPAForTarget(c.hpaIndexer, ns, podAutoscaler.Spec.TargetRef.Name)
	if err != nil {
		storeUnlock()
		return autoscaling.Requeue, fmt.Errorf("HPA migration cleanup: failed to find HPA for DPA %s/%s: %w", ns, name, err)
	}

	if hpa != nil {
		if restoreErr := restoreHPA(ctx, c.kubeClient, hpa); restoreErr != nil {
			storeUnlock()
			return autoscaling.Requeue, fmt.Errorf("HPA migration cleanup: %w", restoreErr)
		}
	} else {
		log.Infof("HPA migration: no HPA found for DPA %s/%s target %q, skipping restore", ns, name, podAutoscaler.Spec.TargetRef.Name)
	}

	// The finalizer is removed only after the restore succeeded (or was not needed).
	if err := c.removeDPAFinalizer(ctx, ns, name, podAutoscaler.Finalizers); err != nil {
		storeUnlock()
		return autoscaling.Requeue, fmt.Errorf("HPA migration cleanup: failed to remove finalizer from DPA %s/%s: %w", ns, name, err)
	}

	// Success path: the store entry is released through UnlockDelete rather than
	// through storeUnlock.
	c.store.UnlockDelete(key, c.ID)
	return autoscaling.NoRequeue, nil
}

// initiateHPAMigration is called once — when the DPA does not yet have HPAMigrationFinalizer — to:
//  1. Verify the HPA cache has synced; requeue if not.
//  2. Find and validate the existing HPA for the DPA's target.
//  3. Disable the HPA (set scaleUp/scaleDown selectPolicy: Disabled).
//  4. Add HPAMigrationFinalizer to the DPA.
//  5. (UC2) If profile-managed and not yet imported, auto-populate DPA spec from HPA config.
//
// In the code below the UC2 import (item 5) actually runs before items 3 and 4:
// when it applies, the DPA is updated and the function returns immediately, so
// disabling the HPA and adding the finalizer happen on a later call.
//
// Returns (true, nil) when a Kubernetes object was patched and the caller should stop the
// current reconcile, waiting for an informer re-queue.
// Returns (false, nil) when no HPA targets the DPA's workload, and (false, err)
// on any lookup, validation or API failure.
func (c *Controller) initiateHPAMigration(
	ctx context.Context,
	ns, name string,
	podAutoscaler *datadoghq.DatadogPodAutoscaler,
) (bool, error) {
	// Guard: wait for the HPA cache before looking up the target.
	// NOTE(review): this path returns true although nothing was patched, so no
	// informer event is triggered by this call — confirm the caller really
	// re-processes the DPA once the informer has synced.
	if !c.hpaInformerSynced() {
		log.Debugf("HPA migration: HPA informer not yet synced, re-queuing DPA %s/%s", ns, name)
		return true, nil
	}

	hpa, err := findHPAForTarget(c.hpaIndexer, ns, podAutoscaler.Spec.TargetRef.Name)
	if err != nil {
		return false, err
	}
	if hpa == nil {
		return false, nil
	}

	if err := validateHPAMetrics(hpa); err != nil {
		return false, err
	}

	// Guard: only wait for the DatadogMetric cache when the HPA actually references a
	// DatadogMetric CRD. This ensures migration proceeds normally for CPU-only HPAs even
	// when the DatadogMetric CRD is not installed in the cluster.
	// NOTE(review): same remark as for the HPA informer guard above — true is
	// returned without any object having been patched.
	if hpaHasDatadogMetricRefs(hpa) && !c.datadogMetricInformerSynced() {
		log.Debugf("HPA migration: DatadogMetric informer not yet synced, re-queuing DPA %s/%s", ns, name)
		return true, nil
	}

	// UC2: one-shot import of HPA config into profile-managed DPAs.
	// The import is keyed on the HPAConfigImportedAnnotation being absent and the
	// profile label being present.
	if podAutoscaler.Annotations[model.HPAConfigImportedAnnotation] == "" &&
		podAutoscaler.Labels[model.ProfileLabelKey] != "" {
		cfg, err := extractHPAConfig(c.datadogMetricIndexer, hpa)
		if err != nil {
			return false, err
		}
		// Work on copies so the object handed in by the caller is left unmodified.
		specCopy := podAutoscaler.Spec.DeepCopy()
		applyHPAConfigToDPASpec(specCopy, cfg)

		updatedDPA := podAutoscaler.DeepCopy()
		updatedDPA.Spec = *specCopy
		if updatedDPA.Annotations == nil {
			updatedDPA.Annotations = make(map[string]string)
		}
		updatedDPA.Annotations[model.HPAConfigImportedAnnotation] = "true"

		obj, err := autoscaling.ToUnstructured(updatedDPA)
		if err != nil {
			return false, fmt.Errorf("HPA migration: failed to convert DPA to unstructured: %w", err)
		}
		if _, err := c.Client.Resource(podAutoscalerGVR).Namespace(ns).Update(ctx, obj, metav1.UpdateOptions{}); err != nil {
			return false, fmt.Errorf("HPA migration: failed to update DPA spec with HPA config: %w", err)
		}
		log.Infof("HPA migration: imported HPA config into DPA %s/%s", ns, name)
		// Stop here; the HPA is disabled and the finalizer added on a later call.
		return true, nil
	}

	// The HPA is neutralised first, then the finalizer is added. If adding the
	// finalizer fails, a later call finds the HPA already annotated and disableHPA
	// is a no-op.
	if err := disableHPA(ctx, c.kubeClient, hpa, ns, name); err != nil {
		return false, err
	}

	if err := c.addDPAFinalizer(ctx, ns, name, podAutoscaler.Finalizers); err != nil {
		return false, fmt.Errorf("HPA migration: failed to add finalizer to DPA %s/%s: %w", ns, name, err)
	}

	log.Infof("HPA migration: HPA %s/%s neutralised, finalizer added to DPA %s/%s", hpa.Namespace, hpa.Name, ns, name)
	return true, nil
}
diff --git a/pkg/clusteragent/autoscaling/workload/controller_hpa_migration_test.go b/pkg/clusteragent/autoscaling/workload/controller_hpa_migration_test.go
new file mode 100644
index 000000000000..ab691c024e22
--- /dev/null
+++ b/pkg/clusteragent/autoscaling/workload/controller_hpa_migration_test.go
@@ -0,0 +1,628 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+//go:build kubeapiserver
+
+package workload
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/cache"
+
+	datadoghqcommon "github.com/DataDog/datadog-operator/api/datadoghq/common"
+	datadoghqv1alpha1 "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1"
+	datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2"
+
+	"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
+	"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
+	"github.com/DataDog/datadog-agent/pkg/util/pointer"
+)
+
+// newHPAIndexer builds a cache.Indexer for HPA objects with the hpaTargetIndexName index.
+func newHPAIndexer(t *testing.T) cache.Indexer { + t.Helper() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ + hpaTargetIndexName: hpaByTargetRefIndex, + }) + return indexer +} + +// addHPAToIndexer converts an HPA to unstructured and adds it to the indexer. +func addHPAToIndexer(t *testing.T, indexer cache.Indexer, hpa *autoscalingv2.HorizontalPodAutoscaler) { + t.Helper() + obj, err := autoscaling.ToUnstructured(hpa) + require.NoError(t, err) + require.NoError(t, indexer.Add(obj)) +} + +// newDatadogMetricIndexer builds a plain cache.Indexer suitable for DatadogMetric lookups. +func newDatadogMetricIndexer() cache.Indexer { + return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) +} + +// addDatadogMetricToIndexer converts a DatadogMetric to unstructured and adds it to the indexer. +func addDatadogMetricToIndexer(t *testing.T, indexer cache.Indexer, dm *datadoghqv1alpha1.DatadogMetric) { + t.Helper() + obj, err := autoscaling.ToUnstructured(dm) + require.NoError(t, err) + require.NoError(t, indexer.Add(obj)) +} + +// --- hpaHasDatadogMetricRefs --- + +func TestHPAHasDatadogMetricRefs(t *testing.T) { + makeHPA := func(metrics ...autoscalingv2.MetricSpec) *autoscalingv2.HorizontalPodAutoscaler { + return &autoscalingv2.HorizontalPodAutoscaler{ + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{Metrics: metrics}, + } + } + + // No metrics at all + assert.False(t, hpaHasDatadogMetricRefs(makeHPA())) + + // CPU-only HPA — should return false (no DatadogMetric needed) + assert.False(t, hpaHasDatadogMetricRefs(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType}, + }, + }))) + + // External metric without datadogmetric@ prefix + assert.False(t, hpaHasDatadogMetricRefs(makeHPA(autoscalingv2.MetricSpec{ + Type: 
autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "prometheus_requests"}, + }, + }))) + + // External metric with datadogmetric@ prefix (lowercase) + assert.True(t, hpaHasDatadogMetricRefs(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:my-metric"}, + }, + }))) + + // datadogmetric@ with mixed case + assert.True(t, hpaHasDatadogMetricRefs(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "DatadogMetric@ns:my-metric"}, + }, + }))) + + // Mixed: CPU + DatadogMetric → true + assert.True(t, hpaHasDatadogMetricRefs(makeHPA( + autoscalingv2.MetricSpec{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType}, + }, + }, + autoscalingv2.MetricSpec{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:my-metric"}, + }, + }, + ))) +} + +// --- isCPUUsageQuery --- + +func TestIsCPUUsageQuery(t *testing.T) { + tests := []struct { + query string + want bool + }{ + {"avg:container.cpu.usage{*}", true}, + {"avg:kubernetes.cpu.usage{*}", true}, + {"avg:CONTAINER.CPU.USAGE{*}", true}, + {"avg:nginx.requests{*}", false}, + {"sum:system.cpu.user{*}", false}, + {"", false}, + } + for _, tt := range tests { + assert.Equal(t, tt.want, isCPUUsageQuery(tt.query), "query: %q", tt.query) + } +} + +// --- validateHPAMetrics --- + +func TestValidateHPAMetrics(t *testing.T) { + makeHPA := func(metrics ...autoscalingv2.MetricSpec) *autoscalingv2.HorizontalPodAutoscaler { + return 
&autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "test-hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{Metrics: metrics}, + } + } + + // Valid: pod-level CPU utilization + assert.NoError(t, validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType, AverageUtilization: pointer.Ptr(int32(50))}, + }, + }))) + + // Valid: container CPU utilization + assert.NoError(t, validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ContainerResourceMetricSourceType, + ContainerResource: &autoscalingv2.ContainerResourceMetricSource{ + Name: corev1.ResourceCPU, + Container: "app", + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType, AverageUtilization: pointer.Ptr(int32(60))}, + }, + }))) + + // Valid: external DatadogMetric reference + assert.NoError(t, validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:my-metric"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType}, + }, + }))) + + // Invalid: memory resource metric + err := validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceMemory, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType}, + }, + })) + require.Error(t, err) + assert.ErrorAs(t, err, new(interface { + Reason() autoscaling.ConditionReasonType + })) + + // Invalid: CPU with non-Utilization target + err = validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: 
&autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType}, + }, + })) + require.Error(t, err) + + // Invalid: external metric without datadogmetric@ prefix + err = validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "prometheus_requests"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType}, + }, + })) + require.Error(t, err) + var cr autoscaling.ConditionReason + require.ErrorAs(t, err, &cr) + assert.Equal(t, autoscaling.ConditionReasonUnsupportedHPAMetric, cr.Reason()) + + // Invalid: pods metric type + err = validateHPAMetrics(makeHPA(autoscalingv2.MetricSpec{ + Type: autoscalingv2.PodsMetricSourceType, + })) + require.Error(t, err) +} + +// --- findHPAForTarget --- + +func TestFindHPAForTarget(t *testing.T) { + indexer := newHPAIndexer(t) + + hpa1 := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa-1", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{Name: "deploy-a"}, + }, + } + hpa2 := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa-2", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{Name: "deploy-b"}, + }, + } + hpa3 := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa-3", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{Name: "deploy-a"}, + }, + } + addHPAToIndexer(t, indexer, hpa1) + addHPAToIndexer(t, indexer, hpa2) + + // No match + got, err := findHPAForTarget(indexer, "ns", "deploy-c") + require.NoError(t, err) + assert.Nil(t, got) + + // Exact match + got, 
err = findHPAForTarget(indexer, "ns", "deploy-b") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "hpa-2", got.Name) + + // Namespace isolation: different namespace should find nothing + got, err = findHPAForTarget(indexer, "other-ns", "deploy-a") + require.NoError(t, err) + assert.Nil(t, got) + + // Ambiguous: two HPAs target the same deployment + addHPAToIndexer(t, indexer, hpa3) + got, err = findHPAForTarget(indexer, "ns", "deploy-a") + require.Error(t, err) + assert.Nil(t, got) + var cr autoscaling.ConditionReason + require.ErrorAs(t, err, &cr) + assert.Equal(t, autoscaling.ConditionReasonAmbiguousHPA, cr.Reason()) +} + +// --- resolveDatadogMetricFromCache --- + +func TestResolveDatadogMetricFromCache(t *testing.T) { + indexer := newDatadogMetricIndexer() + + dm := &datadoghqv1alpha1.DatadogMetric{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "datadoghq.com/v1alpha1", + Kind: "DatadogMetric", + }, + ObjectMeta: metav1.ObjectMeta{Name: "my-metric", Namespace: "ns"}, + Spec: datadoghqv1alpha1.DatadogMetricSpec{ + Query: "avg:container.cpu.usage{*}", + MaxAge: metav1.Duration{Duration: 10 * time.Minute}, + }, + } + addDatadogMetricToIndexer(t, indexer, dm) + + // Found + spec, err := resolveDatadogMetricFromCache(indexer, "ns", "my-metric") + require.NoError(t, err) + require.NotNil(t, spec) + assert.Equal(t, "avg:container.cpu.usage{*}", spec.Query) + assert.Equal(t, 10*time.Minute, spec.MaxAge.Duration) + + // Not found + _, err = resolveDatadogMetricFromCache(indexer, "ns", "missing-metric") + require.Error(t, err) + var cr autoscaling.ConditionReason + require.ErrorAs(t, err, &cr) + assert.Equal(t, autoscaling.ConditionReasonDatadogMetricNotFound, cr.Reason()) +} + +// --- extractHPAConfig --- + +func TestExtractHPAConfig(t *testing.T) { + t.Run("pod CPU utilization", func(t *testing.T) { + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: 
autoscalingv2.HorizontalPodAutoscalerSpec{ + MinReplicas: pointer.Ptr(int32(2)), + MaxReplicas: 10, + Metrics: []autoscalingv2.MetricSpec{{ + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType, AverageUtilization: pointer.Ptr(int32(70))}, + }, + }}, + }, + } + cfg, err := extractHPAConfig(newDatadogMetricIndexer(), hpa) + require.NoError(t, err) + assert.Equal(t, pointer.Ptr(int32(2)), cfg.MinReplicas) + assert.Equal(t, int32(10), cfg.MaxReplicas) + require.NotNil(t, cfg.PodCPUUtilization) + assert.Equal(t, int32(70), *cfg.PodCPUUtilization) + assert.Empty(t, cfg.ContainerCPUTargets) + }) + + t.Run("container CPU utilization", func(t *testing.T) { + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 5, + Metrics: []autoscalingv2.MetricSpec{{ + Type: autoscalingv2.ContainerResourceMetricSourceType, + ContainerResource: &autoscalingv2.ContainerResourceMetricSource{ + Name: corev1.ResourceCPU, + Container: "app", + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.UtilizationMetricType, AverageUtilization: pointer.Ptr(int32(60))}, + }, + }}, + }, + } + cfg, err := extractHPAConfig(newDatadogMetricIndexer(), hpa) + require.NoError(t, err) + assert.Nil(t, cfg.PodCPUUtilization) + require.Len(t, cfg.ContainerCPUTargets, 1) + assert.Equal(t, "app", cfg.ContainerCPUTargets[0].ContainerName) + assert.Equal(t, int32(60), cfg.ContainerCPUTargets[0].CPUUtilization) + }) + + t.Run("external DatadogMetric CPU usage query", func(t *testing.T) { + ddmIndexer := newDatadogMetricIndexer() + addDatadogMetricToIndexer(t, ddmIndexer, &datadoghqv1alpha1.DatadogMetric{ + TypeMeta: metav1.TypeMeta{APIVersion: "datadoghq.com/v1alpha1", Kind: "DatadogMetric"}, + ObjectMeta: metav1.ObjectMeta{Name: "cpu-metric", 
Namespace: "ns"}, + Spec: datadoghqv1alpha1.DatadogMetricSpec{Query: "avg:container.cpu.usage{*}"}, + }) + + q := resource.MustParse("100m") + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 8, + Metrics: []autoscalingv2.MetricSpec{{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:cpu-metric"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType, AverageValue: &q}, + }, + }}, + }, + } + cfg, err := extractHPAConfig(ddmIndexer, hpa) + require.NoError(t, err) + require.Len(t, cfg.ExternalMetrics, 1) + em := cfg.ExternalMetrics[0] + assert.True(t, em.IsCPUUsage) + assert.Equal(t, "avg:container.cpu.usage{*}", em.Query) + assert.Equal(t, defaultCustomQueryWindow, em.Window) + }) + + t.Run("external DatadogMetric custom query with MaxAge", func(t *testing.T) { + ddmIndexer := newDatadogMetricIndexer() + addDatadogMetricToIndexer(t, ddmIndexer, &datadoghqv1alpha1.DatadogMetric{ + TypeMeta: metav1.TypeMeta{APIVersion: "datadoghq.com/v1alpha1", Kind: "DatadogMetric"}, + ObjectMeta: metav1.ObjectMeta{Name: "rps-metric", Namespace: "ns"}, + Spec: datadoghqv1alpha1.DatadogMetricSpec{Query: "avg:nginx.requests{*}", MaxAge: metav1.Duration{Duration: 2 * time.Minute}}, + }) + + v := resource.MustParse("500") + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 20, + Metrics: []autoscalingv2.MetricSpec{{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:rps-metric"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType, AverageValue: &v}, + }, + }}, + }, + 
} + cfg, err := extractHPAConfig(ddmIndexer, hpa) + require.NoError(t, err) + require.Len(t, cfg.ExternalMetrics, 1) + em := cfg.ExternalMetrics[0] + assert.False(t, em.IsCPUUsage) + assert.Equal(t, "avg:nginx.requests{*}", em.Query) + assert.Equal(t, 2*time.Minute, em.Window) + }) + + t.Run("external DatadogMetric not found in cache", func(t *testing.T) { + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 5, + Metrics: []autoscalingv2.MetricSpec{{ + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:missing"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType, AverageValue: func() *resource.Quantity { q := resource.MustParse("1"); return &q }()}, + }, + }}, + }, + } + _, err := extractHPAConfig(newDatadogMetricIndexer(), hpa) + require.Error(t, err) + var cr autoscaling.ConditionReason + require.ErrorAs(t, err, &cr) + assert.Equal(t, autoscaling.ConditionReasonDatadogMetricNotFound, cr.Reason()) + }) + + t.Run("external DatadogMetric query with template variables is resolved", func(t *testing.T) { + t.Setenv("TEST_CLUSTER", "prod-cluster") + ddmIndexer := newDatadogMetricIndexer() + addDatadogMetricToIndexer(t, ddmIndexer, &datadoghqv1alpha1.DatadogMetric{ + TypeMeta: metav1.TypeMeta{APIVersion: "datadoghq.com/v1alpha1", Kind: "DatadogMetric"}, + ObjectMeta: metav1.ObjectMeta{Name: "rps-metric", Namespace: "ns"}, + Spec: datadoghqv1alpha1.DatadogMetricSpec{ + Query: "avg:nginx.requests{cluster:%%env_TEST_CLUSTER%%,service:web}", + }, + }) + + v := resource.MustParse("10") + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 10, + Metrics: []autoscalingv2.MetricSpec{{ + Type: 
autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{Name: "datadogmetric@ns:rps-metric"}, + Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType, AverageValue: &v}, + }, + }}, + }, + } + cfg, err := extractHPAConfig(ddmIndexer, hpa) + require.NoError(t, err) + require.Len(t, cfg.ExternalMetrics, 1) + // Placeholder must be resolved; the raw %%env_%% string must not reach the DPA spec. + assert.Equal(t, "avg:nginx.requests{cluster:prod-cluster,service:web}", cfg.ExternalMetrics[0].Query) + }) +} + +// --- applyHPAConfigToDPASpec --- + +func TestApplyHPAConfigToDPASpec(t *testing.T) { + t.Run("pod CPU utilization → PodResource Utilization objective", func(t *testing.T) { + spec := &datadoghq.DatadogPodAutoscalerSpec{} + applyHPAConfigToDPASpec(spec, HPAConfig{ + MinReplicas: pointer.Ptr(int32(2)), + MaxReplicas: 10, + PodCPUUtilization: pointer.Ptr(int32(70)), + }) + require.NotNil(t, spec.Constraints) + assert.Equal(t, pointer.Ptr(int32(2)), spec.Constraints.MinReplicas) + assert.Equal(t, pointer.Ptr(int32(10)), spec.Constraints.MaxReplicas) + require.Len(t, spec.Objectives, 1) + obj := spec.Objectives[0] + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, obj.Type) + require.NotNil(t, obj.PodResource) + assert.Equal(t, corev1.ResourceCPU, obj.PodResource.Name) + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerUtilizationObjectiveValueType, obj.PodResource.Value.Type) + assert.Equal(t, pointer.Ptr(int32(70)), obj.PodResource.Value.Utilization) + }) + + t.Run("container CPU utilization → ContainerResource Utilization objective", func(t *testing.T) { + spec := &datadoghq.DatadogPodAutoscalerSpec{} + applyHPAConfigToDPASpec(spec, HPAConfig{ + MaxReplicas: 5, + ContainerCPUTargets: []ContainerCPUTarget{ + {ContainerName: "app", CPUUtilization: 60}, + {ContainerName: "sidecar", CPUUtilization: 40}, + }, + }) + require.Len(t, 
spec.Objectives, 2) + for _, obj := range spec.Objectives { + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerContainerResourceObjectiveType, obj.Type) + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerUtilizationObjectiveValueType, obj.ContainerResource.Value.Type) + } + }) + + t.Run("pod CPU takes precedence over container CPU", func(t *testing.T) { + spec := &datadoghq.DatadogPodAutoscalerSpec{} + applyHPAConfigToDPASpec(spec, HPAConfig{ + MaxReplicas: 5, + PodCPUUtilization: pointer.Ptr(int32(75)), + ContainerCPUTargets: []ContainerCPUTarget{{ContainerName: "app", CPUUtilization: 60}}, + }) + require.Len(t, spec.Objectives, 1) + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, spec.Objectives[0].Type) + }) + + t.Run("external CPU usage → PodResource AbsoluteValue objective", func(t *testing.T) { + q := resource.MustParse("100m") + spec := &datadoghq.DatadogPodAutoscalerSpec{} + applyHPAConfigToDPASpec(spec, HPAConfig{ + MaxReplicas: 8, + ExternalMetrics: []ExternalMetricConfig{{ + Query: "avg:container.cpu.usage{*}", + TargetValue: &q, + Window: 5 * time.Minute, + IsCPUUsage: true, + }}, + }) + require.Len(t, spec.Objectives, 1) + obj := spec.Objectives[0] + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, obj.Type) + require.NotNil(t, obj.PodResource) + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerAbsoluteValueObjectiveValueType, obj.PodResource.Value.Type) + }) + + t.Run("external custom query → CustomQuery objective", func(t *testing.T) { + q := resource.MustParse("500") + spec := &datadoghq.DatadogPodAutoscalerSpec{} + applyHPAConfigToDPASpec(spec, HPAConfig{ + MaxReplicas: 20, + ExternalMetrics: []ExternalMetricConfig{{ + Query: "avg:nginx.requests{*}", + TargetValue: &q, + Window: 2 * time.Minute, + IsCPUUsage: false, + }}, + }) + require.Len(t, spec.Objectives, 1) + obj := spec.Objectives[0] + assert.Equal(t, datadoghqcommon.DatadogPodAutoscalerCustomQueryObjectiveType, obj.Type) + 
require.NotNil(t, obj.CustomQuery) + assert.Equal(t, 2*time.Minute, obj.CustomQuery.Window.Duration) + require.Len(t, obj.CustomQuery.Request.Queries, 1) + assert.Equal(t, "avg:nginx.requests{*}", obj.CustomQuery.Request.Queries[0].Metrics.Query) + }) + + t.Run("does not overwrite existing objectives", func(t *testing.T) { + existing := datadoghqcommon.DatadogPodAutoscalerObjective{ + Type: datadoghqcommon.DatadogPodAutoscalerPodResourceObjectiveType, + } + spec := &datadoghq.DatadogPodAutoscalerSpec{ + Objectives: []datadoghqcommon.DatadogPodAutoscalerObjective{existing}, + } + applyHPAConfigToDPASpec(spec, HPAConfig{ + MaxReplicas: 5, + PodCPUUtilization: pointer.Ptr(int32(80)), + }) + require.Len(t, spec.Objectives, 1) + assert.Equal(t, existing, spec.Objectives[0]) + }) + + t.Run("does not overwrite existing constraints", func(t *testing.T) { + spec := &datadoghq.DatadogPodAutoscalerSpec{ + Constraints: &datadoghqcommon.DatadogPodAutoscalerConstraints{ + MinReplicas: pointer.Ptr(int32(5)), + MaxReplicas: pointer.Ptr(int32(20)), + }, + } + applyHPAConfigToDPASpec(spec, HPAConfig{ + MinReplicas: pointer.Ptr(int32(1)), + MaxReplicas: 10, + }) + assert.Equal(t, pointer.Ptr(int32(5)), spec.Constraints.MinReplicas) + assert.Equal(t, pointer.Ptr(int32(20)), spec.Constraints.MaxReplicas) + }) +} + +// --- hpaByTargetRefIndex --- + +func TestHPAByTargetRefIndex(t *testing.T) { + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "hpa", Namespace: "ns"}, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{Name: "my-deploy"}}, + } + obj, err := autoscaling.ToUnstructured(hpa) + require.NoError(t, err) + + keys, err := hpaByTargetRefIndex(obj) + require.NoError(t, err) + assert.Equal(t, []string{"ns/my-deploy"}, keys) + + // Non-Unstructured input returns error + _, err = hpaByTargetRefIndex(&autoscalingv2.HorizontalPodAutoscaler{}) + require.Error(t, err) + + // Empty scaleTargetRef 
name returns no keys + empty, _ := autoscaling.ToUnstructured(&autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: "e", Namespace: "ns"}, + }) + keys, err = hpaByTargetRefIndex(empty) + require.NoError(t, err) + assert.Empty(t, keys) +} + +// Compile-time check that unstructured.Unstructured satisfies runtime.Object. +var _ runtime.Object = (*unstructured.Unstructured)(nil) + +// model.DatadogPodAutoscalerHPAMigrationCondition is reachable from this package. +var _ = model.DatadogPodAutoscalerHPAMigrationCondition diff --git a/pkg/clusteragent/autoscaling/workload/controller_test.go b/pkg/clusteragent/autoscaling/workload/controller_test.go index 5fd7759a5961..dd0664298685 100755 --- a/pkg/clusteragent/autoscaling/workload/controller_test.go +++ b/pkg/clusteragent/autoscaling/workload/controller_test.go @@ -76,6 +76,8 @@ func newFixture(t *testing.T, testTime time.Time) *fixture { c.horizontalController.scaler = scaler c.verticalController.inPlaceResizeSupported = func() *bool { b := true; return &b }() c.verticalController.inPlaceResizeSupportedTime = clock.Now() + c.hpaInformerSynced = func() bool { return true } + c.datadogMetricInformerSynced = func() bool { return true } return c.Controller, err }, ), diff --git a/pkg/clusteragent/autoscaling/workload/model/const.go b/pkg/clusteragent/autoscaling/workload/model/const.go index a425065c8071..71a379ec41d3 100644 --- a/pkg/clusteragent/autoscaling/workload/model/const.go +++ b/pkg/clusteragent/autoscaling/workload/model/const.go @@ -7,17 +7,45 @@ package model +import ( + datadoghqcommon "github.com/DataDog/datadog-operator/api/datadoghq/common" +) + const ( + // DatadogPodAutoscalerHPAMigrationCondition is true when the DPA has taken over management of an HPA. + // This condition type is agent-local and not yet defined in the upstream operator API. 
+ DatadogPodAutoscalerHPAMigrationCondition datadoghqcommon.DatadogPodAutoscalerConditionType = "HPAMigration" + // PreviewAnnotationKey is the annotation key used to enable preview/alpha autoscaling features. // Its value is a JSON object where each key enables a specific feature flag, e.g.: // autoscaling.datadoghq.com/preview: '{"burstable":true}' // WARNING: preview features are experimental. Any option may be changed or // removed without notice in a future version. // Known keys: - // "burstable" (bool) — when true, CPU limits are removed from containers so they can burst - // beyond their CPU request when spare capacity is available on the node. + // "burstable" (bool) — when true, CPU limits are removed from containers so they can burst + // beyond their CPU request when spare capacity is available on the node. + // "hpa-migration" (bool) — when true, enables HPA-to-DPA migration: the controller will detect + // an existing HPA for the same target, neutralise it, and take over + // horizontal scaling. PreviewAnnotationKey = "autoscaling.datadoghq.com/preview" + // HPAOriginalSpecAnnotation is the annotation key placed on an HPA resource to store its original + // spec as a JSON string. Used to restore the HPA when the managing DPA is deleted. + HPAOriginalSpecAnnotation = "autoscaling.datadoghq.com/hpa-original-spec" + + // HPAManagedByDPAAnnotation is the annotation key placed on an HPA resource to record which + // DatadogPodAutoscaler is currently managing it. Value is "<namespace>/<name>". + HPAManagedByDPAAnnotation = "autoscaling.datadoghq.com/managed-by-dpa" + + // HPAConfigImportedAnnotation is the annotation key placed on a DPA to mark that the HPA + // configuration has already been one-shot imported into the DPA spec (UC2). Prevents re-importing + // on subsequent reconciliations and overwriting user edits. 
+ HPAConfigImportedAnnotation = "autoscaling.datadoghq.com/hpa-config-imported" + + // HPAMigrationFinalizer is the finalizer added to a DPA when it takes over management of an HPA. + // It ensures the HPA is restored to its original state before the DPA object is fully removed. + HPAMigrationFinalizer = "autoscaling.datadoghq.com/hpa-migration" + // RecommendationIDAnnotation is the annotation key used to store the recommendation ID RecommendationIDAnnotation = "autoscaling.datadoghq.com/rec-id" // AutoscalerIDAnnotation is the annotation key used to store the autoscaler ID diff --git a/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler.go b/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler.go index cafa3a5fbd47..3d1eb33384a1 100644 --- a/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler.go +++ b/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler.go @@ -164,6 +164,10 @@ type PodAutoscalerInternal struct { // error is the an error encountered by the controller not specific to a scaling action error error + // hpaMigrationActive is true when this DPA has taken over management of an HPA + // (i.e. the HPAMigrationFinalizer is present on the DPA object). + hpaMigrationActive bool + // deleted flags the PodAutoscaler as deleted (removal to be handled by the controller) // (only if owner == remote or profile-managed) deleted bool @@ -238,7 +242,8 @@ func NewPodAutoscalerFromProfile( // previewOptions holds the parsed feature flags from PreviewAnnotation. type previewOptions struct { - Burstable bool `json:"burstable,omitempty"` + Burstable bool `json:"burstable,omitempty"` + HPAMigration bool `json:"hpa-migration,omitempty"` } // parsePreviewAnnotationString parses a raw PreviewAnnotation JSON string. @@ -686,6 +691,13 @@ func (p *PodAutoscalerInternal) IsBurstable() bool { return p.previewOptions.Burstable } +// IsHPAMigrationEnabled returns true if the hpa-migration preview option is enabled. 
+// When true the controller will detect an existing HPA for the same target, neutralise it, +// and take over horizontal scaling. +func (p *PodAutoscalerInternal) IsHPAMigrationEnabled() bool { + return p.previewOptions.HPAMigration +} + // PreviewAnnotation returns the JSON-encoded preview annotation forwarded from the cluster // profile (e.g. `{"burstable":true}`). Returns empty string when no preview features are // active. For standalone (non-profile-managed) autoscalers this always returns empty string. @@ -889,6 +901,16 @@ func (p *PodAutoscalerInternal) Error() error { return p.error } +// HPAMigrationActive returns true when this DPA has taken over management of an HPA. +func (p *PodAutoscalerInternal) HPAMigrationActive() bool { + return p.hpaMigrationActive +} + +// SetHPAMigrationActive sets whether the DPA currently owns an HPA via the migration finalizer. +func (p *PodAutoscalerInternal) SetHPAMigrationActive(active bool) { + p.hpaMigrationActive = active +} + // ProfileName returns the profile name if this is a profile-managed autoscaler. func (p *PodAutoscalerInternal) ProfileName() string { return p.profileName @@ -915,6 +937,23 @@ func (p *PodAutoscalerInternal) MarkProfileTemplateApplied() { p.appliedProfileHash = p.desiredProfileTemplateHash } +// RestoreHPAImportedSpec copies objectives and constraints from a Kubernetes DPA spec +// into the PAI's internal spec. This preserves HPA-imported fields (objectives, constraints) +// when a profile template update would otherwise overwrite them with empty values, since +// the profile template intentionally omits objectives and constraints — those are imported +// one-shot from the existing HPA. +func (p *PodAutoscalerInternal) RestoreHPAImportedSpec(k8sSpec *datadoghq.DatadogPodAutoscalerSpec) { + if p.upstreamCR == nil || k8sSpec == nil { + return + } + if len(k8sSpec.Objectives) > 0 { + p.upstreamCR.Spec.Objectives = append([]datadoghqcommon.DatadogPodAutoscalerObjective{}, k8sSpec.Objectives...) 
+ } + if k8sSpec.Constraints != nil { + p.upstreamCR.Spec.Constraints = k8sSpec.Constraints.DeepCopy() + } +} + // Deleted returns the deletion status of the PodAutoscaler func (p *PodAutoscalerInternal) Deleted() bool { return p.deleted @@ -1017,6 +1056,7 @@ func (p *PodAutoscalerInternal) BuildStatus(currentTime metav1.Time, currentStat datadoghqcommon.DatadogPodAutoscalerVerticalAbleToRecommendCondition: nil, datadoghqcommon.DatadogPodAutoscalerVerticalAbleToApply: nil, datadoghqcommon.DatadogPodAutoscalerVerticalScalingLimitedCondition: nil, + DatadogPodAutoscalerHPAMigrationCondition: nil, } if currentStatus != nil { @@ -1094,6 +1134,15 @@ func (p *PodAutoscalerInternal) BuildStatus(currentTime metav1.Time, currentStat } status.Conditions = append(status.Conditions, newCondition(rolloutStatus, verticalReason, verticalMessage, currentTime, datadoghqcommon.DatadogPodAutoscalerVerticalAbleToApply, existingConditions)) + // HPAMigration condition: only emitted when the DPA has the hpa-migration preview flag set. 
+ if p.IsHPAMigrationEnabled() { + if p.hpaMigrationActive { + status.Conditions = append(status.Conditions, newCondition(corev1.ConditionTrue, "Migrated", "HPA is managed by this DPA", currentTime, DatadogPodAutoscalerHPAMigrationCondition, existingConditions)) + } else { + status.Conditions = append(status.Conditions, newCondition(corev1.ConditionFalse, "Pending", "", currentTime, DatadogPodAutoscalerHPAMigrationCondition, existingConditions)) + } + } + return status } diff --git a/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler_test.go b/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler_test.go index 3785c44aa6a6..a3902b8d84a6 100644 --- a/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler_test.go +++ b/pkg/clusteragent/autoscaling/workload/model/pod_autoscaler_test.go @@ -642,6 +642,60 @@ func TestProfileManagedPodAutoscaler(t *testing.T) { pai.UpdateFromProfile("high-cpu", template, targetRef, "hash1-v3", "") assert.False(t, pai.IsBurstable()) }) + + t.Run("RestoreHPAImportedSpec preserves objectives and constraints after profile update", func(t *testing.T) { + // Simulate a profile template that intentionally has no objectives/constraints + // (the common case for HPA-migration profiles). + emptyTemplate := &datadoghq.DatadogPodAutoscalerTemplate{} + pai := NewPodAutoscalerFromProfile("prod", "webapp-abc1", "hpa-migration", emptyTemplate, targetRef, "hash1", "") + assert.Empty(t, pai.Spec().Objectives) + assert.Nil(t, pai.Spec().Constraints) + + // Simulate what the migration controller writes to Kubernetes: + // objectives and constraints imported from the HPA. 
+ importedMinReplicas := int32(2) + importedMaxReplicas := int32(10) + absVal := resource.MustParse("10") + importedSpec := datadoghq.DatadogPodAutoscalerSpec{ + Objectives: []datadoghqcommon.DatadogPodAutoscalerObjective{ + { + Type: datadoghqcommon.DatadogPodAutoscalerCustomQueryObjectiveType, + CustomQuery: &datadoghqcommon.DatadogPodAutoscalerCustomQueryObjective{ + Value: datadoghqcommon.DatadogPodAutoscalerObjectiveValue{ + Type: datadoghqcommon.DatadogPodAutoscalerAbsoluteValueObjectiveValueType, + AbsoluteValue: &absVal, + }, + }, + }, + }, + Constraints: &datadoghqcommon.DatadogPodAutoscalerConstraints{ + MinReplicas: &importedMinReplicas, + MaxReplicas: &importedMaxReplicas, + }, + } + + // Profile template changes (new hash). The syncer calls UpdateFromProfile, + // which replaces the spec with the new profile template (clearing objectives/constraints). + pai.UpdateFromProfile("hpa-migration", emptyTemplate, targetRef, "hash2", "") + assert.Empty(t, pai.Spec().Objectives) + assert.Nil(t, pai.Spec().Constraints) + + // RestoreHPAImportedSpec must put back the imported objectives and constraints + // before updatePodAutoscalerSpec writes to Kubernetes. 
+ pai.RestoreHPAImportedSpec(&importedSpec) + assert.Equal(t, importedSpec.Objectives, pai.Spec().Objectives) + assert.Equal(t, importedSpec.Constraints, pai.Spec().Constraints) + }) + + t.Run("RestoreHPAImportedSpec is no-op when k8s spec has no objectives", func(t *testing.T) { + emptyTemplate := &datadoghq.DatadogPodAutoscalerTemplate{} + pai := NewPodAutoscalerFromProfile("prod", "webapp-abc1", "hpa-migration", emptyTemplate, targetRef, "hash1", "") + + // k8s spec also has no objectives — nothing to restore + pai.RestoreHPAImportedSpec(&datadoghq.DatadogPodAutoscalerSpec{}) + assert.Empty(t, pai.Spec().Objectives) + assert.Nil(t, pai.Spec().Constraints) + }) } func TestContainerResourcesForStatus(t *testing.T) { From c068b14b61a994d3f96a7a5404e6ce70de6d42f4 Mon Sep 17 00:00:00 2001 From: Cedric Lamoriniere Date: Fri, 1 May 2026 07:56:11 +0000 Subject: [PATCH 2/3] [autoscaling] Fix gofmt violation in controller_fake.go Assisted-by: Claude:claude-sonnet-4-6 --- pkg/clusteragent/autoscaling/controller_fake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/clusteragent/autoscaling/controller_fake.go b/pkg/clusteragent/autoscaling/controller_fake.go index 5ea628ba13af..09439f5bcec5 100644 --- a/pkg/clusteragent/autoscaling/controller_fake.go +++ b/pkg/clusteragent/autoscaling/controller_fake.go @@ -21,8 +21,8 @@ import ( kscheme "k8s.io/client-go/kubernetes/scheme" k8stesting "k8s.io/client-go/testing" - datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2" datadoghqv1alpha1 "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" + datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2" ) var ( From 79ae6868b87ac043fdaa9d86df0065e2405e3db6 Mon Sep 17 00:00:00 2001 From: Cedric Lamoriniere Date: Tue, 5 May 2026 20:18:38 +0000 Subject: [PATCH 3/3] [autoscaling] Fix HPA admission webhook: apiVersions and annotation-check logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit Two bugs prevented the HPA admission webhook from working correctly during HPA migration: 1. apiVersions hardcoded to ["v1"] — the MutatingWebhookConfiguration rule only targeted autoscaling/v1, so the API server never forwarded autoscaling/v2 HPA UPDATE requests to the webhook. Fixed by introducing an optional webhookWithResourceAPIVersions interface; HPAWebhook returns ["v1","v2"]. 2. Webhook blocked the cluster-agent's own disableHPA patch — the admission handler was checking only the incoming object for the managed-by-dpa annotation. Since disableHPA adds the annotation in the same patch, the webhook saw the annotation, treated it as an external edit, and reverted the spec — leaving the HPA with no selectPolicy:Disabled. Fixed by checking BOTH oldObject and object: disableHPA (old has no annotation) and restoreHPA (new has no annotation) pass through; external edits (annotation on both sides) are correctly reverted. MatchConditions (CEL) removed from the webhook registration — all filtering is now done in the Go handler (revertHPASpec), keeping the logic in one place and avoiding CEL evaluation overhead on the API server. 
Assisted-by: Claude:claude-sonnet-4-6 --- .../controllers/webhook/controller_v1.go | 27 ++++++-- .../controllers/webhook/controller_v1_test.go | 14 ++--- .../controllers/webhook/controller_v1beta1.go | 11 ++-- .../webhook/controller_v1beta1_test.go | 14 ++--- .../mutate/autoscaling/hpa_webhook.go | 45 +++++++------- .../mutate/autoscaling/hpa_webhook_test.go | 62 ++++++++++++++----- 6 files changed, 112 insertions(+), 61 deletions(-) diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go index 0992004b54ab..c21d53303714 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go @@ -352,6 +352,22 @@ func (c *ControllerV1) deleteMutatingWebhook(webhook *admiv1.MutatingWebhookConf return err } +// webhookWithResourceAPIVersions is an optional interface that webhooks implement +// when they target resources at a non-default API version (i.e. not "v1"). +// Webhooks that do not implement this interface default to apiVersions: ["v1"]. +type webhookWithResourceAPIVersions interface { + ResourceAPIVersions() []string +} + +// resourceAPIVersions returns the API versions a webhook targets. +// Falls back to ["v1"] when the webhook does not implement webhookWithResourceAPIVersions. +func resourceAPIVersions(w Webhook) []string { + if wh, ok := w.(webhookWithResourceAPIVersions); ok { + return wh.ResourceAPIVersions() + } + return []string{"v1"} +} + // generateTemplates generates the webhook templates from the configuration. 
func (c *ControllerV1) generateTemplates() { // Generate validating webhook templates @@ -368,6 +384,7 @@ func (c *ControllerV1) generateTemplates() { webhook.Endpoint(), webhook.Operations(), webhook.Resources(), + resourceAPIVersions(webhook), nsSelector, objSelector, webhook.MatchConditions(), @@ -391,6 +408,7 @@ func (c *ControllerV1) generateTemplates() { webhook.Endpoint(), webhook.Operations(), webhook.Resources(), + resourceAPIVersions(webhook), nsSelector, objSelector, webhook.MatchConditions(), @@ -406,6 +424,7 @@ func (c *ControllerV1) generateTemplates() { probeEndpoint, []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"configmaps"}}, + []string{"v1"}, &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { @@ -428,7 +447,7 @@ func (c *ControllerV1) generateTemplates() { c.mutatingWebhookTemplates = mutatingWebhooks } -func (c *ControllerV1) getValidatingWebhookSkeleton(nameSuffix, path string, operations []admiv1.OperationType, resourcesMap map[string][]string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1.MatchCondition, timeout int32) admiv1.ValidatingWebhook { +func (c *ControllerV1) getValidatingWebhookSkeleton(nameSuffix, path string, operations []admiv1.OperationType, resourcesMap map[string][]string, apiVersions []string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1.MatchCondition, timeout int32) admiv1.ValidatingWebhook { matchPolicy := admiv1.Exact sideEffects := admiv1.SideEffectClassNone port := c.config.getServicePort() @@ -463,7 +482,7 @@ func (c *ControllerV1) getValidatingWebhookSkeleton(nameSuffix, path string, ope Operations: operations, Rule: admiv1.Rule{ APIGroups: []string{group}, - APIVersions: []string{"v1"}, + APIVersions: apiVersions, Resources: []string{resource}, }, }) @@ -473,7 +492,7 @@ func (c *ControllerV1) getValidatingWebhookSkeleton(nameSuffix, path string, ope return webhook } -func (c *ControllerV1) 
getMutatingWebhookSkeleton(nameSuffix, path string, operations []admiv1.OperationType, resourcesMap map[string][]string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1.MatchCondition, timeout int32) admiv1.MutatingWebhook { +func (c *ControllerV1) getMutatingWebhookSkeleton(nameSuffix, path string, operations []admiv1.OperationType, resourcesMap map[string][]string, apiVersions []string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1.MatchCondition, timeout int32) admiv1.MutatingWebhook { matchPolicy := admiv1.Exact sideEffects := admiv1.SideEffectClassNone port := c.config.getServicePort() @@ -510,7 +529,7 @@ func (c *ControllerV1) getMutatingWebhookSkeleton(nameSuffix, path string, opera Operations: operations, Rule: admiv1.Rule{ APIGroups: []string{group}, - APIVersions: []string{"v1"}, + APIVersions: apiVersions, Resources: []string{resource}, }, }) diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go index 16d38aa4d220..76f46ad01833 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go @@ -182,9 +182,9 @@ func TestAdmissionControllerFailureModeV1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.failure_policy", value) c.config = NewConfig(true, false, false, mockConfig) - validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1.Ignore, *validatingWebhookSkeleton.FailurePolicy) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", 
[]admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1.Ignore, *mutatingWebhookSkeleton.FailurePolicy) } @@ -192,9 +192,9 @@ func TestAdmissionControllerFailureModeV1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.failure_policy", value) c.config = NewConfig(true, false, false, mockConfig) - validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1.Fail, *validatingWebhookSkeleton.FailurePolicy) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1.Fail, *mutatingWebhookSkeleton.FailurePolicy) } } @@ -208,7 +208,7 @@ func TestAdmissionControllerReinvocationPolicyV1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.reinvocationpolicy", value) c.config = NewConfig(true, false, false, mockConfig) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, 
admiv1.IfNeededReinvocationPolicy, *mutatingWebhookSkeleton.ReinvocationPolicy) } } @@ -1127,7 +1127,7 @@ func TestGetValidatingWebhookSkeletonV1(t *testing.T) { nsSelector, objSelector := common.DefaultLabelSelectors(tt.namespaceSelector, common.LabelSelectorsConfig{}) - assert.EqualValues(t, tt.want, c.getValidatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nsSelector, objSelector, nil, 0)) + assert.EqualValues(t, tt.want, c.getValidatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nsSelector, objSelector, nil, 0)) }) } } @@ -1233,7 +1233,7 @@ func TestGetMutatingWebhookSkeletonV1(t *testing.T) { nsSelector, objSelector := common.DefaultLabelSelectors(tt.namespaceSelector, common.LabelSelectorsConfig{}) - assert.EqualValues(t, tt.want, c.getMutatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, nsSelector, objSelector, nil, 0)) + assert.EqualValues(t, tt.want, c.getMutatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1.OperationType{admiv1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nsSelector, objSelector, nil, 0)) }) } } diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go index ce0a8e994299..04f33efddd76 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go @@ -370,6 +370,7 @@ func (c *ControllerV1beta1) generateTemplates() { webhook.Endpoint(), webhook.Operations(), webhook.Resources(), + resourceAPIVersions(webhook), nsSelector, objSelector, convertMatchConditions(webhook.MatchConditions()), @@ -394,6 +395,7 @@ func (c *ControllerV1beta1) generateTemplates() { webhook.Endpoint(), 
webhook.Operations(), webhook.Resources(), + resourceAPIVersions(webhook), nsSelector, objSelector, convertMatchConditions(webhook.MatchConditions()), @@ -409,6 +411,7 @@ func (c *ControllerV1beta1) generateTemplates() { probeEndpoint, []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"configmaps"}}, + []string{"v1"}, &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { @@ -431,7 +434,7 @@ func (c *ControllerV1beta1) generateTemplates() { c.mutatingWebhookTemplates = mutatingWebhooks } -func (c *ControllerV1beta1) getValidatingWebhookSkeleton(nameSuffix, path string, operations []admiv1beta1.OperationType, resourcesMap map[string][]string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1beta1.MatchCondition, timeout int32) admiv1beta1.ValidatingWebhook { +func (c *ControllerV1beta1) getValidatingWebhookSkeleton(nameSuffix, path string, operations []admiv1beta1.OperationType, resourcesMap map[string][]string, apiVersions []string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1beta1.MatchCondition, timeout int32) admiv1beta1.ValidatingWebhook { matchPolicy := admiv1beta1.Exact sideEffects := admiv1beta1.SideEffectClassNone port := c.config.getServicePort() @@ -466,7 +469,7 @@ func (c *ControllerV1beta1) getValidatingWebhookSkeleton(nameSuffix, path string Operations: operations, Rule: admiv1beta1.Rule{ APIGroups: []string{group}, - APIVersions: []string{"v1"}, + APIVersions: apiVersions, Resources: []string{resource}, }, }) @@ -476,7 +479,7 @@ func (c *ControllerV1beta1) getValidatingWebhookSkeleton(nameSuffix, path string return webhook } -func (c *ControllerV1beta1) getMutatingWebhookSkeleton(nameSuffix, path string, operations []admiv1beta1.OperationType, resourcesMap map[string][]string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1beta1.MatchCondition, timeout int32) admiv1beta1.MutatingWebhook { +func (c 
*ControllerV1beta1) getMutatingWebhookSkeleton(nameSuffix, path string, operations []admiv1beta1.OperationType, resourcesMap map[string][]string, apiVersions []string, namespaceSelector, objectSelector *metav1.LabelSelector, matchConditions []admiv1beta1.MatchCondition, timeout int32) admiv1beta1.MutatingWebhook { matchPolicy := admiv1beta1.Exact sideEffects := admiv1beta1.SideEffectClassNone port := c.config.getServicePort() @@ -513,7 +516,7 @@ func (c *ControllerV1beta1) getMutatingWebhookSkeleton(nameSuffix, path string, Operations: operations, Rule: admiv1beta1.Rule{ APIGroups: []string{group}, - APIVersions: []string{"v1"}, + APIVersions: apiVersions, Resources: []string{resource}, }, }) diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go index f8391e69609a..955185c4cdaf 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go @@ -178,9 +178,9 @@ func TestAdmissionControllerFailureModeV1beta1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.failure_policy", value) c.config = NewConfig(true, false, false, mockConfig) - validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1beta1.Ignore, *validatingWebhookSkeleton.FailurePolicy) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", 
[]admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1beta1.Ignore, *mutatingWebhookSkeleton.FailurePolicy) } @@ -188,9 +188,9 @@ func TestAdmissionControllerFailureModeV1beta1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.failure_policy", value) c.config = NewConfig(true, false, false, mockConfig) - validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + validatingWebhookSkeleton := c.getValidatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1beta1.Fail, *validatingWebhookSkeleton.FailurePolicy) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1beta1.Fail, *mutatingWebhookSkeleton.FailurePolicy) } } @@ -204,7 +204,7 @@ func TestAdmissionControllerReinvocationPolicyV1beta1(t *testing.T) { mockConfig.SetWithoutSource("admission_controller.reinvocationpolicy", value) c.config = NewConfig(true, false, false, mockConfig) - mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nil, nil, nil, 0) + mutatingWebhookSkeleton := c.getMutatingWebhookSkeleton("foo", "/bar", []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nil, nil, nil, 0) assert.Equal(t, admiv1beta1.IfNeededReinvocationPolicy, *mutatingWebhookSkeleton.ReinvocationPolicy) } } @@ 
-1121,7 +1121,7 @@ func TestGetValidatingWebhookSkeletonV1beta1(t *testing.T) { nsSelector, objSelector := common.DefaultLabelSelectors(tt.namespaceSelector, common.LabelSelectorsConfig{}) - assert.EqualValues(t, tt.want, c.getValidatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nsSelector, objSelector, nil, 0)) + assert.EqualValues(t, tt.want, c.getValidatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nsSelector, objSelector, nil, 0)) }) } } @@ -1227,7 +1227,7 @@ func TestGetMutatingWebhookSkeletonV1beta1(t *testing.T) { nsSelector, objSelector := common.DefaultLabelSelectors(tt.namespaceSelector, common.LabelSelectorsConfig{}) - assert.EqualValues(t, tt.want, c.getMutatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, nsSelector, objSelector, nil, 0)) + assert.EqualValues(t, tt.want, c.getMutatingWebhookSkeleton(tt.args.nameSuffix, tt.args.path, []admiv1beta1.OperationType{admiv1beta1.Create}, map[string][]string{"": {"pods"}}, []string{"v1"}, nsSelector, objSelector, nil, 0)) }) } } diff --git a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go index d02bdbd327bf..b19e0ab371a0 100644 --- a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go +++ b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook.go @@ -68,29 +68,25 @@ func (w *HPAWebhook) Endpoint() string { return w.endpoint } // Resources returns the Kubernetes resources this webhook applies to. func (w *HPAWebhook) Resources() map[string][]string { return w.resources } +// ResourceAPIVersions returns ["v1", "v2"] so the webhook fires on both +// autoscaling/v1 and autoscaling/v2 HorizontalPodAutoscaler updates. 
+func (w *HPAWebhook) ResourceAPIVersions() []string { return []string{"v1", "v2"} } + // Timeout returns the webhook timeout (0 = server default). func (w *HPAWebhook) Timeout() int32 { return 0 } // Operations returns the operations this webhook is invoked for. func (w *HPAWebhook) Operations() []admissionregistrationv1.OperationType { return w.operations } -// LabelSelectors returns nil selectors — the webhook filters via MatchConditions instead. +// LabelSelectors returns nil selectors — filtering is done inside the handler. func (w *HPAWebhook) LabelSelectors(_ bool) (namespaceSelector *metav1.LabelSelector, objectSelector *metav1.LabelSelector) { return nil, nil } -// MatchConditions returns a CEL condition that restricts the webhook to HPA objects that -// carry the DPA-management annotation, avoiding unnecessary invocations. +// MatchConditions returns nil — all HPA UPDATE filtering is done inside revertHPASpec, +// keeping the logic in one place rather than split between a CEL expression and Go code. func (w *HPAWebhook) MatchConditions() []admissionregistrationv1.MatchCondition { - return []admissionregistrationv1.MatchCondition{ - { - Name: "managed-by-dpa", - Expression: fmt.Sprintf( - `has(object.metadata.annotations) && "%s" in object.metadata.annotations`, - model.HPAManagedByDPAAnnotation, - ), - }, - } + return nil } // WebhookFunc returns the admission handler. @@ -103,23 +99,26 @@ func (w *HPAWebhook) WebhookFunc() admission.WebhookFunc { // revertHPASpec ensures that any update to an HPA managed by a DPA is reverted back // to the old (disabled) spec. It also surfaces a warning to the user. func (w *HPAWebhook) revertHPASpec(request *admission.Request) *admiv1.AdmissionResponse { - // Decode incoming (proposed) HPA. 
- var incomingHPA autoscalingv2.HorizontalPodAutoscaler - if err := json.Unmarshal(request.Object, &incomingHPA); err != nil { - log.Warnf("HPA webhook: failed to decode incoming HPA: %v", err) + // Decode the old (current) HPA — this is the spec we want to preserve. + var oldHPA autoscalingv2.HorizontalPodAutoscaler + if err := json.Unmarshal(request.OldObject, &oldHPA); err != nil { + log.Warnf("HPA webhook: failed to decode old HPA for %s/%s: %v", request.Namespace, request.Name, err) return admissionAllowed() } - // Only act when the DPA-management annotation is present. - dpaRef := incomingHPA.Annotations[model.HPAManagedByDPAAnnotation] - if dpaRef == "" { + // Decode the incoming (proposed) HPA to check the annotation on the new object. + var incomingHPA autoscalingv2.HorizontalPodAutoscaler + if err := json.Unmarshal(request.Object, &incomingHPA); err != nil { + log.Warnf("HPA webhook: failed to decode incoming HPA: %v", err) return admissionAllowed() } - // Decode the old (current, disabled) HPA to get the spec we want to preserve. - var oldHPA autoscalingv2.HorizontalPodAutoscaler - if err := json.Unmarshal(request.OldObject, &oldHPA); err != nil { - log.Warnf("HPA webhook: failed to decode old HPA for %s/%s: %v", request.Namespace, request.Name, err) + // Only act when the DPA-management annotation is present on BOTH objects. + // - disableHPA (first adds the annotation): old has no annotation → skip. + // - restoreHPA (removes the annotation): new has no annotation → skip. + // - External user edits: annotation present on both → revert. 
+ dpaRef := oldHPA.Annotations[model.HPAManagedByDPAAnnotation] + if dpaRef == "" || incomingHPA.Annotations[model.HPAManagedByDPAAnnotation] == "" { return admissionAllowed() } diff --git a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go index f66d01566908..3823b7fc1abf 100644 --- a/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go +++ b/pkg/clusteragent/admission/mutate/autoscaling/hpa_webhook_test.go @@ -62,15 +62,8 @@ func TestNewHPAWebhook(t *testing.T) { assert.Equal(t, admcommon.WebhookType(admcommon.MutatingWebhook), w.WebhookType()) assert.Equal(t, []admissionregistrationv1.OperationType{admissionregistrationv1.Update}, w.Operations()) assert.Equal(t, map[string][]string{"autoscaling": {"horizontalpodautoscalers"}}, w.Resources()) -} - -func TestHPAWebhook_MatchConditions(t *testing.T) { - cfg := mutatecommon.FakeConfigWithValues(t, map[string]any{}) - w := NewHPAWebhook(cfg) - conditions := w.MatchConditions() - require.Len(t, conditions, 1) - assert.Equal(t, "managed-by-dpa", conditions[0].Name) - assert.Contains(t, conditions[0].Expression, model.HPAManagedByDPAAnnotation) + assert.Equal(t, []string{"v1", "v2"}, w.ResourceAPIVersions(), + "HPA webhook must target both autoscaling/v1 and autoscaling/v2 so the API server fires it for v2 HPAs") } func TestHPAWebhook_revertHPASpec_managed(t *testing.T) { @@ -119,13 +112,15 @@ func TestHPAWebhook_revertHPASpec_managed(t *testing.T) { func TestHPAWebhook_revertHPASpec_not_managed(t *testing.T) { // HPA without the DPA annotation → webhook should be a no-op. 
- hpa := newTestHPA("my-hpa", "default", 5, nil) + oldHPA := newTestHPA("my-hpa", "default", 5, nil) + incomingHPA := newTestHPA("my-hpa", "default", 5, nil) w := &HPAWebhook{} req := &admission.Request{ Name: "my-hpa", Namespace: "default", - Object: mustMarshal(t, hpa), + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), } resp := w.revertHPASpec(req) @@ -136,26 +131,61 @@ func TestHPAWebhook_revertHPASpec_not_managed(t *testing.T) { assert.Empty(t, resp.Warnings) } -func TestHPAWebhook_revertHPASpec_invalid_object(t *testing.T) { +func TestHPAWebhook_revertHPASpec_migration_setup(t *testing.T) { + // The cluster-agent's disableHPA adds the annotation + disables behavior in one patch. + // The old HPA has no annotation; the incoming HPA is the result with the annotation added. + // The webhook must NOT intercept this — it should only block subsequent external edits. + oldHPA := newTestHPA("my-hpa", "default", 5, nil) // no annotation yet + incomingHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", // annotation just added + }) + w := &HPAWebhook{} req := &admission.Request{ Name: "my-hpa", Namespace: "default", - Object: []byte("not-json"), + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), } resp := w.revertHPASpec(req) require.NotNil(t, resp) assert.True(t, resp.Allowed) - assert.Empty(t, resp.Patch) + assert.Empty(t, resp.Patch, "disableHPA patch must not be intercepted by the admission webhook") + assert.Empty(t, resp.Warnings) +} + +func TestHPAWebhook_revertHPASpec_restore_hpa(t *testing.T) { + // The cluster-agent's restoreHPA removes the annotation and restores the original spec. + // The old HPA has the annotation; the incoming HPA has the annotation removed. + // The webhook must NOT intercept this either. 
+ oldHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ + model.HPAManagedByDPAAnnotation: "default/my-dpa", // currently managed + }) + incomingHPA := newTestHPA("my-hpa", "default", 5, nil) // annotation removed on restore + + w := &HPAWebhook{} + req := &admission.Request{ + Name: "my-hpa", + Namespace: "default", + Object: mustMarshal(t, incomingHPA), + OldObject: mustMarshal(t, oldHPA), + } + + resp := w.revertHPASpec(req) + + require.NotNil(t, resp) + assert.True(t, resp.Allowed) + assert.Empty(t, resp.Patch, "restoreHPA patch must not be intercepted by the admission webhook") + assert.Empty(t, resp.Warnings) } -func TestHPAWebhook_revertHPASpec_invalid_old_object(t *testing.T) { +func TestHPAWebhook_revertHPASpec_invalid_old_object_json(t *testing.T) { + // Malformed OldObject → decode fails → allow with no patch. incomingHPA := newTestHPA("my-hpa", "default", 5, map[string]string{ model.HPAManagedByDPAAnnotation: "default/my-dpa", }) - w := &HPAWebhook{} req := &admission.Request{ Name: "my-hpa",