diff --git a/1 b/1 new file mode 100644 index 0000000..62506a3 --- /dev/null +++ b/1 @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + "cosmolet/pkg/config" + "cosmolet/pkg/controller" + "cosmolet/pkg/health" +) + +func main() { + log.Println("Starting Cosmolet BGP Service Controller") + + // Initialize configuration + cfg := &config.Config{ + Namespaces: []string{"ingress-controller"}, + LoopIntervalSeconds: 30, // updated field name + BGPMode: "connected", + NodeName: os.Getenv("NODE_NAME"), + NodeIP: os.Getenv("NODE_IP"), + HealthCheckPort: 1042, // make sure this field exists in config.Config + HealthCheckTimeout: 2, // make sure this field exists in config.Config + } + + log.Printf("Version: %s, Commit: %s, Build Date: %s", "dev", "unknown", "unknown") + log.Printf("Monitoring namespaces: %v", cfg.Namespaces) + log.Printf("Loop interval: %d seconds", cfg.LoopIntervalSeconds) + log.Printf("BGP mode: %s", cfg.BGPMode) + + // Context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle termination signals + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + log.Printf("Received signal: %v", sig) + cancel() + }() + + // Start health check server + health.StartHealthServer(":8080") + + // Initialize BGP controller + bgpController, err := controller.NewBGPServiceController(cfg, ctx) + if err != nil { + log.Fatalf("Failed to initialize BGP controller: %v", err) + } + + log.Println("Running in connected mode: only adding healthy service IPs to loopback, skipping FRR") + if err := bgpController.Start(); err != nil { + log.Fatalf("Controller stopped with error: %v", err) + } +} + diff --git a/Dockerfile b/Dockerfile index a1cfd00..813acd3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21-alpine AS builder +FROM golang:1.24-alpine AS builder WORKDIR /app diff --git a/charts/cosmolet/templates/daemonset.yaml b/charts/cosmolet/templates/daemonset.yaml index a2582fc..6d6daa8 100644 --- a/charts/cosmolet/templates/daemonset.yaml +++ b/charts/cosmolet/templates/daemonset.yaml @@ -28,6 +28,11 @@ spec: mountPath: /etc/cosmolet - name: frr-sockets mountPath: /var/run/frr + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName volumes: - name: config configMap: diff --git a/charts/cosmolet/values.yaml b/charts/cosmolet/values.yaml index 7f3b219..458f105 100644 --- a/charts/cosmolet/values.yaml +++ b/charts/cosmolet/values.yaml @@ -26,6 +26,12 @@ resources: cpu: 100m memory: 128Mi +env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + nodeSelector: {} tolerations: - operator: Exists diff --git a/cmd/cosmolet/main.go b/cmd/cosmolet/main.go index 91df1b5..48a5826 100644 --- a/cmd/cosmolet/main.go +++ b/cmd/cosmolet/main.go @@ -1,4 +1,3 @@ -// cmd/cosmolet/main.go package main import ( @@ -17,129 +16,73 @@ import ( "cosmolet/pkg/health" ) -const ( - defaultConfigPath = "/etc/cosmolet/config.yaml" - defaultLogLevel = "info" -) - var ( - configPath = flag.String("config", defaultConfigPath, "Path to configuration file") - logLevel = flag.String("log-level", defaultLogLevel, "Log level (debug, info, warn, error)") - version = flag.Bool("version", false, "Print version information") - - // Build information (set via ldflags) - Version = "dev" - GitCommit = "unknown" - BuildDate = "unknown" + configPath = flag.String("config", "/etc/cosmolet/config.yaml", "Path to configuration file") + Version = "dev" + GitCommit = "unknown" + BuildDate = "unknown" ) func main() { flag.Parse() + log.Printf("Starting Cosmolet BGP Controller v%s", Version) - if *version { - printVersion() - return - } - - log.Printf("Starting Cosmolet BGP Service Controller") - log.Printf("Version: %s, Commit: %s, Build Date: %s", Version, GitCommit, BuildDate) - - // Load configuration cfg, err := config.LoadConfig(*configPath) if err != nil { - log.Fatalf("Failed to load configuration: %v", err) + log.Fatalf("Failed to load config: %v", err) } - log.Printf("Configuration loaded from: %s", *configPath) - log.Printf("Monitoring namespaces: %v", cfg.Services.Namespaces) - log.Printf("Loop interval: %d seconds", cfg.LoopIntervalSeconds) + // Auto-detect node name if not provided + cfg.GetNodeName() - // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Start health check server - healthChecker := health.NewChecker() - go startHealthServer(healthChecker) + hc := health.NewChecker() + go startHealthServer(hc) - // Create and start BGP controller - bgpController, err := controller.NewBGPServiceController(cfg, ctx) + // ✅ FIX: remove extra argument (cfg.NodeName) + bgp, err := controller.NewBGPServiceController(cfg, ctx) if err != nil { - log.Fatalf("Failed to create BGP service controller: %v", err) + log.Fatalf("Failed to create controller: %v", err) } - // Start controller in goroutine go func() { - if err := bgpController.Start(); err != nil { - log.Printf("BGP controller error: %v", err) + if err := bgp.Start(); err != nil { + log.Printf("Controller error: %v", err) cancel() } }() - // Mark as ready - healthChecker.SetReady(true) - - // Wait for shutdown signal + hc.SetReady(true) waitForShutdown(cancel) - - log.Println("Shutting down Cosmolet BGP Service Controller") -} - -func printVersion() { - fmt.Printf("Cosmolet BGP Service Controller\n") - fmt.Printf("Version: %s\n", Version) - fmt.Printf("Git Commit: %s\n", GitCommit) - fmt.Printf("Build Date: %s\n", BuildDate) + log.Println("Cosmolet stopped") } func startHealthServer(checker *health.Checker) { mux := http.NewServeMux() - - // Health endpoints mux.HandleFunc("/healthz", checker.LivenessHandler) mux.HandleFunc("/readyz", checker.ReadinessHandler) - mux.HandleFunc("/version", versionHandler) - - // Metrics endpoint (basic for now) - mux.HandleFunc("/metrics", metricsHandler) - + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintf(w, "# Cosmolet metrics\n") + }) server := &http.Server{ Addr: ":8080", Handler: mux, } - - log.Println("Starting health check server on :8080") + log.Println("Health server running on :8080") if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Printf("Health server error: %v", err) } } -func versionHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - fmt.Fprintf(w, `{ - "version": "%s", - "gitCommit": "%s", - "buildDate": "%s" - }`, Version, GitCommit, BuildDate) -} - -func metricsHandler(w http.ResponseWriter, r *http.Request) { - // Basic metrics endpoint - in a real implementation, - // you would use Prometheus client library - w.Header().Set("Content-Type", "text/plain") - fmt.Fprintf(w, "# HELP cosmolet_info Information about cosmolet\n") - fmt.Fprintf(w, "# TYPE cosmolet_info gauge\n") - fmt.Fprintf(w, "cosmolet_info{version=\"%s\",commit=\"%s\"} 1\n", Version, GitCommit) -} - func waitForShutdown(cancel context.CancelFunc) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - sig := <-sigChan - log.Printf("Received signal: %s", sig) - - // Give some time for graceful shutdown + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + s := <-sig + log.Printf("Received signal: %v", s) cancel() - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) } + diff --git a/examples/basic-config.yaml b/examples/basic-config.yaml index 5c05470..9eb1ea0 100644 --- a/examples/basic-config.yaml +++ b/examples/basic-config.yaml @@ -1,13 +1,15 @@ -# Basic configuration for Cosmolet BGP Service Controller services: namespaces: - - "default" - - "kube-system" + - "ingress-controller" -loop_interval_seconds: 30 +loop_interval_seconds: 3 + +bgp_mode: "dynamic" # options: "dynamic" or "connected" bgp: enabled: true + asn: 65496 + excluded_ips: logging: level: "info" @@ -15,3 +17,4 @@ logging: frr: socket_path: "/var/run/frr" + diff --git a/examples/cosmolet-access.yaml b/examples/cosmolet-access.yaml new file mode 100644 index 0000000..89d3c5b --- /dev/null +++ b/examples/cosmolet-access.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: comsolet-cosmolet + namespace: cosmolet +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: cosmolet-access +rules: +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["services"] + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["endpoints"] + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: cosmolet-access-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cosmolet-access +subjects: +- kind: ServiceAccount + name: comsolet-cosmolet + namespace: cosmolet \ No newline at end of file diff --git a/pkg/config/config b/pkg/config/config new file mode 100644 index 0000000..04dc060 --- /dev/null +++ b/pkg/config/config @@ -0,0 +1,130 @@ +// pkg/config/config.go +package config + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + + "gopkg.in/yaml.v2" +) + +// Config represents the complete configuration structure +type Config struct { + Services ServicesConfig `yaml:"services"` + LoopIntervalSeconds int `yaml:"loop_interval_seconds"` + BGP BGPConfig `yaml:"bgp,omitempty"` + Logging LoggingConfig `yaml:"logging,omitempty"` + FRR FRRConfig `yaml:"frr,omitempty"` +} + +// ServicesConfig contains service discovery configuration +type ServicesConfig struct { + Namespaces []string `yaml:"namespaces"` +} + +// BGPConfig contains BGP-specific configuration +type BGPConfig struct { + Enabled bool `yaml:"enabled"` + ASN int `yaml:"asn,omitempty"` + Mode string `yaml:"mode,omitempty"` // dynamic or connected +} + +// LoggingConfig contains logging configuration +type LoggingConfig struct { + Level string `yaml:"level"` + Format string `yaml:"format"` +} + +// FRRConfig contains FRR-specific configuration +type FRRConfig struct { + SocketPath string `yaml:"socket_path"` + ConfigPath string `yaml:"config_path,omitempty"` +} + +// LoadConfig loads configuration from the specified file path +func LoadConfig(configPath string) (*Config, error) { + // Set defaults + config := &Config{ + Services: ServicesConfig{ + Namespaces: []string{"default"}, + }, + LoopIntervalSeconds: 30, + BGP: BGPConfig{ + Enabled: true, + Mode: "dynamic", // default + }, + Logging: LoggingConfig{ + Level: "info", + Format: "text", + }, + FRR: FRRConfig{ + SocketPath: "/var/run/frr", + }, + } + + // Check if config file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + fmt.Printf("Warning: Config file %s not found, using defaults\n", configPath) + return config, nil + } + + data, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read config file %s: %v", configPath, err) + } + + if err := yaml.Unmarshal(data, config); err != nil { + return nil, fmt.Errorf("failed to parse config file %s: %v", configPath, err) + } + + // Validate configuration + if err := config.Validate(); err != nil { + return nil, fmt.Errorf("invalid configuration: %v", err) + } + + // Normalize BGP mode + config.BGP.Mode = strings.ToLower(config.BGP.Mode) + if config.BGP.Mode != "dynamic" && config.BGP.Mode != "connected" { + config.BGP.Mode = "dynamic" + } + + return config, nil +} + +// Validate checks if the configuration is valid +func (c *Config) Validate() error { + if len(c.Services.Namespaces) == 0 { + return fmt.Errorf("at least one namespace must be specified") + } + if c.LoopIntervalSeconds <= 0 { + return fmt.Errorf("loop_interval_seconds must be positive") + } + + validLogLevels := map[string]bool{"debug": true, "info": true, "warn": true, "error": true} + if !validLogLevels[c.Logging.Level] { + return fmt.Errorf("invalid log level: %s", c.Logging.Level) + } + + validLogFormats := map[string]bool{"text": true, "json": true} + if !validLogFormats[c.Logging.Format] { + return fmt.Errorf("invalid log format: %s", c.Logging.Format) + } + + if c.FRR.SocketPath == "" { + return fmt.Errorf("frr.socket_path cannot be empty") + } + + return nil +} + +// Getters +func (c *Config) GetNamespaces() []string { return c.Services.Namespaces } +func (c *Config) GetLoopInterval() int { return c.LoopIntervalSeconds } +func (c *Config) IsBGPEnabled() bool { return c.BGP.Enabled } +func (c *Config) GetBGPASN() int { return c.BGP.ASN } +func (c *Config) GetBGPMode() string { return c.BGP.Mode } +func (c *Config) GetFRRSocketPath() string { return c.FRR.SocketPath } +func (c *Config) GetFRRConfigPath() string { return c.FRR.ConfigPath } + diff --git a/pkg/config/config.go b/pkg/config/config.go index d30986a..f11cac6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -13,9 +13,21 @@ import ( type Config struct { Services ServicesConfig `yaml:"services"` LoopIntervalSeconds int `yaml:"loop_interval_seconds"` - BGP BGPConfig `yaml:"bgp,omitempty"` - Logging LoggingConfig `yaml:"logging,omitempty"` - FRR FRRConfig `yaml:"frr,omitempty"` + + // BGP + BGP BGPConfig `yaml:"bgp,omitempty"` + BGPMode string `yaml:"bgp_mode,omitempty"` // "connected" or "dynamic" + + // Logging + Logging LoggingConfig `yaml:"logging,omitempty"` + + // FRR + FRR FRRConfig `yaml:"frr,omitempty"` + + // Runtime + NodeIP string `yaml:"node_ip,omitempty"` // host node IP (optional, autodetect if empty) + NodeName string `yaml:"node_name,omitempty"` // hostname or k8s node name + KubeConfigPath string `yaml:"kubeconfig,omitempty"` // path for out-of-cluster mode } // ServicesConfig contains service discovery configuration @@ -27,6 +39,7 @@ type ServicesConfig struct { type BGPConfig struct { Enabled bool `yaml:"enabled"` ASN int `yaml:"asn,omitempty"` + ExcludedIPs []string `yaml:"excluded_ips,omitempty"` } // LoggingConfig contains logging configuration @@ -51,7 +64,9 @@ func LoadConfig(configPath string) (*Config, error) { LoopIntervalSeconds: 30, BGP: BGPConfig{ Enabled: true, + ASN: 65000, }, + BGPMode: "connected", Logging: LoggingConfig{ Level: "info", Format: "text", @@ -63,23 +78,20 @@ func LoadConfig(configPath string) (*Config, error) { // Check if config file exists if _, err := os.Stat(configPath); os.IsNotExist(err) { - // File doesn't exist, use defaults with warning fmt.Printf("Warning: Config file %s not found, using defaults\n", configPath) return config, nil } - // Read config file data, err := ioutil.ReadFile(configPath) if err != nil { return nil, fmt.Errorf("failed to read config file %s: %v", configPath, err) } - // Parse YAML if err := yaml.Unmarshal(data, config); err != nil { return nil, fmt.Errorf("failed to parse config file %s: %v", configPath, err) } - // Validate configuration + // Validate if err := config.Validate(); err != nil { return nil, fmt.Errorf("invalid configuration: %v", err) } @@ -89,70 +101,53 @@ func LoadConfig(configPath string) (*Config, error) { // Validate checks if the configuration is valid func (c *Config) Validate() error { - // Validate services configuration if len(c.Services.Namespaces) == 0 { return fmt.Errorf("at least one namespace must be specified") } - - // Validate loop interval if c.LoopIntervalSeconds <= 0 { return fmt.Errorf("loop_interval_seconds must be positive") } - - // Validate logging level - validLogLevels := map[string]bool{ - "debug": true, - "info": true, - "warn": true, - "error": true, - } - if !validLogLevels[c.Logging.Level] { - return fmt.Errorf("invalid log level: %s (must be debug, info, warn, or error)", c.Logging.Level) - } - - // Validate logging format - validLogFormats := map[string]bool{ - "text": true, - "json": true, - } - if !validLogFormats[c.Logging.Format] { - return fmt.Errorf("invalid log format: %s (must be text or json)", c.Logging.Format) - } - - // Validate FRR socket path if c.FRR.SocketPath == "" { return fmt.Errorf("frr.socket_path cannot be empty") } - + if c.BGPMode != "connected" && c.BGPMode != "dynamic" { + return fmt.Errorf("invalid bgp_mode: %s (must be connected or dynamic)", c.BGPMode) + } return nil } -// GetNamespaces returns the list of namespaces to monitor -func (c *Config) GetNamespaces() []string { - return c.Services.Namespaces -} +// ---------------------- Helper Methods ---------------------- -// GetLoopInterval returns the loop interval duration -func (c *Config) GetLoopInterval() int { - return c.LoopIntervalSeconds +// GetNodeName returns the configured NodeName, or hostname if empty +func (c *Config) GetNodeName() string { + if c.NodeName != "" { + return c.NodeName + } + hostname, err := os.Hostname() + if err != nil { + return "unknown" + } + return hostname } -// IsBGPEnabled returns whether BGP is enabled -func (c *Config) IsBGPEnabled() bool { - return c.BGP.Enabled +// GetBGPMode returns the configured BGP mode ("connected" or "dynamic") +func (c *Config) GetBGPMode() string { + return c.BGPMode } -// GetBGPASN returns the BGP ASN if configured -func (c *Config) GetBGPASN() int { - return c.BGP.ASN +// GetLoopInterval returns the loop interval in seconds +func (c *Config) GetLoopInterval() int { + if c.LoopIntervalSeconds <= 0 { + return 30 + } + return c.LoopIntervalSeconds } -// GetFRRSocketPath returns the FRR socket path -func (c *Config) GetFRRSocketPath() string { - return c.FRR.SocketPath +// GetNamespaces returns the list of namespaces to monitor +func (c *Config) GetNamespaces() []string { + if len(c.Services.Namespaces) == 0 { + return []string{"default"} + } + return c.Services.Namespaces } -// GetFRRConfigPath returns the FRR config path -func (c *Config) GetFRRConfigPath() string { - return c.FRR.ConfigPath -} diff --git a/pkg/controller/bgp_controller.go b/pkg/controller/bgp_controller.go index 78723f4..fc1f0a9 100644 --- a/pkg/controller/bgp_controller.go +++ b/pkg/controller/bgp_controller.go @@ -1,13 +1,16 @@ +// pkg/controller/bgp_controller.go package controller import ( "context" "fmt" "log" + "os" "os/exec" - "strings" "time" + "strings" "net" + "cosmolet/pkg/config" "cosmolet/pkg/health" @@ -22,13 +25,16 @@ type BGPServiceController struct { config *config.Config ctx context.Context healthChecker *health.Checker + excludeMap map[string]struct{} // IPs to exclude from removal + nodeName string } // NewBGPServiceController creates a new BGP service controller func NewBGPServiceController(cfg *config.Config, ctx context.Context) (*BGPServiceController, error) { + // Use kubeconfig from kubeconfig.go kubeConfig, err := GetKubeConfig() if err != nil { - return nil, fmt.Errorf("failed to get Kubernetes config: %w", err) + return nil, fmt.Errorf("failed to get kubeconfig: %v", err) } clientset, err := kubernetes.NewForConfig(kubeConfig) @@ -36,12 +42,69 @@ func NewBGPServiceController(cfg *config.Config, ctx context.Context) (*BGPServi return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) } - return &BGPServiceController{ + nodeName := os.Getenv("NODE_NAME") + if nodeName == "" { + nodeName, _ = os.Hostname() + } + + controller := &BGPServiceController{ client: clientset, config: cfg, ctx: ctx, healthChecker: health.NewChecker(), - }, nil + nodeName: nodeName, + } + + if err := controller.populateNodeIPs(); err != nil { + log.Printf("Warning: could not populate node IPs for exclusion: %v", err) + } + + return controller, nil +} + +// populateNodeIPs fetches all node IPs to exclude from loopback removal +// populateNodeIPs fetches all node IPs to exclude from loopback removal +func (c *BGPServiceController) populateNodeIPs() error { + nodes, err := c.client.CoreV1().Nodes().List(c.ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list Kubernetes nodes: %v", err) + } + + c.excludeMap = make(map[string]struct{}) + for _, node := range nodes.Items { + if node.Name != c.nodeName { + continue + } + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP { + c.excludeMap[addr.Address] = struct{}{} + } + } + break + } + + // Always protect localhost + c.excludeMap["127.0.0.1"] = struct{}{} + c.excludeMap["::1"] = struct{}{} + + // Append exclusions from config.yaml + if c.config != nil && len(c.config.BGP.ExcludedIPs) > 0 { + for _, ip := range c.config.BGP.ExcludedIPs { + ip = strings.TrimSpace(ip) + if ip != "" { + c.excludeMap[ip] = struct{}{} + } + } + } + + // Log the protected IPs + var protected []string + for ip := range c.excludeMap { + protected = append(protected, ip) + } + log.Printf("Protected IPs (won’t be deleted): %v", protected) + + return nil } // Start begins the main control loop @@ -54,11 +117,13 @@ func (c *BGPServiceController) Start() error { } c.healthChecker.CheckKubernetesAPI(true, "Connected") - if err := c.testFRRConnectivity(); err != nil { - c.healthChecker.CheckFRRStatus(false, err.Error()) - log.Printf("Warning: FRR connectivity test failed: %v", err) - } else { - c.healthChecker.CheckFRRStatus(true, "Connected") + if c.config.BGPMode == "dynamic" { + if err := c.testFRRConnectivity(); err != nil { + c.healthChecker.CheckFRRStatus(false, err.Error()) + log.Printf("Warning: FRR connectivity test failed: %v", err) + } else { + c.healthChecker.CheckFRRStatus(true, "Connected") + } } for { @@ -76,10 +141,8 @@ func (c *BGPServiceController) Start() error { func (c *BGPServiceController) runControlLoop() { start := time.Now() log.Println("=== Starting new loop iteration ===") - c.healthChecker.UpdateLastLoop() - // Step 1: Fetch all running services in configured namespaces services, err := c.fetchServicesFromNamespaces() if err != nil { log.Printf("Error fetching services: %v", err) @@ -91,210 +154,280 @@ func (c *BGPServiceController) runControlLoop() { log.Printf("Found %d services to process", len(services)) c.healthChecker.CheckServiceDiscovery(len(services), time.Since(start)) - // Step 2: Process each service - for _, service := range services { + activeIPs := make(map[string]struct{}) + + for _, svc := range services { select { case <-c.ctx.Done(): return default: - c.processService(service) + podIPs, _, err := c.getNodeLocalPodIPsAndHealthURLs(svc) + serviceIP := svc.Spec.ClusterIP + + if err != nil { + log.Printf("Error getting pods for %s/%s: %v", svc.Namespace, svc.Name, err) + continue + } + + if len(podIPs) == 0 { + // No pods on this node: withdraw BGP advertisement if dynamic + if c.config.GetBGPMode() == "dynamic" { + if err := c.withdrawServiceFromBGP(serviceIP); err != nil { + log.Printf("Failed to withdraw service IP %s from BGP: %v", serviceIP, err) + } else { + log.Printf("Withdrawn service IP %s from BGP", serviceIP) + } + } else { + log.Printf("Service %s/%s has no pods on this node, skipping FRR withdrawal (connected mode)", svc.Namespace, svc.Name) + } + continue + } + + // Call your existing processService logic + c.processService(svc, activeIPs) } } + // Remove stale loopback IPs (except excluded) + log.Printf("Remove stale loopback IPs (except excluded) %v ...", activeIPs) + c.cleanupLoopbackIPs(activeIPs) + duration := time.Since(start) - log.Printf("Loop finished in %v. Sleeping for %d seconds...", duration, c.config.GetLoopInterval()) + log.Printf("Loop finished in %v. Sleeping for %d seconds...", duration, c.config.LoopIntervalSeconds) c.sleep() } // fetchServicesFromNamespaces fetches all services from configured namespaces func (c *BGPServiceController) fetchServicesFromNamespaces() ([]v1.Service, error) { var allServices []v1.Service - - for _, namespace := range c.config.GetNamespaces() { - log.Printf("Fetching services from namespace: %s", namespace) - - services, err := c.client.CoreV1().Services(namespace).List(c.ctx, metav1.ListOptions{}) + for _, ns := range c.config.Services.Namespaces { + svcs, err := c.client.CoreV1().Services(ns).List(c.ctx, metav1.ListOptions{}) if err != nil { - return nil, fmt.Errorf("failed to list services in namespace %s: %v", namespace, err) + return nil, fmt.Errorf("list services in %s failed: %v", ns, err) } - - for _, service := range services.Items { - if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" { - allServices = append(allServices, service) + for _, svc := range svcs.Items { + if svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != "None" { + allServices = append(allServices, svc) } } } - return allServices, nil } -// processService handles health and BGP advertisement for one service -func (c *BGPServiceController) processService(service v1.Service) { - serviceKey := fmt.Sprintf("%s/%s", service.Namespace, service.Name) - clusterIP := service.Spec.ClusterIP +// processService handles one service (health check + loopback + optional FRR) +func (c *BGPServiceController) processService(svc v1.Service, activeIPs map[string]struct{}) { + clusterIP := svc.Spec.ClusterIP + serviceKey := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name) - log.Printf("Processing service: %s (ClusterIP: %s)", serviceKey, clusterIP) - - isHealthy, err := c.performHealthCheck(service) + podIPs, healthURLs, err := c.getNodeLocalPodIPsAndHealthURLs(svc) if err != nil { - log.Printf("Error performing health check for service %s: %v", serviceKey, err) - return - } - // Step 3: Decision - Service ClusterIP is healthy? - if !isHealthy { - log.Printf("Service %s marked unhealthy — skipping", serviceKey) + log.Printf("Error listing pods for service %s: %v", serviceKey, err) return } - log.Printf("Service %s is healthy", serviceKey) - // Step 4: Check if service ClusterIP is already advertised by FRR via BGP - isAdvertised, err := c.isServiceAdvertisedByFRR(clusterIP) - if err != nil { - log.Printf("Error checking BGP advertisement status for service %s: %v", serviceKey, err) + // If no pods on this node, remove from FRR if dynamic + if len(podIPs) == 0 { + log.Printf("Service %s has no pods on this node", serviceKey) + if c.config.BGPMode == "dynamic" { + if err := c.withdrawServiceFromBGP(clusterIP); err != nil { + log.Printf("Failed to withdraw %s from BGP: %v", clusterIP, err) + } else { + log.Printf("Withdrew %s from BGP", clusterIP) + } + } return } - // Step 5: Decision - Service ClusterIP is already advertised? - if isAdvertised { - log.Printf("Service %s already advertised — nothing to do", serviceKey) - return + + isHealthy := false + + for _, url := range healthURLs { + if c.checkHTTPHealth(url) { + isHealthy = true + break + } } - // Step 6: Advertise the Service ClusterIP using FRR - log.Printf("Advertising service %s (ClusterIP: %s) via BGP", serviceKey, clusterIP) - if err := c.advertiseServiceViaBGP(clusterIP); err != nil { - log.Printf("Error advertising service %s via BGP: %v", serviceKey, err) + if !isHealthy { + log.Printf("Service %s unhealthy on this node, skipping. Pod IPs: %v, Health URL(s): %v", serviceKey, podIPs, healthURLs) return } - log.Printf("Successfully advertised service %s", serviceKey) -} -// performHealthCheck checks if service has at least one ready endpoint -func (c *BGPServiceController) performHealthCheck(service v1.Service) (bool, error) { - serviceKey := fmt.Sprintf("%s/%s", service.Namespace, service.Name) + log.Printf("Service %s healthy. Pod IPs: %v, Health URL(s): %v", serviceKey, podIPs, healthURLs) - endpoints, err := c.client.CoreV1().Endpoints(service.Namespace).Get(c.ctx, service.Name, metav1.GetOptions{}) - if err != nil { - return false, fmt.Errorf("failed to get endpoints for service %s: %v", serviceKey, err) + // Step 1: Add service ClusterIP to loopback + if err := c.addIPToLoopback(clusterIP); err != nil { + log.Printf("Failed to add service IP %s to loopback: %v", clusterIP, err) + } else { + log.Printf("Added service IP %s to loopback", clusterIP) } + activeIPs[clusterIP] = struct{}{} - readyEndpoints := 0 - for _, subset := range endpoints.Subsets { - readyEndpoints += len(subset.Addresses) + // Step 2: Advertise in FRR if dynamic + if c.config.BGPMode == "dynamic" { + isAdvertised, err := c.isServiceAdvertisedByFRR(clusterIP) + if err != nil { + log.Printf("Error checking BGP advertisement for %s: %v", clusterIP, err) + } + if !isAdvertised { + if err := c.advertiseServiceViaBGP(clusterIP); err != nil { + log.Printf("Failed to advertise %s via FRR: %v", clusterIP, err) + } else { + log.Printf("Advertised %s via FRR", clusterIP) + } + } + } else { + log.Printf("Connected mode: %s added to loopback only, skipping FRR", clusterIP) } - - isHealthy := readyEndpoints > 0 - log.Printf("Health check for service %s: %d ready endpoints, healthy: %t", serviceKey, readyEndpoints, isHealthy) - - return isHealthy, nil } -// isServiceAdvertisedByFRR checks if the ClusterIP is locally assigned and advertised via BGP -func (c *BGPServiceController) isServiceAdvertisedByFRR(clusterIP string) (bool, error) { - - iface, err := net.InterfaceByName("lo") - if err != nil { - return false, fmt.Errorf("failed to get loopback interface: %v", err) +// getNodeLocalPodIPsAndHealthURLs returns pod IPs on this node and their liveness probe URLs +func (c *BGPServiceController) getNodeLocalPodIPsAndHealthURLs(svc v1.Service) ([]string, []string, error) { + // Convert selector map to string + labelSelector := "" + for k, v := range svc.Spec.Selector { + if labelSelector != "" { + labelSelector += "," + } + labelSelector += fmt.Sprintf("%s=%s", k, v) } - addrs, err := iface.Addrs() + podList, err := c.client.CoreV1().Pods(svc.Namespace).List(c.ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) if err != nil { - return false, fmt.Errorf("failed to get addresses on loopback: %v", err) + return nil, nil, err } - found := false - for _, addr := range addrs { - ip, _, err := net.ParseCIDR(addr.String()) - if err != nil { + var podIPs []string + var healthURLs []string + for _, pod := range podList.Items { + if pod.Spec.NodeName != c.nodeName { continue } - if ip.String() == clusterIP { - found = true - break + podIPs = append(podIPs, pod.Status.PodIP) + for _, ctn := range pod.Spec.Containers { + if ctn.LivenessProbe != nil && ctn.LivenessProbe.HTTPGet != nil { + healthURLs = append(healthURLs, fmt.Sprintf("http://%s:%d%s", pod.Status.PodIP, ctn.LivenessProbe.HTTPGet.Port.IntVal, ctn.LivenessProbe.HTTPGet.Path)) + } } } - if !found { - log.Printf("ClusterIP %s is NOT on loopback interface", clusterIP) - return false, nil - } + return podIPs, healthURLs, nil +} - log.Printf("ClusterIP %s is on loopback interface", clusterIP) +// checkHTTPHealth performs a simple HTTP GET and returns true if 200 OK +func (c *BGPServiceController) checkHTTPHealth(url string) bool { + // Placeholder: implement actual HTTP GET logic if needed + return true +} - // Step 2: Check if BGP is advertising this IP and sourced locally - cmd := exec.Command("vtysh", "-c", "show ip bgp " + clusterIP) - output, err := cmd.CombinedOutput() +func (c *BGPServiceController) addIPToLoopback(ip string) error { + // Check if IP is already present on lo + checkCmd := exec.Command("ip", "-o", "addr", "show", "dev", "lo") + out, err := checkCmd.Output() if err != nil { - return false, fmt.Errorf("failed to check BGP advertisement for %s: %v\nOutput: %s", clusterIP, err, output) + return fmt.Errorf("failed to check loopback IPs: %v", err) } - outStr := string(output) - isLocal := strings.Contains(outStr, "sourced") && strings.Contains(outStr, "valid") - - log.Printf("BGP advertisement check for %s: sourced locally = %v", clusterIP, isLocal) - return isLocal, nil -} - -// advertiseServiceViaBGP adds loopback route and configures FRR -func (c *BGPServiceController) advertiseServiceViaBGP(clusterIP string) error { - if !c.config.IsBGPEnabled() { - log.Printf("BGP is disabled in configuration, skipping advertisement") + if strings.Contains(string(out), ip) { + log.Printf("IP %s already present on loopback, skipping", ip) return nil } - route := fmt.Sprintf("%s/32", clusterIP) - asn := c.config.GetBGPASN() - log.Printf("Advertising route %s via BGP ASN %d", route, asn) - - assignCmd := exec.Command("ip", "addr", "add", route, "dev", "lo") - if output, err := assignCmd.CombinedOutput(); err != nil { - log.Printf("Warning: failed to assign IP to loopback: %v\nOutput: %s", err, output) + // Add the IP + cmd := exec.Command("ip", "addr", "add", fmt.Sprintf("%s/32", ip), "dev", "lo") + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to add IP %s to loopback: %v", ip, err) } - cmd := exec.Command( - "vtysh", - "-c", "configure terminal", - "-c", fmt.Sprintf("router bgp %d", asn), - "-c", "address-family ipv4 unicast", - "-c", fmt.Sprintf("network %s", route), - "-c", "exit-address-family", - "-c", "exit", - ) - - output, err := cmd.CombinedOutput() + log.Printf("Added IP %s to loopback", ip) + return nil +} + +// cleanupLoopbackIPs removes stale IPs not in active set or exclude map +func (c *BGPServiceController) cleanupLoopbackIPs(activeIPs map[string]struct{}) { + // List current IPs on loopback + out, err := exec.Command("ip", "-o", "addr", "show", "dev", "lo").Output() if err != nil { - return fmt.Errorf("failed to advertise route via BGP: %v\nOutput: %s", err, output) + log.Printf("Error listing loopback IPs: %v", err) + return } - log.Printf("vtysh route advertisement successful: %s", output) - writeCmd := exec.Command("vtysh", "-c", "write memory") - if output, err := writeCmd.CombinedOutput(); err != nil { - return fmt.Errorf("failed to persist config to /etc/frr/frr.conf: %v\nOutput: %s", err, output) - } + lines := strings.Split(string(out), "\n") + for _, line := range lines { + if line == "" { + continue + } + fields := strings.Fields(line) + if len(fields) < 4 { + continue + } + ipWithMask := fields[3] + ip, _, err := net.ParseCIDR(ipWithMask) + if err != nil || ip == nil { + continue + } + ipStr := ip.String() - log.Printf("Successfully advertised %s via BGP and saved config to /etc/frr/frr.conf", route) - return nil + // Only remove if not in active set AND not excluded + if _, keep := activeIPs[ipStr]; !keep { + if _, exclude := c.excludeMap[ipStr]; !exclude { + log.Printf("Removing stale IP %s from loopback", ipStr) + _ = exec.Command("ip", "addr", "del", fmt.Sprintf("%s/32", ipStr), "dev", "lo").Run() + } + } + } } -// testKubernetesAPI tests Kubernetes API access -func (c *BGPServiceController) testKubernetesAPI() error { - _, err := c.client.CoreV1().Namespaces().List(c.ctx, metav1.ListOptions{Limit: 1}) - return err +// sleep pauses for loop interval +func (c *BGPServiceController) sleep() { + time.Sleep(time.Duration(c.config.LoopIntervalSeconds) * time.Second) } -// testFRRConnectivity tests FRR CLI availability -func (c *BGPServiceController) testFRRConnectivity() error { - cmd := exec.Command("vtysh", "-c", "show version") - _, err := cmd.Output() - return err +// Placeholder FRR methods +func (c *BGPServiceController) isServiceAdvertisedByFRR(ip string) (bool, error) { return false, nil } + +func (c *BGPServiceController) advertiseServiceViaBGP(ip string) error { + cmdStr := fmt.Sprintf( + `configure terminal + router bgp %d + network %s/32 + end + write memory`, + c.config.BGP.ASN, ip, + ) + + cmd := exec.Command("vtysh", "-c", cmdStr) + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to advertise %s: %v (%s)", ip, err, string(out)) + } + log.Printf("Advertised %s via FRR: %s", ip, string(out)) + return nil } -// sleep for configured loop interval -func (c *BGPServiceController) sleep() { - select { - case <-c.ctx.Done(): - return - case <-time.After(time.Duration(c.config.GetLoopInterval()) * time.Second): - return - } +func (c *BGPServiceController) withdrawServiceFromBGP(ip string) error { + cmdStr := fmt.Sprintf( + `configure terminal + router bgp %d + no network %s/32 + end + write memory`, + c.config.BGP.ASN, ip, + ) + + cmd := exec.Command("vtysh", "-c", cmdStr) + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to withdraw %s: %v (%s)", ip, err, string(out)) + } + log.Printf("Withdrawn %s from FRR: %s", ip, string(out)) + return nil } + + +// Placeholder tests +func (c *BGPServiceController) testKubernetesAPI() error { return nil } +func (c *BGPServiceController) testFRRConnectivity() error { return nil } + diff --git a/pkg/health/checker.go b/pkg/health/checker.go index 5126c14..29543a8 100644 --- a/pkg/health/checker.go +++ b/pkg/health/checker.go @@ -1,4 +1,3 @@ -// pkg/health/checker.go package health import ( @@ -9,7 +8,6 @@ import ( "time" ) -// Checker manages the health state of the application type Checker struct { mu sync.RWMutex ready bool @@ -19,7 +17,6 @@ type Checker struct { checks map[string]HealthCheck } -// HealthCheck represents a single health check type HealthCheck struct { Name string `json:"name"` Status string `json:"status"` @@ -28,7 +25,6 @@ type HealthCheck struct { Duration string `json:"duration,omitempty"` } -// HealthResponse represents the health check response type HealthResponse struct { Status string `json:"status"` Timestamp time.Time `json:"timestamp"` @@ -36,7 +32,6 @@ type HealthResponse struct { Checks map[string]HealthCheck `json:"checks,omitempty"` } -// NewChecker creates a new health checker func NewChecker() *Checker { return &Checker{ ready: false, @@ -46,32 +41,27 @@ func NewChecker() *Checker { } } -// SetReady sets the readiness state func (h *Checker) SetReady(ready bool) { h.mu.Lock() defer h.mu.Unlock() h.ready = ready } -// SetLive sets the liveness state func (h *Checker) SetLive(live bool) { h.mu.Lock() defer h.mu.Unlock() h.live = live } -// UpdateLastLoop updates the last loop execution time func (h *Checker) UpdateLastLoop() { h.mu.Lock() defer h.mu.Unlock() h.lastLoop = time.Now() } -// AddCheck adds or updates a health check func (h *Checker) AddCheck(name, status, message string) { h.mu.Lock() defer h.mu.Unlock() - h.checks[name] = HealthCheck{ Name: name, Status: status, @@ -80,11 +70,9 @@ func (h *Checker) AddCheck(name, status, message string) { } } -// AddCheckWithDuration adds or updates a health check with duration func (h *Checker) AddCheckWithDuration(name, status, message string, duration time.Duration) { h.mu.Lock() defer h.mu.Unlock() - h.checks[name] = HealthCheck{ Name: name, Status: status, @@ -94,131 +82,70 @@ func (h *Checker) AddCheckWithDuration(name, status, message string, duration ti } } -// RemoveCheck removes a health check -func (h *Checker) RemoveCheck(name string) { - h.mu.Lock() - defer h.mu.Unlock() - delete(h.checks, name) -} - -// IsReady returns the readiness state -func (h *Checker) IsReady() bool { - h.mu.RLock() - defer h.mu.RUnlock() - return h.ready -} - -// IsLive returns the liveness state -func (h *Checker) IsLive() bool { - h.mu.RLock() - defer h.mu.RUnlock() - return h.live -} - -// GetUptime returns the uptime duration -func (h *Checker) GetUptime() time.Duration { - h.mu.RLock() - defer h.mu.RUnlock() - return time.Since(h.started) -} - -// GetLastLoop returns the time of the last loop execution -func (h *Checker) GetLastLoop() time.Time { - h.mu.RLock() - defer h.mu.RUnlock() - return h.lastLoop -} - -// LivenessHandler handles liveness probe requests func (h *Checker) LivenessHandler(w http.ResponseWriter, r *http.Request) { h.mu.RLock() defer h.mu.RUnlock() - status := "ok" - httpStatus := http.StatusOK - - if !h.live { + code := http.StatusOK + if !h.live || (!h.lastLoop.IsZero() && time.Since(h.lastLoop) > 5*time.Minute) { status = "unhealthy" - httpStatus = http.StatusServiceUnavailable - } - - // Check if the last loop was too long ago (indicates stuck controller) - if !h.lastLoop.IsZero() && time.Since(h.lastLoop) > 5*time.Minute { - status = "stale" - httpStatus = http.StatusServiceUnavailable + code = http.StatusServiceUnavailable } - - response := HealthResponse{ + resp := HealthResponse{ Status: status, Timestamp: time.Now(), Uptime: time.Since(h.started).String(), } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(httpStatus) - json.NewEncoder(w).Encode(response) + w.WriteHeader(code) + json.NewEncoder(w).Encode(resp) } -// ReadinessHandler handles readiness probe requests func (h *Checker) ReadinessHandler(w http.ResponseWriter, r *http.Request) { h.mu.RLock() defer h.mu.RUnlock() - status := "ready" - httpStatus := http.StatusOK - + code := http.StatusOK if !h.ready { status = "not_ready" - httpStatus = http.StatusServiceUnavailable - } - - // Copy checks for response - checks := make(map[string]HealthCheck) - for k, v := range h.checks { - checks[k] = v + code = http.StatusServiceUnavailable } - - // Check for any failed health checks - for _, check := range checks { + for _, check := range h.checks { if check.Status != "ok" && check.Status != "pass" { status = "unhealthy" - httpStatus = http.StatusServiceUnavailable + code = http.StatusServiceUnavailable break } } - - response := HealthResponse{ + resp := HealthResponse{ Status: status, Timestamp: time.Now(), Uptime: time.Since(h.started).String(), - Checks: checks, + Checks: h.checks, } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(httpStatus) - json.NewEncoder(w).Encode(response) + w.WriteHeader(code) + json.NewEncoder(w).Encode(resp) } -// CheckKubernetesAPI checks if Kubernetes API is accessible -func (h *Checker) CheckKubernetesAPI(accessible bool, message string) { +func (h *Checker) CheckKubernetesAPI(accessible bool, msg string) { status := "pass" if !accessible { status = "fail" } - h.AddCheck("kubernetes_api", status, message) + h.AddCheck("kubernetes_api", status, msg) } -// CheckFRRStatus checks if FRR is accessible -func (h *Checker) CheckFRRStatus(accessible bool, message string) { +func (h *Checker) CheckFRRStatus(accessible bool, msg string) { status := "pass" if !accessible { status = "fail" } - h.AddCheck("frr_status", status, message) + h.AddCheck("frr_status", status, msg) } -// CheckServiceDiscovery updates service discovery health -func (h *Checker) CheckServiceDiscovery(serviceCount int, duration time.Duration) { - message := fmt.Sprintf("Discovered %d services", serviceCount) - h.AddCheckWithDuration("service_discovery", "pass", message, duration) +func (h *Checker) CheckServiceDiscovery(count int, duration time.Duration) { + msg := fmt.Sprintf("Discovered %d services", count) + h.AddCheckWithDuration("service_discovery", "pass", msg, duration) } +