From c355d1475e63b353db220f0d54a118a8f4d1dc97 Mon Sep 17 00:00:00 2001
From: leo-ryu
Date: Thu, 9 Jan 2025 16:37:39 +0800
Subject: [PATCH] block unmatched empty domains

---
 .../disruption/consolidation_test.go          |  7 +++
 pkg/controllers/provisioning/provisioner.go   |  2 +-
 .../scheduling/scheduling_benchmark_test.go   |  3 +-
 .../provisioning/scheduling/topology.go       |  6 +--
 .../provisioning/scheduling/topologygroup.go  | 45 ++++++++++++++++---
 .../scheduling/topologynodefilter.go          |  1 +
 .../provisioning/scheduling/volumetopology.go |  8 ++--
 7 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go
index 07a7795f55..8b038971c4 100644
--- a/pkg/controllers/disruption/consolidation_test.go
+++ b/pkg/controllers/disruption/consolidation_test.go
@@ -4129,6 +4129,13 @@ var _ = Describe("Consolidation", func() {
 			pods := test.Pods(4, test.PodOptions{
 				ResourceRequirements: corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("1")}},
 				TopologySpreadConstraints: []corev1.TopologySpreadConstraint{tsc},
+				// pod anti-affinity ensures that each node holds exactly one of these pods
+				PodAntiRequirements: []corev1.PodAffinityTerm{
+					{
+						LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
+						TopologyKey:   corev1.LabelHostname,
+					},
+				},
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: labels,
 					OwnerReferences: []metav1.OwnerReference{
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 78f667106e..7711cefd25 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -471,7 +471,7 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
 func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
 	var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
 	for _, pod := range pods {
-		if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
+		if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
 			log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements")
 		} else {
 			schedulablePods[pod] = requirements
diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
index 2844b95362..16006899df 100644
--- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
+++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
@@ -168,7 +168,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 	clock := &clock.RealClock{}
 	cluster = state.NewCluster(clock, client, cloudProvider)
 	domains := map[string]sets.Set[string]{}
-	topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
+	podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
+	topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements)
 	if err != nil {
 		b.Fatalf("creating topology, %s", err)
 	}
diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go
index 9956eedd65..d8531d43e6 100644
--- a/pkg/controllers/provisioning/scheduling/topology.go
+++ b/pkg/controllers/provisioning/scheduling/topology.go
@@ -249,7 +249,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
 			return err
 		}
 
-		tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
+		tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
 
 		hash := tg.Hash()
 		if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -327,7 +327,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
 func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
 	var topologyGroups []*TopologyGroup
 	for _, cs := range p.Spec.TopologySpreadConstraints {
-		topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
+		topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, t.cluster, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
 	}
 	return topologyGroups
 }
@@ -364,7 +364,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
 			if err != nil {
 				return nil, err
 			}
-			topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
+			topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
 		}
 	}
 	return topologyGroups, nil
diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go
index fe7376b850..89f73905a4 100644
--- a/pkg/controllers/provisioning/scheduling/topologygroup.go
+++ b/pkg/controllers/provisioning/scheduling/topologygroup.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 
+	"sigs.k8s.io/karpenter/pkg/controllers/state"
 	"sigs.k8s.io/karpenter/pkg/scheduling"
 )
 
@@ -59,6 +60,7 @@ type TopologyGroup struct {
 	Type        TopologyType
 	maxSkew     int32
 	minDomains  *int32
+	cluster     *state.Cluster
 	namespaces  sets.Set[string]
 	selector    labels.Selector
 	rawSelector *metav1.LabelSelector
@@ -69,7 +71,7 @@ type TopologyGroup struct {
 	emptyDomains sets.Set[string] // domains for which we know that no pod exists
 }
 
-func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
+func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, cluster *state.Cluster, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
 	domainCounts := map[string]int32{}
 	for domain := range domains {
 		domainCounts[domain] = 0
@@ -86,6 +88,7 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
 	return &TopologyGroup{
 		Type:        topologyType,
 		Key:         topologyKey,
+		cluster:     cluster,
 		namespaces:  namespaces,
 		selector:    selector,
 		rawSelector: labelSelector,
@@ -179,8 +182,38 @@ func (t *TopologyGroup) Hash() uint64 {
 // If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement.
 // nolint:gocyclo
 func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+	var nodes = make(map[string][]*v1.Node)
+	var blockedDomains = sets.New[string]()
+	if t.cluster != nil {
+		for _, node := range t.cluster.Nodes() {
+			if node == nil || node.Node == nil {
+				continue
+			}
+			if _, ok := node.Node.GetLabels()[t.Key]; !ok {
+				continue
+			}
+			nodes[node.Node.GetLabels()[t.Key]] = append(nodes[node.Node.GetLabels()[t.Key]], node.Node)
+		}
+	}
+	// Empty domains whose existing nodes all fail to match the pod cannot actually host it, so exclude them from the skew calculations.
+	for _, domain := range t.emptyDomains.UnsortedList() {
+		// no existing node carries this domain yet, so it stays eligible
+		if len(nodes[domain]) == 0 {
+			continue
+		}
+		var needBlock = true
+		for _, node := range nodes[domain] {
+			if node.GetLabels()[t.Key] == domain && t.nodeFilter.Matches(node) {
+				needBlock = false
+				break
+			}
+		}
+		if needBlock {
+			blockedDomains.Insert(domain)
+		}
+	}
 	// min count is calculated across all domains
-	min := t.domainMinCount(podDomains)
+	min := t.domainMinCount(podDomains, blockedDomains)
 	selfSelecting := t.selects(pod)
 
 	candidateDomains := []string{}
@@ -190,7 +223,7 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo
 	// lot of t.domains but only a single nodeDomain
 	if nodeDomains.Operator() == v1.NodeSelectorOpIn {
 		for _, domain := range nodeDomains.Values() {
-			if count, ok := t.domains[domain]; ok {
+			if count, ok := t.domains[domain]; ok && !blockedDomains.Has(domain) {
 				if selfSelecting {
 					count++
 				}
@@ -202,7 +235,7 @@
 	} else {
 		for domain := range t.domains {
 			// but we can only choose from the node domains
-			if nodeDomains.Has(domain) {
+			if nodeDomains.Has(domain) && !blockedDomains.Has(domain) {
 				// comment from kube-scheduler regarding the viable choices to schedule to based on skew is:
 				// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
 				count := t.domains[domain]
@@ -222,7 +255,7 @@
 	return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...)
 }
 
-func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
+func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement, blockedDomains sets.Set[string]) int32 {
 	// hostname based topologies always have a min pod count of zero since we can create one
 	if t.Key == v1.LabelHostname {
 		return 0
@@ -232,7 +265,7 @@
 	var numPodSupportedDomains int32
 	// determine our current min count
 	for domain, count := range t.domains {
-		if domains.Has(domain) {
+		if domains.Has(domain) && !blockedDomains.Has(domain) {
 			numPodSupportedDomains++
 			if count < min {
 				min = count
diff --git a/pkg/controllers/provisioning/scheduling/topologynodefilter.go b/pkg/controllers/provisioning/scheduling/topologynodefilter.go
index d73b3b7936..0533880425 100644
--- a/pkg/controllers/provisioning/scheduling/topologynodefilter.go
+++ b/pkg/controllers/provisioning/scheduling/topologynodefilter.go
@@ -51,6 +51,7 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter {
 }
 
 // Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
+// TODO: The node filter should respect the nodeAffinityPolicy/nodeTaintsPolicy fields in the future.
 func (t TopologyNodeFilter) Matches(node *v1.Node) bool {
 	return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels))
 }
diff --git a/pkg/controllers/provisioning/scheduling/volumetopology.go b/pkg/controllers/provisioning/scheduling/volumetopology.go
index 6d0411e6d6..4417a89aa2 100644
--- a/pkg/controllers/provisioning/scheduling/volumetopology.go
+++ b/pkg/controllers/provisioning/scheduling/volumetopology.go
@@ -39,23 +39,23 @@ type VolumeTopology struct {
 	kubeClient client.Client
 }
 
-func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) {
+func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) ([]v1.NodeSelectorRequirement, error) {
 	var requirements []v1.NodeSelectorRequirement
 	for _, volume := range pod.Spec.Volumes {
 		req, err := v.getRequirements(ctx, pod, volume)
 		if err != nil {
-			return err, nil
+			return nil, err
 		}
 		requirements = append(requirements, req...)
 	}
 	if len(requirements) == 0 {
-		return nil, requirements
+		return requirements, nil
 	}
 
 	log.FromContext(ctx).
 		WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).
 		V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements))
-	return nil, requirements
+	return requirements, nil
 }
 
 func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) {
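
Note for reviewers: the sketch below is a minimal, self-contained restatement of the rule the topologygroup.go hunk adds to nextDomainTopologySpread, not Karpenter's actual API. The node struct, the blockedEmptyDomains helper, and the matches callback are illustrative stand-ins for walking t.cluster.Nodes() and calling t.nodeFilter.Matches.

package main

import "fmt"

// node is a simplified stand-in for an existing node's labels.
type node struct {
	labels map[string]string
}

// blockedEmptyDomains returns the empty domains that should be ignored when
// computing topology-spread skew: domains that already have existing nodes,
// but where none of those nodes pass the pod's node filter.
func blockedEmptyDomains(topologyKey string, emptyDomains []string, nodesByDomain map[string][]node, matches func(node) bool) map[string]bool {
	blocked := map[string]bool{}
	for _, domain := range emptyDomains {
		existing := nodesByDomain[domain]
		if len(existing) == 0 {
			// No node carries this domain yet; a new node could still be
			// launched there, so the domain stays eligible.
			continue
		}
		eligible := false
		for _, n := range existing {
			if n.labels[topologyKey] == domain && matches(n) {
				eligible = true
				break
			}
		}
		if !eligible {
			blocked[domain] = true
		}
	}
	return blocked
}

func main() {
	nodesByDomain := map[string][]node{
		"zone-a": {{labels: map[string]string{"topology.kubernetes.io/zone": "zone-a", "team": "other"}}},
	}
	// The pod's node filter only accepts nodes labeled team=web.
	matches := func(n node) bool { return n.labels["team"] == "web" }
	blocked := blockedEmptyDomains("topology.kubernetes.io/zone", []string{"zone-a", "zone-b"}, nodesByDomain, matches)
	fmt.Println(blocked) // map[zone-a:true] - zone-b has no nodes yet, so it is not blocked
}

Only domains whose existing nodes all fail the filter are blocked; a domain with no nodes at all stays eligible because a node could still be provisioned for it.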
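
A second sketch, under the same caveat that the names here are illustrative rather than Karpenter's real signatures: it shows why blocked domains must also be excluded inside domainMinCount. An empty domain the pod can never use would otherwise pin the global minimum at zero and, with a small maxSkew, rule out every real candidate domain.

package main

import "fmt"

// minCount mirrors the idea behind domainMinCount after this patch: the global
// minimum is taken only over domains that are usable and not blocked.
func minCount(domainCounts map[string]int32, blocked map[string]bool) int32 {
	min := int32(1<<31 - 1)
	found := false
	for domain, count := range domainCounts {
		if blocked[domain] {
			continue
		}
		found = true
		if count < min {
			min = count
		}
	}
	if !found {
		return 0
	}
	return min
}

func main() {
	counts := map[string]int32{"zone-a": 0, "zone-b": 2, "zone-c": 2}
	// Without blocking, the empty-but-unusable zone-a pins the min to 0, so with
	// maxSkew=1 neither zone-b nor zone-c (count 2) would be a valid candidate.
	fmt.Println(minCount(counts, nil)) // 0
	// Blocking zone-a raises the min to 2, and the remaining zones become candidates again.
	fmt.Println(minCount(counts, map[string]bool{"zone-a": true})) // 2
}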