Skip to content

Commit

Permalink
etcdserverpb: add "retry" flag to WatchResponse
Browse files Browse the repository at this point in the history
Problem: client relies on string value of CancelReason to
determine if watcher should be retried. This is too fragile.
See #14992

Solution: add explicit `retry` flag

fixes: #15058

Signed-off-by: Bogdan Kanivets <[email protected]>
  • Loading branch information
Bogdan Kanivets committed Feb 8, 2023
1 parent 8f54d38 commit 7404574
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 327 deletions.
4 changes: 4 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2921,6 +2921,10 @@
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"retry": {
"description": "retry is true if watcher should be retried on cancel.",
"type": "boolean"
},
"watch_id": {
"description": "watch_id is the ID of the watcher that corresponds to the response.",
"type": "string",
Expand Down
601 changes: 322 additions & 279 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,9 @@ message WatchResponse {
// framgment is true if large watch response was split over multiple responses.
bool fragment = 7 [(versionpb.etcd_version_field)="3.4"];

// retry is true if watcher should be retried on cancel.
bool retry = 8 [(versionpb.etcd_version_field)="3.5"];

repeated mvccpb.Event events = 11;
}

Expand Down
14 changes: 5 additions & 9 deletions client/v3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type WatchResponse struct {

// cancelReason is a reason of canceling watch
cancelReason string

// retry is true if watcher should be retried on cancel.
retry bool
}

// IsCreate returns true if the event tells that the key is newly created.
Expand Down Expand Up @@ -593,7 +596,7 @@ func (w *watchGrpcStream) run() {

switch {
case pbresp.Created:
if pbresp.Canceled && shouldRetryWatch(pbresp.CancelReason) {
if pbresp.Canceled && pbresp.Retry {
var newErr error
if wc, newErr = w.newWatchClient(); newErr != nil {
w.lg.Error("failed to create a new watch client", zap.Error(newErr))
Expand Down Expand Up @@ -721,14 +724,6 @@ func (w *watchGrpcStream) run() {
}
}

func shouldRetryWatch(cancelReason string) bool {
if cancelReason == "" {
return false
}
return (cancelReason == errMsgGRPCInvalidAuthToken) ||
(cancelReason == errMsgGRPCAuthOldRevision)
}

// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
Expand All @@ -755,6 +750,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Created: pbresp.Created,
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
retry: pbresp.Retry,
}

// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
Expand Down
39 changes: 0 additions & 39 deletions client/v3/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ package clientv3
import (
"testing"

"github.com/stretchr/testify/assert"

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

func TestEvent(t *testing.T) {
Expand Down Expand Up @@ -56,39 +53,3 @@ func TestEvent(t *testing.T) {
}
}
}

func TestShouldRetryWatch(t *testing.T) {
testCases := []struct {
name string
msg string
expectedRetry bool
}{
{
name: "equal to ErrGRPCInvalidAuthToken",
msg: rpctypes.ErrGRPCInvalidAuthToken.Error(),
expectedRetry: true,
},
{
name: "equal to ErrGRPCAuthOldRevision",
msg: rpctypes.ErrGRPCAuthOldRevision.Error(),
expectedRetry: true,
},
{
name: "valid grpc error but not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision",
msg: rpctypes.ErrGRPCUserEmpty.Error(),
expectedRetry: false,
},
{
name: "invalid grpc error and not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision",
msg: "whatever error message",
expectedRetry: false,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedRetry, shouldRetryWatch(tc.msg))
})
}
}
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ etcdserverpb.WatchResponse.created: ""
etcdserverpb.WatchResponse.events: ""
etcdserverpb.WatchResponse.fragment: "3.4"
etcdserverpb.WatchResponse.header: ""
etcdserverpb.WatchResponse.retry: "3.5"
etcdserverpb.WatchResponse.watch_id: ""
membershippb.Attributes: "3.5"
membershippb.Attributes.client_urls: ""
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,14 @@ func (sws *serverWatchStream) recvLoop() error {
err := sws.isWatchPermitted(creq)
if err != nil {
var cancelReason string
var retry bool
switch err {
case auth.ErrInvalidAuthToken:
cancelReason = rpctypes.ErrGRPCInvalidAuthToken.Error()
retry = true
case auth.ErrAuthOldRevision:
cancelReason = rpctypes.ErrGRPCAuthOldRevision.Error()
retry = true
case auth.ErrUserEmpty:
cancelReason = rpctypes.ErrGRPCUserEmpty.Error()
default:
Expand All @@ -292,6 +295,7 @@ func (sws *serverWatchStream) recvLoop() error {
Canceled: true,
Created: true,
CancelReason: cancelReason,
Retry: retry,
}

select {
Expand Down

0 comments on commit 7404574

Please sign in to comment.