Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/26768.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: fixed a bug where the bandwidth of reserved cores were not taken into account
```
32 changes: 19 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1816,8 +1816,19 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
}

// update config with total cpu compute if it was detected
if cpu := response.NodeResources.Processors.TotalCompute(); cpu > 0 {
newConfig.CpuCompute = cpu
totalCompute := response.NodeResources.Processors.TotalCompute()
usableCompute := response.NodeResources.Processors.UsableCompute()
if totalCompute > 0 {
if newConfig.CpuCompute != totalCompute {
newConfig.CpuCompute = totalCompute
nodeHasChanged = true
}

reservedCompute := int64(totalCompute - usableCompute)
if newConfig.Node.ReservedResources.Cpu.CpuShares != reservedCompute {
newConfig.Node.ReservedResources.Cpu.CpuShares = reservedCompute
nodeHasChanged = true
}
}
}

Expand Down Expand Up @@ -3323,8 +3334,8 @@ func (c *Client) setGaugeForDiskStats(hStats *hoststats.HostStats, baseLabels []
// setGaugeForAllocationStats proxies metrics for allocation specific statistics
func (c *Client) setGaugeForAllocationStats(baseLabels []metrics.Label) {
node := c.GetConfig().Node
total := node.NodeResources
res := node.ReservedResources

available := node.Comparable()
allocated := c.getAllocatedResources(node)

// Emit allocated
Expand All @@ -3342,20 +3353,15 @@ func (c *Client) setGaugeForAllocationStats(baseLabels []metrics.Label) {
}

// Emit unallocated
unallocatedMem := total.Memory.MemoryMB - res.Memory.MemoryMB - allocated.Flattened.Memory.MemoryMB
unallocatedDisk := total.Disk.DiskMB - res.Disk.DiskMB - allocated.Shared.DiskMB

// The UsableCompute function call already subtracts and accounts for any
// reserved CPU within the client configuration. Therefore, we do not need
// to subtract that here.
unallocatedCpu := int64(total.Processors.Topology.UsableCompute()) - allocated.Flattened.Cpu.CpuShares
unallocatedMem := available.Flattened.Memory.MemoryMB - allocated.Flattened.Memory.MemoryMB
unallocatedDisk := available.Shared.DiskMB - allocated.Shared.DiskMB
unallocatedCpu := available.Flattened.Cpu.CpuShares - allocated.Flattened.Cpu.CpuShares

metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "cpu"}, float32(unallocatedCpu), baseLabels)

totalComparable := total.Comparable()
for _, n := range totalComparable.Flattened.Networks {
for _, n := range available.Flattened.Networks {
// Determined the used resources
var usedMbits int
totalIdx := allocated.Flattened.Networks.NetIndex(n)
Expand Down
50 changes: 50 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,56 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {

}

func TestClient_UpdateNodeFromFingerprintCalculatesReservedResources(t *testing.T) {
ci.Parallel(t)

client, cleanup := TestClient(t, func(c *config.Config) {
// Parsed version of the client reserved field
c.Node.ReservedResources.Cpu = structs.NodeReservedCpuResources{
CpuShares: 100,
ReservedCpuCores: []uint16{0},
}
})
defer cleanup()

// Set up a basic topology where the first core is reserved and
// an additionally 100 MHz are reserved
basicTopology := structs.MockBasicTopology()
basicTopology.Cores[0].Disable = true
basicTopology.OverrideWitholdCompute = 100
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
// overrides the detected hardware in TestClient
NodeResources: &structs.NodeResources{
Memory: structs.NodeMemoryResources{MemoryMB: 1024},
Processors: structs.NodeProcessorResources{
Topology: basicTopology,
},
},
})

// initial check
conf := client.GetConfig()

expectedReservedResources := &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
// set by fingerprinting callback, once the total compute has been detected, it should have converted the
// client reserved configuration into the effective reserved bandwidth (1 core * 3500 MHz + 100 MHz)
CpuShares: 3_600,
ReservedCpuCores: []uint16{0},
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 256,
},
Disk: structs.NodeReservedDiskResources{
DiskMB: 4096,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "22",
},
}
must.Eq(t, expectedReservedResources, conf.Node.ReservedResources)
}

// TestClient_UpdateNodeFromFingerprintKeepsConfig asserts manually configured
// network interfaces take precedence over fingerprinted ones.
func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion client/lib/numalib/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,17 @@ func (st *Topology) NumECores() int {
return total
}

// UsableCores returns the number of logical cores usable by the Nomad client
// AllCores returns the set of logical cores detected. The UsableCores will
// be equal to or less than this value.
func (st *Topology) AllCores() *idset.Set[hw.CoreID] {
result := idset.Empty[hw.CoreID]()
for _, cpu := range st.Cores {
result.Insert(cpu.ID)
}
return result
}

// UsableCores returns the set of logical cores usable by the Nomad client
// for running tasks. Nomad must subtract off any reserved cores (reserved.cores)
// and/or must mask the cpuset to the one set in config (config.reservable_cores).
func (st *Topology) UsableCores() *idset.Set[hw.CoreID] {
Expand Down
20 changes: 5 additions & 15 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,8 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
return false, "cores", used, nil
}

// Check that the node resources (after subtracting reserved) are a
// super set of those that are being allocated
available := node.NodeResources.Comparable()
available.Subtract(node.ReservedResources.Comparable())
// Check that the node resources are a super set of those that are being allocated
available := node.Comparable()
if superset, dimension := available.Superset(used); !superset {
return false, dimension, used, nil
}
Expand Down Expand Up @@ -232,20 +230,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
}

func computeFreePercentage(node *Node, util *ComparableResources) (freePctCpu, freePctRam float64) {
reserved := node.ReservedResources.Comparable()
res := node.NodeResources.Comparable()

// Determine the node availability
nodeCpu := float64(res.Flattened.Cpu.CpuShares)
nodeMem := float64(res.Flattened.Memory.MemoryMB)
if reserved != nil {
nodeCpu -= float64(reserved.Flattened.Cpu.CpuShares)
nodeMem -= float64(reserved.Flattened.Memory.MemoryMB)
}
available := node.Comparable()

// Compute the free percentage
freePctCpu = 1 - (float64(util.Flattened.Cpu.CpuShares) / nodeCpu)
freePctRam = 1 - (float64(util.Flattened.Memory.MemoryMB) / nodeMem)
freePctCpu = 1 - (float64(util.Flattened.Cpu.CpuShares) / float64(available.Flattened.Cpu.CpuShares))
freePctRam = 1 - (float64(util.Flattened.Memory.MemoryMB) / float64(available.Flattened.Memory.MemoryMB))
return freePctCpu, freePctRam
}

Expand Down
40 changes: 36 additions & 4 deletions nomad/structs/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ func node2k() *Node {
ID: 1,
Grade: numalib.Performance,
BaseSpeed: 1000,
}, {
ID: 2,
Grade: numalib.Performance,
BaseSpeed: 1000,
Disable: true,
}, {
ID: 3,
Grade: numalib.Performance,
BaseSpeed: 1000,
Disable: true,
}},
OverrideWitholdCompute: 1000, // set by client reserved field
},
Expand Down Expand Up @@ -137,7 +147,10 @@ func node2k() *Node {
},
ReservedResources: &NodeReservedResources{
Cpu: NodeReservedCpuResources{
CpuShares: 1000,
// set by fingerprinting callback, topology of 1000 MHz * 4 cores (4000 MHz), of which 2 cores are reserved
// plus 1000 MHz of reserved amount of CPU, effectively a total of 3000 MHz of reserved CPU
CpuShares: 3000,
ReservedCpuCores: []uint16{2, 3},
},
Memory: NodeReservedMemoryResources{
MemoryMB: 1024,
Expand Down Expand Up @@ -201,9 +214,10 @@ func TestAllocsFit(t *testing.T) {
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should not fit second allocation
fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
fit, dim, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false)
must.NoError(t, err)
must.False(t, fit)
must.Eq(t, "cpu", dim)
must.Eq(t, 2000, used.Flattened.Cpu.CpuShares)
must.Eq(t, 2048, used.Flattened.Memory.MemoryMB)

Expand Down Expand Up @@ -649,8 +663,23 @@ func TestScoreFitBinPack(t *testing.T) {
Cores: []numalib.Core{{
ID: 0,
Grade: numalib.Performance,
BaseSpeed: 4096,
BaseSpeed: 2048,
}, {
ID: 1,
Grade: numalib.Performance,
BaseSpeed: 2048,
}, {
ID: 2,
Grade: numalib.Performance,
BaseSpeed: 2048,
Disable: true,
}, {
ID: 3,
Grade: numalib.Performance,
BaseSpeed: 2048,
Disable: true,
}},
OverrideWitholdCompute: 2048, // set by client reserved field
},
},
Memory: NodeMemoryResources{
Expand All @@ -661,7 +690,10 @@ func TestScoreFitBinPack(t *testing.T) {
node.NodeResources.Compatibility()
node.ReservedResources = &NodeReservedResources{
Cpu: NodeReservedCpuResources{
CpuShares: 2048,
// set by fingerprinting callback, topology of 2048 MHz * 4 cores (8192 MHz), of which 2 cores are reserved
// plus 2048 MHz of reserved amount of CPU, effectively a total of 6144 MHz of reserved CPU
CpuShares: 6144,
ReservedCpuCores: []uint16{2, 3},
},
Memory: NodeReservedMemoryResources{
MemoryMB: 4096,
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/numa.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,10 @@ func (r *NodeProcessorResources) TotalCompute() int {
}
return int(r.Topology.TotalCompute())
}

func (r *NodeProcessorResources) UsableCompute() int {
if r == nil || r.Topology == nil {
return 0
}
return int(r.Topology.UsableCompute())
}
19 changes: 16 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,19 @@ func (n *Node) Canonicalize() {
}
}

// Comparable returns a comparable version of the node's resources available for scheduling.
// This conversion can be lossy so care must be taken when using it.
func (n *Node) Comparable() *ComparableResources {
if n == nil {
return nil
}

resources := n.NodeResources.Comparable()
resources.Subtract(n.ReservedResources.Comparable())

return resources
}

func (n *Node) Copy() *Node {
if n == nil {
return nil
Expand Down Expand Up @@ -3244,16 +3257,16 @@ func (n *NodeResources) Comparable() *ComparableResources {
return nil
}

usableCores := n.Processors.Topology.UsableCores().Slice()
reservableCores := helper.ConvertSlice(usableCores, func(id hw.CoreID) uint16 {
allCores := n.Processors.Topology.AllCores().Slice()
cores := helper.ConvertSlice(allCores, func(id hw.CoreID) uint16 {
return uint16(id)
})

c := &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: int64(n.Processors.Topology.TotalCompute()),
ReservedCores: reservableCores,
ReservedCores: cores,
},
Memory: AllocatedMemoryResources{
MemoryMB: n.Memory.MemoryMB,
Expand Down
8 changes: 1 addition & 7 deletions scheduler/feasible/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,7 @@ func (p *Preemptor) Copy() *Preemptor {

// SetNode sets the node
func (p *Preemptor) SetNode(node *structs.Node) {
nodeRemainingResources := node.NodeResources.Comparable()

// Subtract the reserved resources of the node
if c := node.ReservedResources.Comparable(); c != nil {
nodeRemainingResources.Subtract(c)
}
p.nodeRemainingResources = nodeRemainingResources
p.nodeRemainingResources = node.Comparable()
}

// SetCandidates initializes the candidate set from which preemptions are chosen
Expand Down