Skip to content

Commit

Permalink
feat: disruption.terminationGracePeriod
Browse files Browse the repository at this point in the history
  • Loading branch information
wmgroot committed Jan 23, 2024
1 parent 4e85912 commit f4ae490
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 242 deletions.
110 changes: 23 additions & 87 deletions pkg/apis/crds/karpenter.sh_nodeclaims.yaml

Large diffs are not rendered by default.

177 changes: 37 additions & 140 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions pkg/apis/v1beta1/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ type Disruption struct {
// +kubebuilder:validation:MaxItems=50
// +optional
Budgets []Budget `json:"budgets,omitempty" hash:"ignore"`
// TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated.
// Once the GracePeriod has expired, all pods on the node will be shutdown using the official non-graceful shutdown taint.
// If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout,
// that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds.
//
// Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod.
// Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
//
// This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period.
// It can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks.
// If left undefined, the controller will wait indefinitely for pods to be drained.
//
// +kubebuilder:validation:Pattern=`^(([0-9]+(s|m|h))+)$`
// +kubebuilder:validation:Type="string"
// +kubebuilder:validation:Schemaless
// +optional
TerminationGracePeriod *metav1.Duration `json:"terminationGracePeriod"`
}

// Budget defines when Karpenter will restrict the
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/v1beta1/taints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import v1 "k8s.io/api/core/v1"
const (
	// DisruptionTaintKey is the Karpenter-owned taint key applied while a node is being disrupted.
	DisruptionTaintKey = Group + "/disruption"
	// DisruptingNoScheduleTaintValue marks a node as actively disrupting (no new pods scheduled).
	DisruptingNoScheduleTaintValue = "disrupting"

	// DisruptionNonGracefulShutdownKey/Value correspond to the upstream
	// "non-graceful node shutdown" out-of-service taint, which causes pods to be
	// removed without respecting graceful termination (requires K8s 1.26+).
	// https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
	DisruptionNonGracefulShutdownKey = "node.kubernetes.io/out-of-service"
	DisruptionNonGracefulShutdownValue = "nodeshutdown"
)

var (
Expand All @@ -32,6 +35,15 @@ var (
Effect: v1.TaintEffectNoSchedule,
Value: DisruptingNoScheduleTaintValue,
}

// DisruptionNonGracefulShutdown is used by the deprovisioning controller to forcefully
// shut down a node. This does not respect graceful termination of any pods on the node.
// https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
DisruptionNonGracefulShutdown = v1.Taint{
Key: DisruptionNonGracefulShutdownKey,
Value: DisruptionNonGracefulShutdownValue,
Effect: v1.TaintEffectNoExecute,
}
)

func IsDisruptingTaint(taint v1.Taint) bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewControllers(

p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
deletionQueue := terminator.NewDeletionQueue(kubernetesInterface.CoreV1(), recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

return []controller.Controller{
Expand All @@ -67,7 +68,7 @@ func NewControllers(
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue), recorder),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, deletionQueue), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnode.NewController(cluster),
Expand Down
64 changes: 61 additions & 3 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -77,13 +78,24 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res
if !controllerutil.ContainsFinalizer(node, v1beta1.TerminationFinalizer) {
return reconcile.Result{}, nil
}

nodeGracePeriodExpirationTime, err := c.terminationGracePeriodExpirationTime(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("couldn't evaulate node's terminationGracePeriod, %w", err)
} else if nodeGracePeriodExpirationTime != nil && time.Now().After(*nodeGracePeriodExpirationTime) {
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNonGracefulShutdown); err != nil {
logging.FromContext(ctx).Infof("node's terminationGracePeriod has expired, adding non-graceful shutdown taint: %v", node.Name)
return reconcile.Result{}, fmt.Errorf("error while tainting node with node.kubernetes.io/out-of-service=nodeshutdown:NoExecute, %w", err)
}
}

if err := c.deleteAllNodeClaims(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
if err := c.terminator.Taint(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node, %w", err)
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNoScheduleTaint); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with karpenter.sh/disruption taint, %w", err)
}
if err := c.terminator.Drain(ctx, node); err != nil {
if err := c.terminator.Drain(ctx, node, nodeGracePeriodExpirationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
Expand Down Expand Up @@ -138,6 +150,52 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
return nil
}

// terminationGracePeriodExpirationTime returns the wall-clock time at which the
// node's terminationGracePeriod (configured on the owning NodePool's disruption
// spec) expires, measured from the node's DeletionTimestamp. A nil time with a
// nil error means no grace period applies: the node or NodeClaim has no owner,
// the NodePool sets no terminationGracePeriod, or the node is not yet deleting.
func (c *Controller) terminationGracePeriodExpirationTime(ctx context.Context, node *v1.Node) (*time.Time, error) {
	if len(node.OwnerReferences) == 0 {
		// Not an error for the caller: a node without an owner simply has no
		// grace period to enforce.
		logging.FromContext(ctx).Errorf("node has no owner, could not find NodeClaim for Node: %v", node.Name)
		return nil, nil
	}

	// assume the only Node ownerRef is the NodeClaim
	nodeClaim := &v1beta1.NodeClaim{}
	if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.OwnerReferences[0].Name}, nodeClaim); err != nil {
		// Wrap rather than log-and-return: the caller already adds context and
		// logs the resulting error.
		return nil, fmt.Errorf("could not find NodeClaim for Node %q, %w", node.Name, err)
	}

	// Guard the ownerRef index: the previous code indexed [0] unconditionally
	// and would panic on a NodeClaim with no ownerReferences.
	if len(nodeClaim.OwnerReferences) == 0 {
		logging.FromContext(ctx).Errorf("nodeclaim has no owner, could not find NodePool for NodeClaim: %v", nodeClaim.Name)
		return nil, nil
	}

	// assume the only NodeClaim ownerRef is the NodePool
	nodePool := &v1beta1.NodePool{}
	if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: nodeClaim.OwnerReferences[0].Name}, nodePool); err != nil {
		return nil, fmt.Errorf("could not find NodePool for NodeClaim %q, %w", nodeClaim.Name, err)
	}

	if nodePool.Spec.Disruption.TerminationGracePeriod == nil {
		return nil, nil
	}

	// The termination finalizer only runs on deleting nodes, but guard against a
	// nil/zero DeletionTimestamp instead of dereferencing it blindly.
	if node.DeletionTimestamp.IsZero() {
		return nil, nil
	}

	expirationTime := node.DeletionTimestamp.Time.Add(nodePool.Spec.Disruption.TerminationGracePeriod.Duration)
	logging.FromContext(ctx).Infof("node %v will be forcefully terminated at %v (terminationGracePeriod=%v)", node.Name, expirationTime, nodePool.Spec.Disruption.TerminationGracePeriod)
	return &expirationTime, nil
	// NOTE(review): a NodeClaim-level terminationGracePeriod
	// (NodePool.spec.template.spec) was considered as an alternative placement;
	// the dead commented-out implementation has been removed.
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontroller.Builder {
return operatorcontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue
var deletionQueue *terminator.DeletionQueue

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand All @@ -70,7 +71,8 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder)
deletionQueue = terminator.NewDeletionQueue(env.KubernetesInterface.CoreV1(), recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, deletionQueue), recorder)
})

var _ = AfterSuite(func() {
Expand Down
125 changes: 125 additions & 0 deletions pkg/controllers/node/termination/terminator/deletion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terminator

import (
"context"
"fmt"
"time"

set "github.com/deckarep/golang-set"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/operator/controller"

"sigs.k8s.io/karpenter/pkg/events"
)

const (
deletionQueueBaseDelay = 100 * time.Millisecond

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.23.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.24.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.25.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.26.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.27.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.28.x)

const `deletionQueueBaseDelay` is unused (unused)

Check failure on line 43 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.29.x)

const `deletionQueueBaseDelay` is unused (unused)
deletionQueueMaxDelay = 10 * time.Second

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.23.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.24.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.25.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.26.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.27.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.28.x)

const `deletionQueueMaxDelay` is unused (unused)

Check failure on line 44 in pkg/controllers/node/termination/terminator/deletion.go

View workflow job for this annotation

GitHub Actions / presubmit (1.29.x)

const `deletionQueueMaxDelay` is unused (unused)
)

type DeletionQueue struct {
workqueue.RateLimitingInterface
set.Set

coreV1Client corev1.CoreV1Interface
recorder events.Recorder
}

func NewDeletionQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *DeletionQueue {
queue := &DeletionQueue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: set.NewSet(),
coreV1Client: coreV1Client,
recorder: recorder,
}
return queue
}

// Name returns the controller name used for registration, logging, and metrics.
func (q *DeletionQueue) Name() string {
	return "deletion-queue"
}

// Builder registers the queue as a singleton controller (a single reconcile
// loop drains the queue, rather than one per watched object).
func (q *DeletionQueue) Builder(_ context.Context, m manager.Manager) controller.Builder {
	return controller.NewSingletonManagedBy(m)
}

// Add enqueues the given pods for deletion, using the tracking set to skip any
// pod whose key is already pending so it is never enqueued twice.
func (q *DeletionQueue) Add(pods ...*v1.Pod) {
	for _, p := range pods {
		key := client.ObjectKeyFromObject(p)
		if q.Set.Contains(key) {
			continue
		}
		q.Set.Add(key)
		q.RateLimitingInterface.Add(key)
	}
}

// Reconcile pops one pod key off the queue and attempts to delete it,
// requeueing with backoff on failure. When the queue is empty it requeues after
// a second rather than blocking on Get.
func (q *DeletionQueue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
	// Check if the queue is empty. client-go recommends not using this function to gate the subsequent
	// get call, but since we're popping items off the queue synchronously, there should be no synchronization
	// issues.
	if q.Len() == 0 {
		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
	}
	// Get pod from queue. This waits until queue is non-empty.
	item, shutdown := q.RateLimitingInterface.Get()
	if shutdown {
		// Fixed copy-paste: this is the DeletionQueue, not the EvictionQueue.
		return reconcile.Result{}, fmt.Errorf("DeletionQueue is broken and has shutdown")
	}
	nn := item.(types.NamespacedName)
	defer q.RateLimitingInterface.Done(nn)

	if q.Delete(ctx, nn) {
		q.RateLimitingInterface.Forget(nn)
		q.Set.Remove(nn)
		return reconcile.Result{RequeueAfter: controller.Immediately}, nil
	}
	// Requeue pod if delete failed
	q.RateLimitingInterface.AddRateLimited(nn)
	return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

// Delete issues the API delete call for the pod identified by nn. It returns
// true when the pod was deleted (or is already gone) and false when the call
// failed and should be retried.
func (q *DeletionQueue) Delete(ctx context.Context, nn types.NamespacedName) bool {
	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn))
	err := q.coreV1Client.Pods(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{})
	if err == nil {
		q.recorder.Publish(terminatorevents.DeletePod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
		return true
	}
	// A pod that no longer exists means our work here is done.
	if apierrors.IsNotFound(err) { // 404
		return true
	}
	logging.FromContext(ctx).Errorf("deleting pod, %s", err)
	return false
}

// Reset discards all pending work, re-initializing both the rate-limited queue
// and the deduplication set (used by tests between cases).
func (q *DeletionQueue) Reset() {
	// Use the deletion queue's own backoff constants; the previous code reused
	// the eviction queue's delays, leaving deletionQueueBaseDelay/MaxDelay
	// unused (CI lint failure).
	q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(deletionQueueBaseDelay, deletionQueueMaxDelay))
	q.Set = set.NewSet()
}
10 changes: 10 additions & 0 deletions pkg/controllers/node/termination/terminator/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ func EvictPod(pod *v1.Pod) events.Event {
}
}

// DeletePod returns the event published when the terminator forcefully deletes
// a pod (bypassing graceful eviction).
func DeletePod(pod *v1.Pod) events.Event {
	return events.Event{
		InvolvedObject: pod,
		Type: v1.EventTypeNormal,
		Reason: "Deleted",
		Message: "Deleted pod",
		// NOTE(review): dedupes on pod name only — same-named pods in different
		// namespaces would share one event; confirm whether pod.Namespace
		// should be included.
		DedupeValues: []string{pod.Name},
	}
}

func NodeFailedToDrain(node *v1.Node, err error) events.Event {
return events.Event{
InvolvedObject: node,
Expand Down
Loading

0 comments on commit f4ae490

Please sign in to comment.