From cafe3020c5c40b254cf3a4b101a204ee3d761c9b Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Fri, 17 Jan 2025 14:44:28 -0500 Subject: [PATCH] Fix missing metrics and verify metrics are not lost Signed-off-by: Davanum Srinivas --- server/config/config.go | 2 + server/embed/etcd.go | 14 +-- server/etcdserver/api/v3rpc/grpc.go | 37 +++++++ tests/framework/integration/cluster.go | 4 + tests/integration/clientv3/metrics_test.go | 116 +++++++++++++++++++++ 5 files changed, 160 insertions(+), 13 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index dee41b86de5..d94a834cd4f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -211,6 +211,8 @@ type ServerConfig struct { // ServerFeatureGate is a server level feature gate ServerFeatureGate featuregate.FeatureGate + + Metrics string } // VerifyBootstrap sanity-checks the initial config for bootstrap case diff --git a/server/embed/etcd.go b/server/embed/etcd.go index e345f136a54..b9322e87a3d 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -31,7 +31,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/soheilhy/cmux" "go.uber.org/zap" "google.golang.org/grpc" @@ -227,6 +226,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { V2Deprecation: cfg.V2DeprecationEffective(), ExperimentalLocalAddress: cfg.InferLocalAddr(), ServerFeatureGate: cfg.ServerFeatureGate, + Metrics: cfg.Metrics, } if srvcfg.ExperimentalEnableDistributedTracing { @@ -844,18 +844,6 @@ func (e *Etcd) createMetricsListener(murl url.URL) (net.Listener, error) { } func (e *Etcd) serveMetrics() (err error) { - if e.cfg.Metrics == "extensive" { - var opts prometheus.HistogramOpts - serverHandledHistogram := prometheus.NewHistogramVec( - opts, - []string{"grpc_type", "grpc_service", "grpc_method"}, - ) - err := prometheus.Register(serverHandledHistogram) - if err != nil { - e.GetLogger().Error("setting up prometheus metrics failed.", zap.Error(err)) - } - } - if len(e.cfg.ListenMetricsUrls) > 0 { metricsMux := http.NewServeMux() etcdhttp.HandleMetrics(metricsMux) diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index d1d5d907429..0ac0d779732 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -15,8 +15,11 @@ package v3rpc import ( + "context" "crypto/tls" "math" + "strings" + "time" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" @@ -34,6 +37,14 @@ const ( maxSendBytes = math.MaxInt32 ) +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} + func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server { var opts []grpc.ServerOption opts = append(opts, grpc.CustomCodec(&codec{})) @@ -57,6 +68,32 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer newStreamInterceptor(s), serverMetrics.StreamServerInterceptor(), } + if s.Cfg.Metrics == "extensive" { + serverHandledHistogram := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "grpc_server_handling_seconds", + Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", + Buckets: prometheus.DefBuckets, + }, + []string{"grpc_type", "grpc_service", "grpc_method"}, + ) + prometheus.Register(serverHandledHistogram) + + startTime := time.Now() + chainUnaryInterceptors = append(chainUnaryInterceptors, + func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + resp, err = handler(ctx, req) + grpcService, grpcMethod := splitMethodName(info.FullMethod) + serverHandledHistogram.WithLabelValues("unary", grpcService, grpcMethod).Observe(time.Since(startTime).Seconds()) + return resp, err + }) + chainStreamInterceptors = append(chainStreamInterceptors, func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + err := handler(srv, ss) + grpcService, grpcMethod := splitMethodName(info.FullMethod) + serverHandledHistogram.WithLabelValues("stream", grpcService, grpcMethod).Observe(time.Since(startTime).Seconds()) + return err + }) + } if s.Cfg.ExperimentalEnableDistributedTracing { chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...)) diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 229611fe2fa..0d8cbb15663 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -175,6 +175,7 @@ type ClusterConfig struct { ExperimentalMaxLearners int DisableStrictReconfigCheck bool CorruptCheckTime time.Duration + Metrics string } type Cluster struct { @@ -292,6 +293,7 @@ func (c *Cluster) MustNewMember(t testutil.TB) *Member { ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners, DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck, CorruptCheckTime: c.Cfg.CorruptCheckTime, + Metrics: c.Cfg.Metrics, }) m.DiscoveryURL = c.Cfg.DiscoveryURL return m @@ -617,6 +619,7 @@ type MemberConfig struct { ExperimentalMaxLearners int DisableStrictReconfigCheck bool CorruptCheckTime time.Duration + Metrics string } // MustNewMember return an inited member with the given name. If peerTLS is @@ -731,6 +734,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { if mcfg.ExperimentalMaxLearners != 0 { m.ExperimentalMaxLearners = mcfg.ExperimentalMaxLearners } + m.Metrics = mcfg.Metrics m.V2Deprecation = config.V2_DEPR_DEFAULT m.GRPCServerRecorder = &grpctesting.GRPCRecorder{} diff --git a/tests/integration/clientv3/metrics_test.go b/tests/integration/clientv3/metrics_test.go index 9f02d385508..128037bca59 100644 --- a/tests/integration/clientv3/metrics_test.go +++ b/tests/integration/clientv3/metrics_test.go @@ -181,3 +181,119 @@ func getHTTPBodyAsLines(t *testing.T, url string) []string { resp.Body.Close() return lines } + +func TestAllMetricsGenerated(t *testing.T) { + integration2.BeforeTest(t) + + var ( + addr = "localhost:27989" + ln net.Listener + ) + + srv := &http.Server{Handler: promhttp.Handler()} + srv.SetKeepAlivesEnabled(false) + + ln, err := transport.NewUnixListener(addr) + if err != nil { + t.Errorf("Error: %v occurred while listening on addr: %v", err, addr) + } + + donec := make(chan struct{}) + defer func() { + ln.Close() + <-donec + }() + + // listen for all Prometheus metrics + go func() { + defer close(donec) + + serr := srv.Serve(ln) + if serr != nil && !transport.IsClosedConnError(serr) { + t.Errorf("Err serving http requests: %v", serr) + } + }() + + url := "unix://" + addr + "/metrics" + + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, Metrics: "extensive"}) + defer clus.Terminate(t) + + clientMetrics := grpcprom.NewClientMetrics() + prometheus.Register(clientMetrics) + + cfg := clientv3.Config{ + Endpoints: []string{clus.Members[0].GRPCURL}, + DialOptions: []grpc.DialOption{ + grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()), + }, + } + cli, cerr := integration2.NewClient(t, cfg) + if cerr != nil { + t.Fatal(cerr) + } + defer cli.Close() + + // Perform some operations to generate metrics + wc := cli.Watch(context.Background(), "foo") + _, err = cli.Put(context.Background(), "foo", "bar") + if err != nil { + t.Errorf("Error putting value in key store") + } + + // consume watch response + select { + case <-wc: + case <-time.After(10 * time.Second): + t.Error("Timeout occurred for getting watch response") + } + + // Define the expected list of metrics + expectedMetrics := []string{ + "grpc_server_handled_total", + "grpc_server_msg_received_total", + "grpc_server_msg_sent_total", + "grpc_server_started_total", + "grpc_server_handling_seconds_bucket", + "grpc_server_handling_seconds_count", + "grpc_server_handling_seconds_sum", + } + + // Get the list of generated metrics + generatedMetrics := getMetricsList(t, url) + + for _, metric := range expectedMetrics { + if !contains(generatedMetrics, metric) { + t.Errorf("Expected metric %s not found in generated metrics", metric) + } + } +} + +func getMetricsList(t *testing.T, url string) []string { + lines := getHTTPBodyAsLines(t, url) + metrics := make(map[string]struct{}) + for _, line := range lines { + if strings.Contains(line, "{") { + metric := line[:strings.Index(line, "{")] + metrics[metric] = struct{}{} + } else { + metric := line[:strings.Index(line, " ")] + metrics[metric] = struct{}{} + } + } + var metricList []string + for metric := range metrics { + metricList = append(metricList, metric) + } + return metricList +} + +func contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +}