Skip to content

Commit a82018e

Browse files
First KVM feature extractor and scheduler (#31)
1 parent 7bbd7ff commit a82018e

File tree

7 files changed

+351
-0
lines changed

7 files changed

+351
-0
lines changed

helm/cortex/values.yaml

+17
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ conf:
241241
prometheus:
242242
metrics:
243243
- vrops_hostsystem_cpu_contention_percentage
244+
# KVM-specific extractors.
245+
- name: node_exporter_host_cpu_usage_extractor
246+
dependencies:
247+
sync:
248+
prometheus:
249+
metrics:
250+
- node_exporter_cpu_usage_pct
244251

245252
scheduler:
246253
# The port to use for the scheduler API.
@@ -274,3 +281,13 @@ conf:
274281
features:
275282
extractors:
276283
- vrops_hostsystem_contention_extractor
284+
# KVM-specific scheduler steps.
285+
- name: kvm_avoid_overloaded_hosts
286+
options:
287+
avgCPUUsageThreshold: 10
288+
maxCPUUsageThreshold: 20
289+
activationOnHit: -1.0
290+
dependencies:
291+
features:
292+
extractors:
293+
- node_exporter_host_cpu_usage_extractor

internal/features/pipeline.go

+4
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,20 @@ import (
1010
"github.com/cobaltcore-dev/cortex/internal/conf"
1111
"github.com/cobaltcore-dev/cortex/internal/db"
1212
"github.com/cobaltcore-dev/cortex/internal/features/plugins"
13+
"github.com/cobaltcore-dev/cortex/internal/features/plugins/kvm"
1314
"github.com/cobaltcore-dev/cortex/internal/features/plugins/vmware"
1415
"github.com/prometheus/client_golang/prometheus"
1516
)
1617

1718
// Configuration of feature extractors supported by the scheduler.
1819
// The actual features to extract are defined in the configuration file.
1920
var supportedExtractors = []plugins.FeatureExtractor{
21+
// VMware-specific extractors
2022
&vmware.VROpsHostsystemResolver{},
2123
&vmware.VROpsProjectNoisinessExtractor{},
2224
&vmware.VROpsHostsystemContentionExtractor{},
25+
// KVM-specific extractors
26+
&kvm.NodeExporterHostCPUUsageExtractor{},
2327
}
2428

2529
// Pipeline that contains multiple feature extractors and executes them.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright 2025 SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kvm
5+
6+
import (
7+
"github.com/cobaltcore-dev/cortex/internal/features/plugins"
8+
)
9+
10+
// NodeExporterHostCPUUsage is the feature row that holds the aggregated
// CPU usage of a single kvm compute host.
type NodeExporterHostCPUUsage struct {
	// Compute host the usage values belong to.
	ComputeHost string `db:"compute_host"`
	// Average CPU usage recorded for the host.
	AvgCPUUsage float64 `db:"avg_cpu_usage"`
	// Maximum CPU usage recorded for the host.
	MaxCPUUsage float64 `db:"max_cpu_usage"`
}

// TableName returns the name of the database table in which the feature is stored.
func (NodeExporterHostCPUUsage) TableName() string {
	const table = "feature_host_cpu_usage"
	return table
}
21+
22+
// Extractor that extracts CPU usage of kvm hosts and stores
23+
// it as a feature into the database.
24+
type NodeExporterHostCPUUsageExtractor struct {
25+
// Common base for all extractors that provides standard functionality.
26+
plugins.BaseExtractor[
27+
struct{}, // No options passed through yaml config
28+
NodeExporterHostCPUUsage, // Feature model
29+
]
30+
}
31+
32+
// Name of this feature extractor that is used in the yaml config, for logging etc.
33+
func (*NodeExporterHostCPUUsageExtractor) GetName() string {
34+
return "node_exporter_host_cpu_usage_extractor"
35+
}
36+
37+
// Extract CPU usage of kvm hosts.
38+
// Depends on resolved kvm hosts (feature_resolved_host).
39+
func (e *NodeExporterHostCPUUsageExtractor) Extract() ([]plugins.Feature, error) {
40+
var features []NodeExporterHostCPUUsage
41+
if _, err := e.DB.Select(&features, `
42+
SELECT
43+
node AS compute_host,
44+
AVG(value) AS avg_cpu_usage,
45+
MAX(value) AS max_cpu_usage
46+
FROM node_exporter_metrics
47+
WHERE name = 'node_exporter_cpu_usage_pct'
48+
GROUP BY node;
49+
`); err != nil {
50+
return nil, err
51+
}
52+
return e.Extracted(features)
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025 SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kvm
5+
6+
import (
7+
"testing"
8+
9+
"github.com/cobaltcore-dev/cortex/internal/conf"
10+
"github.com/cobaltcore-dev/cortex/internal/db"
11+
"github.com/cobaltcore-dev/cortex/internal/sync/prometheus"
12+
testlibDB "github.com/cobaltcore-dev/cortex/testlib/db"
13+
)
14+
15+
func TestNodeExporterHostCPUUsageExtractor_Init(t *testing.T) {
16+
dbEnv := testlibDB.SetupDBEnv(t)
17+
testDB := db.DB{DbMap: dbEnv.DbMap}
18+
defer testDB.Close()
19+
defer dbEnv.Close()
20+
21+
extractor := &NodeExporterHostCPUUsageExtractor{}
22+
if err := extractor.Init(testDB, conf.NewRawOpts("")); err != nil {
23+
t.Fatalf("expected no error, got %v", err)
24+
}
25+
26+
if !testDB.TableExists(NodeExporterHostCPUUsage{}) {
27+
t.Error("expected table to be created")
28+
}
29+
}
30+
31+
func TestNodeExporterHostCPUUsageExtractor_Extract(t *testing.T) {
32+
dbEnv := testlibDB.SetupDBEnv(t)
33+
testDB := db.DB{DbMap: dbEnv.DbMap}
34+
defer testDB.Close()
35+
defer dbEnv.Close()
36+
37+
// Create dependency tables
38+
if err := testDB.CreateTable(
39+
testDB.AddTable(prometheus.NodeExporterMetric{}),
40+
); err != nil {
41+
t.Fatalf("expected no error, got %v", err)
42+
}
43+
44+
// Insert mock data into the node_exporter_metrics table
45+
_, err := testDB.Exec(`
46+
INSERT INTO node_exporter_metrics (node, name, value)
47+
VALUES
48+
('node1', 'node_exporter_cpu_usage_pct', 20.0),
49+
('node2', 'node_exporter_cpu_usage_pct', 30.0),
50+
('node1', 'node_exporter_cpu_usage_pct', 40.0)
51+
`)
52+
if err != nil {
53+
t.Fatalf("expected no error, got %v", err)
54+
}
55+
56+
extractor := &NodeExporterHostCPUUsageExtractor{}
57+
if err := extractor.Init(testDB, conf.NewRawOpts("")); err != nil {
58+
t.Fatalf("expected no error, got %v", err)
59+
}
60+
if _, err = extractor.Extract(); err != nil {
61+
t.Fatalf("expected no error, got %v", err)
62+
}
63+
64+
// Verify the data was inserted into the feature_host_cpu_usage table
65+
var usages []NodeExporterHostCPUUsage
66+
_, err = testDB.Select(&usages, "SELECT * FROM feature_host_cpu_usage")
67+
if err != nil {
68+
t.Fatalf("expected no error, got %v", err)
69+
}
70+
71+
if len(usages) != 2 {
72+
t.Errorf("expected 2 rows, got %d", len(usages))
73+
}
74+
expected := map[string]struct {
75+
AvgCPUUsage float64
76+
MaxCPUUsage float64
77+
}{
78+
"node1": {AvgCPUUsage: 30.0, MaxCPUUsage: 40.0}, // Average of 20.0 and 40.0, Max of 40.0
79+
"node2": {AvgCPUUsage: 30.0, MaxCPUUsage: 30.0}, // Single value of 30.0
80+
}
81+
for _, u := range usages {
82+
if expected[u.ComputeHost].AvgCPUUsage != u.AvgCPUUsage {
83+
t.Errorf(
84+
"expected avg_cpu_usage for compute_host %s to be %f, got %f",
85+
u.ComputeHost, expected[u.ComputeHost].AvgCPUUsage, u.AvgCPUUsage,
86+
)
87+
}
88+
if expected[u.ComputeHost].MaxCPUUsage != u.MaxCPUUsage {
89+
t.Errorf(
90+
"expected max_cpu_usage for compute_host %s to be %f, got %f",
91+
u.ComputeHost, expected[u.ComputeHost].MaxCPUUsage, u.MaxCPUUsage,
92+
)
93+
}
94+
}
95+
}

internal/scheduler/pipeline.go

+4
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@ import (
1414
"github.com/cobaltcore-dev/cortex/internal/conf"
1515
"github.com/cobaltcore-dev/cortex/internal/db"
1616
"github.com/cobaltcore-dev/cortex/internal/scheduler/plugins"
17+
"github.com/cobaltcore-dev/cortex/internal/scheduler/plugins/kvm"
1718
"github.com/cobaltcore-dev/cortex/internal/scheduler/plugins/vmware"
1819
)
1920

2021
// Configuration of steps supported by the scheduler.
2122
// The steps actually used by the scheduler are defined through the configuration file.
2223
var supportedSteps = []plugins.Step{
24+
// VMware-specific steps
2325
&vmware.AntiAffinityNoisyProjectsStep{},
2426
&vmware.AvoidContendedHostsStep{},
27+
// KVM-specific steps
28+
&kvm.AvoidOverloadedHostsStep{},
2529
}
2630

2731
// Sequence of scheduler steps that are executed in parallel.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2025 SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kvm
5+
6+
import (
7+
"github.com/cobaltcore-dev/cortex/internal/features/plugins/kvm"
8+
"github.com/cobaltcore-dev/cortex/internal/scheduler/plugins"
9+
)
10+
11+
// AvoidOverloadedHostsStepOpts holds the options of the scheduling step,
// as given through the step config in the service yaml file.
type AvoidOverloadedHostsStepOpts struct {
	// Hosts whose average CPU usage exceeds this value count as a hit.
	AvgCPUUsageThreshold float64 `yaml:"avgCPUUsageThreshold"`
	// Hosts whose maximum CPU usage exceeds this value count as a hit.
	MaxCPUUsageThreshold float64 `yaml:"maxCPUUsageThreshold"`
	// Activation assigned to a host that was hit.
	ActivationOnHit float64 `yaml:"activationOnHit"`
}
18+
19+
// Step to avoid high cpu hosts by downvoting them.
20+
type AvoidOverloadedHostsStep struct {
21+
// BaseStep is a helper struct that provides common functionality for all steps.
22+
plugins.BaseStep[AvoidOverloadedHostsStepOpts]
23+
}
24+
25+
// Get the name of this step, used for identification in config, logs, metrics, etc.
26+
func (s *AvoidOverloadedHostsStep) GetName() string {
27+
return "kvm_avoid_overloaded_hosts"
28+
}
29+
30+
// Downvote hosts that have high cpu load.
31+
func (s *AvoidOverloadedHostsStep) Run(scenario plugins.Scenario) (map[string]float64, error) {
32+
activations := s.BaseStep.BaseActivations(scenario)
33+
if scenario.GetVMware() {
34+
// Don't run this step for VMware VMs.
35+
return activations, nil
36+
}
37+
38+
var highlyUsedHosts []kvm.NodeExporterHostCPUUsage
39+
if _, err := s.DB.Select(&highlyUsedHosts, `
40+
SELECT * FROM feature_host_cpu_usage
41+
WHERE avg_cpu_usage > :avg_cpu_usage_threshold
42+
OR max_cpu_usage > :max_cpu_usage_threshold
43+
`, map[string]any{
44+
"avg_cpu_usage_threshold": s.Options.AvgCPUUsageThreshold,
45+
"max_cpu_usage_threshold": s.Options.MaxCPUUsageThreshold,
46+
}); err != nil {
47+
return nil, err
48+
}
49+
50+
// Push the VM away from highly used hosts.
51+
for _, host := range highlyUsedHosts {
52+
// Only modify the weight if the host is in the scenario.
53+
if _, ok := activations[host.ComputeHost]; ok {
54+
activations[host.ComputeHost] = s.Options.ActivationOnHit
55+
}
56+
}
57+
return activations, nil
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2025 SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kvm
5+
6+
import (
7+
"testing"
8+
9+
"github.com/cobaltcore-dev/cortex/internal/conf"
10+
"github.com/cobaltcore-dev/cortex/internal/db"
11+
"github.com/cobaltcore-dev/cortex/internal/features/plugins/kvm"
12+
"github.com/cobaltcore-dev/cortex/internal/scheduler/plugins"
13+
testlibDB "github.com/cobaltcore-dev/cortex/testlib/db"
14+
testlibPlugins "github.com/cobaltcore-dev/cortex/testlib/scheduler/plugins"
15+
)
16+
17+
func TestAvoidOverloadedHostsStep_Run(t *testing.T) {
18+
dbEnv := testlibDB.SetupDBEnv(t)
19+
testDB := db.DB{DbMap: dbEnv.DbMap}
20+
defer testDB.Close()
21+
defer dbEnv.Close()
22+
23+
// Create dependency tables
24+
err := testDB.CreateTable(testDB.AddTable(kvm.NodeExporterHostCPUUsage{}))
25+
if err != nil {
26+
t.Fatalf("expected no error, got %v", err)
27+
}
28+
29+
// Insert mock data into the feature_host_cpu_usage table
30+
_, err = testDB.Exec(`
31+
INSERT INTO feature_host_cpu_usage (compute_host, avg_cpu_usage, max_cpu_usage)
32+
VALUES
33+
('host1', 15.0, 25.0),
34+
('host2', 5.0, 10.0),
35+
('host3', 20.0, 30.0)
36+
`)
37+
if err != nil {
38+
t.Fatalf("expected no error, got %v", err)
39+
}
40+
41+
// Create an instance of the step
42+
opts := conf.NewRawOpts(`
43+
avgCPUUsageThreshold: 10.0
44+
maxCPUUsageThreshold: 20.0
45+
activationOnHit: -1.0
46+
`)
47+
step := &AvoidOverloadedHostsStep{}
48+
if err := step.Init(testDB, opts); err != nil {
49+
t.Fatalf("expected no error, got %v", err)
50+
}
51+
52+
tests := []struct {
53+
name string
54+
scenario plugins.Scenario
55+
downvotedHosts map[string]struct{}
56+
}{
57+
{
58+
name: "Non-vmware vm",
59+
scenario: &testlibPlugins.MockScenario{
60+
VMware: false,
61+
Hosts: []testlibPlugins.MockScenarioHost{
62+
{ComputeHost: "host1", HypervisorHostname: "hypervisor1"},
63+
{ComputeHost: "host2", HypervisorHostname: "hypervisor2"},
64+
{ComputeHost: "host3", HypervisorHostname: "hypervisor3"},
65+
},
66+
},
67+
// Should downvote hosts with high CPU usage
68+
downvotedHosts: map[string]struct{}{
69+
"host1": {},
70+
"host3": {},
71+
},
72+
},
73+
{
74+
name: "VMware vm",
75+
scenario: &testlibPlugins.MockScenario{
76+
VMware: true,
77+
Hosts: []testlibPlugins.MockScenarioHost{
78+
{ComputeHost: "host1", HypervisorHostname: "hypervisor1"},
79+
{ComputeHost: "host2", HypervisorHostname: "hypervisor2"},
80+
{ComputeHost: "host3", HypervisorHostname: "hypervisor3"},
81+
},
82+
},
83+
// Should not do anything for VMware VMs
84+
downvotedHosts: map[string]struct{}{},
85+
},
86+
{
87+
name: "No overloaded hosts",
88+
scenario: &testlibPlugins.MockScenario{
89+
VMware: false,
90+
Hosts: []testlibPlugins.MockScenarioHost{
91+
{ComputeHost: "host4", HypervisorHostname: "hypervisor4"},
92+
{ComputeHost: "host5", HypervisorHostname: "hypervisor5"},
93+
},
94+
},
95+
// Should not downvote any hosts
96+
downvotedHosts: map[string]struct{}{},
97+
},
98+
}
99+
100+
for _, tt := range tests {
101+
t.Run(tt.name, func(t *testing.T) {
102+
weights, err := step.Run(tt.scenario)
103+
if err != nil {
104+
t.Fatalf("expected no error, got %v", err)
105+
}
106+
// Check that the weights have decreased
107+
for host, weight := range weights {
108+
if _, ok := tt.downvotedHosts[host]; ok {
109+
if weight >= 0 {
110+
t.Errorf("expected weight for host %s to be less than 0, got %f", host, weight)
111+
}
112+
} else {
113+
if weight != 0 {
114+
t.Errorf("expected weight for host %s to be 0, got %f", host, weight)
115+
}
116+
}
117+
}
118+
})
119+
}
120+
}

0 commit comments

Comments
 (0)