Skip to content
Open
47 changes: 42 additions & 5 deletions cmd/balancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,41 @@ func main() {
// Backend pool (THREAD SAFE)
// --------------------------------------------------
pool := core.NewServerPool()
pool.AddServer(&core.Backend{Address: "localhost:9001", Alive: true})
pool.AddServer(&core.Backend{Address: "localhost:9002", Alive: true})
pool.AddServer(&core.Backend{Address: "localhost:9003", Alive: true})
pool.AddServer(&core.Backend{
Address: "localhost:9001",
Alive: true,
CircuitState: "CLOSED",
})

router:= routing.NewAdaptiveRouter(pool)
pool.AddServer(&core.Backend{
Address: "localhost:9002",
Alive: true,
CircuitState: "CLOSED",
})

pool.AddServer(&core.Backend{
Address: "localhost:9003",
Alive: true,
CircuitState: "CLOSED",
})


router := routing.NewAdaptiveRouter(pool)

mux := http.NewServeMux()

//metrics and admin
mux.Handle("/metrics", api.MetricsHandler(pool.GetServers()))
mux.Handle("/admin/add", api.AddServerHandler(pool))

//status endpoint
mux.Handle("/status", api.StatusHandler(router, pool.GetServers))

//health chcker
checker := &health.Checker{
Pool: pool,
Interval: 5 * time.Second,
Timeout: 2 * time.Second,
mux.Handle("/metrics",api.MetricsHandler(pool.GetServers()))
mux.Handle("/admin/add", api.AddServerHandler(pool))

Expand All @@ -68,6 +94,10 @@ func main() {
checker.Start()

//L4 mode
if mode == "L4" {
log.Println("[MAIN] Starting L4 TCP Load Balancer on :8080")
tcpProxy := &l4.TCPProxy{
Pool: pool.GetServers(),
if mode =="L4"{
log.Println("[MAIN] Starting L4 TCP Load Balancer on :8080")
tcpProxy := &l4.TCPProxy{
Expand All @@ -80,16 +110,23 @@ func main() {
//L7
log.Println("[MAIN] Starting L7 HTTP Load Balancer on :8080")
httpProxy := &l7.HTTPProxy{
Pool: pool.GetServers(),
Pool: pool.GetServers(),
Router: router,
}
mux.Handle("/", httpProxy)

corsHandler := handlers.CORS(
handlers.AllowedOrigins([]string{"*"}),
handlers.AllowedMethods([]string{"GET", "POST", "OPTIONS"}),
handlers.AllowedHeaders([]string{"Content-Type"}),
)(mux)
log.Fatal(http.ListenAndServe(":8080", corsHandler))

}
handlers.AllowedMethods([]string{"GET","POST","OPTIONS"}),
handlers.AllowedHeaders([]string{"Content-Type"}),
)(mux)
log.Fatal(http.ListenAndServe(":8080",corsHandler))

}
}
47 changes: 39 additions & 8 deletions internal/core/server.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
package core

import (
"time"
"sync"
"time"
)

type Backend struct{
Address string
Weight int
Alive bool
// Backend represents a single backend server
type Backend struct {
Address string
Weight int
Alive bool
ActiveConns int64
Latency time.Duration
ErrorCount int64
Latency time.Duration
ErrorCount int64

// Circuit Breaker fields
CircuitState string // CLOSED, OPEN, HALF_OPEN
FailureCount int64
LastFailure time.Time
LastSuccess time.Time

Mutex sync.Mutex
}

// UpdateCircuitState updates the circuit breaker state of the backend
func (b *Backend) UpdateCircuitState() {
now := time.Now()

switch b.CircuitState {

case "CLOSED":
// Too many failures -> open the circuit
if b.FailureCount >= 3 {
b.CircuitState = "OPEN"
b.LastFailure = now
}

case "OPEN":
// Cooldown period before allowing retry
if now.Sub(b.LastFailure) > 10*time.Second {
b.CircuitState = "HALF_OPEN"
}

}
case "HALF_OPEN":
// Recovery handled by success/failure paths
}
}
20 changes: 15 additions & 5 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"github.com/lugnitdgp/TDOC_Routrix/internal/core"
)

const (
CircuitCooldown = 10 * time.Second
)

type Checker struct {
Pool *core.ServerPool
Interval time.Duration
Expand All @@ -19,7 +23,6 @@ func (c *Checker) Start() {

go func() {
for range ticker.C {

backends := c.Pool.GetServers()
for _, backend := range backends {
go c.checkBackend(backend)
Expand All @@ -29,18 +32,25 @@ func (c *Checker) Start() {
}

func (c *Checker) checkBackend(b *core.Backend) {

start := time.Now()

conn, err := net.DialTimeout("tcp", b.Address, c.Timeout)

b.Mutex.Lock()

defer b.Mutex.Unlock()

// ---------------- CIRCUIT BREAKER COOLDOWN ----------------
if b.CircuitState == "OPEN" {
if time.Since(b.LastFailure) > CircuitCooldown {
b.CircuitState = "HALF_OPEN"
log.Printf("[circuit] %s → HALF_OPEN", b.Address)
}
}

// ---------------- HEALTH CHECK ----------------
if err != nil {
b.Alive = false
log.Printf("backend is down: %s", b.Address)
// Do NOT flip Alive here — circuit breaker handles failures
log.Printf("[health] backend unreachable: %s", b.Address)
return
}

Expand Down
11 changes: 7 additions & 4 deletions internal/proxy/L4/tcp_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func (p *TCPProxy) handleConnection(clientConn net.Conn) {
start := time.Now()

backend := p.Router.GetNextAvaliableServer(p.Pool)

if backend == nil {
clientConn.Close()
return
Expand All @@ -46,13 +45,16 @@ func (p *TCPProxy) handleConnection(clientConn net.Conn) {
backend.Mutex.Unlock()

serverConn, err := net.Dial("tcp", backend.Address)

if err != nil {
backend.Mutex.Lock()
backend.ActiveConns--
backend.ErrorCount++
backend.FailureCount++
backend.LastFailure = time.Now()
backend.Mutex.Unlock()

clientConn.Close()
return
}

go io.Copy(serverConn, clientConn)
Expand All @@ -63,9 +65,10 @@ func (p *TCPProxy) handleConnection(clientConn net.Conn) {

backend.Mutex.Lock()
backend.ActiveConns--
backend.FailureCount = 0
backend.LastSuccess = time.Now()

lat := time.Since(start)
backend.Latency = (backend.Latency + lat) / 2

backend.Mutex.Unlock()

}
32 changes: 28 additions & 4 deletions internal/proxy/L7/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()

backend := p.Router.GetNextAvaliableServer(p.Pool)

if backend == nil {
http.Error(w, "No Backend avaliable", http.StatusServiceUnavailable)
http.Error(w, "No backend available", http.StatusServiceUnavailable)
return
}

Expand All @@ -30,13 +29,38 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
backend.Mutex.Unlock()

target, _ := url.Parse("http://" + backend.Address)

proxy := httputil.NewSingleHostReverseProxy(target)

// -------- ERROR PATH --------
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
backend.Mutex.Lock()

backend.FailureCount++
backend.LastFailure = time.Now()

if backend.CircuitState == "HALF_OPEN" || backend.FailureCount >= 3 {
backend.CircuitState = "OPEN"
}

backend.ActiveConns--
backend.Mutex.Unlock()

http.Error(w, "Backend error", http.StatusBadGateway)
}

proxy.ServeHTTP(w, r)

// -------- SUCCESS PATH --------
backend.Mutex.Lock()
backend.ActiveConns++

if backend.CircuitState == "HALF_OPEN" {
backend.CircuitState = "CLOSED"
backend.FailureCount = 0
}

backend.LastSuccess = time.Now()
backend.Latency = time.Since(start)
backend.ActiveConns--

backend.Mutex.Unlock()
}
Loading