diff --git a/pkg/cloud/server.go b/pkg/cloud/server.go index f844a25..6b83e9b 100644 --- a/pkg/cloud/server.go +++ b/pkg/cloud/server.go @@ -198,8 +198,22 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) { return server, nil } +func processSweepStatus(status *proto.ServiceStatus) (*proto.SweepServiceStatus, error) { + var sweepData proto.SweepServiceStatus + if err := json.Unmarshal([]byte(status.Message), &sweepData); err != nil { + return nil, fmt.Errorf("failed to parse sweep data: %w", err) + } + + // If LastSweep is 0 or invalid, use current time + if sweepData.LastSweep <= 0 { + sweepData.LastSweep = time.Now().Unix() + } + + return &sweepData, nil +} + // ReportStatus handles incoming status reports from pollers. -func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) { +func (s *Server) ReportStatus(ctx context.Context, req *proto.PollerStatusRequest) (*proto.PollerStatusResponse, error) { if req.PollerId == "" { return nil, errEmptyPollerID } @@ -207,6 +221,13 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) now := time.Unix(req.Timestamp, 0) log.Printf("Processing status report from %s at %s", req.PollerId, now.Format(time.RFC3339)) + // First check if this node was previously down + var currentState bool + err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", req.PollerId).Scan(¤tState) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + log.Printf("Error checking node state: %v", err) + } + // Build API node status while processing apiStatus := &api.NodeStatus{ NodeID: req.PollerId, @@ -224,6 +245,28 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) apiStatus.IsHealthy = false } + // Special handling for sweep service + if svc.ServiceType == "sweep" { + var sweepData proto.SweepServiceStatus + if err := json.Unmarshal([]byte(svc.Message), &sweepData); err != nil { + log.Printf("Error parsing sweep data: %v", err) + } else { + // If LastSweep is 0 or invalid, use current time + if sweepData.LastSweep <= 0 { + sweepData.LastSweep = now.Unix() + // Re-encode the sweep data with the fixed timestamp + if updatedMessage, err := json.Marshal(sweepData); err == nil { + svc.Message = string(updatedMessage) + } + } + log.Printf("Processed sweep data: network=%s hosts=%d/%d lastSweep=%s", + sweepData.Network, + sweepData.AvailableHosts, + sweepData.TotalHosts, + time.Unix(sweepData.LastSweep, 0).Format(time.RFC3339)) + } + } + // Store in database svcStatus := &db.ServiceStatus{ NodeID: req.PollerId, @@ -269,6 +312,40 @@ func (s *Server) ReportStatus(_ context.Context, req *proto.PollerStatusRequest) return nil, fmt.Errorf("failed to store node status: %w", err) } + // Check if this is a recovery (node was previously down but is now reporting as healthy) + if err == nil && !currentState && apiStatus.IsHealthy { + log.Printf("Node %s has recovered, last seen at %s", req.PollerId, now.Format(time.RFC3339)) + + // Get the last downtime for the alert + lastDownTime := s.getLastDowntime(req.PollerId) + + // Calculate downtime duration + var downtimeDuration string + if !lastDownTime.IsZero() { + downtimeDuration = now.Sub(lastDownTime).String() + } else { + downtimeDuration = "unknown" + } + + // Send recovery alert + alert := &alerts.WebhookAlert{ + Level: alerts.Info, + Title: "Node Recovered", + Message: fmt.Sprintf("Node '%s' is back online", req.PollerId), + NodeID: req.PollerId, + Timestamp: now.UTC().Format(time.RFC3339), + Details: map[string]any{ + "hostname": getHostname(), + "downtime": downtimeDuration, + "recovery_time": now.Format(time.RFC3339), + "services": len(apiStatus.Services), + }, + } + + log.Printf("Sending recovery alert for node '%s'", req.PollerId) + s.sendAlert(ctx, alert) + } + // Verify storage storedNode, err := s.db.GetNodeStatus(nodeStatus.NodeID) if err != nil { @@ -387,7 +464,7 @@ func (s *Server) sendStartupNotification(ctx context.Context) { Timestamp: time.Now().UTC().Format(time.RFC3339), NodeID: "cloud", Details: map[string]any{ - "version": "1.0.0", + "version": "1.0.1", // TODO: query version from DB "hostname": getHostname(), "pid": os.Getpid(), "total_nodes": nodeCount, @@ -426,36 +503,98 @@ func (s *Server) checkNeverReportedPollers(ctx context.Context, config *Config) } } -func (s *Server) markNodeDown(ctx context.Context, nodeID string, timestamp time.Time) { - // Update node status in database - status := &db.NodeStatus{ - NodeID: nodeID, - IsHealthy: false, - LastSeen: timestamp, +func (s *Server) markNodeDown(ctx context.Context, nodeID string, lastSeen time.Time) { + // Begin transaction + tx, err := s.db.Begin() + if err != nil { + log.Printf("Error beginning transaction: %v", err) + return } + defer func() { + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + log.Printf("Error rolling back transaction: %v", rbErr) + } + } + }() - if err := s.db.UpdateNodeStatus(status); err != nil { - log.Printf("Error updating node down status: %v", err) + // First check the current state within the transaction + var currentState bool + err = tx.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(¤tState) + if err != nil { + if err == sql.ErrNoRows { + // Node doesn't exist yet, insert it + _, err = tx.Exec(` + INSERT INTO nodes (node_id, last_seen, is_healthy) + VALUES (?, ?, FALSE)`, + nodeID, lastSeen) + if err != nil { + log.Printf("Error inserting new node: %v", err) + return + } + } else { + log.Printf("Error checking current node state: %v", err) + return + } + } else if !currentState { + // Node is already marked as down, no need to do anything + return + } else { + // Update existing node + _, err = tx.Exec(` + UPDATE nodes + SET is_healthy = FALSE, + last_seen = ? + WHERE node_id = ?`, + lastSeen, nodeID) + if err != nil { + log.Printf("Error updating node status: %v", err) + return + } + } + // Add to node history + _, err = tx.Exec(` + INSERT INTO node_history (node_id, timestamp, is_healthy) + VALUES (?, ?, FALSE)`, + nodeID, lastSeen) + if err != nil { + log.Printf("Error adding node history: %v", err) return } - // Send alert + // Commit the transaction + err = tx.Commit() + if err != nil { + log.Printf("Error committing transaction: %v", err) + return + } + + // Send alert only if we successfully updated the database alert := &alerts.WebhookAlert{ Level: alerts.Error, Title: "Node Offline", Message: fmt.Sprintf("Node '%s' is offline", nodeID), NodeID: nodeID, - Timestamp: timestamp.UTC().Format(time.RFC3339), + Timestamp: lastSeen.UTC().Format(time.RFC3339), Details: map[string]any{ "hostname": getHostname(), - "duration": time.Since(timestamp).String(), + "duration": time.Since(lastSeen).String(), }, } log.Printf("Sending offline alert for node '%s'", nodeID) - s.sendAlert(ctx, alert) + + // Update API server state + if s.apiServer != nil { + offlineStatus := &api.NodeStatus{ + NodeID: nodeID, + IsHealthy: false, + LastUpdate: lastSeen, + } + s.apiServer.UpdateNodeStatus(nodeID, offlineStatus) + } } func (s *Server) sendAlert(ctx context.Context, alert *alerts.WebhookAlert) { @@ -528,10 +667,9 @@ func (s *Server) MonitorPollers(ctx context.Context) { func (s *Server) checkPollers(ctx context.Context) { now := time.Now() - alertThreshold := now.Add(-s.alertThreshold) - // Get all nodes and their current status + // Get all nodes that are currently marked as healthy but haven't been seen recently const querySQL = ` SELECT n.node_id, n.last_seen, n.is_healthy FROM nodes n @@ -541,7 +679,6 @@ func (s *Server) checkPollers(ctx context.Context) { rows, err := s.db.Query(querySQL, alertThreshold) if err != nil { log.Printf("Error querying nodes: %v", err) - return } defer func(rows *sql.Rows) { @@ -553,26 +690,22 @@ func (s *Server) checkPollers(ctx context.Context) { for rows.Next() { var nodeID string - var lastSeen time.Time - var isHealthy bool if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil { log.Printf("Error scanning node status: %v", err) - continue } timeSinceLastSeen := now.Sub(lastSeen) - log.Printf("Node %s status check: healthy=%v, last_seen=%v ago (threshold: %v)", - nodeID, isHealthy, timeSinceLastSeen, s.alertThreshold) + log.Printf("Node %s last seen %v ago (threshold: %v)", + nodeID, timeSinceLastSeen, s.alertThreshold) - // If the node was healthy but hasn't been seen recently, mark it down + // Only mark down if the node is currently marked as healthy if isHealthy { - log.Printf("Node %s transitioning to DOWN state (last seen %v ago)", - nodeID, timeSinceLastSeen) - s.markNodeDown(ctx, nodeID, now) + log.Printf("Node %s transitioning to DOWN state", nodeID) + s.markNodeDown(ctx, nodeID, lastSeen) // Update API server state if s.apiServer != nil { @@ -588,9 +721,22 @@ func (s *Server) checkPollers(ctx context.Context) { } func (s *Server) markNodeRecovered(ctx context.Context, nodeID string, timestamp time.Time) { + // First check if the node is actually down before marking it as recovered + var currentState bool + err := s.db.QueryRow("SELECT is_healthy FROM nodes WHERE node_id = ?", nodeID).Scan(¤tState) + if err != nil { + log.Printf("Error checking current node state: %v", err) + return + } + + if currentState { + // Node is already marked as healthy, no need to send recovery alert + return + } + // Get the last downtime before updating status var lastDownTime time.Time - err := s.db.QueryRow(` + err = s.db.QueryRow(` SELECT timestamp FROM node_history WHERE node_id = ? AND is_healthy = FALSE