Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing node recovery bug #43

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 173 additions & 27 deletions pkg/cloud/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,36 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) {
return server, nil
}

// processSweepStatus decodes the JSON payload carried in status.Message into
// a proto.SweepServiceStatus.
//
// A LastSweep value that is zero or negative is replaced with the current
// Unix time. An error is returned when status is nil or when the message
// cannot be unmarshalled into the sweep status type.
func processSweepStatus(status *proto.ServiceStatus) (*proto.SweepServiceStatus, error) {
	// Guard against a nil report so the caller gets an error instead of a
	// nil-pointer panic.
	if status == nil {
		return nil, fmt.Errorf("failed to parse sweep data: nil service status")
	}

	var sweepData proto.SweepServiceStatus
	if err := json.Unmarshal([]byte(status.Message), &sweepData); err != nil {
		return nil, fmt.Errorf("failed to parse sweep data: %w", err)
	}

	// If LastSweep is 0 or invalid, use current time
	if sweepData.LastSweep <= 0 {
		sweepData.LastSweep = time.Now().Unix()
	}

	return &sweepData, nil
}

// ReportStatus handles incoming status reports from pollers.
func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) {
func (s *Server) ReportStatus(ctx context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) {
if req.PollerId == "" {
return nil, errEmptyPollerID
}

now := time.Unix(req.Timestamp, 0)
log.Printf("Processing status report from %s at %s", req.PollerId, now.Format(time.RFC3339))

// First check if this node was previously down
var currentState bool
err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", req.PollerId).Scan(&currentState)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Printf("Error checking node state: %v", err)
}

// Build API node status while processing
apiStatus := &api.NodeStatus{
NodeID: req.PollerId,
Expand All @@ -224,6 +245,28 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest)
apiStatus.IsHealthy = false
}

// Special handling for sweep service
if svc.ServiceType == "sweep" {
var sweepData proto.SweepServiceStatus
if err := json.Unmarshal([]byte(svc.Message), &sweepData); err != nil {
log.Printf("Error parsing sweep data: %v", err)
} else {
// If LastSweep is 0 or invalid, use current time
if sweepData.LastSweep <= 0 {
sweepData.LastSweep = now.Unix()
// Re-encode the sweep data with the fixed timestamp
if updatedMessage, err := json.Marshal(sweepData); err == nil {
svc.Message = string(updatedMessage)
}
}
log.Printf("Processed sweep data: network=%s hosts=%d/%d lastSweep=%s",
sweepData.Network,
sweepData.AvailableHosts,
sweepData.TotalHosts,
time.Unix(sweepData.LastSweep, 0).Format(time.RFC3339))
}
}

// Store in database
svcStatus := &db.ServiceStatus{
NodeID: req.PollerId,
Expand Down Expand Up @@ -269,6 +312,40 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest)
return nil, fmt.Errorf("failed to store node status: %w", err)
}

// Check if this is a recovery (node was previously down but is now reporting as healthy)
if err == nil && !currentState && apiStatus.IsHealthy {
log.Printf("Node %s has recovered, last seen at %s", req.PollerId, now.Format(time.RFC3339))

// Get the last downtime for the alert
lastDownTime := s.getLastDowntime(req.PollerId)

// Calculate downtime duration
var downtimeDuration string
if !lastDownTime.IsZero() {
downtimeDuration = now.Sub(lastDownTime).String()
} else {
downtimeDuration = "unknown"
}

// Send recovery alert
alert := &alerts.WebhookAlert{
Level: alerts.Info,
Title: "Node Recovered",
Message: fmt.Sprintf("Node '%s' is back online", req.PollerId),
NodeID: req.PollerId,
Timestamp: now.UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"downtime": downtimeDuration,
"recovery_time": now.Format(time.RFC3339),
"services": len(apiStatus.Services),
},
}

log.Printf("Sending recovery alert for node '%s'", req.PollerId)
s.sendAlert(ctx, alert)
}

// Verify storage
storedNode, err := s.db.GetNodeStatus(nodeStatus.NodeID)
if err != nil {
Expand Down Expand Up @@ -387,7 +464,7 @@ func (s *Server) sendStartupNotification(ctx context.Context) {
Timestamp: time.Now().UTC().Format(time.RFC3339),
NodeID: "cloud",
Details: map[string]any{
"version": "1.0.0",
"version": "1.0.1", // TODO: query version from DB
"hostname": getHostname(),
"pid": os.Getpid(),
"total_nodes": nodeCount,
Expand Down Expand Up @@ -426,36 +503,98 @@ func (s *Server) checkNeverReportedPollers(ctx context.Context, config *Config)
}
}

func (s *Server) markNodeDown(ctx context.Context, nodeID string, timestamp time.Time) {
// Update node status in database
status := &db.NodeStatus{
NodeID: nodeID,
IsHealthy: false,
LastSeen: timestamp,
func (s *Server) markNodeDown(ctx context.Context, nodeID string, lastSeen time.Time) {
// Begin transaction
tx, err := s.db.Begin()
if err != nil {
log.Printf("Error beginning transaction: %v", err)
return
}
defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Printf("Error rolling back transaction: %v", rbErr)
}
}
}()

if err := s.db.UpdateNodeStatus(status); err != nil {
log.Printf("Error updating node down status: %v", err)
// First check the current state within the transaction
var currentState bool
err = tx.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(&currentState)
if err != nil {
if err == sql.ErrNoRows {
// Node doesn't exist yet, insert it
_, err = tx.Exec(`
INSERT INTO nodes (node_id, last_seen, is_healthy)
VALUES (?, ?, FALSE)`,
nodeID, lastSeen)
if err != nil {
log.Printf("Error inserting new node: %v", err)
return
}
} else {
log.Printf("Error checking current node state: %v", err)
return
}
} else if !currentState {
// Node is already marked as down, no need to do anything
return
} else {
// Update existing node
_, err = tx.Exec(`
UPDATE nodes
SET is_healthy = FALSE,
last_seen = ?
WHERE node_id = ?`,
lastSeen, nodeID)
if err != nil {
log.Printf("Error updating node status: %v", err)
return
}
}

// Add to node history
_, err = tx.Exec(`
INSERT INTO node_history (node_id, timestamp, is_healthy)
VALUES (?, ?, FALSE)`,
nodeID, lastSeen)
if err != nil {
log.Printf("Error adding node history: %v", err)
return
}

// Send alert
// Commit the transaction
err = tx.Commit()
if err != nil {
log.Printf("Error committing transaction: %v", err)
return
}

// Send alert only if we successfully updated the database
alert := &alerts.WebhookAlert{
Level: alerts.Error,
Title: "Node Offline",
Message: fmt.Sprintf("Node '%s' is offline", nodeID),
NodeID: nodeID,
Timestamp: timestamp.UTC().Format(time.RFC3339),
Timestamp: lastSeen.UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"duration": time.Since(timestamp).String(),
"duration": time.Since(lastSeen).String(),
},
}

log.Printf("Sending offline alert for node '%s'", nodeID)

s.sendAlert(ctx, alert)

// Update API server state
if s.apiServer != nil {
offlineStatus := &api.NodeStatus{
NodeID: nodeID,
IsHealthy: false,
LastUpdate: lastSeen,
}
s.apiServer.UpdateNodeStatus(nodeID, offlineStatus)
}
}

func (s *Server) sendAlert(ctx context.Context, alert *alerts.WebhookAlert) {
Expand Down Expand Up @@ -528,10 +667,9 @@ func (s *Server) MonitorPollers(ctx context.Context) {

func (s *Server) checkPollers(ctx context.Context) {
now := time.Now()

alertThreshold := now.Add(-s.alertThreshold)

// Get all nodes and their current status
// Get all nodes that are currently marked as healthy but haven't been seen recently
const querySQL = `
SELECT n.node_id, n.last_seen, n.is_healthy
FROM nodes n
Expand All @@ -541,7 +679,6 @@ func (s *Server) checkPollers(ctx context.Context) {
rows, err := s.db.Query(querySQL, alertThreshold)
if err != nil {
log.Printf("Error querying nodes: %v", err)

return
}
defer func(rows *sql.Rows) {
Expand All @@ -553,26 +690,22 @@ func (s *Server) checkPollers(ctx context.Context) {

for rows.Next() {
var nodeID string

var lastSeen time.Time

var isHealthy bool

if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil {
log.Printf("Error scanning node status: %v", err)

continue
}

timeSinceLastSeen := now.Sub(lastSeen)
log.Printf("Node %s status check: healthy=%v, last_seen=%v ago (threshold: %v)",
nodeID, isHealthy, timeSinceLastSeen, s.alertThreshold)
log.Printf("Node %s last seen %v ago (threshold: %v)",
nodeID, timeSinceLastSeen, s.alertThreshold)

// If the node was healthy but hasn't been seen recently, mark it down
// Only mark down if the node is currently marked as healthy
if isHealthy {
log.Printf("Node %s transitioning to DOWN state (last seen %v ago)",
nodeID, timeSinceLastSeen)
s.markNodeDown(ctx, nodeID, now)
log.Printf("Node %s transitioning to DOWN state", nodeID)
s.markNodeDown(ctx, nodeID, lastSeen)

// Update API server state
if s.apiServer != nil {
Expand All @@ -588,9 +721,22 @@ func (s *Server) checkPollers(ctx context.Context) {
}

func (s *Server) markNodeRecovered(ctx context.Context, nodeID string, timestamp time.Time) {
// First check if the node is actually down before marking it as recovered
var currentState bool
err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(&currentState)
if err != nil {
log.Printf("Error checking current node state: %v", err)
return
}

if currentState {
// Node is already marked as healthy, no need to send recovery alert
return
}

// Get the last downtime before updating status
var lastDownTime time.Time
err := s.db.QueryRow(`
err = s.db.QueryRow(`
SELECT timestamp
FROM node_history
WHERE node_id = ? AND is_healthy = FALSE
Expand Down
Loading