2 changes: 1 addition & 1 deletion go.mod
@@ -165,7 +165,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/winutil v0.77.0-devel.0.20260213154712-e02b9359151a
github.com/DataDog/datadog-agent/pkg/version v0.78.0
github.com/DataDog/datadog-go/v5 v5.8.3
github.com/DataDog/datadog-operator/api v0.0.0-20260323152500-0887e50ccf73
github.com/DataDog/datadog-operator/api v0.0.0-20260503193602-adf766128732
github.com/DataDog/datadog-traceroute v1.0.15
github.com/DataDog/dd-trace-go/v2 v2.7.4
github.com/DataDog/ebpf-manager v0.7.18
6 changes: 6 additions & 0 deletions go.sum

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/clusteragent/autoscaling/workload/config_retriever.go
@@ -14,6 +14,7 @@ import (
"k8s.io/utils/clock"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/log"
@@ -47,12 +48,12 @@ type autoscalingProcessor interface {
}

// NewConfigRetriever creates a new ConfigRetriever
func NewConfigRetriever(ctx context.Context, clock clock.WithTicker, store *store, isLeader func() bool, rcClient RcClient) (*ConfigRetriever, error) {
func NewConfigRetriever(ctx context.Context, clock clock.WithTicker, store *store, isLeader func() bool, rcClient RcClient, builder *model.PodAutoscalerInternalBuilder) (*ConfigRetriever, error) {
cr := &ConfigRetriever{
isLeader: isLeader,
clock: clock,

settingsProcessor: newAutoscalingSettingsProcessor(store),
settingsProcessor: newAutoscalingSettingsProcessor(store, builder),
valuesProcessor: newAutoscalingValuesProcessor(store),
}

@@ -40,7 +40,8 @@ type settingsItem struct {
}

type autoscalingSettingsProcessor struct {
store *store
store *store
builder *model.PodAutoscalerInternalBuilder
// State is kept nil until the first full config is processed
state map[string]settingsItem
// We are guaranteed to be called in a single thread for pre/process/post
@@ -51,9 +52,10 @@ func newAutoscalingSettingsProcessor(store *store) autoscalingSettingsProcessor {
lastProcessingError bool
}

func newAutoscalingSettingsProcessor(store *store) autoscalingSettingsProcessor {
func newAutoscalingSettingsProcessor(store *store, builder *model.PodAutoscalerInternalBuilder) autoscalingSettingsProcessor {
return autoscalingSettingsProcessor{
store: store,
store: store,
builder: builder,
}
}

@@ -153,7 +155,7 @@ func (p *autoscalingSettingsProcessor) reconcile(isLeader bool) {
if podAutoscalerFound {
podAutoscaler.UpdateFromSettings(item.spec, item.receivedTimestamp)
} else {
podAutoscaler = model.NewPodAutoscalerFromSettings(item.namespace, item.name, item.spec, item.receivedTimestamp)
podAutoscaler = p.builder.NewFromSettings(item.namespace, item.name, item.spec, item.receivedTimestamp)
}
p.store.UnlockSet(paID, podAutoscaler, configRetrieverStoreID)
}
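The hunks above, and the controller changes below, all thread a *model.PodAutoscalerInternalBuilder through the constructors, and the tests build it with model.NewPodAutoscalerInternalBuilder(false). The builder type itself is not part of this diff, so the following is only a sketch of the shape the call sites imply: a holder for the cluster-wide burstable default that is stamped onto every PodAutoscalerInternal it creates. Field names, parameter types, and the setter are assumptions for illustration, not the actual implementation.

// Illustrative sketch only; the real builder lives in the model package outside this diff.
type PodAutoscalerInternalBuilder struct {
	clusterBurstableDefault bool // assumed field: cluster-wide default for burstable vertical scaling
}

// NewPodAutoscalerInternalBuilder matches the constructor used in the tests
// (model.NewPodAutoscalerInternalBuilder(false)).
func NewPodAutoscalerInternalBuilder(clusterBurstableDefault bool) *PodAutoscalerInternalBuilder {
	return &PodAutoscalerInternalBuilder{clusterBurstableDefault: clusterBurstableDefault}
}

// NewFromSettings presumably wraps the previous NewPodAutoscalerFromSettings constructor
// and additionally records the cluster default on the new object (parameter types and
// setter name are assumed).
func (b *PodAutoscalerInternalBuilder) NewFromSettings(ns, name string, spec *datadoghq.DatadogPodAutoscalerSpec, receivedTimestamp time.Time) PodAutoscalerInternal {
	pai := NewPodAutoscalerFromSettings(ns, name, spec, receivedTimestamp)
	pai.SetClusterBurstableDefault(b.clusterBurstableDefault)
	return pai
}

// NewFromKubernetes presumably does the same for objects observed from the API server,
// wrapping the previous NewPodAutoscalerInternal constructor.
func (b *PodAutoscalerInternalBuilder) NewFromKubernetes(podAutoscaler *datadoghq.DatadogPodAutoscaler) PodAutoscalerInternal {
	pai := NewPodAutoscalerInternal(podAutoscaler)
	pai.SetClusterBurstableDefault(b.clusterBurstableDefault)
	return pai
}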
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/utils/clock"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
)

@@ -43,7 +44,7 @@ func newMockConfigRetriever(t *testing.T, isLeader func() bool, store *store, cl

mockRCClient := &mockRCClient{}

cr, err := NewConfigRetriever(context.Background(), clock, store, isLeader, mockRCClient)
cr, err := NewConfigRetriever(context.Background(), clock, store, isLeader, mockRCClient, model.NewPodAutoscalerInternalBuilder(false))
assert.NoError(t, err)

return cr, mockRCClient
7 changes: 5 additions & 2 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -72,6 +72,7 @@ type Controller struct {

eventRecorder record.EventRecorder
store *store
builder *model.PodAutoscalerInternalBuilder

limitHeap *limitHeap

@@ -103,12 +104,14 @@ func NewController(
localSender sender.Sender,
limitHeap *limitHeap,
globalTagsFunc func() []string,
builder *model.PodAutoscalerInternalBuilder,
) (*Controller, error) {
c := &Controller{
clusterID: clusterID,
clock: clock,
eventRecorder: eventRecorder,
localSender: localSender,
builder: builder,
isFallbackEnabled: false, // keep fallback disabled by default
}

@@ -206,7 +209,7 @@ func (c *Controller) processPodAutoscaler(ctx context.Context, key, ns, name str
// If the object is present in Kubernetes, we will update our local version
// Otherwise, we clear it from our local store
if podAutoscaler != nil {
c.store.Set(key, model.NewPodAutoscalerInternal(podAutoscaler), c.ID)
c.store.Set(key, c.builder.NewFromKubernetes(podAutoscaler), c.ID)
} else {
c.store.Delete(key, c.ID)
}
@@ -225,7 +228,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if podAutoscaler != nil {
// If we don't have an instance locally, we create it. Deletion is handled through setting the `Deleted` flag
log.Debugf("Creating internal PodAutoscaler: %s from Kubernetes object", key)
pai := model.NewPodAutoscalerInternal(podAutoscaler)
pai := c.builder.NewFromKubernetes(podAutoscaler)
c.store.UnlockSet(key, pai, c.ID)
} else {
// If podAutoscaler == nil, both objects are nil, nothing to do
5 changes: 4 additions & 1 deletion pkg/clusteragent/autoscaling/workload/controller_test.go
@@ -66,7 +66,7 @@ func newFixture(t *testing.T, testTime time.Time) *fixture {
ControllerFixture: autoscaling.NewFixture(
t, podAutoscalerGVR,
func(fakeClient *fake.FakeDynamicClient, informer dynamicinformer.DynamicSharedInformerFactory, isLeader func() bool) (*autoscaling.Controller, error) {
c, err := NewController(clock, "cluster-id1", recorder, nil, nil, nil, fakeClient, informer, isLeader, store, podWatcher, nil, hashHeap, nil)
c, err := NewController(clock, "cluster-id1", recorder, nil, nil, nil, fakeClient, informer, isLeader, store, podWatcher, nil, hashHeap, nil, model.NewPodAutoscalerInternalBuilder(false))
if err != nil {
return nil, err
}
@@ -1520,6 +1520,9 @@ func TestProfileManagedDPA(t *testing.T) {
condition(datadoghqcommon.DatadogPodAutoscalerHorizontalAbleToScaleCondition, corev1.ConditionUnknown, "", "", testTime),
condition(datadoghqcommon.DatadogPodAutoscalerVerticalAbleToApply, corev1.ConditionUnknown, "", "", testTime),
},
Options: &datadoghqcommon.DatadogPodAutoscalerOptionsStatus{
Burstable: pointer.Ptr(true),
},
},
}
f.ExpectCreateAction(mustUnstructured(t, expectedDPA))
@@ -87,6 +87,11 @@ func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.
return autoscaling.NoRequeue, nil
}

// Fetch pods early: the QOS class of existing pods determines the effective burstable mode
// when burstable is not explicitly configured (Guaranteed QOS defaults to non-burstable).
pods := u.podWatcher.GetPodsForOwner(target)
autoscalerInternal.SetPodsGuaranteedQOS(isPodsGuaranteedQOS(pods))

// Deep-copy to avoid mutating the original recommendation stored in mainScalingValues/fallbackScalingValues.
// Without this, clamped values would persist and the VerticalScalingLimited condition would be
// cleared on the next sync since constraints re-applied to already-clamped values are no-ops.
@@ -104,8 +109,6 @@ func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.
// differs from non-burstable — no extra suffix required.
recommendationID := constrainedVertical.ResourcesHash

// Get the pods for the pod owner
pods := u.podWatcher.GetPodsForOwner(target)
if len(pods) == 0 {
// If we found nothing, we'll wait just until the next sync
log.Debugf("No pods found for autoscaler: %s, gvk: %s, name: %s", autoscalerInternal.ID(), targetGVK.String(), autoscalerInternal.Spec().TargetRef.Name)
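The early call to SetPodsGuaranteedQOS matters because IsBurstable(), consulted later when container patches are built, folds the observed QoS class into the effective burstable decision. The resolution logic itself is not shown in this diff; based on the comment above and on the tests added in this PR (cluster default true plus all-Guaranteed pods yields non-burstable), it presumably follows a precedence like the sketch below. Field and option paths are assumptions for illustration only.

// Assumed precedence for the effective burstable mode (illustrative only):
//  1. an explicit per-autoscaler Burstable option wins when set,
//  2. otherwise all-Guaranteed pods force non-burstable,
//  3. otherwise the cluster-wide default carried by the builder applies.
func (p *PodAutoscalerInternal) IsBurstable() bool {
	if p.options != nil && p.options.Burstable != nil {
		return *p.options.Burstable // explicit configuration always wins (field path assumed)
	}
	if p.podsGuaranteedQOS {
		return false // Guaranteed QoS pods: keep limits equal to requests, never drop CPU limits
	}
	return p.clusterBurstableDefault
}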
@@ -42,10 +42,11 @@ const (

const inPlaceResizeSupportedCacheTTL = 15 * time.Minute

// removeLimitSentinel is stored in a container Limits map by applyVerticalConstraints (burstable
// mode) to signal "delete this limit from the pod instead of setting it to this value".
// Negative quantities are never valid as real Kubernetes resource values, making the intent
// unambiguous and easy to identify with quantity.Sign() < 0.
// removeLimitSentinel is placed in ContainerResources.Limits by applyVerticalConstraints
// to signal that an existing limit must be actively deleted from the live pod, rather
// than set to a new value. Negative quantities are never valid Kubernetes resource values,
// making this unambiguous. The sentinel must be inserted AFTER the limits >= requests
// invariant check to prevent it from being overwritten.
var removeLimitSentinel = resource.MustParse("-1")

// isInPlaceResizeSupported checks whether the API server exposes the pods/resize
@@ -422,6 +423,18 @@ func applyVerticalConstraints(verticalRecs *model.VerticalScalingValues, constra
}
}

// ControlledValues=CPURequestsRemoveLimitsMemoryRequestsAndLimits (phase 1 of 2):
// Remove CPU from limits before clamping and the invariant check so neither
// touches the CPU limit. The sentinel is inserted in phase 2, after the invariant.
isCPURemoveLimits := constraint.ControlledValues != nil &&
*constraint.ControlledValues == datadoghqcommon.DatadogPodAutoscalerContainerControlledValuesCPURequestsRemoveLimitsMemoryRequestsAndLimits
if isCPURemoveLimits {
if _, hasCPULimit := cr.Limits[corev1.ResourceCPU]; hasCPULimit {
delete(cr.Limits, corev1.ResourceCPU)
modified = true
}
}

// Resolve min/max bounds for clamping.
// New top-level MinAllowed/MaxAllowed apply to both requests and limits.
// Deprecated Requests.MinAllowed/MaxAllowed apply to requests only.
@@ -444,6 +457,18 @@
}
}

// ControlledValues=CPURequestsRemoveLimitsMemoryRequestsAndLimits (phase 2 of 2):
// Insert sentinel AFTER the invariant check to prevent it from being overwritten.
// The sentinel signals to the pod patcher that any pre-existing CPU limit must be
// actively deleted from the live pod, even if the backend never included a CPU limit.
if isCPURemoveLimits {
if cr.Limits == nil {
cr.Limits = corev1.ResourceList{}
}
cr.Limits[corev1.ResourceCPU] = removeLimitSentinel
modified = true
}

kept = append(kept, cr)
}

@@ -613,6 +638,20 @@ func getPodResizeStatus(pod *workloadmeta.KubernetesPod, recommendationID string
return PodResizeStatusCompleted, time.Time{}
}

// isPodsGuaranteedQOS returns true if all pods have Guaranteed QOS class.
// Returns false when the slice is empty (unknown QOS → no override applied).
func isPodsGuaranteedQOS(pods []*workloadmeta.KubernetesPod) bool {
if len(pods) == 0 {
return false
}
for _, pod := range pods {
if pod.QOSClass != string(corev1.PodQOSGuaranteed) {
return false
}
}
return true
}

func fromAutoscalerToContainerResourcePatches(autoscalerInternal *model.PodAutoscalerInternal, pod *workloadmeta.KubernetesPod) []workloadpatcher.ContainerResourcePatch {
containersResources := autoscalerInternal.ScalingValues().Vertical.ContainerResources

@@ -622,6 +661,8 @@ func fromAutoscalerToContainerResourcePatches(autoscalerInternal *model.PodAutos
recoByName[cr.Name] = cr
}

// IsBurstable() is safe to call here: SetPodsGuaranteedQOS was called at the top of sync(),
// so the Guaranteed-QOS override is already reflected in the returned value.
burstable := autoscalerInternal.IsBurstable()

// Build the list of patches ordered to API server pod container order.
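On the consuming side, the sentinel has to be recognized when container resource patches are built. The relevant body of fromAutoscalerToContainerResourcePatches is collapsed in this diff, so the loop below is only a sketch of the expected handling: the patch field names (Limits, LimitsToDelete) are taken from the tests further down, and the detection rule follows the sentinel comment (a negative quantity, i.e. Sign() < 0, is never a valid resource value).

// Sketch (assumed shape of workloadpatcher.ContainerResourcePatch): copy recommended
// limits into the patch, turning the negative sentinel into an explicit deletion of the
// live pod's existing limit instead of a value to set.
for resName, qty := range cr.Limits {
	if qty.Sign() < 0 {
		// removeLimitSentinel: the pre-existing limit must be removed from the pod.
		patch.LimitsToDelete = append(patch.LimitsToDelete, string(resName))
		continue
	}
	patch.Limits[string(resName)] = qty.String()
}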
@@ -691,6 +691,50 @@ func TestApplyVerticalConstraints_AllFeatures(t *testing.T) {
assert.Equal(t, expectedHash, vertical.ResourcesHash)
}

func TestApplyVerticalConstraints_CPURequestsRemoveLimits(t *testing.T) {
vertical := &model.VerticalScalingValues{
ResourcesHash: "original-hash",
ContainerResources: []datadoghqcommon.DatadogPodAutoscalerContainerResources{
{
Name: "app",
Requests: corev1.ResourceList{"cpu": resource.MustParse("300m"), "memory": resource.MustParse("256Mi")},
Limits: corev1.ResourceList{"cpu": resource.MustParse("600m"), "memory": resource.MustParse("512Mi")},
},
},
}
constraints := &datadoghqcommon.DatadogPodAutoscalerConstraints{
Containers: []datadoghqcommon.DatadogPodAutoscalerContainerConstraints{
{
Name: "app",
ControlledValues: pointer.Ptr(datadoghqcommon.DatadogPodAutoscalerContainerControlledValuesCPURequestsRemoveLimitsMemoryRequestsAndLimits),
},
},
}

limitErr, err := applyVerticalConstraints(vertical, constraints, false)
require.NoError(t, err)
assert.Nil(t, limitErr)

require.Len(t, vertical.ContainerResources, 1)
app := vertical.ContainerResources[0]

// CPU limit must carry the sentinel value so patchContainerResources removes it from the pod
cpuLimit, exists := app.Limits[corev1.ResourceCPU]
require.True(t, exists, "CPU key must be present in limits (sentinel)")
assert.Equal(t, 0, cpuLimit.Cmp(removeLimitSentinel), "CPU limit must be the remove-limit sentinel value")
// Memory limit must be preserved
assert.Equal(t, resource.MustParse("512Mi"), app.Limits[corev1.ResourceMemory], "memory limit must be preserved")
// CPU and memory requests must be preserved
assert.Equal(t, resource.MustParse("300m"), app.Requests[corev1.ResourceCPU])
assert.Equal(t, resource.MustParse("256Mi"), app.Requests[corev1.ResourceMemory])

// Hash must be recomputed
assert.NotEqual(t, "original-hash", vertical.ResourcesHash)
expectedHash, err := autoscaling.ObjectHash(vertical.ContainerResources)
require.NoError(t, err)
assert.Equal(t, expectedHash, vertical.ResourcesHash)
}

func TestApplyVerticalConstraints_ValidationErrors(t *testing.T) {
vertical := &model.VerticalScalingValues{
ResourcesHash: "original-hash",
@@ -810,4 +854,54 @@ func TestFromAutoscalerToContainerResourcePatches_Burstable(t *testing.T) {
assert.Equal(t, "500m", p.Limits["cpu"], "cpu limit must be set when not burstable")
assert.Empty(t, p.LimitsToDelete, "LimitsToDelete must be empty when not burstable")
})

t.Run("Guaranteed QOS suppresses cluster-default burstable: cpu limit preserved", func(t *testing.T) {
ai := (&model.FakePodAutoscalerInternal{
Namespace: "default",
Name: "ai",
ScalingValues: model.ScalingValues{Vertical: sv},
ClusterBurstableDefault: true,
PodsGuaranteedQOS: true,
}).Build()

patches := fromAutoscalerToContainerResourcePatches(&ai, pod)

require.Len(t, patches, 1)
p := patches[0]
assert.Equal(t, "500m", p.Limits["cpu"], "cpu limit must be preserved for Guaranteed QOS pods")
assert.Empty(t, p.LimitsToDelete, "LimitsToDelete must be empty for Guaranteed QOS pods")
})
}

func TestIsPodsGuaranteedQOS(t *testing.T) {
guaranteed := string(corev1.PodQOSGuaranteed)
burstable := string(corev1.PodQOSBurstable)

t.Run("empty slice returns false", func(t *testing.T) {
assert.False(t, isPodsGuaranteedQOS(nil))
assert.False(t, isPodsGuaranteedQOS([]*workloadmeta.KubernetesPod{}))
})

t.Run("all Guaranteed returns true", func(t *testing.T) {
pods := []*workloadmeta.KubernetesPod{
{EntityID: workloadmeta.EntityID{ID: "p1"}, QOSClass: guaranteed},
{EntityID: workloadmeta.EntityID{ID: "p2"}, QOSClass: guaranteed},
}
assert.True(t, isPodsGuaranteedQOS(pods))
})

t.Run("mixed QOS returns false", func(t *testing.T) {
pods := []*workloadmeta.KubernetesPod{
{EntityID: workloadmeta.EntityID{ID: "p1"}, QOSClass: guaranteed},
{EntityID: workloadmeta.EntityID{ID: "p2"}, QOSClass: burstable},
}
assert.False(t, isPodsGuaranteedQOS(pods))
})

t.Run("single Burstable pod returns false", func(t *testing.T) {
pods := []*workloadmeta.KubernetesPod{
{EntityID: workloadmeta.EntityID{ID: "p1"}, QOSClass: burstable},
}
assert.False(t, isPodsGuaranteedQOS(pods))
})
}
@@ -1660,7 +1660,7 @@ func TestCalculateHorizontalRecommendations(t *testing.T) {
Conditions: []datadoghqcommon.DatadogPodAutoscalerCondition{},
},
}
dpai := model.NewPodAutoscalerInternal(dpa)
dpai := model.NewPodAutoscalerInternalBuilder(false).NewFromKubernetes(dpa)

r := newReplicaCalculator(clock.RealClock{}, pw)
res, err := r.calculateHorizontalRecommendations(dpai, lStore)