From d390d6c9ecd5afac3ae2fbfaaebe057ac7cec344 Mon Sep 17 00:00:00 2001 From: leo-ryu Date: Wed, 8 Jan 2025 17:26:52 +0800 Subject: [PATCH 1/5] handle pod volume requirements independently --- pkg/controllers/provisioning/provisioner.go | 12 ++++---- .../provisioning/scheduling/existingnode.go | 9 +++++- .../provisioning/scheduling/nodeclaim.go | 9 +++++- .../provisioning/scheduling/scheduler.go | 10 +++++-- .../provisioning/scheduling/topology.go | 26 ++++++++++------- .../provisioning/scheduling/volumetopology.go | 29 ++++--------------- 6 files changed, 50 insertions(+), 45 deletions(-) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 83e59b18cb..df71a4cc43 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -285,9 +285,11 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat // inject topology constraints pods = p.injectVolumeTopologyRequirements(ctx, pods) + // Link volume requirements to pods + podsVolumeRequirements := p.convertToPodVolumeRequirements(ctx, pods) // Calculate cluster topology - topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods) + topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, podsVolumeRequirements) if err != nil { return nil, fmt.Errorf("tracking topology counts, %w", err) } @@ -454,13 +456,13 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error { return nil } -func (p *Provisioner) injectVolumeTopologyRequirements(ctx context.Context, pods []*corev1.Pod) []*corev1.Pod { - var schedulablePods []*corev1.Pod +func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement { + var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement) for _, pod := range pods { - if err := p.volumeTopology.Inject(ctx, pod); err != nil { + if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil { log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements") } else { - schedulablePods = append(schedulablePods, pod) + schedulablePods[pod] = requirements } } return schedulablePods diff --git a/pkg/controllers/provisioning/scheduling/existingnode.go b/pkg/controllers/provisioning/scheduling/existingnode.go index 2deba7ca27..5aa3b1b24c 100644 --- a/pkg/controllers/provisioning/scheduling/existingnode.go +++ b/pkg/controllers/provisioning/scheduling/existingnode.go @@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint, return node } -func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList) error { +func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList, volumeRequirements []v1.NodeSelectorRequirement) error { // Check Taints if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil { return err @@ -117,6 +117,13 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v } nodeRequirements.Add(topologyRequirements.Values()...) + podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...) 
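	// Editorial, illustrative sketch (not part of this patch): a zonal PVC typically surfaces here as
	// something like
	//   v1.NodeSelectorRequirement{
	//       Key:      v1.LabelTopologyZone,
	//       Operator: v1.NodeSelectorOpIn,
	//       Values:   []string{"test-zone-2"},
	//   }
	// (zone value assumed). NewNodeSelectorRequirements turns that slice into scheduling requirements so
	// they can be intersected with the node's own requirements below, instead of being injected into the
	// pod's nodeAffinity as the removed Inject path used to do.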
+ // Check Pod Volume Requirements + if err = nodeRequirements.Compatible(podVolumeRequirements); err != nil { + return err + } + nodeRequirements.Add(podVolumeRequirements.Values()...) + // Update node n.Pods = append(n.Pods, pod) n.requests = requests diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go index db79322b36..4fdd0c1f7c 100644 --- a/pkg/controllers/provisioning/scheduling/nodeclaim.go +++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go @@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem } } -func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error { +func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList, volumeRequirements []v1.NodeSelectorRequirement) error { // Check Taints if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil { return err @@ -100,6 +100,13 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error { } nodeClaimRequirements.Add(topologyRequirements.Values()...) + podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...) + // Check Pod Volume Requirements + if err = nodeClaimRequirements.Compatible(podVolumeRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil { + return err + } + nodeClaimRequirements.Add(podVolumeRequirements.Values()...) + // Check instance type combinations requests := resources.Merge(n.Spec.Resources.Requests, podRequests) diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 96acea71d7..9afe9957af 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -266,9 +266,13 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { } func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { + var volumeRequirements []corev1.NodeSelectorRequirement + if _, ok := s.topology.podVolumeRequirements[pod]; ok { + volumeRequirements = s.topology.podVolumeRequirements[pod] + } // first try to schedule against an in-flight real node for _, node := range s.existingNodes { - if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil { + if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID], volumeRequirements); err == nil { return nil } } @@ -278,7 +282,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { // Pick existing node that we are about to create for _, nodeClaim := range s.newNodeClaims { - if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil { + if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID], volumeRequirements); err == nil { return nil } } @@ -299,7 +303,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { } } nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes) - if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil { + if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID], volumeRequirements); err != nil { nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w", nodeClaimTemplate.NodePoolName, diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 
06bf4022dc..9956eedd65 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -54,28 +54,32 @@ type Topology struct { // excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate // moving pods to prevent them from being double counted. excludedPods sets.Set[string] - cluster *state.Cluster + // podVolumeRequirements links volume requirements to pods. This is used so we + // can track the volume requirements in simulate scheduler + podVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement + cluster *state.Cluster } -func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*corev1.Pod) (*Topology, error) { +func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], podsVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement) (*Topology, error) { t := &Topology{ - kubeClient: kubeClient, - cluster: cluster, - domains: domains, - topologies: map[uint64]*TopologyGroup{}, - inverseTopologies: map[uint64]*TopologyGroup{}, - excludedPods: sets.New[string](), + kubeClient: kubeClient, + cluster: cluster, + domains: domains, + topologies: map[uint64]*TopologyGroup{}, + inverseTopologies: map[uint64]*TopologyGroup{}, + excludedPods: sets.New[string](), + podVolumeRequirements: podsVolumeRequirements, } // these are the pods that we intend to schedule, so if they are currently in the cluster we shouldn't count them for // topology purposes - for _, p := range pods { + for p := range podsVolumeRequirements { t.excludedPods.Insert(string(p.UID)) } errs := t.updateInverseAffinities(ctx) - for i := range pods { - errs = multierr.Append(errs, t.Update(ctx, pods[i])) + for p := range podsVolumeRequirements { + errs = multierr.Append(errs, t.Update(ctx, p)) } if errs != nil { return nil, errs diff --git a/pkg/controllers/provisioning/scheduling/volumetopology.go b/pkg/controllers/provisioning/scheduling/volumetopology.go index a4dc0be961..6d0411e6d6 100644 --- a/pkg/controllers/provisioning/scheduling/volumetopology.go +++ b/pkg/controllers/provisioning/scheduling/volumetopology.go @@ -39,42 +39,23 @@ type VolumeTopology struct { kubeClient client.Client } -func (v *VolumeTopology) Inject(ctx context.Context, pod *v1.Pod) error { +func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) { var requirements []v1.NodeSelectorRequirement for _, volume := range pod.Spec.Volumes { req, err := v.getRequirements(ctx, pod, volume) if err != nil { - return err + return err, nil } requirements = append(requirements, req...) } if len(requirements) == 0 { - return nil - } - if pod.Spec.Affinity == nil { - pod.Spec.Affinity = &v1.Affinity{} - } - if pod.Spec.Affinity.NodeAffinity == nil { - pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} - } - if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} - } - if len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{{}} - } - - // We add our volume topology zonal requirement to every node selector term. 
This causes it to be AND'd with every existing - // requirement so that relaxation won't remove our volume requirement. - for i := 0; i < len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms); i++ { - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[i].MatchExpressions = append( - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[i].MatchExpressions, requirements...) + return nil, requirements } log.FromContext(ctx). WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)). - V(1).Info(fmt.Sprintf("adding requirements derived from pod volumes, %s", requirements)) - return nil + V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements)) + return nil, requirements } func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) { From b172f77fd5721fcb5c69560af94e4d1a17cde8e8 Mon Sep 17 00:00:00 2001 From: leo-ryu Date: Wed, 8 Jan 2025 17:27:34 +0800 Subject: [PATCH 2/5] add existing nodes' domains --- pkg/controllers/provisioning/provisioner.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index df71a4cc43..78f667106e 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -283,8 +283,20 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat } } - // inject topology constraints - pods = p.injectVolumeTopologyRequirements(ctx, pods) + // Add Existing Nodes' Domains + for _, n := range stateNodes { + if n.Node != nil { + requirements := scheduling.NewLabelRequirements(n.Node.Labels) + for key, requirement := range requirements { + if domains[key] == nil { + domains[key] = sets.New(requirement.Values()...) + } else { + domains[key].Insert(requirement.Values()...) + } + } + } + } + // Link volume requirements to pods podsVolumeRequirements := p.convertToPodVolumeRequirements(ctx, pods) From 31a62eabacb2d6cd15ee0899f2600983ad841df8 Mon Sep 17 00:00:00 2001 From: leo-ryu Date: Wed, 8 Jan 2025 17:32:05 +0800 Subject: [PATCH 3/5] support multiple domains in scheduling --- .../provisioning/scheduling/topologygroup.go | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index 9dd5a9bea7..fe7376b850 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -175,7 +175,7 @@ func (t *TopologyGroup) Hash() uint64 { } // nextDomainTopologySpread returns a scheduling.Requirement that includes a node domain that a pod should be scheduled to. -// If there are multiple eligible domains, we return any random domain that satisfies the `maxSkew` configuration. +// If there are multiple eligible domains, we return all eligible domains that satisfies the `maxSkew` configuration. // If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement. 
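// Editorial worked example (assumed counts, ignoring self-selection): with matching pods zone-a=3, zone-b=1,
// zone-c=1 and maxSkew=1, the global min is 1, so zone-b and zone-c satisfy count-min <= maxSkew while zone-a
// (3-1 > 1) does not; as of this patch the returned requirement is "zone In [zone-b, zone-c]" rather than a
// single min-count domain, which lets later constraints (such as a zonal volume) pick among the viable domains.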
// nolint:gocyclo func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement { @@ -183,9 +183,7 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo min := t.domainMinCount(podDomains) selfSelecting := t.selects(pod) - minDomain := "" - minCount := int32(math.MaxInt32) - + candidateDomains := []string{} // If we are explicitly selecting on specific node domains ("In" requirement), // this is going to be more efficient to iterate through // This is particularly useful when considering the hostname topology key that can have a @@ -196,9 +194,8 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo if selfSelecting { count++ } - if count-min <= t.maxSkew && count < minCount { - minDomain = domain - minCount = count + if count-min <= t.maxSkew { + candidateDomains = append(candidateDomains, domain) } } } @@ -212,18 +209,17 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo if selfSelecting { count++ } - if count-min <= t.maxSkew && count < minCount { - minDomain = domain - minCount = count + if count-min <= t.maxSkew { + candidateDomains = append(candidateDomains, domain) } } } } - if minDomain == "" { + if len(candidateDomains) == 0 { // avoids an error message about 'zone in [""]', preferring 'zone in []' return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpDoesNotExist) } - return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, minDomain) + return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...) } func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 { From 420766fdf249c9a586ccf5263466a42d478b9fa7 Mon Sep 17 00:00:00 2001 From: leo-ryu Date: Thu, 9 Jan 2025 16:37:39 +0800 Subject: [PATCH 4/5] block unmatched empty domains --- pkg/controllers/provisioning/provisioner.go | 2 +- .../scheduling/scheduling_benchmark_test.go | 3 +- .../provisioning/scheduling/topology.go | 8 +- .../provisioning/scheduling/topologygroup.go | 81 ++++++++++++++++--- .../scheduling/topologynodefilter.go | 1 + .../provisioning/scheduling/volumetopology.go | 8 +- 6 files changed, 80 insertions(+), 23 deletions(-) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 78f667106e..7711cefd25 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -471,7 +471,7 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error { func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement { var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement) for _, pod := range pods { - if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil { + if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil { log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements") } else { schedulablePods[pod] = requirements diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 2844b95362..16006899df 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -168,7 
+168,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { clock := &clock.RealClock{} cluster = state.NewCluster(clock, client, cloudProvider) domains := map[string]sets.Set[string]{} - topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods) + podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement) + topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements) if err != nil { b.Fatalf("creating topology, %s", err) } diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 9956eedd65..57caed3dc4 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -178,7 +178,7 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling. if nodeRequirements.Has(topology.Key) { nodeDomains = nodeRequirements.Get(topology.Key) } - domains := topology.Get(p, podDomains, nodeDomains) + domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0) if domains.Len() == 0 { return nil, topologyError{ topology: topology, @@ -249,7 +249,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po return err } - tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]) + tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]) hash := tg.Hash() if existing, ok := t.inverseTopologies[hash]; !ok { @@ -327,7 +327,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { - topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, t.cluster, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey])) } return topologyGroups } @@ -364,7 +364,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo if err != nil { return nil, err } - topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])) } } return topologyGroups, nil diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index fe7376b850..958139057c 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/scheduling" ) @@ -59,6 +60,7 @@ type TopologyGroup struct { Type TopologyType maxSkew int32 minDomains *int32 + cluster *state.Cluster namespaces sets.Set[string] 
selector labels.Selector rawSelector *metav1.LabelSelector @@ -69,7 +71,7 @@ type TopologyGroup struct { emptyDomains sets.Set[string] // domains for which we know that no pod exists } -func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup { +func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, cluster *state.Cluster, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup { domainCounts := map[string]int32{} for domain := range domains { domainCounts[domain] = 0 @@ -86,6 +88,7 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod return &TopologyGroup{ Type: topologyType, Key: topologyKey, + cluster: cluster, namespaces: namespaces, selector: selector, rawSelector: labelSelector, @@ -98,10 +101,10 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod } } -func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement { +func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirements bool) *scheduling.Requirement { switch t.Type { case TopologyTypeSpread: - return t.nextDomainTopologySpread(pod, podDomains, nodeDomains) + return t.nextDomainTopologySpread(pod, podDomains, nodeDomains, hasVolumeRequirements) case TopologyTypePodAffinity: return t.nextDomainAffinity(pod, podDomains, nodeDomains) case TopologyTypePodAntiAffinity: @@ -175,34 +178,78 @@ func (t *TopologyGroup) Hash() uint64 { } // nextDomainTopologySpread returns a scheduling.Requirement that includes a node domain that a pod should be scheduled to. -// If there are multiple eligible domains, we return all eligible domains that satisfies the `maxSkew` configuration. +// If there are multiple eligible domains, we return any random domain that satisfies the `maxSkew` configuration. // If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement. // nolint:gocyclo -func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement { +func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirement bool) *scheduling.Requirement { + var nodes = make(map[string][]*v1.Node) + var blockedDomains = sets.New[string]() + var candidateDomains = []string{} + var firstDomains = []string{} + + if t.cluster != nil { + for _, node := range t.cluster.Nodes() { + if node == nil || node.Node == nil { + continue + } + if _, ok := node.Node.GetLabels()[t.Key]; !ok { + continue + } + nodes[node.Node.GetLabels()[t.Key]] = append(nodes[node.Node.GetLabels()[t.Key]], node.Node) + } + } + // some empty domains, which all existing nodes with them don't match the pod, should not be in the calculations. + for _, domain := range t.emptyDomains.UnsortedList() { + // no existing node has this domain, so this domain is in nodeclaim and may will be created first time. + if len(nodes[domain]) == 0 { + // if we have volume requirement, we should block the first time domain, since it's skew is always 0 which may break the skew caculations. 
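			// Editorial worked example (mirrors the "volume zone is compatible" test added in the last patch):
			// existing nodes hold one matching pod each in test-zone-1 and test-zone-2, while test-zone-3 exists
			// only in the NodePool. With maxSkew=1 and counting the incoming pod itself, each populated zone sits
			// at 2; if test-zone-3's permanent count of 0 set the global min, both populated zones would violate
			// maxSkew (2-0 > 1) and only test-zone-3 would remain, conflicting with a PVC restricted to
			// test-zone-2. Excluding test-zone-3 from the skew math while still offering it via firstDomains
			// keeps test-zone-2 viable.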
+ if hasVolumeRequirement { + firstDomains = append(firstDomains, domain) + } else { + continue + } + } + var needBlock = true + for _, node := range nodes[domain] { + if node.GetLabels()[t.Key] == domain && t.nodeFilter.Matches(node) { + needBlock = false + break + } + } + if needBlock { + blockedDomains.Insert(domain) + } + } // min count is calculated across all domains - min := t.domainMinCount(podDomains) + min := t.domainMinCount(podDomains, blockedDomains) selfSelecting := t.selects(pod) - candidateDomains := []string{} + minDomain := "" + minCount := int32(math.MaxInt32) + // If we are explicitly selecting on specific node domains ("In" requirement), // this is going to be more efficient to iterate through // This is particularly useful when considering the hostname topology key that can have a // lot of t.domains but only a single nodeDomain if nodeDomains.Operator() == v1.NodeSelectorOpIn { for _, domain := range nodeDomains.Values() { - if count, ok := t.domains[domain]; ok { + if count, ok := t.domains[domain]; ok && !blockedDomains.Has(domain) { if selfSelecting { count++ } if count-min <= t.maxSkew { candidateDomains = append(candidateDomains, domain) + if count < minCount { + minDomain = domain + minCount = count + } } } } } else { for domain := range t.domains { // but we can only choose from the node domains - if nodeDomains.Has(domain) { + if nodeDomains.Has(domain) && !blockedDomains.Has(domain) { // comment from kube-scheduler regarding the viable choices to schedule to based on skew is: // 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew' count := t.domains[domain] @@ -211,18 +258,26 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo } if count-min <= t.maxSkew { candidateDomains = append(candidateDomains, domain) + if count < minCount { + minDomain = domain + minCount = count + } } } } } - if len(candidateDomains) == 0 { + if minDomain == "" && len(firstDomains) == 0 { // avoids an error message about 'zone in [""]', preferring 'zone in []' return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpDoesNotExist) } - return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...) + // we should pop all candidate domains for volume requirments + if hasVolumeRequirement { + return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, append(firstDomains, candidateDomains...)...) 
+ } + return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, minDomain) } -func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 { +func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement, blockedDomains sets.Set[string]) int32 { // hostname based topologies always have a min pod count of zero since we can create one if t.Key == v1.LabelHostname { return 0 @@ -232,7 +287,7 @@ func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 { var numPodSupportedDomains int32 // determine our current min count for domain, count := range t.domains { - if domains.Has(domain) { + if domains.Has(domain) && !blockedDomains.Has(domain) { numPodSupportedDomains++ if count < min { min = count diff --git a/pkg/controllers/provisioning/scheduling/topologynodefilter.go b/pkg/controllers/provisioning/scheduling/topologynodefilter.go index d73b3b7936..0533880425 100644 --- a/pkg/controllers/provisioning/scheduling/topologynodefilter.go +++ b/pkg/controllers/provisioning/scheduling/topologynodefilter.go @@ -51,6 +51,7 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter { } // Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology +// TODO: Node filter should respect nodeAffinityPolicy/nodeTaintsPolicy field in future. func (t TopologyNodeFilter) Matches(node *v1.Node) bool { return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels)) } diff --git a/pkg/controllers/provisioning/scheduling/volumetopology.go b/pkg/controllers/provisioning/scheduling/volumetopology.go index 6d0411e6d6..4417a89aa2 100644 --- a/pkg/controllers/provisioning/scheduling/volumetopology.go +++ b/pkg/controllers/provisioning/scheduling/volumetopology.go @@ -39,23 +39,23 @@ type VolumeTopology struct { kubeClient client.Client } -func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) { +func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) ([]v1.NodeSelectorRequirement, error) { var requirements []v1.NodeSelectorRequirement for _, volume := range pod.Spec.Volumes { req, err := v.getRequirements(ctx, pod, volume) if err != nil { - return err, nil + return nil, err } requirements = append(requirements, req...) } if len(requirements) == 0 { - return nil, requirements + return requirements, nil } log.FromContext(ctx). WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)). 
V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements)) - return nil, requirements + return requirements, nil } func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) { From e464fbfb6709b5203cd2328909797ca02872a80a Mon Sep 17 00:00:00 2001 From: leo-ryu Date: Mon, 13 Jan 2025 00:32:12 +0800 Subject: [PATCH 5/5] add test case --- .../provisioning/scheduling/suite_test.go | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index aca19883e7..0ff4c78ae1 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -3437,6 +3437,148 @@ var _ = Context("Scheduling", func() { Expect(node.Name).ToNot(Equal(node2.Name)) }) }) + Context("Pods with Zonal Volume and Topology Spread", func() { + var labels = map[string]string{"test": "test"} + var pvcs []*corev1.PersistentVolumeClaim + var pods []*corev1.Pod + var sc1 *storagev1.StorageClass + var sc2 *storagev1.StorageClass + var tsc = corev1.TopologySpreadConstraint{ + MaxSkew: 1, + TopologyKey: corev1.LabelTopologyZone, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + } + BeforeEach(func() { + pvcs = []*corev1.PersistentVolumeClaim{} + pods = []*corev1.Pod{} + sc1 = test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-1"}, + Zones: []string{"test-zone-1"}, + }) + sc2 = test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-2"}, + Zones: []string{"test-zone-2"}, + }) + for i := 0; i < 3; i++ { + // one is in test-zone-1 and others are in test-zone-2 + scname := sc1.Name + if i > 0 { + scname = sc2.Name + } + pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-%d", i)}, + StorageClassName: lo.ToPtr(scname), + }) + pod := test.UnschedulablePod(test.PodOptions{ + // to ensure one node with one pod + PodAntiRequirements: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + TopologyKey: corev1.LabelHostname, + }, + }, + TopologySpreadConstraints: []corev1.TopologySpreadConstraint{tsc}, + PersistentVolumeClaims: []string{pvc.Name}, + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + }) + pvcs = append(pvcs, pvc) + pods = append(pods, pod) + } + }) + It("should launch nodes when volume zone is compatible with topology spread", func() { + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"}, + }, + }) + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"}, + }, + }) + ExpectApplied(ctx, env.Client, nodePool, sc1, sc2) + ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2]) + ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2) + ExpectManualBinding(ctx, env.Client, pods[0], node1) + ExpectManualBinding(ctx, env.Client, pods[1], node2) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2}, nil) + + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2]) + ExpectScheduled(ctx, env.Client, pods[2]) + }) + 
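		// Editorial note on the next case: here a node already exists in test-zone-3, so that zone is not a
		// first-time domain and participates in the skew calculation (counts 1/1/0 with maxSkew=1), leaving
		// test-zone-3 as the only domain within maxSkew; the PVC is restricted to test-zone-2, so the two
		// requirements cannot intersect and the pod is expected to remain pending.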
It("should not launch nodes when volume zone is not compatible with topology spread", func() { + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"}, + }, + }) + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"}, + }, + }) + node3 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"}, + }, + }) + + ExpectApplied(ctx, env.Client, nodePool, sc1, sc2) + ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2]) + ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3) + ExpectManualBinding(ctx, env.Client, pods[0], node1) + ExpectManualBinding(ctx, env.Client, pods[1], node2) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil) + + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2]) + // for topology spread 3rd pod should be schduled to test-zone-3, but volume need be in test-zone-2 + ExpectNotScheduled(ctx, env.Client, pods[2]) + + }) + It("only nodes matching nodeAffinity/nodeSelector are included in the calculations by default", func() { + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1", "test": "test"}, + }, + }) + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2", "test": "test"}, + }, + }) + node3 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"}, + }, + }) + nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{ + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: "test", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"test"}, + }, + }, + } + pods[2].Spec.NodeSelector = map[string]string{"test": "test"} + + ExpectApplied(ctx, env.Client, nodePool, sc1, sc2) + ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2]) + ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3) + ExpectManualBinding(ctx, env.Client, pods[0], node1) + ExpectManualBinding(ctx, env.Client, pods[1], node2) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil) + + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2]) + // since there is no node in test-zone-3 has label test, just test-zone-1 and test-zone-2 are included in the calculations. + ExpectScheduled(ctx, env.Client, pods[2]) + + }) + }) }) Describe("Deleting Nodes", func() {