Skip to content

Commit b6873c0

Browse files
authoredMar 29, 2022
grpc: move to TransientFailure in pick_first LB policy when all addresses are removed (grpc#5274)
1 parent 474948a commit b6873c0

File tree

4 files changed

+153
-133
lines changed

4 files changed

+153
-133
lines changed
 

‎picker_wrapper_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
9797
bp := newPickerWrapper()
9898
var finishedCount uint64
9999
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
100-
// All goroutines should block because picker returns no sc available.
100+
// All goroutines should block because picker returns no subConn available.
101101
for i := goroutineCount; i > 0; i-- {
102102
go func() {
103103
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
@@ -138,7 +138,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
138138
bp := newPickerWrapper()
139139
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
140140
var finishedCount uint64
141-
// All goroutines should block because sc is not ready.
141+
// All goroutines should block because subConn is not ready.
142142
for i := goroutineCount; i > 0; i-- {
143143
go func() {
144144
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {

‎pickfirst.go

+77-49
Original file line numberDiff line numberDiff line change
@@ -44,79 +44,107 @@ func (*pickfirstBuilder) Name() string {
4444
}
4545

4646
type pickfirstBalancer struct {
47-
state connectivity.State
48-
cc balancer.ClientConn
49-
sc balancer.SubConn
47+
state connectivity.State
48+
cc balancer.ClientConn
49+
subConn balancer.SubConn
5050
}
5151

5252
func (b *pickfirstBalancer) ResolverError(err error) {
53-
switch b.state {
54-
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
55-
// Set a failing picker if we don't have a good picker.
56-
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
57-
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
58-
})
59-
}
6053
if logger.V(2) {
6154
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
6255
}
56+
if b.subConn == nil {
57+
b.state = connectivity.TransientFailure
58+
}
59+
60+
if b.state != connectivity.TransientFailure {
61+
// The picker will not change since the balancer does not currently
62+
// report an error.
63+
return
64+
}
65+
b.cc.UpdateState(balancer.State{
66+
ConnectivityState: connectivity.TransientFailure,
67+
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
68+
})
6369
}
6470

65-
func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
66-
if len(cs.ResolverState.Addresses) == 0 {
71+
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
72+
if len(state.ResolverState.Addresses) == 0 {
73+
// The resolver reported an empty address list. Treat it like an error by
74+
// calling b.ResolverError.
75+
if b.subConn != nil {
76+
// Remove the old subConn. All addresses were removed, so it is no longer
77+
// valid.
78+
b.cc.RemoveSubConn(b.subConn)
79+
b.subConn = nil
80+
}
6781
b.ResolverError(errors.New("produced zero addresses"))
6882
return balancer.ErrBadResolverState
6983
}
70-
if b.sc == nil {
71-
var err error
72-
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
73-
if err != nil {
74-
if logger.V(2) {
75-
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
76-
}
77-
b.state = connectivity.TransientFailure
78-
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
79-
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
80-
})
81-
return balancer.ErrBadResolverState
84+
85+
if b.subConn != nil {
86+
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
87+
return nil
88+
}
89+
90+
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
91+
if err != nil {
92+
if logger.V(2) {
93+
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
8294
}
83-
b.state = connectivity.Idle
84-
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
85-
b.sc.Connect()
86-
} else {
87-
b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
88-
b.sc.Connect()
95+
b.state = connectivity.TransientFailure
96+
b.cc.UpdateState(balancer.State{
97+
ConnectivityState: connectivity.TransientFailure,
98+
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
99+
})
100+
return balancer.ErrBadResolverState
89101
}
102+
b.subConn = subConn
103+
b.state = connectivity.Idle
104+
b.cc.UpdateState(balancer.State{
105+
ConnectivityState: connectivity.Idle,
106+
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
107+
})
108+
b.subConn.Connect()
90109
return nil
91110
}
92111

93-
func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
112+
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
94113
if logger.V(2) {
95-
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
114+
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
96115
}
97-
if b.sc != sc {
116+
if b.subConn != subConn {
98117
if logger.V(2) {
99-
logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
118+
logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
100119
}
101120
return
102121
}
103-
b.state = s.ConnectivityState
104-
if s.ConnectivityState == connectivity.Shutdown {
105-
b.sc = nil
122+
b.state = state.ConnectivityState
123+
if state.ConnectivityState == connectivity.Shutdown {
124+
b.subConn = nil
106125
return
107126
}
108127

109-
switch s.ConnectivityState {
128+
switch state.ConnectivityState {
110129
case connectivity.Ready:
111-
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
130+
b.cc.UpdateState(balancer.State{
131+
ConnectivityState: state.ConnectivityState,
132+
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
133+
})
112134
case connectivity.Connecting:
113-
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
135+
b.cc.UpdateState(balancer.State{
136+
ConnectivityState: state.ConnectivityState,
137+
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
138+
})
114139
case connectivity.Idle:
115-
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
140+
b.cc.UpdateState(balancer.State{
141+
ConnectivityState: state.ConnectivityState,
142+
Picker: &idlePicker{subConn: subConn},
143+
})
116144
case connectivity.TransientFailure:
117145
b.cc.UpdateState(balancer.State{
118-
ConnectivityState: s.ConnectivityState,
119-
Picker: &picker{err: s.ConnectionError},
146+
ConnectivityState: state.ConnectivityState,
147+
Picker: &picker{err: state.ConnectionError},
120148
})
121149
}
122150
}
@@ -125,8 +153,8 @@ func (b *pickfirstBalancer) Close() {
125153
}
126154

127155
func (b *pickfirstBalancer) ExitIdle() {
128-
if b.sc != nil && b.state == connectivity.Idle {
129-
b.sc.Connect()
156+
if b.subConn != nil && b.state == connectivity.Idle {
157+
b.subConn.Connect()
130158
}
131159
}
132160

@@ -135,18 +163,18 @@ type picker struct {
135163
err error
136164
}
137165

138-
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
166+
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
139167
return p.result, p.err
140168
}
141169

142170
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
143171
// CONNECTING when Pick is called.
144172
type idlePicker struct {
145-
sc balancer.SubConn
173+
subConn balancer.SubConn
146174
}
147175

148-
func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
149-
i.sc.Connect()
176+
func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
177+
i.subConn.Connect()
150178
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
151179
}
152180

‎test/balancer_test.go

-82
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"google.golang.org/grpc"
3333
"google.golang.org/grpc/attributes"
3434
"google.golang.org/grpc/balancer"
35-
"google.golang.org/grpc/balancer/roundrobin"
3635
"google.golang.org/grpc/codes"
3736
"google.golang.org/grpc/connectivity"
3837
"google.golang.org/grpc/credentials"
@@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) {
683682
}
684683
}
685684

686-
// TestEmptyAddrs verifies client behavior when a working connection is
687-
// removed. In pick first and round-robin, both will continue using the old
688-
// connections.
689-
func (s) TestEmptyAddrs(t *testing.T) {
690-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
691-
defer cancel()
692-
693-
// Initialize server
694-
lis, err := net.Listen("tcp", "localhost:0")
695-
if err != nil {
696-
t.Fatalf("Error while listening. Err: %v", err)
697-
}
698-
s := grpc.NewServer()
699-
defer s.Stop()
700-
const one = "1"
701-
ts := &funcServer{
702-
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
703-
return &testpb.SimpleResponse{Username: one}, nil
704-
},
705-
}
706-
testpb.RegisterTestServiceServer(s, ts)
707-
go s.Serve(lis)
708-
709-
// Initialize pickfirst client
710-
pfr := manual.NewBuilderWithScheme("whatever")
711-
712-
pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
713-
714-
pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr))
715-
if err != nil {
716-
t.Fatalf("Error creating client: %v", err)
717-
}
718-
defer pfcc.Close()
719-
pfclient := testpb.NewTestServiceClient(pfcc)
720-
721-
// Confirm we are connected to the server
722-
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
723-
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
724-
}
725-
726-
// Remove all addresses.
727-
pfr.UpdateState(resolver.State{})
728-
729-
// Initialize roundrobin client
730-
rrr := manual.NewBuilderWithScheme("whatever")
731-
732-
rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
733-
734-
rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr),
735-
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
736-
if err != nil {
737-
t.Fatalf("Error creating client: %v", err)
738-
}
739-
defer rrcc.Close()
740-
rrclient := testpb.NewTestServiceClient(rrcc)
741-
742-
// Confirm we are connected to the server
743-
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
744-
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
745-
}
746-
747-
// Remove all addresses.
748-
rrr.UpdateState(resolver.State{})
749-
750-
// Confirm several new RPCs succeed on pick first.
751-
for i := 0; i < 10; i++ {
752-
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
753-
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
754-
}
755-
time.Sleep(5 * time.Millisecond)
756-
}
757-
758-
// Confirm several new RPCs succeed on round robin.
759-
for i := 0; i < 10; i++ {
760-
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
761-
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
762-
}
763-
time.Sleep(5 * time.Millisecond)
764-
}
765-
}
766-
767685
func (s) TestWaitForReady(t *testing.T) {
768686
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
769687
defer cancel()

‎test/pickfirst_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626

2727
"google.golang.org/grpc"
2828
"google.golang.org/grpc/codes"
29+
"google.golang.org/grpc/connectivity"
2930
"google.golang.org/grpc/credentials/insecure"
31+
"google.golang.org/grpc/internal/channelz"
3032
"google.golang.org/grpc/internal/stubserver"
3133
"google.golang.org/grpc/peer"
3234
"google.golang.org/grpc/resolver"
@@ -43,6 +45,11 @@ const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
4345
// with service config specifying the use of the pick_first LB policy.
4446
func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
4547
t.Helper()
48+
49+
// Initialize channelz. Used to determine pending RPC count.
50+
czCleanup := channelz.NewChannelzStorageForTesting()
51+
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
52+
4653
r := manual.NewBuilderWithScheme("whatever")
4754

4855
backends := make([]*stubserver.StubServer, backendCount)
@@ -258,3 +265,70 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
258265
t.Fatal(err)
259266
}
260267
}
268+
269+
// TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is
270+
// configured on a channel, things are working as expected and then a resolver
271+
// updates removes all addresses. An RPC attempted at this point in time will be
272+
// blocked because there are no valid backends. This test verifies that when new
273+
// backends are added, the RPC is able to complete.
274+
func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
275+
cc, r, backends := setupPickFirst(t, 2)
276+
addrs := backendsToAddrs(backends)
277+
r.UpdateState(resolver.State{Addresses: addrs})
278+
279+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
280+
defer cancel()
281+
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
282+
t.Fatal(err)
283+
}
284+
285+
// Send a resolver update with no addresses. This should push the channel into
286+
// TransientFailure.
287+
r.UpdateState(resolver.State{})
288+
for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
289+
if !cc.WaitForStateChange(ctx, state) {
290+
t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
291+
}
292+
}
293+
294+
doneCh := make(chan struct{})
295+
client := testpb.NewTestServiceClient(cc)
296+
go func() {
297+
// The channel is currently in TransientFailure and this RPC will block
298+
// until the channel becomes Ready, which will only happen when we push a
299+
// resolver update with a valid backend address.
300+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
301+
t.Errorf("EmptyCall() = %v, want <nil>", err)
302+
}
303+
close(doneCh)
304+
}()
305+
306+
// Make sure that there is one pending RPC on the ClientConn before attempting
307+
// to push new addresses through the name resolver. If we don't do this, the
308+
// resolver update can happen before the above goroutine gets to make the RPC.
309+
for {
310+
if err := ctx.Err(); err != nil {
311+
t.Fatal(err)
312+
}
313+
tcs, _ := channelz.GetTopChannels(0, 0)
314+
if len(tcs) != 1 {
315+
t.Fatalf("there should only be one top channel, not %d", len(tcs))
316+
}
317+
started := tcs[0].ChannelData.CallsStarted
318+
completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed
319+
if (started - completed) == 1 {
320+
break
321+
}
322+
time.Sleep(defaultTestShortTimeout)
323+
}
324+
325+
// Send a resolver update with a valid backend to push the channel to Ready
326+
// and unblock the above RPC.
327+
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
328+
329+
select {
330+
case <-ctx.Done():
331+
t.Fatal("Timeout when waiting for blocked RPC to complete")
332+
case <-doneCh:
333+
}
334+
}

0 commit comments

Comments
 (0)
Please sign in to comment.