GODRIVER-3419 Use a semaphore to limit concurrent conn creation #2098

Open · wants to merge 6 commits into master
299 changes: 154 additions & 145 deletions x/mongo/driver/topology/pool.go
@@ -128,6 +128,8 @@ type pool struct {
idleConns []*connection // idleConns holds all idle connections.
idleConnWait wantConnQueue // idleConnWait holds all wantConn requests for idle connections.
connectTimeout time.Duration

connectionSem chan struct{} // connectionSem bounds the number of concurrent connection creations.
}

// getState returns the current state of the pool. Callers must not hold the stateMu lock.
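Background for the new connectionSem field: a buffered channel is Go's idiomatic counting semaphore, where sending acquires a slot and receiving releases it. A minimal, self-contained sketch of the pattern (the capacity and names here are illustrative, not taken from the driver):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConnecting = 2 // illustrative capacity
	sem := make(chan struct{}, maxConnecting)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks once maxConnecting slots are held
			defer func() { <-sem }() // release the slot on return
			fmt.Printf("worker %d holds a slot\n", id)
		}(i)
	}
	wg.Wait()
}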
@@ -226,6 +228,7 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
conns: make(map[int64]*connection, config.MaxPoolSize),
idleConns: make([]*connection, 0, config.MaxPoolSize),
connectTimeout: config.ConnectTimeout,
connectionSem: make(chan struct{}, maxConnecting),
}
// minSize must not exceed maxSize if maxSize is not 0
if pool.maxSize != 0 && pool.minSize > pool.maxSize {
@@ -241,11 +244,6 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
var ctx context.Context
ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())

for i := 0; i < int(pool.maxConnecting); i++ {
pool.backgroundDone.Add(1)
go pool.createConnections(ctx, pool.backgroundDone)
}

// If maintainInterval is not positive, don't start the maintain() goroutine. Expect that
// negative values are only used in testing; this config value is not user-configurable.
if maintainInterval > 0 {
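For contrast, the loop removed above used the fixed-worker-pool pattern: start maxConnecting long-lived goroutines once and let each drain a shared queue. A generic sketch of that shape, assuming the standard context and sync imports (names are illustrative, not the driver's):

// startWorkers launches n long-lived workers that drain a shared job queue
// until the context is canceled.
func startWorkers(ctx context.Context, n int, jobs <-chan func()) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case job := <-jobs:
					job()
				}
			}
		}()
	}
	return &wg
}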
@@ -598,7 +596,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

// If we didn't get an immediately available idle connection, also get in the queue for a new
// connection while we're waiting for an idle connection.
p.queueForNewConn(w)
p.queueForNewConn(ctx, w)
p.stateMu.RUnlock()

// Wait for either the wantConn to be ready or for the Context to time out.
@@ -1118,13 +1116,16 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
return false
}

func (p *pool) queueForNewConn(w *wantConn) {
func (p *pool) queueForNewConn(ctx context.Context, w *wantConn) {
p.createConnectionsCond.L.Lock()
defer p.createConnectionsCond.L.Unlock()

p.newConnWait.cleanFront()
p.newConnWait.pushBack(w)
p.createConnectionsCond.Signal()

// Try to spawn a new connection without blocking the caller.
go p.spawnConnectionIfNeeded(ctx)
}
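For readers unfamiliar with the wantConn hand-off used above: tryDeliver must succeed at most once per request, because the idle-connection path and the new-connection path can race to satisfy the same checkout. A hedged sketch of that single-delivery pattern, assuming the sync import (this illustrates the semantics; it is not the driver's actual wantConn implementation):

// want delivers a value to exactly one waiter; later deliveries are rejected.
type want struct {
	mu    sync.Mutex
	done  bool
	ready chan string // illustrative payload type; buffered so delivery never blocks
}

func newWant() *want { return &want{ready: make(chan string, 1)} }

// tryDeliver returns true only for the first successful delivery.
func (w *want) tryDeliver(v string) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.done {
		return false // a competing path already delivered
	}
	w.done = true
	w.ready <- v
	return true
}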

func (p *pool) totalConnectionCount() int {
@@ -1141,143 +1142,6 @@ func (p *pool) availableConnectionCount() int {
return len(p.idleConns)
}

// createConnections creates connections for wantConn requests on the newConnWait queue.
func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

// condition returns true if the createConnections() loop should continue and false if it should
// wait. Note that the condition also listens for Context cancellation, which also causes the
// loop to continue, allowing for a subsequent check to return from createConnections().
condition := func() bool {
checkOutWaiting := p.newConnWait.len() > 0
poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
cancelled := ctx.Err() != nil
return (checkOutWaiting && poolHasSpace) || cancelled
}

// wait waits for there to be an available wantConn and for the pool to have space for a new
// connection. When the condition becomes true, it creates a new connection and returns the
// waiting wantConn and new connection. If the Context is cancelled or there are any
// errors, wait returns with "ok = false".
wait := func() (*wantConn, *connection, bool) {
p.createConnectionsCond.L.Lock()
defer p.createConnectionsCond.L.Unlock()

for !condition() {
p.createConnectionsCond.Wait()
}

if ctx.Err() != nil {
return nil, nil, false
}

p.newConnWait.cleanFront()
w := p.newConnWait.popFront()
if w == nil {
return nil, nil, false
}

conn := newConnection(p.address, p.connOpts...)
conn.pool = p
conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
p.conns[conn.driverConnectionID] = conn

return w, conn, true
}

for ctx.Err() == nil {
w, conn, ok := wait()
if !ok {
continue
}

if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
}

logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
}

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionCreated,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
})
}

start := time.Now()
// Pass the createConnections context to connect to allow pool close to
// cancel connection establishment so shutdown doesn't block indefinitely if
// connectTimeout=0.
//
// Per the specifications, an explicit value of connectTimeout=0 means the
// timeout is "infinite".

var cancel context.CancelFunc

connctx := context.Background()
if p.connectTimeout != 0 {
connctx, cancel = context.WithTimeout(ctx, p.connectTimeout)
}

err := conn.connect(connctx)

if cancel != nil {
cancel()
}

if err != nil {
w.tryDeliver(nil, err)

// If there's an error connecting the new connection, call the handshake error handler
// that implements the SDAM handshake error handling logic. This must be called after
// delivering the connection error to the waiting wantConn. If it's called before, the
// handshake error handler may clear the connection pool, leading to a different error
// message being delivered to the same waiting wantConn in idleConnWait when the wait
// queues are cleared.
if p.handshakeErrFn != nil {
p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
}

_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedError,
event: event.ReasonError,
}, err)

_ = p.closeConnection(conn)

continue
}

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
}

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionReady,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
Duration: duration,
})
}

if w.tryDeliver(conn, nil) {
continue
}

_ = p.checkInNoEvent(conn)
}
}

func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

@@ -1352,7 +1216,7 @@ func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {

for i := 0; i < n; i++ {
w := newWantConn()
p.queueForNewConn(w)
p.queueForNewConn(ctx, w)
wantConns = append(wantConns, w)

// Start a goroutine for each new wantConn, waiting for it to be ready.
@@ -1539,3 +1403,148 @@ func (q *wantConnQueue) cleanFront() {
q.popFront()
}
}

// spawnConnection establishes a new connection and delivers it to a waiting
// request. It handles dialing, handshaking, and error handling. This function
// is intended to be run in its own goroutine.
func (p *pool) spawnConnection(w *wantConn, conn *connection) {
// Release a slot from the connection semaphore when this function returns.
// This ensures that another connection can be spawned.
defer func() { <-p.connectionSem }()

// Record the start time to calculate the total connection setup duration.
start := time.Now()

// Create a context for the dial operation. If a connection timeout is
// configured, the context times out after that duration. Per the
// specifications, an explicit connectTimeout of 0 means the timeout is
// "infinite", so no deadline is set.
dialCtx := context.Background()
var cancel context.CancelFunc
if p.connectTimeout > 0 {
dialCtx, cancel = context.WithTimeout(dialCtx, p.connectTimeout)
defer cancel()
}

// Attempt to connect.
if err := conn.connect(dialCtx); err != nil {
// If connection fails, deliver the error to the waiting requester.
w.tryDeliver(nil, err)

// If a handshake error handler is defined, invoke it to handle SDAM state
// changes. This is done after delivering the error to prevent race
// conditions where the pool might be cleared before the error is delivered.
if p.handshakeErrFn != nil {
p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
}

_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedError,
event: event.ReasonError,
}, err)

_ = p.closeConnection(conn)

return
}

// emit "ConnectionReady"
duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
}

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionReady,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
Duration: duration,
})
}

// Deliver the connection to the waiting request; if the request was already
// satisfied or abandoned, check the connection back into the pool.
if !w.tryDeliver(conn, nil) {
_ = p.checkInNoEvent(conn)
}
}
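The timeout handling above follows the standard context.WithTimeout shape, in which a configured timeout of 0 means no deadline is set. A minimal sketch of the same shape, assuming the context and time imports (the function name and dial parameter are hypothetical):

// dialWithOptionalTimeout treats timeout == 0 as "no deadline".
func dialWithOptionalTimeout(timeout time.Duration, dial func(context.Context) error) error {
	ctx := context.Background()
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel() // always release the timer
	}
	// With timeout == 0 the context carries no deadline, i.e. an "infinite" timeout.
	return dial(ctx)
}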

// hasSpace checks if the pool has space for a new connection. It returns
// "true" if the maximum size is unlimited (0) or if the current number of
// connections is less than the maximum size.
func (p *pool) hasSpace() bool {
return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
}

// checkOutWaiting reports whether any wantConn requests are waiting for a
// new connection.
func (p *pool) checkOutWaiting() bool {
return p.newConnWait.len() > 0
}

// waitForNewConn blocks until there is both a waiting wantConn and room in
// the pool (or the context is canceled), then pops exactly one wantConn and
// creates and registers its connection.
func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) {
p.createConnectionsCond.L.Lock()
defer p.createConnectionsCond.L.Unlock()

for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil {
p.createConnectionsCond.Wait()
}

if ctx.Err() != nil {
return nil, nil, false
}

p.newConnWait.cleanFront()
w := p.newConnWait.popFront()
if w == nil {
return nil, nil, false
}

conn := newConnection(p.address, p.connOpts...)
conn.pool = p
conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
p.conns[conn.driverConnectionID] = conn

return w, conn, true
}
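waitForNewConn is a textbook sync.Cond wait loop: the predicate is re-checked after every wakeup because a signal does not guarantee the condition now holds for this waiter. A standalone sketch of that loop shape, assuming the context and sync imports (the pending counter is an illustrative stand-in for the pool's real predicate):

var (
	mu      sync.Mutex
	cond    = sync.NewCond(&mu)
	pending int // illustrative stand-in for "a wantConn is waiting and the pool has space"
)

// waitForWork blocks until pending > 0 or ctx is canceled. Note that a
// canceled ctx only takes effect after a Signal or Broadcast wakes the
// waiter, so cancellation must be paired with a Broadcast elsewhere.
func waitForWork(ctx context.Context) bool {
	mu.Lock()
	defer mu.Unlock()
	for pending == 0 && ctx.Err() == nil {
		cond.Wait() // releases mu while waiting, reacquires before returning
	}
	if ctx.Err() != nil {
		return false
	}
	pending--
	return true
}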

// spawnConnectionIfNeeded takes one waiting wantConn (if any) and starts its
// connection creation, subject to the semaphore limit.
func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
// Block until we're allowed to start another connection.
p.connectionSem <- struct{}{}

// Wait for a pending request and pool space, or for context cancellation.
w, conn, ok := p.waitForNewConn(ctx)
if !ok {
<-p.connectionSem // Release slot on failure.

return
}

// Emit "ConnectionCreated"
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
}

logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
}

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionCreated,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
})
}

// Establish the connection in a background goroutine.
go p.spawnConnection(w, conn)
}
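Note the slot-ownership hand-off: the semaphore slot acquired at the top of spawnConnectionIfNeeded is released in exactly one place per outcome, either inline on the failure path above or by spawnConnection's defer once the attempt finishes. A reduced sketch of that hand-off (the names and the next callback are illustrative, not the driver's):

// startAttempt acquires a slot, hands it off to a worker goroutine when there
// is work, and releases it inline when there is not.
func startAttempt(sem chan struct{}, next func() (func() error, bool)) {
	sem <- struct{}{} // acquire
	work, ok := next()
	if !ok {
		<-sem // release inline: the slot never left this goroutine
		return
	}
	go func() {
		defer func() { <-sem }() // release when the handed-off work finishes
		_ = work()
	}()
}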