Skip to content

Commit

Permalink
balancergroup: Make closing terminal (grpc#8095)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Feb 25, 2025
1 parent e0ac3ac commit aa629e0
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 117 deletions.
1 change: 0 additions & 1 deletion balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
Logger: lb.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
lb.bg.Start()
go lb.run()
return lb
}
Expand Down
1 change: 0 additions & 1 deletion balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
}
Expand Down
121 changes: 58 additions & 63 deletions internal/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type BalancerGroup struct {
// The corresponding boolean outgoingClosed is used to stop further updates
// to sub-balancers after they are closed.
outgoingMu sync.Mutex
outgoingStarted bool
outgoingClosed bool
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed. This is `nil` if caching
// is disabled by passing `0` for Options.SubBalancerCloseTimeout`.
Expand Down Expand Up @@ -224,7 +224,7 @@ type BalancerGroup struct {
// The corresponding boolean incomingClosed is used to stop further updates
// from sub-balancers after they are closed.
incomingMu sync.Mutex
incomingStarted bool // This boolean only guards calls back to ClientConn.
incomingClosed bool // This boolean only guards calls back to ClientConn.
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

Expand Down Expand Up @@ -265,30 +265,6 @@ func New(opts Options) *BalancerGroup {
}
}

// Start starts the balancer group, including building all the sub-balancers,
// and send the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
bg.incomingMu.Lock()
bg.incomingStarted = true
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingMu.Unlock()
return
}

for _, config := range bg.idToBalancerConfig {
config.startBalancer()
}
bg.outgoingStarted = true
bg.outgoingMu.Unlock()
}

// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the
Expand All @@ -299,17 +275,18 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
bg.logger.Infof("Adding child policy of type %q for child %q", balancerName, id)
builder := balancer.Get(balancerName)
if builder == nil {
return fmt.Errorf("unregistered balancer name %q", balancerName)
return fmt.Errorf("balancergroup: unregistered balancer name %q", balancerName)
}

// Check that bg has not been closed before storing data in the static map.
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return fmt.Errorf("balancergroup: already closed")
}
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
// caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
// Skip searching the cache if disabled.
if bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
if bg.logger.V(2) {
bg.logger.Infof("Removing and reusing child policy of type %q for child %q from the balancer cache", balancerName, id)
Expand Down Expand Up @@ -341,11 +318,7 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
builder: builder,
buildOpts: bg.buildOpts,
}
if bg.outgoingStarted {
// Only start the balancer if bg is started. Otherwise, we only keep the
// static data.
sbc.startBalancer()
}
sbc.startBalancer()
} else {
// When bringing back a sub-balancer from cache, re-send the cached
// picker and state.
Expand All @@ -369,6 +342,10 @@ func (bg *BalancerGroup) Remove(id string) {
bg.logger.Infof("Removing child policy for child %q", id)

bg.outgoingMu.Lock()
if bg.outgoingClosed {
bg.outgoingMu.Unlock()
return
}

sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok {
Expand All @@ -379,12 +356,6 @@ func (bg *BalancerGroup) Remove(id string) {

// Unconditionally remove the sub-balancer config from the map.
delete(bg.idToBalancerConfig, id)
if !bg.outgoingStarted {
// Nothing needs to be done here, since we wouldn't have created the
// sub-balancer.
bg.outgoingMu.Unlock()
return
}

if bg.deletedBalancerCache != nil {
if bg.logger.V(2) {
Expand Down Expand Up @@ -424,6 +395,7 @@ func (bg *BalancerGroup) Remove(id string) {
// cleanup after the timeout.
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
// Remove SubConns. This is only done after the balancer is
// actually closed.
//
Expand All @@ -437,18 +409,20 @@ func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
delete(bg.scToSubBalancer, sc)
}
}
bg.incomingMu.Unlock()
}

// connect attempts to connect to all subConns belonging to sb.
//
// It does nothing if the group's incoming side has already been closed
// (incomingClosed is set).
func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
	bg.incomingMu.Lock()
	// The deferred unlock is the single release point for incomingMu on
	// every path, including the early return below. No explicit Unlock must
	// follow the loop, or the mutex would be unlocked twice.
	defer bg.incomingMu.Unlock()
	if bg.incomingClosed {
		return
	}
	for sc, b := range bg.scToSubBalancer {
		if b == sb {
			sc.Connect()
		}
	}
}

// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
Expand All @@ -457,6 +431,10 @@ func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
// needed.
func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
bg.incomingMu.Lock()
if bg.incomingClosed {
bg.incomingMu.Unlock()
return
}
if _, ok := bg.scToSubBalancer[sc]; !ok {
bg.incomingMu.Unlock()
return
Expand All @@ -468,10 +446,13 @@ func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}
if cb != nil {
cb(state)
}
bg.outgoingMu.Unlock()
}

// UpdateSubConnState handles the state for the subconn. It finds the
Expand All @@ -485,6 +466,9 @@ func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return nil
}
if config, ok := bg.idToBalancerConfig[id]; ok {
return config.updateClientConnState(s)
}
Expand All @@ -494,10 +478,13 @@ func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnS
// ResolverError forwards resolver errors to all sub-balancers.
//
// It does nothing if the group's outgoing side has already been closed
// (outgoingClosed is set).
func (bg *BalancerGroup) ResolverError(err error) {
	bg.outgoingMu.Lock()
	// The deferred unlock is the single release point for outgoingMu on
	// every path, including the early return below. No explicit Unlock must
	// follow the loop, or the mutex would be unlocked twice.
	defer bg.outgoingMu.Unlock()
	if bg.outgoingClosed {
		return
	}
	for _, config := range bg.idToBalancerConfig {
		config.resolverError(err)
	}
}

// Following are actions from sub-balancers, forward to ClientConn.
Expand All @@ -514,9 +501,9 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
// error. But since we call balancer.stopBalancer when removing the balancer, this
// shouldn't happen.
bg.incomingMu.Lock()
if !bg.incomingStarted {
if bg.incomingClosed {
bg.incomingMu.Unlock()
return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
return nil, fmt.Errorf("balancergroup: NewSubConn is called after balancer group is closed")
}
var sc balancer.SubConn
oldListener := opts.StateListener
Expand Down Expand Up @@ -547,31 +534,33 @@ func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
}

// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
// When a BalancerGroup is closed, it can not receive further address updates.
func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingClosed = true
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
// Setting `outgoingClosed` ensures that no entries are added to
// `deletedBalancerCache` after this point.
bg.outgoingClosed = true
bg.outgoingMu.Unlock()

// Clear(true) runs clear function to close sub-balancers in cache. It
// must be called out of outgoing mutex.
if bg.deletedBalancerCache != nil {
bg.deletedBalancerCache.Clear(true)
}

bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingStarted = false
for _, config := range bg.idToBalancerConfig {
config.stopBalancer()
}
for id, config := range bg.idToBalancerConfig {
config.stopBalancer()
delete(bg.idToBalancerConfig, id)
}
bg.outgoingMu.Unlock()
}
Expand All @@ -581,24 +570,30 @@ func (bg *BalancerGroup) Close() {
// not supported.
func (bg *BalancerGroup) ExitIdle() {
	bg.outgoingMu.Lock()
	// The deferred unlock is the single release point for outgoingMu on
	// every path, including the early return below. No explicit Unlock must
	// follow the loop, or the mutex would be unlocked twice.
	defer bg.outgoingMu.Unlock()
	if bg.outgoingClosed {
		return
	}
	for _, config := range bg.idToBalancerConfig {
		// When exitIdle reports false, fall back to connecting the
		// sub-balancer's SubConns directly.
		if !config.exitIdle() {
			bg.connect(config)
		}
	}
}

// ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
// appropriate and possible.
//
// It does nothing if the group's outgoing side has already been closed
// (outgoingClosed is set) or if no sub-balancer is registered under id.
func (bg *BalancerGroup) ExitIdleOne(id string) {
	bg.outgoingMu.Lock()
	// The deferred unlock is the single release point for outgoingMu on
	// every path, including the early return below. No explicit Unlock must
	// follow, or the mutex would be unlocked twice.
	defer bg.outgoingMu.Unlock()
	if bg.outgoingClosed {
		return
	}
	if config := bg.idToBalancerConfig[id]; config != nil {
		// When exitIdle reports false, fall back to connecting the
		// sub-balancer's SubConns directly.
		if !config.exitIdle() {
			bg.connect(config)
		}
	}
}

// ParseConfig parses a child config list and returns a LB config for the
Expand Down
Loading

0 comments on commit aa629e0

Please sign in to comment.