Skip to content

Commit

Permalink
feat: Node Repair implementation (#1793)
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam authored Nov 18, 2024
1 parent 6a036cb commit 8ce869c
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 6 deletions.
4 changes: 4 additions & 0 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object {
return []status.Object{&v1alpha1.KWOKNodeClass{}}
}

// RepairPolicies returns the unhealthy node conditions Karpenter should monitor
// for automated node repair. KWOK nodes are simulated, so no repair policies are
// defined; returning an empty slice also keeps the node-repair controller from
// being registered (see pkg/controllers).
// Uses a value receiver for consistency with the other CloudProvider methods in
// this file (e.g. GetSupportedNodeClasses, getInstanceType); value-receiver
// methods remain callable through both values and pointers.
func (c CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
	return []cloudprovider.RepairPolicy{}
}

func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) {
it, found := lo.Find(c.instanceTypes, func(it *cloudprovider.InstanceType) bool {
return it.Name == instanceTypeName
Expand Down
1 change: 1 addition & 0 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func main() {
cloudProvider := kwok.NewCloudProvider(ctx, op.GetClient(), instanceTypes)
op.
WithControllers(ctx, controllers.NewControllers(
ctx,
op.Manager,
op.Clock,
op.GetClient(),
Expand Down
13 changes: 13 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"sort"
"sync"
"time"

"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
Expand Down Expand Up @@ -59,6 +60,7 @@ type CloudProvider struct {
CreatedNodeClaims map[string]*v1.NodeClaim
Drifted cloudprovider.DriftReason
NodeClassGroupVersionKind []schema.GroupVersionKind
RepairPolicy []cloudprovider.RepairPolicy
}

func NewCloudProvider() *CloudProvider {
Expand Down Expand Up @@ -93,6 +95,13 @@ func (c *CloudProvider) Reset() {
Kind: "",
},
}
c.RepairPolicy = []cloudprovider.RepairPolicy{
{
ConditionType: "BadNode",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 30 * time.Minute,
},
}
}

func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
Expand Down Expand Up @@ -262,6 +271,10 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider
return c.Drifted, nil
}

// RepairPolicies returns the repair policies configured on this fake
// CloudProvider (seeded by Reset for tests).
func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
	policies := c.RepairPolicy
	return policies
}

// Name returns the CloudProvider implementation name.
func (c *CloudProvider) Name() string {
return "fake"
Expand Down
14 changes: 14 additions & 0 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math"
"sort"
"sync"
"time"

"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
Expand All @@ -41,6 +42,16 @@ var (

type DriftReason string

// RepairPolicy declares a single node condition that the cloud provider
// considers unhealthy, together with how long Karpenter should tolerate that
// condition before force-terminating the node.
type RepairPolicy struct {
	// ConditionType is the type of the node condition that indicates the
	// unhealthy state (e.g. a provider-specific "BadNode" condition).
	ConditionType corev1.NodeConditionType
	// ConditionStatus is the status value of that condition which marks the
	// node as unhealthy.
	ConditionStatus corev1.ConditionStatus
	// TolerationDuration is the duration the controller will wait
	// before force terminating nodes that are unhealthy.
	TolerationDuration time.Duration
}

// CloudProvider interface is implemented by cloud providers to support provisioning.
type CloudProvider interface {
// Create launches a NodeClaim with the given resource requests and requirements and returns a hydrated
Expand All @@ -63,6 +74,9 @@ type CloudProvider interface {
// IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements
// it is tied to.
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
// RepairPolicies is for CloudProviders to define a set of unhealthy conditions
// for Karpenter to monitor on the node.
RepairPolicies() []RepairPolicy
// Name returns the CloudProvider implementation name.
Name() string
// GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object
Expand Down
15 changes: 14 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package controllers

import (
"context"

"github.com/awslabs/operatorpkg/controller"
"github.com/awslabs/operatorpkg/status"
"k8s.io/utils/clock"
Expand All @@ -32,6 +34,7 @@ import (
metricsnode "sigs.k8s.io/karpenter/pkg/controllers/metrics/node"
metricsnodepool "sigs.k8s.io/karpenter/pkg/controllers/metrics/nodepool"
metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
nodeclaimconsistency "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/consistency"
Expand All @@ -48,9 +51,11 @@ import (
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
)

func NewControllers(
ctx context.Context,
mgr manager.Manager,
clock clock.Clock,
kubeClient client.Client,
Expand All @@ -63,7 +68,7 @@ func NewControllers(
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

return []controller.Controller{
controllers := []controller.Controller{
p, evictionQueue, disruptionQueue,
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewPodController(kubeClient, p),
Expand All @@ -90,5 +95,13 @@ func NewControllers(
status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics),
status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics),
status.NewGenericObjectController[*corev1.Node](kubeClient, mgr.GetEventRecorderFor("karpenter")),
health.NewController(kubeClient, cloudProvider, clock),
}

// The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes
if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock))
}

return controllers
}
132 changes: 132 additions & 0 deletions pkg/controllers/node/health/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package health

import (
"context"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
)

// Controller reconciles Nodes against the cloud provider's RepairPolicies and
// force-terminates the owning NodeClaim once a node has stayed unhealthy for
// longer than the matching policy's toleration duration.
type Controller struct {
	clock clock.Clock
	kubeClient client.Client
	cloudProvider cloudprovider.CloudProvider
}

// NewController builds a node-health Controller wired with the given Kubernetes
// client, cloud provider, and clock.
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller {
	c := &Controller{}
	c.clock = clock
	c.kubeClient = kubeClient
	c.cloudProvider = cloudProvider
	return c
}

// Register wires the controller into the manager under the name "node.health",
// watching Node objects.
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
	return controllerruntime.NewControllerManagedBy(m).
		Named("node.health").
		For(&corev1.Node{}).
		Complete(reconcile.AsReconciler(m.GetClient(), c))
}

// Reconcile checks a Node against the cloud provider's repair policies. If the
// node has carried an unhealthy condition past its policy's toleration
// duration, the owning NodeClaim is annotated for immediate termination and
// deleted; otherwise reconciliation is requeued for when the window expires.
func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
	ctx = injection.WithControllerName(ctx, "node.health")
	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(node.Namespace, node.Name)))

	// Validate that the node is owned by us; nodes with no NodeClaim are ignored
	nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
	if err != nil {
		return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err)
	}

	// Healthy node (or no policy matches): nothing to do
	unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node)
	if unhealthyNodeCondition == nil {
		return reconcile.Result{}, nil
	}

	// If the Node is unhealthy, but has not reached its full toleration disruption
	// requeue at the termination time of the unhealthy node
	terminationTime := unhealthyNodeCondition.LastTransitionTime.Add(policyTerminationDuration)
	if c.clock.Now().Before(terminationTime) {
		return reconcile.Result{RequeueAfter: terminationTime.Sub(c.clock.Now())}, nil
	}

	// For unhealthy past the tolerationDisruption window we can forcefully terminate the node
	if err := c.annotateTerminationGracePeriod(ctx, nodeClaim); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	// The deletion timestamp has successfully been set for the NodeClaim, update relevant metrics.
	log.FromContext(ctx).V(1).Info("deleting unhealthy node")
	metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
		metrics.ReasonLabel:       string(unhealthyNodeCondition.Type),
		metrics.NodePoolLabel:     node.Labels[v1.NodePoolLabelKey],
		metrics.CapacityTypeLabel: node.Labels[v1.CapacityTypeLabelKey],
	})
	return reconcile.Result{}, nil
}

// findUnhealthyConditions scans the node's conditions against every repair
// policy exposed by the cloud provider and returns the matching condition whose
// termination deadline is soonest, together with that policy's toleration
// duration. When no policy matches, it returns (nil, 0).
func (c *Controller) findUnhealthyConditions(node *corev1.Node) (nc *corev1.NodeCondition, cpTerminationDuration time.Duration) {
	var earliestDeadline time.Time
	for _, policy := range c.cloudProvider.RepairPolicies() {
		// A policy matches only when both the condition type and its status line up
		cond := nodeutils.GetCondition(node, policy.ConditionType)
		if cond.Status != policy.ConditionStatus {
			continue
		}
		deadline := cond.LastTransitionTime.Add(policy.TolerationDuration)
		// Keep whichever matching condition will hit its deadline first
		if earliestDeadline.IsZero() || deadline.Before(earliestDeadline) {
			nc = lo.ToPtr(cond)
			cpTerminationDuration = policy.TolerationDuration
			earliestDeadline = deadline
		}
	}
	return nc, cpTerminationDuration
}

// annotateTerminationGracePeriod stamps the NodeClaim with the termination
// timestamp annotation set to "now", patching only the annotation diff against
// the stored copy.
func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error {
	stored := nodeClaim.DeepCopy()
	nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{
		v1.NodeClaimTerminationTimestampAnnotationKey: c.clock.Now().Format(time.RFC3339),
	})
	return c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored))
}
Loading

0 comments on commit 8ce869c

Please sign in to comment.