Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions pkg/repository/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@ import (
// multiplexChannel is a single channel used for all multiplexed messages.
const multiplexChannel = "hatchet_listener"

// acquireListenerConn checks a connection out of the pool and detaches it
// from pool bookkeeping via Hijack before handing it to the caller.
//
// Rationale: pgxlisten's Listen owns the raw *pgx.Conn returned by Connect
// and closes it with `defer conn.Close(ctx)` when Listen exits (for example
// when the server kills the session via idle_session_timeout). Handing back
// poolConn.Conn() without hijacking would let the *pgxpool.Conn wrapper go
// out of scope with no Release() ever being called, so pgxpool would keep
// counting the slot as "acquired" forever. Every reconnect cycle would then
// leak one slot until the pool was exhausted. Hijack removes the conn from
// the pool's accounting up front: pgxlisten closes the raw conn cleanly,
// and each subsequent Connect acquires a fresh, correctly tracked slot.
func acquireListenerConn(ctx context.Context, pool *pgxpool.Pool) (*pgx.Conn, error) {
	wrapped, err := pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	// Transfer ownership out of the pool; the caller (pgxlisten) is now
	// responsible for closing the raw connection.
	return wrapped.Hijack(), nil
}

// multiplexedListener listens for messages on a single Postgres channel and
// dispatches them to the appropriate handlers based on the queue name.
type multiplexedListener struct {
Expand Down Expand Up @@ -57,12 +77,7 @@ func (m *multiplexedListener) startListening() {
// listen for multiplexed messages
listener := &pgxlisten.Listener{
Connect: func(ctx context.Context) (*pgx.Conn, error) {
// Acquire a new connection each time
poolConn, err := m.pool.Acquire(ctx)
if err != nil {
return nil, err
}
return poolConn.Conn(), nil
return acquireListenerConn(ctx, m.pool)
},
LogError: func(innerCtx context.Context, err error) {
m.l.Warn().Err(err).Msg("error in listener")
Expand Down
123 changes: 123 additions & 0 deletions pkg/repository/multiplexer_listen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//go:build !e2e && !load && !rampup && !integration

package repository

import (
"context"
"testing"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)

// setupPostgresPlain spins up a fresh Postgres 15.6 container and returns a
// pgxpool configured with the given MaxConns. Unlike setupPostgresWithMigration,
// it does NOT run hatchet migrations — these tests only need a raw connection
// to exercise pgxpool bookkeeping.
//
// The returned cleanup func closes the pool and terminates the container.
// The same teardown is additionally registered via t.Cleanup immediately
// after each resource is created, so a require failure partway through setup
// cannot leak the running container or the pool (require.NoError calls
// t.FailNow, which would otherwise abort before cleanup is ever returned).
func setupPostgresPlain(t *testing.T, maxConns int32) (*pgxpool.Pool, func()) {
	t.Helper()

	ctx := context.Background()

	postgresContainer, err := postgres.Run(ctx,
		"postgres:15.6",
		postgres.WithDatabase("hatchet"),
		postgres.WithUsername("hatchet"),
		postgres.WithPassword("hatchet"),
		testcontainers.WithWaitStrategy(
			wait.ForLog("database system is ready to accept connections").
				WithOccurrence(2).
				WithStartupTimeout(30*time.Second),
		),
	)
	require.NoError(t, err)
	// Register teardown right away: if any require below fails, t.Cleanup
	// still runs and the container does not leak. Terminating an
	// already-terminated container only yields an error we ignore.
	t.Cleanup(func() { _ = postgresContainer.Terminate(ctx) })

	connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable")
	require.NoError(t, err)

	config, err := pgxpool.ParseConfig(connStr)
	require.NoError(t, err)
	config.MaxConns = maxConns
	config.MinConns = 0

	pool, err := pgxpool.NewWithConfig(ctx, config)
	require.NoError(t, err)
	// pool.Close is idempotent, so registering it here and also calling it
	// from the returned cleanup func is safe.
	t.Cleanup(pool.Close)

	err = pool.Ping(ctx)
	require.NoError(t, err)

	cleanup := func() {
		pool.Close()
		_ = postgresContainer.Terminate(ctx)
	}

	return pool, cleanup
}

// TestAcquireListenerConn_ReleasesPoolSlotImmediately verifies that
// acquireListenerConn hands back a raw *pgx.Conn that is already detached
// from the pool: pgxpool's "acquired" counter must read zero as soon as the
// call returns.
//
// Regression guard for #3694: the old implementation returned
// poolConn.Conn() without Hijack, dropping the *pgxpool.Conn wrapper with no
// Release() — the slot stayed permanently counted as acquired even though
// pgxlisten would later close the raw connection.
func TestAcquireListenerConn_ReleasesPoolSlotImmediately(t *testing.T) {
	pool, cleanup := setupPostgresPlain(t, 5)
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
		"pool should start with zero acquired conns")

	conn, err := acquireListenerConn(ctx, pool)
	require.NoError(t, err)
	require.NotNil(t, conn)
	defer conn.Close(ctx) //nolint:errcheck

	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
		"Hijack should transfer the conn out of the pool so AcquiredConns drops to zero")
}

// TestAcquireListenerConn_SurvivesReconnectCyclePastPoolLimit simulates the
// exact scenario reported in #3694: pgxlisten repeatedly calls Connect after
// each server-side reconnect, and each returned conn is eventually closed.
// A slot-leak bug would exhaust the pool within MaxConns iterations because
// the orphaned *pgxpool.Conn wrappers would never release their slots.
//
// With acquireListenerConn's Hijack, each call is independent of pool state:
// we run many more iterations than MaxConns and assert the pool never becomes
// exhausted and AcquiredConns returns to zero after each cycle.
func TestAcquireListenerConn_SurvivesReconnectCyclePastPoolLimit(t *testing.T) {
	const maxConns int32 = 3
	const iterations = int(maxConns) * 4 // well past MaxConns

	pool, cleanup := setupPostgresPlain(t, maxConns)
	defer cleanup()

	for i := 0; i < iterations; i++ {
		// Run each cycle in its own closure so `defer cancel()` fires at
		// the end of the iteration rather than at function return.
		func() {
			// Each iteration uses its own short timeout so a would-be slot leak
			// surfaces as a deadline-exceeded error rather than hanging the test.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			conn, err := acquireListenerConn(ctx, pool)
			require.NoErrorf(t, err, "iteration %d acquire should succeed; slot leak would starve the pool here", i)
			require.NotNil(t, conn)

			require.Equalf(t, int32(0), pool.Stat().AcquiredConns(),
				"iteration %d: AcquiredConns must be zero after Hijack", i)

			// Simulate pgxlisten's `defer conn.Close(ctx)` when Listen exits.
			require.NoError(t, conn.Close(ctx), "iteration %d close should not error", i)
		}()
	}

	require.Equal(t, int32(0), pool.Stat().AcquiredConns(),
		"pool should end the test with zero acquired conns")
}