Skip to content

Commit

Permalink
xds/internal/xdsclient: Add counter metrics for valid and invalid resource updates (grpc#8038)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Feb 7, 2025
1 parent f227ba9 commit 267a09b
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 23 deletions.
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials
// MetricsRecorderForServer returns the MetricsRecorderList derived from a
// server's stats handlers.
MetricsRecorderForServer any // func (*grpc.Server) estats.MetricsRecorder
// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
//
Expand Down
3 changes: 3 additions & 0 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -175,6 +176,8 @@ type BuildOptions struct {
// Authority is the effective authority of the clientconn for which the
// resolver is built.
Authority string
// MetricsRecorder is the recorder the resolver should use to emit metrics.
MetricsRecorder stats.MetricsRecorder
}

// An Endpoint is one network endpoint, or server, which may have multiple
Expand Down
1 change: 1 addition & 0 deletions resolver_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (ccr *ccResolverWrapper) start() error {
CredsBundle: ccr.cc.dopts.copts.CredsBundle,
Dialer: ccr.cc.dopts.copts.Dialer,
Authority: ccr.cc.authority,
MetricsRecorder: ccr.cc.metricsRecorderList,
}
var err error
// The delegating resolver is used unless:
Expand Down
5 changes: 5 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
Expand Down Expand Up @@ -82,6 +84,9 @@ func init() {
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
internal.BufferPool = bufferPool
internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
}
}

var statusOK = status.New(codes.OK, "")
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ var (
NewWRR any // func() wrr.WRR

// NewXDSClient is the function used to create a new xDS client.
NewXDSClient any // func(string) (xdsclient.XDSClient, func(), error)
NewXDSClient any // func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
)
21 changes: 14 additions & 7 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
rand "math/rand/v2"
"sync/atomic"

estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand All @@ -50,13 +51,16 @@ const Scheme = "xds"
// the provided config and a new xDS client in that pool.
func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
return &xdsResolverBuilder{
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
config, err := bootstrap.NewConfigFromContents(config)
if err != nil {
return nil, nil, err
}
pool := xdsclient.NewPool(config)
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: name,
MetricsRecorder: mr,
})
},
}, nil
}
Expand All @@ -66,8 +70,11 @@ func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
// specific xds client pool being used.
func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error) {
return &xdsResolverBuilder{
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: name,
MetricsRecorder: mr,
})
},
}, nil
}
Expand All @@ -82,7 +89,7 @@ func init() {
}

type xdsResolverBuilder struct {
newXDSClient func(string) (xdsclient.XDSClient, func(), error)
newXDSClient func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
}

// Build helps implement the resolver.Builder interface.
Expand Down Expand Up @@ -115,11 +122,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
r.serializerCancel = cancel

// Initialize the xDS client.
newXDSClient := rinternal.NewXDSClient.(func(string) (xdsclient.XDSClient, func(), error))
newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error))
if b.newXDSClient != nil {
newXDSClient = b.newXDSClient
}
client, closeFn, err := newXDSClient(target.String())
client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder)
if err != nil {
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -257,7 +258,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
// client is closed.
origNewClient := rinternal.NewXDSClient
closeCh := make(chan struct{})
rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) {
rinternal.NewXDSClient = func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address")
config, err := bootstrap.NewConfigFromContents(bc)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"sync/atomic"

"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -87,6 +88,8 @@ type authority struct {
xdsClientSerializer *grpcsync.CallbackSerializer // Serializer to run call ins from the xDS client, owned by this authority.
xdsClientSerializerClose func() // Function to close the above serializer.
logger *igrpclog.PrefixLogger // Logger for this authority.
target string // The gRPC Channel target.
metricsRecorder stats.MetricsRecorder // The metrics recorder used for emitting metrics.

// The below defined fields must only be accessed in the context of the
// serializer callback, owned by this authority.
Expand Down Expand Up @@ -120,6 +123,8 @@ type authorityBuildOptions struct {
serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks
getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel
logPrefix string // Prefix for logging
target string // Target for the gRPC Channel that owns xDS Client/Authority
metricsRecorder stats.MetricsRecorder // metricsRecorder to emit metrics
}

// newAuthority creates a new authority instance with the provided
Expand All @@ -143,6 +148,8 @@ func newAuthority(args authorityBuildOptions) *authority {
xdsClientSerializerClose: cancel,
logger: igrpclog.NewPrefixLogger(l, logPrefix),
resources: make(map[xdsresource.Type]map[string]*resourceState),
target: args.target,
metricsRecorder: args.metricsRecorder,
}

// Create an ordered list of xdsChannels with their server configs. The
Expand Down Expand Up @@ -358,6 +365,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
// On error, keep previous version of the resource. But update status
// and error.
if uErr.Err != nil {
xdsClientResourceUpdatesInvalidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())
state.md.ErrState = md.ErrState
state.md.Status = md.Status
for watcher := range state.watchers {
Expand All @@ -369,6 +377,8 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
continue
}

xdsClientResourceUpdatesValidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())

if state.deletionIgnored {
state.deletionIgnored = false
a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName())
Expand Down
15 changes: 8 additions & 7 deletions xds/internal/xdsclient/client_refcounted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func (s) TestClientNew_Single(t *testing.T) {
defer func() { xdsClientImplCloseHook = origClientImplCloseHook }()

// The first call to New() should create a new client.
_, closeFunc, err := pool.NewClient(t.Name())
_, closeFunc, err := pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -76,7 +77,7 @@ func (s) TestClientNew_Single(t *testing.T) {
closeFuncs := make([]func(), count)
for i := 0; i < count; i++ {
func() {
_, closeFuncs[i], err = pool.NewClient(t.Name())
_, closeFuncs[i], err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (s) TestClientNew_Single(t *testing.T) {

// Calling New() again, after the previous Client was actually closed,
// should create a new one.
_, closeFunc, err = pool.NewClient(t.Name())
_, closeFunc, err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down Expand Up @@ -156,7 +157,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {

// Create two xDS clients.
client1Name := t.Name() + "-1"
_, closeFunc1, err := pool.NewClient(client1Name)
_, closeFunc1, err := pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -171,7 +172,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
}

client2Name := t.Name() + "-2"
_, closeFunc2, err := pool.NewClient(client2Name)
_, closeFunc2, err := pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -193,7 +194,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs1[i], err = pool.NewClient(client1Name)
_, closeFuncs1[i], err = pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
Expand All @@ -203,7 +204,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs2[i], err = pool.NewClient(client2Name)
_, closeFuncs2[i], err = pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
Expand Down
26 changes: 25 additions & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpclog"
Expand Down Expand Up @@ -60,6 +61,21 @@ var (
xdsClientImplCloseHook = func(string) {}

defaultExponentialBackoff = backoff.DefaultExponential.Backoff

xdsClientResourceUpdatesValidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.xds_client.resource_updates_valid",
Description: "A counter of resources received that were considered valid. The counter will be incremented even for resources that have not changed.",
Unit: "resource",
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
Default: false,
})
xdsClientResourceUpdatesInvalidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.xds_client.resource_updates_invalid",
Description: "A counter of resources received that were considered invalid.",
Unit: "resource",
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
Default: false,
})
)

// clientImpl is the real implementation of the xDS client. The exported Client
Expand All @@ -78,6 +94,8 @@ type clientImpl struct {
serializer *grpcsync.CallbackSerializer // Serializer for invoking resource watcher callbacks.
serializerClose func() // Function to close the serializer.
logger *grpclog.PrefixLogger // Logger for this client.
metricsRecorder estats.MetricsRecorder // Metrics recorder for metrics.
target string // The gRPC target for this client.

// The clientImpl owns a bunch of channels to individual xDS servers
// specified in the bootstrap configuration. Authorities acquire references
Expand Down Expand Up @@ -111,9 +129,11 @@ func init() {
}

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, mr estats.MetricsRecorder, target string) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
metricsRecorder: mr,
target: target,
done: grpcsync.NewEvent(),
authorities: make(map[string]*authority),
config: config,
Expand All @@ -139,6 +159,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
serializer: c.serializer,
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsRecorder: c.metricsRecorder,
})
}
c.topLevelAuthority = newAuthority(authorityBuildOptions{
Expand All @@ -147,6 +169,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
serializer: c.serializer,
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsRecorder: c.metricsRecorder,
})
c.logger = prefixLogger(c)
return c, nil
Expand Down
Loading

0 comments on commit 267a09b

Please sign in to comment.