From 433ed3ce26c982e88a9020d7cf59e9b6daa04ffb Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 25 Sep 2025 23:21:11 +0000 Subject: [PATCH 1/5] xds/resolver: create interceptors once when creating the config selector, not for every RPC --- internal/xds/resolver/serviceconfig.go | 82 +-- .../xds/resolver/xds_http_filters_test.go | 678 +++++++++++++++++ internal/xds/resolver/xds_resolver.go | 25 +- internal/xds/resolver/xds_resolver_test.go | 692 +++++++++--------- 4 files changed, 1074 insertions(+), 403 deletions(-) create mode 100644 internal/xds/resolver/xds_http_filters_test.go diff --git a/internal/xds/resolver/serviceconfig.go b/internal/xds/resolver/serviceconfig.go index f2ceabe7a8db..9ba9a4454f32 100644 --- a/internal/xds/resolver/serviceconfig.go +++ b/internal/xds/resolver/serviceconfig.go @@ -97,17 +97,14 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte { } type virtualHost struct { - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig // retry policy present in virtual host retryConfig *xdsresource.RetryConfig } // routeCluster holds information about a cluster as referenced by a route. type routeCluster struct { - name string - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig + name string // Name of the cluster. + interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route. } type route struct { @@ -115,10 +112,8 @@ type route struct { actionType xdsresource.RouteActionType // holds route action type clusters wrr.WRR // holds *routeCluster entries maxStreamDuration time.Duration - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig - retryConfig *xdsresource.RetryConfig - hashPolicies []*xdsresource.HashPolicy + retryConfig *xdsresource.RetryConfig + hashPolicies []*xdsresource.HashPolicy } func (r route) String() string { @@ -200,11 +195,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP ref := &cs.clusters[cluster.name].refCount atomic.AddInt32(ref, 1) - interceptor, err := cs.newInterceptor(rt, cluster) - if err != nil { - return nil, annotateErrorWithNodeID(err, cs.xdsNodeID) - } - lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name) lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) @@ -220,7 +210,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP cs.sendNewServiceConfig() } }, - Interceptor: interceptor, + Interceptor: cluster.interceptor, } if rt.maxStreamDuration != 0 { @@ -310,35 +300,6 @@ func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies [ return rand.Uint64() } -func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { - if len(cs.httpFilterConfig) == 0 { - return nil, nil - } - interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) - for _, filter := range cs.httpFilterConfig { - override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority - if override == nil { - override = rt.httpFilterConfigOverride[filter.Name] // route is second priority - } - if override == nil { - override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority - } - ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) - if !ok { - // Should not happen if it passed xdsClient validation. - return nil, fmt.Errorf("filter does not support use in client") - } - i, err := ib.BuildClientInterceptor(filter.Config, override) - if err != nil { - return nil, fmt.Errorf("error constructing filter: %v", err) - } - if i != nil { - interceptors = append(interceptors, i) - } - } - return &interceptorList{interceptors: interceptors}, nil -} - // stop decrements refs of all clusters referenced by this config selector. func (cs *configSelector) stop() { // The resolver's old configSelector may be nil. Handle that here. @@ -363,6 +324,39 @@ func (cs *configSelector) stop() { } } +// newInterceptor builds a chain of client interceptors for the given filters. +// The filter config override maps contain overrides from the route, cluster, +// and virtual host respectively. The cluster override has the highest priority, +// followed by the route override, and finally the virtual host override. +func newInterceptor(filters []xdsresource.HTTPFilter, cluster, route, virtualHost map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + if len(filters) == 0 { + return nil, nil + } + interceptors := make([]iresolver.ClientInterceptor, 0, len(filters)) + for _, filter := range filters { + override := cluster[filter.Name] + if override == nil { + override = route[filter.Name] + } + if override == nil { + override = virtualHost[filter.Name] + } + ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) + if !ok { + // Should not happen if it passed xdsClient validation. + panic(fmt.Sprintf("filter %q does not support use in client", filter.Name)) + } + i, err := ib.BuildClientInterceptor(filter.Config, override) + if err != nil { + return nil, fmt.Errorf("error constructing filter: %v", err) + } + if i != nil { + interceptors = append(interceptors, i) + } + } + return &interceptorList{interceptors: interceptors}, nil +} + type interceptorList struct { interceptors []iresolver.ClientInterceptor } diff --git a/internal/xds/resolver/xds_http_filters_test.go b/internal/xds/resolver/xds_http_filters_test.go new file mode 100644 index 000000000000..928ba34f972d --- /dev/null +++ b/internal/xds/resolver/xds_http_filters_test.go @@ -0,0 +1,678 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver_test + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/httpfilter" + rinternal "google.golang.org/grpc/internal/xds/resolver/internal" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/xds" // Register all required xDS components +) + +const ( + filterCfgPathFieldName = "path" + filterCfgErrorFieldName = "new_stream_error" + filterCfgMetadataKey = "test-filter-config" +) + +// testFilterCfg is the internal representation of the filter config proto. It +// is returned by filter's config parsing methods. +type testFilterCfg struct { + httpfilter.FilterConfig + path string + newStreamErr error +} + +// filterConfigFromProto parses filter config specified as a v3.TypedStruct into +// a testFilterCfg. +func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { + ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) + if !ok { + return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) + } + + if ts.GetValue() == nil { + return testFilterCfg{}, nil + } + ret := testFilterCfg{} + if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { + ret.path = v.GetStringValue() + } + if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { + if v.GetStringValue() == "" { + ret.newStreamErr = nil + } else { + ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) + } + } + return ret, nil +} + +type logger interface { + Logf(format string, args ...any) +} + +// testHTTPFilterWithRPCMetadata is a HTTP filter used for testing purposes. +// +// This filter is used to verify that the xDS resolver and filter stack +// correctly propagate filter configuration (both base and override) to RPCs. It +// does this by injecting the config paths from its base and override configs as +// JSON-encoded metadata into outgoing RPCs. The metadata can then be observed +// by the backend, allowing tests to assert that the correct filter +// configuration was applied for each RPC. +type testHTTPFilterWithRPCMetadata struct { + logger logger + typeURL string + newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream() +} + +func (fb *testHTTPFilterWithRPCMetadata) TypeURLs() []string { return []string{fb.typeURL} } + +func (*testHTTPFilterWithRPCMetadata) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(cfg) +} + +func (*testHTTPFilterWithRPCMetadata) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(override) +} + +func (*testHTTPFilterWithRPCMetadata) IsTerminal() bool { return false } + +var _ httpfilter.ClientInterceptorBuilder = &testHTTPFilterWithRPCMetadata{} + +func (fb *testHTTPFilterWithRPCMetadata) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + fb.logger.Logf("BuildClientInterceptor called with config: %+v, override: %+v", config, override) + + if config == nil { + panic("unexpected missing config") + } + + baseCfg := config.(testFilterCfg) + basePath := baseCfg.path + newStreamErr := baseCfg.newStreamErr + + var overridePath string + if override != nil { + overrideCfg := override.(testFilterCfg) + overridePath = overrideCfg.path + if overrideCfg.newStreamErr != nil { + newStreamErr = overrideCfg.newStreamErr + } + } + var errStr string + if newStreamErr != nil { + errStr = newStreamErr.Error() + } + + return &testFilterInterceptor{ + logger: fb.logger, + cfg: overallFilterConfig{ + BasePath: basePath, + OverridePath: overridePath, + Error: errStr, + }, + newStreamChan: fb.newStreamChan, + }, nil +} + +// overallFilterConfig is a JSON representation of the filter config. +// It is sent as RPC metadata and written to a channel for test verification. +type overallFilterConfig struct { + BasePath string `json:"base_path,omitempty"` + OverridePath string `json:"override_path,omitempty"` + Error string `json:"error,omitempty"` +} + +// testFilterInterceptor is a client interceptor that injects RPC metadata +// corresponding to its filter config. +type testFilterInterceptor struct { + logger logger + cfg overallFilterConfig + newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream() +} + +func (fi *testFilterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { + // Write the config to the channel, if set. This allows tests to verify that + // the filter was invoked at RPC time. This is useful for tests where the + // RPC is expected to fail, and therefore the RPC metadata cannot be + // observed from the backend. + if fi.newStreamChan != nil { + fi.newStreamChan.Send(fi.cfg) + } + + if fi.cfg.Error != "" { + return nil, status.Error(codes.Unavailable, fi.cfg.Error) + } + + // Marshal the filter config to JSON and inject it as metadata. + bytes, err := json.Marshal(fi.cfg) + if err != nil { + return nil, fmt.Errorf("failed to marshal filter config: %w", err) + } + cfg := string(bytes) + fi.logger.Logf("Injecting filter config metadata: %v", cfg) + + cs, err := newStream(metadata.AppendToOutgoingContext(ctx, filterCfgMetadataKey, fmt.Sprintf("%v", cfg)), done) + if err != nil { + return nil, err + } + return &clientStream{ClientStream: cs, ctx: ctx}, nil +} + +type clientStream struct { + iresolver.ClientStream + ctx context.Context +} + +func (cs *clientStream) Context() context.Context { + return cs.ctx +} + +func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { + return &v3httppb.HttpFilter{ + Name: name, + ConfigType: &v3httppb.HttpFilter_TypedConfig{ + TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: typeURL, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, + }, + }, + }), + }, + } +} + +// newStubServer returns a stub server that sends any filter config metadata +// received as part of incoming RPCs to the provided channel. +func newStubServer(metadataCh chan []string) *stubserver.StubServer { + return &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "missing metadata") + } + select { + case metadataCh <- md.Get(filterCfgMetadataKey): + case <-ctx.Done(): + return nil, ctx.Err() + } + return &testpb.Empty{}, nil + }, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "missing metadata") + } + select { + case metadataCh <- md.Get(filterCfgMetadataKey): + case <-ctx.Done(): + return nil, ctx.Err() + } + return &testpb.SimpleResponse{Payload: req.GetPayload()}, nil + }, + } +} + +// Tests HTTP filters with the xDS resolver. The test exercises various levels +// of filter config overrides (base, virtual host-level, route-level and +// cluster-level), and verifies that the correct config is applied for each RPC. +func (s) TestXDSResolverHTTPFilters_AllOverrides(t *testing.T) { + // Override default WRR with a deterministic test version. + origNewWRR := rinternal.NewWRR + rinternal.NewWRR = testutils.NewTestWRR + defer func() { rinternal.NewWRR = origNewWRR }() + + // Register a custom httpFilter builder for the test. + testFilterName := t.Name() + fb := &testHTTPFilterWithRPCMetadata{logger: t, typeURL: testFilterName} + httpfilter.Register(fb) + defer httpfilter.UnregisterForTesting(fb.typeURL) + + // Spin up an xDS management server + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer mgmtServer.Stop() + + // Create an xDS resolver with bootstrap configuration pointing to the above + // management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a couple of test backends. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + const chBufSize = 4 // Expecting 4 metadata entries (2 RPCs, each with 2 filters). + metadataCh := make(chan []string, chBufSize) + backend1 := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend2.Stop() + + // Configure resources on the management server. + // + // The route configuration contains two routes, matching two different RPCs. + // The route for the UnaryCall RPC does not contain any cluster-level or + // route-level per-filter config overrides. A virtual host-level per-filter + // config override exists and it should apply for RPCs matching this route. + // + // The route for the EmptyCall RPC contains a route-level per-filter config + // override that should apply for RPCs routed to cluster "A" since it does + // not have any cluster-level overrides. For RPCs matching cluster "B" + // though, a cluster-level per-filter config override should take + // precedence. + const testServiceName = "service-name" + const routeConfigName = "route-config" + listener := &v3listenerpb.Listener{ + Name: testServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: routeConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{testServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + {Name: "B", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }, + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "A", + Weight: wrapperspb.UInt32(1), + }, + { + Name: "B", + Weight: wrapperspb.UInt32(1), + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, + }, + }, + }), + }, + }, + }, + }, + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, + }, + }, + }), + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, + }, + }, + }), + }, + }}, + }, + }, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", ""), + newHTTPFilter(t, "bar", testFilterName, "bar1", ""), + e2e.RouterHTTPFilter, + }, + }), + }, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Clusters: []*v3clusterpb.Cluster{ + e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), + e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{ + e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend1.Address)}), + e2e.DefaultEndpoint("endpoint_B", "localhost", []uint32{testutils.ParsePort(t, backend2.Address)}), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a gRPC client using the xDS resolver. + cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to create a gRPC client: %v", err) + } + defer cc.Close() + + // Helper to make an RPC twice and collect filter configs from metadata. We + // make the RPC two times to ensure that we hit both clusters (because of + // the deterministic WRR). The returned filter configs are in the order in + // which the RPCs were made. + collectFilterConfigs := func(rpc func() error) []overallFilterConfig { + t.Helper() + var gotFilterCfgs []overallFilterConfig + for i := 0; i < 2; i++ { + if err := rpc(); err != nil { + t.Fatalf("Unexpected RPC error: %v", err) + } + select { + case cfg := <-metadataCh: + if len(cfg) != 2 { + t.Fatalf("Unexpected number of filter config metadata, got: %d, want: 2", len(cfg)) + } + for _, c := range cfg { + var ofc overallFilterConfig + if err := json.Unmarshal([]byte(c), &ofc); err != nil { + t.Fatalf("Failed to unmarshal filter config JSON %q: %v", c, err) + } + gotFilterCfgs = append(gotFilterCfgs, ofc) + } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for metadata from backend") + } + } + return gotFilterCfgs + } + + // Test base filter config (UnaryCall). Because of the deterministic WRR, we + // know the expected order of clusters for the two RPCs. + wantFilterCfgs := []overallFilterConfig{ + {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster A + {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster A + {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster B + {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster B + } + client := testgrpc.NewTestServiceClient(cc) + gotFilterCfgs := collectFilterConfigs(func() error { + _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}) + return err + }) + if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" { + t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff) + } + + // Test per-route and per-cluster overrides (EmptyCall). + wantFilterCfgs = []overallFilterConfig{ + {BasePath: "foo1", OverridePath: "foo3"}, // Routed to cluster A + {BasePath: "bar1", OverridePath: "bar3"}, // Routed to cluster A + {BasePath: "foo1", OverridePath: "foo4"}, // Routed to cluster B + {BasePath: "bar1", OverridePath: "bar4"}, // Routed to cluster B + } + gotFilterCfgs = collectFilterConfigs(func() error { + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + return err + }) + if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" { + t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff) + } +} + +// Tests that if a filter returns an error from its NewStream method, the RPC +// fails with that error. It also verifies that subsequent filters in the chain +// are not run. +func (s) TestXDSResolverHTTPFilters_NewStreamError(t *testing.T) { + // Register a custom httpFilter builder for the test and use a channel to + // get notified when the interceptor is invoked. + testFilterName := t.Name() + fb := &testHTTPFilterWithRPCMetadata{ + logger: t, + typeURL: testFilterName, + newStreamChan: testutils.NewChannelWithSize(3), // We have three filters. + } + httpfilter.Register(fb) + defer httpfilter.UnregisterForTesting(fb.typeURL) + + // Spin up an xDS management server + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer mgmtServer.Stop() + + // Create an xDS resolver with bootstrap configuration pointing to the above + // management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a test backend, but we expect the filter to fail the RPC before it + // ever gets to the backend. The test is designed to fail if the RPC + // *succeeds* (i.e., if the backend is reached). A large channel buffer is + // used to prevent blocking in the unexpected case where the filter fails to + // reject the RPC. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + metadataCh := make(chan []string, 10) + backend := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend.Stop() + + // Configure resources on the management server. + // + // The route configuration contains two routes, matching two different RPCs. + // The route for the UnaryCall RPC does not contain any cluster-level or + // route-level per-filter config overrides. A virtual host-level per-filter + // config override exists and it should apply for RPCs matching this route. + // + // The route for the EmptyCall RPC contains a route-level per-filter config + // override that should apply for RPCs routed to cluster "A" since it does + // not have any cluster-level overrides. For RPCs matching cluster "B" + // though, a cluster-level per-filter config override should take + // precedence. + const testServiceName = "service-name" + const routeConfigName = "route-config" + listener := &v3listenerpb.Listener{ + Name: testServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: routeConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{testServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }, + }, + }}, + }, + }, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo-good", testFilterName, "foo-good", ""), + newHTTPFilter(t, "foo-failing", testFilterName, "foo-failing", "filter interceptor error"), + newHTTPFilter(t, "bar-good", testFilterName, "bar-good", ""), + e2e.RouterHTTPFilter, + }, + }), + }, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend.Address)})}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a gRPC client using the xDS resolver. + cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to create a gRPC client: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatalf("EmptyCall() RPC succeeded when expected to fail") + } + if got, want := status.Code(err), codes.Unavailable; got != want { + t.Fatalf("EmptyCall() RPC error code, got: %v, want: %v", got, want) + } + if got, want := err.Error(), "filter interceptor error"; !strings.Contains(got, want) { + t.Fatalf("Unexpected RPC error, got: %v, want: %v", err, "rpc error: code = Unavailable desc = filter interceptor error") + } + + // Verify that the first good filter was invoked + cfg, err := fb.newStreamChan.Receive(ctx) + if err != nil { + t.Fatal("Timeout waiting for first filter to be invoked") + } + ofc := cfg.(overallFilterConfig) + wantCfg := overallFilterConfig{BasePath: "foo-good"} + if diff := cmp.Diff(wantCfg, ofc); diff != "" { + t.Fatalf("Unexpected first filter config (-want +got):\n%s", diff) + } + + // Verify that the failing filter was invoked too. + cfg, err = fb.newStreamChan.Receive(ctx) + if err != nil { + t.Fatal("Timeout waiting for second filter to be invoked") + } + ofc = cfg.(overallFilterConfig) + wantCfg = overallFilterConfig{BasePath: "foo-failing", Error: "filter interceptor error"} + if diff := cmp.Diff(wantCfg, ofc); diff != "" { + t.Fatalf("Unexpected second filter config (-want +got):\n%s", diff) + } + + // Verify that the last good filter was not invoked. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err = fb.newStreamChan.Receive(sCtx); err == nil { + t.Fatal("Last filter was invoked when expected not to be") + } +} diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 8bb960a6567f..e867f0cc53d2 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -328,7 +328,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { // r.activeClusters for previously-unseen clusters. // // Only executed in the context of a serializer callback. -func (r *xdsResolver) newConfigSelector() *configSelector { +func (r *xdsResolver) newConfigSelector() (*configSelector, error) { cs := &configSelector{ channelID: r.channelID, xdsNodeID: r.xdsClient.BootstrapConfig().Node().GetId(), @@ -338,8 +338,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector { }) }, virtualHost: virtualHost{ - httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride, - retryConfig: r.currentVirtualHost.RetryConfig, + retryConfig: r.currentVirtualHost.RetryConfig, }, routes: make([]route, len(r.currentVirtualHost.Routes)), clusters: make(map[string]*clusterInfo), @@ -350,18 +349,20 @@ func (r *xdsResolver) newConfigSelector() *configSelector { clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin - clusters.Add(&routeCluster{ - name: clusterName, - }, 1) + clusters.Add(&routeCluster{name: clusterName}, 1) ci := r.addOrGetActiveClusterInfo(clusterName) ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.clusters[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name + interceptor, err := newInterceptor(r.currentListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.currentVirtualHost.HTTPFilterConfigOverride) + if err != nil { + return nil, err + } clusters.Add(&routeCluster{ - name: clusterName, - httpFilterConfigOverride: wc.HTTPFilterConfigOverride, + name: clusterName, + interceptor: interceptor, }, int64(wc.Weight)) ci := r.addOrGetActiveClusterInfo(clusterName) ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: wc.Name})} @@ -378,7 +379,6 @@ func (r *xdsResolver) newConfigSelector() *configSelector { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } - cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride cs.routes[i].retryConfig = rt.RetryConfig cs.routes[i].hashPolicies = rt.HashPolicies } @@ -390,7 +390,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector { atomic.AddInt32(&ci.refCount, 1) } - return cs + return cs, nil } // pruneActiveClusters deletes entries in r.activeClusters with zero @@ -446,7 +446,10 @@ func (r *xdsResolver) onResolutionComplete() { return } - cs := r.newConfigSelector() + cs, err := r.newConfigSelector() + if err != nil { + r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err)) + } if !r.sendNewServiceConfig(cs) { // Channel didn't like the update we provided (unexpected); erase // this config selector and ignore this update, continuing with diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index b28724d37383..09e82d6c66a1 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -41,20 +41,15 @@ import ( "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/balancer/clustermanager" "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/internal/xds/httpfilter" rinternal "google.golang.org/grpc/internal/xds/resolver/internal" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/wrapperspb" - v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -1158,106 +1153,6 @@ func (s) TestResolverWRR(t *testing.T) { } } -const filterCfgPathFieldName = "path" -const filterCfgErrorFieldName = "new_stream_error" - -type filterCfg struct { - httpfilter.FilterConfig - path string - newStreamErr error -} - -type filterBuilder struct { - paths []string - typeURL string -} - -func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} } - -func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { - ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) - if !ok { - return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) - } - - if ts.GetValue() == nil { - return filterCfg{}, nil - } - ret := filterCfg{} - if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { - ret.path = v.GetStringValue() - } - if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { - if v.GetStringValue() == "" { - ret.newStreamErr = nil - } else { - ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) - } - } - return ret, nil -} - -func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(cfg) -} - -func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(override) -} - -func (*filterBuilder) IsTerminal() bool { return false } - -var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} - -func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { - if config == nil { - panic("unexpected missing config") - } - - fi := &filterInterceptor{ - parent: fb, - pathCh: make(chan string, 10), - } - - fb.paths = append(fb.paths, "build:"+config.(filterCfg).path) - err := config.(filterCfg).newStreamErr - if override != nil { - fb.paths = append(fb.paths, "override:"+override.(filterCfg).path) - err = override.(filterCfg).newStreamErr - } - - fi.cfgPath = config.(filterCfg).path - fi.err = err - return fi, nil -} - -type filterInterceptor struct { - parent *filterBuilder - pathCh chan string - cfgPath string - err error -} - -func (fi *filterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { - fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath) - if fi.err != nil { - return nil, fi.err - } - d := func() { - fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath) - done() - } - cs, err := newStream(ctx, d) - if err != nil { - return nil, err - } - return &clientStream{ClientStream: cs}, nil -} - -type clientStream struct { - iresolver.ClientStream -} - func (s) TestConfigSelector_FailureCases(t *testing.T) { const methodName = "1" @@ -1345,92 +1240,145 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) { } } -func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { - return &v3httppb.HttpFilter{ - Name: name, - ConfigType: &v3httppb.HttpFilter_TypedConfig{ - TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: typeURL, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, - }, - }, - }), - }, +/* +const filterCfgPathFieldName = "path" +const filterCfgErrorFieldName = "new_stream_error" + + type filterCfg struct { + httpfilter.FilterConfig + path string + newStreamErr error } -} -func (s) TestXDSResolverHTTPFilters(t *testing.T) { - const methodName1 = "1" - const methodName2 = "2" - testFilterName := t.Name() + type filterBuilder struct { + paths []string + typeURL string + } - testCases := []struct { - name string - listener *v3listenerpb.Listener - rpcRes map[string][][]string - wantStreamErr string - }{ - { - name: "NewStream error - ensure earlier interceptor Done is still called", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - {Name: "B", Weight: wrapperspb.UInt32(1)}, - }, - }, - }, - }, - }, - }}, - }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", ""), - newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"), - e2e.RouterHTTPFilter, +func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} } + + func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { + ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) + if !ok { + return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) + } + + if ts.GetValue() == nil { + return filterCfg{}, nil + } + ret := filterCfg{} + if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { + ret.path = v.GetStringValue() + } + if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { + if v.GetStringValue() == "" { + ret.newStreamErr = nil + } else { + ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) + } + } + return ret, nil + } + + func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(cfg) + } + + func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(override) + } + +func (*filterBuilder) IsTerminal() bool { return false } + +var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} + + func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + if config == nil { + panic("unexpected missing config") + } + + fi := &filterInterceptor{ + parent: fb, + } + + fb.paths = append(fb.paths, "build:"+config.(filterCfg).path) + err := config.(filterCfg).newStreamErr + if override != nil { + fb.paths = append(fb.paths, "override:"+override.(filterCfg).path) + err = override.(filterCfg).newStreamErr + } + + fi.cfgPath = config.(filterCfg).path + fi.err = err + return fi, nil + } + + type filterInterceptor struct { + parent *filterBuilder + cfgPath string + err error + } + + func (fi *filterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { + fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath) + if fi.err != nil { + return nil, fi.err + } + d := func() { + fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath) + done() + } + cs, err := newStream(ctx, d) + if err != nil { + return nil, err + } + return &clientStream{ClientStream: cs}, nil + } + + type clientStream struct { + iresolver.ClientStream + } + + func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { + return &v3httppb.HttpFilter{ + Name: name, + ConfigType: &v3httppb.HttpFilter_TypedConfig{ + TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: typeURL, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream() - }, + }, + }), }, - wantStreamErr: "bar newstream err", - }, - { - name: "all overrides", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{ - { + } + } + + func (s) TestXDSResolverHTTPFilters(t *testing.T) { + const methodName1 = "1" + const methodName2 = "2" + testFilterName := t.Name() + + testCases := []struct { + name string + listener *v3listenerpb.Listener + rpcRes map[string][][]string + wantStreamErr string + }{ + { + name: "NewStream error - ensure earlier interceptor Done is still called", + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{ PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, }, @@ -1446,196 +1394,244 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { }, }, }, - }, - { - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2}, + }}, + }}, + }}, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", ""), + newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"), + e2e.RouterHTTPFilter, + }, + }), + }, + }, + rpcRes: map[string][][]string{ + methodName1: { + {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream() + }, + }, + wantStreamErr: "bar newstream err", + }, + { + name: "all overrides", + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + {Name: "B", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - { - Name: "B", - Weight: wrapperspb.UInt32(1), - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + { + Name: "B", + Weight: wrapperspb.UInt32(1), + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, + }, }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, + }, }, - }, - }), + }), + }, }, }, }, }, }, }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + }, }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, + }, }, - }, - }), + }), + }, }, }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + }, }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, + }, }, - }, - }), - }, + }), + }, + }}, }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"), - newHTTPFilter(t, "bar", testFilterName, "bar1", ""), - e2e.RouterHTTPFilter, - }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"), + newHTTPFilter(t, "bar", testFilterName, "bar1", ""), + e2e.RouterHTTPFilter, + }, + }), + }, }, - methodName2: { - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + rpcRes: map[string][][]string{ + methodName1: { + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + }, + methodName2: { + {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + }, }, }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - origNewWRR := rinternal.NewWRR - rinternal.NewWRR = testutils.NewTestWRR - defer func() { rinternal.NewWRR = origNewWRR }() - - // Register a custom httpFilter builder for the test. - fb := &filterBuilder{typeURL: testFilterName} - httpfilter.Register(fb) - - // Spin up an xDS management server. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - nodeID := uuid.New().String() - mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) - - // Build an xDS resolver. - stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) - - cluster := []*v3clusterpb.Cluster{ - e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), - e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), - } - endpoints := []*v3endpointpb.ClusterLoadAssignment{ - e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort), - e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort), - } - // Update the management server with a listener resource that - // contains an inline route configuration. - configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil, cluster, endpoints) + } - // Ensure that the resolver pushes a state update to the channel. - cs := verifyUpdateFromResolver(ctx, t, stateCh, "") + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + origNewWRR := rinternal.NewWRR + rinternal.NewWRR = testutils.NewTestWRR + defer func() { rinternal.NewWRR = origNewWRR }() + + // Register a custom httpFilter builder for the test. + fb := &filterBuilder{typeURL: testFilterName} + httpfilter.Register(fb) + defer httpfilter.UnregisterForTesting(fb.typeURL) + + // Spin up an xDS management server. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) + + // Build an xDS resolver. + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) + + cluster := []*v3clusterpb.Cluster{ + e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), + e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), + } + endpoints := []*v3endpointpb.ClusterLoadAssignment{ + e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort), + e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort), + } + // Update the management server with a listener resource that + // contains an inline route configuration. + configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil, cluster, endpoints) + + // Ensure that the resolver pushes a state update to the channel. + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") + + for method, wants := range tc.rpcRes { + // Order of wants is non-deterministic. + remainingWant := make([][]string, len(wants)) + copy(remainingWant, wants) + for n := range wants { + res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } - for method, wants := range tc.rpcRes { - // Order of wants is non-deterministic. - remainingWant := make([][]string, len(wants)) - copy(remainingWant, wants) - for n := range wants { - res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx}) - if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) - } + var doneFunc func() + _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{}, func() {}, func(_ context.Context, done func()) (iresolver.ClientStream, error) { + doneFunc = done + return nil, nil + }) + if tc.wantStreamErr != "" { + if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) { + t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr) + } + if err == nil { + res.OnCommitted() + doneFunc() + } + continue + } + if err != nil { + t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) - var doneFunc func() - _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{}, func() {}, func(_ context.Context, done func()) (iresolver.ClientStream, error) { - doneFunc = done - return nil, nil - }) - if tc.wantStreamErr != "" { - if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) { - t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr) } - if err == nil { - res.OnCommitted() - doneFunc() + res.OnCommitted() + doneFunc() + + gotPaths := fb.paths + fb.paths = []string{} + + // Confirm the desired path is found in remainingWant, and remove it. + pass := false + for i := range remainingWant { + if cmp.Equal(gotPaths, remainingWant[i]) { + remainingWant[i] = remainingWant[len(remainingWant)-1] + remainingWant = remainingWant[:len(remainingWant)-1] + pass = true + break + } } - continue - } - if err != nil { - t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) - - } - res.OnCommitted() - doneFunc() - - gotPaths := fb.paths - fb.paths = []string{} - - // Confirm the desired path is found in remainingWant, and remove it. - pass := false - for i := range remainingWant { - if cmp.Equal(gotPaths, remainingWant[i]) { - remainingWant[i] = remainingWant[len(remainingWant)-1] - remainingWant = remainingWant[:len(remainingWant)-1] - pass = true - break + if !pass { + t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant) } } - if !pass { - t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant) - } } - } - }) + }) + } } -} +*/ func newDurationP(d time.Duration) *time.Duration { return &d From b6045a6a3d4804d9f3d6af7c237105a85f2edbbb Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 30 Oct 2025 19:05:56 +0000 Subject: [PATCH 2/5] add override suffix to function arguments --- internal/xds/resolver/serviceconfig.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/internal/xds/resolver/serviceconfig.go b/internal/xds/resolver/serviceconfig.go index 9ba9a4454f32..bacb1ebfd66f 100644 --- a/internal/xds/resolver/serviceconfig.go +++ b/internal/xds/resolver/serviceconfig.go @@ -324,22 +324,21 @@ func (cs *configSelector) stop() { } } -// newInterceptor builds a chain of client interceptors for the given filters. -// The filter config override maps contain overrides from the route, cluster, -// and virtual host respectively. The cluster override has the highest priority, +// newInterceptor builds a chain of client interceptors for the given filters +// and override configuration. The cluster override has the highest priority, // followed by the route override, and finally the virtual host override. -func newInterceptor(filters []xdsresource.HTTPFilter, cluster, route, virtualHost map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { +func newInterceptor(filters []xdsresource.HTTPFilter, clusterOverride, routeOverride, virtualHostOverride map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { if len(filters) == 0 { return nil, nil } interceptors := make([]iresolver.ClientInterceptor, 0, len(filters)) for _, filter := range filters { - override := cluster[filter.Name] + override := clusterOverride[filter.Name] if override == nil { - override = route[filter.Name] + override = routeOverride[filter.Name] } if override == nil { - override = virtualHost[filter.Name] + override = virtualHostOverride[filter.Name] } ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) if !ok { From c67b5eb379ceb7ab4db78b8d2ca20d439cbe148e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 30 Oct 2025 19:10:50 +0000 Subject: [PATCH 3/5] return error instead of panic --- internal/xds/resolver/serviceconfig.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/resolver/serviceconfig.go b/internal/xds/resolver/serviceconfig.go index bacb1ebfd66f..3754b14dd3e4 100644 --- a/internal/xds/resolver/serviceconfig.go +++ b/internal/xds/resolver/serviceconfig.go @@ -343,7 +343,7 @@ func newInterceptor(filters []xdsresource.HTTPFilter, clusterOverride, routeOver ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) if !ok { // Should not happen if it passed xdsClient validation. - panic(fmt.Sprintf("filter %q does not support use in client", filter.Name)) + return nil, fmt.Errorf("filter %q does not support use in client", filter.Name) } i, err := ib.BuildClientInterceptor(filter.Config, override) if err != nil { From 7a8992c1f8d552ba9f19124a88b951cd6767ae5a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 30 Oct 2025 19:27:06 +0000 Subject: [PATCH 4/5] a bunch of test improvements --- .../xds/resolver/xds_http_filters_test.go | 35 +- internal/xds/resolver/xds_resolver_test.go | 393 ------------------ 2 files changed, 8 insertions(+), 420 deletions(-) diff --git a/internal/xds/resolver/xds_http_filters_test.go b/internal/xds/resolver/xds_http_filters_test.go index 928ba34f972d..9d2c5acf0b43 100644 --- a/internal/xds/resolver/xds_http_filters_test.go +++ b/internal/xds/resolver/xds_http_filters_test.go @@ -68,7 +68,7 @@ const ( type testFilterCfg struct { httpfilter.FilterConfig path string - newStreamErr error + newStreamErr string } // filterConfigFromProto parses filter config specified as a v3.TypedStruct into @@ -87,11 +87,7 @@ func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { ret.path = v.GetStringValue() } if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { - if v.GetStringValue() == "" { - ret.newStreamErr = nil - } else { - ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) - } + ret.newStreamErr = v.GetStringValue() } return ret, nil } @@ -126,13 +122,15 @@ func (*testHTTPFilterWithRPCMetadata) ParseFilterConfigOverride(override proto.M func (*testHTTPFilterWithRPCMetadata) IsTerminal() bool { return false } +// ClientInterceptorBuilder is an optional interface for filters to implement. +// This compile time check ensures the test filter implements it. var _ httpfilter.ClientInterceptorBuilder = &testHTTPFilterWithRPCMetadata{} func (fb *testHTTPFilterWithRPCMetadata) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { fb.logger.Logf("BuildClientInterceptor called with config: %+v, override: %+v", config, override) if config == nil { - panic("unexpected missing config") + return nil, fmt.Errorf("unexpected missing config") } baseCfg := config.(testFilterCfg) @@ -143,21 +141,17 @@ func (fb *testHTTPFilterWithRPCMetadata) BuildClientInterceptor(config, override if override != nil { overrideCfg := override.(testFilterCfg) overridePath = overrideCfg.path - if overrideCfg.newStreamErr != nil { + if overrideCfg.newStreamErr != "" { newStreamErr = overrideCfg.newStreamErr } } - var errStr string - if newStreamErr != nil { - errStr = newStreamErr.Error() - } return &testFilterInterceptor{ logger: fb.logger, cfg: overallFilterConfig{ BasePath: basePath, OverridePath: overridePath, - Error: errStr, + Error: newStreamErr, }, newStreamChan: fb.newStreamChan, }, nil @@ -200,20 +194,7 @@ func (fi *testFilterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCI cfg := string(bytes) fi.logger.Logf("Injecting filter config metadata: %v", cfg) - cs, err := newStream(metadata.AppendToOutgoingContext(ctx, filterCfgMetadataKey, fmt.Sprintf("%v", cfg)), done) - if err != nil { - return nil, err - } - return &clientStream{ClientStream: cs, ctx: ctx}, nil -} - -type clientStream struct { - iresolver.ClientStream - ctx context.Context -} - -func (cs *clientStream) Context() context.Context { - return cs.ctx + return newStream(metadata.AppendToOutgoingContext(ctx, filterCfgMetadataKey, fmt.Sprintf("%v", cfg)), done) } func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index 09e82d6c66a1..eaf2252fd65b 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -1240,399 +1240,6 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) { } } -/* -const filterCfgPathFieldName = "path" -const filterCfgErrorFieldName = "new_stream_error" - - type filterCfg struct { - httpfilter.FilterConfig - path string - newStreamErr error - } - - type filterBuilder struct { - paths []string - typeURL string - } - -func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} } - - func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { - ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) - if !ok { - return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) - } - - if ts.GetValue() == nil { - return filterCfg{}, nil - } - ret := filterCfg{} - if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { - ret.path = v.GetStringValue() - } - if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { - if v.GetStringValue() == "" { - ret.newStreamErr = nil - } else { - ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) - } - } - return ret, nil - } - - func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(cfg) - } - - func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(override) - } - -func (*filterBuilder) IsTerminal() bool { return false } - -var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} - - func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { - if config == nil { - panic("unexpected missing config") - } - - fi := &filterInterceptor{ - parent: fb, - } - - fb.paths = append(fb.paths, "build:"+config.(filterCfg).path) - err := config.(filterCfg).newStreamErr - if override != nil { - fb.paths = append(fb.paths, "override:"+override.(filterCfg).path) - err = override.(filterCfg).newStreamErr - } - - fi.cfgPath = config.(filterCfg).path - fi.err = err - return fi, nil - } - - type filterInterceptor struct { - parent *filterBuilder - cfgPath string - err error - } - - func (fi *filterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { - fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath) - if fi.err != nil { - return nil, fi.err - } - d := func() { - fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath) - done() - } - cs, err := newStream(ctx, d) - if err != nil { - return nil, err - } - return &clientStream{ClientStream: cs}, nil - } - - type clientStream struct { - iresolver.ClientStream - } - - func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { - return &v3httppb.HttpFilter{ - Name: name, - ConfigType: &v3httppb.HttpFilter_TypedConfig{ - TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: typeURL, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, - }, - }, - }), - }, - } - } - - func (s) TestXDSResolverHTTPFilters(t *testing.T) { - const methodName1 = "1" - const methodName2 = "2" - testFilterName := t.Name() - - testCases := []struct { - name string - listener *v3listenerpb.Listener - rpcRes map[string][][]string - wantStreamErr string - }{ - { - name: "NewStream error - ensure earlier interceptor Done is still called", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - {Name: "B", Weight: wrapperspb.UInt32(1)}, - }, - }, - }, - }, - }, - }}, - }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", ""), - newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"), - e2e.RouterHTTPFilter, - }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream() - }, - }, - wantStreamErr: "bar newstream err", - }, - { - name: "all overrides", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{ - { - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - {Name: "B", Weight: wrapperspb.UInt32(1)}, - }, - }, - }, - }, - }, - }, - { - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - { - Name: "B", - Weight: wrapperspb.UInt32(1), - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, - }, - }, - }), - }, - }, - }, - }, - }, - }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, - }, - }, - }), - }, - }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, - }, - }, - }), - }, - }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"), - newHTTPFilter(t, "bar", testFilterName, "bar1", ""), - e2e.RouterHTTPFilter, - }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - methodName2: { - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - origNewWRR := rinternal.NewWRR - rinternal.NewWRR = testutils.NewTestWRR - defer func() { rinternal.NewWRR = origNewWRR }() - - // Register a custom httpFilter builder for the test. - fb := &filterBuilder{typeURL: testFilterName} - httpfilter.Register(fb) - defer httpfilter.UnregisterForTesting(fb.typeURL) - - // Spin up an xDS management server. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - nodeID := uuid.New().String() - mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) - - // Build an xDS resolver. - stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) - - cluster := []*v3clusterpb.Cluster{ - e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), - e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), - } - endpoints := []*v3endpointpb.ClusterLoadAssignment{ - e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort), - e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort), - } - // Update the management server with a listener resource that - // contains an inline route configuration. - configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil, cluster, endpoints) - - // Ensure that the resolver pushes a state update to the channel. - cs := verifyUpdateFromResolver(ctx, t, stateCh, "") - - for method, wants := range tc.rpcRes { - // Order of wants is non-deterministic. - remainingWant := make([][]string, len(wants)) - copy(remainingWant, wants) - for n := range wants { - res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx}) - if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) - } - - var doneFunc func() - _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{}, func() {}, func(_ context.Context, done func()) (iresolver.ClientStream, error) { - doneFunc = done - return nil, nil - }) - if tc.wantStreamErr != "" { - if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) { - t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr) - } - if err == nil { - res.OnCommitted() - doneFunc() - } - continue - } - if err != nil { - t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) - - } - res.OnCommitted() - doneFunc() - - gotPaths := fb.paths - fb.paths = []string{} - - // Confirm the desired path is found in remainingWant, and remove it. - pass := false - for i := range remainingWant { - if cmp.Equal(gotPaths, remainingWant[i]) { - remainingWant[i] = remainingWant[len(remainingWant)-1] - remainingWant = remainingWant[:len(remainingWant)-1] - pass = true - break - } - } - if !pass { - t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant) - } - } - } - }) - } - } -*/ - func newDurationP(d time.Duration) *time.Duration { return &d } From 226e890ab5bef8b397bd84a5f686e2a62ca7045c Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 30 Oct 2025 19:45:59 +0000 Subject: [PATCH 5/5] return early when configselector build fails --- internal/xds/resolver/xds_resolver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index e867f0cc53d2..6c1bd4ead1e2 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -448,7 +448,9 @@ func (r *xdsResolver) onResolutionComplete() { cs, err := r.newConfigSelector() if err != nil { + // Send an erroring config selector in this case that fails RPCs. r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err)) + return } if !r.sendNewServiceConfig(cs) { // Channel didn't like the update we provided (unexpected); erase