From 2ab4871b18c8346312a8963aae6c1d6f77705684 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda <74629455+engedaam@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:00:54 -0800 Subject: [PATCH] chore: Remove Eventual Disruption cloud provider interface (#1832) --- .gitignore | 2 + hack/kwok/requirements.sh | 5 +- kwok/apis/v1alpha1/kwoknodeclass.go | 4 - kwok/charts/crds/karpenter.sh_nodepools.yaml | 7 +- kwok/cloudprovider/cloudprovider.go | 4 - pkg/apis/crds/karpenter.sh_nodepools.yaml | 6 +- pkg/apis/v1/nodepool.go | 3 +- pkg/apis/v1/nodepool_budgets_test.go | 8 +- pkg/cloudprovider/fake/cloudprovider.go | 4 - pkg/cloudprovider/types.go | 3 - pkg/controllers/disruption/controller.go | 28 +- .../disruption/{eventual.go => drift.go} | 20 +- .../{eventual_test.go => drift_test.go} | 554 ------------------ pkg/controllers/disruption/suite_test.go | 5 +- test/suites/perf/disruption_test.go | 68 --- 15 files changed, 33 insertions(+), 688 deletions(-) rename pkg/controllers/disruption/{eventual.go => drift.go} (83%) rename pkg/controllers/disruption/{eventual_test.go => drift_test.go} (61%) delete mode 100644 test/suites/perf/disruption_test.go diff --git a/.gitignore b/.gitignore index 95f28cff37..79830f4273 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ coverage.html *.test *.cpuprofile *.heapprofile +go.work +go.work.sum # Common in OSs and IDEs .idea diff --git a/hack/kwok/requirements.sh b/hack/kwok/requirements.sh index 75dbaee374..b7fa80434b 100755 --- a/hack/kwok/requirements.sh +++ b/hack/kwok/requirements.sh @@ -8,7 +8,4 @@ yq eval '.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties.req # NodePool Validation: yq eval '.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties.template.properties.spec.properties.requirements.items.properties.key.x-kubernetes-validations += [ - {"message": "label domain \"karpenter.kwok.sh\" is restricted", "rule": "self in [\"karpenter.kwok.sh/instance-cpu\", \"karpenter.kwok.sh/instance-memory\", \"karpenter.kwok.sh/instance-family\", \"karpenter.kwok.sh/instance-size\"] || !self.find(\"^([^/]+)\").endsWith(\"karpenter.kwok.sh\")"}]' -i kwok/charts/crds/karpenter.sh_nodepools.yaml - -# Add ExampleReason in KwoK CloudProvider -yq eval '.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties.disruption.properties.budgets.items.properties.reasons.items.enum += [ "ExampleReason" ]' -i kwok/charts/crds/karpenter.sh_nodepools.yaml \ No newline at end of file + {"message": "label domain \"karpenter.kwok.sh\" is restricted", "rule": "self in [\"karpenter.kwok.sh/instance-cpu\", \"karpenter.kwok.sh/instance-memory\", \"karpenter.kwok.sh/instance-family\", \"karpenter.kwok.sh/instance-size\"] || !self.find(\"^([^/]+)\").endsWith(\"karpenter.kwok.sh\")"}]' -i kwok/charts/crds/karpenter.sh_nodepools.yaml \ No newline at end of file diff --git a/kwok/apis/v1alpha1/kwoknodeclass.go b/kwok/apis/v1alpha1/kwoknodeclass.go index 6e2b248307..ecc08fcf48 100644 --- a/kwok/apis/v1alpha1/kwoknodeclass.go +++ b/kwok/apis/v1alpha1/kwoknodeclass.go @@ -18,8 +18,6 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - v1 "sigs.k8s.io/karpenter/pkg/apis/v1" ) // KWOKNodeClass is the Schema for the KWOKNodeClass API @@ -40,5 +38,3 @@ type KWOKNodeClassList struct { metav1.ListMeta `json:"metadata,omitempty"` Items []KWOKNodeClass `json:"items"` } - -const DisruptionReasonExampleReason v1.DisruptionReason = "ExampleReason" diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml 
b/kwok/charts/crds/karpenter.sh_nodepools.yaml index b4eea14bf6..35def943c8 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -114,16 +114,13 @@ spec: description: |- Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. Otherwise, this will apply to each reason defined. - allowed reasons are Underutilized, Empty, and Drifted and additional CloudProvider-specific reasons. + allowed reasons are Underutilized, Empty, and Drifted. items: - description: |- - DisruptionReason defines valid reasons for disruption budgets. - CloudProviders will need to append to the list of enums when implementing cloud provider disruption reasons + description: DisruptionReason defines valid reasons for disruption budgets. enum: - Underutilized - Empty - Drifted - - ExampleReason type: string type: array schedule: diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 793d3fd756..75f2b816fc 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -63,10 +63,6 @@ func (c CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1 return c.toNodeClaim(node) } -func (c CloudProvider) DisruptionReasons() []v1.DisruptionReason { - return []v1.DisruptionReason{v1alpha1.DisruptionReasonExampleReason} -} - func (c CloudProvider) Delete(ctx context.Context, nodeClaim *v1.NodeClaim) error { if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil { if errors.IsNotFound(err) { diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index 1f938f466a..68da49b076 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -114,11 +114,9 @@ spec: description: |- Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. Otherwise, this will apply to each reason defined. - allowed reasons are Underutilized, Empty, and Drifted and additional CloudProvider-specific reasons. + allowed reasons are Underutilized, Empty, and Drifted. items: - description: |- - DisruptionReason defines valid reasons for disruption budgets. - CloudProviders will need to append to the list of enums when implementing cloud provider disruption reasons + description: DisruptionReason defines valid reasons for disruption budgets. enum: - Underutilized - Empty diff --git a/pkg/apis/v1/nodepool.go b/pkg/apis/v1/nodepool.go index b7f5d7038d..094b7b8852 100644 --- a/pkg/apis/v1/nodepool.go +++ b/pkg/apis/v1/nodepool.go @@ -89,7 +89,7 @@ type Disruption struct { type Budget struct { // Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. // Otherwise, this will apply to each reason defined. - // allowed reasons are Underutilized, Empty, and Drifted and additional CloudProvider-specific reasons. + // allowed reasons are Underutilized, Empty, and Drifted. // +optional Reasons []DisruptionReason `json:"reasons,omitempty"` // Nodes dictates the maximum number of NodeClaims owned by this NodePool @@ -129,7 +129,6 @@ const ( ) // DisruptionReason defines valid reasons for disruption budgets. 
-// CloudProviders will need to append to the list of enums when implementing cloud provider disruption reasons // +kubebuilder:validation:Enum={Underutilized,Empty,Drifted} type DisruptionReason string diff --git a/pkg/apis/v1/nodepool_budgets_test.go b/pkg/apis/v1/nodepool_budgets_test.go index 42d8200c80..ac77a4261d 100644 --- a/pkg/apis/v1/nodepool_budgets_test.go +++ b/pkg/apis/v1/nodepool_budgets_test.go @@ -78,7 +78,6 @@ var _ = Describe("Budgets", func() { DisruptionReasonUnderutilized, DisruptionReasonDrifted, DisruptionReasonEmpty, - "CloudProviderDisruptionReason", }, Nodes: "0", Schedule: lo.ToPtr("@weekly"), @@ -93,12 +92,11 @@ var _ = Describe("Budgets", func() { }, }, } - allKnownDisruptionReasons = append([]DisruptionReason{ + allKnownDisruptionReasons = []DisruptionReason{ DisruptionReasonEmpty, DisruptionReasonUnderutilized, - DisruptionReasonDrifted}, - DisruptionReason("CloudProviderDisruptionReason"), - ) + DisruptionReasonDrifted, + } }) Context("GetAllowedDisruptionsByReason", func() { diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index b00959458b..01c9b196f5 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -242,10 +242,6 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([] }, nil } -func (c *CloudProvider) DisruptionReasons() []v1.DisruptionReason { - return []v1.DisruptionReason{"CloudProviderDisruptionReason"} -} - func (c *CloudProvider) Delete(_ context.Context, nc *v1.NodeClaim) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 6696c6e0ac..d731c5310b 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -68,9 +68,6 @@ type CloudProvider interface { // availability, the GetInstanceTypes method should always return all instance types, // even those with no offerings available. GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error) - // DisruptionReasons is for CloudProviders to hook into the Disruption Controller. - // Reasons will show up as StatusConditions on the NodeClaim. - DisruptionReasons() []v1.DisruptionReason // IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements // it is tied to. 
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error) diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index aa77c63cad..e43e51f680 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -71,13 +71,6 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi ) *Controller { c := MakeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder, queue) - // Generate eventually disruptable reason based on a combination of drift and cloudprovider disruption reason - eventualDisruptionMethods := []Method{} - - for _, reason := range append(cp.DisruptionReasons(), v1.DisruptionReasonDrifted) { - eventualDisruptionMethods = append(eventualDisruptionMethods, NewEventualDisruption(kubeClient, cluster, provisioner, recorder, reason)) - } - return &Controller{ queue: queue, clock: clk, @@ -87,17 +80,16 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi recorder: recorder, cloudProvider: cp, lastRun: map[string]time.Time{}, - methods: append( - // Terminate any NodeClaims that have need to be eventually disrupted from provisioning specifications, allowing the pods to reschedule. - eventualDisruptionMethods, - []Method{ - // Delete any empty NodeClaims as there is zero cost in terms of disruption. - NewEmptiness(c), - // Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn - NewMultiNodeConsolidation(c), - // And finally fall back our single NodeClaim consolidation to further reduce cluster cost. - NewSingleNodeConsolidation(c), - }...), + methods: []Method{ + // Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule. + NewDrift(kubeClient, cluster, provisioner, recorder), + // Delete any empty NodeClaims as there is zero cost in terms of disruption. + NewEmptiness(c), + // Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn + NewMultiNodeConsolidation(c), + // And finally fall back our single NodeClaim consolidation to further reduce cluster cost. + NewSingleNodeConsolidation(c), + }, } } diff --git a/pkg/controllers/disruption/eventual.go b/pkg/controllers/disruption/drift.go similarity index 83% rename from pkg/controllers/disruption/eventual.go rename to pkg/controllers/disruption/drift.go index f4bd8b779c..06b951ce8a 100644 --- a/pkg/controllers/disruption/eventual.go +++ b/pkg/controllers/disruption/drift.go @@ -32,31 +32,29 @@ import ( ) // Drift is a subreconciler that deletes drifted candidates. 
-type EventualDisruption struct { +type Drift struct { kubeClient client.Client cluster *state.Cluster provisioner *provisioning.Provisioner recorder events.Recorder - reason v1.DisruptionReason } -func NewEventualDisruption(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder, reason v1.DisruptionReason) *EventualDisruption { - return &EventualDisruption{ +func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Drift { + return &Drift{ kubeClient: kubeClient, cluster: cluster, provisioner: provisioner, recorder: recorder, - reason: reason, } } // ShouldDisrupt is a predicate used to filter candidates -func (d *EventualDisruption) ShouldDisrupt(ctx context.Context, c *Candidate) bool { +func (d *Drift) ShouldDisrupt(ctx context.Context, c *Candidate) bool { return c.NodeClaim.StatusConditions().Get(string(d.Reason())).IsTrue() } // ComputeCommand generates a disruption command given candidates -func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) { +func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) { sort.Slice(candidates, func(i int, j int) bool { return candidates[i].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time.Before( candidates[j].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time) @@ -114,14 +112,14 @@ func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudge return Command{}, scheduling.Results{}, nil } -func (d *EventualDisruption) Reason() v1.DisruptionReason { - return d.reason +func (d *Drift) Reason() v1.DisruptionReason { + return v1.DisruptionReasonDrifted } -func (d *EventualDisruption) Class() string { +func (d *Drift) Class() string { return EventualDisruptionClass } -func (d *EventualDisruption) ConsolidationType() string { +func (d *Drift) ConsolidationType() string { return "" } diff --git a/pkg/controllers/disruption/eventual_test.go b/pkg/controllers/disruption/drift_test.go similarity index 61% rename from pkg/controllers/disruption/eventual_test.go rename to pkg/controllers/disruption/drift_test.go index aaab2d35d9..fcf5ee1c9d 100644 --- a/pkg/controllers/disruption/eventual_test.go +++ b/pkg/controllers/disruption/drift_test.go @@ -103,37 +103,6 @@ var _ = Describe("Drift", func() { ExpectApplied(ctx, env.Client, pod, nodeClaim) ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - fakeClock.Step(10 * time.Minute) - ExpectSingletonReconciled(ctx, disruptionController) - ExpectMetricGaugeValue(disruption.EligibleNodes, 1, eligibleNodesLabels) - }) - It("should correctly report eligible cloudprovider specific node", func() { - var eligibleNodesLabels = map[string]string{ - metrics.ReasonLabel: "cloudproviderdisruptionreason", - } - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - v1.DoNotDisruptAnnotationKey: "true", - }, - }, - }) - ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, 
nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - ExpectSingletonReconciled(ctx, disruptionController) - ExpectMetricGaugeValue(disruption.EligibleNodes, 0, eligibleNodesLabels) - - // remove the do-not-disrupt annotation to make the node eligible for drift and update cluster state - pod.SetAnnotations(map[string]string{}) - nodeClaim.StatusConditions().SetTrue("CloudProviderDisruptionReason") - ExpectApplied(ctx, env.Client, pod, nodeClaim) - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - fakeClock.Step(10 * time.Minute) ExpectSingletonReconciled(ctx, disruptionController) ExpectMetricGaugeValue(disruption.EligibleNodes, 1, eligibleNodesLabels) @@ -985,529 +954,6 @@ var _ = Describe("Drift", func() { // Cascade any deletion of the nodeClaim to the node ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) - // We should delete the nodeClaim that has drifted - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, nodeClaim, node) - }) - }) - Context("CloudProvider Disruption", func() { - var cloudproviderDisruptionReason v1.DisruptionReason - BeforeEach(func() { - cloudproviderDisruptionReason = cloudProvider.DisruptionReasons()[0] - nodeClaim.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) - }) - It("should continue to the next cloudprovider disruption node if the first cannot reschedule all pods", func() { - pod := test.Pod(test.PodOptions{ - ResourceRequirements: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("150"), - }, - }, - }) - podToExpire := test.Pod(test.PodOptions{ - ResourceRequirements: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - }, - }, - }) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), - corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), - }, - }, - Status: v1.NodeClaimStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourcePods: resource.MustParse("100"), - }, - }, - }) - nodeClaim2.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) - ExpectApplied(ctx, env.Client, nodeClaim2, node2, podToExpire) - ExpectManualBinding(ctx, env.Client, podToExpire, node2) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node2}, []*v1.NodeClaim{nodeClaim2}) - - // disruption won't delete the old node until the new node is ready - var wg 
sync.WaitGroup - ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) - ExpectSingletonReconciled(ctx, disruptionController) - wg.Wait() - - // Process the item so that the nodes can be deleted. - ExpectSingletonReconciled(ctx, queue) - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) - - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) - ExpectExists(ctx, env.Client, nodeClaim) - ExpectNotFound(ctx, env.Client, nodeClaim2) - }) - It("should ignore nodes without the cloudprovider disruption status condition", func() { - _ = nodeClaim.StatusConditions().Clear(string(cloudproviderDisruptionReason)) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to not create or delete more nodeclaims - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("should ignore nodes with the karpenter.sh/do-not-disrupt annotation", func() { - node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "true"}) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to not create or delete more nodeclaims - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("should ignore nodes that have pods with the karpenter.sh/do-not-disrupt annotation", func() { - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - v1.DoNotDisruptAnnotationKey: "true", - }, - }, - }) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to not create or delete more nodeclaims - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("should cloudprovider disrupt nodes that have pods with the karpenter.sh/do-not-disrupt annotation when the NodePool's TerminationGracePeriod is not nil", func() { - nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - v1.DoNotDisruptAnnotationKey: "true", - }, - }, - }) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, 
nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to create a replacement but not delete the old nodeclaim - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for drift - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("should cloudprovider disrupt nodes that have pods with the blocking PDBs when the NodePool's TerminationGracePeriod is not nil", func() { - nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} - podLabels := map[string]string{"test": "value"} - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: podLabels, - }, - }) - budget := test.PodDisruptionBudget(test.PDBOptions{ - Labels: podLabels, - MaxUnavailable: fromInt(0), - }) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod, budget) - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to create a replacement but not delete the old nodeclaim - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for drift - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("should ignore nodes with the cloudprovider disrupted status condition set to false", func() { - nodeClaim.StatusConditions().SetFalse(string(cloudproviderDisruptionReason), "NotDisrupted", "NotDisrupted") - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - - ExpectSingletonReconciled(ctx, disruptionController) - - // Expect to not create or delete more nodeclaims - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, nodeClaim) - }) - It("can delete cloudprovider disrupted nodes", func() { - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - ExpectSingletonReconciled(ctx, disruptionController) - // Process the item so that the nodes can be deleted. 
- ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) - - // We should delete the nodeClaim that has drifted - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, nodeClaim, node) - }) - It("should disrupt all empty cloudprovider disrupted nodes in parallel", func() { - nodeClaims, nodes := test.NodeClaimsAndNodes(100, v1.NodeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), - corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), - }, - }, - Status: v1.NodeClaimStatus{ - Allocatable: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("32"), - corev1.ResourcePods: resource.MustParse("100"), - }, - }, - }) - for _, m := range nodeClaims { - m.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) - ExpectApplied(ctx, env.Client, m) - } - for _, n := range nodes { - ExpectApplied(ctx, env.Client, n) - } - ExpectApplied(ctx, env.Client, nodePool) - - // inform cluster state about nodes and nodeClaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - ExpectSingletonReconciled(ctx, disruptionController) - - // Process the item so that the nodes can be deleted. - ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...) - - // Expect that the drifted nodeClaims are gone - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) - }) - It("can replace cloudprovider disrupted nodes", func() { - labels := map[string]string{ - "app": "test", - } - // create our RS so we can link a pod to it - rs := test.ReplicaSet() - ExpectApplied(ctx, env.Client, rs) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) - - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: rs.Name, - UID: rs.UID, - Controller: lo.ToPtr(true), - BlockOwnerDeletion: lo.ToPtr(true), - }, - }}}) - - ExpectApplied(ctx, env.Client, rs, pod, nodeClaim, node, nodePool) - - // bind the pods to the node - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - - // disruption won't delete the old nodeClaim until the new nodeClaim is ready - var wg sync.WaitGroup - ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) - ExpectSingletonReconciled(ctx, disruptionController) - wg.Wait() - - // Process the item so that the nodes can be deleted. 
- ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) - - ExpectNotFound(ctx, env.Client, nodeClaim, node) - - // Expect that the new nodeClaim was created and its different than the original - nodeclaims := ExpectNodeClaims(ctx, env.Client) - nodes := ExpectNodes(ctx, env.Client) - Expect(nodeclaims).To(HaveLen(1)) - Expect(nodes).To(HaveLen(1)) - Expect(nodeclaims[0].Name).ToNot(Equal(nodeClaim.Name)) - Expect(nodes[0].Name).ToNot(Equal(node.Name)) - }) - It("should untaint nodes when cloudprovider disrupted replacement fails", func() { - cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to untaint - - labels := map[string]string{ - "app": "test", - } - // create our RS so we can link a pod to it - rs := test.ReplicaSet() - ExpectApplied(ctx, env.Client, rs) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) - - pod := test.Pod(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: rs.Name, - UID: rs.UID, - Controller: lo.ToPtr(true), - BlockOwnerDeletion: lo.ToPtr(true), - }, - }, - }, - }) - ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pod) - - // bind pods to node - ExpectManualBinding(ctx, env.Client, pod, node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - var wg sync.WaitGroup - ExpectNewNodeClaimsDeleted(ctx, env.Client, &wg, 1) - ExpectSingletonReconciled(ctx, disruptionController) - wg.Wait() - - // Wait > 5 seconds for eventual consistency hack in orchestration.Queue - fakeClock.Step(5*time.Second + time.Nanosecond*1) - ExpectSingletonReconciled(ctx, queue) - // We should have tried to create a new nodeClaim but failed to do so; therefore, we untainted the existing node - node = ExpectExists(ctx, env.Client, node) - Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) - }) - It("can replace cloudprovider disrupted nodes with multiple nodes", func() { - currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ - Name: "current-on-demand", - Offerings: []cloudprovider.Offering{ - { - Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), - Price: 0.5, - Available: false, - }, - }, - }) - replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ - Name: "replacement-on-demand", - Offerings: []cloudprovider.Offering{ - { - Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), - Price: 0.3, - Available: true, - }, - }, - Resources: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("3")}, - }) - cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{ - currentInstance, - replacementInstance, - } - - labels := map[string]string{ - "app": "test", - } - // create our RS so we can link a pod to it - rs := test.ReplicaSet() - ExpectApplied(ctx, env.Client, rs) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) - - pods := test.Pods(3, test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: labels, - OwnerReferences: 
[]metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: rs.Name, - UID: rs.UID, - Controller: lo.ToPtr(true), - BlockOwnerDeletion: lo.ToPtr(true), - }, - }}, - // Make each pod request about a third of the allocatable on the node - ResourceRequirements: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("2")}, - }, - }) - - nodeClaim.Labels = lo.Assign(nodeClaim.Labels, map[string]string{ - corev1.LabelInstanceTypeStable: currentInstance.Name, - v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), - corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), - }) - nodeClaim.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} - node.Labels = lo.Assign(node.Labels, map[string]string{ - corev1.LabelInstanceTypeStable: currentInstance.Name, - v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), - corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), - }) - node.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} - - ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pods[0], pods[1], pods[2]) - - // bind the pods to the node - ExpectManualBinding(ctx, env.Client, pods[0], node) - ExpectManualBinding(ctx, env.Client, pods[1], node) - ExpectManualBinding(ctx, env.Client, pods[2], node) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - - // disruption won't delete the old node until the new node is ready - var wg sync.WaitGroup - ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) - ExpectSingletonReconciled(ctx, disruptionController) - wg.Wait() - - // Process the item so that the nodes can be deleted. 
- ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) - - // expect that drift provisioned three nodes, one for each pod - ExpectNotFound(ctx, env.Client, nodeClaim, node) - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(3)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - }) - It("should cloudprovider disrupt one non-empty node at a time, starting with the earliest drift", func() { - labels := map[string]string{ - "app": "test", - } - - // create our RS so we can link a pod to it - rs := test.ReplicaSet() - ExpectApplied(ctx, env.Client, rs) - - pods := test.Pods(2, test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: rs.Name, - UID: rs.UID, - Controller: lo.ToPtr(true), - BlockOwnerDeletion: lo.ToPtr(true), - }, - }, - }, - // Make each pod request only fit on a single node - ResourceRequirements: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("30")}, - }, - }) - - nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), - corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), - }, - }, - Status: v1.NodeClaimStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("32")}, - }, - }) - nodeClaim2.Status.Conditions = append(nodeClaim2.Status.Conditions, status.Condition{ - Type: string(cloudproviderDisruptionReason), - Status: metav1.ConditionTrue, - Reason: string(cloudproviderDisruptionReason), - Message: string(cloudproviderDisruptionReason), - LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Hour)}, - }) - - ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], nodeClaim, node, nodeClaim2, node2, nodePool) - - // bind pods to node so that they're not empty and don't disrupt in parallel. - ExpectManualBinding(ctx, env.Client, pods[0], node) - ExpectManualBinding(ctx, env.Client, pods[1], node2) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node, node2}, []*v1.NodeClaim{nodeClaim, nodeClaim2}) - - // disruption won't delete the old node until the new node is ready - var wg sync.WaitGroup - ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) - ExpectSingletonReconciled(ctx, disruptionController) - wg.Wait() - - // Process the item so that the nodes can be deleted. 
- ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) - - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) - Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) - ExpectNotFound(ctx, env.Client, nodeClaim2, node2) - ExpectExists(ctx, env.Client, nodeClaim) - ExpectExists(ctx, env.Client, node) - }) - It("should delete nodes with the karpenter.sh/do-not-disrupt annotation set to false", func() { - node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "false"}) - ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) - - // inform cluster state about nodes and nodeclaims - ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) - - fakeClock.Step(10 * time.Minute) - ExpectSingletonReconciled(ctx, disruptionController) - // Process the item so that the nodes can be deleted. - ExpectSingletonReconciled(ctx, queue) - // Cascade any deletion of the nodeClaim to the node - ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) - // We should delete the nodeClaim that has drifted Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 4b9eba9d3e..4ee24f5bc8 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -151,10 +151,11 @@ var _ = BeforeEach(func() { }) leastExpensiveSpotInstance, mostExpensiveSpotInstance = spotInstances[0], spotInstances[len(spotInstances)-1] leastExpensiveSpotOffering, mostExpensiveSpotOffering = leastExpensiveSpotInstance.Offerings[0], mostExpensiveSpotInstance.Offerings[0] - allKnownDisruptionReasons = append([]v1.DisruptionReason{ + allKnownDisruptionReasons = []v1.DisruptionReason{ v1.DisruptionReasonEmpty, v1.DisruptionReasonUnderutilized, - v1.DisruptionReasonDrifted}, cloudProvider.DisruptionReasons()...) + v1.DisruptionReasonDrifted, + } }) var _ = AfterEach(func() { diff --git a/test/suites/perf/disruption_test.go b/test/suites/perf/disruption_test.go deleted file mode 100644 index 0fe28f4348..0000000000 --- a/test/suites/perf/disruption_test.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package perf_test - -import ( - "time" - - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - v1 "sigs.k8s.io/karpenter/pkg/apis/v1" - "sigs.k8s.io/karpenter/pkg/test" -) - -var _ = Describe("Disruption", func() { - var replicas = 100 - It("should do simple provisioning and cloudprovider disruption", func() { - deployment := test.Deployment(test.DeploymentOptions{ - Replicas: int32(replicas), - PodOptions: test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: testLabels, - }, - ResourceRequirements: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - }, - }, - }}) - env.ExpectCreated(deployment) - env.ExpectCreated(nodePool, nodeClass) - env.EventuallyExpectHealthyPodCount(labelSelector, replicas) - - env.TimeIntervalCollector.Start("KOWK Disruption") - nodeClaimList := &v1.NodeClaimList{} - Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed()) - for i := range nodeClaimList.Items { - Expect(nodeClaimList.Items[i].StatusConditions().SetTrue("ExampleReason")).To(BeTrue()) - Expect(env.Client.Status().Update(env, &nodeClaimList.Items[i])).To(Succeed()) - } - - // Then eventually expect no node to have an ExampleReason - Eventually(func(g Gomega) { - nodeClaims := &v1.NodeClaimList{} - g.Expect(env.Client.List(env, nodeClaims, client.MatchingFields{"status.conditions[*].type": "ExampleReason"})).To(Succeed()) - g.Expect(len(nodeClaims.Items)).To(Equal(0)) - }).WithTimeout(3 * time.Minute).Should(Succeed()) - env.TimeIntervalCollector.End("KOWK Disruption") - }) -})