
Have svcinit release so_reuseport_aware ports when service is healthy. #66

@r0bobo

Description

If I understand the situation correctly, svcinit holds on to the port until the service is shut down when we use so_reuseport_aware = True, so some of the requests end up at svcinit's black-hole listener.
It would be more practical if svcinit released the port once the service is healthy, so that all packets are forwarded to the actual service and we only have to think about short timeouts and retries in the health check. I had Claude cook up a patch for this and it solves the problem for us, but I'm unsure whether I'm missing something or whether it might break other cases.
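
For context, my mental model of the SO_REUSEPORT mechanism is roughly the sketch below: both svcinit and the service bind the same port with SO_REUSEPORT set, and the kernel load-balances new connections across all sockets bound to that port, so connections keep landing on svcinit's never-accepting socket until that socket is closed. This is only an illustration of the mechanism, not svcinit's actual setup code:

// Illustration only: creating an SO_REUSEPORT listener in Go.
// Any process that binds the same address this way shares the port,
// and the kernel splits incoming connections between the sockets.
package main

import (
	"context"
	"net"
	"syscall"

	"golang.org/x/sys/unix"
)

func reuseportListen(addr string) (net.Listener, error) {
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			var sockErr error
			err := c.Control(func(fd uintptr) {
				sockErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
			})
			if err != nil {
				return err
			}
			return sockErr
		},
	}
	return lc.Listen(context.Background(), "tcp", addr)
}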

Would you be interested in me cleaning up the patch (if needed) and submitting a PR with this feature?

Patch
diff --git a/cmd/svcinit/main.go b/cmd/svcinit/main.go
index 2b277eb..ddb9446 100644
--- a/cmd/svcinit/main.go
+++ b/cmd/svcinit/main.go
@@ -118,7 +118,7 @@ func main() {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	must(err)
 
-	ports, err := assignPorts(unversionedSpecs)
+	ports, reuseportListeners, err := assignPorts(unversionedSpecs)
 	must(err)
 
 	svcctlPort := listener.Addr().(*net.TCPAddr).Port
@@ -137,7 +137,7 @@ func main() {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 
-	r, err := runner.New(ctx, serviceSpecs)
+	r, err := runner.New(ctx, serviceSpecs, reuseportListeners)
 	must(err)
 
 	servicesErrCh := make(chan error, len(unversionedSpecs))
@@ -354,9 +354,10 @@ func readServiceSpecs(
 func assignPorts(
 	serviceSpecs map[string]svclib.ServiceSpec,
 ) (
-	svclib.Ports, error,
+	svclib.Ports, map[string][]net.Listener, error,
 ) {
 	var toClose []net.Listener
+	reuseportListeners := map[string][]net.Listener{}
 	ports := svclib.Ports{}
 
 	for label, spec := range serviceSpecs {
@@ -389,11 +390,11 @@ func assignPorts(
 
 			listener, err := lc.Listen(context.Background(), "tcp", "127.0.0.1:"+port)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 			_, port, err = net.SplitHostPort(listener.Addr().String())
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			qualifiedPortName := label
@@ -423,6 +424,8 @@ func assignPorts(
 
 			if !spec.SoReuseportAware {
 				toClose = append(toClose, listener)
+			} else {
+				reuseportListeners[label] = append(reuseportListeners[label], listener)
 			}
 		}
 	}
@@ -430,7 +433,7 @@ func assignPorts(
 	for _, listener := range toClose {
 		err := listener.Close()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 	}
 
@@ -461,10 +464,10 @@ func assignPorts(
 
 	serializedPorts, err := ports.Marshal()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	os.Setenv("ASSIGNED_PORTS", string(serializedPorts))
-	return ports, nil
+	return ports, reuseportListeners, nil
 }
 
 func augmentServiceSpecs(
diff --git a/runner/runner.go b/runner/runner.go
index 5fd3cf7..38075df 100644
--- a/runner/runner.go
+++ b/runner/runner.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"net"
 	"os"
 	"os/exec"
 	"reflect"
@@ -32,12 +33,14 @@ type Runner struct {
 	serviceSpecs ServiceSpecs
 
 	serviceInstances map[string]*ServiceInstance
+	portListeners    map[string][]net.Listener
 }
 
-func New(ctx context.Context, serviceSpecs ServiceSpecs) (*Runner, error) {
+func New(ctx context.Context, serviceSpecs ServiceSpecs, portListeners map[string][]net.Listener) (*Runner, error) {
 	r := &Runner{
 		ctx:              ctx,
 		serviceInstances: map[string]*ServiceInstance{},
+		portListeners:    portListeners,
 	}
 	err := r.UpdateSpecs(serviceSpecs, nil)
 	if err != nil {
@@ -195,6 +198,9 @@ func (r *Runner) UpdateSpecs(serviceSpecs ServiceSpecs, ibazelCmd []byte) error
 		if err != nil {
 			return err
 		}
+		if listeners, ok := r.portListeners[label]; ok {
+			r.serviceInstances[label].portListeners = listeners
+		}
 	}
 
 	for _, label := range updateActions.toReloadLabels {
diff --git a/runner/service_instance.go b/runner/service_instance.go
index 99b8cf6..9127753 100644
--- a/runner/service_instance.go
+++ b/runner/service_instance.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"os"
 	"os/exec"
@@ -29,6 +30,11 @@ type ServiceInstance struct {
 	startErrFn func() error
 	waitErrFn  func() error
 
+	// portListeners holds the manager's SO_REUSEPORT listeners for this service.
+	// They are closed once the service becomes healthy, releasing the port back
+	// to be exclusively owned by the service.
+	portListeners []net.Listener
+
 	mu                   sync.Mutex
 	runErr               error
 	killed               bool
@@ -100,6 +106,7 @@ func (s *ServiceInstance) WaitUntilHealthy(ctx context.Context) error {
 
 		if s.HealthCheck(ctx, expectedStartDuration) {
 			log.Printf("%s healthy!\n", coloredLabel)
+			s.releasePortListeners()
 			break
 		}
 
@@ -109,6 +116,15 @@ func (s *ServiceInstance) WaitUntilHealthy(ctx context.Context) error {
 	return nil
 }
 
+func (s *ServiceInstance) releasePortListeners() {
+	for _, l := range s.portListeners {
+		if err := l.Close(); err != nil {
+			log.Printf("Warning: failed to close port listener for %s: %v", s.Colorize(s.Label), err)
+		}
+	}
+	s.portListeners = nil
+}
+
 var httpClient = http.Client{
 	// It's important to have a reasonable timeout here since the connection may never get accepted
 	// if it's to a port that is SO_REUSEPORT-aware. In that case, the healthcheck will hang forever

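
With the patch applied, the remaining concern on our side is the window before the listeners are released, which is where the short timeouts and retries mentioned above come in. Roughly what I have in mind (a sketch with placeholder names and timings, not svcinit's actual health check):

// Sketch only: a probe with a short per-attempt timeout plus retries, so an
// attempt that lands on a not-yet-closed SO_REUSEPORT listener fails fast
// instead of hanging.
package main

import (
	"context"
	"net/http"
	"time"
)

func waitHealthy(ctx context.Context, url string) error {
	client := &http.Client{Timeout: 500 * time.Millisecond} // short per-attempt timeout
	ticker := time.NewTicker(250 * time.Millisecond)
	defer ticker.Stop()
	for {
		resp, err := client.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// retry
		}
	}
}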