From 36d856b7bfb6320427d0fbce6f4b40fbbe5320e7 Mon Sep 17 00:00:00 2001
From: voidborne-d
Date: Fri, 24 Apr 2026 22:25:46 +0800
Subject: [PATCH] fix(repository/multiplexer): hijack listener conn to avoid
 pool slot leak on reconnect
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #3694.

The previous multiplexer Connect callback did:

    poolConn, err := m.pool.Acquire(ctx)
    if err != nil {
        return nil, err
    }
    return poolConn.Conn(), nil

The *pgxpool.Conn wrapper fell out of scope without a matching
Release(), so pgxpool's internal bookkeeping counted the slot as
permanently acquired. When the server side terminated the listener conn
(e.g. idle_session_timeout, pgbouncer server_idle_timeout, L7 idle
kill), pgxlisten's listen() returned with an error and its defer closed
the raw conn — but the orphaned pool wrapper was never released. On the
next reconnect pgxlisten called Connect again and took a fresh slot.
Every reconnect cycle thus leaked one slot until the pool was
exhausted.
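For contrast, a sketch of ordinary pool usage (illustrative only, not
part of this change): a short-lived caller releases the wrapper when
its work is done,

    poolConn, err := m.pool.Acquire(ctx)
    if err != nil {
        return nil, err
    }
    defer poolConn.Release() // slot returns to the pool's bookkeeping
    // ... run a short-lived query on poolConn ...

but the Connect callback has no such scope: it hands the raw conn to
pgxlisten for the listener's entire lifetime, so there is no point
inside the callback where a Release() could safely live.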
This is distinct from the reconnect bug fixed by #2772 (which ensured
reconnect happens at all); this is the slot-leak *consequence* of how
reconnect is wired.

Fix: extract the Connect body into a small acquireListenerConn helper
that returns poolConn.Hijack() instead of poolConn.Conn(). Hijack
transfers ownership of the raw conn out of the pool immediately, so
the slot is released right after Acquire. pgxlisten's existing
"defer conn.Close(ctx)" still closes the raw conn cleanly on listener
exit — it just no longer leaks bookkeeping.
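After this change the Connect callback reduces to:

    Connect: func(ctx context.Context) (*pgx.Conn, error) {
        return acquireListenerConn(ctx, m.pool)
    },

with the Acquire-then-Hijack sequence living in the helper (see the
diff below).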
Includes two testcontainer-backed regression tests:

- TestAcquireListenerConn_ReleasesPoolSlotImmediately asserts
  Stat().AcquiredConns() is 0 immediately after acquireListenerConn.
- TestAcquireListenerConn_SurvivesReconnectCyclePastPoolLimit runs more
  acquire+close cycles than MaxConns; with the old code the pool
  starves within MaxConns iterations.
---
 pkg/repository/multiplexer.go             |  27 +++++--
 pkg/repository/multiplexer_listen_test.go | 123 ++++++++++++++++++++++
 2 files changed, 144 insertions(+), 6 deletions(-)
 create mode 100644 pkg/repository/multiplexer_listen_test.go

diff --git a/pkg/repository/multiplexer.go b/pkg/repository/multiplexer.go
index b02c237207..9fa43aeb87 100644
--- a/pkg/repository/multiplexer.go
+++ b/pkg/repository/multiplexer.go
@@ -17,6 +17,26 @@ import (
 // multiplexChannel is a single channel used for all multiplexed messages.
 const multiplexChannel = "hatchet_listener"
 
+// acquireListenerConn acquires a connection from the pool and transfers
+// ownership to the caller via Hijack, so the pool slot is released immediately.
+//
+// pgxlisten takes a raw *pgx.Conn from Connect and closes it via
+// `defer conn.Close(ctx)` when Listen exits (e.g. the server side kills the
+// conn via idle_session_timeout). If we returned poolConn.Conn() without
+// Hijack, the *pgxpool.Conn wrapper would fall out of scope with no Release()
+// call, leaving pgxpool's bookkeeping permanently counting the slot as
+// "acquired." Each reconnect cycle would then leak one slot until the pool
+// is exhausted. Hijack transfers ownership out of the pool immediately; the
+// raw conn is closed cleanly by pgxlisten, and the next Connect call acquires
+// a fresh slot without any orphaned bookkeeping.
+func acquireListenerConn(ctx context.Context, pool *pgxpool.Pool) (*pgx.Conn, error) {
+	poolConn, err := pool.Acquire(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return poolConn.Hijack(), nil
+}
+
 // multiplexedListener listens for messages on a single Postgres channel and
 // dispatches them to the appropriate handlers based on the queue name.
 type multiplexedListener struct {
@@ -57,12 +77,7 @@ func (m *multiplexedListener) startListening() {
 	// listen for multiplexed messages
 	listener := &pgxlisten.Listener{
 		Connect: func(ctx context.Context) (*pgx.Conn, error) {
-			// Acquire a new connection each time
-			poolConn, err := m.pool.Acquire(ctx)
-			if err != nil {
-				return nil, err
-			}
-			return poolConn.Conn(), nil
+			return acquireListenerConn(ctx, m.pool)
 		},
 		LogError: func(innerCtx context.Context, err error) {
 			m.l.Warn().Err(err).Msg("error in listener")
diff --git a/pkg/repository/multiplexer_listen_test.go b/pkg/repository/multiplexer_listen_test.go
new file mode 100644
index 0000000000..1a8a46f8f3
--- /dev/null
+++ b/pkg/repository/multiplexer_listen_test.go
@@ -0,0 +1,123 @@
+//go:build !e2e && !load && !rampup && !integration
+
+package repository
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/stretchr/testify/require"
+	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/modules/postgres"
+	"github.com/testcontainers/testcontainers-go/wait"
+)
+
+// setupPostgresPlain spins up a fresh Postgres 15.6 container and returns a
+// pgxpool configured with the given MaxConns. Unlike setupPostgresWithMigration,
+// it does NOT run hatchet migrations — these tests only need a raw connection
+// to exercise pgxpool bookkeeping.
+func setupPostgresPlain(t *testing.T, maxConns int32) (*pgxpool.Pool, func()) {
+	t.Helper()
+
+	ctx := context.Background()
+
+	postgresContainer, err := postgres.Run(ctx,
+		"postgres:15.6",
+		postgres.WithDatabase("hatchet"),
+		postgres.WithUsername("hatchet"),
+		postgres.WithPassword("hatchet"),
+		testcontainers.WithWaitStrategy(
+			wait.ForLog("database system is ready to accept connections").
+				WithOccurrence(2).
+				WithStartupTimeout(30*time.Second),
+		),
+	)
+	require.NoError(t, err)
+
+	connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable")
+	require.NoError(t, err)
+
+	config, err := pgxpool.ParseConfig(connStr)
+	require.NoError(t, err)
+	config.MaxConns = maxConns
+	config.MinConns = 0
+
+	pool, err := pgxpool.NewWithConfig(ctx, config)
+	require.NoError(t, err)
+
+	err = pool.Ping(ctx)
+	require.NoError(t, err)
+
+	cleanup := func() {
+		pool.Close()
+		_ = postgresContainer.Terminate(ctx)
+	}
+
+	return pool, cleanup
+}
+
+// TestAcquireListenerConn_ReleasesPoolSlotImmediately verifies that
+// acquireListenerConn returns a raw *pgx.Conn detached from the pool, so that
+// pgxpool's "acquired" bookkeeping drops back to zero right after the call.
+//
+// Regression guard for #3694: the previous implementation returned
+// poolConn.Conn() without hijacking, letting the *pgxpool.Conn wrapper fall
+// out of scope without Release() — the slot stayed permanently counted as
+// acquired even though the raw conn would later be closed by pgxlisten.
+func TestAcquireListenerConn_ReleasesPoolSlotImmediately(t *testing.T) {
+	pool, cleanup := setupPostgresPlain(t, 5)
+	defer cleanup()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
+		"pool should start with zero acquired conns")
+
+	raw, err := acquireListenerConn(ctx, pool)
+	require.NoError(t, err)
+	require.NotNil(t, raw)
+	defer raw.Close(ctx)
+
+	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
+		"Hijack should transfer the conn out of the pool so AcquiredConns drops to zero")
+}
+
+// TestAcquireListenerConn_SurvivesReconnectCyclePastPoolLimit simulates the
+// exact scenario reported in #3694: pgxlisten repeatedly calls Connect after
+// each server-side reconnect, and each returned conn is eventually closed.
+// A slot-leak bug would exhaust the pool within MaxConns iterations because
+// the orphaned *pgxpool.Conn wrappers would never release their slots.
+//
+// With acquireListenerConn's Hijack, each call is independent of pool state:
+// we run many more iterations than MaxConns and assert the pool never becomes
+// exhausted and AcquiredConns returns to zero after each cycle.
+func TestAcquireListenerConn_SurvivesReconnectCyclePastPoolLimit(t *testing.T) {
+	const maxConns int32 = 3
+	const iterations = int(maxConns) * 4 // well past MaxConns
+
+	pool, cleanup := setupPostgresPlain(t, maxConns)
+	defer cleanup()
+
+	for i := 0; i < iterations; i++ {
+		// Each iteration uses its own short timeout so a would-be slot leak
+		// surfaces as a deadline-exceeded error rather than hanging the test.
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		raw, err := acquireListenerConn(ctx, pool)
+		require.NoErrorf(t, err, "iteration %d acquire should succeed; slot leak would starve the pool here", i)
+		require.NotNil(t, raw)
+
+		require.Equalf(t, int32(0), pool.Stat().AcquiredConns(),
+			"iteration %d: AcquiredConns must be zero after Hijack", i)
+
+		// Simulate pgxlisten's `defer conn.Close(ctx)` when Listen exits.
+		err = raw.Close(ctx)
+		require.NoError(t, err, "iteration %d close should not error", i)
+		cancel()
+	}
+
+	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
+		"pool should end the test with zero acquired conns")
+}