Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion CubeMaster/pkg/base/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ type Node struct {

MetaDataUpdateAt time.Time `json:"MetaDataUpdateAt,omitempty"`

Healthy bool `json:"Healthy,omitempty"`
ReportedReady bool `json:"-"`

Healthy bool `json:"Healthy"`

UnhealthyReason string `json:"UnhealthyReason,omitempty"`

Score float64 `json:"Score,omitempty"`

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📢 API contract change: omitempty removed

The Healthy field's tag changed from json:"Healthy,omitempty" to json:"Healthy", meaning "Healthy":false is now always present in the JSON output instead of being omitted. This is deliberate (the nodemeta test confirms it), but it is a wire-format change, and existing API consumers may depend on the old shape.

Consider calling this out in the PR description and the next changelog entry so integrators know to expect the new field shape.

Expand Down Expand Up @@ -84,6 +88,22 @@ type Node struct {
NicQueues int64 `json:"nic_queues,omitempty"`
}

// Clone returns a copy of n for read-side use. A nil receiver yields nil.
//
// Every field is copied by plain struct assignment, except that
// VirtualNodeQuotaArray receives its own backing array so that writes to the
// clone's slice do not show up in the original (and vice versa).
//
// The result is a best-effort snapshot: LocalCreateNum is overwritten with an
// atomic load after the structural copy so the clone carries a value that is
// consistent with atomic writers of that counter. The struct assignment
// itself still reads the field without synchronization.
func (n *Node) Clone() *Node {
	if n == nil {
		return nil
	}
	cloned := *n
	// Refresh the counter after the structural copy, as a single atomic read.
	cloned.LocalCreateNum = atomic.LoadInt64(&n.LocalCreateNum)
	if n.VirtualNodeQuotaArray != nil {
		// make+copy keeps an empty, non-nil slice non-nil; appending zero
		// elements to a nil slice would silently turn it into nil.
		quotas := make([]int64, len(n.VirtualNodeQuotaArray))
		copy(quotas, n.VirtualNodeQuotaArray)
		cloned.VirtualNodeQuotaArray = quotas
	}
	return &cloned
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Non-atomic read of LocalCreateNum

The struct copy cloned := *n reads every field including LocalCreateNum (an int64), which is accessed concurrently via atomic.AddInt64 (in LocalCreateNumIncrBy) and atomic.LoadInt64 (in LocalCreateConcurrentLimit).

Per the Go memory model this is a data race and would be flagged by go test -race. In practice on amd64 it is safe (aligned 8-byte reads are atomic at the hardware level), but on 32-bit platforms (ARM) this could tear.

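If this project only targets amd64, a brief comment acknowledging the deliberate non-atomic copy would help future readers (note that a comment alone will not silence the race detector). Otherwise, consider using atomic.LoadInt64(&n.LocalCreateNum) to populate the cloned field; the struct copy itself still reads the field non-atomically, so a fully race-free clone would have to copy the fields individually.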


func (n *Node) ID() string {
if n.InsID == "" {
return n.IP
Expand Down
44 changes: 44 additions & 0 deletions CubeMaster/pkg/base/nodehealth/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package nodehealth

import (
"time"

corev1 "k8s.io/api/core/v1"
)

// Values placed in Status.UnhealthyReason by EvaluateFromFacts.
const (
// ReasonReportedNotReady is used when the heartbeat is still within the
// timeout but the node was not reported ready.
ReasonReportedNotReady = "ReportedNotReady"
// ReasonHeartbeatExpired is used when the heartbeat time is zero or older
// than the timeout; it takes precedence over ReasonReportedNotReady.
ReasonHeartbeatExpired = "HeartbeatExpired"
)

// Status is the outcome of a node health evaluation.
type Status struct {
// Healthy is true only when the heartbeat is within the timeout and the
// node was reported ready.
Healthy bool
// UnhealthyReason is one of the Reason* constants when Healthy is false
// and is left empty when Healthy is true.
UnhealthyReason string
}

// MetadataTimeout returns how old a node's metadata may get before it counts
// as expired: the metadata sync interval plus a fixed ten-second allowance.
func MetadataTimeout(syncMetaDataInterval time.Duration) time.Duration {
	const allowance = 10 * time.Second
	return allowance + syncMetaDataInterval
}

// ReadyConditionTrue reports whether the first NodeReady entry in conditions
// has status ConditionTrue. It returns false when no NodeReady entry exists,
// including for a nil or empty slice.
func ReadyConditionTrue(conditions []corev1.NodeCondition) bool {
	for i := range conditions {
		if conditions[i].Type != corev1.NodeReady {
			continue
		}
		// Only the first Ready condition is consulted; later ones are ignored.
		return conditions[i].Status == corev1.ConditionTrue
	}
	return false
}

// EvaluateFromFacts derives a health Status from already-extracted inputs.
//
// The heartbeat is checked first: a zero heartbeatTime, or one that is more
// than timeout before now, yields ReasonHeartbeatExpired regardless of
// reportedReady. With a fresh heartbeat, a false reportedReady yields
// ReasonReportedNotReady. Otherwise the node is healthy and no reason is set.
func EvaluateFromFacts(reportedReady bool, heartbeatTime, now time.Time, timeout time.Duration) Status {
	switch {
	case heartbeatTime.IsZero(), now.Sub(heartbeatTime) > timeout:
		return Status{UnhealthyReason: ReasonHeartbeatExpired}
	case !reportedReady:
		return Status{UnhealthyReason: ReasonReportedNotReady}
	default:
		return Status{Healthy: true}
	}
}

// Evaluate computes a health Status from raw node conditions: it reduces
// conditions to a ready flag via ReadyConditionTrue and passes that flag,
// together with the heartbeat inputs, to EvaluateFromFacts.
func Evaluate(conditions []corev1.NodeCondition, heartbeatTime, now time.Time, timeout time.Duration) Status {
	reportedReady := ReadyConditionTrue(conditions)
	return EvaluateFromFacts(reportedReady, heartbeatTime, now, timeout)
}
1 change: 1 addition & 0 deletions CubeMaster/pkg/localcache/db_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func constructNode(elem *models.HostInfo) *node.Node {
InstanceType: elem.InstanceType,
HostStatus: elem.HostStatus,
MetaDataUpdateAt: time.Now(),
ReportedReady: constants.HeartbeatHealth == elem.LiveStatus,
Healthy: constants.HeartbeatHealth == elem.LiveStatus,
QuotaMem: elem.QuotaMem,
QuotaCpu: elem.QuotaCpu,
Expand Down
68 changes: 57 additions & 11 deletions CubeMaster/pkg/localcache/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
fwk "github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/framework"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/log"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/node"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/nodehealth"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/types"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/utils"
"github.com/tencentcloud/CubeSandbox/CubeMaster/pkg/base/wrapredis"
Expand Down Expand Up @@ -89,10 +90,11 @@ func GetCacheItems() map[string]cache.Item {
func GetNodes(n int) node.NodeList {
nodes := node.NodeList{}
elems := l.cache.Items()
now := time.Now()
for _, v := range elems {
h, ok := v.Object.(*node.Node)
if ok {
nodes.Append(h)
nodes.Append(cloneNodeWithCurrentHealth(h, now))
}
if n > 0 && nodes.Len() >= n {
break
Expand All @@ -104,13 +106,17 @@ func GetNodes(n int) node.NodeList {
func GetHealthyNodes(n int) node.NodeList {
nodes := node.NodeList{}
elems := l.cache.Items()
now := time.Now()
for _, v := range elems {
if n >= 0 && nodes.Len() >= n {
break
}
h, ok := v.Object.(*node.Node)
if ok && h.Healthy {
nodes.Append(h)
if ok {
current := cloneNodeWithCurrentHealth(h, now)
if current.Healthy {
nodes.Append(current)
}
}

}
Expand All @@ -127,20 +133,18 @@ func GetHealthyNodesByInstanceType(n int, product string) node.NodeList {
return GetHealthyNodes(n)
}

if n == -1 {
return clusterNodes
}

nodes := node.NodeList{}
now := time.Now()

for _, v := range clusterNodes {

if n >= 0 && nodes.Len() >= n {
break
}

if v.Healthy {
nodes.Append(v)
current := cloneNodeWithCurrentHealth(v, now)
if current.Healthy {
nodes.Append(current)
}
}

Expand All @@ -153,15 +157,34 @@ func GetNode(id string) (*node.Node, bool) {
return nil, exist
}
h, ok := elem.(*node.Node)
return h, ok
if ok {
return cloneNodeWithCurrentHealth(h, time.Now()), true
}
return nil, false
}

func metadataHealthTimeout() time.Duration {
return nodehealth.MetadataTimeout(config.GetConfig().Common.SyncMetaDataInterval)
}

// cloneNodeWithCurrentHealth returns a clone of n whose Healthy and
// UnhealthyReason fields are recomputed as of now from n.ReportedReady and
// n.MetaDataUpdateAt. The node passed in is not modified. A nil n yields nil.
func cloneNodeWithCurrentHealth(n *node.Node, now time.Time) *node.Node {
	if n == nil {
		return nil
	}
	snapshot := n.Clone()
	verdict := nodehealth.EvaluateFromFacts(n.ReportedReady, n.MetaDataUpdateAt, now, metadataHealthTimeout())
	// Only the clone carries the freshly evaluated health.
	snapshot.Healthy, snapshot.UnhealthyReason = verdict.Healthy, verdict.UnhealthyReason
	return snapshot
}

func GetNodesByIp(ip string) (*node.Node, bool) {
elems := l.cache.Items()
now := time.Now()
for _, v := range elems {
h, ok := v.Object.(*node.Node)
if ok && h.IP == ip {
return h, true
return cloneNodeWithCurrentHealth(h, now), true
}
}
return nil, false
Expand Down Expand Up @@ -370,6 +393,10 @@ func IncrNodeConcurrent(n *node.Node) error {
if n == nil {
return nil
}
if cached, ok := getMutableNode(n); ok {
cached.LocalCreateNumIncrBy(1)
return nil
}
n.LocalCreateNumIncrBy(1)
return nil
}
Expand All @@ -378,10 +405,29 @@ func DecrNodeConcurrent(n *node.Node) error {
if n == nil {
return nil
}
if cached, ok := getMutableNode(n); ok {
cached.LocalCreateNumIncrBy(-1)
return nil
}
n.LocalCreateNumIncrBy(-1)
return nil
}

// getMutableNode looks up the cache entry stored under n.ID() and returns the
// *node.Node held there, not a copy. The boolean is false when n is nil, the
// key is absent, or the entry is not a non-nil *node.Node.
func getMutableNode(n *node.Node) (*node.Node, bool) {
	if n == nil {
		return nil, false
	}
	if entry, found := l.cache.Get(n.ID()); found {
		if live, isNode := entry.(*node.Node); isNode && live != nil {
			return live, true
		}
	}
	return nil, false
}

func HealthyMasterNodes() (num int64) {
defer func() {
if num == 0 {
Expand Down
Loading
Loading