Skip to content

Commit

Permalink
Merge pull request #43 from mfreeman451/42-chore-linter-fixes
Browse files Browse the repository at this point in the history
fixing node recovery bug
  • Loading branch information
mfreeman451 authored Jan 19, 2025
2 parents 28d1efc + 322755c commit 862a396
Showing 1 changed file with 173 additions and 27 deletions.
200 changes: 173 additions & 27 deletions pkg/cloud/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,36 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) {
return server, nil
}

func processSweepStatus(status *proto.ServiceStatus) (*proto.SweepServiceStatus, error) {
var sweepData proto.SweepServiceStatus
if err := json.Unmarshal([]byte(status.Message), &sweepData); err != nil {
return nil, fmt.Errorf("failed to parse sweep data: %w", err)
}

// If LastSweep is 0 or invalid, use current time
if sweepData.LastSweep <= 0 {
sweepData.LastSweep = time.Now().Unix()
}

return &sweepData, nil
}

// ReportStatus handles incoming status reports from pollers.
func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) {
func (s *Server) ReportStatus(ctx context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) {
if req.PollerId == "" {
return nil, errEmptyPollerID
}

now := time.Unix(req.Timestamp, 0)
log.Printf("Processing status report from %s at %s", req.PollerId, now.Format(time.RFC3339))

// First check if this node was previously down
var currentState bool
err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", req.PollerId).Scan(&currentState)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Printf("Error checking node state: %v", err)
}

// Build API node status while processing
apiStatus := &api.NodeStatus{
NodeID: req.PollerId,
Expand All @@ -224,6 +245,28 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest)
apiStatus.IsHealthy = false
}

// Special handling for sweep service
if svc.ServiceType == "sweep" {
var sweepData proto.SweepServiceStatus
if err := json.Unmarshal([]byte(svc.Message), &sweepData); err != nil {
log.Printf("Error parsing sweep data: %v", err)
} else {
// If LastSweep is 0 or invalid, use current time
if sweepData.LastSweep <= 0 {
sweepData.LastSweep = now.Unix()
// Re-encode the sweep data with the fixed timestamp
if updatedMessage, err := json.Marshal(sweepData); err == nil {
svc.Message = string(updatedMessage)
}
}
log.Printf("Processed sweep data: network=%s hosts=%d/%d lastSweep=%s",
sweepData.Network,
sweepData.AvailableHosts,
sweepData.TotalHosts,
time.Unix(sweepData.LastSweep, 0).Format(time.RFC3339))
}
}

// Store in database
svcStatus := &db.ServiceStatus{
NodeID: req.PollerId,
Expand Down Expand Up @@ -269,6 +312,40 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest)
return nil, fmt.Errorf("failed to store node status: %w", err)
}

// Check if this is a recovery (node was previously down but is now reporting as healthy)
if err == nil && !currentState && apiStatus.IsHealthy {
log.Printf("Node %s has recovered, last seen at %s", req.PollerId, now.Format(time.RFC3339))

// Get the last downtime for the alert
lastDownTime := s.getLastDowntime(req.PollerId)

// Calculate downtime duration
var downtimeDuration string
if !lastDownTime.IsZero() {
downtimeDuration = now.Sub(lastDownTime).String()
} else {
downtimeDuration = "unknown"
}

// Send recovery alert
alert := &alerts.WebhookAlert{
Level: alerts.Info,
Title: "Node Recovered",
Message: fmt.Sprintf("Node '%s' is back online", req.PollerId),
NodeID: req.PollerId,
Timestamp: now.UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"downtime": downtimeDuration,
"recovery_time": now.Format(time.RFC3339),
"services": len(apiStatus.Services),
},
}

log.Printf("Sending recovery alert for node '%s'", req.PollerId)
s.sendAlert(ctx, alert)
}

// Verify storage
storedNode, err := s.db.GetNodeStatus(nodeStatus.NodeID)
if err != nil {
Expand Down Expand Up @@ -387,7 +464,7 @@ func (s *Server) sendStartupNotification(ctx context.Context) {
Timestamp: time.Now().UTC().Format(time.RFC3339),
NodeID: "cloud",
Details: map[string]any{
"version": "1.0.0",
"version": "1.0.1", // TODO: query version from DB
"hostname": getHostname(),
"pid": os.Getpid(),
"total_nodes": nodeCount,
Expand Down Expand Up @@ -426,36 +503,98 @@ func (s *Server) checkNeverReportedPollers(ctx context.Context, config *Config)
}
}

func (s *Server) markNodeDown(ctx context.Context, nodeID string, timestamp time.Time) {
// Update node status in database
status := &db.NodeStatus{
NodeID: nodeID,
IsHealthy: false,
LastSeen: timestamp,
func (s *Server) markNodeDown(ctx context.Context, nodeID string, lastSeen time.Time) {
// Begin transaction
tx, err := s.db.Begin()
if err != nil {
log.Printf("Error beginning transaction: %v", err)
return
}
defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Printf("Error rolling back transaction: %v", rbErr)
}
}
}()

if err := s.db.UpdateNodeStatus(status); err != nil {
log.Printf("Error updating node down status: %v", err)
// First check the current state within the transaction
var currentState bool
err = tx.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(&currentState)
if err != nil {
if err == sql.ErrNoRows {
// Node doesn't exist yet, insert it
_, err = tx.Exec(`
INSERT INTO nodes (node_id, last_seen, is_healthy)
VALUES (?, ?, FALSE)`,
nodeID, lastSeen)
if err != nil {
log.Printf("Error inserting new node: %v", err)
return
}
} else {
log.Printf("Error checking current node state: %v", err)
return
}
} else if !currentState {
// Node is already marked as down, no need to do anything
return
} else {
// Update existing node
_, err = tx.Exec(`
UPDATE nodes
SET is_healthy = FALSE,
last_seen = ?
WHERE node_id = ?`,
lastSeen, nodeID)
if err != nil {
log.Printf("Error updating node status: %v", err)
return
}
}

// Add to node history
_, err = tx.Exec(`
INSERT INTO node_history (node_id, timestamp, is_healthy)
VALUES (?, ?, FALSE)`,
nodeID, lastSeen)
if err != nil {
log.Printf("Error adding node history: %v", err)
return
}

// Send alert
// Commit the transaction
err = tx.Commit()
if err != nil {
log.Printf("Error committing transaction: %v", err)
return
}

// Send alert only if we successfully updated the database
alert := &alerts.WebhookAlert{
Level: alerts.Error,
Title: "Node Offline",
Message: fmt.Sprintf("Node '%s' is offline", nodeID),
NodeID: nodeID,
Timestamp: timestamp.UTC().Format(time.RFC3339),
Timestamp: lastSeen.UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"duration": time.Since(timestamp).String(),
"duration": time.Since(lastSeen).String(),
},
}

log.Printf("Sending offline alert for node '%s'", nodeID)

s.sendAlert(ctx, alert)

// Update API server state
if s.apiServer != nil {
offlineStatus := &api.NodeStatus{
NodeID: nodeID,
IsHealthy: false,
LastUpdate: lastSeen,
}
s.apiServer.UpdateNodeStatus(nodeID, offlineStatus)
}
}

func (s *Server) sendAlert(ctx context.Context, alert *alerts.WebhookAlert) {
Expand Down Expand Up @@ -528,10 +667,9 @@ func (s *Server) MonitorPollers(ctx context.Context) {

func (s *Server) checkPollers(ctx context.Context) {
now := time.Now()

alertThreshold := now.Add(-s.alertThreshold)

// Get all nodes and their current status
// Get all nodes that are currently marked as healthy but haven't been seen recently
const querySQL = `
SELECT n.node_id, n.last_seen, n.is_healthy
FROM nodes n
Expand All @@ -541,7 +679,6 @@ func (s *Server) checkPollers(ctx context.Context) {
rows, err := s.db.Query(querySQL, alertThreshold)
if err != nil {
log.Printf("Error querying nodes: %v", err)

return
}
defer func(rows *sql.Rows) {
Expand All @@ -553,26 +690,22 @@ func (s *Server) checkPollers(ctx context.Context) {

for rows.Next() {
var nodeID string

var lastSeen time.Time

var isHealthy bool

if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil {
log.Printf("Error scanning node status: %v", err)

continue
}

timeSinceLastSeen := now.Sub(lastSeen)
log.Printf("Node %s status check: healthy=%v, last_seen=%v ago (threshold: %v)",
nodeID, isHealthy, timeSinceLastSeen, s.alertThreshold)
log.Printf("Node %s last seen %v ago (threshold: %v)",
nodeID, timeSinceLastSeen, s.alertThreshold)

// If the node was healthy but hasn't been seen recently, mark it down
// Only mark down if the node is currently marked as healthy
if isHealthy {
log.Printf("Node %s transitioning to DOWN state (last seen %v ago)",
nodeID, timeSinceLastSeen)
s.markNodeDown(ctx, nodeID, now)
log.Printf("Node %s transitioning to DOWN state", nodeID)
s.markNodeDown(ctx, nodeID, lastSeen)

// Update API server state
if s.apiServer != nil {
Expand All @@ -588,9 +721,22 @@ func (s *Server) checkPollers(ctx context.Context) {
}

func (s *Server) markNodeRecovered(ctx context.Context, nodeID string, timestamp time.Time) {
// First check if the node is actually down before marking it as recovered
var currentState bool
err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(&currentState)
if err != nil {
log.Printf("Error checking current node state: %v", err)
return
}

if currentState {
// Node is already marked as healthy, no need to send recovery alert
return
}

// Get the last downtime before updating status
var lastDownTime time.Time
err := s.db.QueryRow(`
err = s.db.QueryRow(`
SELECT timestamp
FROM node_history
WHERE node_id = ? AND is_healthy = FALSE
Expand Down

0 comments on commit 862a396

Please sign in to comment.