diff --git a/internal/internal.go b/internal/internal.go index 2f463d29ecd3..b3ee00f58220 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -64,6 +64,9 @@ var ( // gRPC server. An xDS-enabled server needs to know what type of credentials // is configured on the underlying gRPC server. This is set by server.go. GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials + // MetricsRecorderForServer returns the MetricsRecorderList derived from a + // server's stats handlers. + MetricsRecorderForServer any // func (*grpc.Server) estats.MetricsRecorder // CanonicalString returns the canonical string of the code defined here: // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md. // diff --git a/resolver/resolver.go b/resolver/resolver.go index 8eb1cf3bcfaf..b84ef26d46d1 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/attributes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" "google.golang.org/grpc/serviceconfig" ) @@ -175,6 +176,8 @@ type BuildOptions struct { // Authority is the effective authority of the clientconn for which the // resolver is built. Authority string + // MetricsRecorder is the metrics recorder to do recording. + MetricsRecorder stats.MetricsRecorder } // An Endpoint is one network endpoint, or server, which may have multiple diff --git a/resolver_wrapper.go b/resolver_wrapper.go index 9f5e12fc03c9..945e24ff83ae 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -77,6 +77,7 @@ func (ccr *ccResolverWrapper) start() error { CredsBundle: ccr.cc.dopts.copts.CredsBundle, Dialer: ccr.cc.dopts.copts.Dialer, Authority: ccr.cc.authority, + MetricsRecorder: ccr.cc.metricsRecorderList, } var err error // The delegating resolver is used unless: diff --git a/server.go b/server.go index 1e7701975e08..976e70ae068e 100644 --- a/server.go +++ b/server.go @@ -37,12 +37,14 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" + istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/mem" @@ -82,6 +84,9 @@ func init() { internal.BinaryLogger = binaryLogger internal.JoinServerOptions = newJoinServerOption internal.BufferPool = bufferPool + internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder { + return istats.NewMetricsRecorderList(srv.opts.statsHandlers) + } } var statusOK = status.New(codes.OK, "") diff --git a/xds/internal/resolver/internal/internal.go b/xds/internal/resolver/internal/internal.go index d9c23278281f..f22caffc1969 100644 --- a/xds/internal/resolver/internal/internal.go +++ b/xds/internal/resolver/internal/internal.go @@ -26,5 +26,5 @@ var ( NewWRR any // func() wrr.WRR // NewXDSClient is the function used to create a new xDS client. - NewXDSClient any // func(string) (xdsclient.XDSClient, func(), error) + NewXDSClient any // func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) ) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index d6e4de39adfd..5029a338661f 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -25,6 +25,7 @@ import ( rand "math/rand/v2" "sync/atomic" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -50,13 +51,16 @@ const Scheme = "xds" // the provided config and a new xDS client in that pool. func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ - newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) { + newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) { config, err := bootstrap.NewConfigFromContents(config) if err != nil { return nil, nil, err } pool := xdsclient.NewPool(config) - return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name}) + return pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: name, + MetricsRecorder: mr, + }) }, }, nil } @@ -66,8 +70,11 @@ func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) { // specific xds client pool being used. func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error) { return &xdsResolverBuilder{ - newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) { - return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name}) + newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) { + return pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: name, + MetricsRecorder: mr, + }) }, }, nil } @@ -82,7 +89,7 @@ func init() { } type xdsResolverBuilder struct { - newXDSClient func(string) (xdsclient.XDSClient, func(), error) + newXDSClient func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) } // Build helps implement the resolver.Builder interface. @@ -115,11 +122,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon r.serializerCancel = cancel // Initialize the xDS client. - newXDSClient := rinternal.NewXDSClient.(func(string) (xdsclient.XDSClient, func(), error)) + newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)) if b.newXDSClient != nil { newXDSClient = b.newXDSClient } - client, closeFn, err := newXDSClient(target.String()) + client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder) if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 6f723e617cb8..56044b7fb5a0 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc/codes" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" @@ -257,7 +258,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { // client is closed. origNewClient := rinternal.NewXDSClient closeCh := make(chan struct{}) - rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) { + rinternal.NewXDSClient = func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) { bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address") config, err := bootstrap.NewConfigFromContents(bc) if err != nil { diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 67b3d7693808..17940c0e9f7f 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -23,6 +23,7 @@ import ( "sync" "sync/atomic" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" igrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -87,6 +88,8 @@ type authority struct { xdsClientSerializer *grpcsync.CallbackSerializer // Serializer to run call ins from the xDS client, owned by this authority. xdsClientSerializerClose func() // Function to close the above serializer. logger *igrpclog.PrefixLogger // Logger for this authority. + target string // The gRPC Channel target. + metricsRecorder stats.MetricsRecorder // The metrics recorder used for emitting metrics. // The below defined fields must only be accessed in the context of the // serializer callback, owned by this authority. @@ -120,6 +123,8 @@ type authorityBuildOptions struct { serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel logPrefix string // Prefix for logging + target string // Target for the gRPC Channel that owns xDS Client/Authority + metricsRecorder stats.MetricsRecorder // metricsRecorder to emit metrics } // newAuthority creates a new authority instance with the provided @@ -143,6 +148,8 @@ func newAuthority(args authorityBuildOptions) *authority { xdsClientSerializerClose: cancel, logger: igrpclog.NewPrefixLogger(l, logPrefix), resources: make(map[xdsresource.Type]map[string]*resourceState), + target: args.target, + metricsRecorder: args.metricsRecorder, } // Create an ordered list of xdsChannels with their server configs. The @@ -358,6 +365,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig // On error, keep previous version of the resource. But update status // and error. if uErr.Err != nil { + xdsClientResourceUpdatesInvalidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName()) state.md.ErrState = md.ErrState state.md.Status = md.Status for watcher := range state.watchers { @@ -369,6 +377,8 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig continue } + xdsClientResourceUpdatesValidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName()) + if state.deletionIgnored { state.deletionIgnored = false a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName()) diff --git a/xds/internal/xdsclient/client_refcounted_test.go b/xds/internal/xdsclient/client_refcounted_test.go index 46c75b7d014e..ba2331c0c3bb 100644 --- a/xds/internal/xdsclient/client_refcounted_test.go +++ b/xds/internal/xdsclient/client_refcounted_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/uuid" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" ) @@ -60,7 +61,7 @@ func (s) TestClientNew_Single(t *testing.T) { defer func() { xdsClientImplCloseHook = origClientImplCloseHook }() // The first call to New() should create a new client. - _, closeFunc, err := pool.NewClient(t.Name()) + _, closeFunc, err := pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -76,7 +77,7 @@ func (s) TestClientNew_Single(t *testing.T) { closeFuncs := make([]func(), count) for i := 0; i < count; i++ { func() { - _, closeFuncs[i], err = pool.NewClient(t.Name()) + _, closeFuncs[i], err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{}) if err != nil { t.Fatalf("%d-th call to New() failed with error: %v", i, err) } @@ -114,7 +115,7 @@ func (s) TestClientNew_Single(t *testing.T) { // Calling New() again, after the previous Client was actually closed, // should create a new one. - _, closeFunc, err = pool.NewClient(t.Name()) + _, closeFunc, err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -156,7 +157,7 @@ func (s) TestClientNew_Multiple(t *testing.T) { // Create two xDS clients. client1Name := t.Name() + "-1" - _, closeFunc1, err := pool.NewClient(client1Name) + _, closeFunc1, err := pool.NewClient(client1Name, &stats.NoopMetricsRecorder{}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -171,7 +172,7 @@ func (s) TestClientNew_Multiple(t *testing.T) { } client2Name := t.Name() + "-2" - _, closeFunc2, err := pool.NewClient(client2Name) + _, closeFunc2, err := pool.NewClient(client2Name, &stats.NoopMetricsRecorder{}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -193,7 +194,7 @@ func (s) TestClientNew_Multiple(t *testing.T) { defer wg.Done() for i := 0; i < count; i++ { var err error - _, closeFuncs1[i], err = pool.NewClient(client1Name) + _, closeFuncs1[i], err = pool.NewClient(client1Name, &stats.NoopMetricsRecorder{}) if err != nil { t.Errorf("%d-th call to New() failed with error: %v", i, err) } @@ -203,7 +204,7 @@ func (s) TestClientNew_Multiple(t *testing.T) { defer wg.Done() for i := 0; i < count; i++ { var err error - _, closeFuncs2[i], err = pool.NewClient(client2Name) + _, closeFuncs2[i], err = pool.NewClient(client2Name, &stats.NoopMetricsRecorder{}) if err != nil { t.Errorf("%d-th call to New() failed with error: %v", i, err) } diff --git a/xds/internal/xdsclient/clientimpl.go b/xds/internal/xdsclient/clientimpl.go index b19d9dd2e102..c30c2b45b6b0 100644 --- a/xds/internal/xdsclient/clientimpl.go +++ b/xds/internal/xdsclient/clientimpl.go @@ -27,6 +27,7 @@ import ( "time" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpclog" @@ -60,6 +61,21 @@ var ( xdsClientImplCloseHook = func(string) {} defaultExponentialBackoff = backoff.DefaultExponential.Backoff + + xdsClientResourceUpdatesValidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.xds_client.resource_updates_valid", + Description: "A counter of resources received that were considered valid. The counter will be incremented even for resources that have not changed.", + Unit: "resource", + Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"}, + Default: false, + }) + xdsClientResourceUpdatesInvalidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.xds_client.resource_updates_invalid", + Description: "A counter of resources received that were considered invalid.", + Unit: "resource", + Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"}, + Default: false, + }) ) // clientImpl is the real implementation of the xDS client. The exported Client @@ -78,6 +94,8 @@ type clientImpl struct { serializer *grpcsync.CallbackSerializer // Serializer for invoking resource watcher callbacks. serializerClose func() // Function to close the serializer. logger *grpclog.PrefixLogger // Logger for this client. + metricsRecorder estats.MetricsRecorder // Metrics recorder for metrics. + target string // The gRPC target for this client. // The clientImpl owns a bunch of channels to individual xDS servers // specified in the bootstrap configuration. Authorities acquire references @@ -111,9 +129,11 @@ func init() { } // newClientImpl returns a new xdsClient with the given config. -func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) { +func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, mr estats.MetricsRecorder, target string) (*clientImpl, error) { ctx, cancel := context.WithCancel(context.Background()) c := &clientImpl{ + metricsRecorder: mr, + target: target, done: grpcsync.NewEvent(), authorities: make(map[string]*authority), config: config, @@ -139,6 +159,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s serializer: c.serializer, getChannelForADS: c.getChannelForADS, logPrefix: clientPrefix(c), + target: target, + metricsRecorder: c.metricsRecorder, }) } c.topLevelAuthority = newAuthority(authorityBuildOptions{ @@ -147,6 +169,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s serializer: c.serializer, getChannelForADS: c.getChannelForADS, logPrefix: clientPrefix(c), + target: target, + metricsRecorder: c.metricsRecorder, }) c.logger = prefixLogger(c) return c, nil diff --git a/xds/internal/xdsclient/metrics_test.go b/xds/internal/xdsclient/metrics_test.go new file mode 100644 index 000000000000..d9e52230ef4f --- /dev/null +++ b/xds/internal/xdsclient/metrics_test.go @@ -0,0 +1,149 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/stats" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" +) + +type noopListenerWatcher struct{} + +func (noopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + onDone() +} + +func (noopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { + onDone() +} + +func (noopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + onDone() +} + +// TestResourceUpdateMetrics configures an xDS client, and a management server +// to send valid and invalid LDS updates, and verifies that the expected metrics +// for both good and bad updates are emitted. +func (s) TestResourceUpdateMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := stats.NewTestMetricsRecorder() + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l}) + const listenerResourceName = "test-listener-resource" + const routeConfigurationName = "test-route-configuration-resource" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{ + Servers: []byte(fmt.Sprintf(`[{ + "server_uri": %q, + "channel_creds": [{"type": "insecure"}] + }]`, mgmtServer.Address)), + Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), + Authorities: map[string]json.RawMessage{ + "authority": []byte("{}"), + }, + }) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + + config, err := bootstrap.NewConfigFromContents(bootstrapContents) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) + } + pool := NewPool(config) + client, close, err := pool.NewClientForTesting(OptionsForTesting{ + Name: t.Name(), + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + MetricsRecorder: tmr, + }) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) + } + defer close() + + // Watch the valid listener configured on the management server. This should + // cause a resource updates valid count to emit eventually. + xdsresource.WatchListener(client, listenerResourceName, noopListenerWatcher{}) + mdWant := stats.MetricsData{ + Handle: xdsClientResourceUpdatesValidMetric.Descriptor(), + IntIncr: 1, + LabelKeys: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"}, + LabelVals: []string{"Test/ResourceUpdateMetrics", mgmtServer.Address, "ListenerResource"}, + } + if err := tmr.WaitForInt64Count(ctx, mdWant); err != nil { + t.Fatal(err.Error()) + } + // Invalid should have no recording point. + if got, _ := tmr.Metric("grpc.xds_client.resource_updates_invalid"); got != 0 { + t.Fatalf("Unexpected data for metric \"grpc.xds_client.resource_updates_invalid\", got: %v, want: %v", got, 0) + } + + // Update management server with a bad update. Eventually, tmr should + // receive an invalid count received metric. The successful metric should + // stay the same. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + resources.Listeners[0].ApiListener = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + mdWant = stats.MetricsData{ + Handle: xdsClientResourceUpdatesInvalidMetric.Descriptor(), + IntIncr: 1, + LabelKeys: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"}, + LabelVals: []string{"Test/ResourceUpdateMetrics", mgmtServer.Address, "ListenerResource"}, + } + if err := tmr.WaitForInt64Count(ctx, mdWant); err != nil { + t.Fatal(err.Error()) + } + // Valid should stay the same at 1. + if got, _ := tmr.Metric("grpc.xds_client.resource_updates_valid"); got != 1 { + t.Fatalf("Unexpected data for metric \"grpc.xds_client.resource_updates_invalid\", got: %v, want: %v", got, 1) + } +} diff --git a/xds/internal/xdsclient/pool.go b/xds/internal/xdsclient/pool.go index 91b79f7debb5..4f3ad7c4233f 100644 --- a/xds/internal/xdsclient/pool.go +++ b/xds/internal/xdsclient/pool.go @@ -24,7 +24,9 @@ import ( "time" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/backoff" + istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/internal/xds/bootstrap" ) @@ -59,6 +61,10 @@ type OptionsForTesting struct { // backoff duration after stream failures. // If unspecified, uses the default value used in non-test code. StreamBackoffAfterFailure func(int) time.Duration + + // MetricsRecorder is the metrics recorder the xDS Client will use. If + // unspecified, uses a no-op MetricsRecorder. + MetricsRecorder estats.MetricsRecorder } // NewPool creates a new xDS client pool with the given bootstrap config. @@ -82,8 +88,8 @@ func NewPool(config *bootstrap.Config) *Pool { // The second return value represents a close function which the caller is // expected to invoke once they are done using the client. It is safe for the // caller to invoke this close function multiple times. -func (p *Pool) NewClient(name string) (XDSClient, func(), error) { - return p.newRefCounted(name, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff) +func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) { + return p.newRefCounted(name, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff, metricsRecorder) } // NewClientForTesting returns an xDS client configured with the provided @@ -107,7 +113,10 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e if opts.StreamBackoffAfterFailure == nil { opts.StreamBackoffAfterFailure = defaultExponentialBackoff } - return p.newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure) + if opts.MetricsRecorder == nil { + opts.MetricsRecorder = istats.NewMetricsRecorderList(nil) + } + return p.newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure, opts.MetricsRecorder) } // GetClientForTesting returns an xDS client created earlier using the given @@ -206,7 +215,7 @@ func (p *Pool) clientRefCountedClose(name string) { // newRefCounted creates a new reference counted xDS client implementation for // name, if one does not exist already. If an xDS client for the given name // exists, it gets a reference to it and returns it. -func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { +func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) { p.mu.Lock() defer p.mu.Unlock() @@ -219,7 +228,7 @@ func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, stre return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil } - c, err := newClientImpl(p.config, watchExpiryTimeout, streamBackoff) + c, err := newClientImpl(p.config, watchExpiryTimeout, streamBackoff, metricsRecorder, name) if err != nil { return nil, nil, err } diff --git a/xds/server.go b/xds/server.go index 620bf7826318..aa93130d162c 100644 --- a/xds/server.go +++ b/xds/server.go @@ -27,9 +27,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" + "google.golang.org/grpc/internal" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" iresolver "google.golang.org/grpc/internal/resolver" + istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/metadata" @@ -87,6 +90,12 @@ func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) { } s.handleServerOptions(opts) + var mrl estats.MetricsRecorder + mrl = istats.NewMetricsRecorderList(nil) + if srv, ok := s.gs.(*grpc.Server); ok { // Will hit in prod but not for testing. + mrl = internal.MetricsRecorderForServer.(func(*grpc.Server) estats.MetricsRecorder)(srv) + } + // Initializing the xDS client upfront (instead of at serving time) // simplifies the code by eliminating the need for a mutex to protect the // xdsC and xdsClientClose fields. @@ -94,7 +103,7 @@ func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) { if s.opts.clientPoolForTesting != nil { pool = s.opts.clientPoolForTesting } - xdsClient, xdsClientClose, err := pool.NewClient(xdsclient.NameForServer) + xdsClient, xdsClientClose, err := pool.NewClient(xdsclient.NameForServer, mrl) if err != nil { return nil, fmt.Errorf("xDS client creation failed: %v", err) }