Skip to content

Commit

Permalink
-Add scheduler optimization options, short circuit all predicates if …
Browse files Browse the repository at this point in the history
…one predicate fails
  • Loading branch information
wgliang committed Jan 13, 2018
1 parent 52a2256 commit b8526cd
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 48 deletions.
3 changes: 2 additions & 1 deletion examples/scheduler-policy-config-with-extender.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"nodeCacheCapable": false
}
],
"hardPodAffinitySymmetricWeight" : 10
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : false
}
3 changes: 2 additions & 1 deletion examples/scheduler-policy-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
{"name" : "ServiceSpreadingPriority", "weight" : 1},
{"name" : "EqualPriority", "weight" : 1}
],
"hardPodAffinitySymmetricWeight" : 10
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : false
}
6 changes: 6 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type Policy struct {
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
HardPodAffinitySymmetricWeight int32

// When AlwaysCheckAllPredicates is set to true, scheduler checks all
// the configured predicates even after one or more of them fails.
// When the flag is set to false, scheduler skips checking the rest
// of the predicates after it finds one predicate that failed.
AlwaysCheckAllPredicates bool
}

type PredicatePolicy struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type Policy struct {
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
HardPodAffinitySymmetricWeight int `json:"hardPodAffinitySymmetricWeight"`

// When AlwaysCheckAllPredicates is set to true, scheduler checks all
// the configured predicates even after one or more of them fails.
// When the flag is set to false, scheduler skips checking the rest
// of the predicates after it finds one predicate that failed.
AlwaysCheckAllPredicates bool `json:"alwaysCheckAllPredicates"`
}

type PredicatePolicy struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/core/extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
}
queue := NewSchedulingQueue()
scheduler := NewGenericScheduler(
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
Expand Down
64 changes: 36 additions & 28 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,17 @@ func (f *FitError) Error() string {
}

type genericScheduler struct {
cache schedulercache.Cache
equivalenceCache *EquivalenceCache
schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
cache schedulercache.Cache
equivalenceCache *EquivalenceCache
schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
alwaysCheckAllPredicates bool

cachedNodeInfoMap map[string]*schedulercache.NodeInfo
volumeBinder *volumebinder.VolumeBinder
Expand Down Expand Up @@ -133,7 +134,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister

trace.Step("Computing predicates")
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -295,6 +296,7 @@ func findNodesThatFit(
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
schedulingQueue SchedulingQueue,
alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
Expand All @@ -313,7 +315,7 @@ func findNodesThatFit(
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue, alwaysCheckAllPredicates)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
Expand Down Expand Up @@ -402,6 +404,7 @@ func podFitsOnNode(
predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache,
queue SchedulingQueue,
alwaysCheckAllPredicates bool,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
equivalenceHash uint64
Expand Down Expand Up @@ -457,8 +460,6 @@ func podFitsOnNode(
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}

// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
Expand All @@ -479,6 +480,11 @@ func podFitsOnNode(
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
if !alwaysCheckAllPredicates {
glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate evaluation is short circuited and there are chances of other predicates failing as well.")
break
}
}
}
}
Expand Down Expand Up @@ -917,7 +923,7 @@ func selectVictimsOnNode(
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
Expand All @@ -931,7 +937,7 @@ func selectVictimsOnNode(
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false)
if !fits {
removePod(p)
victims = append(victims, p)
Expand Down Expand Up @@ -1045,18 +1051,20 @@ func NewGenericScheduler(
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister) algorithm.ScheduleAlgorithm {
pvcLister corelisters.PersistentVolumeClaimLister,
alwaysCheckAllPredicates bool) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
}
}
45 changes: 31 additions & 14 deletions pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,17 @@ func TestSelectHost(t *testing.T) {
func TestGenericScheduler(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig
nodes []string
pvcs []*v1.PersistentVolumeClaim
pod *v1.Pod
pods []*v1.Pod
expectedHosts sets.String
expectsErr bool
wErr error
name string
predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig
alwaysCheckAllPredicates bool
nodes []string
pvcs []*v1.PersistentVolumeClaim
pod *v1.Pod
pods []*v1.Pod
expectedHosts sets.String
expectsErr bool
wErr error
}{
{
predicates: map[string]algorithm.FitPredicate{"false": falsePredicate},
Expand Down Expand Up @@ -377,6 +378,22 @@ func TestGenericScheduler(t *testing.T) {
expectsErr: true,
wErr: fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
},
{
// alwaysCheckAllPredicates is true
predicates: map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate, "false": falsePredicate},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
alwaysCheckAllPredicates: true,
nodes: []string{"1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
name: "test alwaysCheckAllPredicates is true",
wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
NumAllNodes: 1,
FailedPredicates: FailedPredicateMap{
"1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate},
},
},
},
}
for _, test := range tests {
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
Expand All @@ -393,7 +410,7 @@ func TestGenericScheduler(t *testing.T) {
pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)

scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister)
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister, test.alwaysCheckAllPredicates)
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))

if !reflect.DeepEqual(err, test.wErr) {
Expand All @@ -414,7 +431,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
}
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)

if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -449,7 +466,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}

_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1276,7 +1293,7 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
// Call Preempt and check the expected results.
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type configFactory struct {

// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder

// always check all predicates even if the middle of one predicate fails.
alwaysCheckAllPredicates bool
}

// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
Expand Down Expand Up @@ -880,6 +883,12 @@ func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
if policy.HardPodAffinitySymmetricWeight != 0 {
f.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
f.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}

return f.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}

Expand Down Expand Up @@ -933,7 +942,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
glog.Info("Created equivalence class cache")
}

algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister)
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister, f.alwaysCheckAllPredicates)

podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
Expand Down
6 changes: 4 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{},
nil,
schedulertesting.FakePersistentVolumeClaimLister{})
schedulertesting.FakePersistentVolumeClaimLister{},
false)
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
Expand Down Expand Up @@ -577,7 +578,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{},
nil,
schedulertesting.FakePersistentVolumeClaimLister{})
schedulertesting.FakePersistentVolumeClaimLister{},
false)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &Config{
Expand Down

0 comments on commit b8526cd

Please sign in to comment.