diff --git a/kwok/README.md b/kwok/README.md index 243a942148..2c7f011894 100644 --- a/kwok/README.md +++ b/kwok/README.md @@ -40,7 +40,7 @@ spec: nodeClassRef: name: default kind: KWOKNodeClass - group: karpenter.kwok.sh/v1alpha1 + group: karpenter.kwok.sh expireAfter: 720h # 30 * 24h = 720h limits: cpu: 1000 diff --git a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml index 2ed31b19e9..79b9db76fc 100644 --- a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml +++ b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: kwoknodeclasses.karpenter.kwok.sh spec: group: karpenter.kwok.sh @@ -96,12 +96,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml index e10563ab7b..223e2caaa9 100644 --- a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml +++ b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: nodeclaims.karpenter.sh spec: group: karpenter.sh @@ -81,6 +81,14 @@ spec: memory leak protection, and disruption testing. pattern: ^(([0-9]+(s|m|h))+)|(Never)$ type: string + minimumPriceImprovementPercent: + description: |- + MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + The default is 0%, which maintains the existing consolidation behavior prior to this feature. + format: int32 + maximum: 100 + minimum: 0 + type: integer nodeClassRef: description: NodeClassRef is a reference to an object that defines provider specific configuration properties: @@ -262,19 +270,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. 
If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -350,12 +354,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -798,12 +797,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index cbfcf0ab42..53e868df8f 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: nodepools.karpenter.sh spec: group: karpenter.sh @@ -390,19 +390,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. 
pattern: ^([0-9]+(s|m|h))+$ @@ -474,12 +470,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -1043,12 +1034,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml index 5f5bff9ae0..02d2478127 100644 --- a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml +++ b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: nodeclaims.karpenter.sh spec: group: karpenter.sh @@ -81,6 +81,14 @@ spec: memory leak protection, and disruption testing. pattern: ^(([0-9]+(s|m|h))+)|(Never)$ type: string + minimumPriceImprovementPercent: + description: |- + MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + The default is 0%, which maintains the existing consolidation behavior prior to this feature. + format: int32 + maximum: 100 + minimum: 0 + type: integer nodeClassRef: description: NodeClassRef is a reference to an object that defines provider specific configuration properties: @@ -260,19 +268,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. 
If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -348,12 +352,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -794,12 +793,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index cd2b99ed11..f186c4c5f7 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.2 name: nodepools.karpenter.sh spec: group: karpenter.sh @@ -388,19 +388,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -472,12 +468,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. 
- The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -1039,12 +1030,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/pkg/apis/v1/duration.go b/pkg/apis/v1/duration.go index 5d4436984e..7eeb5fbe4a 100644 --- a/pkg/apis/v1/duration.go +++ b/pkg/apis/v1/duration.go @@ -18,7 +18,11 @@ package v1 import ( "encoding/json" + "fmt" + "slices" "time" + + "github.com/samber/lo" ) const Never = "Never" @@ -28,6 +32,17 @@ const Never = "Never" // that the duration is disabled and sets the inner duration as nil type NillableDuration struct { *time.Duration + + // Raw is used to ensure we remarshal the NillableDuration in the same format it was specified. + // This ensures tools like Flux and ArgoCD don't mistakenly detect drift due to our conversion webhooks. + Raw []byte `hash:"ignore"` +} + +func MustParseNillableDuration(val string) NillableDuration { + nd := NillableDuration{} + // Use %q instead of %s to ensure that we unmarshal the value as a string and not an int + lo.Must0(json.Unmarshal([]byte(fmt.Sprintf("%q", val)), &nd)) + return nd } // UnmarshalJSON implements the json.Unmarshaller interface. @@ -44,22 +59,29 @@ func (d *NillableDuration) UnmarshalJSON(b []byte) error { if err != nil { return err } + d.Raw = slices.Clone(b) d.Duration = &pd return nil } // MarshalJSON implements the json.Marshaler interface. func (d NillableDuration) MarshalJSON() ([]byte, error) { - if d.Duration == nil { - return json.Marshal(Never) + if d.Raw != nil { + return d.Raw, nil + } + if d.Duration != nil { + return json.Marshal(d.Duration.String()) } - return json.Marshal(d.Duration.String()) + return json.Marshal(Never) } // ToUnstructured implements the value.UnstructuredConverter interface. 
func (d NillableDuration) ToUnstructured() interface{} { - if d.Duration == nil { - return Never + if d.Raw != nil { + return d.Raw + } + if d.Duration != nil { + return d.Duration.String() } - return d.Duration.String() + return Never } diff --git a/pkg/apis/v1/labels.go b/pkg/apis/v1/labels.go index a6b93fa72a..496fdca53b 100644 --- a/pkg/apis/v1/labels.go +++ b/pkg/apis/v1/labels.go @@ -49,6 +49,7 @@ const ( NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash" NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version" KubeletCompatibilityAnnotationKey = apis.CompatibilityGroup + "/v1beta1-kubelet-conversion" + NodeClassReferenceAnnotationKey = apis.CompatibilityGroup + "/v1beta1-nodeclass-reference" NodeClaimTerminationTimestampAnnotationKey = apis.Group + "/nodeclaim-termination-timestamp" ) diff --git a/pkg/apis/v1/nodeclaim.go b/pkg/apis/v1/nodeclaim.go index 1c163858ba..8758a9a762 100644 --- a/pkg/apis/v1/nodeclaim.go +++ b/pkg/apis/v1/nodeclaim.go @@ -73,6 +73,12 @@ type NodeClaimSpec struct { // +kubebuilder:validation:Schemaless // +optional ExpireAfter NillableDuration `json:"expireAfter,omitempty"` + // MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + // The default is 0%, which maintains the existing consolidation behavior prior to this feature. + // +kubebuilder:validation:Minimum:=0 + // +kubebuilder:validation:Maximum:=100 + // +optional + MinimumPriceImprovementPercent *int32 `json:"minimumPriceImprovementPercent,omitempty"` } // A node selector requirement with min values is a selector that contains values, a key, an operator that relates the key and values diff --git a/pkg/apis/v1/nodeclaim_conversion.go b/pkg/apis/v1/nodeclaim_conversion.go index 8c1f89349d..ef18e1f80a 100644 --- a/pkg/apis/v1/nodeclaim_conversion.go +++ b/pkg/apis/v1/nodeclaim_conversion.go @@ -20,11 +20,11 @@ import ( "context" "encoding/json" "fmt" + "strings" "github.com/samber/lo" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" @@ -37,11 +37,17 @@ func (in *NodeClaim) ConvertTo(ctx context.Context, to apis.Convertible) error { v1beta1NC := to.(*v1beta1.NodeClaim) v1beta1NC.ObjectMeta = in.ObjectMeta - in.Status.convertTo((&v1beta1NC.Status)) - return in.Spec.convertTo(ctx, &v1beta1NC.Spec, in.Annotations[KubeletCompatibilityAnnotationKey]) + in.Status.convertTo(&v1beta1NC.Status) + if err := in.Spec.convertTo(&v1beta1NC.Spec, in.Annotations[KubeletCompatibilityAnnotationKey], in.Annotations[NodeClassReferenceAnnotationKey]); err != nil { + return err + } + // Remove the annotations from the v1beta1 NodeClaim on the convert back + delete(v1beta1NC.Annotations, KubeletCompatibilityAnnotationKey) + delete(v1beta1NC.Annotations, NodeClassReferenceAnnotationKey) + return nil } -func (in *NodeClaimSpec) convertTo(ctx context.Context, v1beta1nc *v1beta1.NodeClaimSpec, kubeletAnnotation string) error { +func (in *NodeClaimSpec) convertTo(v1beta1nc *v1beta1.NodeClaimSpec, kubeletAnnotation, nodeClassReferenceAnnotation string) error { v1beta1nc.Taints = in.Taints v1beta1nc.StartupTaints = in.StartupTaints v1beta1nc.Resources = v1beta1.ResourceRequirements(in.Resources) @@ -55,18 +61,16 @@ func (in *NodeClaimSpec) convertTo(ctx context.Context, v1beta1nc *v1beta1.NodeC MinValues: v1Requirements.MinValues, } }) - - if in.NodeClassRef != nil { - nodeclass, found := lo.Find(injection.GetNodeClasses(ctx), 
func(nc schema.GroupVersionKind) bool { - return nc.Kind == in.NodeClassRef.Kind && nc.Group == in.NodeClassRef.Group - }) - v1beta1nc.NodeClassRef = &v1beta1.NodeClassReference{ - Kind: in.NodeClassRef.Kind, - Name: in.NodeClassRef.Name, - APIVersion: lo.Ternary(found, nodeclass.GroupVersion().String(), ""), + // Convert the NodeClassReference depending on whether the annotation exists + v1beta1nc.NodeClassRef = &v1beta1.NodeClassReference{} + if nodeClassReferenceAnnotation != "" { + if err := json.Unmarshal([]byte(nodeClassReferenceAnnotation), v1beta1nc.NodeClassRef); err != nil { + return fmt.Errorf("unmarshaling nodeClassRef annotation, %w", err) } + } else { + v1beta1nc.NodeClassRef.Name = in.NodeClassRef.Name + v1beta1nc.NodeClassRef.Kind = in.NodeClassRef.Kind } - if kubeletAnnotation != "" { v1beta1kubelet := &v1beta1.KubeletConfiguration{} err := json.Unmarshal([]byte(kubeletAnnotation), v1beta1kubelet) @@ -102,6 +106,13 @@ func (in *NodeClaim) ConvertFrom(ctx context.Context, from apis.Convertible) err } else { in.Annotations = lo.Assign(in.Annotations, map[string]string{KubeletCompatibilityAnnotationKey: kubeletAnnotation}) } + nodeClassRefAnnotation, err := json.Marshal(v1beta1NC.Spec.NodeClassRef) + if err != nil { + return fmt.Errorf("marshaling nodeClassRef annotation, %w", err) + } + in.Annotations = lo.Assign(in.Annotations, map[string]string{ + NodeClassReferenceAnnotationKey: string(nodeClassRefAnnotation), + }) return in.setExpireAfter(ctx, v1beta1NC) } @@ -141,14 +152,10 @@ func (in *NodeClaimSpec) convertFrom(ctx context.Context, v1beta1nc *v1beta1.Nod }) defaultNodeClassGVK := injection.GetNodeClasses(ctx)[0] - nodeclassGroupVersion, err := schema.ParseGroupVersion(v1beta1nc.NodeClassRef.APIVersion) - if err != nil { - return "", err - } in.NodeClassRef = &NodeClassReference{ Name: v1beta1nc.NodeClassRef.Name, Kind: lo.Ternary(v1beta1nc.NodeClassRef.Kind == "", defaultNodeClassGVK.Kind, v1beta1nc.NodeClassRef.Kind), - Group: lo.Ternary(v1beta1nc.NodeClassRef.APIVersion == "", defaultNodeClassGVK.Group, nodeclassGroupVersion.Group), + Group: lo.Ternary(v1beta1nc.NodeClassRef.APIVersion == "", defaultNodeClassGVK.Group, strings.Split(v1beta1nc.NodeClassRef.APIVersion, "/")[0]), } if v1beta1nc.Kubelet != nil { diff --git a/pkg/apis/v1/nodeclaim_conversion_test.go b/pkg/apis/v1/nodeclaim_conversion_test.go index 14098dfcbf..a6d18d79b9 100644 --- a/pkg/apis/v1/nodeclaim_conversion_test.go +++ b/pkg/apis/v1/nodeclaim_conversion_test.go @@ -176,20 +176,30 @@ var _ = Describe("Convert v1 to v1beta1 NodeClaim API", func() { Group: object.GVK(&v1alpha1.TestNodeClass{}).Group, } Expect(v1nodeclaim.ConvertTo(ctx, v1beta1nodeclaim)).To(Succeed()) - Expect(v1beta1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(v1nodeclaim.Spec.NodeClassRef.Kind)) Expect(v1beta1nodeclaim.Spec.NodeClassRef.Name).To(Equal(v1nodeclaim.Spec.NodeClassRef.Name)) - Expect(v1beta1nodeclaim.Spec.NodeClassRef.APIVersion).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].GroupVersion().String())) + Expect(v1beta1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(v1nodeclaim.Spec.NodeClassRef.Kind)) + Expect(v1beta1nodeclaim.Spec.NodeClassRef.APIVersion).To(BeEmpty()) }) - It("should not include APIVersion for v1beta1 if Group and Kind is not in the supported nodeclass", func() { + It("should retain NodeClassReference details when the karpenter.sh/v1beta1-nodeclass-reference annotation exists", func() { + nodeClassReference := &v1beta1.NodeClassReference{ + APIVersion: 
object.GVK(&v1alpha1.TestNodeClass{}).GroupVersion().String(), + Name: "nodeclass-test", + Kind: object.GVK(&v1alpha1.TestNodeClass{}).Kind, + } + nodeClassAnnotation, err := json.Marshal(nodeClassReference) + Expect(err).ToNot(HaveOccurred()) + v1nodeclaim.Annotations = lo.Assign(map[string]string{ + NodeClassReferenceAnnotationKey: string(nodeClassAnnotation), + }) v1nodeclaim.Spec.NodeClassRef = &NodeClassReference{ - Kind: "test-kind", + Kind: object.GVK(&v1alpha1.TestNodeClass{}).Kind, Name: "nodeclass-test", - Group: "testgroup.sh", + Group: object.GVK(&v1alpha1.TestNodeClass{}).Group, } Expect(v1nodeclaim.ConvertTo(ctx, v1beta1nodeclaim)).To(Succeed()) - Expect(v1beta1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(v1nodeclaim.Spec.NodeClassRef.Kind)) - Expect(v1beta1nodeclaim.Spec.NodeClassRef.Name).To(Equal(v1nodeclaim.Spec.NodeClassRef.Name)) - Expect(v1beta1nodeclaim.Spec.NodeClassRef.APIVersion).To(Equal("")) + Expect(v1beta1nodeclaim.Spec.NodeClassRef.Name).To(Equal(nodeClassReference.Name)) + Expect(v1beta1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(nodeClassReference.Kind)) + Expect(v1beta1nodeclaim.Spec.NodeClassRef.APIVersion).To(Equal(nodeClassReference.APIVersion)) }) }) }) @@ -251,7 +261,7 @@ var _ = Describe("Convert V1beta1 to V1 NodeClaim API", func() { BeforeEach(func() { v1nodePool = test.NodePool() - v1nodePool.Spec.Template.Spec.ExpireAfter = NillableDuration{Duration: lo.ToPtr(30 * time.Minute)} + v1nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("30m") v1nodeclaim = &NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -296,13 +306,13 @@ var _ = Describe("Convert V1beta1 to V1 NodeClaim API", func() { }) It("should default the v1beta1 expireAfter to v1 when the nodepool doesn't exist", func() { Expect(env.Client.Delete(ctx, v1nodePool)).To(Succeed()) - v1nodePool.Spec.Template.Spec.ExpireAfter = NillableDuration{Duration: lo.ToPtr(30 * time.Minute)} + v1nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("30m") Expect(v1nodeclaim.ConvertFrom(ctx, v1beta1nodeclaim)).To(Succeed()) Expect(v1nodeclaim.Spec.ExpireAfter.Duration).To(BeNil()) }) It("should default the v1beta1 expireAfter to v1 when the nodepool label doesn't exist", func() { delete(v1beta1nodeclaim.Labels, v1beta1.NodePoolLabelKey) - v1nodePool.Spec.Template.Spec.ExpireAfter = NillableDuration{Duration: lo.ToPtr(30 * time.Minute)} + v1nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("30m") Expect(env.Client.Update(ctx, v1nodePool)).To(Succeed()) Expect(v1nodeclaim.ConvertFrom(ctx, v1beta1nodeclaim)).To(Succeed()) Expect(v1nodeclaim.Spec.ExpireAfter.Duration).To(BeNil()) @@ -311,7 +321,12 @@ var _ = Describe("Convert V1beta1 to V1 NodeClaim API", func() { It("should convert v1beta1 nodeclaim metadata", func() { v1beta1nodeclaim.ObjectMeta = test.ObjectMeta() Expect(v1nodeclaim.ConvertFrom(ctx, v1beta1nodeclaim)).To(Succeed()) - v1beta1nodeclaim.Annotations = map[string]string{} + + nodeClassReferenceAnnotation, err := json.Marshal(v1beta1nodeclaim.Spec.NodeClassRef) + Expect(err).ToNot(HaveOccurred()) + v1beta1nodeclaim.Annotations = map[string]string{ + NodeClassReferenceAnnotationKey: string(nodeClassReferenceAnnotation), + } Expect(v1nodeclaim.ObjectMeta).To(BeEquivalentTo(v1beta1nodeclaim.ObjectMeta)) }) Context("NodeClaim Spec", func() { @@ -436,19 +451,27 @@ var _ = Describe("Convert V1beta1 to V1 NodeClaim API", func() { Name: "nodeclass-test", APIVersion: "testgroup.sh/testversion", } + nodeClassReferenceAnnotation, err := 
json.Marshal(v1beta1nodeclaim.Spec.NodeClassRef) + Expect(err).ToNot(HaveOccurred()) + Expect(v1nodeclaim.ConvertFrom(ctx, v1beta1nodeclaim)).To(Succeed()) Expect(v1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(v1beta1nodeclaim.Spec.NodeClassRef.Kind)) Expect(v1nodeclaim.Spec.NodeClassRef.Name).To(Equal(v1beta1nodeclaim.Spec.NodeClassRef.Name)) Expect(v1nodeclaim.Spec.NodeClassRef.Group).To(Equal("testgroup.sh")) + Expect(v1nodeclaim.Annotations).To(HaveKeyWithValue(NodeClassReferenceAnnotationKey, string(nodeClassReferenceAnnotation))) }) It("should set default nodeclass group and kind on v1beta1 nodeclassRef", func() { v1beta1nodeclaim.Spec.NodeClassRef = &v1beta1.NodeClassReference{ Name: "nodeclass-test", } + nodeClassReferenceAnnotation, err := json.Marshal(v1beta1nodeclaim.Spec.NodeClassRef) + Expect(err).ToNot(HaveOccurred()) + Expect(v1nodeclaim.ConvertFrom(ctx, v1beta1nodeclaim)).To(Succeed()) Expect(v1nodeclaim.Spec.NodeClassRef.Kind).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].Kind)) Expect(v1nodeclaim.Spec.NodeClassRef.Name).To(Equal(v1beta1nodeclaim.Spec.NodeClassRef.Name)) Expect(v1nodeclaim.Spec.NodeClassRef.Group).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].Group)) + Expect(v1nodeclaim.Annotations).To(HaveKeyWithValue(NodeClassReferenceAnnotationKey, string(nodeClassReferenceAnnotation))) }) }) }) diff --git a/pkg/apis/v1/nodeclaim_status.go b/pkg/apis/v1/nodeclaim_status.go index aca25a4e77..e7f918bcee 100644 --- a/pkg/apis/v1/nodeclaim_status.go +++ b/pkg/apis/v1/nodeclaim_status.go @@ -30,6 +30,7 @@ const ( ConditionTypeDrifted = "Drifted" ConditionTypeInstanceTerminating = "InstanceTerminating" ConditionTypeConsistentStateFound = "ConsistentStateFound" + ConditionTypeDisruptionCandidate = "DisruptionCandidate" ) // NodeClaimStatus defines the observed state of NodeClaim diff --git a/pkg/apis/v1/nodepool.go b/pkg/apis/v1/nodepool.go index d467426e37..2e2de7a38a 100644 --- a/pkg/apis/v1/nodepool.go +++ b/pkg/apis/v1/nodepool.go @@ -139,7 +139,7 @@ const ( ) var ( - // DisruptionReasons is a list of all valid reasons for disruption budgets. + // WellKnownDisruptionReasons is a list of all valid reasons for disruption budgets. 
WellKnownDisruptionReasons = []DisruptionReason{DisruptionReasonUnderutilized, DisruptionReasonEmpty, DisruptionReasonDrifted} ) diff --git a/pkg/apis/v1/nodepool_conversion.go b/pkg/apis/v1/nodepool_conversion.go index 5b8e430bab..97b0258aa9 100644 --- a/pkg/apis/v1/nodepool_conversion.go +++ b/pkg/apis/v1/nodepool_conversion.go @@ -20,13 +20,11 @@ import ( "context" "encoding/json" "fmt" - "sort" "strings" "time" "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/pkg/apis" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" @@ -41,17 +39,23 @@ func (in *NodePool) ConvertTo(ctx context.Context, to apis.Convertible) error { // Convert v1 status v1beta1NP.Status.Resources = in.Status.Resources v1beta1NP.Status.Conditions = in.Status.Conditions - return in.Spec.convertTo(ctx, &v1beta1NP.Spec, in.Annotations[KubeletCompatibilityAnnotationKey]) + if err := in.Spec.convertTo(&v1beta1NP.Spec, in.Annotations[KubeletCompatibilityAnnotationKey], in.Annotations[NodeClassReferenceAnnotationKey]); err != nil { + return err + } + // Remove the annotations from the v1beta1 NodePool on the convert back + delete(v1beta1NP.Annotations, KubeletCompatibilityAnnotationKey) + delete(v1beta1NP.Annotations, NodeClassReferenceAnnotationKey) + return nil } -func (in *NodePoolSpec) convertTo(ctx context.Context, v1beta1np *v1beta1.NodePoolSpec, kubeletAnnotation string) error { +func (in *NodePoolSpec) convertTo(v1beta1np *v1beta1.NodePoolSpec, kubeletAnnotation, nodeClassReferenceAnnotation string) error { v1beta1np.Weight = in.Weight v1beta1np.Limits = v1beta1.Limits(in.Limits) in.Disruption.convertTo(&v1beta1np.Disruption) // Set the expireAfter to the nodeclaim template's expireAfter. // Don't convert terminationGracePeriod, as this is only included in v1. 
v1beta1np.Disruption.ExpireAfter = v1beta1.NillableDuration(in.Template.Spec.ExpireAfter) - return in.Template.convertTo(ctx, &v1beta1np.Template, kubeletAnnotation) + return in.Template.convertTo(&v1beta1np.Template, kubeletAnnotation, nodeClassReferenceAnnotation) } func (in *Disruption) convertTo(v1beta1np *v1beta1.Disruption) { @@ -69,7 +73,7 @@ func (in *Disruption) convertTo(v1beta1np *v1beta1.Disruption) { }) } -func (in *NodeClaimTemplate) convertTo(ctx context.Context, v1beta1np *v1beta1.NodeClaimTemplate, kubeletAnnotation string) error { +func (in *NodeClaimTemplate) convertTo(v1beta1np *v1beta1.NodeClaimTemplate, kubeletAnnotation, nodeClassReferenceAnnotation string) error { v1beta1np.ObjectMeta = v1beta1.ObjectMeta(in.ObjectMeta) v1beta1np.Spec.Taints = in.Spec.Taints v1beta1np.Spec.StartupTaints = in.Spec.StartupTaints @@ -83,28 +87,16 @@ func (in *NodeClaimTemplate) convertTo(ctx context.Context, v1beta1np *v1beta1.N MinValues: v1Requirements.MinValues, } }) - - nodeClasses := injection.GetNodeClasses(ctx) - // We are sorting the supported nodeclass, so that we are able to consistently find the same GVK, - // if multiple version of a nodeclass are supported - sort.Slice(nodeClasses, func(i int, j int) bool { - if nodeClasses[i].Group != nodeClasses[j].Group { - return nodeClasses[i].Group < nodeClasses[j].Group - } - if nodeClasses[i].Version != nodeClasses[j].Version { - return nodeClasses[i].Version < nodeClasses[j].Version + // Convert the NodeClassReference depending on whether the annotation exists + v1beta1np.Spec.NodeClassRef = &v1beta1.NodeClassReference{} + if nodeClassReferenceAnnotation != "" { + if err := json.Unmarshal([]byte(nodeClassReferenceAnnotation), v1beta1np.Spec.NodeClassRef); err != nil { + return fmt.Errorf("unmarshaling nodeClassRef annotation, %w", err) } - return nodeClasses[i].Kind < nodeClasses[j].Kind - }) - matchingNodeClass, found := lo.Find(nodeClasses, func(nc schema.GroupVersionKind) bool { - return nc.Kind == in.Spec.NodeClassRef.Kind && nc.Group == in.Spec.NodeClassRef.Group - }) - v1beta1np.Spec.NodeClassRef = &v1beta1.NodeClassReference{ - Kind: in.Spec.NodeClassRef.Kind, - Name: in.Spec.NodeClassRef.Name, - APIVersion: lo.Ternary(found, matchingNodeClass.GroupVersion().String(), ""), + } else { + v1beta1np.Spec.NodeClassRef.Name = in.Spec.NodeClassRef.Name + v1beta1np.Spec.NodeClassRef.Kind = in.Spec.NodeClassRef.Kind } - if kubeletAnnotation != "" { v1beta1kubelet := &v1beta1.KubeletConfiguration{} err := json.Unmarshal([]byte(kubeletAnnotation), v1beta1kubelet) @@ -135,6 +127,13 @@ func (in *NodePool) ConvertFrom(ctx context.Context, v1beta1np apis.Convertible) } else { in.Annotations = lo.Assign(in.Annotations, map[string]string{KubeletCompatibilityAnnotationKey: kubeletAnnotation}) } + nodeClassRefAnnotation, err := json.Marshal(v1beta1NP.Spec.Template.Spec.NodeClassRef) + if err != nil { + return fmt.Errorf("marshaling nodeClassRef annotation, %w", err) + } + in.Annotations = lo.Assign(in.Annotations, map[string]string{ + NodeClassReferenceAnnotationKey: string(nodeClassRefAnnotation), + }) return nil } @@ -176,24 +175,12 @@ func (in *NodeClaimTemplate) convertFrom(ctx context.Context, v1beta1np *v1beta1 } }) - nodeclasses := injection.GetNodeClasses(ctx) - in.Spec.NodeClassRef = &NodeClassReference{ - Name: v1beta1np.Spec.NodeClassRef.Name, - Kind: lo.Ternary(v1beta1np.Spec.NodeClassRef.Kind == "", nodeclasses[0].Kind, v1beta1np.Spec.NodeClassRef.Kind), - Group: lo.Ternary(v1beta1np.Spec.NodeClassRef.APIVersion == "", 
nodeclasses[0].Group, strings.Split(v1beta1np.Spec.NodeClassRef.APIVersion, "/")[0]), - } - defaultNodeClassGVK := injection.GetNodeClasses(ctx)[0] - nodeclassGroupVersion, err := schema.ParseGroupVersion(v1beta1np.Spec.NodeClassRef.APIVersion) - if err != nil { - return "", err - } in.Spec.NodeClassRef = &NodeClassReference{ Name: v1beta1np.Spec.NodeClassRef.Name, Kind: lo.Ternary(v1beta1np.Spec.NodeClassRef.Kind == "", defaultNodeClassGVK.Kind, v1beta1np.Spec.NodeClassRef.Kind), - Group: lo.Ternary(v1beta1np.Spec.NodeClassRef.APIVersion == "", defaultNodeClassGVK.Group, nodeclassGroupVersion.Group), + Group: lo.Ternary(v1beta1np.Spec.NodeClassRef.APIVersion == "", defaultNodeClassGVK.Group, strings.Split(v1beta1np.Spec.NodeClassRef.APIVersion, "/")[0]), } - if v1beta1np.Spec.Kubelet != nil { kubelet, err := json.Marshal(v1beta1np.Spec.Kubelet) if err != nil { @@ -201,6 +188,5 @@ func (in *NodeClaimTemplate) convertFrom(ctx context.Context, v1beta1np *v1beta1 } return string(kubelet), nil } - return "", nil } diff --git a/pkg/apis/v1/nodepool_conversion_test.go b/pkg/apis/v1/nodepool_conversion_test.go index 1006e7221f..c9289968ce 100644 --- a/pkg/apis/v1/nodepool_conversion_test.go +++ b/pkg/apis/v1/nodepool_conversion_test.go @@ -188,33 +188,43 @@ var _ = Describe("Convert V1 to V1beta1 NodePool API", func() { Group: object.GVK(&v1alpha1.TestNodeClass{}).Group, } Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) - Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(v1nodepool.Spec.Template.Spec.NodeClassRef.Kind)) Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Name).To(Equal(v1nodepool.Spec.Template.Spec.NodeClassRef.Name)) - Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.APIVersion).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].GroupVersion().String())) + Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(v1nodepool.Spec.Template.Spec.NodeClassRef.Kind)) + Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.APIVersion).To(BeEmpty()) }) - It("should not include APIVersion for v1beta1 if Group and Kind is not in the supported nodeclass", func() { + It("should retain NodeClassReference details when the karpenter.sh/v1beta1-nodeclass-reference annotation exists", func() { + nodeClassReference := &v1beta1.NodeClassReference{ + APIVersion: object.GVK(&v1alpha1.TestNodeClass{}).GroupVersion().String(), + Name: "nodeclass-test", + Kind: object.GVK(&v1alpha1.TestNodeClass{}).Kind, + } + nodeClassAnnotation, err := json.Marshal(nodeClassReference) + Expect(err).ToNot(HaveOccurred()) + v1nodepool.Annotations = lo.Assign(map[string]string{ + NodeClassReferenceAnnotationKey: string(nodeClassAnnotation), + }) v1nodepool.Spec.Template.Spec.NodeClassRef = &NodeClassReference{ - Kind: "test-kind", + Kind: object.GVK(&v1alpha1.TestNodeClass{}).Kind, Name: "nodeclass-test", - Group: "testgroup.sh", + Group: object.GVK(&v1alpha1.TestNodeClass{}).Group, } Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) - Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(v1nodepool.Spec.Template.Spec.NodeClassRef.Kind)) - Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Name).To(Equal(v1nodepool.Spec.Template.Spec.NodeClassRef.Name)) - Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.APIVersion).To(Equal("")) + Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Name).To(Equal(nodeClassReference.Name)) + 
Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(nodeClassReference.Kind)) + Expect(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.APIVersion).To(Equal(nodeClassReference.APIVersion)) }) }) }) Context("Disruption", func() { It("should convert v1 nodepool consolidateAfter to nil with WhenEmptyOrUnderutilized", func() { v1nodepool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmptyOrUnderutilized - v1nodepool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(time.Second * 2121)} + v1nodepool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("2121s") Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) Expect(v1beta1nodepool.Spec.Disruption.ConsolidateAfter).To(BeNil()) }) It("should convert v1 nodepool consolidateAfter with WhenEmpty", func() { v1nodepool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty - v1nodepool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(time.Second * 2121)} + v1nodepool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("2121s") Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) Expect(lo.FromPtr(v1beta1nodepool.Spec.Disruption.ConsolidateAfter.Duration)).To(Equal(lo.FromPtr(v1nodepool.Spec.Disruption.ConsolidateAfter.Duration))) }) @@ -224,7 +234,7 @@ var _ = Describe("Convert V1 to V1beta1 NodePool API", func() { Expect(string(v1beta1nodepool.Spec.Disruption.ConsolidationPolicy)).To(Equal(string(v1nodepool.Spec.Disruption.ConsolidationPolicy))) }) It("should convert v1 nodepool ExpireAfter", func() { - v1nodepool.Spec.Template.Spec.ExpireAfter = NillableDuration{Duration: lo.ToPtr(time.Second * 2121)} + v1nodepool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("2121s") Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) Expect(v1beta1nodepool.Spec.Disruption.ExpireAfter.Duration).To(Equal(v1nodepool.Spec.Template.Spec.ExpireAfter.Duration)) }) @@ -269,6 +279,40 @@ var _ = Describe("Convert V1 to V1beta1 NodePool API", func() { Expect(v1beta1nodepool.Status.Resources[resource]).To(Equal(v1nodepool.Status.Resources[resource])) } }) + Context("Round Trip", func() { + It("spec.template.spec.expireAfter", func() { + v1nodepool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("10h") + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1nodepool.Spec.Template.Spec.ExpireAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"10h"`)) + }) + It("spec.template.spec.expireAfter (Never)", func() { + v1nodepool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("Never") + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1nodepool.Spec.Template.Spec.ExpireAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"Never"`)) + }) + It("spec.disruption.consolidateAfter", func() { + v1nodepool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("10h") + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1nodepool.Spec.Disruption.ConsolidateAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"10h"`)) + }) + It("spec.disruption.consolidateAfter (Never)", func() { + v1nodepool.Spec.Disruption.ConsolidateAfter = 
MustParseNillableDuration("Never") + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1nodepool.Spec.Disruption.ConsolidateAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"Never"`)) + }) + }) }) var _ = Describe("Convert V1beta1 to V1 NodePool API", func() { @@ -457,19 +501,27 @@ var _ = Describe("Convert V1beta1 to V1 NodePool API", func() { Name: "nodeclass-test", APIVersion: "testgroup.sh/testversion", } + nodeClassReferenceAnnotation, err := json.Marshal(v1beta1nodepool.Spec.Template.Spec.NodeClassRef) + Expect(err).ToNot(HaveOccurred()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Kind)) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Name).To(Equal(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Name)) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Group).To(Equal("testgroup.sh")) + Expect(v1nodepool.Annotations).To(HaveKeyWithValue(NodeClassReferenceAnnotationKey, string(nodeClassReferenceAnnotation))) }) It("should set default nodeclass group and kind on v1beta1 nodeclassRef", func() { v1beta1nodepool.Spec.Template.Spec.NodeClassRef = &v1beta1.NodeClassReference{ Name: "nodeclass-test", } + nodeClassReferenceAnnotation, err := json.Marshal(v1beta1nodepool.Spec.Template.Spec.NodeClassRef) + Expect(err).ToNot(HaveOccurred()) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Kind).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].Kind)) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Name).To(Equal(v1beta1nodepool.Spec.Template.Spec.NodeClassRef.Name)) Expect(v1nodepool.Spec.Template.Spec.NodeClassRef.Group).To(Equal(cloudProvider.NodeClassGroupVersionKind[0].Group)) + Expect(v1nodepool.Annotations).To(HaveKeyWithValue(NodeClassReferenceAnnotationKey, string(nodeClassReferenceAnnotation))) }) }) }) @@ -537,4 +589,38 @@ var _ = Describe("Convert V1beta1 to V1 NodePool API", func() { Expect(v1beta1nodepool.Status.Resources[resource]).To(Equal(v1nodepool.Status.Resources[resource])) } }) + Context("Round Trip", func() { + It("spec.disruption.expireAfter", func() { + v1beta1nodepool.Spec.Disruption.ExpireAfter = v1beta1.MustParseNillableDuration("10h") + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1beta1nodepool.Spec.Disruption.ExpireAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"10h"`)) + }) + It("spec.disruption.expireAfter (Never)", func() { + v1beta1nodepool.Spec.Disruption.ExpireAfter = v1beta1.MustParseNillableDuration("Never") + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(v1beta1nodepool.Spec.Disruption.ExpireAfter) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"Never"`)) + }) + It("spec.disruption.consolidateAfter", func() { + v1beta1nodepool.Spec.Disruption.ConsolidateAfter = lo.ToPtr(v1beta1.MustParseNillableDuration("10h")) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(lo.FromPtr(v1beta1nodepool.Spec.Disruption.ConsolidateAfter)) + 
Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"10h"`)) + }) + It("spec.disruption.consolidateAfter (Never)", func() { + v1beta1nodepool.Spec.Disruption.ConsolidateAfter = lo.ToPtr(v1beta1.MustParseNillableDuration("Never")) + Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed()) + Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed()) + result, err := json.Marshal(lo.FromPtr(v1beta1nodepool.Spec.Disruption.ConsolidateAfter)) + Expect(err).To(BeNil()) + Expect(string(result)).To(Equal(`"Never"`)) + }) + }) }) diff --git a/pkg/apis/v1/nodepool_validation_cel_test.go b/pkg/apis/v1/nodepool_validation_cel_test.go index 0590ede1bb..e5e62cb2ef 100644 --- a/pkg/apis/v1/nodepool_validation_cel_test.go +++ b/pkg/apis/v1/nodepool_validation_cel_test.go @@ -64,47 +64,47 @@ var _ = Describe("CEL/Validation", func() { }) Context("Disruption", func() { It("should fail on negative expireAfter", func() { - nodePool.Spec.Template.Spec.ExpireAfter.Duration = lo.ToPtr(lo.Must(time.ParseDuration("-1s"))) + nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("-1s") Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed()) }) It("should succeed on a disabled expireAfter", func() { - nodePool.Spec.Template.Spec.ExpireAfter.Duration = nil + nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("Never") Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should succeed on a valid expireAfter", func() { - nodePool.Spec.Template.Spec.ExpireAfter.Duration = lo.ToPtr(lo.Must(time.ParseDuration("30s"))) + nodePool.Spec.Template.Spec.ExpireAfter = MustParseNillableDuration("30s") Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should fail on negative consolidateAfter", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(lo.Must(time.ParseDuration("-1s")))} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("-1s") Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed()) }) It("should succeed on a disabled consolidateAfter", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("Never") Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should succeed on a valid consolidateAfter", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(lo.Must(time.ParseDuration("30s")))} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("30s") nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should succeed when setting consolidateAfter with consolidationPolicy=WhenEmpty", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(lo.Must(time.ParseDuration("30s")))} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("30s") nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should succeed when setting consolidateAfter with consolidationPolicy=WhenUnderutilized", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: lo.ToPtr(lo.Must(time.ParseDuration("30s")))} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("30s") nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmptyOrUnderutilized Expect(env.Client.Create(ctx, 
nodePool)).To(Succeed()) }) It("should succeed when setting consolidateAfter to 'Never' with consolidationPolicy=WhenUnderutilized", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("Never") nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmptyOrUnderutilized Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) It("should succeed when setting consolidateAfter to 'Never' with consolidationPolicy=WhenEmpty", func() { - nodePool.Spec.Disruption.ConsolidateAfter = NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = MustParseNillableDuration("Never") nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go index e1d62f319c..a700b38744 100644 --- a/pkg/apis/v1/zz_generated.deepcopy.go +++ b/pkg/apis/v1/zz_generated.deepcopy.go @@ -110,6 +110,11 @@ func (in *NillableDuration) DeepCopyInto(out *NillableDuration) { *out = new(timex.Duration) **out = **in } + if in.Raw != nil { + in, out := &in.Raw, &out.Raw + *out = make([]byte, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NillableDuration. @@ -217,6 +222,11 @@ func (in *NodeClaimSpec) DeepCopyInto(out *NodeClaimSpec) { **out = **in } in.ExpireAfter.DeepCopyInto(&out.ExpireAfter) + if in.MinimumPriceImprovementPercent != nil { + in, out := &in.MinimumPriceImprovementPercent, &out.MinimumPriceImprovementPercent + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeClaimSpec. diff --git a/pkg/apis/v1beta1/duration.go b/pkg/apis/v1beta1/duration.go index 66ad492c95..052dae5ac8 100644 --- a/pkg/apis/v1beta1/duration.go +++ b/pkg/apis/v1beta1/duration.go @@ -18,7 +18,11 @@ package v1beta1 import ( "encoding/json" + "fmt" + "slices" "time" + + "github.com/samber/lo" ) const Never = "Never" @@ -28,6 +32,17 @@ const Never = "Never" // that the duration is disabled and sets the inner duration as nil type NillableDuration struct { *time.Duration + + // Raw is used to ensure we remarshal the NillableDuration in the same format it was specified. + // This ensures tools like Flux and ArgoCD don't mistakenly detect drift due to our conversion webhooks. + Raw []byte `hash:"ignore"` +} + +func MustParseNillableDuration(val string) NillableDuration { + nd := NillableDuration{} + // Use %q instead of %s to ensure that we unmarshal the value as a string and not an int + lo.Must0(json.Unmarshal([]byte(fmt.Sprintf("%q", val)), &nd)) + return nd } // UnmarshalJSON implements the json.Unmarshaller interface. @@ -44,22 +59,29 @@ func (d *NillableDuration) UnmarshalJSON(b []byte) error { if err != nil { return err } + d.Raw = slices.Clone(b) d.Duration = &pd return nil } // MarshalJSON implements the json.Marshaler interface. func (d NillableDuration) MarshalJSON() ([]byte, error) { - if d.Duration == nil { - return json.Marshal(Never) + if d.Raw != nil { + return d.Raw, nil + } + if d.Duration != nil { + return json.Marshal(d.Duration.String()) } - return json.Marshal(d.Duration.String()) + return json.Marshal(Never) } // ToUnstructured implements the value.UnstructuredConverter interface. 
func (d NillableDuration) ToUnstructured() interface{} { - if d.Duration == nil { - return Never + if d.Raw != nil { + return d.Raw + } + if d.Duration != nil { + return d.Duration.String() } - return d.Duration.String() + return Never } diff --git a/pkg/apis/v1beta1/nodeclaim_status.go b/pkg/apis/v1beta1/nodeclaim_status.go index 136c71875c..fe380b3b90 100644 --- a/pkg/apis/v1beta1/nodeclaim_status.go +++ b/pkg/apis/v1beta1/nodeclaim_status.go @@ -29,6 +29,7 @@ const ( ConditionTypeDrifted = "Drifted" ConditionTypeTerminating = "Terminating" ConditionTypeConsistentStateFound = "ConsistentStateFound" + ConditionTypeDisruptionCandidate = "DisruptionCandidate" ) // NodeClaimStatus defines the observed state of NodeClaim diff --git a/pkg/apis/v1beta1/zz_generated.deepcopy.go b/pkg/apis/v1beta1/zz_generated.deepcopy.go index f008245558..f91b673996 100644 --- a/pkg/apis/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/v1beta1/zz_generated.deepcopy.go @@ -195,6 +195,11 @@ func (in *NillableDuration) DeepCopyInto(out *NillableDuration) { *out = new(timex.Duration) **out = **in } + if in.Raw != nil { + in, out := &in.Raw, &out.Raw + *out = make([]byte, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NillableDuration. diff --git a/pkg/controllers/disruption/consolidation.go b/pkg/controllers/disruption/consolidation.go index e3539be371..78e6f53bce 100644 --- a/pkg/controllers/disruption/consolidation.go +++ b/pkg/controllers/disruption/consolidation.go @@ -23,6 +23,8 @@ import ( "sort" "time" + "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/utils/clock" @@ -122,6 +124,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... } return Command{}, pscheduling.Results{}, err } + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation len candidates: %d, new node claims: %d", len(candidates), len(results.NewNodeClaims))) // if not all of the pods were scheduled, we can't do anything if !results.AllNonPendingPodsScheduled() { @@ -129,6 +132,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if len(candidates) == 1 { c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, results.NonPendingPodSchedulingErrors())...) } + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation abandoned, not all pods scheduled: %s - len candidates: %d, new node claims: %d", results.NonPendingPodSchedulingErrors(), len(candidates), len(results.NewNodeClaims))) return Command{}, pscheduling.Results{}, nil } @@ -141,15 +145,16 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... // we're not going to turn a single node into multiple candidates if len(results.NewNodeClaims) != 1 { - if len(candidates) == 1 { + if len(candidates) <= len(results.NewNodeClaims) { + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation abandoned, new nodes >= existing candidates %d, new node claims: %d", len(candidates), len(results.NewNodeClaims))) c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...) 
+ return Command{}, pscheduling.Results{}, nil } - return Command{}, pscheduling.Results{}, nil } // get the current node price based on the offering // fallback if we can't find the specific zonal pricing data - candidatePrice, err := getCandidatePrices(candidates) + candidatePrice, err := getCandidatePrices(ctx, candidates) if err != nil { return Command{}, pscheduling.Results{}, fmt.Errorf("getting offering price from candidate node, %w", err) } @@ -167,6 +172,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if allExistingAreSpot && results.NewNodeClaims[0].Requirements.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeSpot) { + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation allExistingAreSpot len candidates: %d, new node claims: %d", len(candidates), len(results.NewNodeClaims))) return c.computeSpotToSpotConsolidation(ctx, candidates, results, candidatePrice) } @@ -180,12 +186,14 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if len(candidates) == 1 { c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Filtering by price: %v", err))...) } + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation abandoned, err: %s len candidates: %d, new node claims: %d", err.Error(), len(candidates), len(results.NewNodeClaims))) return Command{}, pscheduling.Results{}, nil } if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) == 0 { if len(candidates) == 1 { c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...) } + log.FromContext(ctx).V(1).Info(fmt.Sprintf("computeConsolidation abandoned, won't replace 1 candidate for less instance type options. 
len candidates: %d, new node claims: %d", len(candidates), len(results.NewNodeClaims)))
		return Command{}, pscheduling.Results{}, nil
	}
@@ -285,14 +293,32 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
}

// getCandidatePrices returns the sum of the prices of the given candidates
-func getCandidatePrices(candidates []*Candidate) (float64, error) {
+func getCandidatePrices(ctx context.Context, candidates []*Candidate) (float64, error) {
	var price float64
	for _, c := range candidates {
		compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
		if len(compatibleOfferings) == 0 {
			return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
		}
-		price += compatibleOfferings.Cheapest().Price
+
+		// limit maximum candidate replacement price for consideration
+		originalCheapestPrice := compatibleOfferings.Cheapest().Price
+		cheapestConsiderablePrice := originalCheapestPrice
+		// check this candidate's own NodeClaim so candidates without the field set don't cause a nil dereference
+		if c.NodeClaim.Spec.MinimumPriceImprovementPercent != nil {
+			candidateMaxPriceFactor := 1 - (float64(*c.NodeClaim.Spec.MinimumPriceImprovementPercent) / 100)
+			cheapestConsiderablePrice *= candidateMaxPriceFactor
+
+			log.FromContext(ctx).WithValues(
+				"originalPrice", originalCheapestPrice,
+				"requiredPrice", cheapestConsiderablePrice,
+				"minimumPriceImprovementPercent", *c.NodeClaim.Spec.MinimumPriceImprovementPercent,
+			).Info("price threshold for consolidation")
+
+			// con.recorder.Publish(disruptionevents.PriceThreshold(c.Node, c.NodeClaim, fmt.Sprintf("PriceThreshold for Consolidation, original price: %v, required price: %v, price factor: %v%",
+			//	originalCheapestPrice, cheapestConsiderablePrice, *c.NodeClaim.Spec.MinimumPriceImprovementPercent)))
+		}
+
+		price += cheapestConsiderablePrice
	}
	return price, nil
}
diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go
index 3f913a8b5c..f4ae7ca2f4 100644
--- a/pkg/controllers/disruption/consolidation_test.go
+++ b/pkg/controllers/disruption/consolidation_test.go
@@ -66,7 +66,7 @@ var _ = Describe("Consolidation", func() {
				Budgets: []v1.Budget{{
					Nodes: "100%",
				}},
-				ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(0 * time.Second)},
+				ConsolidateAfter: v1.MustParseNillableDuration("0s"),
			},
		},
	})
@@ -103,7 +103,7 @@ var _ = Describe("Consolidation", func() {
	Context("Events", func() {
		It("should not fire an event for ConsolidationDisabled when the NodePool has consolidation set to WhenEmptyOrUnderutilized", func() {
			nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmptyOrUnderutilized
-			nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}
+			nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("0s")
			ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool)
			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
@@ -118,7 +118,7 @@ var _ = Describe("Consolidation", func() {
		It("should fire an event for ConsolidationDisabled when the NodePool has consolidation set to WhenEmpty", func() {
			pod := test.Pod()
			nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmpty
-			nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: lo.ToPtr(time.Minute)}
+			nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("1m")
ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool) ExpectManualBinding(ctx, env.Client, pod, node) @@ -128,7 +128,7 @@ var _ = Describe("Consolidation", func() { }) It("should fire an event for ConsolidationDisabled when the NodePool has consolidateAfter set to 'Never'", func() { pod := test.Pod() - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool) ExpectManualBinding(ctx, env.Client, pod, node) @@ -157,7 +157,7 @@ var _ = Describe("Consolidation", func() { fakeClock.Step(10 * time.Minute) ExpectSingletonReconciled(ctx, disruptionController) ExpectMetricGaugeValue(disruption.EligibleNodes, 0, map[string]string{ - metrics.ReasonLabel: string(v1.DisruptionReasonUnderutilized), + metrics.ReasonLabel: "underutilized", }) // remove the do-not-disrupt annotation to make the node eligible for consolidation and update cluster state @@ -172,7 +172,7 @@ var _ = Describe("Consolidation", func() { wg.Wait() ExpectMetricGaugeValue(disruption.EligibleNodes, 1, map[string]string{ - metrics.ReasonLabel: string(v1.DisruptionReasonUnderutilized), + metrics.ReasonLabel: "underutilized", }) }) }) @@ -379,7 +379,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ // 1/2 of 3 nodes == 1.5 nodes. This should round up to 2. Nodes: "50%", @@ -446,7 +446,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ Nodes: "100%", }}, @@ -512,7 +512,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ Nodes: "0%", }}, @@ -597,7 +597,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ Nodes: "0%", }}, @@ -684,7 +684,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ Nodes: "0%", }}, @@ -771,7 +771,7 @@ var _ = Describe("Consolidation", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), Budgets: []v1.Budget{{ Nodes: "0%", }}, @@ -3947,7 +3947,7 @@ var _ = Describe("Consolidation", func() { var nodes 
[]*corev1.Node BeforeEach(func() { - nodePool.Spec.Template.Spec.ExpireAfter = v1.NillableDuration{Duration: lo.ToPtr(3 * time.Second)} + nodePool.Spec.Template.Spec.ExpireAfter = v1.MustParseNillableDuration("3s") nodeClaims, nodes = test.NodeClaimsAndNodes(2, v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -3958,7 +3958,7 @@ var _ = Describe("Consolidation", func() { }, }, Spec: v1.NodeClaimSpec{ - ExpireAfter: v1.NillableDuration{Duration: lo.ToPtr(3 * time.Second)}, + ExpireAfter: v1.MustParseNillableDuration("3s"), }, Status: v1.NodeClaimStatus{ Allocatable: map[corev1.ResourceName]resource.Quantity{ diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index 74eb574404..49b3385b54 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -85,7 +86,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi // Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn NewMultiNodeConsolidation(c), // And finally fall back our single NodeClaim consolidation to further reduce cluster cost. - NewSingleNodeConsolidation(c), + //NewSingleNodeConsolidation(c), }, } } @@ -130,7 +131,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { c.recordRun(fmt.Sprintf("%T", m)) success, err := c.disrupt(ctx, m) if err != nil { - return reconcile.Result{}, fmt.Errorf("disrupting via %q, %w", m.Reason(), err) + return reconcile.Result{}, fmt.Errorf("disrupting via reason=%q, %w", strings.ToLower(string(m.Reason())), err) } if success { return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil @@ -143,7 +144,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) { defer metrics.Measure(EvaluationDurationSeconds.With(map[string]string{ - metrics.ReasonLabel: string(disruption.Reason()), + metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())), consolidationTypeLabel: disruption.ConsolidationType(), }))() candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, disruption.ShouldDisrupt, disruption.Class(), c.queue) @@ -151,7 +152,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro return false, fmt.Errorf("determining candidates, %w", err) } EligibleNodes.With(map[string]string{ - metrics.ReasonLabel: string(disruption.Reason()), + metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())), }).Set(float64(len(candidates))) // If there are no candidates, move to the next disruption @@ -184,7 +185,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro // 3. Add Command to orchestration.Queue to wait to delete the candiates. 
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
	commandID := uuid.NewUUID()
-	log.FromContext(ctx).WithValues("command-id", commandID).Info(fmt.Sprintf("disrupting via %s %s", m.Reason(), cmd))
+	log.FromContext(ctx).WithValues("command-id", commandID, "reason", strings.ToLower(string(m.Reason()))).Info(fmt.Sprintf("disrupting nodeclaim(s) via %s", cmd))

	stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
		return c.StateNode
@@ -219,24 +220,41 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
	// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
	c.cluster.MarkForDeletion(providerIDs...)

+	// Set the status of the nodeclaims to reflect that they are disruption candidates
+	err = multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
+		candidate.NodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionCandidate, v1.ConditionTypeDisruptionCandidate, disruptionReason(m, candidate.NodeClaim))
+		return c.kubeClient.Status().Update(ctx, candidate.NodeClaim)
+	})...)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("updating nodeclaim status: %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+
	if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames, lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
		return c.StateNode
	}), commandID, m.Reason(), m.ConsolidationType())); err != nil {
		c.cluster.UnmarkForDeletion(providerIDs...)
+		// Clear the DisruptionCandidate condition again since the command never made it onto the queue
+		err = multierr.Combine(err, multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
+			return multierr.Append(candidate.NodeClaim.StatusConditions().Clear(v1.ConditionTypeDisruptionCandidate), c.kubeClient.Status().Update(ctx, candidate.NodeClaim))
+		})...))
		return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
	}

	// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
	DecisionsPerformedTotal.With(map[string]string{
		decisionLabel:          string(cmd.Decision()),
-		metrics.ReasonLabel:    string(m.Reason()),
+		metrics.ReasonLabel:    strings.ToLower(string(m.Reason())),
		consolidationTypeLabel: m.ConsolidationType(),
	}).Inc()
	return nil
}

+func disruptionReason(m Method, nodeClaim *v1.NodeClaim) string {
+	return fmt.Sprintf("node %s/%s is being disrupted, reason: %s, consolidationType: %s", nodeClaim.Name, nodeClaim.Status.NodeName, m.Reason(), m.ConsolidationType())
+}
+
// createReplacementNodeClaims creates replacement NodeClaims
func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method, cmd Command) ([]string, error) {
-	nodeClaimNames, err := c.provisioner.CreateNodeClaims(ctx, cmd.replacements, provisioning.WithReason(string(m.Reason())))
+	nodeClaimNames, err := c.provisioner.CreateNodeClaims(ctx, cmd.replacements, provisioning.WithReason(strings.ToLower(string(m.Reason()))))
	if err != nil {
		return nil, err
	}
diff --git a/pkg/controllers/disruption/drift.go b/pkg/controllers/disruption/drift.go
index 88080563d5..145f9294ef 100644
--- a/pkg/controllers/disruption/drift.go
+++ b/pkg/controllers/disruption/drift.go
@@ -70,9 +70,9 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
	}
	// If there's disruptions allowed for the candidate's nodepool,
	// add it to the
list of candidates, and decrement the budget. - if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonDrifted] > 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()] > 0 { empty = append(empty, candidate) - disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonDrifted]-- + disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()]-- } } // Disrupt all empty drifted candidates, as they require no scheduling simulations. @@ -86,7 +86,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[ // If the disruption budget doesn't allow this candidate to be disrupted, // continue to the next candidate. We don't need to decrement any budget // counter since drift commands can only have one candidate. - if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonDrifted] == 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()] == 0 { continue } // Check if we need to create any NodeClaims. diff --git a/pkg/controllers/disruption/drift_test.go b/pkg/controllers/disruption/drift_test.go index d8521233cd..6dc3dfa30e 100644 --- a/pkg/controllers/disruption/drift_test.go +++ b/pkg/controllers/disruption/drift_test.go @@ -50,7 +50,7 @@ var _ = Describe("Drift", func() { nodePool = test.NodePool(v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: nil}, + ConsolidateAfter: v1.MustParseNillableDuration("Never"), // Disrupt away! Budgets: []v1.Budget{{ Nodes: "100%", @@ -79,7 +79,7 @@ var _ = Describe("Drift", func() { }) Context("Metrics", func() { var eligibleNodesLabels = map[string]string{ - metrics.ReasonLabel: string(v1.DisruptionReasonDrifted), + metrics.ReasonLabel: "drifted", } It("should correctly report eligible nodes", func() { pod := test.Pod(test.PodOptions{ @@ -316,7 +316,7 @@ var _ = Describe("Drift", func() { nps := test.NodePools(10, v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: nil}, + ConsolidateAfter: v1.MustParseNillableDuration("Never"), Budgets: []v1.Budget{{ // 1/2 of 3 nodes == 1.5 nodes. This should round up to 2. 
Nodes: "50%", @@ -324,7 +324,7 @@ var _ = Describe("Drift", func() { }, Template: v1.NodeClaimTemplate{ Spec: v1.NodeClaimTemplateSpec{ - ExpireAfter: v1.NillableDuration{Duration: nil}, + ExpireAfter: v1.MustParseNillableDuration("Never"), }, }, }, @@ -383,7 +383,7 @@ var _ = Describe("Drift", func() { nps := test.NodePools(10, v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: nil}, + ConsolidateAfter: v1.MustParseNillableDuration("Never"), Budgets: []v1.Budget{{ Nodes: "100%", }}, diff --git a/pkg/controllers/disruption/emptiness.go b/pkg/controllers/disruption/emptiness.go index aec127edb9..31954c738a 100644 --- a/pkg/controllers/disruption/emptiness.go +++ b/pkg/controllers/disruption/emptiness.go @@ -68,7 +68,7 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping if len(candidate.reschedulablePods) > 0 { continue } - if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonEmpty] == 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()] == 0 { // set constrainedByBudgets to true if any node was a candidate but was constrained by a budget constrainedByBudgets = true continue @@ -76,7 +76,7 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping // If there's disruptions allowed for the candidate's nodepool, // add it to the list of candidates, and decrement the budget. empty = append(empty, candidate) - disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonEmpty]-- + disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()]-- } // none empty, so do nothing if len(empty) == 0 { @@ -102,7 +102,7 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping case <-e.clock.After(consolidationTTL): } - v := NewValidation(e.clock, e.cluster, e.kubeClient, e.provisioner, e.cloudProvider, e.recorder, e.queue, v1.DisruptionReasonEmpty) + v := NewValidation(e.clock, e.cluster, e.kubeClient, e.provisioner, e.cloudProvider, e.recorder, e.queue, e.Reason()) validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...) if err != nil { if IsValidationError(err) { diff --git a/pkg/controllers/disruption/emptiness_test.go b/pkg/controllers/disruption/emptiness_test.go index c24c6d7457..a6e7420064 100644 --- a/pkg/controllers/disruption/emptiness_test.go +++ b/pkg/controllers/disruption/emptiness_test.go @@ -54,7 +54,7 @@ var _ = Describe("Emptiness", func() { nodePool = test.NodePool(v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Second * 0)}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), ConsolidationPolicy: v1.ConsolidationPolicyWhenEmpty, // Disrupt away! 
Budgets: []v1.Budget{{ @@ -97,7 +97,7 @@ var _ = Describe("Emptiness", func() { fakeClock.Step(10 * time.Minute) ExpectSingletonReconciled(ctx, disruptionController) ExpectMetricGaugeValue(disruption.EligibleNodes, 0, map[string]string{ - metrics.ReasonLabel: string(v1.DisruptionReasonEmpty), + metrics.ReasonLabel: "empty", }) ExpectDeleted(ctx, env.Client, pod) @@ -110,7 +110,7 @@ var _ = Describe("Emptiness", func() { wg.Wait() ExpectMetricGaugeValue(disruption.EligibleNodes, 1, map[string]string{ - metrics.ReasonLabel: string(v1.DisruptionReasonEmpty), + metrics.ReasonLabel: "empty", }) }) }) @@ -256,7 +256,7 @@ var _ = Describe("Emptiness", func() { nps := test.NodePools(10, v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)}, + ConsolidateAfter: v1.MustParseNillableDuration("30s"), ConsolidationPolicy: v1.ConsolidationPolicyWhenEmpty, Budgets: []v1.Budget{{ // 1/2 of 3 nodes == 1.5 nodes. This should round up to 2. @@ -326,7 +326,7 @@ var _ = Describe("Emptiness", func() { nps := test.NodePools(10, v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)}, + ConsolidateAfter: v1.MustParseNillableDuration("30s"), ConsolidationPolicy: v1.ConsolidationPolicyWhenEmpty, Budgets: []v1.Budget{{ Nodes: "100%", diff --git a/pkg/controllers/disruption/events/events.go b/pkg/controllers/disruption/events/events.go index dea770d5dd..10d017f149 100644 --- a/pkg/controllers/disruption/events/events.go +++ b/pkg/controllers/disruption/events/events.go @@ -90,6 +90,28 @@ func Unconsolidatable(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) } } +// PriceThreshold is an event that notes the maximum price threshold that a NodeClaim/Node requires to allow consolidation +func PriceThreshold(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event { + return []events.Event{ + { + InvolvedObject: node, + Type: corev1.EventTypeNormal, + Reason: "PriceThreshold", + Message: reason, + DedupeValues: []string{string(node.UID)}, + DedupeTimeout: time.Minute * 15, + }, + { + InvolvedObject: nodeClaim, + Type: corev1.EventTypeNormal, + Reason: "PriceThreshold", + Message: reason, + DedupeValues: []string{string(nodeClaim.UID)}, + DedupeTimeout: time.Minute * 15, + }, + } +} + // Blocked is an event that informs the user that a NodeClaim/Node combination is blocked on deprovisioning // due to the state of the NodeClaim/Node or due to some state of the pods that are scheduled to the NodeClaim/Node func Blocked(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) (evs []events.Event) { diff --git a/pkg/controllers/disruption/multinodeconsolidation.go b/pkg/controllers/disruption/multinodeconsolidation.go index e61fbbb7b9..3ca3b4aad1 100644 --- a/pkg/controllers/disruption/multinodeconsolidation.go +++ b/pkg/controllers/disruption/multinodeconsolidation.go @@ -43,66 +43,92 @@ func NewMultiNodeConsolidation(consolidation consolidation) *MultiNodeConsolidat return &MultiNodeConsolidation{consolidation: consolidation} } +// nolint:gocyclo func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) { if m.IsConsolidated() { return Command{}, scheduling.Results{}, nil } candidates = m.sortCandidates(candidates) - - // In order, filter out all candidates that would violate the budget. 
- // Since multi-node consolidation relies on the ordering of - // these candidates, and does computation in batches of these nodes by - // simulateScheduling(nodes[0, n]), doing a binary search on n to find - // the optimal consolidation command, this pre-filters out nodes that - // would have violated the budget anyway, preserving the ordering - // and only considering a number of nodes that can be disrupted. - disruptableCandidates := make([]*Candidate, 0, len(candidates)) - constrainedByBudgets := false + nodePoolsByArch := make(map[string]map[string]int) for _, candidate := range candidates { - // If there's disruptions allowed for the candidate's nodepool, - // add it to the list of candidates, and decrement the budget. - if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonUnderutilized] == 0 { - constrainedByBudgets = true - continue + architectures := nodePoolsByArch[candidate.nodePool.Name] + if architectures == nil { + architectures = make(map[string]int) + nodePoolsByArch[candidate.nodePool.Name] = architectures } - // Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should - // assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty` - // can find their nodes disrupted here. - if len(candidate.reschedulablePods) == 0 { - continue + if architectures[candidate.Node.Status.NodeInfo.Architecture] == 0 { + architectures[candidate.Node.Status.NodeInfo.Architecture] = 1 + } else { + architectures[candidate.Node.Status.NodeInfo.Architecture] += 1 } - // set constrainedByBudgets to true if any node was a candidate but was constrained by a budget - disruptableCandidates = append(disruptableCandidates, candidate) - disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonUnderutilized]-- } - // Only consider a maximum batch of 100 NodeClaims to save on computation. - // This could be further configurable in the future. - maxParallel := lo.Clamp(len(disruptableCandidates), 0, 100) + for nodepoolName, architectures := range nodePoolsByArch { + for architecture, nodeCount := range architectures { + + // In order, filter out all candidates that would violate the budget. + // Since multi-node consolidation relies on the ordering of + // these candidates, and does computation in batches of these nodes by + // simulateScheduling(nodes[0, n]), doing a binary search on n to find + // the optimal consolidation command, this pre-filters out nodes that + // would have violated the budget anyway, preserving the ordering + // and only considering a number of nodes that can be disrupted. + disruptableCandidates := make([]*Candidate, 0, nodeCount) + constrainedByBudgets := false + for _, candidate := range candidates { + if candidate.nodePool.Name != nodepoolName || candidate.Node.Status.NodeInfo.Architecture != architecture { + continue + } + // If there's disruptions allowed for the candidate's nodepool, + // add it to the list of candidates, and decrement the budget. + if disruptionBudgetMapping[candidate.nodePool.Name][m.Reason()] == 0 { + constrainedByBudgets = true + continue + } + // Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should + // assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty` + // can find their nodes disrupted here. 
+				if len(candidate.reschedulablePods) == 0 {
+					continue
+				}
+				// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
+				disruptableCandidates = append(disruptableCandidates, candidate)
+				disruptionBudgetMapping[candidate.nodePool.Name][m.Reason()]--
+			}
-	cmd, results, err := m.firstNConsolidationOption(ctx, disruptableCandidates, maxParallel)
-	if err != nil {
-		return Command{}, scheduling.Results{}, err
-	}
+			// Only consider a maximum batch of 100 NodeClaims to save on computation.
+			// This could be further configurable in the future.
+			maxParallel := lo.Clamp(len(disruptableCandidates), 0, 100)
-	if cmd.Decision() == NoOpDecision {
-		// if there are no candidates because of a budget, don't mark
-		// as consolidated, as it's possible it should be consolidatable
-		// the next time we try to disrupt.
-		if !constrainedByBudgets {
-			m.markConsolidated()
-		}
-		return cmd, scheduling.Results{}, nil
-	}
+			cmd, results, err := m.firstNConsolidationOption(ctx, disruptableCandidates, maxParallel)
+			if err != nil {
+				return Command{}, scheduling.Results{}, err
+			}
-	if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue, v1.DisruptionReasonUnderutilized).IsValid(ctx, cmd, consolidationTTL); err != nil {
-		if IsValidationError(err) {
-			log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
-			return Command{}, scheduling.Results{}, nil
+			if cmd.Decision() == NoOpDecision {
+				// if there are no candidates because of a budget, don't mark
+				// as consolidated, as it's possible it should be consolidatable
+				// the next time we try to disrupt.
+				if !constrainedByBudgets {
+					m.markConsolidated()
+				}
+				log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to no candidates (%s/%s) (maxParallel: %d, constrainedByBudgets: %v)", nodepoolName, architecture, maxParallel, constrainedByBudgets))
+				continue
+			}
+
+			if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue, m.Reason()).IsValid(ctx, cmd, consolidationTTL); err != nil {
+				if IsValidationError(err) {
+					log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s: %s", cmd, err.Error()))
+					return Command{}, scheduling.Results{}, nil
+				}
+				return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
+			}
+			log.FromContext(ctx).V(1).Info(fmt.Sprintf("multi-node consolidation cmd success, new nodes: %d, replaced candidates: %d", len(cmd.replacements), len(cmd.candidates)))
+			return cmd, results, nil
		}
-		return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
	}
-	return cmd, results, nil
+	log.FromContext(ctx).V(1).Info("abandoning multi-node consolidation attempt, no results")
+	return Command{}, scheduling.Results{}, nil
}

// firstNConsolidationOption looks at the first N NodeClaims to determine if they can all be consolidated at once.
The diff --git a/pkg/controllers/disruption/orchestration/queue.go b/pkg/controllers/disruption/orchestration/queue.go index 886f96e1f0..96dd43f6ce 100644 --- a/pkg/controllers/disruption/orchestration/queue.go +++ b/pkg/controllers/disruption/orchestration/queue.go @@ -226,7 +226,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { } // If command is complete, remove command from queue. q.Remove(cmd) - log.FromContext(ctx).Info("command succeeded") + log.FromContext(ctx).V(1).Info("command succeeded") return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index 31a3ec3f08..2837a7d843 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -46,7 +46,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } candidates = s.sortCandidates(candidates) - v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue, v1.DisruptionReasonUnderutilized) + v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue, s.Reason()) // Set a timeout timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration) @@ -56,7 +56,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption // If the disruption budget doesn't allow this candidate to be disrupted, // continue to the next candidate. We don't need to decrement any budget // counter since single node consolidation commands can only have one candidate. - if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonUnderutilized] == 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][s.Reason()] == 0 { constrainedByBudgets = true continue } @@ -82,7 +82,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil { if IsValidationError(err) { - log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)) + log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s: %s", cmd, err.Error())) return Command{}, scheduling.Results{}, nil } return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index b0bd8b0234..d7bc58c9e3 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -162,7 +162,7 @@ var _ = Describe("Simulate Scheduling", func() { nodePool = test.NodePool(v1.NodePool{ Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, }, }, @@ -203,7 +203,7 @@ var _ = Describe("Simulate Scheduling", func() { }, }, }) - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") ExpectApplied(ctx, env.Client, pod) ExpectManualBinding(ctx, env.Client, pod, nodes[0]) @@ -246,7 +246,7 @@ var _ = Describe("Simulate Scheduling", func() { }, }, 
Spec: v1.NodeClaimSpec{ - ExpireAfter: v1.NillableDuration{Duration: lo.ToPtr(5 * time.Minute)}, + ExpireAfter: v1.MustParseNillableDuration("5m"), }, Status: v1.NodeClaimStatus{ Allocatable: map[corev1.ResourceName]resource.Quantity{ @@ -281,7 +281,7 @@ var _ = Describe("Simulate Scheduling", func() { }, }) - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "3"}} ExpectApplied(ctx, env.Client, nodePool) @@ -523,7 +523,7 @@ var _ = Describe("Disruption Taints", func() { }, }, }) - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint) ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) ExpectManualBinding(ctx, env.Client, pod, node) @@ -533,6 +533,7 @@ var _ = Describe("Disruption Taints", func() { ExpectSingletonReconciled(ctx, disruptionController) node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).To(BeNil()) }) It("should add and remove taints from NodeClaims that fail to disrupt", func() { nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmptyOrUnderutilized @@ -570,6 +571,11 @@ var _ = Describe("Disruption Taints", func() { node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).To(ContainElement(v1.DisruptedNoScheduleTaint)) + existingNodeClaim := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { + return nc.Name == nodeClaim.Name + }) + Expect(existingNodeClaim[0].StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).ToNot(BeNil()) + Expect(existingNodeClaim[0].StatusConditions().Get(v1.ConditionTypeDisruptionCandidate).IsTrue()).To(BeTrue()) createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool { return nc.Name == nodeClaim.Name @@ -586,6 +592,7 @@ var _ = Describe("Disruption Taints", func() { node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).To(BeNil()) }) }) @@ -1752,7 +1759,7 @@ var _ = Describe("Metrics", func() { Spec: v1.NodePoolSpec{ Disruption: v1.Disruption{ ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, - ConsolidateAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Duration(0))}, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), // Disrupt away! 
Budgets: []v1.Budget{{ Nodes: "100%", @@ -1793,7 +1800,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "delete", - metrics.ReasonLabel: string(v1.DisruptionReasonDrifted), + metrics.ReasonLabel: "drifted", }) }) It("should fire metrics for single node delete disruption", func() { @@ -1820,7 +1827,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "delete", - metrics.ReasonLabel: string(v1.DisruptionReasonDrifted), + metrics.ReasonLabel: "drifted", }) }) It("should fire metrics for single node replace disruption", func() { @@ -1845,7 +1852,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "replace", - metrics.ReasonLabel: string(v1.DisruptionReasonDrifted), + metrics.ReasonLabel: "drifted", }) }) It("should fire metrics for multi-node empty disruption", func() { @@ -1863,7 +1870,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "delete", - metrics.ReasonLabel: string(v1.DisruptionReasonEmpty), + metrics.ReasonLabel: "empty", "consolidation_type": "empty", }) }) @@ -1905,7 +1912,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "delete", - metrics.ReasonLabel: string(v1.DisruptionReasonUnderutilized), + metrics.ReasonLabel: "underutilized", "consolidation_type": "multi", }) }) @@ -1967,7 +1974,7 @@ var _ = Describe("Metrics", func() { ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{ "decision": "replace", - metrics.ReasonLabel: string(v1.DisruptionReasonUnderutilized), + metrics.ReasonLabel: "underutilized", "consolidation_type": "multi", }) }) diff --git a/pkg/controllers/disruption/validation.go b/pkg/controllers/disruption/validation.go index bcf47a996f..242f6d5e6b 100644 --- a/pkg/controllers/disruption/validation.go +++ b/pkg/controllers/disruption/validation.go @@ -23,6 +23,10 @@ import ( "sync" "time" + "sigs.k8s.io/controller-runtime/pkg/log" + + "sigs.k8s.io/karpenter/pkg/utils/pdb" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" @@ -126,7 +130,21 @@ func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Cand validatedCandidates = mapCandidates(candidates, validatedCandidates) // If we filtered out any candidates, return nil as some NodeClaims in the consolidation decision have changed. 
if len(validatedCandidates) != len(candidates) { - return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid", len(candidates)-len(validatedCandidates))) + nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, v.kubeClient, v.cloudProvider) + if err == nil { + pdbs, err := pdb.NewLimits(ctx, v.clock, v.kubeClient) + if err == nil { + for _, candidate := range candidates { + nc, e := NewCandidate(ctx, v.kubeClient, v.recorder, v.clock, candidate.StateNode, pdbs, nodePoolMap, nodePoolToInstanceTypesMap, v.queue, GracefulDisruptionClass) + if e != nil { + log.FromContext(ctx).V(1).Info(fmt.Sprintf("new candidate error: %s", e.Error())) + } else { + log.FromContext(ctx).V(1).Info(fmt.Sprintf("new candidate succeeded: %#v", *nc)) + } + } + } + } + return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid (%#v - %#v)", len(candidates)-len(validatedCandidates), candidates, validatedCandidates)) } disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder) if err != nil { @@ -179,18 +197,18 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate } // if it produced no new NodeClaims, but we were expecting one we should re-simulate as there is likely a better // consolidation option now - return NewValidationError(fmt.Errorf("scheduling simulation produced new results")) + return NewValidationError(fmt.Errorf("scheduling simulation produced new results: no new node claims")) } // we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n) if len(results.NewNodeClaims) > 1 { - return NewValidationError(fmt.Errorf("scheduling simulation produced new results")) + return NewValidationError(fmt.Errorf("scheduling simulation produced new results: more than one new node claim")) } // we now know that scheduling simulation wants to create one new node if len(cmd.replacements) == 0 { // but we weren't expecting any new NodeClaims, so this is invalid - return NewValidationError(fmt.Errorf("scheduling simulation produced new results")) + return NewValidationError(fmt.Errorf("scheduling simulation produced new results: no replacements expected")) } // We know that the scheduling simulation wants to create a new node and that the command we are verifying wants @@ -205,7 +223,7 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate // now says that we need to launch a 4xlarge. It's still launching the correct number of NodeClaims, but it's just // as expensive or possibly more so we shouldn't validate. 
	if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewNodeClaims[0].InstanceTypeOptions) {
-		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results: replacement instance types are no longer a subset of the newly simulated options"))
	}

	// Now we know:
diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go
index d626173d56..0e790978f3 100644
--- a/pkg/controllers/node/termination/terminator/events/events.go
+++ b/pkg/controllers/node/termination/terminator/events/events.go
@@ -26,12 +26,12 @@ import (
	"sigs.k8s.io/karpenter/pkg/events"
)

-func EvictPod(pod *corev1.Pod) events.Event {
+func EvictPod(pod *corev1.Pod, message string) events.Event {
	return events.Event{
		InvolvedObject: pod,
		Type:           corev1.EventTypeNormal,
		Reason:         "Evicted",
-		Message:        "Evicted pod",
+		Message:        "Evicted pod: " + message,
		DedupeValues:   []string{pod.Name},
	}
}
diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go
index 04a93edf35..c3c197c172 100644
--- a/pkg/controllers/node/termination/terminator/eviction.go
+++ b/pkg/controllers/node/termination/terminator/eviction.go
@@ -23,6 +23,7 @@ import (
	"sync"
	"time"

+	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/metrics"

	"github.com/awslabs/operatorpkg/singleton"
@@ -70,13 +71,15 @@ func IsNodeDrainError(err error) bool {

type QueueKey struct {
	types.NamespacedName
-	UID types.UID
+	UID      types.UID
+	nodeName string
}

func NewQueueKey(pod *corev1.Pod) QueueKey {
	return QueueKey{
		NamespacedName: client.ObjectKeyFromObject(pod),
		UID:            pod.UID,
+		nodeName:       pod.Spec.NodeName,
	}
}
@@ -167,6 +170,10 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if there was an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
+	evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
+	if err != nil {
+		log.FromContext(ctx).Error(err, "failed looking up pod eviction reason")
+	}
	if err := q.kubeClient.SubResource("eviction").Create(ctx,
		&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
		&policyv1.Eviction{
@@ -195,10 +202,36 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
		log.FromContext(ctx).Error(err, "failed evicting pod")
		return false
	}
-	q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
+
+	q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
	return true
}

+func getNodeClaims(ctx context.Context, key QueueKey, kubeClient client.Client) ([]*v1.NodeClaim, error) {
+	nodeClaimList := &v1.NodeClaimList{}
+	if err := kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.nodeName": key.nodeName}); err != nil {
+		return nil, fmt.Errorf("listing nodeClaims, %w", err)
+	}
+	return lo.ToSlicePtr(nodeClaimList.Items), nil
+}
+
+func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
+	nodeClaims, err := getNodeClaims(ctx, key, kubeClient)
+	if err != nil {
+		return "", err
+	}
+	for _, nodeClaim := range nodeClaims {
terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate) + if terminationCondition.IsTrue() { + return terminationCondition.Message, nil + } + if !nodeClaim.DeletionTimestamp.IsZero() { + return fmt.Sprintf("node %s/%s is marked for deletion", nodeClaim.Name, nodeClaim.Status.NodeName), nil + } + } + return "", nil +} + func (q *Queue) Reset() { q.mu.Lock() defer q.mu.Unlock() diff --git a/pkg/controllers/nodeclaim/disruption/consolidation_test.go b/pkg/controllers/nodeclaim/disruption/consolidation_test.go index f83c6fd955..88ab8522fd 100644 --- a/pkg/controllers/nodeclaim/disruption/consolidation_test.go +++ b/pkg/controllers/nodeclaim/disruption/consolidation_test.go @@ -19,7 +19,6 @@ package disruption_test import ( "time" - "github.com/samber/lo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +37,7 @@ var _ = Describe("Underutilized", func() { BeforeEach(func() { nodePool = test.NodePool() nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmptyOrUnderutilized - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: lo.ToPtr(1 * time.Minute)} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("1m") nodeClaim, _ = test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -93,7 +92,7 @@ var _ = Describe("Underutilized", func() { Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeConsolidatable)).To(BeNil()) }) It("should remove the status condition from the nodeClaim when consolidateAfter is never", func() { - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable) ExpectApplied(ctx, env.Client, nodePool, nodeClaim) diff --git a/pkg/controllers/nodeclaim/disruption/drift_test.go b/pkg/controllers/nodeclaim/disruption/drift_test.go index 5002f352de..c0ef685f29 100644 --- a/pkg/controllers/nodeclaim/disruption/drift_test.go +++ b/pkg/controllers/nodeclaim/disruption/drift_test.go @@ -382,7 +382,7 @@ var _ = Describe("Drift", func() { Effect: corev1.TaintEffectNoExecute, }, }, - ExpireAfter: v1.NillableDuration{Duration: lo.ToPtr(5 * time.Minute)}, + ExpireAfter: v1.MustParseNillableDuration("5m"), TerminationGracePeriod: &metav1.Duration{Duration: 5 * time.Minute}, }, }, @@ -416,7 +416,7 @@ var _ = Describe("Drift", func() { Entry("NodeClassRef APIVersion", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: v1.NodeClaimTemplateSpec{NodeClassRef: &v1.NodeClassReference{Group: "testVersion"}}}}}), Entry("NodeClassRef Name", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: v1.NodeClaimTemplateSpec{NodeClassRef: &v1.NodeClassReference{Name: "testName"}}}}}), Entry("NodeClassRef Kind", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: v1.NodeClaimTemplateSpec{NodeClassRef: &v1.NodeClassReference{Kind: "testKind"}}}}}), - Entry("ExpireAfter", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: v1.NodeClaimTemplateSpec{ExpireAfter: v1.NillableDuration{Duration: lo.ToPtr(100 * time.Minute)}}}}}), + Entry("ExpireAfter", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: v1.NodeClaimTemplateSpec{ExpireAfter: v1.MustParseNillableDuration("100m")}}}}), Entry("TerminationGracePeriod", v1.NodePool{Spec: v1.NodePoolSpec{Template: v1.NodeClaimTemplate{Spec: 
v1.NodeClaimTemplateSpec{TerminationGracePeriod: &metav1.Duration{Duration: 100 * time.Minute}}}}}), ) It("should not return drifted if karpenter.sh/nodepool-hash annotation is not present on the NodePool", func() { diff --git a/pkg/controllers/nodeclaim/disruption/suite_test.go b/pkg/controllers/nodeclaim/disruption/suite_test.go index af5fe79ea1..a61382b179 100644 --- a/pkg/controllers/nodeclaim/disruption/suite_test.go +++ b/pkg/controllers/nodeclaim/disruption/suite_test.go @@ -25,7 +25,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/samber/lo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clock "k8s.io/utils/clock/testing" @@ -102,7 +101,7 @@ var _ = Describe("Disruption", func() { It("should set multiple disruption conditions simultaneously", func() { cp.Drifted = "drifted" nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmpty - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("30s") ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) ExpectMakeNodeClaimsInitialized(ctx, env.Client, nodeClaim) @@ -116,7 +115,7 @@ var _ = Describe("Disruption", func() { }) It("should remove multiple disruption conditions simultaneously", func() { cp.Drifted = "" - nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: nil} + nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never") nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted) nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable) diff --git a/pkg/controllers/nodeclaim/expiration/suite_test.go b/pkg/controllers/nodeclaim/expiration/suite_test.go index e84ca9f383..20c823437a 100644 --- a/pkg/controllers/nodeclaim/expiration/suite_test.go +++ b/pkg/controllers/nodeclaim/expiration/suite_test.go @@ -27,7 +27,6 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "github.com/samber/lo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clock "k8s.io/utils/clock/testing" @@ -91,7 +90,7 @@ var _ = Describe("Expiration", func() { Labels: map[string]string{v1.NodePoolLabelKey: nodePool.Name}, }, Spec: v1.NodeClaimSpec{ - ExpireAfter: v1.NillableDuration{Duration: lo.ToPtr(time.Second * 30)}, + ExpireAfter: v1.MustParseNillableDuration("30s"), }, }) metrics.NodeClaimsDisruptedTotal.Reset() @@ -127,13 +126,13 @@ var _ = Describe("Expiration", func() { }) }) It("should not remove the NodeClaims when expiration is disabled", func() { - nodeClaim.Spec.ExpireAfter.Duration = nil + nodeClaim.Spec.ExpireAfter = v1.MustParseNillableDuration("Never") ExpectApplied(ctx, env.Client, nodeClaim) ExpectObjectReconciled(ctx, env.Client, expirationController, nodeClaim) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) }) It("should remove nodeclaims that are expired", func() { - nodeClaim.Spec.ExpireAfter.Duration = lo.ToPtr(time.Second * 30) + nodeClaim.Spec.ExpireAfter = v1.MustParseNillableDuration("30s") ExpectApplied(ctx, env.Client, nodeClaim) // step forward to make the node expired @@ -144,13 +143,13 @@ var _ = Describe("Expiration", func() { ExpectNotFound(ctx, env.Client, nodeClaim) }) It("should not remove non-expired NodeClaims", func() { - nodeClaim.Spec.ExpireAfter.Duration = lo.ToPtr(time.Second * 200) + nodeClaim.Spec.ExpireAfter = v1.MustParseNillableDuration("200s") ExpectApplied(ctx, env.Client, nodeClaim) ExpectObjectReconciled(ctx, env.Client, expirationController, nodeClaim) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) }) It("should delete NodeClaims if the nodeClaim is expired but the node isn't", func() { - nodeClaim.Spec.ExpireAfter.Duration = lo.ToPtr(time.Second * 30) + nodeClaim.Spec.ExpireAfter = v1.MustParseNillableDuration("30s") ExpectApplied(ctx, env.Client, nodeClaim) // step forward to make the node expired @@ -161,7 +160,7 @@ var _ = Describe("Expiration", func() { ExpectNotFound(ctx, env.Client, nodeClaim) }) It("should return the requeue interval for the time between now and when the nodeClaim expires", func() { - nodeClaim.Spec.ExpireAfter.Duration = lo.ToPtr(time.Second * 200) + nodeClaim.Spec.ExpireAfter = v1.MustParseNillableDuration("200s") ExpectApplied(ctx, env.Client, nodeClaim, node) fakeClock.SetTime(nodeClaim.CreationTimestamp.Time.Add(time.Second * 100)) diff --git a/pkg/controllers/nodeclaim/lifecycle/controller.go b/pkg/controllers/nodeclaim/lifecycle/controller.go index 8f6bc73bed..e636dda4ab 100644 --- a/pkg/controllers/nodeclaim/lifecycle/controller.go +++ b/pkg/controllers/nodeclaim/lifecycle/controller.go @@ -141,7 +141,8 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error { ). 
 		WithOptions(controller.Options{
 			RateLimiter: workqueue.NewMaxOfRateLimiter(
-				workqueue.NewItemExponentialFailureRateLimiter(time.Second, time.Minute),
+				// back off until last attempt occurs ~90 seconds before nodeclaim expiration
+				workqueue.NewItemExponentialFailureRateLimiter(time.Second, 300*time.Second),
 				// 10 qps, 100 bucket size
 				&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 			),
diff --git a/pkg/controllers/nodepool/hash/suite_test.go b/pkg/controllers/nodepool/hash/suite_test.go
index e9b15c193a..10c7a27006 100644
--- a/pkg/controllers/nodepool/hash/suite_test.go
+++ b/pkg/controllers/nodepool/hash/suite_test.go
@@ -88,7 +88,7 @@ var _ = Describe("Static Drift Hash", func() {
 						Effect: corev1.TaintEffectNoExecute,
 					},
 				},
-				ExpireAfter:            v1.NillableDuration{Duration: lo.ToPtr(5 * time.Minute)},
+				ExpireAfter:            v1.MustParseNillableDuration("5m"),
 				TerminationGracePeriod: &metav1.Duration{Duration: 5 * time.Minute},
 			},
 		},
@@ -123,7 +123,7 @@ var _ = Describe("Static Drift Hash", func() {
 			nodePool.Spec.Limits = v1.Limits(corev1.ResourceList{"cpu": resource.MustParse("16")})
 			nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenEmpty
-			nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{Duration: lo.ToPtr(30 * time.Second)}
+			nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("30s")
 			nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{
 				{NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{"test"}}},
 				{NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpGt, Values: []string{"1"}}},
diff --git a/pkg/controllers/nodepool/readiness/controller.go b/pkg/controllers/nodepool/readiness/controller.go
index 9ba9f3d3e8..ead9b1bd21 100644
--- a/pkg/controllers/nodepool/readiness/controller.go
+++ b/pkg/controllers/nodepool/readiness/controller.go
@@ -20,6 +20,9 @@ import (
 	"context"
 	logger "log"
 
+	"github.com/awslabs/operatorpkg/object"
+	"github.com/samber/lo"
+
 	"github.com/awslabs/operatorpkg/status"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -62,7 +65,7 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
 		return reconcile.Result{}, err
 	}
 	if nodeClass == nil {
-		nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeClassReady, "UnresolvedNodeClass", "Unable to resolve nodeClass")
+		nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeClassReady, "NodeClassNotFound", "NodeClass not found on cluster")
 	} else if !nodeClass.GetDeletionTimestamp().IsZero() {
 		nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeClassReady, "NodeClassTerminating", "NodeClass is Terminating")
 	} else {
@@ -79,7 +82,12 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
 	return reconcile.Result{}, nil
 }
 func (c *Controller) getNodeClass(ctx context.Context, nodePool *v1.NodePool, supportedNC []status.Object) (status.Object, error) {
-	nodeClass := supportedNC[0]
+	nodeClass, ok := lo.Find(supportedNC, func(nc status.Object) bool {
+		return object.GVK(nc).Group == nodePool.Spec.Template.Spec.NodeClassRef.Group && object.GVK(nc).Kind == nodePool.Spec.Template.Spec.NodeClassRef.Kind
+	})
+	if !ok {
+		return nodeClass, nil
+	}
 	if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass); err != nil {
 		if errors.IsNotFound(err) {
 			return nil, nil
diff --git a/pkg/controllers/nodepool/readiness/suite_test.go b/pkg/controllers/nodepool/readiness/suite_test.go
index 300400661d..2fcc0bfdc7 100644
--- a/pkg/controllers/nodepool/readiness/suite_test.go
+++ b/pkg/controllers/nodepool/readiness/suite_test.go
@@ -21,6 +21,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/awslabs/operatorpkg/object"
+
 	"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -73,6 +75,16 @@ var _ = Describe("Readiness", func() {
 		nodeClass = test.NodeClass(v1alpha1.TestNodeClass{
 			ObjectMeta: metav1.ObjectMeta{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name},
 		})
+		nodePool.Spec.Template.Spec.NodeClassRef.Group = object.GVK(nodeClass).Group
+		nodePool.Spec.Template.Spec.NodeClassRef.Kind = object.GVK(nodeClass).Kind
+	})
+	It("should have status condition on nodePool as not ready if nodeClass referenced in nodePool is not in supported nodeClasses", func() {
+		nodePool.Spec.Template.Spec.NodeClassRef.Group = "group"
+		nodePool.Spec.Template.Spec.NodeClassRef.Kind = "kind"
+		ExpectApplied(ctx, env.Client, nodePool, nodeClass)
+		_ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool)
+		nodePool = ExpectExists(ctx, env.Client, nodePool)
+		Expect(nodePool.StatusConditions().Get(status.ConditionReady).IsFalse()).To(BeTrue())
 	})
 	It("should have status condition on nodePool as not ready when nodeClass does not exist", func() {
 		ExpectApplied(ctx, env.Client, nodePool)
diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go
index fec8d1462a..076862a690 100644
--- a/pkg/events/suite_test.go
+++ b/pkg/events/suite_test.go
@@ -88,8 +88,8 @@ var _ = Describe("Event Creation", func() {
 		Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1))
 	})
 	It("should create a EvictPod event", func() {
-		eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 	})
 	It("should create a PodFailedToSchedule event", func() {
 		eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")))
@@ -105,31 +105,31 @@ var _ = Describe("Dedupe", func() {
 	It("should only create a single event when many events are created quickly", func() {
 		pod := PodWithUID()
 		for i := 0; i < 100; i++ {
-			eventRecorder.Publish(terminatorevents.EvictPod(pod))
+			eventRecorder.Publish(terminatorevents.EvictPod(pod, ""))
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 	})
 	It("should allow the dedupe timeout to be overridden", func() {
 		pod := PodWithUID()
-		evt := terminatorevents.EvictPod(pod)
+		evt := terminatorevents.EvictPod(pod, "")
 		evt.DedupeTimeout = time.Second * 2
 		// Generate a set of events within the dedupe timeout
 		for i := 0; i < 10; i++ {
 			eventRecorder.Publish(evt)
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 		// Wait until after the overridden dedupe timeout
 		time.Sleep(time.Second * 3)
 		eventRecorder.Publish(evt)
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(2))
 	})
 	It("should allow events with different entities to be created", func() {
 		for i := 0; i < 100; i++ {
-			eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
+			eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(100))
 	})
 })
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index e238a3d200..a0bc9070dd 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -203,7 +203,10 @@ func NewOperator() (context.Context, *Operator) {
 	}), "failed to setup nodeclaim provider id indexer")
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.group", func(o client.Object) []string {
 		return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Group}
-	}), "failed to setup nodeclaim nodeclassref apiversion indexer")
+	}), "failed to setup nodeclaim nodeclassref group indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.nodeName", func(o client.Object) []string {
+		return []string{o.(*v1.NodeClaim).Status.NodeName}
+	}), "failed to setup nodeclaim nodeName indexer")
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.kind", func(o client.Object) []string {
 		return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Kind}
 	}), "failed to setup nodeclaim nodeclassref kind indexer")
diff --git a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
index 3907c2cafd..fb3c15c292 100644
--- a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
+++ b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.15.0
+    controller-gen.kubebuilder.io/version: v0.16.2
   name: testnodeclasses.karpenter.test.sh
 spec:
   group: karpenter.test.sh
@@ -84,12 +84,7 @@ spec:
                   - Unknown
                   type: string
                 type:
-                  description: |-
-                    type of condition in CamelCase or in foo.example.com/CamelCase.
-                    ---
-                    Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be
-                    useful (see .node.status.conditions), the ability to deconflict is important.
-                    The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
+                  description: type of condition in CamelCase or in foo.example.com/CamelCase.
                   maxLength: 316
                   pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
                   type: string
diff --git a/test/pkg/environment/common/environment.go b/test/pkg/environment/common/environment.go
index 70e5646017..ed5077dc05 100644
--- a/test/pkg/environment/common/environment.go
+++ b/test/pkg/environment/common/environment.go
@@ -162,7 +162,7 @@ func (env *Environment) DefaultNodePool(nodeClass *v1alpha1.KWOKNodeClass) *v1.N
 	nodePool.Spec.Template.Spec.NodeClassRef = &v1.NodeClassReference{
 		Name:  nodeClass.Name,
 		Kind:  object.GVK(nodeClass).Kind,
-		Group: object.GVK(nodeClass).GroupVersion().String(),
+		Group: object.GVK(nodeClass).Group,
 	}
 	nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{
 		{
@@ -180,7 +180,7 @@ func (env *Environment) DefaultNodePool(nodeClass *v1alpha1.KWOKNodeClass) *v1.N
 			},
 		},
 	}
-	nodePool.Spec.Disruption.ConsolidateAfter = v1.NillableDuration{}
+	nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never")
 	nodePool.Spec.Template.Spec.ExpireAfter.Duration = nil
 	nodePool.Spec.Limits = v1.Limits(corev1.ResourceList{
 		corev1.ResourceCPU: resource.MustParse("1000"),
diff --git a/test/suites/perf/suite_test.go b/test/suites/perf/suite_test.go
index 8914e1d226..045ec113de 100644
--- a/test/suites/perf/suite_test.go
+++ b/test/suites/perf/suite_test.go
@@ -19,7 +19,6 @@ package perf_test
 import (
 	"fmt"
 	"testing"
-	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -34,7 +33,6 @@ import (
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
-	"github.com/samber/lo"
 )
 
 var nodePool *v1.NodePool
@@ -76,7 +74,7 @@ var _ = BeforeEach(func() {
 	nodePool.Spec.Limits = v1.Limits{}
 	nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "100%"}}
 	// Set expiration to some high value so that there's age-based ordering for consolidation tests
-	nodePool.Spec.Template.Spec.ExpireAfter = v1.NillableDuration{Duration: lo.ToPtr(30 * time.Hour)}
+	nodePool.Spec.Template.Spec.ExpireAfter = v1.MustParseNillableDuration("30h")
})

var _ = AfterEach(func() {
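
Note (illustrative, not part of the patch): the readiness/controller.go hunk above changes getNodeClass to resolve the NodeClass by matching the NodePool's nodeClassRef group and kind against the supported NodeClass objects, rather than unconditionally taking supportedNC[0]. The standalone Go sketch below mirrors that lo.Find-based lookup under simplified, assumed types (nodeClassRef and supportedNodeClass are stand-ins, not the real Karpenter API types).

// Illustrative sketch only: resolve a nodeClassRef against a supported set by
// group/kind, mirroring the lookup added in pkg/controllers/nodepool/readiness/controller.go.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// Simplified stand-ins for the real API types.
type nodeClassRef struct{ Group, Kind, Name string }
type supportedNodeClass struct{ Group, Kind string }

// findSupported returns the supported NodeClass whose group/kind match the ref,
// and false when the ref is not in the supported set; the controller then leaves
// the NodeClassReady condition false instead of indexing element 0.
func findSupported(supported []supportedNodeClass, ref nodeClassRef) (supportedNodeClass, bool) {
	return lo.Find(supported, func(nc supportedNodeClass) bool {
		return nc.Group == ref.Group && nc.Kind == ref.Kind
	})
}

func main() {
	supported := []supportedNodeClass{{Group: "karpenter.test.sh", Kind: "TestNodeClass"}}
	// Mirrors the new suite_test.go case: an unrecognized group/kind is not resolved.
	if _, ok := findSupported(supported, nodeClassRef{Group: "group", Kind: "kind", Name: "default"}); !ok {
		fmt.Println("nodeClassRef not in supported nodeClasses; NodeClassReady stays false")
	}
}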