Conversation

@cloud-j-luna cloud-j-luna commented Sep 8, 2025

Summary by CodeRabbit

  • New Features

    • Session affinity across RPC/REST/gRPC using a sticky latency policy for more consistent routing.
    • Support for client-provided session keys via the X-PROXY-KEY header (see the example after this list).
    • Short sticky latency window (short-term stickiness) to stabilize backend selection.
  • Bug Fixes

    • Fixed REST routing that could report “no servers available” after a successful selection.
  • Tests

    • Expanded coverage for session affinity, TTL/cleanup, concurrency, cache semantics, and server updates.
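
To illustrate the client-provided session key mentioned above, here is a minimal client sketch in Go. The proxy address and key value are placeholders; the only behavior assumed from this PR is that requests carrying the same X-PROXY-KEY value are routed to the same backend while the sticky window is open.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder proxy address and key value; any stable string works as a key.
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/status", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-PROXY-KEY", "my-session-key")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// While the sticky window is open, repeating this request with the same
	// X-PROXY-KEY value should hit the same backend.
	fmt.Println("status:", resp.Status)
}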


coderabbitai bot commented Sep 8, 2025

Walkthrough

Makes the load balancer request-aware and adds a StickyLatencyBased balancer that uses the X-PROXY-KEY header for session affinity with TTL and cleanup. Proxies now pass the incoming *http.Request to LoadBalancer.NextServer. cmd/main.go switches the proxies to the sticky latency policy and tweaks the viper declaration. Tests are expanded.
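
For orientation, a minimal sketch of the request-aware selection flow described above. Only the NextServer(*http.Request) shape comes from this PR; the Server type and handler below are simplified stand-ins for the real code in internal/proxy.

// Minimal sketch of request-aware server selection.
package sketch

import "net/http"

type Server struct {
	name string
	url  string
}

// LoadBalancer is the request-aware interface the proxies now call.
type LoadBalancer interface {
	NextServer(req *http.Request) *Server
	Update(servers []*Server)
}

// handler shows how a proxy consumes the interface: it passes the incoming
// request so header-based affinity can be applied, and answers 500 when no
// server is available.
func handler(lb LoadBalancer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		srv := lb.NextServer(r)
		if srv == nil {
			http.Error(w, "no servers available", http.StatusInternalServerError)
			return
		}
		// The real proxies forward to srv through a reverse proxy here.
		_, _ = w.Write([]byte("routed to " + srv.name))
	}
}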

Changes

Cohort / File(s) and summary of changes:
  • Load balancer implementation & API (internal/proxy/balancer.go): Reworked the load balancer API to NextServer(*http.Request) *Server. Added the ProxyKeyHeader constant and a StickyLatencyBased type with NewStickyLatencyBased(log *slog.Logger, sessionTimeout time.Duration), a session map, a TTL/cleanup ticker, NextServer(req *http.Request), Update, and Stop. Updated the RoundRobin and LatencyBased methods to NextServer.
  • Proxy integrations (internal/proxy/grpc.go, internal/proxy/rest.go, internal/proxy/rpc.go): Proxies now call lb.NextServer(r) for request-aware selection. The REST proxy returns immediately after successful proxying. The error path is preserved when no server is returned.
  • Tests & mocks (internal/proxy/balancer_test.go, internal/proxy/proxy_test.go): Added comprehensive tests for sticky sessions, TTL/cleanup, cache semantics, concurrency, and server updates. Renamed test methods to use NextServer(*http.Request). Added helpers to create test requests/servers. The mock's Next was renamed to NextServer.
  • Main wiring (cmd/main.go): Replaced proxy.NewLatencyBased(log) with proxy.NewStickyLatencyBased(log, blockTime) and added blockTime := 6 * time.Second. Changed var v = viper.New() to v := viper.New().
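
A rough sketch of the main wiring row above, assuming the constructor signature listed in this table; the module import path and logger setup are assumptions, not verified against the repository.

package main

import (
	"log/slog"
	"os"
	"time"

	"github.com/akash-network/rpc-proxy/internal/proxy" // module path assumed
	"github.com/spf13/viper"
)

func main() {
	log := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Sticky window used by the PR: sessions are pinned for six seconds.
	blockTime := 6 * time.Second
	lb := proxy.NewStickyLatencyBased(log, blockTime)
	defer lb.Stop() // stop the session-cleanup ticker on shutdown

	v := viper.New() // short variable declaration, as changed in this PR
	_ = v

	// The RPC/REST/gRPC proxies would be constructed with lb here.
}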

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor Client
    participant Proxy as REST/RPC/gRPC Proxy
    participant LB as StickyLatencyBased
    participant Latency as LatencyBased
    participant Backend as Backend Server
    rect rgb(240,248,255)
    Client->>Proxy: HTTP request (with/without X-PROXY-KEY)
    Proxy->>LB: NextServer(request)
    end
    alt X-PROXY-KEY present and maps to healthy server
        LB-->>Proxy: return bound server
    else No session or expired / header absent
        LB->>Latency: NextServer(request)
        Latency-->>LB: selected server
        LB->>LB: record session mapping (if header present)
        LB-->>Proxy: return selected server
    end
    alt Server found
        Proxy->>Backend: forward request (reverse proxy)
        Backend-->>Proxy: response
        Proxy-->>Client: response
    else No server
        Proxy-->>Client: 500 Internal Server Error
    end
    Note over LB: periodic cleanup removes expired sessions

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • chainzero

"I hop through headers, nose a-prowl and quick,
X-PROXY-KEY hums, 'stick to this pick.'
Six seconds of trust, sessions held tight,
Latency-led hops guide me through night.
Thump-thump — sticky affinity, coded just right. 🥕✨"

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 8.33%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title succinctly describes the addition of a sticky-session latency-based load balancer, aligning with the main changes in the code. It clearly conveys the feature being introduced without extraneous details. The conventional commit prefix 'feat:' follows standard commit messaging guidelines.


@cloud-j-luna cloud-j-luna marked this pull request as ready for review September 9, 2025 08:56

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/proxy/balancer.go (2)

50-64: RoundRobin.Next fails to release the mutex when the server list is empty.
The early return without Unlock() leaves the lock held and will deadlock subsequent calls.

Apply:

 func (rr *RoundRobin) Next(_ *http.Request) *Server {
   rr.mu.Lock()
   if len(rr.servers) == 0 {
-    return nil
+    rr.mu.Unlock()
+    return nil
   }
   server := rr.servers[rr.round%len(rr.servers)]
 
   rr.round++
   rr.mu.Unlock()
   if server.Healthy() {
     return server
   }
   rr.log.Warn("server is unhealthy, trying next", "name", server.name)
   return rr.Next(nil)
 }

306-312: Stop should signal the goroutine in addition to stopping the ticker.
Close stopCh to exit cleanupExpiredSessions.

 func (slb *StickyLatencyBased) Stop() {
   if slb.sessionCleanupTicker != nil {
     slb.sessionCleanupTicker.Stop()
   }
+  if slb.stopCh != nil {
+    close(slb.stopCh)
+  }
 }
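
The pattern behind this suggestion can be shown with a small self-contained example; the worker type and names below are illustrative, not code from this PR. Stop() on a time.Ticker only stops ticks and never closes ticker.C, so the goroutine must also watch a separate stop channel to exit.

package main

import (
	"fmt"
	"time"
)

// worker mimics the cleanup goroutine: it watches both the ticker and a
// stop channel so it can return after Stop() is called.
type worker struct {
	ticker *time.Ticker
	stopCh chan struct{}
}

func newWorker(interval time.Duration) *worker {
	w := &worker{ticker: time.NewTicker(interval), stopCh: make(chan struct{})}
	go w.loop()
	return w
}

func (w *worker) loop() {
	for {
		select {
		case <-w.ticker.C:
			fmt.Println("tick: cleanup would run here")
		case <-w.stopCh:
			return // goroutine exits, no leak
		}
	}
}

// Stop is meant to be called once; a sync.Once would make it idempotent.
func (w *worker) Stop() {
	w.ticker.Stop()
	close(w.stopCh)
}

func main() {
	w := newWorker(50 * time.Millisecond)
	time.Sleep(120 * time.Millisecond)
	w.Stop()
	time.Sleep(50 * time.Millisecond) // give the goroutine time to return
}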
🧹 Nitpick comments (8)
internal/proxy/rpc.go (1)

50-55: Passing the request to the LB is correct; also strip the sticky header before proxying upstream.

Next(r) aligns with the request-aware balancer. To avoid leaking the internal affinity token to backends, remove the sticky header before forwarding.

-	if srv := p.lb.Next(r); srv != nil {
+	if srv := p.lb.Next(r); srv != nil {
+		// Do not forward internal affinity header to upstream.
+		r.Header.Del("X-PROXY-KEY") // use the package constant if available
 		proxy := newRedirectFollowingReverseProxy(srv, p.log, "rpc")
internal/proxy/grpc.go (1)

47-76: Request-aware LB: LGTM. Consider dropping the sticky header inside Director.

Selection via Next(r) is good. Like REST/RPC, prevent leaking X-PROXY-KEY to the upstream by deleting it in Director.

-			Director: func(request *http.Request) {
+			Director: func(request *http.Request) {
+				// Do not forward internal affinity header to upstream.
+				request.Header.Del("X-PROXY-KEY") // use package constant if available
 				request.URL.Scheme = srv.Url.Scheme
internal/proxy/proxy_test.go (1)

322-324: Mock signature update: LGTM. Add a targeted test for header-based stickiness.

The signature matches the new interface. Consider adding a test (sketched after this list) that:

  • sets X-PROXY-KEY on requests,
  • verifies that successive Next(req) return the same backend within the sticky window,
  • and asserts that the header is not forwarded to the backend handler.
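
A rough sketch of such a test, assuming it sits in internal/proxy next to the balancer and uses this PR's NewStickyLatencyBased and ProxyKeyHeader. newTestServers is a hypothetical fixture helper standing in for whatever the suite already provides, and the header-forwarding assertion would belong in the proxy-level tests.

package proxy

import (
	"log/slog"
	"net/http"
	"testing"
	"time"
)

// TestStickyHeaderAffinity verifies that requests sharing an X-PROXY-KEY
// value keep hitting the same backend within the sticky window.
func TestStickyHeaderAffinity(t *testing.T) {
	lb := NewStickyLatencyBased(slog.Default(), 5*time.Second)
	defer lb.Stop()
	// newTestServers is hypothetical: it should return healthy *Server fixtures.
	lb.Update(newTestServers(3))

	req, err := http.NewRequest(http.MethodGet, "http://example.com", nil)
	if err != nil {
		t.Fatal(err)
	}
	req.Header.Set(ProxyKeyHeader, "abc123")

	first := lb.Next(req)
	if first == nil {
		t.Fatal("expected a server to be selected")
	}
	// Repeated requests carrying the same key must return the same backend.
	for i := 0; i < 10; i++ {
		if got := lb.Next(req); got != first {
			t.Fatalf("expected sticky server %v, got %v", first, got)
		}
	}
	// Asserting that X-PROXY-KEY is not forwarded upstream belongs in the
	// proxy-level tests, where the backend handler can inspect headers.
}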
internal/proxy/rest.go (1)

50-55: Next(r) change looks good; also sanitize internal headers.

Prevent exposing X-PROXY-KEY to upstream services.

-	if srv := p.lb.Next(r); srv != nil {
+	if srv := p.lb.Next(r); srv != nil {
+		// Do not forward internal affinity header to upstream.
+		r.Header.Del("X-PROXY-KEY") // use the package constant if available
 		proxy := newRedirectFollowingReverseProxy(srv, p.log, "rest")
internal/proxy/balancer.go (2)

50-64: Avoid unbounded recursion when all servers are unhealthy.
Prefer a bounded loop over recursion to prevent stack growth.

-  rr.log.Warn("server is unhealthy, trying next", "name", server.name)
-  return rr.Next(nil)
+  rr.log.Warn("server is unhealthy, trying next", "name", server.name)
+  for i := 0; i < len(rr.servers); i++ {
+    rr.mu.Lock()
+    server = rr.servers[rr.round%len(rr.servers)]
+    rr.round++
+    rr.mu.Unlock()
+    if server.Healthy() {
+      return server
+    }
+  }
+  return nil

167-185: Good encapsulation: sticky layer composes latency-based policy.
Clear warning about single-replica state is helpful.

If you deploy multiple replicas, consider:

  • Header-based consistent hashing at the ingress to keep sessions on the same replica (a minimal sketch follows this list), or
  • Externalizing sessionMap to a shared store (e.g., Redis) with TTLs.
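
For the first option, a minimal self-contained sketch of header-based rendezvous (highest-random-weight) hashing at an ingress layer; the replica addresses and function names are illustrative only, and only the X-PROXY-KEY idea is taken from this PR.

package main

import (
	"fmt"
	"hash/fnv"
)

// pickReplica returns the replica with the highest hash(key, replica) score,
// so a given X-PROXY-KEY value consistently lands on the same proxy replica
// as long as the replica set is unchanged.
func pickReplica(key string, replicas []string) string {
	if len(replicas) == 0 {
		return ""
	}
	best, bestScore := replicas[0], uint64(0)
	for _, r := range replicas {
		h := fnv.New64a()
		h.Write([]byte(key))
		h.Write([]byte(r))
		if s := h.Sum64(); s >= bestScore {
			best, bestScore = r, s
		}
	}
	return best
}

func main() {
	replicas := []string{"proxy-0:8080", "proxy-1:8080", "proxy-2:8080"}
	fmt.Println(pickReplica("my-session-key", replicas))
	fmt.Println(pickReplica("my-session-key", replicas)) // same replica again
}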
internal/proxy/balancer_test.go (2)

170-175: Minor: typo in error message.
“selected 9 time” → “9 times”.

- t.Errorf("expected server \"a\" to be selected 9 time, got selected %d times", selectedServers["a"])
+ t.Errorf("expected server \"a\" to be selected 9 times, got selected %d times", selectedServers["a"])

286-300: Test couples to internals by replacing ticker and starting another goroutine.
Once Stop/cleanup use a stop channel, prefer injecting the cleanup interval via constructor or a test hook to avoid poking internals.

If acceptable, I can add a NewStickyLatencyBasedWithTicker(log, timeout, tick time.Duration) test-only constructor.
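
A hedged sketch of what that test-only constructor could look like, mirroring the NewStickyLatencyBased construction shown later in this review; the exact field set is an assumption.

// Test-only constructor with an injectable cleanup interval.
func NewStickyLatencyBasedWithTicker(log *slog.Logger, sessionTimeout, tick time.Duration) *StickyLatencyBased {
	slb := &StickyLatencyBased{
		LatencyBased:      NewLatencyBased(log),
		sessionMap:        make(map[string]*Server),
		sessionTimestamps: make(map[string]time.Time),
		sessionTimeout:    sessionTimeout,
	}
	// The injectable interval lets tests use a short tick instead of the
	// production 5-minute cleanup cadence.
	slb.sessionCleanupTicker = time.NewTicker(tick)
	go slb.cleanupExpiredSessions()
	return slb
}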

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a14fada and d958d92.

📒 Files selected for processing (7)
  • cmd/main.go (2 hunks)
  • internal/proxy/balancer.go (6 hunks)
  • internal/proxy/balancer_test.go (4 hunks)
  • internal/proxy/grpc.go (1 hunks)
  • internal/proxy/proxy_test.go (1 hunks)
  • internal/proxy/rest.go (1 hunks)
  • internal/proxy/rpc.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
internal/proxy/balancer.go (1)
internal/proxy/server.go (1)
  • Server (28-37)
internal/proxy/balancer_test.go (4)
internal/proxy/server.go (1)
  • Server (28-37)
internal/proxy/balancer.go (2)
  • NewStickyLatencyBased (189-206)
  • ProxyKeyHeader (16-16)
internal/seed/seed.go (3)
  • New (63-72)
  • Node (20-24)
  • Status (27-42)
internal/avg/moving.go (1)
  • Moving (13-17)
cmd/main.go (4)
internal/proxy/rpc.go (1)
  • NewRPCProxy (23-37)
internal/proxy/balancer.go (1)
  • NewStickyLatencyBased (189-206)
internal/proxy/rest.go (1)
  • NewRestProxy (23-37)
internal/proxy/grpc.go (1)
  • NewGRPCProxy (23-35)
🔇 Additional comments (4)
cmd/main.go (1)

267-269: Minor: viper initialization change is fine.

Short variable declaration is idiomatic here.

internal/proxy/balancer.go (2)

15-16: Header name looks good and centralized.
Using a single constant for the sticky key is correct and avoids drift across components.


25-26: All Next call sites now use the *http.Request parameter; interface, implementations, tests, and mocks are consistent.

internal/proxy/balancer_test.go (1)

521-547: Helpers look good and deterministic.
Using fixed URLs and per-test latencies keeps tests stable.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/proxy/balancer.go (1)

50-64: Fix deadlock on empty set and remove unbounded recursion.

  • Missing Unlock on empty servers -> deadlock.
  • Recursive retry can stack overflow when all are unhealthy. Switch to bounded loop.
-func (rr *RoundRobin) Next(_ *http.Request) *Server {
-	rr.mu.Lock()
-	if len(rr.servers) == 0 {
-		return nil
-	}
-	server := rr.servers[rr.round%len(rr.servers)]
-
-	rr.round++
-	rr.mu.Unlock()
-	if server.Healthy() {
-		return server
-	}
-	rr.log.Warn("server is unhealthy, trying next", "name", server.name)
-	return rr.Next(nil)
-}
+func (rr *RoundRobin) Next(_ *http.Request) *Server {
+	rr.mu.Lock()
+	n := len(rr.servers)
+	if n == 0 {
+		rr.mu.Unlock()
+		return nil
+	}
+	start := rr.round
+	rr.round++
+	// Try at most n servers to avoid recursion/stack growth.
+	for i := 0; i < n; i++ {
+		idx := (start + i) % n
+		srv := rr.servers[idx]
+		if srv != nil && srv.Healthy() {
+			rr.mu.Unlock()
+			return srv
+		}
+	}
+	rr.mu.Unlock()
+	rr.log.Warn("all servers unhealthy; returning nil")
+	return nil
+}
♻️ Duplicate comments (6)
internal/proxy/balancer.go (6)

172-185: Add stop channel to make cleanup goroutine stoppable.
Prevents goroutine leak after Stop().

 type StickyLatencyBased struct {
 	// LatencyBased provides the core latency-based selection functionality
 	*LatencyBased
@@
 	// sessionTimestamps tracks when sessions were last accessed.
 	sessionTimestamps map[string]time.Time
+	// stopCh signals cleanup goroutine to exit.
+	stopCh chan struct{}
+	// stopOnce makes Stop idempotent.
+	stopOnce sync.Once
 }

194-205: Initialize stopCh and tighten construction.
Ensures cleanup goroutine can exit.

 	slb := &StickyLatencyBased{
 		LatencyBased:      NewLatencyBased(log),
 		sessionMap:        make(map[string]*Server),
 		sessionTimestamps: make(map[string]time.Time),
 		sessionTimeout:    sessionTimeout,
+		stopCh:            make(chan struct{}),
 	}
 
 	slb.sessionCleanupTicker = time.NewTicker(5 * time.Minute)
 	go slb.cleanupExpiredSessions()

207-268: Validate sticky target (health + membership) and remove redundant LB lock.

  • Don’t hold LB.mu just to check empty; delegate to LatencyBased.Next.
  • On sticky hit, ensure server is healthy and still in current pool; otherwise drop mapping and reselect.
 func (slb *StickyLatencyBased) Next(req *http.Request) *Server {
-	if req == nil {
-		slb.log.Warn("provided request is nil")
-		return slb.LatencyBased.Next(nil)
-	}
-
-	slb.LatencyBased.mu.Lock()
-	if len(slb.LatencyBased.servers) == 0 {
-		slb.LatencyBased.mu.Unlock()
-		return nil
-	}
-	slb.LatencyBased.mu.Unlock()
-
-	sessionID := slb.extractSessionID(req)
-
-	if sessionID != "" {
-		slb.sessionMu.RLock()
-		if server, exists := slb.sessionMap[sessionID]; exists {
-			lastAccessed := slb.sessionTimestamps[sessionID]
-			if time.Since(lastAccessed) > slb.sessionTimeout {
-				slb.sessionMu.RUnlock()
-
-				slb.sessionMu.Lock()
-				delete(slb.sessionMap, sessionID)
-				delete(slb.sessionTimestamps, sessionID)
-				slb.sessionMu.Unlock()
-
-				slb.log.Info("session timed out, removed",
-					"session_id", sessionID,
-					"server", server.name,
-					"last_accessed", lastAccessed)
-			} else {
-				slb.sessionMu.RUnlock()
-
-				slb.sessionMu.Lock()
-				slb.sessionTimestamps[sessionID] = time.Now()
-				slb.sessionMu.Unlock()
-				return server
-			}
-		} else {
-			slb.sessionMu.RUnlock()
-		}
-	}
+	if req == nil {
+		slb.log.Warn("provided request is nil")
+		return slb.LatencyBased.Next(nil)
+	}
+	sessionID := slb.extractSessionID(req)
+	if sessionID != "" {
+		// Read mapping without grabbing LB lock to avoid lock-order inversion with Update().
+		slb.sessionMu.RLock()
+		server, exists := slb.sessionMap[sessionID]
+		lastAccessed := slb.sessionTimestamps[sessionID]
+		slb.sessionMu.RUnlock()
+
+		if exists {
+			// Expired?
+			if time.Since(lastAccessed) > slb.sessionTimeout {
+				slb.sessionMu.Lock()
+				delete(slb.sessionMap, sessionID)
+				delete(slb.sessionTimestamps, sessionID)
+				slb.sessionMu.Unlock()
+				slb.log.Info("session timed out, removed", "session_id", sessionID)
+			} else {
+				// Validate health and membership in current pool.
+				inPool := false
+				if server != nil {
+					slb.LatencyBased.mu.Lock()
+					for _, rs := range slb.LatencyBased.servers {
+						if rs.Server == server {
+							inPool = true
+							break
+						}
+					}
+					slb.LatencyBased.mu.Unlock()
+				}
+				if inPool && server.Healthy() {
+					slb.sessionMu.Lock()
+					slb.sessionTimestamps[sessionID] = time.Now()
+					slb.sessionMu.Unlock()
+					return server
+				}
+				// Drop stale/unhealthy mapping and reselect.
+				slb.sessionMu.Lock()
+				delete(slb.sessionMap, sessionID)
+				delete(slb.sessionTimestamps, sessionID)
+				slb.sessionMu.Unlock()
+				slb.log.Info("sticky server invalid; reselecting", "session_id", sessionID)
+			}
+		}
+	}
 
 	server := slb.LatencyBased.Next(req)
 
 	if server != nil && sessionID != "" {
 		slb.sessionMu.Lock()
 		slb.sessionMap[sessionID] = server
 		slb.sessionTimestamps[sessionID] = time.Now()
 		slb.sessionMu.Unlock()
 
 		slb.log.Debug("created new sticky session",
 			"session_id", sessionID,
 			"server", server.name)
 	}
 
 	return server
 }

277-282: Prune sticky mappings for removed backends on Update.
Prevents stale stickiness to deleted servers.

 func (slb *StickyLatencyBased) Update(servers []*Server) {
 	slb.LatencyBased.Update(servers)
+	// Build allowed set by pointer.
+	allowed := make(map[*Server]struct{}, len(servers))
+	for _, s := range servers {
+		if s != nil {
+			allowed[s] = struct{}{}
+		}
+	}
+	// Drop sessions pointing to servers not in the pool.
+	slb.sessionMu.Lock()
+	for sid, srv := range slb.sessionMap {
+		if srv == nil {
+			delete(slb.sessionMap, sid)
+			delete(slb.sessionTimestamps, sid)
+			continue
+		}
+		if _, ok := allowed[srv]; !ok {
+			delete(slb.sessionMap, sid)
+			delete(slb.sessionTimestamps, sid)
+			slb.log.Debug("removed sticky session for deleted server", "session_id", sid, "server", srv.name)
+		}
+	}
+	slb.sessionMu.Unlock()
 }

283-297: Make cleanup goroutine stoppable and exit on Stop().
Use select on ticker and stopCh.

-func (slb *StickyLatencyBased) cleanupExpiredSessions() {
-	for range slb.sessionCleanupTicker.C {
-		slb.sessionMu.Lock()
-		now := time.Now()
-		for sessionID, timestamp := range slb.sessionTimestamps {
-			if now.Sub(timestamp) > slb.sessionTimeout {
-				delete(slb.sessionMap, sessionID)
-				delete(slb.sessionTimestamps, sessionID)
-				slb.log.Debug("cleaned up expired session", "session_id", sessionID)
-			}
-		}
-		slb.sessionMu.Unlock()
-	}
-}
+func (slb *StickyLatencyBased) cleanupExpiredSessions() {
+	for {
+		select {
+		case <-slb.sessionCleanupTicker.C:
+			slb.sessionMu.Lock()
+			now := time.Now()
+			for sessionID, timestamp := range slb.sessionTimestamps {
+				if now.Sub(timestamp) > slb.sessionTimeout {
+					delete(slb.sessionMap, sessionID)
+					delete(slb.sessionTimestamps, sessionID)
+					slb.log.Debug("cleaned up expired session", "session_id", sessionID)
+				}
+			}
+			slb.sessionMu.Unlock()
+		case <-slb.stopCh:
+			return
+		}
+	}
+}

299-304: Make Stop idempotent and signal goroutine exit.
Close stopCh via sync.Once; stop ticker too.

 func (slb *StickyLatencyBased) Stop() {
-	if slb.sessionCleanupTicker != nil {
-		slb.sessionCleanupTicker.Stop()
-	}
+	slb.stopOnce.Do(func() {
+		if slb.sessionCleanupTicker != nil {
+			slb.sessionCleanupTicker.Stop()
+		}
+		if slb.stopCh != nil {
+			close(slb.stopCh)
+		}
+	})
 }
🧹 Nitpick comments (5)
internal/proxy/balancer.go (5)

67-71: Defensively prune nil servers on update.
Avoids potential nil deref in Next(). Low-cost filter.

 func (rr *RoundRobin) Update(servers []*Server) {
 	rr.mu.Lock()
-	rr.servers = servers
+	filtered := make([]*Server, 0, len(servers))
+	for _, s := range servers {
+		if s != nil {
+			filtered = append(filtered, s)
+		}
+	}
+	rr.servers = filtered
 	rr.mu.Unlock()
 }

139-165: Exclude unhealthy backends from weighted set.
Latency policy currently ignores health. Build weights only from Healthy() servers; handle all-filtered-out case.

 func (rr *LatencyBased) Update(servers []*Server) {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
 
-	var totalInverse float64
-	rr.servers = make([]*RatedServer, len(servers))
-
-	for i := range servers {
-		// Calculate the latency of the server in milliseconds
-		latency := float64(servers[i].node.Status.Latency.Milliseconds())
-		// Avoid division by zero by using a small epsilon value if latency is 0
-		if latency == 0 {
-			latency = epsilon
-		}
-		// Calculate the rate of the server as the inverse of its latency
-		rr.servers[i] = &RatedServer{
-			Server: servers[i],
-			Rate:   1 / latency,
-		}
-		totalInverse += rr.servers[i].Rate
-	}
+	var totalInverse float64
+	rated := make([]*RatedServer, 0, len(servers))
+	for i := range servers {
+		if servers[i] == nil || !servers[i].Healthy() {
+			continue
+		}
+		latency := float64(servers[i].node.Status.Latency.Milliseconds())
+		if latency == 0 {
+			latency = epsilon
+		}
+		rs := &RatedServer{Server: servers[i], Rate: 1 / latency}
+		rated = append(rated, rs)
+		totalInverse += rs.Rate
+	}
+	if len(rated) == 0 {
+		rr.servers = nil
+		return
+	}
+	rr.servers = rated
 
 	// Normalize the rates to ensure they sum to 1.0
-	for i := range servers {
-		rr.servers[i].Rate /= totalInverse
-	}
+	if totalInverse == 0 {
+		w := 1.0 / float64(len(rr.servers))
+		for i := range rr.servers {
+			rr.servers[i].Rate = w
+		}
+	} else {
+		for i := range rr.servers {
+			rr.servers[i].Rate /= totalInverse
+		}
+	}
 }

270-275: Header-only vs. comment mismatch; optionally support cookie.
Either change comments to “header only” or also read cookie for parity.

-// It only checks for the X-PROXY-KEY header. If not provided, returns empty string
+// It checks for the X-PROXY-KEY header (and optionally cookie of the same name). If not provided, returns empty string
 func (slb *StickyLatencyBased) extractSessionID(req *http.Request) string {
-	return req.Header.Get(ProxyKeyHeader)
+	if v := req.Header.Get(ProxyKeyHeader); v != "" {
+		return v
+	}
+	if c, err := req.Cookie(ProxyKeyHeader); err == nil && c != nil {
+		return c.Value
+	}
+	return ""
 }

237-241: Avoid logging full session identifiers.
Treat X-PROXY-KEY as sensitive; redact or hash before logging.

Also applies to: 262-265
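
One possible shape for that redaction, as a self-contained sketch; the helper name is illustrative and not part of the PR.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// redactSessionID returns a short, stable fingerprint of the session key so
// log lines stay correlatable without exposing the raw X-PROXY-KEY value.
func redactSessionID(id string) string {
	sum := sha256.Sum256([]byte(id))
	return hex.EncodeToString(sum[:])[:8]
}

func main() {
	fmt.Println(redactSessionID("my-session-key")) // prints an 8-char digest
}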


170-175: Note on multi-replica deployments.
Without shared state, stickiness breaks across replicas. Consider consistent hashing or an external store (e.g., Redis) if horizontal scaling is required.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d958d92 and 0c7a509.

📒 Files selected for processing (2)
  • cmd/main.go (2 hunks)
  • internal/proxy/balancer.go (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cmd/main.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/proxy/balancer.go (1)
internal/proxy/server.go (1)
  • Server (28-37)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/proxy/balancer.go (2)

50-64: Fix two issues: missing unlock on empty set + unbounded recursion can deadlock/stack overflow.

  • Lock is not released when len==0 (Line 52), causing deadlock.
  • Recursive retry loops forever if all servers are unhealthy.

Apply:

-func (rr *RoundRobin) Next(_ *http.Request) *Server {
-	rr.mu.Lock()
-	if len(rr.servers) == 0 {
-		return nil
-	}
-	server := rr.servers[rr.round%len(rr.servers)]
-
-	rr.round++
-	rr.mu.Unlock()
-	if server.Healthy() {
-		return server
-	}
-	rr.log.Warn("server is unhealthy, trying next", "name", server.name)
-	return rr.Next(nil)
-}
+func (rr *RoundRobin) Next(_ *http.Request) *Server {
+	rr.mu.Lock()
+	n := len(rr.servers)
+	if n == 0 {
+		rr.mu.Unlock()
+		return nil
+	}
+	start := rr.round
+	for i := 0; i < n; i++ {
+		idx := (start + i) % n
+		srv := rr.servers[idx]
+		if srv.Healthy() {
+			rr.round = idx + 1
+			rr.mu.Unlock()
+			return srv
+		}
+	}
+	// advance round to avoid hot-spot
+	rr.round = start + 1
+	rr.mu.Unlock()
+	rr.log.Warn("no healthy servers available")
+	return nil
+}

111-132: Latency policy should avoid routing to unhealthy servers.

Currently may pick an unhealthy server; stickiness path checks health, but fresh selections don’t.

-func (rr *LatencyBased) Next(_ *http.Request) *Server {
+func (rr *LatencyBased) Next(_ *http.Request) *Server {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
 
 	// Return nil if no servers are available
 	if len(rr.servers) == 0 {
 		return nil
 	}
 
-	r := rr.randomizer.Float64()
-	cumulative := 0.0
-	for _, s := range rr.servers {
-		cumulative += s.Rate
-		if r <= cumulative {
-			return s.Server
-		}
-	}
-	// Fallback, shouldn't be reached if rates are normalized
-	return rr.servers[len(rr.servers)-1].Server
+	// consider only healthy servers; renormalize on-the-fly
+	total := 0.0
+	for _, s := range rr.servers {
+		if s.Server != nil && s.Server.Healthy() {
+			total += s.Rate
+		}
+	}
+	if total == 0 {
+		return nil
+	}
+	r := rr.randomizer.Float64() * total
+	cumulative := 0.0
+	var last *Server
+	for _, s := range rr.servers {
+		if s.Server != nil && s.Server.Healthy() {
+			cumulative += s.Rate
+			last = s.Server
+			if r <= cumulative {
+				return s.Server
+			}
+		}
+	}
+	return last
 }
♻️ Duplicate comments (1)
internal/proxy/balancer.go (1)

172-185: Goroutine leak: ticker channel is never closed; make cleanup stoppable with a stop channel.

for range ticker.C blocks after Stop(). Add stopCh and select.

Apply:

 type StickyLatencyBased struct {
  	// LatencyBased provides the core latency-based selection functionality
  	*LatencyBased
@@
  	// sessionTimestamps tracks when sessions were last accessed.
  	sessionTimestamps map[string]time.Time
+  // stopCh signals cleanup goroutine to exit.
+  stopCh chan struct{}
+  // stopOnce ensures idempotent Stop().
+  stopOnce sync.Once
 }
@@
 func NewStickyLatencyBased(log *slog.Logger, sessionTimeout time.Duration) *StickyLatencyBased {
@@
  	slb := &StickyLatencyBased{
  		LatencyBased:      NewLatencyBased(log),
  		sessionMap:        make(map[string]*Server),
  		sessionTimestamps: make(map[string]time.Time),
  		sessionTimeout:    sessionTimeout,
+ 		stopCh:            make(chan struct{}),
  	}
@@
-func (slb *StickyLatencyBased) cleanupExpiredSessions() {
-	for range slb.sessionCleanupTicker.C {
-		slb.sessionMu.Lock()
-		now := time.Now()
-		for sessionID, timestamp := range slb.sessionTimestamps {
-			if now.Sub(timestamp) > slb.sessionTimeout {
-				delete(slb.sessionMap, sessionID)
-				delete(slb.sessionTimestamps, sessionID)
-				slb.log.Debug("cleaned up expired session", "session_id", sessionID)
-			}
-		}
-		slb.sessionMu.Unlock()
-	}
-}
+func (slb *StickyLatencyBased) cleanupExpiredSessions() {
+	for {
+		select {
+		case <-slb.sessionCleanupTicker.C:
+			slb.sessionMu.Lock()
+			now := time.Now()
+			for sessionID, timestamp := range slb.sessionTimestamps {
+				if now.Sub(timestamp) > slb.sessionTimeout {
+					delete(slb.sessionMap, sessionID)
+					delete(slb.sessionTimestamps, sessionID)
+					slb.log.Debug("cleaned up expired session", "session_id", sessionID)
+				}
+			}
+			slb.sessionMu.Unlock()
+		case <-slb.stopCh:
+			return
+		}
+	}
+}
@@
-func (slb *StickyLatencyBased) Stop() {
-	if slb.sessionCleanupTicker != nil {
-		slb.sessionCleanupTicker.Stop()
-	}
-}
+func (slb *StickyLatencyBased) Stop() {
+	slb.stopOnce.Do(func() {
+		if slb.stopCh != nil {
+			close(slb.stopCh)
+		}
+		if slb.sessionCleanupTicker != nil {
+			slb.sessionCleanupTicker.Stop()
+		}
+	})
+}

Also applies to: 318-332, 334-339

🧹 Nitpick comments (4)
internal/proxy/balancer.go (4)

216-222: Redundant locking/empty-check; let LatencyBased.Next handle empty set.

This adds contention and duplicated logic.

-	slb.LatencyBased.mu.Lock()
-	if len(slb.LatencyBased.servers) == 0 {
-		slb.LatencyBased.mu.Unlock()
-		return nil
-	}
-	slb.LatencyBased.mu.Unlock()
+	// rely on slb.LatencyBased.Next(req) to handle empty server sets

15-16: Minor: consider making the sticky header configurable and avoid logging full IDs.

  • Expose header name via config (env/viper) to interoperate with different clients.
  • Privacy: consider hashing/truncating the session ID in logs.

Also applies to: 275-280
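
A minimal sketch of wiring the header name through viper, assuming a configuration key name ("proxy-key-header") that is not part of this PR.

package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// Hypothetical config key; the default preserves the PR's behavior.
	v.SetDefault("proxy-key-header", "X-PROXY-KEY")
	v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	v.AutomaticEnv() // a PROXY_KEY_HEADER env var would override the default

	header := v.GetString("proxy-key-header")
	fmt.Println("sticky header:", header)
}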


318-332: Operational: expose cleanup interval as config if you anticipate tuning under load.

Static 5m may be too coarse for high-churn sessions. Optional.


211-273: Optional: validate cached server is still in active pool before honoring stickiness.

Update() pruning helps, but there’s a window where a removed server could be returned once. You can scan the active set under LatencyBased.mu to confirm membership before returning.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0c7a509 and a98c3ef.

📒 Files selected for processing (2)
  • internal/proxy/balancer.go (6 hunks)
  • internal/proxy/balancer_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/proxy/balancer_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-09T09:20:27.571Z
Learnt from: cloud-j-luna
PR: akash-network/rpc-proxy#37
File: internal/proxy/balancer.go:0-0
Timestamp: 2025-09-09T09:20:27.571Z
Learning: In Go, calling Stop() on a time.Ticker stops it from sending values but doesn't close the channel. Goroutines using `for range ticker.C` will block indefinitely after Stop() is called and won't exit, causing a goroutine leak. Use a select statement with a stop channel for proper cleanup.

Applied to files:

  • internal/proxy/balancer.go
🧬 Code graph analysis (1)
internal/proxy/balancer.go (1)
internal/proxy/server.go (1)
  • Server (28-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (4)
internal/proxy/balancer.go (4)

282-287: Good: pruning stale stickiness on Update.

This prevents routing to removed backends and keeps the map bounded by active servers. LGTM.

Also applies to: 289-316


210-214: Nil request path: OK.

Falling back to latency policy when req is nil is reasonable and avoids panic.


139-165: Guard against nil Server entries in LatencyBased.Update
Add a nil check when iterating over the servers slice to skip or error on nil entries and avoid panics, or explicitly document/assert that callers will never pass nil.


25-26: Approve API change: all LoadBalancer.Next calls have been updated. Verified via a repository-wide search that no zero-argument .Next() calls or old-signature implementations remain.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (3)
internal/proxy/balancer_test.go (1)

479-523: Missing assertions for session cleanup after server removal.

After updating servers to remove "server1", the test doesn't verify that:

  1. The session no longer routes to the removed server
  2. The session mapping was properly cleaned up or updated

Apply this diff to add the missing assertions:

   // Request with same session should now go to a different server since server1 is gone
   server2 := lb.NextServer(req)
   if server2 == nil {
     t.Fatal("expected a server to be selected")
   }
 
+  // Must not still be server1
+  if server2.name == server1.name {
+    t.Fatalf("expected a different server after update; still got %s", server2.name)
+  }
+
   // Verify session mapping still exists and points to a valid server
   lb.sessionMu.RLock()
   server, exists := lb.sessionMap["session123"]
   lb.sessionMu.RUnlock()
 
-  if exists && server == nil {
-    t.Error("session exists but points to nil server")
-  }
+  mappedName := "nil"
+  if server != nil {
+    mappedName = server.name
+  }
+  if !exists || server == nil || server.name != server2.name {
+    t.Errorf("session mapping not updated; exists=%v, mapped=%s, expected=%s",
+             exists, mappedName, server2.name)
+  }
internal/proxy/balancer.go (2)

319-331: Goroutine leak: cleanup routine won't exit when Stop() is called.

Based on the learnings from our previous discussion, calling Stop() on a time.Ticker doesn't close the channel. The for range loop will block indefinitely after Stop() is called, causing a goroutine leak.

Apply this diff to make the cleanup goroutine stoppable:

 type StickyLatencyBased struct {
   *LatencyBased
   sessionMap        map[string]*Server
   sessionMu         sync.RWMutex
   sessionTimeout    time.Duration
   sessionCleanupTicker *time.Ticker
   sessionTimestamps map[string]time.Time
+  stopCh           chan struct{}
 }

 func NewStickyLatencyBased(log *slog.Logger, sessionTimeout time.Duration) *StickyLatencyBased {
   if sessionTimeout == 0 {
     sessionTimeout = 30 * time.Minute
   }
 
   slb := &StickyLatencyBased{
     LatencyBased:      NewLatencyBased(log),
     sessionMap:        make(map[string]*Server),
     sessionTimestamps: make(map[string]time.Time),
     sessionTimeout:    sessionTimeout,
+    stopCh:            make(chan struct{}),
   }
 
   slb.sessionCleanupTicker = time.NewTicker(5 * time.Minute)
   go slb.cleanupExpiredSessions()
 
   return slb
 }

-func (slb *StickyLatencyBased) cleanupExpiredSessions() {
-  for range slb.sessionCleanupTicker.C {
-    slb.sessionMu.Lock()
-    now := time.Now()
-    for sessionID, timestamp := range slb.sessionTimestamps {
-      if now.Sub(timestamp) > slb.sessionTimeout {
-        delete(slb.sessionMap, sessionID)
-        delete(slb.sessionTimestamps, sessionID)
-        slb.log.Debug("cleaned up expired session", "session_id", sessionID)
-      }
-    }
-    slb.sessionMu.Unlock()
-  }
-}
+func (slb *StickyLatencyBased) cleanupExpiredSessions() {
+  for {
+    select {
+    case <-slb.sessionCleanupTicker.C:
+      slb.sessionMu.Lock()
+      now := time.Now()
+      for sessionID, timestamp := range slb.sessionTimestamps {
+        if now.Sub(timestamp) > slb.sessionTimeout {
+          delete(slb.sessionMap, sessionID)
+          delete(slb.sessionTimestamps, sessionID)
+          slb.log.Debug("cleaned up expired session", "session_id", sessionID)
+        }
+      }
+      slb.sessionMu.Unlock()
+    case <-slb.stopCh:
+      return
+    }
+  }
+}

 func (slb *StickyLatencyBased) Stop() {
   if slb.sessionCleanupTicker != nil {
     slb.sessionCleanupTicker.Stop()
   }
+  close(slb.stopCh)
 }

237-239: Potential nil pointer dereference in log statement.

The log statement accesses server.name without checking if server is nil, but Line 242 checks for nil after this point. If the sessionMap contained a nil server, this would panic.

Although the current implementation shouldn't store nil servers in the map, it's safer to be defensive. Move the log statement after the nil check or add a guard:

-				slb.log.Info("session timed out, removed",
-					"session_id", sessionID,
-					"last_accessed", lastAccessed)
+				serverName := "nil"
+				if server != nil {
+					serverName = server.name
+				}
+				slb.log.Info("session timed out, removed",
+					"session_id", sessionID,
+					"server", serverName,
+					"last_accessed", lastAccessed)
🧹 Nitpick comments (1)
internal/proxy/balancer.go (1)

216-221: Remove redundant empty server check.

This check is redundant since LatencyBased.NextServer() already handles the empty server case by returning nil at lines 116-118.

-	slb.LatencyBased.mu.Lock()
-	if len(slb.LatencyBased.servers) == 0 {
-		slb.LatencyBased.mu.Unlock()
-		return nil
-	}
-	slb.LatencyBased.mu.Unlock()
-
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a98c3ef and 9cb9109.

📒 Files selected for processing (6)
  • internal/proxy/balancer.go (6 hunks)
  • internal/proxy/balancer_test.go (6 hunks)
  • internal/proxy/grpc.go (1 hunks)
  • internal/proxy/proxy_test.go (1 hunks)
  • internal/proxy/rest.go (1 hunks)
  • internal/proxy/rpc.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/proxy/grpc.go
  • internal/proxy/rpc.go
  • internal/proxy/rest.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-09T09:20:27.571Z
Learnt from: cloud-j-luna
PR: akash-network/rpc-proxy#37
File: internal/proxy/balancer.go:0-0
Timestamp: 2025-09-09T09:20:27.571Z
Learning: In Go, calling Stop() on a time.Ticker stops it from sending values but doesn't close the channel. Goroutines using `for range ticker.C` will block indefinitely after Stop() is called and won't exit, causing a goroutine leak. Use a select statement with a stop channel for proper cleanup.

Applied to files:

  • internal/proxy/balancer.go
🧬 Code graph analysis (3)
internal/proxy/proxy_test.go (1)
internal/proxy/server.go (1)
  • Server (28-37)
internal/proxy/balancer_test.go (4)
internal/proxy/server.go (1)
  • Server (28-37)
internal/proxy/balancer.go (2)
  • NewStickyLatencyBased (189-205)
  • ProxyKeyHeader (16-16)
internal/seed/seed.go (3)
  • New (63-72)
  • Node (20-24)
  • Status (27-42)
internal/avg/moving.go (1)
  • Moving (13-17)
internal/proxy/balancer.go (1)
internal/proxy/server.go (1)
  • Server (28-37)
🔇 Additional comments (8)
internal/proxy/proxy_test.go (1)

321-323: LGTM! Clean signature update to align with the new interface.

The method signature has been correctly updated to match the new LoadBalancer interface that requires *http.Request as a parameter.

internal/proxy/balancer_test.go (4)

20-20: LGTM! Method name updates are consistent with the interface changes.

The test correctly updates all Next() calls to NextServer() to match the new interface.

Also applies to: 70-70, 82-82, 111-111, 145-145


177-216: Comprehensive test coverage for sticky session behavior.

The test properly validates:

  • Session creation and persistence with the X-PROXY-KEY header
  • Sticky routing to the same server for repeated requests
  • Different sessions can be created independently

286-330: Clever test design for session timeout verification.

The test effectively validates session expiration by:

  • Using a short timeout for faster testing
  • Overriding the cleanup ticker interval to speed up verification
  • Properly checking session existence before and after expiration

430-477: Excellent concurrent access testing.

The test thoroughly validates thread safety by:

  • Running multiple goroutines with different session IDs
  • Verifying session stickiness is maintained under concurrent load
  • Ensuring no race conditions in session management
internal/proxy/balancer.go (3)

290-315: Efficient session pruning implementation.

Great implementation of session cleanup when servers are removed:

  • Uses O(1) lookups with a map for efficiency
  • Properly handles both nil servers and non-existent servers
  • Includes debug logging for observability

167-205: Well-designed sticky session load balancer.

The implementation effectively combines session affinity with latency-based routing:

  • Clear separation of concerns with embedded LatencyBased
  • Configurable session timeout with sensible default
  • Proper session lifecycle management

225-256: Session lookup handles both expiry and health checks correctly.

The implementation properly:

  • Checks for session timeout and cleans up expired sessions inline
  • Validates server health before returning cached servers
  • Falls back to new server selection when cached server is unhealthy


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
internal/proxy/balancer_test.go (1)

489-559: Server removal path is nondeterministic and misses a crucial assertion.

  • The test assumes session binds to server1 but selection is random; it may bind to server2, making the test vacuously pass.
  • It doesn’t assert that the new session mapping equals the newly selected server after update.

Apply this diff to force the scenario and tighten assertions:

@@
-  // Create session with server1
-  req := createTestRequest()
-  req.Header.Set(ProxyKeyHeader, "session123")
-
-  server1 := lb.NextServer(req)
-  if server1 == nil {
-    t.Fatal("expected a server to be selected")
-  }
-
-  // Verify session was created
-  lb.sessionMu.RLock()
-  originalServer, sessionExists := lb.sessionMap["session123"]
-  lb.sessionMu.RUnlock()
-  if !sessionExists {
-    t.Fatal("expected session to be created")
-  }
+  // Create session mapped deterministically to server1
+  req := createTestRequest()
+  req.Header.Set(ProxyKeyHeader, "session123")
+  lb.sessionMu.Lock()
+  lb.sessionMap["session123"] = initialServers[0] // server1
+  lb.sessionTimestamps["session123"] = time.Now()
+  lb.sessionMu.Unlock()
+  originalServer := initialServers[0]
@@
-  // If the original server was removed, the session should be cleaned up
-  serverWasRemoved := true
-  for _, s := range updatedServers {
-    if s.name == originalServer.name {
-      serverWasRemoved = false
-      break
-    }
-  }
-
-  if serverWasRemoved {
+  // Original server was removed; the session should be cleaned up
@@
-    // Request with same session should create a new session with an available server
-    server2 := lb.NextServer(req)
+    // Request with same session should create a new session with an available server
+    server2 := lb.NextServer(req)
     if server2 == nil {
       t.Fatal("expected a server to be selected")
     }
@@
-    // Verify new session was created
+    // Verify new session was created and points to the newly selected server
     lb.sessionMu.RLock()
     newServer, newSessionExists := lb.sessionMap["session123"]
     lb.sessionMu.RUnlock()
     if !newSessionExists {
       t.Error("expected new session to be created")
     }
     if newServer == nil {
       t.Error("new session points to nil server")
     }
+    if newServer.name != server2.name {
+      t.Fatalf("session mapping not updated correctly; got %v, expected %s", newServer, server2.name)
+    }
-  }
+  }

Optionally, assert explicitly that server2.name != "server1" after update (defensive clarity).

#!/bin/bash
# Flakiness check: run the specific test many times
set -euo pipefail
rg -n "TestStickyLatencyBased_ServerUpdate" internal/proxy/balancer_test.go
go test -run TestStickyLatencyBased_ServerUpdate -count=50 ./internal/proxy -v
🧹 Nitpick comments (3)
internal/proxy/balancer_test.go (3)

129-131: Use epsilon for float normalization check to avoid brittleness.
Direct equality on floats is fragile.

- if sum != 1.0 {
-   t.Errorf("rates are not normalized, sum should be 1, got %f", sum)
- }
+ if math.Abs(sum-1.0) > epsilon {
+   t.Errorf("rates are not normalized, expected ~1, got %f", sum)
+ }

296-300: Avoid spawning an extra cleanup goroutine in tests; risk of leaked goroutines.
You stop the old ticker then start a new ticker and goroutine, but the original cleanup goroutine (created in NewStickyLatencyBased) may block forever on the stopped ticker’s channel. Prefer a cancelable cleanup loop (context/done channel) and a single goroutine.

Proposed follow-ups:

  • Add a done channel/context to StickyLatencyBased.cleanupExpiredSessions and have Stop() signal it so the goroutine exits.
  • In tests, avoid calling go lb.cleanupExpiredSessions() directly; instead expose a test hook like cleanupExpiredSessionsOnce() or rely on inline-expiry-on-access.
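
A hedged sketch of the cleanupExpiredSessionsOnce hook mentioned in the second follow-up, reusing the field names shown earlier in this review; it performs one synchronous sweep so tests need no extra goroutine or ticker.

// cleanupExpiredSessionsOnce sweeps expired sessions a single time.
func (slb *StickyLatencyBased) cleanupExpiredSessionsOnce() {
	slb.sessionMu.Lock()
	defer slb.sessionMu.Unlock()
	now := time.Now()
	for sessionID, timestamp := range slb.sessionTimestamps {
		if now.Sub(timestamp) > slb.sessionTimeout {
			delete(slb.sessionMap, sessionID)
			delete(slb.sessionTimestamps, sessionID)
		}
	}
}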

585-588: Check http.NewRequest error in helper for robustness.
Low-risk improvement.

-func createTestRequest() *http.Request {
-  req, _ := http.NewRequest("GET", "http://example.com", nil)
-  return req
-}
+func createTestRequest() *http.Request {
+  req, err := http.NewRequest("GET", "http://example.com", nil)
+  if err != nil {
+    panic(err) // safe in tests
+  }
+  return req
+}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9cb9109 and 0f5cb0c.

📒 Files selected for processing (1)
  • internal/proxy/balancer_test.go (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/proxy/balancer_test.go (4)
internal/proxy/server.go (1)
  • Server (28-37)
internal/proxy/balancer.go (2)
  • NewStickyLatencyBased (189-205)
  • ProxyKeyHeader (16-16)
internal/seed/seed.go (3)
  • New (63-72)
  • Node (20-24)
  • Status (27-42)
internal/avg/moving.go (1)
  • Moving (13-17)
🔇 Additional comments (8)
internal/proxy/balancer_test.go (8)

4-9: Imports look correct for new HTTP helpers.
No issues.


70-70: RoundRobin: API migration to NextServer(nil) is correct.
Good adaptation to new signature.


199-206: Sticky: session affinity assertion is solid.
Confirms same session -> same server.


233-251: No-proxy-key path validated appropriately.
Healthy server check is sufficient here.


281-283: Persistence check is good.
Verifies stable stickiness across calls.


343-399: Cache hit/miss test is well structured.
Covers creation, reuse, expiry, and re-creation.


424-427: Header-based stickiness verified.
Matches expected affinity via X-PROXY-KEY.


465-468: Concurrent access test validates per-session stickiness.
Looks good; no races apparent in test logic.

@chainzero chainzero merged commit 8dfaf41 into main Sep 16, 2025
7 checks passed
@chainzero chainzero deleted the luna/sticky-session-lb branch September 16, 2025 22:48