feat: disruption.terminationGracePeriod
wmgroot committed Jan 30, 2024
1 parent eb8040d commit ff8ed69
Showing 13 changed files with 284 additions and 261 deletions.
8 changes: 4 additions & 4 deletions Makefile
@@ -6,7 +6,7 @@ HELM_OPTS ?= --set logLevel=debug \
--set controller.resources.requests.cpu=1 \
--set controller.resources.requests.memory=1Gi \
--set controller.resources.limits.cpu=1 \
--set controller.resources.limits.memory=1Gi
--set controller.resources.limits.memory=1Gi

help: ## Display help
@awk 'BEGIN {FS = ":.*##"; printf "Usage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)
@@ -23,7 +23,7 @@ build: ## Build the Karpenter KWOK controller images using ko build
$(eval IMG_REPOSITORY=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 1 | cut -d ":" -f 1))
$(eval IMG_TAG=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 1 | cut -d ":" -f 2 -s))
$(eval IMG_DIGEST=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 2))


# Run make install-kwok to install the kwok controller in your cluster first
# Webhooks are currently not supported in the kwok provider.
@@ -40,11 +40,11 @@ apply: verify build ## Deploy the kwok controller from the current state of your

delete: ## Delete the controller from your ~/.kube/config cluster
helm uninstall karpenter --namespace ${KARPENTER_NAMESPACE}

test: ## Run tests
go test ./... \
-race \
-timeout 10m \
-timeout 1m \
--ginkgo.focus="${FOCUS}" \
--ginkgo.timeout=10m \
--ginkgo.v \
114 changes: 27 additions & 87 deletions pkg/apis/crds/karpenter.sh_nodeclaims.yaml

Large diffs are not rendered by default.

177 changes: 37 additions & 140 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions pkg/apis/v1beta1/nodeclaim.go
@@ -52,6 +52,23 @@ type NodeClaimSpec struct {
// NodeClassRef is a reference to an object that defines provider specific configuration
// +required
NodeClassRef *NodeClassReference `json:"nodeClassRef"`
// TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated.
// Once the TerminationGracePeriod has expired, all pods on the node will be shut down using the official non-graceful shutdown taint.
// If a pod would otherwise be terminated without receiving its full terminationGracePeriodSeconds before the node timeout,
// that pod is deleted at T = node timeout - pod terminationGracePeriodSeconds.
//
// Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod.
// Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
//
// This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period.
// It can also be used to cap how long long-running jobs, which can delay node termination with preStop hooks, are allowed to hold a node open.
// If left undefined, the controller will wait indefinitely for pods to be drained.
//
// +kubebuilder:validation:Pattern=`^(([0-9]+(s|m|h))+)$`
// +kubebuilder:validation:Type="string"
// +kubebuilder:validation:Schemaless
// +optional
TerminationGracePeriod *metav1.Duration `json:"terminationGracePeriod"`
}

// ResourceRequirements models the required resources for the NodeClaim to launch
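As an illustrative aside (not part of this diff), the new field could be set from Go roughly as follows, assuming the import path sigs.k8s.io/karpenter/pkg/apis/v1beta1 for the types above; the NodeClaim name and the 30m value are made up.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
)

func main() {
	// Any value matching ^(([0-9]+(s|m|h))+)$ serializes cleanly, e.g. "90s", "30m", "1h30m".
	nodeClaim := &v1beta1.NodeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "example-nodeclaim"}, // hypothetical name
		Spec: v1beta1.NodeClaimSpec{
			TerminationGracePeriod: &metav1.Duration{Duration: 30 * time.Minute},
		},
	}
	fmt.Println(nodeClaim.Spec.TerminationGracePeriod.Duration) // 30m0s
}

In YAML this corresponds to spec.terminationGracePeriod: "30m" on a NodeClaim; per the commit title and the commented-out NodePool code further down, the nodepools CRD change (not rendered above) presumably exposes the same knob under disruption.terminationGracePeriod.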
12 changes: 12 additions & 0 deletions pkg/apis/v1beta1/taints.go
@@ -22,6 +22,9 @@ import v1 "k8s.io/api/core/v1"
const (
DisruptionTaintKey = Group + "/disruption"
DisruptingNoScheduleTaintValue = "disrupting"

DisruptionNonGracefulShutdownKey = "node.kubernetes.io/out-of-service"
DisruptionNonGracefulShutdownValue = "nodeshutdown"
)

var (
@@ -32,6 +35,15 @@ var (
Effect: v1.TaintEffectNoSchedule,
Value: DisruptingNoScheduleTaintValue,
}

// DisruptionNonGracefulShutdown is used by the deprovisioning controller to forcefully
// shut down a node. This does not respect graceful termination of any pods on the node.
// https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
DisruptionNonGracefulShutdown = v1.Taint{
Key: DisruptionNonGracefulShutdownKey,
Value: DisruptionNonGracefulShutdownValue,
Effect: v1.TaintEffectNoExecute,
}
)

func IsDisruptingTaint(taint v1.Taint) bool {
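For context only, a rough sketch of how a controller might apply the taint defined above with a controller-runtime client; kubeClient and the surrounding helper are assumptions, not code from this commit.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
)

// applyNonGracefulShutdownTaint is an assumed helper, roughly equivalent to
// kubectl taint node <name> node.kubernetes.io/out-of-service=nodeshutdown:NoExecute
func applyNonGracefulShutdownTaint(ctx context.Context, kubeClient client.Client, node *v1.Node) error {
	for i := range node.Spec.Taints {
		if node.Spec.Taints[i].MatchTaint(&v1beta1.DisruptionNonGracefulShutdown) {
			return nil // already tainted
		}
	}
	stored := node.DeepCopy()
	node.Spec.Taints = append(node.Spec.Taints, v1beta1.DisruptionNonGracefulShutdown)
	return kubeClient.Patch(ctx, node, client.MergeFrom(stored))
}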
5 changes: 5 additions & 0 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/controllers/controllers.go
@@ -55,6 +55,7 @@ func NewControllers(

p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
deletionQueue := terminator.NewDeletionQueue(kubernetesInterface.CoreV1(), recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

return []controller.Controller{
@@ -67,7 +68,7 @@ informer.NewPodController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue), recorder),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, deletionQueue), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnode.NewController(cluster),
65 changes: 62 additions & 3 deletions pkg/controllers/node/termination/controller.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -77,13 +78,24 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res
if !controllerutil.ContainsFinalizer(node, v1beta1.TerminationFinalizer) {
return reconcile.Result{}, nil
}

nodeGracePeriodExpirationTime, err := c.terminationGracePeriodExpirationTime(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("couldn't evaluate node's terminationGracePeriod, %w", err)
} else if nodeGracePeriodExpirationTime != nil && time.Now().After(*nodeGracePeriodExpirationTime) {
logging.FromContext(ctx).Infof("node's terminationGracePeriod has expired, adding non-graceful shutdown taint: %v", node.Name)
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNonGracefulShutdown); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with node.kubernetes.io/out-of-service=nodeshutdown:NoExecute taint, %w", err)
}
}

if err := c.deleteAllNodeClaims(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
if err := c.terminator.Taint(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node, %w", err)
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNoScheduleTaint); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with karpenter.sh/disruption taint, %w", err)
}
if err := c.terminator.Drain(ctx, node); err != nil {
if err := c.terminator.Drain(ctx, node, nodeGracePeriodExpirationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
@@ -138,6 +150,53 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
return nil
}

func (c *Controller) terminationGracePeriodExpirationTime(ctx context.Context, node *v1.Node) (*time.Time, error) {

nodeClaim := &v1beta1.NodeClaim{}

if len(node.OwnerReferences) == 0 {
logging.FromContext(ctx).Errorf("node has no owner, could not find NodeClaim for Node: %v", node.Name)
return nil, nil
}

// assume the only Node ownerRef is the NodeClaim
nodeClaimName := types.NamespacedName{
Name: node.OwnerReferences[0].Name,
}
if err := c.kubeClient.Get(ctx, nodeClaimName, nodeClaim); err != nil {
logging.FromContext(ctx).Errorf("could not find NodeClaim for Node: %v", node.Name)
return nil, err
}

// TODO: remove, holding this here in case we switch the preferred implementation
// nodePool := &v1beta1.NodePool{}
// // assume the only NodeClaim ownerRef is the NodePool
// nodePoolName := types.NamespacedName{
// Name: nodeClaim.OwnerReferences[0].Name,
// }
// if err := c.kubeClient.Get(ctx, nodePoolName, nodePool); err != nil {
// logging.FromContext(ctx).Errorf("could not find NodePool for NodeClaim: %v", nodeClaim.Name)
// return nil, err
// }

// if nodePool.Spec.Disruption.TerminationGracePeriod != nil {
// expirationTime := node.DeletionTimestamp.Time.Add(nodePool.Spec.Disruption.TerminationGracePeriod.Duration)
// c.recorder.Publish(terminatorevents.NodeTerminationGracePeriod(node, expirationTime, fmt.Sprintf("%s", nodePool.Spec.Disruption.TerminationGracePeriod)))
// // logging.FromContext(ctx).Infof("node %v will be forcefully terminated at %v (terminationGracePeriod=%v)", node.Name, expirationTime, nodePool.Spec.Disruption.TerminationGracePeriod)
// return &expirationTime, nil
// }

if nodeClaim.Spec.TerminationGracePeriod != nil {
expirationTime := node.DeletionTimestamp.Time.Add(nodeClaim.Spec.TerminationGracePeriod.Duration)
c.recorder.Publish(terminatorevents.NodeTerminationGracePeriod(node, expirationTime, nodeClaim.Spec.TerminationGracePeriod.String()))
if expirationTime.Before(time.Now()) {
return &expirationTime, nil
}
}

return nil, nil
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontroller.Builder {
return operatorcontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
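To make the timing above concrete, a self-contained sketch of the arithmetic (all values hypothetical): with terminationGracePeriod=30m, the out-of-service taint is applied 30 minutes after the node's deletionTimestamp, and a pod with terminationGracePeriodSeconds=300 needs to be deleted no later than 5 minutes before that.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values for illustration.
	deletionTimestamp := time.Date(2024, 1, 30, 12, 0, 0, 0, time.UTC) // node deletion initiated
	nodeGracePeriod := 30 * time.Minute                                // NodeClaim spec.terminationGracePeriod
	podGracePeriod := 300 * time.Second                                // pod spec.terminationGracePeriodSeconds

	expirationTime := deletionTimestamp.Add(nodeGracePeriod) // out-of-service taint applied here
	podDeletionTime := expirationTime.Add(-podGracePeriod)   // latest point the pod can be deleted and still drain gracefully

	fmt.Println(expirationTime.Format(time.RFC3339))  // 2024-01-30T12:30:00Z
	fmt.Println(podDeletionTime.Format(time.RFC3339)) // 2024-01-30T12:25:00Z
}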
4 changes: 3 additions & 1 deletion pkg/controllers/node/termination/suite_test.go
@@ -56,6 +56,7 @@ var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue
var deletionQueue *terminator.DeletionQueue

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -70,7 +71,8 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder)
deletionQueue = terminator.NewDeletionQueue(env.KubernetesInterface.CoreV1(), recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, deletionQueue), recorder)
})

var _ = AfterSuite(func() {
21 changes: 21 additions & 0 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -18,6 +18,7 @@ package events

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"

@@ -34,6 +35,16 @@ func EvictPod(pod *v1.Pod) events.Event {
}
}

func DeletePod(pod *v1.Pod) events.Event {
return events.Event{
InvolvedObject: pod,
Type: v1.EventTypeNormal,
Reason: "Deleted",
Message: fmt.Sprintf("Deleted pod regardless of PDBs and lifecycle hooks, %v seconds before node termination to accommodate its terminationGracePeriodSeconds", pod.Spec.TerminationGracePeriodSeconds),
DedupeValues: []string{pod.Name},
}
}

func NodeFailedToDrain(node *v1.Node, err error) events.Event {
return events.Event{
InvolvedObject: node,
@@ -43,3 +54,13 @@ func NodeFailedToDrain(node *v1.Node, err error) events.Event {
DedupeValues: []string{node.Name},
}
}

func NodeTerminationGracePeriod(node *v1.Node, expirationTime time.Time, terminationGracePeriod string) events.Event {
return events.Event{
InvolvedObject: node,
Type: v1.EventTypeWarning,
Reason: "TerminationGracePeriodExpiration",
Message: fmt.Sprintf("Node will have the out-of-service taint applied at: %s (TerminationGracePeriod: %s)", expirationTime, terminationGracePeriod),
DedupeValues: []string{node.Name},
}
}
52 changes: 41 additions & 11 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -65,15 +65,22 @@ func IsNodeDrainError(err error) bool {
type Queue struct {
workqueue.RateLimitingInterface
set.Set
deleteSet set.Set

coreV1Client corev1.CoreV1Interface
recorder events.Recorder
}

type PodAction struct {
action string
pod types.NamespacedName
}

func NewQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *Queue {
queue := &Queue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: set.NewSet(),
deleteSet: set.NewSet(),
coreV1Client: coreV1Client,
recorder: recorder,
}
@@ -89,11 +96,11 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder
}

// Add adds pods to the Queue
func (q *Queue) Add(pods ...*v1.Pod) {
func (q *Queue) Add(action string, pods ...*v1.Pod) {
for _, pod := range pods {
if nn := client.ObjectKeyFromObject(pod); !q.Set.Contains(nn) {
q.Set.Add(nn)
q.RateLimitingInterface.Add(nn)
q.Set.Add(PodAction{action: action, pod: nn})
q.RateLimitingInterface.Add(PodAction{action: action, pod: nn})
}
}
}
@@ -110,16 +117,25 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
if shutdown {
return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown")
}
nn := item.(types.NamespacedName)
defer q.RateLimitingInterface.Done(nn)
// Evict pod
if q.Evict(ctx, nn) {
q.RateLimitingInterface.Forget(nn)
q.Set.Remove(nn)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
podAction := item.(PodAction)
defer q.RateLimitingInterface.Done(podAction)
if podAction.action == "evict" {
// Evict pod
if q.Evict(ctx, podAction.pod) {
q.RateLimitingInterface.Forget(podAction)
q.Set.Remove(podAction)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
} else if podAction.action == "delete" {
// Delete pod
if q.Delete(ctx, podAction.pod) {
q.RateLimitingInterface.Forget(podAction)
q.Set.Remove(podAction)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
}
// Requeue pod if the eviction or deletion failed
q.RateLimitingInterface.AddRateLimited(nn)
q.RateLimitingInterface.AddRateLimited(podAction)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

@@ -148,6 +164,20 @@ func (q *Queue) Evict(ctx context.Context, nn types.NamespacedName) bool {
return true
}

// Delete returns true if the delete call succeeded, and false if there was an error
func (q *Queue) Delete(ctx context.Context, nn types.NamespacedName) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn))
if err := q.coreV1Client.Pods(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) { // 404
return true
}
logging.FromContext(ctx).Errorf("deleting pod, %s", err)
return false
}
q.recorder.Publish(terminatorevents.DeletePod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
return true
}

func (q *Queue) Reset() {
q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
q.Set = set.NewSet()
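The Terminator changes that feed this queue aren't rendered in this view, but the intent of the two-action queue can be sketched roughly as follows: pods whose terminationGracePeriodSeconds no longer fits before the node's expiration time are deleted outright, everything else is evicted as before. This is an assumed caller, not the commit's actual Drain implementation, and it reuses the plain "evict"/"delete" strings the PodAction struct above carries.

package terminator

import (
	"time"

	v1 "k8s.io/api/core/v1"
)

// enqueuePods is a hypothetical helper showing how a caller might split pods
// between the eviction and deletion paths of the Queue defined above.
func enqueuePods(q *Queue, pods []*v1.Pod, nodeExpiration *time.Time) {
	for _, pod := range pods {
		if nodeExpiration != nil && pod.Spec.TerminationGracePeriodSeconds != nil {
			remaining := time.Until(*nodeExpiration)
			needed := time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second
			if remaining <= needed {
				// Not enough runway for a graceful eviction: delete now so the pod
				// still receives its full terminationGracePeriodSeconds.
				q.Add("delete", pod)
				continue
			}
		}
		q.Add("evict", pod)
	}
}

A typed action constant would avoid stringly-typed comparisons in Reconcile, but the sketch follows the diff as written.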
8 changes: 8 additions & 0 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -123,4 +123,12 @@ var _ = Describe("Eviction/Queue", func() {
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeFalse())
})
})

Context("Pod Deletion API", func() {
It("should succeed with no event when the pod is not found", func() {
ExpectApplied(ctx, env.Client)
Expect(queue.Delete(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue())
Expect(recorder.Events()).To(HaveLen(0))
})
})
})