roundrobin: Delegate subchannel creation to pickfirst #7966

Open · wants to merge 11 commits into base: master
3 changes: 2 additions & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
@@ -618,7 +618,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
- sd.lastErr = newState.ConnectionError
connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
}

@@ -703,6 +702,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
})
}
case connectivity.TransientFailure:
+ sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
@@ -728,6 +728,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
switch newState.ConnectivityState {
case connectivity.TransientFailure:
b.numTF = (b.numTF + 1) % b.subConns.Len()
+ sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
28 changes: 4 additions & 24 deletions balancer/roundrobin/roundrobin.go
@@ -46,8 +46,8 @@
var err error
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
Member:
This was in the other PR, too.

Probably we should have endpointsharding produce a parsed PF config if it's going to be used frequently?

Contributor Author:

That's a nice idea. I've raised a PR with this change: #8007

PTAL, I'll rebase both the PRs once #8007 is merged.
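For reference, a minimal sketch of what that suggestion could look like on the endpointsharding side (the exported name `ParsedPickFirstConfig` and its placement are assumptions for illustration; the actual change is whatever lands in #8007):

```go
package endpointsharding

import (
	"encoding/json"

	"google.golang.org/grpc/serviceconfig"
)

// ParsedPickFirstConfig is a hypothetical pre-parsed counterpart to the
// PickFirstConfig JSON string this package already exports, parsed once at
// init time so parent balancers like roundrobin no longer each need to call
// ParseConfig themselves.
var ParsedPickFirstConfig serviceconfig.LoadBalancingConfig

func init() {
	var err error
	ParsedPickFirstConfig, err = ParseConfig(json.RawMessage(PickFirstConfig))
	if err != nil {
		// The input is a package-level constant, so a parse failure is a
		// programming error; fail loudly at startup.
		panic(err)
	}
}
```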

if err != nil {
logger.Fatal(err)
}

Codecov / codecov/patch warning: added lines balancer/roundrobin/roundrobin.go#L49-L50 were not covered by tests.
balancer.Register(builder{})
}

@@ -59,44 +59,24 @@

func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bal := &rrBalancer{
- cc: cc,
- child: endpointsharding.NewBalancer(cc, opts),
+ cc: cc,
+ Balancer: endpointsharding.NewBalancer(cc, opts),
}
bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
bal.logger.Infof("Created")
return bal
}

type rrBalancer struct {
+ balancer.Balancer
cc balancer.ClientConn
- child balancer.Balancer
logger *internalgrpclog.PrefixLogger
}

- func (b *rrBalancer) Close() {
- b.child.Close()
- }
-
- func (b *rrBalancer) ExitIdle() {
- // Should always be ok, as child is endpoint sharding.
- if ei, ok := b.child.(balancer.ExitIdler); ok {
- ei.ExitIdle()
- }
- }
Member:
This is still necessary, unfortunately, as only balancer.Balancer functions are transferred when embedding.

Contributor Author:
Reverted.
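The Go detail behind this: embedding an interface in a struct promotes only the methods in that interface's method set. `ExitIdle` is declared on `balancer.ExitIdler`, not on `balancer.Balancer`, so it is not promoted and must be forwarded explicitly. A stripped-down illustration with stand-in types:

```go
package main

import "fmt"

type Balancer interface {
	ResolverError(error)
}

type ExitIdler interface {
	ExitIdle()
}

// child implements both interfaces.
type child struct{}

func (child) ResolverError(error) { fmt.Println("child.ResolverError") }
func (child) ExitIdle()           { fmt.Println("child.ExitIdle") }

// wrapper embeds only the Balancer interface, so only ResolverError is
// promoted; ExitIdle is not in wrapper's method set even though the concrete
// value stored in the field implements it.
type wrapper struct {
	Balancer
}

func main() {
	w := wrapper{Balancer: child{}}
	w.ResolverError(nil) // promoted from the embedded interface

	// w.ExitIdle() would not compile; the wrapper must forward it through a
	// type assertion on the embedded value, as the reverted method does.
	if ei, ok := w.Balancer.(ExitIdler); ok {
		ei.ExitIdle()
	}
}
```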


- func (b *rrBalancer) ResolverError(err error) {
- // Will cause inline picker update from endpoint sharding.
- b.child.ResolverError(err)
- }

func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Enable the health listener in pickfirst children for client side health
// checks and outlier detection, if configured.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
ccs.BalancerConfig = endpointShardingLBConfig
- return b.child.UpdateClientConnState(ccs)
- }
-
- func (b *rrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
- b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
+ return b.Balancer.UpdateClientConnState(ccs)
}
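Putting the hunks together: after this change, roundrobin is a thin wrapper that delegates subchannel creation and state aggregation to endpointsharding's pickfirst children and only rewrites the incoming ClientConnState. A reconstructed sketch (builder, init, and the endpointShardingLBConfig variable elided; the ExitIdle forwarder reflects the "Reverted" comment above):

```go
package roundrobin

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
)

type rrBalancer struct {
	balancer.Balancer // the endpointsharding child; provides Close, ResolverError, etc.

	cc     balancer.ClientConn
	logger *internalgrpclog.PrefixLogger
}

func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	// Enable the health listener in pickfirst children for client side health
	// checks and outlier detection, if configured.
	ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
	ccs.BalancerConfig = endpointShardingLBConfig
	return b.Balancer.UpdateClientConnState(ccs)
}

// ExitIdle is forwarded by hand because embedding balancer.Balancer promotes
// only that interface's methods, not balancer.ExitIdler's.
func (b *rrBalancer) ExitIdle() {
	if ei, ok := b.Balancer.(balancer.ExitIdler); ok {
		ei.ExitIdle()
	}
}
```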
10 changes: 10 additions & 0 deletions balancer/weightedtarget/weightedtarget_test.go
@@ -297,6 +297,9 @@ func (s) TestWeightedTarget(t *testing.T) {

// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
initialSC := scShutdown
for scShutdown == initialSC {
scShutdown = <-cc.ShutdownSubConnCh
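This drain loop reappears in several of the tests touched by this PR. Purely as an illustration of the pattern, it could be captured in a small generic helper (hypothetical; not something this PR adds):

```go
package testhelpers

// nextDistinct reads from ch until it sees a value different from prev,
// swallowing the duplicate shutdown events that balancergroup,
// gracefulswitch and pickfirstleaf each emit for the same SubConn.
// Interface channel elements satisfy comparable as of Go 1.20.
func nextDistinct[T comparable](ch <-chan T, prev T) T {
	next := prev
	for next == prev {
		next = <-ch
	}
	return next
}
```

With it, the loop above would collapse to `scShutdown = nextDistinct(cc.ShutdownSubConnCh, initialSC)`.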
@@ -827,6 +830,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
+ <-sc3.ConnectCh
sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
@@ -877,6 +881,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
}

// Move balancer 3 into transient failure.
+ sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+ <-sc3.ConnectCh
+ sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wantSubConnErr := errors.New("subConn connection error")
sc3.UpdateState(balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
@@ -908,6 +915,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {

// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
initialSC := scShutdown
for scShutdown == initialSC {
scShutdown = <-cc.ShutdownSubConnCh
9 changes: 9 additions & 0 deletions internal/balancergroup/balancergroup_test.go
@@ -371,6 +371,9 @@ func (s) TestBalancerGroup_locality_caching_not_read_within_timeout(t *testing.T
defer cancel()
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
scToShutdown := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2].Addr]: 3,
addrToSC[testBackendAddrs[3].Addr]: 3,
@@ -419,6 +422,9 @@ func (s) TestBalancerGroup_locality_caching_read_with_different_builder(t *testi
shutdownTimeout := time.After(time.Millisecond * 500)
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
scToShutdown := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2].Addr]: 3,
addrToSC[testBackendAddrs[3].Addr]: 3,
@@ -696,6 +702,9 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
defer cancel()
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
scToShutdown := map[balancer.SubConn]int{
m1[testBackendAddrs[0].Addr]: 3,
m1[testBackendAddrs[1].Addr]: 3,
16 changes: 5 additions & 11 deletions test/roundrobin_test.go
@@ -89,7 +89,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt
t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
}

- r.UpdateState(resolver.State{Endpoints: endpoints})
+ r.UpdateState(resolver.State{Addresses: addrs})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
@@ -139,7 +139,7 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {

// Send a resolver update with no addresses. This should push the channel into
// TransientFailure.
- r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{}})
+ r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

client := testgrpc.NewTestServiceClient(cc)
@@ -175,9 +175,7 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {

// Send a resolver update with a valid backend to push the channel to Ready
// and unblock the above RPC.
- r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{
- {Addresses: []resolver.Address{{Addr: backends[0].Address}}},
- }})
+ r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})

select {
case <-ctx.Done():
@@ -272,9 +270,7 @@ func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {
}
// Set an initial resolver update with no address attributes.
addr := resolver.Address{Addr: backend.Address}
- r.InitialState(resolver.State{Endpoints: []resolver.Endpoint{
- {Addresses: []resolver.Address{addr}},
- }})
+ r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
@@ -299,9 +295,7 @@ func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {

// Send a resolver update with address attributes.
addrWithAttributes := imetadata.Set(addr, metadata.Pairs(testMDKey, testMDValue))
- r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{
- {Addresses: []resolver.Address{addrWithAttributes}},
- }})
+ r.UpdateState(resolver.State{Addresses: []resolver.Address{addrWithAttributes}})

// Make an RPC and ensure it contains the metadata we are looking for. The
// resolver update isn't processed synchronously, so we wait some time before
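The updates in this file all replace the Endpoints form of resolver.State with the flat Addresses form. Both shapes are valid resolver output; when Endpoints is unset, each address is (roughly) treated as its own single-address endpoint, which is the path these tests now exercise. A minimal illustration of the two shapes (the address value is a placeholder):

```go
package main

import "google.golang.org/grpc/resolver"

func main() {
	addr := resolver.Address{Addr: "localhost:50051"} // placeholder

	// Flat form: a plain list of addresses.
	flat := resolver.State{Addresses: []resolver.Address{addr}}

	// Grouped form: addresses bundled into endpoints; a single endpoint may
	// carry several addresses for the same backend.
	grouped := resolver.State{Endpoints: []resolver.Endpoint{
		{Addresses: []resolver.Address{addr}},
	}}

	_, _ = flat, grouped
}
```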
16 changes: 16 additions & 0 deletions xds/internal/balancer/priority/balancer_test.go
@@ -254,6 +254,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
+ <-sc1.ConnectCh
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

@@ -292,6 +293,9 @@
}

t.Log("Turn down 1, use 2.")
+ sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+ <-sc1.ConnectCh
+ sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: errors.New("test error"),
@@ -636,6 +640,9 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) {
secondSC := <-cc.ShutdownSubConnCh
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
for secondSC == firstSC {
secondSC = <-cc.ShutdownSubConnCh
}
@@ -867,6 +874,9 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
scToShutdown1 := <-cc.ShutdownSubConnCh
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
for scToShutdown1 == scToShutdown {
scToShutdown1 = <-cc.ShutdownSubConnCh
}
@@ -875,6 +885,9 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
}
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
<-cc.ShutdownSubConnCh
<-cc.ShutdownSubConnCh

@@ -1228,6 +1241,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
scToShutdown := <-cc.ShutdownSubConnCh
// The same SubConn is closed by balancergroup, gracefulswitch and
// pickfirstleaf when they are closed. Remove duplicate events.
+ // TODO: https://github.com/grpc/grpc-go/issues/6472 - Remove this
+ // workaround once pickfirst is the only leaf policy and responsible for
+ // shutting down SubConns.
<-cc.ShutdownSubConnCh
<-cc.ShutdownSubConnCh
if scToShutdown != sc0 {
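The additions in this file share one shape: now that pickfirstleaf owns the SubConn, a test cannot jump straight to TRANSIENT_FAILURE. It must first report IDLE, wait for the balancer to request a reconnect, and then report CONNECTING so the failure arrives in a valid state order. As a sketch, that sequence could be factored into a helper (assuming the testutils.TestSubConn type with the ConnectCh field used above):

```go
package testhelpers

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/testutils"
)

// driveToConnecting walks a test SubConn through pickfirstleaf's reconnect
// flow: report IDLE, wait for the balancer's Connect() call, then report
// CONNECTING, after which a TRANSIENT_FAILURE update is delivered in order.
func driveToConnecting(sc *testutils.TestSubConn) {
	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
	<-sc.ConnectCh
	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
}
```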