Skip to content

Commit

Permalink
xdsclient: ensure xDS node ID in included in NACK and connectivity er…
Browse files Browse the repository at this point in the history
…rors (grpc#8103)
  • Loading branch information
easwars authored Feb 20, 2025
1 parent 42fc25a commit c7db760
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
20 changes: 11 additions & 9 deletions xds/internal/xdsclient/tests/ads_stream_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -104,13 +103,15 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {

// Override the backoff implementation to push on a channel that is read by
// the test goroutine.
backoffCtx, backoffCancel := context.WithCancel(ctx)
streamBackoff := func(v int) time.Duration {
select {
case backoffCh <- struct{}{}:
case <-ctx.Done():
case <-backoffCtx.Done():
}
return 0
}
defer backoffCancel()

// Create an xDS client with bootstrap pointing to the above server.
nodeID := uuid.New().String()
Expand All @@ -130,13 +131,8 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
}

// Verify that the received stream error is reported to the watcher.
u, err := lw.updateCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for an error callback on the listener watcher")
}
gotErr := u.(listenerUpdateErrTuple).err
if !strings.Contains(gotErr.Error(), streamErr.Error()) {
t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr)
if err := verifyListenerError(ctx, lw.updateCh, streamErr.Error(), nodeID); err != nil {
t.Fatal(err)
}

// Verify that the stream is closed.
Expand All @@ -157,6 +153,12 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
t.Fatal(err)
}

// To prevent indefinite blocking during xDS client close, which is caused
// by a blocking backoff channel write, cancel the backoff context early
// given that the test is complete.
backoffCancel()

}

// Tests the case where a stream breaks because the server goes down. Verifies
Expand Down
15 changes: 9 additions & 6 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
return nil
}

func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error {
func verifyListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr, wantNodeID string) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
Expand All @@ -189,6 +189,9 @@ func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel
if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
}
if !strings.Contains(gotErr.Error(), wantNodeID) {
return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID)
}
return nil
}

Expand Down Expand Up @@ -1058,15 +1061,15 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil {
if err := verifyListenerError(ctx, lw.updateCh, wantListenerNACKErr, nodeID); err != nil {
t.Fatal(err)
}

// Verify that the expected error is propagated to the new watcher as well.
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1138,7 +1141,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
t.Fatal(err)
}

Expand All @@ -1151,7 +1154,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
t.Fatal(err)
}
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1229,7 +1232,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
t.Fatal(err)
}

Expand Down

0 comments on commit c7db760

Please sign in to comment.