Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserverpb: add "retry" flag to WatchResponse #15253

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2921,6 +2921,10 @@
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"retry": {
"description": "retry is true if watcher should be retried on cancel.",
"type": "boolean"
},
"watch_id": {
"description": "watch_id is the ID of the watcher that corresponds to the response.",
"type": "string",
Expand Down
601 changes: 322 additions & 279 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,9 @@ message WatchResponse {
// framgment is true if large watch response was split over multiple responses.
bool fragment = 7 [(versionpb.etcd_version_field)="3.4"];

// retry is true if watcher should be retried on cancel.
bool retry = 8 [(versionpb.etcd_version_field)="3.5"];
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll backport to 3.5, so setting version 3.5 here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No backports of fields!


repeated mvccpb.Event events = 11;
}

Expand Down
14 changes: 5 additions & 9 deletions client/v3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type WatchResponse struct {

// cancelReason is a reason of canceling watch
cancelReason string

// retry is true if watcher should be retried on cancel.
retry bool
}

// IsCreate returns true if the event tells that the key is newly created.
Expand Down Expand Up @@ -593,7 +596,7 @@ func (w *watchGrpcStream) run() {

switch {
case pbresp.Created:
if pbresp.Canceled && shouldRetryWatch(pbresp.CancelReason) {
if pbresp.Canceled && pbresp.Retry {
var newErr error
if wc, newErr = w.newWatchClient(); newErr != nil {
w.lg.Error("failed to create a new watch client", zap.Error(newErr))
Expand Down Expand Up @@ -721,14 +724,6 @@ func (w *watchGrpcStream) run() {
}
}

func shouldRetryWatch(cancelReason string) bool {
if cancelReason == "" {
return false
}
return (cancelReason == errMsgGRPCInvalidAuthToken) ||
(cancelReason == errMsgGRPCAuthOldRevision)
}

// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
Expand All @@ -755,6 +750,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Created: pbresp.Created,
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
retry: pbresp.Retry,
}

// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
Expand Down
39 changes: 0 additions & 39 deletions client/v3/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ package clientv3
import (
"testing"

"github.com/stretchr/testify/assert"

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

func TestEvent(t *testing.T) {
Expand Down Expand Up @@ -56,39 +53,3 @@ func TestEvent(t *testing.T) {
}
}
}

func TestShouldRetryWatch(t *testing.T) {
testCases := []struct {
name string
msg string
expectedRetry bool
}{
{
name: "equal to ErrGRPCInvalidAuthToken",
msg: rpctypes.ErrGRPCInvalidAuthToken.Error(),
expectedRetry: true,
},
{
name: "equal to ErrGRPCAuthOldRevision",
msg: rpctypes.ErrGRPCAuthOldRevision.Error(),
expectedRetry: true,
},
{
name: "valid grpc error but not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision",
msg: rpctypes.ErrGRPCUserEmpty.Error(),
expectedRetry: false,
},
{
name: "invalid grpc error and not equal to ErrGRPCInvalidAuthToken or ErrGRPCAuthOldRevision",
msg: "whatever error message",
expectedRetry: false,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedRetry, shouldRetryWatch(tc.msg))
})
}
}
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ etcdserverpb.WatchResponse.created: ""
etcdserverpb.WatchResponse.events: ""
etcdserverpb.WatchResponse.fragment: "3.4"
etcdserverpb.WatchResponse.header: ""
etcdserverpb.WatchResponse.retry: "3.5"
etcdserverpb.WatchResponse.watch_id: ""
membershippb.Attributes: "3.5"
membershippb.Attributes.client_urls: ""
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,14 @@ func (sws *serverWatchStream) recvLoop() error {
err := sws.isWatchPermitted(creq)
if err != nil {
var cancelReason string
var retry bool
switch err {
case auth.ErrInvalidAuthToken:
cancelReason = rpctypes.ErrGRPCInvalidAuthToken.Error()
retry = true
case auth.ErrAuthOldRevision:
cancelReason = rpctypes.ErrGRPCAuthOldRevision.Error()
retry = true
case auth.ErrUserEmpty:
cancelReason = rpctypes.ErrGRPCUserEmpty.Error()
default:
Expand All @@ -292,6 +295,7 @@ func (sws *serverWatchStream) recvLoop() error {
Canceled: true,
Created: true,
CancelReason: cancelReason,
Retry: retry,
}

select {
Expand Down