38 changes: 30 additions & 8 deletions scheduler/reconciler/reconcile_node.go
@@ -32,13 +32,14 @@ func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler {
}
}

// Compute is like diffSystemAllocsForNode however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
// Compute is like computeCanaryNodes; however, the allocations in the
// NodeReconcileResult contain the specific nodeID they should be allocated on.
func (nr *NodeReconciler) Compute(
job *structs.Job, // jobs whose allocations are going to be diff-ed
readyNodes []*structs.Node, // list of nodes in the ready state
notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
infeasibleNodes map[string][]string, // maps task groups to node IDs that are not feasible for them
live []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
@@ -64,7 +65,7 @@ func (nr *NodeReconciler) Compute(
// Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary
// percentage of eligible nodes, so we create a mapping of task group name
// to a list of nodes that canaries should be placed on.
canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes)
canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes, infeasibleNodes)

compatHadExistingDeployment := nr.DeploymentCurrent != nil
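For orientation, computeCanaryNodes returns a per-task-group set of canary node IDs plus a per-task-group canary count, and Compute consults both when deciding which placements become canaries. A minimal sketch of those two shapes and how a caller might read them (sample data only, not the PR's values):

```go
package main

import "fmt"

func main() {
	// canaryNodes: task group name -> node ID -> should this node get a canary.
	canaryNodes := map[string]map[string]bool{
		"web": {"node-1": true, "node-2": true},
	}
	// canariesPerTG: task group name -> total canaries to place for that group.
	canariesPerTG := map[string]int{"web": 2}

	// A placement decision for a given group/node pair; indexing missing keys
	// is nil-safe and simply yields false.
	tg, nodeID := "web", "node-1"
	isCanary := canaryNodes[tg][nodeID]
	fmt.Printf("canary on %s for %q? %v (of %d total canaries)\n",
		nodeID, tg, isCanary, canariesPerTG[tg])
}
```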

@@ -102,7 +103,7 @@ func (nr *NodeReconciler) Compute(
// many total canaries are to be placed for a TG.
func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup,
liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName,
eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) {
eligibleNodes map[string]*structs.Node, infeasibleNodes map[string][]string) (map[string]map[string]bool, map[string]int) {

canaryNodes := map[string]map[string]bool{}
eligibleNodesList := slices.Collect(maps.Values(eligibleNodes))
@@ -114,7 +115,17 @@ func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGr
}

// round up to the nearest integer
numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100))
numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary)*float64(len(eligibleNodes))/100)) - len(infeasibleNodes[tg.Name])

// check if there's a current deployment present. It could be that the
// desired amount of canaries has to be reduced due to infeasible nodes.
// if nr.DeploymentCurrent != nil {
// if dstate, ok := nr.DeploymentCurrent.TaskGroups[tg.Name]; ok {
// numberOfCanaryNodes = dstate.DesiredCanaries
// fmt.Printf("existing deploy, setting number of canary nodes to %v\n", dstate.DesiredCanaries)
// }
// }

canariesPerTG[tg.Name] = numberOfCanaryNodes

// check if there are any live allocations on any nodes that are/were
@@ -135,6 +146,10 @@ func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGr
}

for i, n := range eligibleNodesList {
// infeasible nodes can never become canary candidates
if slices.Contains(infeasibleNodes[tg.Name], n.ID) {
continue
}
if i > numberOfCanaryNodes-1 {
break
}
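With this change the canary target is the rounded-up Canary percentage of eligible nodes minus the nodes already recorded as infeasible for the group, and infeasible nodes are skipped when choosing candidates. A self-contained sketch of that arithmetic and selection, using made-up sample values:

```go
package main

import (
	"fmt"
	"math"
	"slices"
)

func main() {
	canaryPercent := 30 // tg.Update.Canary
	eligible := []string{"n1", "n2", "n3", "n4", "n5"}
	infeasible := []string{"n3"} // nodes this group already failed to place on

	// Round up to the nearest integer, then subtract known-infeasible nodes.
	want := int(math.Ceil(float64(canaryPercent)*float64(len(eligible))/100)) - len(infeasible)
	fmt.Println("canaries wanted:", want) // ceil(30*5/100) = 2, minus 1 infeasible => 1

	// Select candidates, never from infeasible nodes.
	var picked []string
	for _, n := range eligible {
		if slices.Contains(infeasible, n) {
			continue
		}
		if len(picked) == want {
			break
		}
		picked = append(picked, n)
	}
	fmt.Println("canary nodes:", picked) // [n1]
}
```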
@@ -441,10 +456,10 @@ func (nr *NodeReconciler) computeForNode(
dstate.ProgressDeadline = tg.Update.ProgressDeadline
}
dstate.DesiredTotal = len(eligibleNodes)
}

if isCanarying[tg.Name] && !dstate.Promoted {
dstate.DesiredCanaries = canariesPerTG[tg.Name]
if isCanarying[tg.Name] && !dstate.Promoted {
dstate.DesiredCanaries = canariesPerTG[tg.Name]
}
}

// Check for an existing allocation
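The reordered block means DesiredTotal always tracks the eligible node count, while DesiredCanaries is only refreshed from canariesPerTG while the group is canarying and the deployment has not been promoted. A simplified sketch of that update, with an illustrative stand-in for the deployment state type:

```go
package main

import "fmt"

// deployState is a simplified stand-in for the deployment's per-group state.
type deployState struct {
	DesiredTotal    int
	DesiredCanaries int
	Promoted        bool
}

// updateDesired mirrors the reordered block: the total always tracks the
// eligible node count, canary targets only apply before promotion.
func updateDesired(ds *deployState, eligibleNodes int, canarying bool, canariesForTG int) {
	ds.DesiredTotal = eligibleNodes
	if canarying && !ds.Promoted {
		ds.DesiredCanaries = canariesForTG
	}
}

func main() {
	ds := &deployState{}
	updateDesired(ds, 5, true, 2)
	fmt.Printf("%+v\n", *ds) // {DesiredTotal:5 DesiredCanaries:2 Promoted:false}
}
```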
@@ -587,14 +602,21 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro
}

func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool {
fmt.Printf("\n===========\n")
fmt.Println("isDeploymentComplete call")
complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0

fmt.Printf("\nis complete? %v buckets.Place: %v buckets.Update: %v\n", complete, len(buckets.Place), len(buckets.Update))
fmt.Printf("\nnr.deploymentCurrent == nil? %v isCanarying?: %v\n", nr.DeploymentCurrent == nil, isCanarying)
fmt.Println("===========")

if !complete || nr.DeploymentCurrent == nil || isCanarying {
return false
}

// ensure everything is healthy
if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
fmt.Printf("\nhealthy allocs %v desiredtotal: %v desired canaries: %v\n", dstate.HealthyAllocs, dstate.DesiredTotal, dstate.DesiredCanaries)
if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
complete = false
}
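Debug printing aside, the completeness check reduces to: nothing left to place, migrate, or update; a current deployment exists; the group is not mid-canary; and each group has at least as many healthy allocations as it desires. A condensed sketch of that predicate with simplified types (not the PR's exact signatures):

```go
package main

import "fmt"

// groupState is a simplified stand-in for a deployment's per-group state.
type groupState struct {
	HealthyAllocs int
	DesiredTotal  int
}

// deploymentComplete mirrors the shape of the check: no outstanding work,
// a deployment exists, no canaries in flight, and enough healthy allocs.
func deploymentComplete(place, migrate, update int, haveDeployment, canarying bool, gs *groupState) bool {
	complete := place+migrate+update == 0
	if !complete || !haveDeployment || canarying {
		return false
	}
	if gs != nil && gs.HealthyAllocs < gs.DesiredTotal {
		complete = false
	}
	return complete
}

func main() {
	gs := &groupState{HealthyAllocs: 5, DesiredTotal: 5}
	fmt.Println(deploymentComplete(0, 0, 0, true, false, gs)) // true
	gs.HealthyAllocs = 4
	fmt.Println(deploymentComplete(0, 0, 0, true, false, gs)) // false
}
```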
39 changes: 27 additions & 12 deletions scheduler/scheduler_system.go
@@ -45,9 +45,10 @@ type SystemScheduler struct {
ctx *feasible.EvalContext
stack *feasible.SystemStack

nodes []*structs.Node
notReadyNodes map[string]struct{}
nodesByDC map[string]int
nodes []*structs.Node
notReadyNodes map[string]struct{}
nodesByDC map[string]int
infeasibleNodes map[string][]string // maps task group names to node IDs that aren't feasible for these TGs

deployment *structs.Deployment

@@ -278,7 +279,7 @@ func (s *SystemScheduler) computeJobAllocs() error {

// Diff the required and existing allocations
nr := reconciler.NewNodeReconciler(s.deployment)
r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, live, term,
r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, s.infeasibleNodes, live, term,
s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))
if s.logger.IsDebug() {
s.logger.Debug("reconciled current state with desired state", r.Fields()...)
@@ -447,9 +448,22 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist
s.planAnnotations.DesiredTGUpdates[tgName].Place -= 1
}

if s.plan.Deployment != nil {
s.deployment.TaskGroups[tgName].DesiredTotal -= 1
// Store this node's ID as infeasible, so that when we need to make
// another deployment, we know to avoid it.
if s.infeasibleNodes == nil {
s.infeasibleNodes = make(map[string][]string)
}
if s.infeasibleNodes[tgName] == nil {
s.infeasibleNodes[tgName] = make([]string, 0, 1)
}
s.infeasibleNodes[tgName] = append(s.infeasibleNodes[tgName], node.ID)

// if s.plan.Deployment != nil {
// s.deployment.TaskGroups[tgName].DesiredTotal -= 1
// if s.deployment.TaskGroups[tgName].DesiredCanaries != 0 {
// s.deployment.TaskGroups[tgName].DesiredCanaries -= 1
// }
// }

// Filtered nodes are not reported to users, just omitted from the job status
continue
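The scheduler-side pattern is record-and-skip: lazily initialize the map, remember the node ID under the task group, and drop the node from placement instead of surfacing an error. A minimal reproduction of that bookkeeping with plain types (names are illustrative, not the PR's API):

```go
package main

import "fmt"

// recordInfeasible remembers that a node cannot run a task group, so a later
// reconciliation pass can subtract it from canary and desired counts.
func recordInfeasible(infeasible map[string][]string, tgName, nodeID string) map[string][]string {
	if infeasible == nil {
		infeasible = make(map[string][]string)
	}
	// append handles a nil slice for a first-seen task group, so no per-key
	// initialization is needed.
	infeasible[tgName] = append(infeasible[tgName], nodeID)
	return infeasible
}

func main() {
	var infeasible map[string][]string
	infeasible = recordInfeasible(infeasible, "web", "node-3")
	infeasible = recordInfeasible(infeasible, "web", "node-7")
	fmt.Println(infeasible) // map[web:[node-3 node-7]]
}
```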
@@ -613,13 +627,14 @@ func (s *SystemScheduler) canHandle(trigger string) bool {
}

// evictAndPlace is used to mark allocations for evicts and add them to the
// placement queue. evictAndPlace modifies the diffResult. It returns true if
// the limit has been reached for any task group.
func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool {
// placement queue. evictAndPlace modifies the NodeReconcileResult. It returns
// true if the limit has been reached for any task group.
func evictAndPlace(ctx feasible.Context, job *structs.Job,
result *reconciler.NodeReconcileResult, desc string) bool {

limits := map[string]int{} // per task group limits
if !job.Stopped() {
jobLimit := len(diff.Update)
jobLimit := len(result.Update)
if job.Update.MaxParallel > 0 {
jobLimit = job.Update.MaxParallel
}
@@ -633,10 +648,10 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node
}

limited := false
for _, a := range diff.Update {
for _, a := range result.Update {
if limit := limits[a.Alloc.TaskGroup]; limit > 0 {
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
diff.Place = append(diff.Place, a)
result.Place = append(result.Place, a)
if !a.Canary {
limits[a.Alloc.TaskGroup]--
}
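evictAndPlace caps how many updates are converted to evict-then-place per task group: the budget defaults to the number of updates, is clamped by update.max_parallel when set, and canary updates do not consume it. A self-contained sketch of that limit bookkeeping, using a simplified tuple type rather than the real structs:

```go
package main

import "fmt"

type updateTuple struct {
	TaskGroup string
	Canary    bool
}

// evictAndPlace returns which updates fit under a per-task-group budget and
// whether any group hit its limit, so the caller can handle the rest later.
func evictAndPlace(updates []updateTuple, maxParallel int) (placed []updateTuple, limited bool) {
	// Budget per task group: number of updates, clamped by max_parallel when set.
	limit := len(updates)
	if maxParallel > 0 {
		limit = maxParallel
	}
	limits := map[string]int{}
	for _, u := range updates {
		if _, ok := limits[u.TaskGroup]; !ok {
			limits[u.TaskGroup] = limit
		}
	}

	for _, u := range updates {
		if limits[u.TaskGroup] > 0 {
			placed = append(placed, u)
			if !u.Canary { // canaries don't consume the rollout budget
				limits[u.TaskGroup]--
			}
			continue
		}
		limited = true
	}
	return placed, limited
}

func main() {
	updates := []updateTuple{
		{TaskGroup: "web"}, {TaskGroup: "web", Canary: true}, {TaskGroup: "web"}, {TaskGroup: "web"},
	}
	placed, limited := evictAndPlace(updates, 2)
	// 3 true: two non-canary updates fit the budget of 2, the canary is free,
	// and the last update is deferred.
	fmt.Println(len(placed), limited)
}
```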