diff --git a/CubeMaster/pkg/base/node/node.go b/CubeMaster/pkg/base/node/node.go index 81e9794f..07d9cb48 100644 --- a/CubeMaster/pkg/base/node/node.go +++ b/CubeMaster/pkg/base/node/node.go @@ -54,7 +54,11 @@ type Node struct { MetaDataUpdateAt time.Time `json:"MetaDataUpdateAt,omitempty"` - Healthy bool `json:"Healthy,omitempty"` + ReportedReady bool `json:"-"` + + Healthy bool `json:"Healthy"` + + UnhealthyReason string `json:"UnhealthyReason,omitempty"` Score float64 `json:"Score,omitempty"` @@ -84,6 +88,22 @@ type Node struct { NicQueues int64 `json:"nic_queues,omitempty"` } +func (n *Node) Clone() *Node { + if n == nil { + return nil + } + // Clone provides a best-effort read-side snapshot. Mutable counters such + // as LocalCreateNum are refreshed via atomic loads after the structural + // copy so cloned read models stay aligned with the write path. + localCreateNum := atomic.LoadInt64(&n.LocalCreateNum) + cloned := *n + cloned.LocalCreateNum = localCreateNum + if n.VirtualNodeQuotaArray != nil { + cloned.VirtualNodeQuotaArray = append([]int64(nil), n.VirtualNodeQuotaArray...) + } + return &cloned +} + func (n *Node) ID() string { if n.InsID == "" { return n.IP diff --git a/CubeMaster/pkg/base/nodehealth/health.go b/CubeMaster/pkg/base/nodehealth/health.go new file mode 100644 index 00000000..26114ff7 --- /dev/null +++ b/CubeMaster/pkg/base/nodehealth/health.go @@ -0,0 +1,44 @@ +package nodehealth + +import ( + "time" + + corev1 "k8s.io/api/core/v1" +) + +const ( + ReasonReportedNotReady = "ReportedNotReady" + ReasonHeartbeatExpired = "HeartbeatExpired" +) + +type Status struct { + Healthy bool + UnhealthyReason string +} + +func MetadataTimeout(syncMetaDataInterval time.Duration) time.Duration { + return syncMetaDataInterval + 10*time.Second +} + +func ReadyConditionTrue(conditions []corev1.NodeCondition) bool { + for _, cond := range conditions { + if cond.Type == corev1.NodeReady { + return cond.Status == corev1.ConditionTrue + } + } + return false +} + +func EvaluateFromFacts(reportedReady bool, heartbeatTime, now time.Time, timeout time.Duration) Status { + if heartbeatTime.IsZero() || now.Sub(heartbeatTime) > timeout { + return Status{Healthy: false, UnhealthyReason: ReasonHeartbeatExpired} + } + if !reportedReady { + return Status{Healthy: false, UnhealthyReason: ReasonReportedNotReady} + } + return Status{Healthy: true} +} + +func Evaluate(conditions []corev1.NodeCondition, heartbeatTime, now time.Time, timeout time.Duration) Status { + return EvaluateFromFacts(ReadyConditionTrue(conditions), heartbeatTime, now, timeout) +} diff --git a/CubeMaster/pkg/localcache/db_cache.go b/CubeMaster/pkg/localcache/db_cache.go index 7381dad7..a66766b9 100644 --- a/CubeMaster/pkg/localcache/db_cache.go +++ b/CubeMaster/pkg/localcache/db_cache.go @@ -215,6 +215,7 @@ func constructNode(elem *models.HostInfo) *node.Node { InstanceType: elem.InstanceType, HostStatus: elem.HostStatus, MetaDataUpdateAt: time.Now(), + ReportedReady: constants.HeartbeatHealth == elem.LiveStatus, Healthy: constants.HeartbeatHealth == elem.LiveStatus, QuotaMem: elem.QuotaMem, QuotaCpu: elem.QuotaCpu, diff --git a/CubeMaster/pkg/localcache/export.go b/CubeMaster/pkg/localcache/export.go index 8937181b..6a1ed924 100644 --- a/CubeMaster/pkg/localcache/export.go +++ b/CubeMaster/pkg/localcache/export.go @@ -21,6 +21,7 @@ import ( fwk "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/framework" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/log" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/types" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/utils" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/wrapredis" @@ -89,10 +90,11 @@ func GetCacheItems() map[string]cache.Item { func GetNodes(n int) node.NodeList { nodes := node.NodeList{} elems := l.cache.Items() + now := time.Now() for _, v := range elems { h, ok := v.Object.(*node.Node) if ok { - nodes.Append(h) + nodes.Append(cloneNodeWithCurrentHealth(h, now)) } if n > 0 && nodes.Len() >= n { break @@ -104,13 +106,17 @@ func GetNodes(n int) node.NodeList { func GetHealthyNodes(n int) node.NodeList { nodes := node.NodeList{} elems := l.cache.Items() + now := time.Now() for _, v := range elems { if n >= 0 && nodes.Len() >= n { break } h, ok := v.Object.(*node.Node) - if ok && h.Healthy { - nodes.Append(h) + if ok { + current := cloneNodeWithCurrentHealth(h, now) + if current.Healthy { + nodes.Append(current) + } } } @@ -127,11 +133,8 @@ func GetHealthyNodesByInstanceType(n int, product string) node.NodeList { return GetHealthyNodes(n) } - if n == -1 { - return clusterNodes - } - nodes := node.NodeList{} + now := time.Now() for _, v := range clusterNodes { @@ -139,8 +142,9 @@ func GetHealthyNodesByInstanceType(n int, product string) node.NodeList { break } - if v.Healthy { - nodes.Append(v) + current := cloneNodeWithCurrentHealth(v, now) + if current.Healthy { + nodes.Append(current) } } @@ -153,15 +157,34 @@ func GetNode(id string) (*node.Node, bool) { return nil, exist } h, ok := elem.(*node.Node) - return h, ok + if ok { + return cloneNodeWithCurrentHealth(h, time.Now()), true + } + return nil, false +} + +func metadataHealthTimeout() time.Duration { + return nodehealth.MetadataTimeout(config.GetConfig().Common.SyncMetaDataInterval) +} + +func cloneNodeWithCurrentHealth(n *node.Node, now time.Time) *node.Node { + if n == nil { + return nil + } + current := n.Clone() + status := nodehealth.EvaluateFromFacts(n.ReportedReady, n.MetaDataUpdateAt, now, metadataHealthTimeout()) + current.Healthy = status.Healthy + current.UnhealthyReason = status.UnhealthyReason + return current } func GetNodesByIp(ip string) (*node.Node, bool) { elems := l.cache.Items() + now := time.Now() for _, v := range elems { h, ok := v.Object.(*node.Node) if ok && h.IP == ip { - return h, true + return cloneNodeWithCurrentHealth(h, now), true } } return nil, false @@ -370,6 +393,10 @@ func IncrNodeConcurrent(n *node.Node) error { if n == nil { return nil } + if cached, ok := getMutableNode(n); ok { + cached.LocalCreateNumIncrBy(1) + return nil + } n.LocalCreateNumIncrBy(1) return nil } @@ -378,10 +405,29 @@ func DecrNodeConcurrent(n *node.Node) error { if n == nil { return nil } + if cached, ok := getMutableNode(n); ok { + cached.LocalCreateNumIncrBy(-1) + return nil + } n.LocalCreateNumIncrBy(-1) return nil } +func getMutableNode(n *node.Node) (*node.Node, bool) { + if n == nil { + return nil, false + } + elem, ok := l.cache.Get(n.ID()) + if !ok { + return nil, false + } + cached, ok := elem.(*node.Node) + if !ok || cached == nil { + return nil, false + } + return cached, true +} + func HealthyMasterNodes() (num int64) { defer func() { if num == 0 { diff --git a/CubeMaster/pkg/localcache/export_test.go b/CubeMaster/pkg/localcache/export_test.go index 986f4bb3..c8678afd 100644 --- a/CubeMaster/pkg/localcache/export_test.go +++ b/CubeMaster/pkg/localcache/export_test.go @@ -6,10 +6,12 @@ package localcache import ( "testing" + "time" "github.com/patrickmn/go-cache" fwk "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/framework" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth" ) func TestGetHealthyNodesByInstanceType(t *testing.T) { @@ -24,7 +26,11 @@ func TestGetHealthyNodesByInstanceType(t *testing.T) { createNodes := func(count int, healthy bool) node.NodeList { nodes := make(node.NodeList, count) for i := 0; i < count; i++ { - nodes[i] = &node.Node{Healthy: healthy} + nodes[i] = &node.Node{ + ReportedReady: healthy, + Healthy: healthy, + MetaDataUpdateAt: time.Now(), + } } return nodes } @@ -260,3 +266,143 @@ func TestInvalidateImageStateAllowsHeartbeatToRebuildLocality(t *testing.T) { t.Fatal("reverse index should be rebuilt after heartbeat replay") } } + +func TestGetNodeRefreshesCurrentHealthFromCachedFacts(t *testing.T) { + origCache := l.cache + defer func() { + l.cache = origCache + }() + + l.cache = cache.New(0, 0) + staleHeartbeat := time.Now().Add(-(metadataHealthTimeout() + time.Second)) + l.cache.SetDefault("node-stale", &node.Node{ + InsID: "node-stale", + IP: "10.0.0.1", + ReportedReady: true, + Healthy: true, + MetaDataUpdateAt: staleHeartbeat, + }) + + got, ok := GetNode("node-stale") + if !ok || got == nil { + t.Fatal("expected node to exist") + } + if got.Healthy { + t.Fatal("stale heartbeat should be reflected as unhealthy") + } + if got.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", got.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } + raw, ok := l.cache.Get("node-stale") + if !ok { + t.Fatal("expected cached node to remain in cache") + } + cached, ok := raw.(*node.Node) + if !ok { + t.Fatal("expected cached node type") + } + if !cached.Healthy { + t.Fatal("read path should not mutate cached health state") + } +} + +func TestGetHealthyNodesByInstanceTypeFiltersExpiredHeartbeat(t *testing.T) { + origNodesByClusters := l.sortedNodesByClusters + defer func() { + l.sortedNodesByClusters = origNodesByClusters + }() + + fresh := &node.Node{ + InsID: "node-fresh", + ReportedReady: true, + Healthy: true, + MetaDataUpdateAt: time.Now(), + } + stale := &node.Node{ + InsID: "node-stale", + ReportedReady: true, + Healthy: true, + MetaDataUpdateAt: time.Now().Add(-(metadataHealthTimeout() + time.Second)), + } + l.sortedNodesByClusters = map[string]node.NodeList{ + "valid": {fresh, stale}, + } + + got := GetHealthyNodesByInstanceType(-1, "valid") + if got.Len() != 1 { + t.Fatalf("healthy node count=%d want 1", got.Len()) + } + if got[0].ID() != fresh.ID() { + t.Fatalf("healthy node=%s want %s", got[0].ID(), fresh.ID()) + } + if !stale.Healthy { + t.Fatal("read path should not mutate source node health state") + } +} + +func TestGetNodesByIpRefreshesCurrentHealthFromCachedFacts(t *testing.T) { + origCache := l.cache + defer func() { + l.cache = origCache + }() + + l.cache = cache.New(0, 0) + staleHeartbeat := time.Now().Add(-(metadataHealthTimeout() + time.Second)) + l.cache.SetDefault("node-stale", &node.Node{ + InsID: "node-stale", + IP: "10.0.0.9", + ReportedReady: true, + Healthy: true, + MetaDataUpdateAt: staleHeartbeat, + }) + + got, ok := GetNodesByIp("10.0.0.9") + if !ok || got == nil { + t.Fatal("expected node to exist") + } + if got.Healthy { + t.Fatal("stale heartbeat should be reflected as unhealthy") + } + if got.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", got.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } +} + +func TestNodeConcurrentCountersUpdateCachedNodeFromReadClone(t *testing.T) { + origCache := l.cache + defer func() { + l.cache = origCache + }() + + l.cache = cache.New(0, 0) + l.cache.SetDefault("node-a", &node.Node{ + InsID: "node-a", + IP: "10.0.0.10", + ReportedReady: true, + Healthy: true, + MetaDataUpdateAt: time.Now(), + }) + + got, ok := GetNode("node-a") + if !ok || got == nil { + t.Fatal("expected node to exist") + } + if err := IncrNodeConcurrent(got); err != nil { + t.Fatalf("IncrNodeConcurrent error: %v", err) + } + if err := DecrNodeConcurrent(got); err != nil { + t.Fatalf("DecrNodeConcurrent error: %v", err) + } + + raw, ok := l.cache.Get("node-a") + if !ok { + t.Fatal("expected cached node to remain in cache") + } + cached, ok := raw.(*node.Node) + if !ok { + t.Fatal("expected cached node type") + } + if cached.LocalCreateNum != 0 { + t.Fatalf("LocalCreateNum=%d want 0", cached.LocalCreateNum) + } +} diff --git a/CubeMaster/pkg/localcache/node_cache.go b/CubeMaster/pkg/localcache/node_cache.go index aa5030f7..504ad372 100644 --- a/CubeMaster/pkg/localcache/node_cache.go +++ b/CubeMaster/pkg/localcache/node_cache.go @@ -18,6 +18,7 @@ import ( "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/constants" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/log" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/recov" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/cubelet" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/cubelet/grpcconn" @@ -204,7 +205,9 @@ func (l *local) downNodeCache(n *node.Node) { if v, exist := l.cache.Get(n.ID()); exist { l.lockMetaData.Lock() old := v.(*node.Node) + old.ReportedReady = false old.Healthy = false + old.UnhealthyReason = nodehealth.ReasonReportedNotReady old.MetaDataUpdateAt = time.Now() l.lockMetaData.Unlock() l.delSortedNodes(n) @@ -292,7 +295,9 @@ func (l *local) updateNodeFromMetaData(n *node.Node) error { old.Region = n.Region old.QuotaCpu = n.QuotaCpu old.QuotaMem = n.QuotaMem + old.ReportedReady = n.ReportedReady old.Healthy = n.Healthy + old.UnhealthyReason = n.UnhealthyReason old.HostStatus = n.HostStatus old.CreateConcurrentNum = n.CreateConcurrentNum old.MaxMvmLimit = n.MaxMvmLimit diff --git a/CubeMaster/pkg/localcache/node_cache_test.go b/CubeMaster/pkg/localcache/node_cache_test.go index c16648ef..06089baf 100644 --- a/CubeMaster/pkg/localcache/node_cache_test.go +++ b/CubeMaster/pkg/localcache/node_cache_test.go @@ -25,9 +25,11 @@ func init() { } fmt.Printf("mydir=%s\n", mydir) if os.Getenv("CUBE_MASTER_CONFIG_PATH") == "" { - os.Setenv("CUBE_MASTER_CONFIG_PATH", filepath.Clean(filepath.Join(mydir, "../../test/conf.yaml"))) + os.Setenv("CUBE_MASTER_CONFIG_PATH", filepath.Clean(filepath.Join(mydir, "../../conf.yaml"))) + } + if _, err := config.Init(); err != nil { + panic(err) } - config.Init() } func Test_local_appendNodeByCluster(t *testing.T) { diff --git a/CubeMaster/pkg/nodemeta/service.go b/CubeMaster/pkg/nodemeta/service.go index e6e1d1e9..7c0617b5 100644 --- a/CubeMaster/pkg/nodemeta/service.go +++ b/CubeMaster/pkg/nodemeta/service.go @@ -18,6 +18,7 @@ import ( "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/db/models" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/log" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth" "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/localcache" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -111,7 +112,9 @@ type NodeSnapshot struct { Images []ContainerImage `json:"images,omitempty"` LocalTemplates []LocalTemplate `json:"local_templates,omitempty"` HeartbeatTime time.Time `json:"heartbeat_time,omitempty"` - Healthy bool `json:"healthy,omitempty"` + ReportedReady bool `json:"-"` + Healthy bool `json:"healthy"` + UnhealthyReason string `json:"unhealthy_reason,omitempty"` } type service struct { @@ -191,6 +194,7 @@ func RegisterNode(ctx context.Context, req *RegisterNodeRequest) (*NodeSnapshot, snap.QuotaMemMB = req.QuotaMemMB snap.CreateConcurrentNum = req.CreateConcurrentNum snap.MaxMvmNum = req.MaxMvmNum + applyCurrentHealth(snap, time.Now()) global.mu.Unlock() syncLocalcache(snap) return cloneSnapshot(snap), nil @@ -206,13 +210,14 @@ func UpdateNodeStatus(ctx context.Context, nodeID string, req *UpdateNodeStatusR if req.HeartbeatTime.IsZero() { req.HeartbeatTime = time.Now() } + reportedReady := nodehealth.ReadyConditionTrue(req.Conditions) status := &models.NodeStatus{ NodeID: nodeID, ConditionsJSON: mustJSON(req.Conditions), ImagesJSON: mustJSON(req.Images), LocalTemplatesJSON: mustJSON(req.LocalTemplates), HeartbeatUnix: req.HeartbeatTime.Unix(), - Healthy: isHealthy(req.Conditions), + Healthy: reportedReady, } if err := global.db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "node_id"}}, @@ -231,7 +236,8 @@ func UpdateNodeStatus(ctx context.Context, nodeID string, req *UpdateNodeStatusR snap.Images = append([]ContainerImage(nil), req.Images...) snap.LocalTemplates = append([]LocalTemplate(nil), req.LocalTemplates...) snap.HeartbeatTime = req.HeartbeatTime - snap.Healthy = status.Healthy + snap.ReportedReady = reportedReady + applyCurrentHealth(snap, time.Now()) global.mu.Unlock() syncLocalcache(snap) @@ -292,7 +298,7 @@ func GetNode(ctx context.Context, nodeID string) (*NodeSnapshot, error) { if !ok { return nil, gorm.ErrRecordNotFound } - return cloneSnapshot(snap), nil + return cloneSnapshotWithCurrentHealth(snap, time.Now()), nil } func ListNodes(ctx context.Context) ([]*NodeSnapshot, error) { @@ -300,8 +306,9 @@ func ListNodes(ctx context.Context) ([]*NodeSnapshot, error) { global.mu.RLock() defer global.mu.RUnlock() out := make([]*NodeSnapshot, 0, len(global.nodes)) + now := time.Now() for _, snap := range global.nodes { - out = append(out, cloneSnapshot(snap)) + out = append(out, cloneSnapshotWithCurrentHealth(snap, now)) } sort.Slice(out, func(i, j int) bool { return out[i].NodeID < out[j].NodeID }) return out, nil @@ -370,7 +377,8 @@ func (s *service) reload() error { _ = json.Unmarshal([]byte(st.ImagesJSON), &snap.Images) _ = json.Unmarshal([]byte(st.LocalTemplatesJSON), &snap.LocalTemplates) snap.HeartbeatTime = time.Unix(st.HeartbeatUnix, 0) - snap.Healthy = st.Healthy + snap.ReportedReady = st.Healthy + applyCurrentHealth(snap, time.Now()) } s.mu.Lock() s.nodes = next @@ -378,13 +386,24 @@ func (s *service) reload() error { return nil } -func isHealthy(conditions []corev1.NodeCondition) bool { - for _, cond := range conditions { - if cond.Type == corev1.NodeReady { - return cond.Status == corev1.ConditionTrue - } +func healthTimeout() time.Duration { + return nodehealth.MetadataTimeout(config.GetConfig().Common.SyncMetaDataInterval) +} + +func currentHealthStatus(snap *NodeSnapshot, now time.Time) nodehealth.Status { + if snap == nil { + return nodehealth.Status{Healthy: false, UnhealthyReason: nodehealth.ReasonHeartbeatExpired} } - return false + return nodehealth.EvaluateFromFacts(snap.ReportedReady, snap.HeartbeatTime, now, healthTimeout()) +} + +func applyCurrentHealth(snap *NodeSnapshot, now time.Time) { + if snap == nil { + return + } + status := currentHealthStatus(snap, now) + snap.Healthy = status.Healthy + snap.UnhealthyReason = status.UnhealthyReason } func toSchedulerNode(snap *NodeSnapshot) *node.Node { @@ -419,7 +438,9 @@ func toSchedulerNode(snap *NodeSnapshot) *node.Node { OssClusterLabel: snap.ClusterLabel, InstanceType: instanceType, HostStatus: constants.HostStatusRunning, + ReportedReady: snap.ReportedReady, Healthy: snap.Healthy, + UnhealthyReason: snap.UnhealthyReason, CreateConcurrentNum: snap.CreateConcurrentNum, MaxMvmLimit: snap.MaxMvmNum, MetaDataUpdateAt: snap.HeartbeatTime, @@ -464,6 +485,12 @@ func cloneSnapshot(in *NodeSnapshot) *NodeSnapshot { return &out } +func cloneSnapshotWithCurrentHealth(in *NodeSnapshot, now time.Time) *NodeSnapshot { + out := cloneSnapshot(in) + applyCurrentHealth(out, now) + return out +} + func cloneStringMap(in map[string]string) map[string]string { if len(in) == 0 { return nil diff --git a/CubeMaster/pkg/nodemeta/service_resource_test.go b/CubeMaster/pkg/nodemeta/service_resource_test.go index 05d9e5ee..eafb70ec 100644 --- a/CubeMaster/pkg/nodemeta/service_resource_test.go +++ b/CubeMaster/pkg/nodemeta/service_resource_test.go @@ -5,10 +5,31 @@ package nodemeta import ( + "encoding/json" + "os" + "path/filepath" + "strings" "testing" "time" + + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/config" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth" + corev1 "k8s.io/api/core/v1" ) +func init() { + mydir, err := os.Getwd() + if err != nil { + panic(err) + } + if os.Getenv("CUBE_MASTER_CONFIG_PATH") == "" { + os.Setenv("CUBE_MASTER_CONFIG_PATH", filepath.Clean(filepath.Join(mydir, "../../conf.yaml"))) + } + if _, err := config.Init(); err != nil { + panic(err) + } +} + func TestToSchedulerNodeDoesNotForgeMetricUpdate(t *testing.T) { snap := &NodeSnapshot{ NodeID: "node-a", @@ -30,3 +51,152 @@ func TestToSchedulerNodeDoesNotForgeMetricUpdate(t *testing.T) { t.Fatalf("MetaDataUpdateAt %v want %v", n.MetaDataUpdateAt, snap.HeartbeatTime) } } + +func TestToSchedulerNodeUsesPrecomputedHealth(t *testing.T) { + snap := &NodeSnapshot{ + NodeID: "node-health", + HostIP: "10.0.0.9", + HeartbeatTime: time.Now(), + ReportedReady: true, + Healthy: false, + UnhealthyReason: nodehealth.ReasonHeartbeatExpired, + } + n := toSchedulerNode(snap) + if n == nil { + t.Fatal("toSchedulerNode returned nil") + } + if n.Healthy { + t.Fatal("toSchedulerNode should preserve snapshot health state") + } + if n.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", n.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } +} + +func TestCurrentHealthStatusExpiresStaleHeartbeat(t *testing.T) { + now := time.Now() + + t.Run("fresh ready heartbeat stays healthy", func(t *testing.T) { + snap := &NodeSnapshot{ + NodeID: "node-a", + HeartbeatTime: now.Add(-5 * time.Second), + ReportedReady: true, + } + got := currentHealthStatus(snap, now) + if !got.Healthy { + t.Fatalf("Healthy=%v want true", got.Healthy) + } + if got.UnhealthyReason != "" { + t.Fatalf("UnhealthyReason=%s want empty", got.UnhealthyReason) + } + }) + + t.Run("stale ready heartbeat becomes unhealthy", func(t *testing.T) { + snap := &NodeSnapshot{ + NodeID: "node-b", + HeartbeatTime: now.Add(-(healthTimeout() + time.Second)), + ReportedReady: true, + } + got := currentHealthStatus(snap, now) + if got.Healthy { + t.Fatalf("Healthy=%v want false", got.Healthy) + } + if got.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", got.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } + }) + + t.Run("missing heartbeat is treated as expired", func(t *testing.T) { + snap := &NodeSnapshot{ + NodeID: "node-missing", + ReportedReady: false, + } + got := currentHealthStatus(snap, now) + if got.Healthy { + t.Fatalf("Healthy=%v want false", got.Healthy) + } + if got.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", got.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } + }) +} + +func TestUpdateNodeStatusStoresLastReportedReadyButExportsCurrentHealth(t *testing.T) { + req := &UpdateNodeStatusRequest{ + Conditions: []corev1.NodeCondition{{ + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }}, + HeartbeatTime: time.Now().Add(-(healthTimeout() + time.Second)), + } + + reportedReady := nodehealth.ReadyConditionTrue(req.Conditions) + if !reportedReady { + t.Fatal("reported ready should be true") + } + snap := &NodeSnapshot{ + NodeID: "node-c", + Conditions: req.Conditions, + HeartbeatTime: req.HeartbeatTime, + ReportedReady: reportedReady, + } + applyCurrentHealth(snap, time.Now()) + + if !snap.ReportedReady { + t.Fatal("ReportedReady should preserve the last reported Ready=True fact") + } + if snap.Healthy { + t.Fatal("Healthy should reflect current stale heartbeat state, want false") + } + if snap.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", snap.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } +} + +func TestCloneSnapshotWithCurrentHealthRefreshesStaleNode(t *testing.T) { + now := time.Now() + snap := &NodeSnapshot{ + NodeID: "node-d", + HeartbeatTime: now.Add(-(healthTimeout() + time.Second)), + ReportedReady: true, + Healthy: true, + } + + got := cloneSnapshotWithCurrentHealth(snap, now) + if got == nil { + t.Fatal("cloneSnapshotWithCurrentHealth returned nil") + } + if got.Healthy { + t.Fatal("stale heartbeat should be refreshed to unhealthy on snapshot reads") + } + if got.UnhealthyReason != nodehealth.ReasonHeartbeatExpired { + t.Fatalf("UnhealthyReason=%s want %s", got.UnhealthyReason, nodehealth.ReasonHeartbeatExpired) + } + if !snap.Healthy { + t.Fatal("source snapshot should not be mutated by cloneSnapshotWithCurrentHealth") + } +} + +func TestNodeSnapshotJSONIncludesHealthyFalseAndHidesReportedReady(t *testing.T) { + snap := &NodeSnapshot{ + NodeID: "node-json", + ReportedReady: true, + Healthy: false, + UnhealthyReason: nodehealth.ReasonHeartbeatExpired, + } + + data, err := json.Marshal(snap) + if err != nil { + t.Fatalf("Marshal: %v", err) + } + payload := string(data) + if !strings.Contains(payload, `"healthy":false`) { + t.Fatalf("payload %s should include healthy=false", payload) + } + if strings.Contains(payload, `"reported_ready"`) { + t.Fatalf("payload %s should not expose reported_ready", payload) + } + if !strings.Contains(payload, `"unhealthy_reason":"HeartbeatExpired"`) { + t.Fatalf("payload %s should include unhealthy_reason", payload) + } +} diff --git a/CubeMaster/pkg/selector/prefilter/prefilter.go b/CubeMaster/pkg/selector/prefilter/prefilter.go index 92295150..6e5a820d 100644 --- a/CubeMaster/pkg/selector/prefilter/prefilter.go +++ b/CubeMaster/pkg/selector/prefilter/prefilter.go @@ -42,7 +42,6 @@ func (l *prefilter) Select(selCtx *selctx.SelectorCtx) (node.NodeList, error) { log.G(selCtx.Ctx).Debugf("GetHealthyNodesByInstanceType:%+v,size:%d", nodes.String(), nodes.Len()) } newNodes := make(node.NodeList, 0, nodes.Len()) - metaDataUpdateAtTimeout := config.GetConfig().Common.SyncMetaDataInterval + 10*time.Second for i := range nodes { n := nodes[i] if !n.Healthy { @@ -77,17 +76,14 @@ func (l *prefilter) Select(selCtx *selctx.SelectorCtx) (node.NodeList, error) { if time.Since(n.MetricUpdate) > sconf.MetricUpdateTimeout { log.G(selCtx.Ctx).WithFields(map[string]any{ "CalleeCluster": n.ClusterLabel, - }).Fatalf("%s MetricUpdate timeout,lastupdate:%v", n.IP, n.MetricUpdate) + }).Warnf("%s MetricUpdate timeout,lastupdate:%v", n.IP, n.MetricUpdate) + continue } if time.Since(n.MetricLocalUpdateAt) > sconf.MetricUpdateTimeout { log.G(selCtx.Ctx).WithFields(map[string]any{ "CalleeCluster": n.ClusterLabel, - }).Fatalf("%s MetricLocalUpdate timeout,lastupdate:%v", n.IP, n.MetricLocalUpdateAt) - } - if time.Since(n.MetaDataUpdateAt) > metaDataUpdateAtTimeout { - log.G(selCtx.Ctx).WithFields(map[string]any{ - "CalleeCluster": n.ClusterLabel, - }).Fatalf("%s MetaDataUpdate timeout,lastupdate:%v", n.IP, n.MetaDataUpdateAt) + }).Warnf("%s MetricLocalUpdate timeout,lastupdate:%v", n.IP, n.MetricLocalUpdateAt) + continue } newNodes.Append(n) } diff --git a/CubeMaster/pkg/selector/prefilter/prefilter_test.go b/CubeMaster/pkg/selector/prefilter/prefilter_test.go new file mode 100644 index 00000000..d34ed3bf --- /dev/null +++ b/CubeMaster/pkg/selector/prefilter/prefilter_test.go @@ -0,0 +1,110 @@ +package prefilter + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/config" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/localcache" + "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/scheduler/selctx" +) + +func init() { + mydir, err := os.Getwd() + if err != nil { + panic(err) + } + if os.Getenv("CUBE_MASTER_CONFIG_PATH") == "" { + os.Setenv("CUBE_MASTER_CONFIG_PATH", filepath.Clean(filepath.Join(mydir, "../../../conf.yaml"))) + } + if _, err := config.Init(); err != nil { + panic(err) + } +} + +func TestPreFilterExcludesUnhealthyNode(t *testing.T) { + now := time.Now() + fresh := &node.Node{ + InsID: "node-fresh", + IP: "10.0.0.1", + Healthy: true, + MetaDataUpdateAt: now, + MetricUpdate: now, + MetricLocalUpdateAt: now, + } + stale := &node.Node{ + InsID: "node-stale", + IP: "10.0.0.2", + Healthy: false, + UnhealthyReason: "HeartbeatExpired", + MetaDataUpdateAt: now, + MetricUpdate: now, + MetricLocalUpdateAt: now, + } + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(localcache.GetHealthyNodesByInstanceType, func(n int, product string) node.NodeList { + return node.NodeList{fresh, stale} + }) + + got, err := NewPreFilter().Select(&selctx.SelectorCtx{ + Ctx: context.Background(), + InstanceType: "valid", + }) + if err != nil { + t.Fatalf("Select returned error: %v", err) + } + if got.Len() != 1 { + t.Fatalf("got %d nodes want 1", got.Len()) + } + if got[0].ID() != fresh.ID() { + t.Fatalf("got node %s want %s", got[0].ID(), fresh.ID()) + } +} + +func TestPreFilterExcludesMetricTimeoutNode(t *testing.T) { + now := time.Now() + timeout := config.GetConfig().Scheduler.MetricUpdateTimeout + fresh := &node.Node{ + InsID: "node-fresh", + IP: "10.0.0.1", + Healthy: true, + MetaDataUpdateAt: now, + MetricUpdate: now, + MetricLocalUpdateAt: now, + } + staleMetric := &node.Node{ + InsID: "node-stale-metric", + IP: "10.0.0.2", + Healthy: true, + MetaDataUpdateAt: now, + MetricUpdate: now.Add(-(timeout + time.Second)), + MetricLocalUpdateAt: now.Add(-(timeout + time.Second)), + } + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(localcache.GetHealthyNodesByInstanceType, func(n int, product string) node.NodeList { + return node.NodeList{fresh, staleMetric} + }) + + got, err := NewPreFilter().Select(&selctx.SelectorCtx{ + Ctx: context.Background(), + InstanceType: "valid", + }) + if err != nil { + t.Fatalf("Select returned error: %v", err) + } + if got.Len() != 1 { + t.Fatalf("got %d nodes want 1", got.Len()) + } + if got[0].ID() != fresh.ID() { + t.Fatalf("got node %s want %s", got[0].ID(), fresh.ID()) + } +}