-
Notifications
You must be signed in to change notification settings - Fork 4.5k
grpc: move to TransientFailure
in pick_first
LB policy when all addresses are removed
#5274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,79 +44,107 @@ func (*pickfirstBuilder) Name() string { | |
} | ||
|
||
type pickfirstBalancer struct { | ||
state connectivity.State | ||
cc balancer.ClientConn | ||
sc balancer.SubConn | ||
state connectivity.State | ||
cc balancer.ClientConn | ||
subConn balancer.SubConn | ||
} | ||
|
||
func (b *pickfirstBalancer) ResolverError(err error) { | ||
switch b.state { | ||
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: | ||
// Set a failing picker if we don't have a good picker. | ||
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, | ||
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, | ||
}) | ||
} | ||
if logger.V(2) { | ||
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err) | ||
} | ||
if b.subConn == nil { | ||
b.state = connectivity.TransientFailure | ||
} | ||
|
||
if b.state != connectivity.TransientFailure { | ||
// The picker will not change since the balancer does not currently | ||
// report an error. | ||
return | ||
} | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: connectivity.TransientFailure, | ||
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, | ||
}) | ||
} | ||
|
||
func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error { | ||
if len(cs.ResolverState.Addresses) == 0 { | ||
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { | ||
if len(state.ResolverState.Addresses) == 0 { | ||
// The resolver reported an empty address list. Treat it like an error by | ||
// calling b.ResolverError. | ||
if b.subConn != nil { | ||
// Remove the old subConn. All addresses were removed, so it is no longer | ||
// valid. | ||
b.cc.RemoveSubConn(b.subConn) | ||
b.subConn = nil | ||
} | ||
b.ResolverError(errors.New("produced zero addresses")) | ||
return balancer.ErrBadResolverState | ||
} | ||
if b.sc == nil { | ||
var err error | ||
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{}) | ||
if err != nil { | ||
if logger.V(2) { | ||
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) | ||
} | ||
b.state = connectivity.TransientFailure | ||
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, | ||
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)}, | ||
}) | ||
return balancer.ErrBadResolverState | ||
|
||
if b.subConn != nil { | ||
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses) | ||
return nil | ||
} | ||
|
||
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) | ||
if err != nil { | ||
if logger.V(2) { | ||
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) | ||
} | ||
b.state = connectivity.Idle | ||
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) | ||
b.sc.Connect() | ||
} else { | ||
b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses) | ||
b.sc.Connect() | ||
b.state = connectivity.TransientFailure | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: connectivity.TransientFailure, | ||
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)}, | ||
}) | ||
return balancer.ErrBadResolverState | ||
} | ||
b.subConn = subConn | ||
b.state = connectivity.Idle | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: connectivity.Idle, | ||
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}}, | ||
}) | ||
b.subConn.Connect() | ||
return nil | ||
} | ||
|
||
func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { | ||
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { | ||
if logger.V(2) { | ||
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s) | ||
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state) | ||
} | ||
if b.sc != sc { | ||
if b.subConn != subConn { | ||
if logger.V(2) { | ||
logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized") | ||
logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized") | ||
} | ||
return | ||
} | ||
b.state = s.ConnectivityState | ||
if s.ConnectivityState == connectivity.Shutdown { | ||
b.sc = nil | ||
b.state = state.ConnectivityState | ||
if state.ConnectivityState == connectivity.Shutdown { | ||
b.subConn = nil | ||
return | ||
} | ||
|
||
switch s.ConnectivityState { | ||
switch state.ConnectivityState { | ||
case connectivity.Ready: | ||
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}}) | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: state.ConnectivityState, | ||
Picker: &picker{result: balancer.PickResult{SubConn: subConn}}, | ||
}) | ||
case connectivity.Connecting: | ||
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: state.ConnectivityState, | ||
Picker: &picker{err: balancer.ErrNoSubConnAvailable}, | ||
}) | ||
case connectivity.Idle: | ||
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}}) | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: state.ConnectivityState, | ||
Picker: &idlePicker{subConn: subConn}, | ||
}) | ||
case connectivity.TransientFailure: | ||
b.cc.UpdateState(balancer.State{ | ||
ConnectivityState: s.ConnectivityState, | ||
Picker: &picker{err: s.ConnectionError}, | ||
ConnectivityState: state.ConnectivityState, | ||
Picker: &picker{err: state.ConnectionError}, | ||
}) | ||
} | ||
} | ||
|
@@ -125,8 +153,8 @@ func (b *pickfirstBalancer) Close() { | |
} | ||
|
||
func (b *pickfirstBalancer) ExitIdle() { | ||
if b.sc != nil && b.state == connectivity.Idle { | ||
b.sc.Connect() | ||
if b.subConn != nil && b.state == connectivity.Idle { | ||
b.subConn.Connect() | ||
} | ||
} | ||
|
||
|
@@ -135,18 +163,18 @@ type picker struct { | |
err error | ||
} | ||
|
||
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { | ||
func (p *picker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: delete parameter name entirely instead of using "_". And below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
return p.result, p.err | ||
} | ||
|
||
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into | ||
// CONNECTING when Pick is called. | ||
type idlePicker struct { | ||
sc balancer.SubConn | ||
subConn balancer.SubConn | ||
} | ||
|
||
func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { | ||
i.sc.Connect() | ||
func (i *idlePicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) { | ||
i.subConn.Connect() | ||
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,6 @@ import ( | |
"google.golang.org/grpc" | ||
"google.golang.org/grpc/attributes" | ||
"google.golang.org/grpc/balancer" | ||
"google.golang.org/grpc/balancer/roundrobin" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials" | ||
|
@@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) { | |
} | ||
} | ||
|
||
// TestEmptyAddrs verifies client behavior when a working connection is | ||
// removed. In pick first and round-robin, both will continue using the old | ||
// connections. | ||
Comment on lines
-686
to
-688
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make sure there's still a test for this set of circumstances, but that the RPCs begin failing instead of keep using old connections. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I thought we confirmed RR was removing the subconns, so RPCs should have been failing? But this test is validating the RPCs would still succeed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, this test was quite broken because it was using the pick_first clientConn to verify round_robin behaviour: // Confirm several new RPCs succeed on round robin.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, the newly added test verifies that the channel moves to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oof There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
But only with pick first -- is there one for RR too elsewhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
func (s) TestEmptyAddrs(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() | ||
|
||
// Initialize server | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
t.Fatalf("Error while listening. Err: %v", err) | ||
} | ||
s := grpc.NewServer() | ||
defer s.Stop() | ||
const one = "1" | ||
ts := &funcServer{ | ||
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { | ||
return &testpb.SimpleResponse{Username: one}, nil | ||
}, | ||
} | ||
testpb.RegisterTestServiceServer(s, ts) | ||
go s.Serve(lis) | ||
|
||
// Initialize pickfirst client | ||
pfr := manual.NewBuilderWithScheme("whatever") | ||
|
||
pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) | ||
|
||
pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr)) | ||
if err != nil { | ||
t.Fatalf("Error creating client: %v", err) | ||
} | ||
defer pfcc.Close() | ||
pfclient := testpb.NewTestServiceClient(pfcc) | ||
|
||
// Confirm we are connected to the server | ||
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { | ||
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) | ||
} | ||
|
||
// Remove all addresses. | ||
pfr.UpdateState(resolver.State{}) | ||
|
||
// Initialize roundrobin client | ||
rrr := manual.NewBuilderWithScheme("whatever") | ||
|
||
rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) | ||
|
||
rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr), | ||
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) | ||
if err != nil { | ||
t.Fatalf("Error creating client: %v", err) | ||
} | ||
defer rrcc.Close() | ||
rrclient := testpb.NewTestServiceClient(rrcc) | ||
|
||
// Confirm we are connected to the server | ||
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { | ||
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) | ||
} | ||
|
||
// Remove all addresses. | ||
rrr.UpdateState(resolver.State{}) | ||
|
||
// Confirm several new RPCs succeed on pick first. | ||
for i := 0; i < 10; i++ { | ||
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { | ||
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) | ||
} | ||
time.Sleep(5 * time.Millisecond) | ||
} | ||
|
||
// Confirm several new RPCs succeed on round robin. | ||
for i := 0; i < 10; i++ { | ||
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { | ||
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) | ||
} | ||
time.Sleep(5 * time.Millisecond) | ||
} | ||
} | ||
|
||
func (s) TestWaitForReady(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() | ||
|
Uh oh!
There was an error while loading. Please reload this page.