Skip to content

Commit

Permalink
Merge pull request kubernetes#57168 from yastij/predicates-ordering
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 57252, 57168). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Implementing predicates ordering

**What this PR does / why we need it**: implements predicates ordering for the scheduler

**Which issue(s) this PR fixes** : Fixes kubernetes#53812 

**Special notes for your reviewer**:


@bsalamat @gmarek @resouer as discussed on slack, to implement ordering we have to choices:

- use a layered approach with a list that indexes the order of the predicates map

- change the underlying data structure used to represent a collection of predicates (a map in our case) into a list of predicates objects. 
Going with this solution might be "cleaner" but it will require a lot of changes and will increase the cost for accessing predicates from O(1) to O(n) (n being the number of predicates used by the scheduler).

we might go with this solution for now. If the number of predicates start growing, we might switch to the second option.
 
**Release note**:

```release-note
adding predicates ordering for the kubernetes scheduler.
```
  • Loading branch information
Kubernetes Submit Queue authored Dec 20, 2017
2 parents dd9bca8 + e62952d commit 51fbd6e
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 56 deletions.
45 changes: 42 additions & 3 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,25 @@ import (
)

const (
MatchInterPodAffinity = "MatchInterPodAffinity"
CheckVolumeBinding = "CheckVolumeBinding"

MatchInterPodAffinityPred = "MatchInterPodAffinity"
CheckVolumeBindingPred = "CheckVolumeBinding"
CheckNodeConditionPred = "CheckNodeCondition"
GeneralPred = "GeneralPredicates"
HostNamePred = "HostName"
PodFitsHostPortsPred = "PodFitsHostPorts"
MatchNodeSelectorPred = "MatchNodeSelector"
PodFitsResourcesPred = "PodFitsResources"
NoDiskConflictPred = "NoDiskConflict"
PodToleratesNodeTaintsPred = "PodToleratesNodeTaints"
PodToleratesNodeNoExecuteTaintsPred = "PodToleratesNodeNoExecuteTaints"
CheckNodeLabelPresencePred = "CheckNodeLabelPresence"
checkServiceAffinityPred = "checkServiceAffinity"
MaxEBSVolumeCountPred = "MaxEBSVolumeCount"
MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
CheckNodeMemoryPressurePred = "CheckNodeMemoryPressure"
CheckNodeDiskPressurePred = "CheckNodeDiskPressure"
// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
// GCE instances can have up to 16 PD volumes attached.
DefaultMaxGCEPDVolumes = 16
Expand Down Expand Up @@ -79,6 +95,21 @@ const (
// For example:
// https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422

// IMPORTANT NOTE: this list contains the ordering of the predicates, if you develop a new predicate
// it is mandatory to add its name to this list.
// Otherwise it won't be processed, see generic_scheduler#podFitsOnNode().
// The order is based on the restrictiveness & complexity of predicates.
// Design doc: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md
var (
predicatesOrdering = []string{CheckNodeConditionPred,
GeneralPred, HostNamePred, PodFitsHostPortsPred,
MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
checkServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
CheckNodeMemoryPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

// NodeInfo: Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error)
Expand All @@ -93,6 +124,14 @@ type CachedPersistentVolumeInfo struct {
corelisters.PersistentVolumeLister
}

func PredicatesOrdering() []string {
return predicatesOrdering
}

func SetPredicatesOrdering(names []string) {
predicatesOrdering = names
}

func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
return c.Get(pvID)
}
Expand Down
40 changes: 20 additions & 20 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ func init() {
// Fit is defined based on the absence of port conflicts.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts)
factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("PodFitsResources", predicates.PodFitsResources)
factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
// Fit is determined by the presence of the Host parameter and a string match
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("HostName", predicates.PodFitsHost)
factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector)
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

// Use equivalence class to speed up heavy predicates phase.
factory.RegisterGetEquivalencePodFunction(
Expand Down Expand Up @@ -117,62 +117,62 @@ func defaultPredicates() sets.String {
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeZoneConflict",
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
),
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxEBSVolumeCount",
predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxGCEPDVolumeCount",
predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxAzureDiskVolumeCount",
predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinity,
predicates.MatchInterPodAffinityPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},
),

// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),

// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates),

// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),
factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate),

// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate),

// Fit is determined by node conditions: not ready, network unavailable or out of disk.
factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate),
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate),

// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints),

// Fit is determined by volume topology requirements.
factory.RegisterFitPredicateFactory(
predicates.CheckVolumeBinding,
predicates.CheckVolumeBindingPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
},
Expand All @@ -185,18 +185,18 @@ func ApplyFeatureGates() {

if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition" predicate
factory.RemoveFitPredicate("CheckNodeCondition")
factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
// Remove Key "CheckNodeCondition" From All Algorithm Provider
// The key will be removed from all providers which in algorithmProviderMap[]
// if you just want remove specific provider, call func RemovePredicateKeyFromAlgoProvider()
factory.RemovePredicateKeyFromAlgorithmProviderMap("CheckNodeCondition")
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeConditionPred)

// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterMandatoryFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints)
factory.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)
// Insert Key "PodToleratesNodeTaints" To All Algorithm Provider
// The key will insert to all providers which in algorithmProviderMap[]
// if you just want insert to specific provider, call func InsertPredicateKeyToAlgoProvider()
factory.InsertPredicateKeyToAlgorithmProviderMap("PodToleratesNodeTaints")
factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.PodToleratesNodeTaintsPred)

glog.Warningf("TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestDefaultPredicates(t *testing.T) {
"CheckNodeDiskPressure",
"CheckNodeCondition",
"PodToleratesNodeTaints",
predicates.CheckVolumeBinding,
predicates.CheckVolumeBindingPred,
)

if expected := defaultPredicates(); !result.Equal(expected) {
Expand Down
53 changes: 28 additions & 25 deletions plugin/pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,34 +444,37 @@ func podFitsOnNode(
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = eCacheAvailable && !podsAdded
for predicateKey, predicate := range predicateFuncs {
if eCacheAvailable {
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}

// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
for _, predicateKey := range predicates.PredicatesOrdering() {
//TODO (yastij) : compute average predicate restrictiveness to export it as promethus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}

// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
}
}
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
}
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions plugin/pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import (
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)

var (
order = []string{"false", "true", "matches", "nopods", predicates.MatchInterPodAffinityPred}
)

func falsePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
Expand Down Expand Up @@ -181,6 +185,7 @@ func TestSelectHost(t *testing.T) {
}

func TestGenericScheduler(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
Expand Down Expand Up @@ -401,6 +406,7 @@ func TestGenericScheduler(t *testing.T) {
}

func TestFindFitAllError(t *testing.T) {
predicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
Expand Down Expand Up @@ -430,8 +436,9 @@ func TestFindFitAllError(t *testing.T) {
}

func TestFindFitSomeError(t *testing.T) {
predicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1"}}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
Expand Down Expand Up @@ -741,6 +748,7 @@ var negPriority, lowPriority, midPriority, highPriority, veryHighPriority = int3
// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes
// that podsFitsOnNode works correctly and is tested separately.
func TestSelectNodesForPreemption(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
Expand Down Expand Up @@ -864,7 +872,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
test.predicates[predicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
Expand All @@ -879,6 +887,7 @@ func TestSelectNodesForPreemption(t *testing.T) {

// TestPickOneNodeForPreemption tests pickOneNodeForPreemption.
func TestPickOneNodeForPreemption(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
Expand Down
6 changes: 3 additions & 3 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {

if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}

c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
Expand Down Expand Up @@ -480,7 +480,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim

if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
Expand All @@ -491,7 +491,7 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
if old.Spec.VolumeName != new.Spec.VolumeName {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// PVC volume binding has changed
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
// The bound volume type may change
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
err = fmt.Errorf("Volume binding started, waiting for completion")
if bindingRequired {
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBinding)
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

Expand Down
4 changes: 3 additions & 1 deletion plugin/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
scache.AddNode(&testNode)

predicateMap := map[string]algorithm.FitPredicate{
"VolumeBindingChecker": predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}

recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
Expand All @@ -637,6 +637,8 @@ func makePredicateError(failReason string) error {
}

func TestSchedulerWithVolumeBinding(t *testing.T) {
order := []string{predicates.CheckVolumeBindingPred, predicates.GeneralPred}
predicates.SetPredicatesOrdering(order)
findErr := fmt.Errorf("find err")
assumeErr := fmt.Errorf("assume err")
bindErr := fmt.Errorf("bind err")
Expand Down

0 comments on commit 51fbd6e

Please sign in to comment.