diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index e3dff2092ad9..29f4b5a88070 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -22,6 +22,7 @@ import (
 	"reflect"
 	"time"
 
+	resourceapi "k8s.io/api/resource/v1beta1"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@@ -1001,6 +1002,27 @@ func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caer
 	// TODO: Remove this call when we handle dynamically provisioned resources.
 	allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
 	allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes)
+
+	if a.draProvider != nil {
+		resourceSliceSnapshot, err := a.draProvider.Snapshot()
+		if err != nil {
+			klog.Errorf("Failed to get DRA snapshot while filtering out nodes with unready resources: %v", err)
+			return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+		}
+		resourceSlices := make([]*resourceapi.ResourceSlice, 0)
+		for _, node := range allNodes {
+			nodeResourceSlices, ok := resourceSliceSnapshot.NodeResourceSlices(node.Name)
+			if ok {
+				resourceSlices = append(resourceSlices, nodeResourceSlices...)
+			}
+		}
+		allNodes, readyNodes, err = a.processors.DynamicResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes, resourceSlices)
+		if err != nil {
+			klog.Errorf("Failed to filter out nodes with unready resources: %v", err)
+			return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+		}
+	}
+
 	return allNodes, readyNodes, nil
 }
 
diff --git a/cluster-autoscaler/core/static_autoscaler_dra_test.go b/cluster-autoscaler/core/static_autoscaler_dra_test.go
index 21e77d0c143f..b60325570848 100644
--- a/cluster-autoscaler/core/static_autoscaler_dra_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_dra_test.go
@@ -291,6 +291,7 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
 		},
 		"scale-up: pods requesting a shared, unallocated claim": {
 			extraResourceClaims: []*resourceapi.ResourceClaim{sharedGpuBClaim},
+			extraResourceSlices: node1GpuB1slice.slicesTemplateFunc(node1GpuB1slice.name + "-template"),
 			nodeGroups:          map[*testNodeGroupDef]int{node1GpuB1slice: 1},
 			pods: append(
 				unscheduledPods(baseSmallPod, "unschedulable", 13, nil, sharedGpuBClaim),
diff --git a/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor.go b/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor.go
new file mode 100644
index 000000000000..9bd6230870df
--- /dev/null
+++ b/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor.go
@@ -0,0 +1,212 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dynamicresources
+
+import (
+	"reflect"
+
+	apiv1 "k8s.io/api/core/v1"
+	resourceapi "k8s.io/api/resource/v1beta1"
+	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
+	drautils "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+	"k8s.io/klog/v2"
+)
+
+// DynamicResourcesProcessor handles dynamic resources.
+// Dynamic resources may not be allocatable immediately after node creation.
+// It compares the expected ResourceSlices with the published ResourceSlices to assess node readiness.
+type DynamicResourcesProcessor interface {
+	// FilterOutNodesWithUnreadyResources removes nodes that should have dynamic resources but have not published their ResourceSlices yet.
+	FilterOutNodesWithUnreadyResources(
+		context *ca_context.AutoscalingContext,
+		allNodes, readyNodes []*apiv1.Node,
+		resourceSlices []*resourceapi.ResourceSlice,
+	) ([]*apiv1.Node, []*apiv1.Node, error)
+	CleanUp()
+}
+
+// NewDefaultDynamicResourcesProcessor returns a default instance of DynamicResourcesProcessor.
+func NewDefaultDynamicResourcesProcessor() DynamicResourcesProcessor {
+	return &dynamicResourcesProcessor{}
+}
+
+type dynamicResourcesProcessor struct{}
+
+// FilterOutNodesWithUnreadyResources removes nodes that should have dynamic resources but have not published their ResourceSlices yet.
+func (p *dynamicResourcesProcessor) FilterOutNodesWithUnreadyResources(
+	context *ca_context.AutoscalingContext,
+	allNodes, readyNodes []*apiv1.Node,
+	resourceSlices []*resourceapi.ResourceSlice,
+) ([]*apiv1.Node, []*apiv1.Node, error) {
+	newAllNodes := make([]*apiv1.Node, 0)
+	newReadyNodes := make([]*apiv1.Node, 0)
+	nodesWithUnreadyResources := make(map[string]*apiv1.Node)
+	for _, node := range readyNodes {
+		isReady, err := p.checkNodeReadiness(context, node, resourceSlices)
+		if err != nil {
+			return nil, nil, err
+		}
+		if isReady {
+			newReadyNodes = append(newReadyNodes, node)
+		} else {
+			nodesWithUnreadyResources[node.Name] = node
+		}
+	}
+	for _, node := range allNodes {
+		if newNode, found := nodesWithUnreadyResources[node.Name]; found {
+			newAllNodes = append(newAllNodes, kubernetes.GetUnreadyNodeCopy(newNode, kubernetes.ResourceUnready))
+		} else {
+			newAllNodes = append(newAllNodes, node)
+		}
+	}
+	return newAllNodes, newReadyNodes, nil
+}
+
+func (p *dynamicResourcesProcessor) checkNodeReadiness(
+	context *ca_context.AutoscalingContext,
+	node *apiv1.Node,
+	resourceSlices []*resourceapi.ResourceSlice,
+) (bool, error) {
+	nodegroup, err := context.CloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return false, err
+	}
+	if nodegroup == nil { // Node is not managed by the autoscaler.
+		return true, nil
+	}
+	nodeTemplate, err := nodegroup.TemplateNodeInfo()
+	if err != nil {
+		// This should not happen, but if it does, assume the node is ready instead of blocking readiness on a missing template.
+		klog.V(4).Infof("nodegroup template is not available for node %s: %v", node.Name, err)
+		return true, nil
+	}
+	templateResourceSlices, _, err := drautils.SanitizedNodeResourceSlices(nodeTemplate.LocalResourceSlices, node.Name, "")
+	if err != nil {
+		return false, err
+	}
+	if len(templateResourceSlices) == 0 {
+		return true, nil
+	}
+	clusterResourceSlices := resourceSlices
+	nodeResourceSlices := make([]*resourceapi.ResourceSlice, 0)
+	for _, rs := range clusterResourceSlices {
+		if rs != nil && rs.Spec.NodeName == node.Name {
+			nodeResourceSlices = append(nodeResourceSlices, rs)
+		}
+	}
+
+	nodeResourceSlices, _, err = drautils.SanitizedNodeResourceSlices(nodeResourceSlices, node.Name, "")
+	if err != nil {
+		return false, err
+	}
+
+	if len(templateResourceSlices) != len(nodeResourceSlices) {
+		return false, nil // A different number of slices means the node is not ready/matched yet.
+	}
+
+	for _, templateResourceSlice := range templateResourceSlices {
+		matched := false
+		for _, nodeResourceSlice := range nodeResourceSlices {
+			if compareResourceSlices(templateResourceSlice, nodeResourceSlice) {
+				matched = true
+				break
+			}
+		}
+		if !matched {
+			return false, nil // No match was found for this template slice on the node.
+		}
+	}
+
+	return true, nil // All template slices found a match.
+
+}
+
+func compareResourceSlices(
+	resourceSlice1, resourceSlice2 *resourceapi.ResourceSlice,
+) bool {
+	// In order to assess whether the expected ResourceSlices have been published,
+	// we only need to compare the spec.
+	if resourceSlice1 == nil && resourceSlice2 == nil {
+		return true
+	}
+	if resourceSlice1 == nil || resourceSlice2 == nil {
+		return false
+	}
+	if resourceSlice1.Spec.Driver != resourceSlice2.Spec.Driver {
+		return false
+	}
+	if resourceSlice1.Spec.NodeName != resourceSlice2.Spec.NodeName {
+		return false
+	}
+	devices1 := resourceSlice1.Spec.Devices
+	devices2 := resourceSlice2.Spec.Devices
+	if len(devices1) != len(devices2) {
+		return false
+	}
+
+	matched2 := make([]bool, len(devices2))
+	for _, d1 := range devices1 {
+		foundMatch := false
+		for j, d2 := range devices2 {
+			if !matched2[j] && compareDevicesIgnoringName(d1, d2) {
+				matched2[j] = true
+				foundMatch = true
+				break
+			}
+		}
+		if !foundMatch {
+			return false
+		}
+	}
+
+	if !reflect.DeepEqual(resourceSlice1.Spec.AllNodes, resourceSlice2.Spec.AllNodes) {
+		return false
+	}
+	if !reflect.DeepEqual(resourceSlice1.Spec.NodeSelector, resourceSlice2.Spec.NodeSelector) {
+		return false
+	}
+
+	return true
+}
+
+// CleanUp cleans up the processor's internal structures.
+func (p *dynamicResourcesProcessor) CleanUp() {
+}
+
+type mockDynamicResourcesProcessor struct {
+}
+
+func (m *mockDynamicResourcesProcessor) FilterOutNodesWithUnreadyResources(
+	context *ca_context.AutoscalingContext,
+	allNodes, readyNodes []*apiv1.Node,
+	resourceSlices []*resourceapi.ResourceSlice,
+) ([]*apiv1.Node, []*apiv1.Node, error) {
+	return allNodes, readyNodes, nil
+}
+
+func (m *mockDynamicResourcesProcessor) CleanUp() {
+}
+
+// NewMockDynamicResourcesProcessor returns a mock instance of DynamicResourcesProcessor.
+func NewMockDynamicResourcesProcessor() DynamicResourcesProcessor {
+	return &mockDynamicResourcesProcessor{}
+}
+
+func compareDevicesIgnoringName(d1, d2 resourceapi.Device) bool {
+	return reflect.DeepEqual(d1.Basic, d2.Basic)
+}
diff --git a/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor_test.go b/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor_test.go
new file mode 100644
index 000000000000..4020aeb171e3
--- /dev/null
+++ b/cluster-autoscaler/processors/dynamicresources/dynamic_resources_processor_test.go
@@ -0,0 +1,225 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dynamicresources
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	apiv1 "k8s.io/api/core/v1"
+	resourceapi "k8s.io/api/resource/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+	testutils "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+	klog "k8s.io/klog/v2"
+)
+
+func TestDynamicResourcesProcessor(t *testing.T) {
+	start := time.Now()
+	later := start.Add(10 * time.Minute)
+	onScaleUpMock := &onScaleUpMock{}
+	onScaleDownMock := &onScaleDownMock{}
+	onNodeGroupCreateMock := &onNodeGroupCreateMock{}
+	onNodeGroupDeleteMock := &onNodeGroupDeleteMock{}
+	deleteFinished := make(chan bool, 1)
+
+	tn1 := testutils.BuildTestNode(
+		"readynodewithreadyresources",
+		1000,
+		1000,
+	)
+	tn1.Status.Conditions = []apiv1.NodeCondition{
+		{
+			Type:              apiv1.NodeReady,
+			Status:            apiv1.ConditionTrue,
+			LastHeartbeatTime: metav1.NewTime(later),
+		},
+	}
+	tn2 := testutils.BuildTestNode(
+		"readynodewithunreadyresources",
+		1000,
+		1000,
+	)
+	tn2.Status.Conditions = []apiv1.NodeCondition{
+		{
+			Type:              apiv1.NodeReady,
+			Status:            apiv1.ConditionTrue,
+			LastHeartbeatTime: metav1.NewTime(later),
+		},
+	}
+	tn3 := testutils.BuildTestNode(
+		"readynodewithoutresources",
+		1000,
+		1000,
+	)
+	tn3.Status.Conditions = []apiv1.NodeCondition{
+		{
+			Type:              apiv1.NodeReady,
+			Status:            apiv1.ConditionTrue,
+			LastHeartbeatTime: metav1.NewTime(later),
+		},
+	}
+	tn4 := testutils.BuildTestNode(
+		"unreadynodewithoutresources",
+		1000,
+		1000,
+	)
+	tn4.Status.Conditions = []apiv1.NodeCondition{
+		{
+			Type:              apiv1.NodeReady,
+			Status:            apiv1.ConditionFalse,
+			LastHeartbeatTime: metav1.NewTime(later),
+		},
+	}
+	tn5 := testutils.BuildTestNode(
+		"nodenothandledbyautoscaler",
+		1000,
+		1000,
+	)
+	tn5.Status.Conditions = []apiv1.NodeCondition{
+		{
+			Type:              apiv1.NodeReady,
+			Status:            apiv1.ConditionTrue,
+			LastHeartbeatTime: metav1.NewTime(later),
+		},
+	}
+
+	expectedReadiness := make(map[string]bool)
+	expectedReadiness["readynodewithreadyresources"] = true
+	expectedReadiness["readynodewithunreadyresources"] = false
+	expectedReadiness["readynodewithoutresources"] = true
+	expectedReadiness["unreadynodewithoutresources"] = false
+	expectedReadiness["nodenothandledbyautoscaler"] = true
+
+	testResourceSlices := []*resourceapi.ResourceSlice{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "slice1",
+			},
+			Spec: resourceapi.ResourceSliceSpec{
+				NodeName: "readynodewithreadyresources",
+			},
+		},
+	}
+
+	tni1 := framework.NewTestNodeInfo(tn1)
+	tni3 := framework.NewTestNodeInfo(tn3)
+
+	tni1.LocalResourceSlices = testResourceSlices
+
+	provider := testprovider.NewTestAutoprovisioningCloudProvider(
+		func(id string, delta int) error {
+			return onScaleUpMock.ScaleUp(id, delta)
+		}, func(id string, name string) error {
+			ret := onScaleDownMock.ScaleDown(id, name)
+			deleteFinished <- true
+			return ret
+		}, func(id string) error {
+			return onNodeGroupCreateMock.Create(id)
+		}, func(id string) error {
+			return onNodeGroupDeleteMock.Delete(id)
+		},
+		[]string{"tni1", "tni3"},
+		map[string]*framework.NodeInfo{"ng1": tni1, "ng3": tni3},
+	)
+
+	provider.AddAutoprovisionedNodeGroup("ng1", 0, 10, 2, "ng1")
+	provider.AddAutoprovisionedNodeGroup("ng3", 0, 10, 2, "ng3")
+	provider.AddNode("ng1", tn1)
+	provider.AddNode("ng1", tn2)
+	provider.AddNode("ng3", tn3)
+	provider.AddNode("ng3", tn4)
+
+	autoscalingContext := ca_context.AutoscalingContext{
+		CloudProvider: provider,
+	}
+
+	processor := NewDefaultDynamicResourcesProcessor()
+
+	newAllNodes, newReadyNodes, err := processor.FilterOutNodesWithUnreadyResources(
+		&autoscalingContext,
+		[]*apiv1.Node{tn1, tn2, tn3, tn4, tn5},
+		[]*apiv1.Node{tn1, tn2, tn3, tn5},
+		testResourceSlices,
+	)
+
+	assert.NoError(t, err)
+
+	foundInReady := make(map[string]bool)
+	for _, node := range newReadyNodes {
+		foundInReady[node.Name] = true
+		assert.True(t, expectedReadiness[node.Name], fmt.Sprintf("Node %s found in ready nodes list (it shouldn't be there)", node.Name))
+	}
+	for nodeName, expected := range expectedReadiness {
+		if expected {
+			assert.True(t, foundInReady[nodeName], fmt.Sprintf("Node %s expected ready, but not found in ready nodes list", nodeName))
+		}
+	}
+	for _, node := range newAllNodes {
+		assert.Equal(t, 1, len(node.Status.Conditions))
+		if expectedReadiness[node.Name] {
+			assert.Equal(t, apiv1.ConditionTrue, node.Status.Conditions[0].Status, fmt.Sprintf("Unexpected ready condition value for node %s", node.Name))
+		} else {
+			assert.Equal(t, apiv1.ConditionFalse, node.Status.Conditions[0].Status, fmt.Sprintf("Unexpected ready condition value for node %s", node.Name))
+		}
+	}
+}
+
+type onScaleUpMock struct {
+	mock.Mock
+}
+
+func (m *onScaleUpMock) ScaleUp(id string, delta int) error {
+	klog.Infof("Scale up: %v %v", id, delta)
+	args := m.Called(id, delta)
+	return args.Error(0)
+}
+
+type onScaleDownMock struct {
+	mock.Mock
+}
+
+func (m *onScaleDownMock) ScaleDown(id string, name string) error {
+	klog.Infof("Scale down: %v %v", id, name)
+	args := m.Called(id, name)
+	return args.Error(0)
+}
+
+type onNodeGroupCreateMock struct {
+	mock.Mock
+}
+
+func (m *onNodeGroupCreateMock) Create(id string) error {
+	klog.Infof("Create group: %v", id)
+	args := m.Called(id)
+	return args.Error(0)
+}
+
+type onNodeGroupDeleteMock struct {
+	mock.Mock
+}
+
+func (m *onNodeGroupDeleteMock) Delete(id string) error {
+	klog.Infof("Delete group: %v", id)
+	args := m.Called(id)
+	return args.Error(0)
+}
diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go
index b391fed789b2..e54bb0afa319 100644
--- a/cluster-autoscaler/processors/processors.go
+++ b/cluster-autoscaler/processors/processors.go
@@ -22,6 +22,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/dynamicresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -62,6 +63,8 @@ type AutoscalingProcessors struct {
 	NodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor
 	// CustomResourcesProcessor is interface defining handling custom resources
 	CustomResourcesProcessor customresources.CustomResourcesProcessor
+	// DynamicResourcesProcessor is interface defining handling dynamic resources
+	DynamicResourcesProcessor dynamicresources.DynamicResourcesProcessor
 	// ActionableClusterProcessor is interface defining whether the cluster is in an actionable state
 	ActionableClusterProcessor actionablecluster.ActionableClusterProcessor
 	// ScaleDownCandidatesNotifier is used to Update and Register new scale down candidates observer.
@@ -98,6 +101,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
 		AsyncNodeGroupStateChecker:  asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
 		NodeGroupConfigProcessor:    nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
 		CustomResourcesProcessor:    customresources.NewDefaultCustomResourcesProcessor(),
+		DynamicResourcesProcessor:   dynamicresources.NewDefaultDynamicResourcesProcessor(),
 		ActionableClusterProcessor:  actionablecluster.NewDefaultActionableClusterProcessor(),
 		TemplateNodeInfoProvider:    nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
 		ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
@@ -119,6 +123,7 @@ func (ap *AutoscalingProcessors) CleanUp() {
 	ap.ScaleDownNodeProcessor.CleanUp()
 	ap.NodeGroupConfigProcessor.CleanUp()
 	ap.CustomResourcesProcessor.CleanUp()
+	ap.DynamicResourcesProcessor.CleanUp()
 	ap.TemplateNodeInfoProvider.CleanUp()
 	ap.ActionableClusterProcessor.CleanUp()
 }
diff --git a/cluster-autoscaler/processors/test/common.go b/cluster-autoscaler/processors/test/common.go
index 065b06d92f99..05b4a544212d 100644
--- a/cluster-autoscaler/processors/test/common.go
+++ b/cluster-autoscaler/processors/test/common.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/dynamicresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -53,6 +54,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
 		TemplateNodeInfoProvider:    nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
 		NodeGroupConfigProcessor:    nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults),
 		CustomResourcesProcessor:    customresources.NewDefaultCustomResourcesProcessor(),
+		DynamicResourcesProcessor:   dynamicresources.NewDefaultDynamicResourcesProcessor(),
 		ActionableClusterProcessor:  actionablecluster.NewDefaultActionableClusterProcessor(),
 		ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
 		ScaleStateNotifier:          nodegroupchange.NewNodeGroupChangeObserversList(),
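
Reviewer note (illustrative sketch, not part of the diff): the readiness check hinges on name-agnostic device matching: a template ResourceSlice counts as published once a node slice with deeply-equal Basic device specs exists, regardless of how the driver named the devices. The standalone example below re-declares the unexported compareDevicesIgnoringName helper and uses a made-up "model" attribute to show that behaviour.

package main

import (
	"fmt"
	"reflect"

	resourceapi "k8s.io/api/resource/v1beta1"
)

// Re-declared locally for illustration: devices match when their Basic specs
// are deeply equal, no matter what the device is named.
func compareDevicesIgnoringName(d1, d2 resourceapi.Device) bool {
	return reflect.DeepEqual(d1.Basic, d2.Basic)
}

func main() {
	model := "made-up-model"
	gpu := resourceapi.BasicDevice{
		Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
			"model": {StringValue: &model},
		},
	}
	// The node group template carries a sanitized device name, while the driver
	// on the real node publishes its own; only the Basic spec is compared.
	templateDevice := resourceapi.Device{Name: "gpu-template-0", Basic: &gpu}
	publishedDevice := resourceapi.Device{Name: "gpu-0", Basic: gpu.DeepCopy()}
	fmt.Println(compareDevicesIgnoringName(templateDevice, publishedDevice)) // prints: true
}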